diff --git a/CHANGELOG.md b/CHANGELOG.md index 776e57c..3f46f91 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,9 +5,13 @@ `#[doc(hidden)]`. * BREAKING: `retina::client::Session::setup` takes a new `SetupOptions` argument for future expansion. -* BREAKING: `retina::StreamContext` has been split out of `retina::PacketContext`. - Both must be printed to provide the same information as before. This change - reduces how much data needs to be copied with each packet. +* BREAKING: the transport to use is configured per-stream as part of + `retina::SetupOptions` (rather than the prior `retina::SessionOptions`) and + takes per-transport options for future expansion. +* BREAKING: `retina::StreamContext` has been split out of + `retina::PacketContext`. Both must be printed to provide the same + information as before. This change reduces how much data needs to be copied + with each packet. ## `v0.3.9` (2022-04-12) diff --git a/examples/client/mp4.rs b/examples/client/mp4.rs index 35097b9..de15fb2 100644 --- a/examples/client/mp4.rs +++ b/examples/client/mp4.rs @@ -726,7 +726,7 @@ async fn write_mp4<'a>( } pub async fn run(opts: Opts) -> Result<(), Error> { - if matches!(opts.transport, Transport::Udp) && !opts.allow_loss { + if matches!(opts.transport, Transport::Udp(_)) && !opts.allow_loss { warn!("Using --transport=udp without strongly recommended --allow-loss!"); } @@ -739,7 +739,6 @@ pub async fn run(opts: Opts) -> Result<(), Error> { .creds(creds) .session_group(session_group.clone()) .user_agent("Retina mp4 example".to_owned()) - .transport(opts.transport) .teardown(opts.teardown), ) .await?; @@ -766,7 +765,9 @@ pub async fn run(opts: Opts) -> Result<(), Error> { None }; if let Some(i) = video_stream_i { - session.setup(i, SetupOptions::default()).await?; + session + .setup(i, SetupOptions::default().transport(opts.transport.clone())) + .await?; } let audio_stream = if !opts.no_audio { let s = session @@ -795,7 +796,9 @@ pub async fn run(opts: Opts) -> Result<(), Error> { None }; if let Some((i, _)) = audio_stream { - session.setup(i, SetupOptions::default()).await?; + session + .setup(i, SetupOptions::default().transport(opts.transport.clone())) + .await?; } if video_stream_i.is_none() && audio_stream.is_none() { bail!("Exiting because no video or audio stream was selected; see info log messages above"); diff --git a/src/client/mod.rs b/src/client/mod.rs index fb022fe..a05af78 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -55,7 +55,7 @@ struct StaleSession { teardown_rx: Option>>>, maybe_playing: bool, - is_tcp: bool, + has_tcp: bool, /// Upper bound of advertised expiration time. expires: tokio::time::Instant, @@ -398,12 +398,11 @@ impl std::str::FromStr for InitialTimestampPolicy { /// Options which must be known right as a session is created. /// -/// Decisions which can be deferred are in [PlayOptions] instead. +/// Decisions which can be deferred are in [`SetupOptions`] or [`PlayOptions`] instead. #[derive(Default)] pub struct SessionOptions { creds: Option, user_agent: Option>, - transport: Transport, session_group: Option>, teardown: TeardownPolicy, unassigned_channel_data: UnassignedChannelDataPolicy, @@ -480,10 +479,11 @@ impl std::str::FromStr for UnassignedChannelDataPolicy { /// The RTP packet transport to request. /// /// Defaults to `Transport::Tcp`. -#[derive(Copy, Clone, Debug)] +#[derive(Clone, Debug)] +#[non_exhaustive] pub enum Transport { /// Sends RTP packets over the RTSP TCP connection via interleaved data. - Tcp, + Tcp(TcpTransportOptions), /// Sends RTP packets over UDP (experimental). /// @@ -492,20 +492,20 @@ pub enum Transport { /// * There's no support for sending RTCP RRs (receiver reports), so /// servers won't have the correct information to measure packet loss /// and pace packets appropriately. - Udp, + Udp(UdpTransportOptions), } impl Default for Transport { fn default() -> Self { - Transport::Tcp + Transport::Tcp(TcpTransportOptions::default()) } } impl std::fmt::Display for Transport { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.pad(match self { - Transport::Tcp => "tcp", - Transport::Udp => "udp", + Transport::Tcp(_) => "tcp", + Transport::Udp(_) => "udp", }) } } @@ -515,8 +515,8 @@ impl std::str::FromStr for Transport { fn from_str(s: &str) -> Result { Ok(match s { - "tcp" => Transport::Tcp, - "udp" => Transport::Udp, + "tcp" => Transport::Tcp(TcpTransportOptions::default()), + "udp" => Transport::Udp(UdpTransportOptions::default()), _ => bail!(ErrorInt::InvalidArgument(format!( "bad Transport {}; \ expected tcp or udp", @@ -526,8 +526,19 @@ impl std::str::FromStr for Transport { } } +/// Per-stream TCP transport options (placeholder for future expansion). +#[derive(Clone, Default, Debug)] +#[non_exhaustive] +pub struct TcpTransportOptions; + +/// Per-stream UDP transport options (placeholder for future expansion). +#[derive(Clone, Default, Debug)] +#[non_exhaustive] +pub struct UdpTransportOptions; + impl SessionOptions { /// Uses the given credentials when/if the server requests digest authentication. + #[inline] pub fn creds(mut self, creds: Option) -> Self { self.creds = creds; self @@ -543,12 +554,6 @@ impl SessionOptions { self } - /// Sets the underlying transport to use. - pub fn transport(mut self, transport: Transport) -> Self { - self.transport = transport; - self - } - pub fn session_group(mut self, session_group: Arc) -> Self { self.session_group = Some(session_group); self @@ -570,8 +575,18 @@ impl SessionOptions { /// There's nothing here yet. Likely in the future this will allow configuring /// the output format for H.264 (Annex B or not). #[derive(Default)] -#[non_exhaustive] -pub struct SetupOptions {} +pub struct SetupOptions { + transport: Transport, +} + +impl SetupOptions { + /// Sets the underlying transport to use. + #[inline] + pub fn transport(mut self, transport: Transport) -> Self { + self.transport = transport; + self + } +} /// Options which must be decided at `PLAY` time. /// @@ -952,9 +967,8 @@ struct SessionInner { keepalive_timer: Option>>, - /// Set if the server may be in state Playing: we have sent a `PLAY` - /// request, regardless of if the response has been received. - maybe_playing: bool, + /// Bitmask of [`SessionFlag`]s. + flags: u8, /// The index within `presentation.streams` to start the next poll at. /// Round-robining between them rather than always starting at 0 should @@ -962,6 +976,20 @@ struct SessionInner { udp_next_poll_i: usize, } +#[derive(Copy, Clone)] +#[repr(u8)] +enum SessionFlag { + /// Set if the server may be in state Playing: we have sent a `PLAY` + /// request, regardless of if the response has been received. + MaybePlaying = 0x1, + + /// Set if one or more streams are configured to use TCP. + TcpStreams = 0x2, + + /// Set if one or more streams are configured to use UDP. + UdpStreams = 0x4, +} + impl RtspConnection { async fn connect(url: &Url) -> Result { let host = @@ -1293,7 +1321,7 @@ impl Session { describe_status, keepalive_state: KeepaliveState::Idle, keepalive_timer: None, - maybe_playing: false, + flags: 0, udp_next_poll_i: 0, }), Described { sdp }, @@ -1319,27 +1347,24 @@ impl Session { /// /// Panics if `stream_i >= self.streams().len()`. pub async fn setup(&mut self, stream_i: usize, options: SetupOptions) -> Result<(), Error> { - let _ = options; // not yet used. let inner = &mut self.0.as_mut().project(); - let presentation = &mut inner.presentation; - let options = &inner.options; let conn = inner .conn .as_mut() .ok_or_else(|| wrap!(ErrorInt::FailedPrecondition("no connection".into())))?; - let stream = &mut presentation.streams[stream_i]; + let stream = &mut inner.presentation.streams[stream_i]; if !matches!(stream.state, StreamState::Uninit) { bail!(ErrorInt::FailedPrecondition("stream already set up".into())); } let url = stream .control .as_ref() - .unwrap_or(&presentation.control) + .unwrap_or(&inner.presentation.control) .clone(); let mut req = rtsp_types::Request::builder(Method::Setup, rtsp_types::Version::V1_0).request_uri(url); let udp = match options.transport { - Transport::Tcp => { + Transport::Tcp(_) => { let proposed_channel_id = conn.channels.next_unassigned().ok_or_else(|| { wrap!(ErrorInt::FailedPrecondition( "no unassigned channels".into() @@ -1353,9 +1378,10 @@ impl Session { proposed_channel_id + 1 ), ); + *inner.flags |= SessionFlag::TcpStreams as u8; None } - Transport::Udp => { + Transport::Udp(_) => { // Bind an ephemeral UDP port on the same local address used to connect // to the RTSP server. let local_ip = conn.inner.ctx().local_addr.ip(); @@ -1369,6 +1395,7 @@ impl Session { pair.rtp_port + 1, ), ); + *inner.flags |= SessionFlag::UdpStreams as u8; Some(UdpStreamTransport { ctx: UdpStreamContext { local_ip, @@ -1388,7 +1415,7 @@ impl Session { .send( ResponseMode::Normal, inner.options, - presentation.tool.as_ref(), + inner.presentation.tool.as_ref(), inner.requested_auth, &mut req.build(Bytes::new()), ) @@ -1515,7 +1542,7 @@ impl Session { )) })?; if let Some(ref t) = inner.presentation.tool { - if matches!(inner.options.transport, Transport::Tcp) { + if (*inner.flags & (SessionFlag::TcpStreams as u8)) != 0 { warn!( "Connecting via TCP to known-broken RTSP server {:?}. \ See . \ @@ -1526,7 +1553,7 @@ impl Session { } trace!("PLAY with channel mappings: {:#?}", &conn.channels); - *inner.maybe_playing = true; + *inner.flags |= SessionFlag::MaybePlaying as u8; let (msg_ctx, cseq, response) = conn .send( ResponseMode::Play, @@ -1675,7 +1702,7 @@ fn note_stale_live555_data(tool: Option<&Tool>, options: &SessionOptions) { { let mut lock = group.sessions.lock().unwrap(); for s in &lock.sessions { - if s.is_tcp { + if s.has_tcp { // This session plausibly explains the packet. // (We could go so far as to examine the data packet's SSRC to // see if it matches one associated with this session. But @@ -1704,7 +1731,7 @@ fn note_stale_live555_data(tool: Option<&Tool>, options: &SessionOptions) { seqnum, expires, teardown_rx: None, - is_tcp: true, + has_tcp: true, maybe_playing: true, }); } @@ -2119,9 +2146,9 @@ impl PinnedDrop for SessionInner { fn drop(self: Pin<&mut Self>) { let this = self.project(); - let is_tcp = matches!(this.options.transport, Transport::Tcp); + let has_tcp = (*this.flags & (SessionFlag::TcpStreams as u8)) != 0; let just_try_once = match this.options.teardown { - TeardownPolicy::Auto if is_tcp => { + TeardownPolicy::Auto if has_tcp => { // If the server is known to have the live555 bug, try really hard to send a // TEARDOWN before considering the session cleaned up. Otherwise, try once on // the existing connection, primarily in case the server has @@ -2156,8 +2183,8 @@ impl PinnedDrop for SessionInner { seqnum, expires, teardown_rx: Some(teardown_rx), - is_tcp, - maybe_playing: *this.maybe_playing, + has_tcp, + maybe_playing: *this.flags & (SessionFlag::MaybePlaying as u8) != 0, }); log::debug!( "{:?}/{} tracking TEARDOWN of session id {}", @@ -2222,7 +2249,7 @@ impl futures::Stream for Session { } // Next try receiving data on the UDP sockets, if any. - if matches!(self.0.options.transport, Transport::Udp) { + if self.0.flags & (SessionFlag::UdpStreams as u8) != 0 { if let Poll::Ready(result) = self.as_mut().poll_udp(cx) { return Poll::Ready(result); }