diff --git a/CHANGELOG.md b/CHANGELOG.md
index 5d209ac..c2b0f00 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,6 +6,7 @@
 * improve compatibility with cameras with non-compliant SDP, including Anpviz
   ([#26](https://github.com/scottlamb/retina/issues/26)) and Geovision
   ([#33](https://github.com/scottlamb/retina/issues/33)).
+* new mechanism to more reliably send `TEARDOWN` requests.
 
 ## `v0.3.0` (2021-08-31)
diff --git a/examples/client/mp4.rs b/examples/client/mp4.rs
index 8eee2dc..713a5dc 100644
--- a/examples/client/mp4.rs
+++ b/examples/client/mp4.rs
@@ -26,10 +26,10 @@ use retina::{
     codec::{AudioParameters, CodecItem, VideoParameters},
 };
 
-use std::io::SeekFrom;
 use std::num::NonZeroU32;
 use std::path::PathBuf;
 use std::{convert::TryFrom, pin::Pin};
+use std::{io::SeekFrom, sync::Arc};
 use tokio::{
     fs::File,
     io::{AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt},
@@ -621,11 +621,20 @@ async fn copy<'a>(
 /// Writes the `.mp4`, including trying to finish or clean up the file.
 async fn write_mp4<'a>(
     opts: &'a Opts,
-    session: &'a mut retina::client::Demuxed,
+    session: retina::client::Session<retina::client::Described>,
     video_stream: Option<(usize, VideoParameters)>,
     audio_stream: Option<(usize, AudioParameters)>,
     stop_signal: Pin<Box<dyn Future<Output = Result<(), std::io::Error>>>>,
 ) -> Result<(), Error> {
+    let mut session = session
+        .play(
+            retina::client::PlayOptions::default()
+                .initial_timestamp(opts.initial_timestamp)
+                .enforce_timestamps_with_max_jump_secs(NonZeroU32::new(10).unwrap()),
+        )
+        .await?
+        .demuxed()?;
+
     // Append into a filename suffixed with ".partial", then try to either rename it into
     // place if it's complete or delete it otherwise.
     const PARTIAL_SUFFIX: &str = ".partial";
@@ -641,7 +650,7 @@ async fn write_mp4<'a>(
     )
     .await?;
 
-    let result = copy(opts, session, stop_signal, &mut mp4).await;
+    let result = copy(opts, &mut session, stop_signal, &mut mp4).await;
     if let Err(e) = result {
         // Log errors about finishing, returning the original error.
         if let Err(e) = mp4.finish().await {
@@ -677,10 +686,12 @@ pub async fn run(opts: Opts) -> Result<(), Error> {
     let creds = super::creds(opts.src.username.clone(), opts.src.password.clone());
     let stop_signal = Box::pin(tokio::signal::ctrl_c());
+    let session_group = Arc::new(retina::client::SessionGroup::default());
     let mut session = retina::client::Session::describe(
         opts.src.url.clone(),
         retina::client::SessionOptions::default()
             .creds(creds)
+            .session_group(session_group.clone())
             .user_agent("Retina mp4 example".to_owned())
             .transport(opts.transport),
     )
@@ -715,20 +726,11 @@ pub async fn run(opts: Opts) -> Result<(), Error> {
     if let Some((i, _)) = audio_stream {
         session.setup(i).await?;
     }
-    let mut session = session
-        .play(
-            retina::client::PlayOptions::default()
-                .initial_timestamp(opts.initial_timestamp)
-                .enforce_timestamps_with_max_jump_secs(NonZeroU32::new(10).unwrap()),
-        )
-        .await?
-        .demuxed()?;
+    let result = write_mp4(&opts, session, video_stream, audio_stream, stop_signal).await;
 
-    // TODO: should also send a TEARDOWN if the PLAY response won't parse or if
-    // demuxed() fails. The former isn't even possible with the current API.
-
-    let result = write_mp4(&opts, &mut session, video_stream, audio_stream, stop_signal).await;
-    if let Err(e) = session.teardown().await {
+    // Session has now been dropped, on success or failure. A TEARDOWN should
+    // be pending if necessary. session_group.teardown() will wait for it.
+    if let Err(e) = session_group.teardown().await {
         log::error!("TEARDOWN failed: {}", e);
     }
     result
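The final wait above has no deadline of its own; per the doc comment on `SessionGroup::teardown` (added in `src/client/mod.rs` below), a caller can bound it with `tokio::time::timeout`. A sketch of that variant, reusing the example's `session_group` (illustrative only; not part of this diff):

    // Wait at most 5 seconds for the background TEARDOWN to conclude.
    match tokio::time::timeout(std::time::Duration::from_secs(5), session_group.teardown()).await {
        Ok(Ok(())) => {}
        Ok(Err(e)) => log::error!("TEARDOWN failed: {}", e),
        Err(_) => log::warn!("TEARDOWN still pending after 5 seconds"),
    }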
diff --git a/src/client/mod.rs b/src/client/mod.rs
index 3ce6d80..1383952 100644
--- a/src/client/mod.rs
+++ b/src/client/mod.rs
@@ -4,6 +4,7 @@
 use std::mem::MaybeUninit;
 use std::net::{IpAddr, Ipv4Addr, SocketAddr};
 use std::num::NonZeroU32;
+use std::sync::{Arc, Mutex};
 use std::task::Poll;
 use std::time::Instant;
 use std::{borrow::Cow, fmt::Debug, num::NonZeroU16, pin::Pin};
@@ -18,6 +19,7 @@
 use rtsp_types::Method;
 use tokio::net::UdpSocket;
 use url::Url;
 
+use crate::client::parse::SessionHeader;
 use crate::codec::CodecItem;
 use crate::{Error, ErrorInt, RtspMessageContext};
@@ -29,6 +31,169 @@
 mod timeline;
 
 /// Duration between keepalive RTSP requests during [Playing] state.
 pub const KEEPALIVE_DURATION: std::time::Duration = std::time::Duration::from_secs(30);
 
+/// A stale RTP session.
+struct StaleSession {
+    /// If this stale session was created from a dropped [`Session`],
+    /// a channel which receives the result of a single `TEARDOWN` attempt.
+    teardown_receiver: Option<tokio::sync::watch::Receiver<Option<Result<(), String>>>>,
+
+    id: Option<Box<str>>,
+
+    is_tcp: bool,
+
+    /// Upper bound of advertised expiration time.
+    expires: tokio::time::Instant,
+}
+
+/// A grouping of sessions, currently used only to track stale sessions.
+///
+/// This is an experimental API which may change in an upcoming Retina version.
+///
+/// Stale sessions are ones that may still be attempting data transmission.
+/// They are tracked in three cases:
+///
+/// 1.  UDP sessions for which we've sent a `PLAY` request and dropped the
+///     `Session` without receiving a `TEARDOWN` response. These may still be
+///     consuming bandwidth.
+/// 2.  TCP sessions in a similar situation, if the server advertises a
+///     specific buggy version of live555 (see [`has_live555_tcp_bug`]). These
+///     sessions at least consume CPU on the server, and they will cause
+///     problems if another connection claims the same file descriptor.
+/// 3.  TCP sessions we learn about via unexpected RTSP interleaved data
+///     packets. These are assumed to be caused by the same bug as #2 but might
+///     have been started by a process unknown to us.
+///
+/// Currently in cases #1 and #2, a single `TEARDOWN` will be attempted in the
+/// background after the `Session` is dropped. [`SessionGroup::teardown`] can
+/// be used to wait for it to conclude.
+///
+/// Stale sessions are forgotten either on teardown or expiration. In general,
+/// the tracked expiration time is worst-case. The exception is if the server
+/// hasn't responded to a keepalive request. In that case there's theoretically
+/// no bound on when the server could see the request and extend the session.
+/// Retina ignores this possibility.
+///
+/// A `SessionGroup` can be of any granularity, but a typical use is to ensure
+/// there are no stale sessions before starting a fresh one, so groups should
+/// be scoped accordingly. When connecting to a live555 server affected by the
+/// stale TCP session bug, it might be wise to have one group per server, so
+/// that all such sessions can be drained before initiating new connections.
+/// Otherwise it might be useful to have one group per describe URL
+/// (potentially several per server) and at most one active session per URL.
+#[derive(Default)]
+pub struct SessionGroup(Mutex<SessionGroupInner>);
+
+#[derive(Default)]
+struct SessionGroupInner {
+    sessions: Vec<StaleSession>,
+}
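A short illustration of the granularity guidance above: sessions share a group simply by holding clones of one `Arc<SessionGroup>`. This sketch uses hypothetical URLs and is not part of the diff:

    // One group per server: every stream from this camera lands in the same
    // group, so all of its stale sessions can be drained together.
    let per_server_group = Arc::new(SessionGroup::default());
    for url in ["rtsp://192.168.5.10/main", "rtsp://192.168.5.10/sub"] {
        let options = SessionOptions::default().session_group(per_server_group.clone());
        // ... Session::describe(url.parse()?, options).await? ...
    }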
+
+/// The overall status of stale sessions belonging to a group.
+pub struct StaleSessionStatus {
+    /// The maximum expire time of any stale session in this group.
+    pub max_expires: Option<tokio::time::Instant>,
+
+    /// The total number of stale sessions.
+    pub num_sessions: usize,
+}
+
+impl SessionGroup {
+    /// Returns the status of stale sessions in this group.
+    ///
+    /// The caller might use this in a loop to sleep until all stale sessions
+    /// have expired.
+    pub fn stale_sessions(&self) -> StaleSessionStatus {
+        let mut l = self.0.lock().unwrap();
+        l.prune(tokio::time::Instant::now());
+        StaleSessionStatus {
+            max_expires: l.sessions.iter().map(|s| s.expires).max(),
+            num_sessions: l.sessions.len(),
+        }
+    }
+
+    /// Waits for a `TEARDOWN` to be attempted on all stale sessions that exist
+    /// as of when this method is called, returning an error if any fail.
+    ///
+    /// This has no timeout other than the sessions' expiration times. The
+    /// caller can wrap the call in `tokio::time::timeout` for an earlier
+    /// deadline.
+    ///
+    /// Currently on `Session::drop`, a `TEARDOWN` is started in the background.
+    /// This method waits for that to conclude. It doesn't attempt any new
+    /// `TEARDOWN` requests, even if called repeatedly. This may change.
+    ///
+    /// Ignores case #3 sessions, as it's not possible to tear them down. If
+    /// desired, the caller can learn of those through
+    /// [`SessionGroup::stale_sessions`] and sleep until they expire.
+    pub async fn teardown(&self) -> Result<(), Error> {
+        let mut watches: Vec<_>;
+        {
+            let mut l = self.0.lock().unwrap();
+            l.prune(tokio::time::Instant::now());
+            watches = l
+                .sessions
+                .iter()
+                .filter_map(|s| s.teardown_receiver.clone())
+                .collect();
+        }
+
+        let mut overall_result = Ok(());
+        for w in &mut watches {
+            let mut r = (*w.borrow_and_update()).clone();
+
+            if r.is_none() {
+                // An attempt hasn't finished yet. Wait for it.
+                w.changed()
+                    .await
+                    .expect("teardown Sender shouldn't be dropped");
+                r = (*w.borrow()).clone();
+            }
+
+            // Now an attempt has finished, success or failure.
+            let r = r.expect("teardown result should be populated after change");
+            overall_result = overall_result.and(r);
+        }
+
+        overall_result.map_err(|description| wrap!(ErrorInt::Teardown(description)))
+    }
+
+    /// Notes an unexpected RTSP interleaved data message.
+    ///
+    /// This is assumed to be due to a live555 RTP/AVP/TCP session that
+    /// belonged to a since-closed RTSP connection. If there's no known session
+    /// which explains this, adds an unknown session with live555's default
+    /// timeout, 65 seconds.
+    fn note_stale_live555_data(&self) {
+        let mut lock = self.0.lock().unwrap();
+        let now = tokio::time::Instant::now();
+        lock.prune(now);
+        for s in &lock.sessions {
+            if s.is_tcp {
+                // This session plausibly explains the packet.
+                // (We could go so far as to examine the data packet's SSRC to
+                // see if it matches one associated with this session. But
+                // retrying once per expiration is probably good enough.)
+                return;
+            }
+        }
+        lock.sessions.push(StaleSession {
+            // The caller *might* have a better guess than 65 seconds via a
+            // SETUP response, but it's also possible for
+            // note_stale_live555_data to be called prior to SETUP.
+            expires: now + std::time::Duration::from_secs(65),
+            teardown_receiver: None,
+            is_tcp: true,
+            id: None,
+        });
+    }
+}
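The loop mentioned in the `stale_sessions` doc comment might look like this (a sketch against the API above; not part of the diff):

    // Sleep until every stale session tracked by `group` has expired.
    async fn await_stale_sessions(group: &SessionGroup) {
        loop {
            let status = group.stale_sessions();
            match status.max_expires {
                Some(expires) => tokio::time::sleep_until(expires).await,
                None => return, // num_sessions is now 0.
            }
        }
    }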
+
+impl SessionGroupInner {
+    /// Removes expired sessions.
+    fn prune(&mut self, now: tokio::time::Instant) {
+        self.sessions.retain(|session| session.expires > now);
+    }
+}
+
 /// Policy for handling the `rtptime` parameter normally seen in the `RTP-Info` header.
 /// This parameter is used to map each stream's RTP timestamp to NPT ("normal play time"),
 /// allowing multiple streams to be played in sync.
@@ -96,6 +261,7 @@ pub struct SessionOptions {
     creds: Option<Credentials>,
     user_agent: Option<Box<str>>,
     transport: Transport,
+    session_group: Option<Arc<SessionGroup>>,
 }
 
 #[derive(Copy, Clone, Debug)]
@@ -165,6 +331,11 @@ impl SessionOptions {
         self.transport = transport;
         self
     }
+
+    pub fn session_group(mut self, session_group: Arc<SessionGroup>) -> Self {
+        self.session_group = Some(session_group);
+        self
+    }
 }
 
 /// Options which must be decided at `PLAY` time.
@@ -376,19 +547,25 @@ enum ResponseMode {
     /// attempt to interpret them before having `RTP-Info`.
     Play,
 
-    /// Silently discard data messages and responses to the given keepalive
-    /// while awaiting the response to this request.
-    Teardown { keepalive_cseq: Option<u32> },
+    /// Discard data messages and unrelated responses while awaiting the
+    /// response to this request.
+    Teardown,
 }
 
 /// An RTSP session, or a connection that may be used in a prescriptive way.
 /// See discussion at [State].
 pub struct Session<S: State>(Pin<Box<SessionInner>>, S);
 
-#[pin_project]
+#[pin_project(PinnedDrop)]
 struct SessionInner {
-    // TODO: allow this to be closed and reopened during a UDP session?
-    conn: RtspConnection,
+    /// The connection. Currently an RTSP connection is expected to exist,
+    /// even while playing a RTP/AVP/UDP session. The only exception is
+    /// during drop.
+    conn: Option<RtspConnection>,
+
+    /// A handle to the tokio runtime which created this session. It will
+    /// be used to asynchronously send a `TEARDOWN` on drop.
+    runtime_handle: Option<tokio::runtime::Handle>,
 
     options: SessionOptions,
     requested_auth: Option<digest_auth::WwwAuthenticateHeader>,
@@ -397,7 +574,7 @@ struct SessionInner {
     /// This will be set iff one or more `SETUP` calls have been issued.
     /// This is sometimes true in state `Described` and always true in state
     /// `Playing`.
-    session_id: Option<Box<str>>,
+    session: Option<SessionHeader>,
 
     // Keep some information about the DESCRIBE response. If a depacketizer
     // couldn't be constructed correctly for one or more streams, this will be
@@ -413,6 +590,9 @@ struct SessionInner {
 
     keepalive_timer: Option<Pin<Box<tokio::time::Sleep>>>,
 
+    maybe_playing: bool,
+    has_live555_tcp_bug: bool,
+
     /// The index within `presentation.streams` to start the next poll at.
     /// Round-robining between them rather than always starting at 0 should
     /// prevent one stream from starving the others.
@@ -485,14 +665,9 @@ impl RtspConnection {
             if response_cseq == cseq {
                 break (r, msg_ctx);
             }
-            if let ResponseMode::Teardown {
-                keepalive_cseq: Some(k),
-            } = mode
-            {
-                if response_cseq == k {
-                    debug!("ignoring keepalive {} response during TEARDOWN", k);
-                    continue;
-                }
+            if matches!(mode, ResponseMode::Teardown) {
+                debug!("ignoring unrelated response during TEARDOWN");
+                continue;
             }
             format!("{} response with CSeq {}", r.reason_phrase(), response_cseq)
         } else {
@@ -517,6 +692,11 @@ impl RtspConnection {
                 continue;
             }
         }
+
+        if let Some(session_group) = options.session_group.as_ref() {
+            session_group.note_stale_live555_data();
+        }
+
         format!(
             "{}-byte interleaved data message on channel {}",
             d.len(),
@@ -662,6 +842,8 @@ impl Session<Described> {
     /// depacketized correctly. If those streams are set up via
     /// `Session::setup`, the errors in question will ultimately be
     /// returned from `Stream::demuxed`.
+    ///
+    /// Expects to be called from a tokio runtime.
     pub async fn describe(url: Url, options: SessionOptions) -> Result<Self, Error> {
         let conn = RtspConnection::connect(&url).await?;
         Self::describe_with_conn(conn, options, url).await
@@ -695,18 +877,26 @@ impl Session<Described> {
                 description,
             })
         })?;
+        let has_live555_tcp_bug = presentation
+            .tool
+            .as_deref()
+            .map(has_live555_tcp_bug)
+            .unwrap_or(false);
         Ok(Session(
             Box::pin(SessionInner {
-                conn,
+                conn: Some(conn),
                 options,
+                runtime_handle: tokio::runtime::Handle::try_current().ok(),
                 requested_auth,
                 presentation,
-                session_id: None,
+                session: None,
                 describe_ctx: msg_ctx,
                 describe_cseq: cseq,
                 describe_status: response.status(),
                 keepalive_state: KeepaliveState::Idle,
                 keepalive_timer: None,
+                maybe_playing: false,
+                has_live555_tcp_bug,
                 udp_next_poll_i: 0,
             }),
             Described(()),
@@ -730,7 +920,7 @@ impl Session<Described> {
         let inner = &mut self.0.as_mut().project();
         let presentation = &mut inner.presentation;
         let options = &inner.options;
-        let conn = &mut inner.conn;
+        let conn = inner.conn.as_mut().unwrap();
         let stream = &mut presentation.streams[stream_i];
         if !matches!(stream.state, StreamState::Uninit) {
             bail!(ErrorInt::FailedPrecondition("stream already set up".into()));
@@ -784,8 +974,8 @@ impl Session<Described> {
                 );
             }
         }
-        if let Some(ref s) = inner.session_id {
-            req = req.header(rtsp_types::headers::SESSION, s.to_string());
+        if let Some(ref s) = inner.session {
+            req = req.header(rtsp_types::headers::SESSION, s.id.to_string());
         }
         let (msg_ctx, cseq, response) = conn
             .send(
@@ -808,22 +998,22 @@ impl Session<Described> {
                     description,
                 })
             })?;
-        match inner.session_id.as_ref() {
-            Some(old) if old.as_ref() != response.session_id => {
+        match inner.session.as_ref() {
+            Some(SessionHeader { id, .. }) if id.as_ref() != &*response.session.id => {
                 bail!(ErrorInt::RtspResponseError {
-                    conn_ctx: *inner.conn.inner.ctx(),
+                    conn_ctx: *conn.inner.ctx(),
                     msg_ctx,
                     method: rtsp_types::Method::Setup,
                     cseq,
                     status,
                     description: format!(
                         "session id changed from {:?} to {:?}",
-                        old, response.session_id,
+                        id, response.session.id,
                     ),
                 });
             }
             Some(_) => {}
-            None => *inner.session_id = Some(response.session_id.into()),
+            None => *inner.session = Some(response.session),
         };
         let conn_ctx = conn.inner.ctx();
         match options.transport {
@@ -831,7 +1021,7 @@ impl Session<Described> {
                 let channel_id = match response.channel_id {
                     Some(id) => id,
                     None => bail!(ErrorInt::RtspResponseError {
-                        conn_ctx: *inner.conn.inner.ctx(),
+                        conn_ctx: *conn.inner.ctx(),
                         msg_ctx,
                         method: rtsp_types::Method::Setup,
                         cseq,
@@ -904,15 +1094,23 @@ impl Session<Described> {
     ///
     /// The presentation must support aggregate control, as defined in [RFC 2326
     /// section 1.3](https://tools.ietf.org/html/rfc2326#section-1.3).
+    ///
+    /// When a `Session` is dropped (including when `play` returns an error),
+    /// a `TEARDOWN` request may be necessary. This will happen in the
+    /// background. The caller may need to wait for the session to be
+    /// successfully destroyed, for example before exiting the program or
+    /// before starting another session. See [`SessionOptions::session_group`]
+    /// and [`SessionGroup::teardown`].
     pub async fn play(mut self, policy: PlayOptions) -> Result<Session<Playing>, Error> {
         let inner = self.0.as_mut().project();
-        let session_id = inner.session_id.as_deref().ok_or_else(|| {
+        let conn = inner.conn.as_mut().unwrap();
+        let session = inner.session.as_ref().ok_or_else(|| {
             wrap!(ErrorInt::FailedPrecondition(
                 "must SETUP before PLAY".into()
             ))
         })?;
         if let Some(tool) = inner.presentation.tool.as_deref() {
-            if matches!(inner.options.transport, Transport::Tcp) && has_live555_tcp_bug(tool) {
+            if matches!(inner.options.transport, Transport::Tcp) && *inner.has_live555_tcp_bug {
                 warn!(
                     "Connecting via TCP to known-broken RTSP server {:?}. \
                      See . \
@@ -922,23 +1120,23 @@ impl Session<Described> {
             }
         }
 
-        trace!("PLAY with channel mappings: {:#?}", &inner.conn.channels);
-        let (msg_ctx, cseq, response) = inner
-            .conn
+        trace!("PLAY with channel mappings: {:#?}", &conn.channels);
+        *inner.maybe_playing = true;
+        let (msg_ctx, cseq, response) = conn
             .send(
                 ResponseMode::Play,
                 &inner.options,
                 inner.requested_auth,
                 &mut rtsp_types::Request::builder(Method::Play, rtsp_types::Version::V1_0)
                     .request_uri(inner.presentation.control.clone())
-                    .header(rtsp_types::headers::SESSION, session_id.clone())
+                    .header(rtsp_types::headers::SESSION, &*session.id)
                     .header(rtsp_types::headers::RANGE, "npt=0.000-".to_owned())
                     .build(Bytes::new()),
             )
             .await?;
         parse::parse_play(&response, inner.presentation).map_err(|description| {
             wrap!(ErrorInt::RtspResponseError {
-                conn_ctx: *inner.conn.inner.ctx(),
+                conn_ctx: *conn.inner.ctx(),
                 msg_ctx,
                 method: rtsp_types::Method::Play,
                 cseq,
@@ -978,7 +1176,7 @@ impl Session<Described> {
         {
             if initial_rtptime.is_none() {
                 bail!(ErrorInt::RtspResponseError {
-                    conn_ctx: *inner.conn.inner.ctx(),
+                    conn_ctx: *conn.inner.ctx(),
                     msg_ctx,
                     method: rtsp_types::Method::Play,
                     cseq,
@@ -1007,7 +1205,7 @@ impl Session<Described> {
                 }
                 o => o,
             };
-            let conn_ctx = inner.conn.inner.ctx();
+            let conn_ctx = conn.inner.ctx();
             s.state = StreamState::Playing {
                 timeline: Timeline::new(
                     initial_rtptime,
@@ -1097,7 +1295,7 @@ impl Session<Playing> {
             if matches!(s.state, StreamState::Playing { .. }) {
                 if let Err(ref description) = s.depacketizer {
                     bail!(ErrorInt::RtspResponseError {
-                        conn_ctx: *inner.conn.inner.ctx(),
+                        conn_ctx: *inner.conn.as_ref().unwrap().inner.ctx(),
                         msg_ctx: *inner.describe_ctx,
                         method: rtsp_types::Method::Describe,
                         cseq: *inner.describe_cseq,
@@ -1113,45 +1311,37 @@ impl Session<Playing> {
         })
     }
 
-    /// Tears down the session.
+    /// Sends a `TEARDOWN`, ending the session.
     ///
-    /// This attempts to send a `TEARDOWN` request to end the session:
-    /// * When using UDP, this signals the server to end the data stream
-    ///   promptly rather than wait for timeout.
-    /// * Even when using TCP, some old versions of `live555` servers will
-    ///   incorrectly keep trying to send packets to this connection, burning
-    ///   CPU until timeout, and potentially sending packets to a freshly
-    ///   opened connection that happens to claim the same file descriptor
-    ///   number.
-    ///
-    /// Discards any RTSP interleaved data messages received on the socket
-    /// while waiting for `TEARDOWN` response.
-    ///
-    /// Currently closes the socket(s) immediately after `TEARDOWN` response,
-    /// even on failure. This behavior may change in a future release.
+    /// Note that relying on this method to tear down the session misses some
+    /// cases in which a `TEARDOWN` is necessary:
+    /// * `Session::play` may fail parsing the response. It
+    ///   consumes the session, so calling this method is not possible.
+    /// * `Session::demuxed` similarly consumes the session.
+    /// See [`SessionOptions::session_group`] and [`SessionGroup::teardown`]
+    /// for a more robust mechanism.
     pub async fn teardown(mut self) -> Result<(), Error> {
         let inner = self.0.as_mut().project();
         let mut req = rtsp_types::Request::builder(Method::Teardown, rtsp_types::Version::V1_0)
             .request_uri(inner.presentation.base_url.clone())
             .header(
                 rtsp_types::headers::SESSION,
-                inner.session_id.as_deref().unwrap().to_string(),
+                inner.session.as_ref().unwrap().id.to_string(),
             )
             .build(Bytes::new());
-        let keepalive_cseq = match inner.keepalive_state {
-            KeepaliveState::Idle => None,
-            KeepaliveState::Flushing(cseq) => Some(*cseq),
-            KeepaliveState::Waiting(cseq) => Some(*cseq),
-        };
         inner
             .conn
+            .as_mut()
+            .unwrap()
             .send(
-                ResponseMode::Teardown { keepalive_cseq },
+                ResponseMode::Teardown,
                 inner.options,
                 inner.requested_auth,
                 &mut req,
             )
             .await?;
+        *inner.session = None;
+        *inner.maybe_playing = false;
         Ok(())
     }
 
@@ -1164,10 +1354,11 @@ impl Session<Playing> {
         cx: &mut std::task::Context<'_>,
     ) -> Result<(), Error> {
         let inner = self.0.as_mut().project();
+        let conn = inner.conn.as_mut().unwrap();
         // Expect the previous keepalive request to have finished.
         match inner.keepalive_state {
             KeepaliveState::Flushing(cseq) => bail!(ErrorInt::WriteError {
-                conn_ctx: *inner.conn.inner.ctx(),
+                conn_ctx: *conn.inner.ctx(),
                 source: std::io::Error::new(
                     std::io::ErrorKind::TimedOut,
                     format!(
@@ -1177,8 +1368,8 @@ impl Session<Playing> {
                 ),
             }),
             KeepaliveState::Waiting(cseq) => bail!(ErrorInt::RtspReadError {
-                conn_ctx: *inner.conn.inner.ctx(),
-                msg_ctx: inner.conn.inner.eof_ctx(),
+                conn_ctx: *conn.inner.ctx(),
+                msg_ctx: conn.inner.eof_ctx(),
                 source: std::io::Error::new(
                     std::io::ErrorKind::TimedOut,
                     format!(
@@ -1192,7 +1383,7 @@ impl Session<Playing> {
 
         // Currently the only outbound data should be keepalives, and the previous one
        // has already been flushed, so there's no reason the Sink shouldn't be ready.
-        if matches!(inner.conn.inner.poll_ready_unpin(cx), Poll::Pending) {
+        if matches!(conn.inner.poll_ready_unpin(cx), Poll::Pending) {
             bail!(ErrorInt::Internal(
                 "Unexpectedly not ready to send keepalive".into()
             ));
@@ -1201,20 +1392,16 @@ impl Session<Playing> {
         // Send a new one and reset the timer.
         // Use a SET_PARAMETER with no body for keepalives, as recommended in the
        // ONVIF Streaming Specification version 21.06 section 5.2.2.2.
-        let session_id = inner.session_id.as_deref().unwrap();
+        let session_id = &*inner.session.as_ref().unwrap().id;
         let mut req = rtsp_types::Request::builder(Method::SetParameter, rtsp_types::Version::V1_0)
             .request_uri(inner.presentation.base_url.clone())
             .header(rtsp_types::headers::SESSION, session_id.to_string())
             .build(Bytes::new());
-        let cseq = inner
-            .conn
-            .fill_req(inner.options, inner.requested_auth, &mut req)?;
-        inner
-            .conn
-            .inner
+        let cseq = conn.fill_req(inner.options, inner.requested_auth, &mut req)?;
+        conn.inner
             .start_send_unpin(rtsp_types::Message::Request(req))
             .expect("encoding is infallible");
-        *inner.keepalive_state = match inner.conn.inner.poll_flush_unpin(cx) {
+        *inner.keepalive_state = match conn.inner.poll_flush_unpin(cx) {
             Poll::Ready(Ok(())) => KeepaliveState::Waiting(cseq),
             Poll::Ready(Err(e)) => bail!(e),
             Poll::Pending => KeepaliveState::Flushing(cseq),
@@ -1245,7 +1432,7 @@ impl Session<Playing> {
         // The only response we expect in this state is to our keepalive request.
         bail!(ErrorInt::RtspFramingError {
-            conn_ctx: *inner.conn.inner.ctx(),
+            conn_ctx: *inner.conn.as_ref().unwrap().inner.ctx(),
             msg_ctx: *msg_ctx,
             description: format!("Unexpected RTSP response {:#?}", response),
         })
@@ -1257,15 +1444,16 @@ impl Session<Playing> {
         data: rtsp_types::Data<Bytes>,
     ) -> Result<Option<PacketItem>, Error> {
         let inner = self.0.as_mut().project();
+        let conn = inner.conn.as_ref().unwrap();
         let channel_id = data.channel_id();
         let pkt_ctx = crate::PacketContext(crate::PacketContextInner::Tcp {
             msg_ctx: *msg_ctx,
             channel_id,
         });
-        let m = match inner.conn.channels.lookup(channel_id) {
+        let m = match conn.channels.lookup(channel_id) {
             Some(m) => m,
             None => bail!(ErrorInt::RtspUnassignedChannelError {
-                conn_ctx: *inner.conn.inner.ctx(),
+                conn_ctx: *conn.inner.ctx(),
                 msg_ctx: *msg_ctx,
                 channel_id,
             }),
@@ -1284,17 +1472,23 @@ impl Session<Playing> {
         match m.channel_type {
             ChannelType::Rtp => Ok(rtp_handler.rtp(
                 &inner.options,
-                inner.conn.inner.ctx(),
+                conn.inner.ctx(),
                 &pkt_ctx,
                 &mut timeline,
                 m.stream_i,
                 data.into_body(),
             )?),
             ChannelType::Rtcp => {
-                match rtp_handler.rtcp(&pkt_ctx, &mut timeline, m.stream_i, data.into_body()) {
+                match rtp_handler.rtcp(
+                    &inner.options,
+                    &pkt_ctx,
+                    &mut timeline,
+                    m.stream_i,
+                    data.into_body(),
+                ) {
                     Ok(p) => Ok(p),
                     Err(description) => Err(wrap!(ErrorInt::PacketError {
-                        conn_ctx: *inner.conn.inner.ctx(),
+                        conn_ctx: *conn.inner.ctx(),
                         pkt_ctx: pkt_ctx,
                         stream_id: m.stream_i,
                         description,
@@ -1317,6 +1511,7 @@ impl Session<Playing> {
     ) -> Poll<Option<Result<PacketItem, Error>>> {
         debug_assert!(buf.filled().is_empty());
         let inner = self.0.as_mut().project();
+        let conn_ctx = *inner.conn.as_ref().unwrap().inner.ctx();
         let s = &mut inner.presentation.streams[i];
         if let Some(sockets) = &mut s.sockets {
             let (mut timeline, rtp_handler) = match &mut s.state {
@@ -1337,12 +1532,12 @@ impl Session<Playing> {
                 match r {
                     Ok(()) => {
                         let msg = Bytes::copy_from_slice(buf.filled());
-                        match rtp_handler.rtcp(&pkt_ctx, &mut timeline, i, msg) {
+                        match rtp_handler.rtcp(&inner.options, &pkt_ctx, &mut timeline, i, msg) {
                             Ok(Some(p)) => return Poll::Ready(Some(Ok(p))),
                             Ok(None) => buf.clear(),
                             Err(description) => {
                                 return Poll::Ready(Some(Err(wrap!(ErrorInt::PacketError {
-                                    conn_ctx: *inner.conn.inner.ctx(),
+                                    conn_ctx,
                                     pkt_ctx,
                                     stream_id: i,
                                     description,
@@ -1352,7 +1547,7 @@ impl Session<Playing> {
                     }
                     Err(source) => {
                         return Poll::Ready(Some(Err(wrap!(ErrorInt::UdpRecvError {
-                            conn_ctx: *inner.conn.inner.ctx(),
+                            conn_ctx,
                             pkt_ctx,
                             source,
                         }))))
@@ -1371,7 +1566,7 @@ impl Session<Playing> {
                 let msg = Bytes::copy_from_slice(buf.filled());
                 match rtp_handler.rtp(
                     &inner.options,
-                    inner.conn.inner.ctx(),
+                    &conn_ctx,
                     &pkt_ctx,
                     &mut timeline,
                     i,
@@ -1384,7 +1579,7 @@ impl Session<Playing> {
                     }
                     Err(source) => {
                         return Poll::Ready(Some(Err(wrap!(ErrorInt::UdpRecvError {
-                            conn_ctx: *inner.conn.inner.ctx(),
+                            conn_ctx,
                             pkt_ctx,
                             source,
                         }))))
@@ -1430,6 +1625,111 @@ impl Session<Playing> {
     }
 }
 
+#[pin_project::pinned_drop]
+impl PinnedDrop for SessionInner {
+    fn drop(self: Pin<&mut Self>) {
+        let this = self.project();
+
+        let is_tcp = matches!(this.options.transport, Transport::Tcp);
+        if !*this.maybe_playing || (is_tcp && !*this.has_live555_tcp_bug) {
+            // No TEARDOWN is necessary.
+            return;
+        }
+
+        let session = match this.session.take() {
+            Some(s) => s,
+            None => {
+                log::warn!("Session::drop: maybe_playing set without a session id");
+                return;
+            }
+        };
+
+        // For now, assume the whole timeout is left.
+        let expires = tokio::time::Instant::now()
+            + std::time::Duration::from_secs(session.timeout_sec.into());
+
+        // Track the session, if there is a group.
+        let (teardown_sender, teardown_receiver) = tokio::sync::watch::channel(None);
+        if let Some(session_group) = this.options.session_group.as_ref() {
+            let mut lock = session_group.0.lock().unwrap();
+            lock.sessions.push(StaleSession {
+                expires,
+                teardown_receiver: Some(teardown_receiver),
+                is_tcp,
+                id: Some(session.id.clone()),
+            });
+        }
+
+        let handle = match this.runtime_handle.take() {
+            Some(h) => h,
+            None => {
+                const MSG: &str = "Unable to start async TEARDOWN because describe wasn't called \
+                                   from a tokio runtime";
+                log::warn!("{}", MSG);
+                let _ = teardown_sender.send(Some(Err(MSG.into())));
+                return;
+            }
+        };
+
+        handle.spawn(background_teardown(
+            this.presentation.base_url.clone(),
+            session.id,
+            std::mem::take(this.options),
+            this.requested_auth.take(),
+            this.conn.take().unwrap(),
+            teardown_sender,
+            expires,
+        ));
+    }
+}
+
+async fn background_teardown(
+    base_url: Url,
+    session_id: Box<str>,
+    options: SessionOptions,
+    mut requested_auth: Option<digest_auth::WwwAuthenticateHeader>,
+    mut conn: RtspConnection,
+    sender: tokio::sync::watch::Sender<Option<Result<(), String>>>,
+    expires: tokio::time::Instant,
+) {
+    let mut req = rtsp_types::Request::builder(Method::Teardown, rtsp_types::Version::V1_0)
+        .request_uri(base_url)
+        .header(rtsp_types::headers::SESSION, session_id.to_string())
+        .build(Bytes::new());
+    let r = match tokio::time::timeout_at(
+        expires,
+        conn.send(
+            ResponseMode::Teardown,
+            &options,
+            &mut requested_auth,
+            &mut req,
+        ),
+    )
+    .await
+    {
+        Ok(Ok(_)) => Ok(()),
+        Ok(Err(e)) => Err(e.to_string()),
+        Err(_) => Err("unable to complete TEARDOWN before session expiration".to_owned()),
+    };
+    log::debug!("Background TEARDOWN: {:?}", &r);
+    if let (Some(ref session_group), Ok(_)) = (options.session_group, &r) {
+        let mut l = session_group.0.lock().unwrap();
+        let i = l
+            .sessions
+            .iter()
+            .position(|s| matches!(&s.id, Some(id) if **id == *session_id));
+        match i {
+            Some(i) => {
+                l.sessions.swap_remove(i);
+            }
+            None => log::warn!("Unable to find session {:?} on TEARDOWN", &*session_id),
+        }
+    }
+
+    // In the most common case, the send will fail because all receivers have
+    // been dropped.
+    let _ = sender.send(Some(r));
+}
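Condensed to its essentials, the watch-channel handshake between the drop path above and `SessionGroup::teardown` works like this (illustrative only; runs inside an async context):

    // The drop path creates the channel; the background task publishes exactly
    // one final result; the waiter takes it if present, else awaits the change.
    let (tx, mut rx) = tokio::sync::watch::channel(None::<Result<(), String>>);
    tokio::spawn(async move {
        let result: Result<(), String> = Ok(()); // outcome of the single TEARDOWN attempt
        let _ = tx.send(Some(result)); // receivers may already be gone
    });
    if rx.borrow_and_update().is_none() {
        rx.changed().await.expect("a value is sent before the sender drops");
    }
    let r = rx.borrow().clone().expect("populated after change");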
+
 impl futures::Stream for Session<Playing> {
     type Item = Result<PacketItem, Error>;
 
@@ -1441,7 +1741,7 @@ impl futures::Stream for Session<Playing> {
         // First try receiving data on the RTSP connection. Let this starve
         // sending keepalives; if we can't keep up, the server should
         // probably drop us.
-        match Pin::new(&mut self.0.conn.inner).poll_next(cx) {
+        match Pin::new(&mut self.0.conn.as_mut().unwrap().inner).poll_next(cx) {
             Poll::Ready(Some(Ok(msg))) => match msg.msg {
                 rtsp_types::Message::Data(data) => {
                     match self.as_mut().handle_data(&msg.ctx, data) {
@@ -1484,7 +1784,7 @@ impl futures::Stream for Session<Playing> {
 
         // Then finish flushing the current keepalive if necessary.
         if let KeepaliveState::Flushing(cseq) = self.0.keepalive_state {
-            match self.0.conn.inner.poll_flush_unpin(cx) {
+            match self.0.conn.as_mut().unwrap().inner.poll_flush_unpin(cx) {
                 Poll::Ready(Ok(())) => self.0.keepalive_state = KeepaliveState::Waiting(cseq),
                 Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(Error(Box::new(e))))),
                 Poll::Pending => {}
@@ -1541,7 +1841,7 @@ impl futures::Stream for Demuxed {
                         Ok(d) => d,
                         Err(_) => unreachable!("depacketizer was Ok"),
                     };
-                    let conn_ctx = inner.conn.inner.ctx();
+                    let conn_ctx = inner.conn.as_ref().unwrap().inner.ctx();
                     if let Some(p) = pkt {
                         let pkt_ctx = p.ctx;
                         let stream_id = p.stream_id;
diff --git a/src/client/parse.rs b/src/client/parse.rs
index cba837c..7d56d91 100644
--- a/src/client/parse.rs
+++ b/src/client/parse.rs
@@ -438,8 +438,14 @@ pub(crate) fn parse_describe(
     })
 }
 
-pub(crate) struct SetupResponse<'a> {
-    pub(crate) session_id: &'a str,
+#[derive(Debug, PartialEq, Eq)]
+pub(crate) struct SessionHeader {
+    pub(crate) id: Box<str>,
+    pub(crate) timeout_sec: u32,
+}
+
+pub(crate) struct SetupResponse {
+    pub(crate) session: SessionHeader,
     pub(crate) ssrc: Option<u32>,
     pub(crate) channel_id: Option<u8>,
     pub(crate) source: Option<IpAddr>,
@@ -451,12 +457,27 @@ pub(crate) struct SetupResponse {
 
 /// Returns an assigned interleaved channel id (implying the next channel id
 /// is also assigned) or errors.
 pub(crate) fn parse_setup(response: &rtsp_types::Response<Bytes>) -> Result<SetupResponse, String> {
+    // https://datatracker.ietf.org/doc/html/rfc2326#section-12.37
     let session = response
         .header(&rtsp_types::headers::SESSION)
        .ok_or_else(|| "Missing Session header".to_string())?;
-    let session_id = match session.as_str().find(';') {
-        None => session.as_str(),
-        Some(i) => &session.as_str()[..i],
+    let session = match session.as_str().split_once(';') {
+        None => SessionHeader {
+            id: session.as_str().into(),
+            timeout_sec: 60, // default
+        },
+        Some((id, timeout_str)) => {
+            if let Some(v) = timeout_str.trim().strip_prefix("timeout=") {
+                let timeout_sec =
+                    u32::from_str_radix(v, 10).map_err(|_| format!("Unparseable timeout {}", v))?;
+                SessionHeader {
+                    id: id.into(),
+                    timeout_sec,
+                }
+            } else {
+                return Err(format!("Unparseable Session header {:?}", session.as_str()));
+            }
+        }
     };
     let transport = response
         .header(&rtsp_types::headers::TRANSPORT)
@@ -504,7 +525,7 @@ pub(crate) fn parse_setup(response: &rtsp_types::Response<Bytes>) -> Result<Setu
     Ok(SetupResponse {
-        session_id,
+        session,
         ssrc,
         channel_id,
         source,
diff --git a/src/error.rs b/src/error.rs
--- a/src/error.rs
+++ b/src/error.rs
@@ ... @@
     ),
+
+    /// Silly kind used by `SessionGroup::teardown`.
+    ///
+    /// TODO: Currently the teardown process needs to clone its `Result`, and
+    /// `ErrorInt` isn't cloneable due to `Internal` above. We should come up
+    /// with some more satisfactory solution. Maybe `RtspConnection::send`
+    /// should return a more restricted error type which is cloneable.
+    #[error("{0}")]
+    Teardown(String),
 }
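For reference, the `Session` header grammar handled above, extracted into a standalone sketch (illustrative; field names mirror `SessionHeader`, and the example id is made up):

    // "0456804596"            => id "0456804596", 60-second default timeout.
    // "0456804596;timeout=30" => id "0456804596", 30-second timeout.
    fn parse_session_value(value: &str) -> Result<(Box<str>, u32), String> {
        match value.split_once(';') {
            None => Ok((value.into(), 60)),
            Some((id, rest)) => match rest.trim().strip_prefix("timeout=") {
                Some(v) => v
                    .parse::<u32>()
                    .map(|timeout_sec| (id.into(), timeout_sec))
                    .map_err(|_| format!("Unparseable timeout {}", v)),
                None => Err(format!("Unparseable Session header {:?}", value)),
            },
        }
    }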