From 8744bf52d39ecfa877193e37dc0588648cb6b2e8 Mon Sep 17 00:00:00 2001 From: Scott Lamb Date: Tue, 26 Apr 2022 13:02:10 -0700 Subject: [PATCH] split PacketContext and StreamContext As listed in #47. This improves throughput in the client/h264 benchmark by about 20%, I think because we copy fewer bytes around. CodecItem went from 256 bytes to 176. --- CHANGELOG.md | 3 + benches/depacketize.rs | 7 +- fuzz/fuzz_targets/depacketize_h264.rs | 3 +- fuzz/fuzz_targets/roundtrip_h264.rs | 5 +- src/client/mod.rs | 301 ++++++++++++++------------ src/client/parse.rs | 97 +++++---- src/client/rtp.rs | 37 +++- src/codec/aac.rs | 135 +++++++++--- src/codec/mod.rs | 9 +- src/error.rs | 33 +-- src/lib.rs | 128 +++++++++-- 11 files changed, 501 insertions(+), 257 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 262b748..776e57c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,9 @@ `#[doc(hidden)]`. * BREAKING: `retina::client::Session::setup` takes a new `SetupOptions` argument for future expansion. +* BREAKING: `retina::StreamContext` has been split out of `retina::PacketContext`. + Both must be printed to provide the same information as before. This change + reduces how much data needs to be copied with each packet. ## `v0.3.9` (2022-04-12) diff --git a/benches/depacketize.rs b/benches/depacketize.rs index 7de1a59..9c9fabe 100644 --- a/benches/depacketize.rs +++ b/benches/depacketize.rs @@ -32,6 +32,7 @@ fn h264_aac ()>(mut f: F) { Depacketizer::new("video", "h264", 90_000, None, Some("packetization-mode=1;profile-level-id=42C01E;sprop-parameter-sets=Z0LAHtkDxWhAAAADAEAAAAwDxYuS,aMuMsg==")).unwrap(), ]; let conn_ctx = retina::ConnectionContext::dummy(); + let stream_ctx = retina::StreamContextRef::dummy(); let pkt_ctx = retina::PacketContext::dummy(); while !remaining.is_empty() { assert!(remaining.len() > 4); @@ -49,6 +50,7 @@ fn h264_aac ()>(mut f: F) { }; let pkt = match rtps[stream_id].rtp( &retina::client::SessionOptions::default(), + stream_ctx, None, &conn_ctx, &pkt_ctx, @@ -60,7 +62,10 @@ fn h264_aac ()>(mut f: F) { _ => unreachable!(), }; depacketizers[stream_id].push(pkt).unwrap(); - while let Some(pkt) = depacketizers[stream_id].pull(&conn_ctx).unwrap() { + while let Some(pkt) = depacketizers[stream_id] + .pull(&conn_ctx, stream_ctx) + .unwrap() + { f(pkt); } } diff --git a/fuzz/fuzz_targets/depacketize_h264.rs b/fuzz/fuzz_targets/depacketize_h264.rs index d124d73..52f7793 100644 --- a/fuzz/fuzz_targets/depacketize_h264.rs +++ b/fuzz/fuzz_targets/depacketize_h264.rs @@ -13,6 +13,7 @@ fuzz_target!(|data: &[u8]| { let mut timestamp = retina::Timestamp::new(0, NonZeroU32::new(90_000).unwrap(), 0).unwrap(); let mut sequence_number: u16 = 0; let conn_ctx = retina::ConnectionContext::dummy(); + let stream_ctx = retina::StreamContextRef::dummy(); let pkt_ctx = retina::PacketContext::dummy(); while data.has_remaining() { let hdr = data.get_u8(); @@ -43,7 +44,7 @@ fuzz_target!(|data: &[u8]| { if depacketizer.push(pkt).is_err() { return; } - while let Some(item) = depacketizer.pull(&conn_ctx).transpose() { + while let Some(item) = depacketizer.pull(&conn_ctx, stream_ctx).transpose() { if item.is_err() { return; } diff --git a/fuzz/fuzz_targets/roundtrip_h264.rs b/fuzz/fuzz_targets/roundtrip_h264.rs index 7f739a8..04b984c 100644 --- a/fuzz/fuzz_targets/roundtrip_h264.rs +++ b/fuzz/fuzz_targets/roundtrip_h264.rs @@ -16,6 +16,7 @@ fuzz_target!(|data: &[u8]| { return; } let conn_ctx = retina::ConnectionContext::dummy(); + let stream_ctx = retina::StreamContextRef::dummy(); let max_payload_size = u16::from_be_bytes([data[0], data[1]]); let mut p = match retina::codec::h264::Packetizer::new(max_payload_size, 0, 0) { Ok(p) => p, @@ -42,7 +43,7 @@ fuzz_target!(|data: &[u8]| { if d.push(pkt).is_err() { return; } - match d.pull(&conn_ctx) { + match d.pull(&conn_ctx, stream_ctx) { Err(_) => return, Ok(Some(retina::codec::CodecItem::VideoFrame(f))) => { assert!(mark); @@ -68,6 +69,6 @@ fuzz_target!(|data: &[u8]| { } }; assert_eq!(&data[2..], &frame.data()[..]); - assert!(matches!(d.pull(&conn_ctx), Ok(None))); + assert!(matches!(d.pull(&conn_ctx, stream_ctx), Ok(None))); assert!(matches!(p.pull(), Ok(None))); }); diff --git a/src/client/mod.rs b/src/client/mod.rs index f33c50e..fb022fe 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -24,7 +24,10 @@ use url::Url; use crate::client::parse::SessionHeader; use crate::codec::CodecItem; -use crate::{Error, ErrorInt, RtspMessageContext}; +use crate::{ + Error, ErrorInt, RtspMessageContext, StreamContextRef, StreamContextRefInner, TcpStreamContext, + UdpStreamContext, +}; mod channel_mapping; mod parse; @@ -703,9 +706,6 @@ pub struct Stream { /// says the server is allowed to omit it when there is only a single stream. pub control: Option, - /// The sockets for `Transport::Udp`. - sockets: Option, - state: StreamState, } @@ -720,36 +720,38 @@ impl std::fmt::Debug for Stream { .field("channels", &self.channels) .field("framerate", &self.framerate) .field("depacketizer", &self.depacketizer) - .field("UDP", &self.sockets) .field("state", &self.state) .finish() } } -struct UdpSockets { - local_ip: IpAddr, - local_rtp_port: u16, - remote_ip: IpAddr, - remote_rtp_port: u16, - rtp_socket: UdpSocket, - remote_rtcp_port: u16, - rtcp_socket: UdpSocket, +enum StreamTransport { + Udp(UdpStreamTransport), // the ...Transport wrapper struct keeps ...Context and the sockets. + Tcp(TcpStreamContext), // no sockets; ...Context suffices. } -impl std::fmt::Debug for UdpSockets { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> { - f.debug_struct("UDP") - .field("local_ip", &self.local_ip) - .field("local_rtp_port", &self.local_rtp_port) - .field("remote_ip", &self.remote_ip) - .field("remote_rtp_port", &self.remote_rtp_port) - .field("rtp_socket", &self.rtp_socket) - .field("remote_rtcp_port", &self.remote_rtcp_port) - .field("rtcp_socket", &self.rtcp_socket) - .finish() +impl std::fmt::Debug for StreamTransport { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + std::fmt::Debug::fmt(&self.ctx(), f) } } +impl StreamTransport { + fn ctx(&self) -> StreamContextRef { + StreamContextRef(match self { + StreamTransport::Tcp(tcp) => StreamContextRefInner::Tcp(*tcp), + StreamTransport::Udp(udp) => StreamContextRefInner::Udp(&udp.ctx), + }) + } +} + +#[derive(Debug)] +struct UdpStreamTransport { + ctx: UdpStreamContext, + rtp_socket: UdpSocket, + rtcp_socket: UdpSocket, +} + impl Stream { /// Returns codec-specified parameters for this stream, if available. /// @@ -785,6 +787,15 @@ impl Stream { pub fn parameters(&self) -> Option { self.depacketizer.as_ref().ok().and_then(|d| d.parameters()) } + + /// Returns a context for this stream, if it has been set up. + pub fn ctx(&self) -> Option { + match &self.state { + StreamState::Uninit => None, + StreamState::Init(init) => Some(init.transport.ctx()), + StreamState::Playing { transport, .. } => Some(transport.ctx()), + } + } } #[derive(Debug)] @@ -799,10 +810,11 @@ enum StreamState { Playing { timeline: Timeline, rtp_handler: rtp::InorderParser, + transport: StreamTransport, }, } -#[derive(Copy, Clone, Debug, Default)] +#[derive(Debug)] struct StreamStateInit { /// The RTP synchronization source (SSRC), as defined in /// [RFC 3550](https://tools.ietf.org/html/rfc3550). This is normally @@ -819,6 +831,8 @@ struct StreamStateInit { /// `RTP-Info` header. This field is only used during the `play()` call /// itself; by the time it returns, the stream will be in state `Playing`. initial_rtptime: Option, + + transport: StreamTransport, } /// Username and password authentication credentials. @@ -1324,7 +1338,7 @@ impl Session { .clone(); let mut req = rtsp_types::Request::builder(Method::Setup, rtsp_types::Version::V1_0).request_uri(url); - match options.transport { + let udp = match options.transport { Transport::Tcp => { let proposed_channel_id = conn.channels.next_unassigned().ok_or_else(|| { wrap!(ErrorInt::FailedPrecondition( @@ -1339,22 +1353,14 @@ impl Session { proposed_channel_id + 1 ), ); + None } Transport::Udp => { // Bind an ephemeral UDP port on the same local address used to connect // to the RTSP server. - let ip_addr = conn.inner.ctx().local_addr.ip(); - let pair = crate::tokio::UdpPair::for_ip(ip_addr) + let local_ip = conn.inner.ctx().local_addr.ip(); + let pair = crate::tokio::UdpPair::for_ip(local_ip) .map_err(|e| wrap!(ErrorInt::Internal(e.into())))?; - stream.sockets = Some(UdpSockets { - local_ip: ip_addr, - local_rtp_port: pair.rtp_port, - remote_ip: IpAddr::V4(Ipv4Addr::UNSPECIFIED), - remote_rtp_port: 0, - rtp_socket: pair.rtp_socket, - remote_rtcp_port: 0, - rtcp_socket: pair.rtcp_socket, - }); req = req.header( rtsp_types::headers::TRANSPORT, format!( @@ -1363,8 +1369,18 @@ impl Session { pair.rtp_port + 1, ), ); + Some(UdpStreamTransport { + ctx: UdpStreamContext { + local_ip, + peer_ip: IpAddr::V4(Ipv4Addr::UNSPECIFIED), + local_rtp_port: pair.rtp_port, + peer_rtp_port: 0, + }, + rtp_socket: pair.rtp_socket, + rtcp_socket: pair.rtcp_socket, + }) } - } + }; if let Some(ref s) = inner.session { req = req.header(rtsp_types::headers::SESSION, s.id.to_string()); } @@ -1408,8 +1424,8 @@ impl Session { None => *inner.session = Some(response.session), }; let conn_ctx = conn.inner.ctx(); - match options.transport { - Transport::Tcp => { + let transport = match udp { + None => { let channel_id = match response.channel_id { Some(id) => id, None => bail!(ErrorInt::RtspResponseError { @@ -1433,8 +1449,11 @@ impl Session { description, }) })?; + StreamTransport::Tcp(TcpStreamContext { + rtp_channel_id: channel_id, + }) } - Transport::Udp => { + Some(mut udp) => { // TODO: RFC 2326 section 12.39 says "If the source address for // the stream is different than can be derived from the RTSP // endpoint address (the server in playback or the client in @@ -1455,29 +1474,27 @@ impl Session { description: "Transport header is missing server_port parameter".to_owned(), }) })?; - let udp_sockets = stream.sockets.as_mut().unwrap(); - udp_sockets.remote_ip = source; - udp_sockets.remote_rtp_port = server_port.0; - udp_sockets.remote_rtcp_port = server_port.1; - udp_sockets - .rtp_socket - .connect(SocketAddr::new(source, udp_sockets.remote_rtp_port)) + udp.ctx.peer_ip = source; + udp.ctx.peer_rtp_port = server_port; + udp.rtp_socket + .connect(SocketAddr::new(source, server_port)) .await .map_err(|e| wrap!(ErrorInt::ConnectError(e)))?; - udp_sockets - .rtcp_socket - .connect(SocketAddr::new(source, udp_sockets.remote_rtcp_port)) + udp.rtcp_socket + .connect(SocketAddr::new(source, server_port + 1)) .await .map_err(|e| wrap!(ErrorInt::ConnectError(e)))?; - punch_firewall_hole(&udp_sockets.rtp_socket, &udp_sockets.rtcp_socket) + punch_firewall_hole(&udp.rtp_socket, &udp.rtcp_socket) .await .map_err(|e| wrap!(ErrorInt::ConnectError(e)))?; + StreamTransport::Udp(udp) } - } + }; stream.state = StreamState::Init(StreamStateInit { ssrc: response.ssrc, initial_seq: None, initial_rtptime: None, + transport, }); Ok(()) } @@ -1552,11 +1569,12 @@ impl Session { // Move all streams that have been set up from Init to Playing state. Check that required // parameters are present while doing so. for (i, s) in inner.presentation.streams.iter_mut().enumerate() { - match s.state { + match std::mem::replace(&mut s.state, StreamState::Uninit) { StreamState::Init(StreamStateInit { initial_rtptime, initial_seq, ssrc, + transport, .. }) => { let initial_rtptime = match policy.initial_timestamp { @@ -1612,6 +1630,7 @@ impl Session { }) })?, rtp_handler: rtp::InorderParser::new(ssrc, initial_seq), + transport, }; } StreamState::Uninit => {} @@ -1893,10 +1912,7 @@ impl Session { .as_mut() .ok_or_else(|| wrap!(ErrorInt::FailedPrecondition("no connection".into())))?; let channel_id = data.channel_id(); - let pkt_ctx = crate::PacketContext(crate::PacketContextInner::Tcp { - msg_ctx: *msg_ctx, - channel_id, - }); + let pkt_ctx = crate::PacketContext(crate::PacketContextInner::Tcp { msg_ctx: *msg_ctx }); let m = match conn.channels.lookup(channel_id) { Some(m) => m, None => { @@ -1910,11 +1926,12 @@ impl Session { } }; let stream = &mut inner.presentation.streams[m.stream_i]; - let (timeline, rtp_handler) = match &mut stream.state { + let (timeline, rtp_handler, stream_ctx) = match &mut stream.state { StreamState::Playing { timeline, rtp_handler, - } => (timeline, rtp_handler), + transport, + } => (timeline, rtp_handler, transport.ctx()), _ => unreachable!( "Session's {}->{:?} not in Playing state", channel_id, m @@ -1923,6 +1940,7 @@ impl Session { match m.channel_type { ChannelType::Rtp => Ok(rtp_handler.rtp( inner.options, + stream_ctx, inner.presentation.tool.as_ref(), conn.inner.ctx(), &pkt_ctx, @@ -1933,6 +1951,7 @@ impl Session { ChannelType::Rtcp => { match rtp_handler.rtcp( inner.options, + stream_ctx, inner.presentation.tool.as_ref(), &pkt_ctx, timeline, @@ -1942,6 +1961,7 @@ impl Session { Ok(p) => Ok(p), Err(description) => Err(wrap!(ErrorInt::PacketError { conn_ctx: *conn.inner.ctx(), + stream_ctx: stream.ctx().unwrap().to_owned(), pkt_ctx, stream_id: m.stream_i, description, @@ -1964,91 +1984,96 @@ impl Session { ) -> Poll>> { debug_assert!(buf.filled().is_empty()); let inner = self.0.as_mut().project(); + let s = &mut inner.presentation.streams[i]; + let (timeline, rtp_handler, udp) = match &mut s.state { + StreamState::Playing { + timeline, + rtp_handler, + transport: StreamTransport::Udp(udp), + } => (timeline, rtp_handler, udp), + _ => return Poll::Pending, + }; let conn_ctx = inner .conn .as_ref() .ok_or_else(|| wrap!(ErrorInt::FailedPrecondition("no connection".into())))? .inner .ctx(); - let s = &mut inner.presentation.streams[i]; - if let Some(sockets) = &mut s.sockets { - let (timeline, rtp_handler) = match &mut s.state { - StreamState::Playing { - timeline, - rtp_handler, - } => (timeline, rtp_handler), - _ => unreachable!("Session's {}->{:?} not in Playing state", i, s), - }; - // Prioritize RTCP over RTP within a stream. - while let Poll::Ready(r) = sockets.rtcp_socket.poll_recv(cx, buf) { - let pkt_ctx = crate::PacketContext(crate::PacketContextInner::Udp { - local_addr: SocketAddr::new(sockets.local_ip, sockets.local_rtp_port + 1), - peer_addr: SocketAddr::new(sockets.remote_ip, sockets.remote_rtcp_port), - received_wall: crate::WallTime::now(), - }); - match r { - Ok(()) => { - let msg = Bytes::copy_from_slice(buf.filled()); - match rtp_handler.rtcp( - inner.options, - inner.presentation.tool.as_ref(), - &pkt_ctx, - timeline, - i, - msg, - ) { - Ok(Some(p)) => return Poll::Ready(Some(Ok(p))), - Ok(None) => buf.clear(), - Err(description) => { - return Poll::Ready(Some(Err(wrap!(ErrorInt::PacketError { - conn_ctx: *conn_ctx, - pkt_ctx, - stream_id: i, - description, - })))) - } + let stream_ctx = StreamContextRef(StreamContextRefInner::Udp(&udp.ctx)); + + // Prioritize RTCP over RTP within a stream. + while let Poll::Ready(r) = udp.rtcp_socket.poll_recv(cx, buf) { + let when = crate::WallTime::now(); + match r { + Ok(()) => { + let pkt_ctx = crate::PacketContext(crate::PacketContextInner::Udp { + received_wall: when, + }); + let msg = Bytes::copy_from_slice(buf.filled()); + match rtp_handler.rtcp( + inner.options, + stream_ctx, + inner.presentation.tool.as_ref(), + &pkt_ctx, + timeline, + i, + msg, + ) { + Ok(Some(p)) => return Poll::Ready(Some(Ok(p))), + Ok(None) => buf.clear(), + Err(description) => { + return Poll::Ready(Some(Err(wrap!(ErrorInt::PacketError { + conn_ctx: *conn_ctx, + stream_ctx: stream_ctx.to_owned(), + pkt_ctx, + stream_id: i, + description, + })))) } } - Err(source) => { - return Poll::Ready(Some(Err(wrap!(ErrorInt::UdpRecvError { - conn_ctx: *conn_ctx, - pkt_ctx, - source, - })))) - } + } + Err(source) => { + return Poll::Ready(Some(Err(wrap!(ErrorInt::UdpRecvError { + conn_ctx: *conn_ctx, + stream_ctx: stream_ctx.to_owned(), + when, + source, + })))) } } - while let Poll::Ready(r) = sockets.rtp_socket.poll_recv(cx, buf) { - let pkt_ctx = crate::PacketContext(crate::PacketContextInner::Udp { - local_addr: SocketAddr::new(sockets.local_ip, sockets.local_rtp_port), - peer_addr: SocketAddr::new(sockets.remote_ip, sockets.remote_rtp_port), - received_wall: crate::WallTime::now(), - }); - match r { - Ok(()) => { - let msg = Bytes::copy_from_slice(buf.filled()); - match rtp_handler.rtp( - inner.options, - inner.presentation.tool.as_ref(), - conn_ctx, - &pkt_ctx, - timeline, - i, - msg, - ) { - Ok(Some(p)) => return Poll::Ready(Some(Ok(p))), - Ok(None) => buf.clear(), - Err(e) => return Poll::Ready(Some(Err(e))), - } - } - Err(source) => { - return Poll::Ready(Some(Err(wrap!(ErrorInt::UdpRecvError { - conn_ctx: *conn_ctx, - pkt_ctx, - source, - })))) + } + while let Poll::Ready(r) = udp.rtp_socket.poll_recv(cx, buf) { + let when = crate::WallTime::now(); + let stream_ctx = StreamContextRef(StreamContextRefInner::Udp(&udp.ctx)); + match r { + Ok(()) => { + let msg = Bytes::copy_from_slice(buf.filled()); + let pkt_ctx = crate::PacketContext(crate::PacketContextInner::Udp { + received_wall: when, + }); + match rtp_handler.rtp( + inner.options, + stream_ctx, + inner.presentation.tool.as_ref(), + conn_ctx, + &pkt_ctx, + timeline, + i, + msg, + ) { + Ok(Some(p)) => return Poll::Ready(Some(Ok(p))), + Ok(None) => buf.clear(), + Err(e) => return Poll::Ready(Some(Err(e))), } } + Err(source) => { + return Poll::Ready(Some(Err(wrap!(ErrorInt::UdpRecvError { + conn_ctx: *conn_ctx, + stream_ctx: stream_ctx.to_owned(), + when, + source, + })))) + } } } Poll::Pending @@ -2273,7 +2298,12 @@ impl futures::Stream for Demuxed { DemuxedState::Fused => return Poll::Ready(None), }; let inner = self.session.0.as_mut().project(); - let depacketizer = match &mut inner.presentation.streams[stream_id].depacketizer { + let stream = &mut inner.presentation.streams[stream_id]; + let stream_ctx = match stream.state { + StreamState::Playing { ref transport, .. } => transport.ctx(), + _ => unreachable!(), + }; + let depacketizer = match &mut stream.depacketizer { Ok(d) => d, Err(_) => unreachable!("depacketizer was Ok"), }; @@ -2291,6 +2321,7 @@ impl futures::Stream for Demuxed { depacketizer.push(p).map_err(|description| { wrap!(ErrorInt::RtpPacketError { conn_ctx: *conn_ctx, + stream_ctx: stream_ctx.to_owned(), pkt_ctx, stream_id, ssrc, @@ -2299,7 +2330,7 @@ impl futures::Stream for Demuxed { }) })?; } - match depacketizer.pull(conn_ctx) { + match depacketizer.pull(conn_ctx, stream_ctx) { Ok(Some(item)) => { self.state = DemuxedState::Pulling(stream_id); return Poll::Ready(Some(Ok(item))); diff --git a/src/client/parse.rs b/src/client/parse.rs index 35f11b9..d0ad999 100644 --- a/src/client/parse.rs +++ b/src/client/parse.rs @@ -375,7 +375,6 @@ fn parse_media(base_url: &Url, media_description: &Media) -> Result, pub(crate) channel_id: Option, pub(crate) source: Option, - pub(crate) server_port: Option<(u16, u16)>, + pub(crate) server_port: Option, +} + +fn parse_server_port(server_port: &str) -> Result { + if let Some((a, b)) = server_port.split_once('-') { + let a = u16::from_str_radix(a, 10).map_err(|_| ())?; + let b = u16::from_str_radix(b, 10).map_err(|_| ())?; + if a.checked_add(1) != Some(b) { + // It's unclear what a non-consecutive range means. + return Err(()); + } + return Ok(a); + } + + // Note returning a range is allowed by RFC 2326's grammar, but I'm not sure + // what it means. RTSP 2.0 allows "RTCP-mux" for using a single port for + // both RTP and RTCP, but it's only by client request, and RTSP 1.0 doesn't + // reference this. + Err(()) } /// Parses a `SETUP` response. @@ -546,19 +563,12 @@ pub(crate) fn parse_setup(response: &rtsp_types::Response) -> Result) -> StreamState { + StreamState::Init(StreamStateInit { + ssrc, + initial_seq: None, + initial_rtptime: None, + transport: StreamTransport::Tcp(TcpStreamContext { rtp_channel_id: 0 }), + }) + } + #[test] fn anvpiz_sdp() { let url = Url::parse("rtsp://127.0.0.1/").unwrap(); @@ -771,11 +792,7 @@ mod tests { ); assert_eq!(setup_response.channel_id, Some(0)); assert_eq!(setup_response.ssrc, Some(0x30a98ee7)); - p.streams[0].state = StreamState::Init(StreamStateInit { - ssrc: setup_response.ssrc, - initial_seq: None, - initial_rtptime: None, - }); + p.streams[0].state = dummy_stream_state_init(Some(0x30a98ee7)); // PLAY. super::parse_play(&response(include_bytes!("testdata/dahua_play.txt")), &mut p).unwrap(); @@ -873,11 +890,7 @@ mod tests { ); assert_eq!(setup_response.channel_id, Some(0)); assert_eq!(setup_response.ssrc, Some(0x4cacc3d1)); - p.streams[0].state = StreamState::Init(StreamStateInit { - ssrc: setup_response.ssrc, - initial_seq: None, - initial_rtptime: None, - }); + p.streams[0].state = dummy_stream_state_init(Some(0x4cacc3d1)); // PLAY. super::parse_play( @@ -885,7 +898,7 @@ mod tests { &mut p, ) .unwrap(); - match p.streams[0].state { + match &p.streams[0].state { StreamState::Init(state) => { assert_eq!(state.initial_seq, Some(24104)); assert_eq!(state.initial_rtptime, Some(1270711678)); @@ -957,8 +970,8 @@ mod tests { ); assert_eq!(setup_response.channel_id, Some(0)); assert_eq!(setup_response.ssrc, None); - p.streams[0].state = StreamState::Init(StreamStateInit::default()); - p.streams[1].state = StreamState::Init(StreamStateInit::default()); + p.streams[0].state = dummy_stream_state_init(None); + p.streams[1].state = dummy_stream_state_init(None); // PLAY. super::parse_play( @@ -966,14 +979,14 @@ mod tests { &mut p, ) .unwrap(); - match p.streams[0].state { + match &p.streams[0].state { StreamState::Init(state) => { assert_eq!(state.initial_seq, Some(16852)); assert_eq!(state.initial_rtptime, Some(1070938629)); } _ => panic!(), }; - match p.streams[1].state { + match &p.streams[1].state { StreamState::Init(state) => { assert_eq!(state.initial_rtptime, Some(3075976528)); assert_eq!(state.ssrc, Some(0x9fc9fff8)); @@ -1040,12 +1053,12 @@ mod tests { ); assert_eq!(setup_response.channel_id, Some(0)); assert_eq!(setup_response.ssrc, None); - p.streams[0].state = StreamState::Init(StreamStateInit::default()); - p.streams[1].state = StreamState::Init(StreamStateInit::default()); + p.streams[0].state = dummy_stream_state_init(None); + p.streams[1].state = dummy_stream_state_init(None); // PLAY. super::parse_play(&response(include_bytes!("testdata/bunny_play.txt")), &mut p).unwrap(); - match p.streams[1].state { + match &p.streams[1].state { StreamState::Init(state) => { assert_eq!(state.initial_rtptime, Some(0)); assert_eq!(state.initial_seq, Some(1)); @@ -1207,11 +1220,7 @@ mod tests { ); assert_eq!(setup_response.channel_id, Some(0)); assert_eq!(setup_response.ssrc, None); - p.streams[0].state = StreamState::Init(StreamStateInit { - ssrc: None, - initial_seq: None, - initial_rtptime: None, - }); + p.streams[0].state = dummy_stream_state_init(None); let setup_response = response(include_bytes!("testdata/gw_main_setup_audio.txt")); let setup_response = super::parse_setup(&setup_response).unwrap(); @@ -1224,11 +1233,7 @@ mod tests { ); assert_eq!(setup_response.channel_id, Some(2)); assert_eq!(setup_response.ssrc, None); - p.streams[1].state = StreamState::Init(StreamStateInit { - ssrc: None, - initial_seq: None, - initial_rtptime: None, - }); + p.streams[1].state = dummy_stream_state_init(None); // PLAY. super::parse_play( @@ -1297,11 +1302,7 @@ mod tests { ); assert_eq!(setup_response.channel_id, Some(0)); assert_eq!(setup_response.ssrc, None); - p.streams[0].state = StreamState::Init(StreamStateInit { - ssrc: None, - initial_seq: None, - initial_rtptime: None, - }); + p.streams[0].state = dummy_stream_state_init(None); // PLAY. super::parse_play( diff --git a/src/client/rtp.rs b/src/client/rtp.rs index c35b151..ffd7519 100644 --- a/src/client/rtp.rs +++ b/src/client/rtp.rs @@ -7,7 +7,9 @@ use bytes::{Buf, Bytes}; use log::{debug, trace}; use crate::client::PacketItem; -use crate::{ConnectionContext, Error, ErrorInt, PacketContext}; +use crate::{ + ConnectionContext, Error, ErrorInt, PacketContext, StreamContextRef, StreamContextRefInner, +}; use super::{SessionOptions, Timeline}; @@ -102,6 +104,7 @@ impl InorderParser { pub fn rtp( &mut self, session_options: &SessionOptions, + stream_ctx: StreamContextRef, tool: Option<&super::Tool>, conn_ctx: &ConnectionContext, pkt_ctx: &PacketContext, @@ -112,6 +115,7 @@ impl InorderParser { let reader = rtp_rs::RtpReader::new(&data[..]).map_err(|e| { wrap!(ErrorInt::PacketError { conn_ctx: *conn_ctx, + stream_ctx: stream_ctx.to_owned(), pkt_ctx: *pkt_ctx, stream_id, description: format!( @@ -138,12 +142,13 @@ impl InorderParser { let ssrc = reader.ssrc(); let loss = sequence_number.wrapping_sub(self.next_seq.unwrap_or(sequence_number)); if matches!(self.ssrc, Some(s) if s != ssrc) { - if matches!(session_options.transport, super::Transport::Tcp) { + if matches!(stream_ctx.0, StreamContextRefInner::Udp(_)) { super::note_stale_live555_data(tool, session_options); } bail!(ErrorInt::RtpPacketError { conn_ctx: *conn_ctx, pkt_ctx: *pkt_ctx, + stream_ctx: stream_ctx.to_owned(), stream_id, ssrc, sequence_number, @@ -155,10 +160,11 @@ impl InorderParser { }); } if loss > 0x80_00 { - if matches!(session_options.transport, super::Transport::Tcp) { + if matches!(stream_ctx.0, StreamContextRefInner::Tcp { .. }) { bail!(ErrorInt::RtpPacketError { conn_ctx: *conn_ctx, pkt_ctx: *pkt_ctx, + stream_ctx: stream_ctx.to_owned(), stream_id, ssrc, sequence_number, @@ -182,6 +188,7 @@ impl InorderParser { Err(description) => bail!(ErrorInt::RtpPacketError { conn_ctx: *conn_ctx, pkt_ctx: *pkt_ctx, + stream_ctx: stream_ctx.to_owned(), stream_id, ssrc, sequence_number, @@ -193,6 +200,7 @@ impl InorderParser { let payload_range = crate::as_range(&data, reader.payload()).ok_or_else(|| { wrap!(ErrorInt::RtpPacketError { conn_ctx: *conn_ctx, + stream_ctx: stream_ctx.to_owned(), pkt_ctx: *pkt_ctx, stream_id, ssrc, @@ -220,6 +228,7 @@ impl InorderParser { pub fn rtcp( &mut self, session_options: &SessionOptions, + stream_ctx: StreamContextRef, tool: Option<&super::Tool>, pkt_ctx: &PacketContext, timeline: &mut Timeline, @@ -247,7 +256,7 @@ impl InorderParser { let ssrc = pkt.ssrc(); if matches!(self.ssrc, Some(s) if s != ssrc) { - if matches!(session_options.transport, super::Transport::Tcp) { + if matches!(stream_ctx.0, StreamContextRefInner::Tcp { .. }) { super::note_stale_live555_data(tool, session_options); } return Err(format!( @@ -274,6 +283,10 @@ impl InorderParser { #[cfg(test)] mod tests { + use std::net::{IpAddr, Ipv4Addr}; + + use crate::client::UdpStreamContext; + use super::*; /// Checks dropping and logging Geovision's extra payload type 50 packets. @@ -284,10 +297,12 @@ mod tests { fn geovision_pt50_packet() { let mut timeline = Timeline::new(None, 90_000, None).unwrap(); let mut parser = InorderParser::new(Some(0xd25614e), None); + let stream_ctx = StreamContextRef::dummy(); // Normal packet. match parser.rtp( &SessionOptions::default(), + stream_ctx, None, &ConnectionContext::dummy(), &PacketContext::dummy(), @@ -311,6 +326,7 @@ mod tests { // Mystery pt=50 packet with same sequence number. match parser.rtp( &SessionOptions::default(), + stream_ctx, None, &ConnectionContext::dummy(), &PacketContext::dummy(), @@ -336,10 +352,17 @@ mod tests { fn out_of_order() { let mut timeline = Timeline::new(None, 90_000, None).unwrap(); let mut parser = InorderParser::new(Some(0xd25614e), None); - - let session_options = SessionOptions::default().transport(crate::client::Transport::Udp); + let udp = UdpStreamContext { + local_ip: IpAddr::V4(Ipv4Addr::UNSPECIFIED), + peer_ip: IpAddr::V4(Ipv4Addr::UNSPECIFIED), + local_rtp_port: 0, + peer_rtp_port: 0, + }; + let stream_ctx = StreamContextRef(StreamContextRefInner::Udp(&udp)); + let session_options = SessionOptions::default(); match parser.rtp( &session_options, + stream_ctx, None, &ConnectionContext::dummy(), &PacketContext::dummy(), @@ -364,6 +387,7 @@ mod tests { match parser.rtp( &session_options, + stream_ctx, None, &ConnectionContext::dummy(), &PacketContext::dummy(), @@ -386,6 +410,7 @@ mod tests { match parser.rtp( &session_options, + stream_ctx, None, &ConnectionContext::dummy(), &PacketContext::dummy(), diff --git a/src/codec/aac.rs b/src/codec/aac.rs index cdbf06f..9d87536 100644 --- a/src/codec/aac.rs +++ b/src/codec/aac.rs @@ -21,7 +21,7 @@ use std::{ num::{NonZeroU16, NonZeroU32}, }; -use crate::{client::rtp::Packet, error::ErrorInt, ConnectionContext, Error}; +use crate::{client::rtp::Packet, error::ErrorInt, ConnectionContext, Error, StreamContextRef}; use super::CodecItem; @@ -670,6 +670,7 @@ impl Depacketizer { pub(super) fn pull( &mut self, conn_ctx: &ConnectionContext, + stream_ctx: StreamContextRef, ) -> Result, Error> { match std::mem::take(&mut self.state) { s @ DepacketizerState::Idle { .. } | s @ DepacketizerState::Fragmented(..) => { @@ -692,6 +693,7 @@ impl Depacketizer { // says "receivers MUST support de-interleaving". return Err(error( *conn_ctx, + stream_ctx, agg, "interleaving not yet supported".to_owned(), )); @@ -701,6 +703,7 @@ impl Depacketizer { if agg.frame_count != 1 { return Err(error( *conn_ctx, + stream_ctx, agg, "fragmented AUs must not share packets".to_owned(), )); @@ -719,6 +722,7 @@ impl Depacketizer { } return Err(error( *conn_ctx, + stream_ctx, agg, "mark can't be set on beginning of fragment".to_owned(), )); @@ -737,6 +741,7 @@ impl Depacketizer { if !agg.mark { return Err(error( *conn_ctx, + stream_ctx, agg, "mark must be set on non-fragmented au".to_owned(), )); @@ -756,6 +761,7 @@ impl Depacketizer { None => { return Err(error( *conn_ctx, + stream_ctx, agg, format!( "aggregate timestamp {} + {} overflows", @@ -778,9 +784,15 @@ impl Depacketizer { } } -fn error(conn_ctx: ConnectionContext, agg: Aggregate, description: String) -> Error { +fn error( + conn_ctx: ConnectionContext, + stream_ctx: StreamContextRef, + agg: Aggregate, + description: String, +) -> Error { Error(std::sync::Arc::new(ErrorInt::RtpPacketError { conn_ctx, + stream_ctx: stream_ctx.to_owned(), pkt_ctx: agg.ctx, stream_id: agg.stream_id, ssrc: agg.ssrc, @@ -840,13 +852,19 @@ mod tests { ]), }) .unwrap(); - let a = match d.pull(&ConnectionContext::dummy()).unwrap() { + let a = match d + .pull(&ConnectionContext::dummy(), StreamContextRef::dummy()) + .unwrap() + { Some(CodecItem::AudioFrame(a)) => a, _ => unreachable!(), }; assert_eq!(a.timestamp, timestamp); assert_eq!(&a.data[..], b"asdf"); - assert!(d.pull(&ConnectionContext::dummy()).unwrap().is_none()); + assert!(d + .pull(&ConnectionContext::dummy(), StreamContextRef::dummy()) + .unwrap() + .is_none()); // Aggregate of 3 frames. d.push(Packet { @@ -868,25 +886,37 @@ mod tests { ]), }) .unwrap(); - let a = match d.pull(&ConnectionContext::dummy()).unwrap() { + let a = match d + .pull(&ConnectionContext::dummy(), StreamContextRef::dummy()) + .unwrap() + { Some(CodecItem::AudioFrame(a)) => a, _ => unreachable!(), }; assert_eq!(a.timestamp, timestamp); assert_eq!(&a.data[..], b"foo"); - let a = match d.pull(&ConnectionContext::dummy()).unwrap() { + let a = match d + .pull(&ConnectionContext::dummy(), StreamContextRef::dummy()) + .unwrap() + { Some(CodecItem::AudioFrame(a)) => a, _ => unreachable!(), }; assert_eq!(a.timestamp, timestamp.try_add(1_024).unwrap()); assert_eq!(&a.data[..], b"bar"); - let a = match d.pull(&ConnectionContext::dummy()).unwrap() { + let a = match d + .pull(&ConnectionContext::dummy(), StreamContextRef::dummy()) + .unwrap() + { Some(CodecItem::AudioFrame(a)) => a, _ => unreachable!(), }; assert_eq!(a.timestamp, timestamp.try_add(2_048).unwrap()); assert_eq!(&a.data[..], b"baz"); - assert!(d.pull(&ConnectionContext::dummy()).unwrap().is_none()); + assert!(d + .pull(&ConnectionContext::dummy(), StreamContextRef::dummy()) + .unwrap() + .is_none()); // Fragment across 3 packets. d.push(Packet { @@ -907,7 +937,10 @@ mod tests { ]), }) .unwrap(); - assert!(d.pull(&ConnectionContext::dummy()).unwrap().is_none()); + assert!(d + .pull(&ConnectionContext::dummy(), StreamContextRef::dummy()) + .unwrap() + .is_none()); d.push(Packet { // fragment 2/3. ctx: crate::PacketContext::dummy(), @@ -926,7 +959,10 @@ mod tests { ]), }) .unwrap(); - assert!(d.pull(&ConnectionContext::dummy()).unwrap().is_none()); + assert!(d + .pull(&ConnectionContext::dummy(), StreamContextRef::dummy()) + .unwrap() + .is_none()); d.push(Packet { // fragment 3/3. ctx: crate::PacketContext::dummy(), @@ -945,13 +981,19 @@ mod tests { ]), }) .unwrap(); - let a = match d.pull(&ConnectionContext::dummy()).unwrap() { + let a = match d + .pull(&ConnectionContext::dummy(), StreamContextRef::dummy()) + .unwrap() + { Some(CodecItem::AudioFrame(a)) => a, _ => unreachable!(), }; assert_eq!(a.timestamp, timestamp); assert_eq!(&a.data[..], b"foobarbaz"); - assert!(d.pull(&ConnectionContext::dummy()).unwrap().is_none()); + assert!(d + .pull(&ConnectionContext::dummy(), StreamContextRef::dummy()) + .unwrap() + .is_none()); } /// Tests that depacketization skips/reports a frame in which its first packet was lost. @@ -986,7 +1028,10 @@ mod tests { ]), }) .unwrap(); - assert!(d.pull(&ConnectionContext::dummy()).unwrap().is_none()); + assert!(d + .pull(&ConnectionContext::dummy(), StreamContextRef::dummy()) + .unwrap() + .is_none()); d.push(Packet { ctx: crate::PacketContext::dummy(), stream_id: 0, @@ -1004,7 +1049,10 @@ mod tests { ]), }) .unwrap(); - assert!(d.pull(&ConnectionContext::dummy()).unwrap().is_none()); + assert!(d + .pull(&ConnectionContext::dummy(), StreamContextRef::dummy()) + .unwrap() + .is_none()); // Following frame reports the loss. d.push(Packet { @@ -1025,13 +1073,19 @@ mod tests { ]), }) .unwrap(); - let a = match d.pull(&ConnectionContext::dummy()).unwrap() { + let a = match d + .pull(&ConnectionContext::dummy(), StreamContextRef::dummy()) + .unwrap() + { Some(CodecItem::AudioFrame(a)) => a, _ => unreachable!(), }; assert_eq!(a.loss, 1); assert_eq!(&a.data[..], b"asdf"); - assert!(d.pull(&ConnectionContext::dummy()).unwrap().is_none()); + assert!(d + .pull(&ConnectionContext::dummy(), StreamContextRef::dummy()) + .unwrap() + .is_none()); } /// Tests that depacketization skips/reports a frame in which an interior frame is lost. @@ -1066,7 +1120,10 @@ mod tests { ]), }) .unwrap(); - assert!(d.pull(&ConnectionContext::dummy()).unwrap().is_none()); + assert!(d + .pull(&ConnectionContext::dummy(), StreamContextRef::dummy()) + .unwrap() + .is_none()); // Fragment 2/3 is lost d.push(Packet { // 3/3 reports the loss @@ -1086,7 +1143,10 @@ mod tests { ]), }) .unwrap(); - assert!(d.pull(&ConnectionContext::dummy()).unwrap().is_none()); + assert!(d + .pull(&ConnectionContext::dummy(), StreamContextRef::dummy()) + .unwrap() + .is_none()); // Following frame reports the loss. d.push(Packet { @@ -1107,13 +1167,19 @@ mod tests { ]), }) .unwrap(); - let a = match d.pull(&ConnectionContext::dummy()).unwrap() { + let a = match d + .pull(&ConnectionContext::dummy(), StreamContextRef::dummy()) + .unwrap() + { Some(CodecItem::AudioFrame(a)) => a, _ => unreachable!(), }; assert_eq!(a.loss, 1); assert_eq!(&a.data[..], b"asdf"); - assert!(d.pull(&ConnectionContext::dummy()).unwrap().is_none()); + assert!(d + .pull(&ConnectionContext::dummy(), StreamContextRef::dummy()) + .unwrap() + .is_none()); } /// Tests that depacketization skips/reports a frame in which the interior frame is lost. @@ -1148,7 +1214,10 @@ mod tests { ]), }) .unwrap(); - assert!(d.pull(&ConnectionContext::dummy()).unwrap().is_none()); + assert!(d + .pull(&ConnectionContext::dummy(), StreamContextRef::dummy()) + .unwrap() + .is_none()); // Fragment 2/3 is lost d.push(Packet { // 3/3 reports the loss @@ -1168,7 +1237,10 @@ mod tests { ]), }) .unwrap(); - assert!(d.pull(&ConnectionContext::dummy()).unwrap().is_none()); + assert!(d + .pull(&ConnectionContext::dummy(), StreamContextRef::dummy()) + .unwrap() + .is_none()); // Following frame reports the loss. d.push(Packet { @@ -1189,13 +1261,19 @@ mod tests { ]), }) .unwrap(); - let a = match d.pull(&ConnectionContext::dummy()).unwrap() { + let a = match d + .pull(&ConnectionContext::dummy(), StreamContextRef::dummy()) + .unwrap() + { Some(CodecItem::AudioFrame(a)) => a, _ => unreachable!(), }; assert_eq!(a.loss, 1); assert_eq!(&a.data[..], b"asdf"); - assert!(d.pull(&ConnectionContext::dummy()).unwrap().is_none()); + assert!(d + .pull(&ConnectionContext::dummy(), StreamContextRef::dummy()) + .unwrap() + .is_none()); } /// Tests the distinction between `loss` and `loss_since_last_mark`. @@ -1233,7 +1311,10 @@ mod tests { ]), }) .unwrap(); - assert!(d.pull(&ConnectionContext::dummy()).unwrap().is_none()); + assert!(d + .pull(&ConnectionContext::dummy(), StreamContextRef::dummy()) + .unwrap() + .is_none()); // Incomplete fragment with no reported loss. d.push(Packet { @@ -1254,7 +1335,9 @@ mod tests { ]), }) .unwrap(); - let e = d.pull(&ConnectionContext::dummy()).unwrap_err(); + let e = d + .pull(&ConnectionContext::dummy(), StreamContextRef::dummy()) + .unwrap_err(); let e_str = e.to_string(); assert!( e_str.contains("mark can't be set on beginning of fragment"), diff --git a/src/codec/mod.rs b/src/codec/mod.rs index ea97417..c56b80f 100644 --- a/src/codec/mod.rs +++ b/src/codec/mod.rs @@ -12,6 +12,7 @@ use std::num::{NonZeroU16, NonZeroU32}; use crate::client::rtp; use crate::ConnectionContext; use crate::Error; +use crate::StreamContextRef; use bytes::{Buf, Bytes}; pub(crate) mod aac; @@ -470,9 +471,13 @@ impl Depacketizer { /// /// Some packetization formats support aggregating multiple frames into one packet, so a single /// `push` call may cause `pull` to return `Ok(Some(...))` more than once. - pub fn pull(&mut self, conn_ctx: &ConnectionContext) -> Result, Error> { + pub fn pull( + &mut self, + conn_ctx: &ConnectionContext, + stream_ctx: StreamContextRef, + ) -> Result, Error> { match &mut self.0 { - DepacketizerInner::Aac(d) => d.pull(conn_ctx), + DepacketizerInner::Aac(d) => d.pull(conn_ctx, stream_ctx), DepacketizerInner::G723(d) => Ok(d.pull()), DepacketizerInner::H264(d) => Ok(d.pull()), DepacketizerInner::Onvif(d) => Ok(d.pull()), diff --git a/src/error.rs b/src/error.rs index 6f479d1..6b4717e 100644 --- a/src/error.rs +++ b/src/error.rs @@ -3,7 +3,7 @@ use std::{fmt::Display, sync::Arc}; -use crate::{ConnectionContext, PacketContext, RtspMessageContext}; +use crate::{ConnectionContext, PacketContext, RtspMessageContext, StreamContext, WallTime}; use bytes::Bytes; use thiserror::Error; @@ -40,15 +40,15 @@ pub(crate) enum ErrorInt { InvalidArgument(String), /// Unparseable or unexpected RTSP message. - #[error("[{conn_ctx}, {msg_ctx}] RTSP framing error: {description}")] + #[error("RTSP framing error: {description}\n\nconn: {conn_ctx}\nmsg: {msg_ctx}")] RtspFramingError { conn_ctx: ConnectionContext, msg_ctx: RtspMessageContext, description: String, }, - #[error("[{conn_ctx}, {msg_ctx}] {status} response to {} CSeq={cseq}: \ - {description}", Into::<&str>::into(.method))] + #[error("{status} response to {} CSeq={cseq}: {description}\n\n\ + conn: {conn_ctx}\nmsg: {msg_ctx}", Into::<&str>::into(.method))] RtspResponseError { conn_ctx: ConnectionContext, msg_ctx: RtspMessageContext, @@ -59,8 +59,8 @@ pub(crate) enum ErrorInt { }, #[error( - "[{conn_ctx}, {msg_ctx}] Received interleaved data on unassigned channel {channel_id}: \n\ - {:?}", + "Received interleaved data on unassigned channel {channel_id}: \n\ + {:?}\n\nconn: {conn_ctx}\nmsg: {msg_ctx}", crate::hex::LimitedHex::new(data, 64) )] RtspUnassignedChannelError { @@ -70,20 +70,23 @@ pub(crate) enum ErrorInt { data: Bytes, }, - #[error("[{conn_ctx}, {pkt_ctx} stream {stream_id}]: {description}")] + #[error("{description}\n\nconn: {conn_ctx}\nstream: {stream_ctx}\npkt: {pkt_ctx}")] PacketError { conn_ctx: ConnectionContext, + stream_ctx: StreamContext, pkt_ctx: PacketContext, stream_id: usize, description: String, }, #[error( - "[{conn_ctx}, {pkt_ctx}, stream={stream_id}, ssrc={ssrc:08x}, \ - seq={sequence_number:08x}] {description}" + "{description}\n\n\ + conn: {conn_ctx}\nstream: {stream_ctx}\n\ + ssrc: {ssrc:08x}\nseq: {sequence_number:08x}\npkt: {pkt_ctx}" )] RtpPacketError { conn_ctx: ConnectionContext, + stream_ctx: StreamContext, pkt_ctx: crate::PacketContext, stream_id: usize, ssrc: u32, @@ -94,21 +97,25 @@ pub(crate) enum ErrorInt { #[error("Unable to connect to RTSP server: {0}")] ConnectError(#[source] std::io::Error), - #[error("[{conn_ctx}, {msg_ctx}] Error reading from RTSP peer: {source}")] + #[error("Error reading from RTSP peer: {source}\n\nconn: {conn_ctx}\nmsg: {msg_ctx}")] RtspReadError { conn_ctx: ConnectionContext, msg_ctx: RtspMessageContext, source: std::io::Error, }, - #[error("[{conn_ctx}, {pkt_ctx}] Error receiving UDP packet: {source}")] + #[error( + "Error receiving UDP packet: {source}\n\n\ + conn: {conn_ctx}\nstream: {stream_ctx}\nat: {when}" + )] UdpRecvError { conn_ctx: ConnectionContext, - pkt_ctx: PacketContext, + stream_ctx: StreamContext, + when: WallTime, source: std::io::Error, }, - #[error("[{conn_ctx}] Error writing to RTSP peer: {source}")] + #[error("Error writing to RTSP peer: {source}\n\nconn: {conn_ctx}")] WriteError { conn_ctx: ConnectionContext, source: std::io::Error, diff --git a/src/lib.rs b/src/lib.rs index 25d376b..ec2f567 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -330,6 +330,107 @@ impl Display for RtspMessageContext { } } +/// Context for an active stream (RTP+RTCP session), either TCP or UDP. Owned version. +#[derive(Copy, Clone, Debug)] +pub struct StreamContext(StreamContextInner); + +impl StreamContext { + #[doc(hidden)] + pub fn dummy() -> Self { + StreamContext(StreamContextInner::Dummy) + } +} + +impl Display for StreamContext { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match &self.0 { + StreamContextInner::Tcp(tcp) => { + write!( + f, + "TCP, interleaved channel ids {}-{}", + tcp.rtp_channel_id, + tcp.rtp_channel_id + 1 + ) + } + StreamContextInner::Udp(udp) => Display::fmt(udp, f), + StreamContextInner::Dummy => write!(f, "dummy"), + } + } +} + +#[derive(Copy, Clone, Debug)] +enum StreamContextInner { + Tcp(TcpStreamContext), + Udp(UdpStreamContext), + Dummy, +} + +/// Context for an active stream (RTP+RTCP session), either TCP or UDP. Reference version. +#[derive(Copy, Clone, Debug)] +pub struct StreamContextRef<'a>(StreamContextRefInner<'a>); + +impl StreamContextRef<'_> { + #[doc(hidden)] + pub fn dummy() -> Self { + StreamContextRef(StreamContextRefInner::Dummy) + } +} + +#[derive(Copy, Clone, Debug)] +enum StreamContextRefInner<'a> { + Tcp(TcpStreamContext), // this one is by value because TcpStreamContext is small. + Udp(&'a UdpStreamContext), + Dummy, +} + +impl StreamContextRef<'_> { + fn to_owned(self) -> StreamContext { + StreamContext(match self.0 { + StreamContextRefInner::Tcp(tcp) => StreamContextInner::Tcp(tcp), + StreamContextRefInner::Udp(udp) => StreamContextInner::Udp(*udp), + StreamContextRefInner::Dummy => StreamContextInner::Dummy, + }) + } +} + +/// Context for a UDP stream (aka UDP-based RTP transport). Unstable/internal. Exposed for benchmarks. +/// +/// This stores only the RTP addresses; the RTCP addresses are assumed to use +/// the same IP and one port higher. +#[doc(hidden)] +#[derive(Copy, Clone, Debug)] +pub struct UdpStreamContext { + local_ip: IpAddr, + peer_ip: IpAddr, + local_rtp_port: u16, + peer_rtp_port: u16, +} + +/// Context for a TCP stream. Unstable/internal. Exposed for benchmarks. +/// +/// This stores only the RTP channel id; the RTCP channel id is assumed to be one higher. +#[doc(hidden)] +#[derive(Copy, Clone, Debug)] +pub struct TcpStreamContext { + rtp_channel_id: u8, +} + +impl Display for UdpStreamContext { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + // TODO: this assumes we are the client. Revisit when adding server support. + write!( + f, + "{}:{}-{}(me) -> {}:{}-{}", + self.local_ip, + self.local_rtp_port, + self.local_rtp_port + 1, + self.peer_ip, + self.peer_rtp_port, + self.peer_rtp_port + 1 + ) + } +} + /// Context for an RTP or RTCP packet, received either via RTSP interleaved data or UDP. /// /// Should be paired with an [`ConnectionContext`] of the RTSP connection that started @@ -347,35 +448,16 @@ impl PacketContext { #[derive(Copy, Clone, Debug)] enum PacketContextInner { - Tcp { - msg_ctx: RtspMessageContext, - channel_id: u8, - }, - Udp { - local_addr: SocketAddr, - peer_addr: SocketAddr, - received_wall: WallTime, - }, + Tcp { msg_ctx: RtspMessageContext }, + Udp { received_wall: WallTime }, Dummy, } impl Display for PacketContext { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self.0 { - PacketContextInner::Udp { - local_addr, - peer_addr, - received_wall, - .. - } => { - write!(f, "{}->{}@{}", peer_addr, local_addr, received_wall) - } - PacketContextInner::Tcp { - msg_ctx, - channel_id, - } => { - write!(f, "{} ch={}", msg_ctx, channel_id) - } + PacketContextInner::Udp { received_wall } => std::fmt::Display::fmt(&received_wall, f), + PacketContextInner::Tcp { msg_ctx } => std::fmt::Display::fmt(&msg_ctx, f), PacketContextInner::Dummy => write!(f, "dummy"), } }