From 7cbe39213d79f3cab5101b19433551bc7d6e63a3 Mon Sep 17 00:00:00 2001 From: Scott Lamb Date: Fri, 8 Apr 2022 15:37:28 -0700 Subject: [PATCH] support ignoring msgs on unassigned RTSP channels https://github.com/scottlamb/moonfire-nvr/issues/213#issuecomment-1089411093 --- CHANGELOG.md | 8 +++ src/client/mod.rs | 171 +++++++++++++++++++++++++++++++++++++--------- 2 files changed, 147 insertions(+), 32 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3fa2a05..3ebab05 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,11 @@ +## unreleased + +* camera interop: eliminate `bad clockrate in rtpmap` errors with cameras that + (incorrectly) add trailing spaces to this SDP parameter, as described at + [scottlamb/moonfire-nvr#213](https://github.com/scottlamb/moonfire-nvr/issues/213#issue-1190760423). +* camera interop: allow ignoring RTSP interleaved data messages on unassigned + channels, also described at [scottlamb-moonfire-nvr#213](https://github.com/scottlamb/moonfire-nvr/issues/213#issuecomment-1089411093). + ## `v0.3.8` (2022-03-08) * fix depacketization of fragmented AAC frames diff --git a/src/client/mod.rs b/src/client/mod.rs index 148b82c..a66f8e7 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -17,7 +17,7 @@ use bytes::Bytes; use futures::{ready, Future, SinkExt, StreamExt}; use log::{debug, trace, warn}; use pin_project::pin_project; -use rtsp_types::Method; +use rtsp_types::{Data, Method}; use tokio::net::UdpSocket; use tokio::sync::Notify; use url::Url; @@ -399,6 +399,46 @@ pub struct SessionOptions { transport: Transport, session_group: Option>, teardown: TeardownPolicy, + unassigned_channel_data: UnassignedChannelDataPolicy, +} + +/// Policy for handling data received on unassigned RTSP interleaved channels. +#[derive(Copy, Clone)] +pub enum UnassignedChannelDataPolicy { + /// Automatic (default). + /// + /// The current policy (which may change) is as follows: + /// + /// * if the server has sent a SDP `tool` attribute for which + /// [`Tool::has_live555_tcp_bug`] is true, use `AssumeStaleSession`. + /// * otherwise (prior to receiving the `DESCRIBE` response, if there was + /// no tool attribute, or if it does not match the known pattern), + /// use `Ignore`. + Auto, + + /// Assume the data is due to the live555 stale TCP session bug described + /// in "Stale sessions" under [`SessionGroup`]. + /// + /// This session will return error, and the `SessionGroup` will track the + /// expiration of a stale session. + AssumeStaleSession, + + /// Returns an error. + Error, + + /// Ignores the data. + /// + /// Some broken IP cameras appear to have some default assignment of streams + /// to interleaved channels. If there is no `SETUP` for that stream before + /// `PLAY`, they will send data anyway, on this channel. In this mode, such + /// data messages are ignored. + Ignore, +} + +impl Default for UnassignedChannelDataPolicy { + fn default() -> Self { + Self::Auto + } } /// The RTP packet transport to request. @@ -451,7 +491,7 @@ impl std::str::FromStr for Transport { } impl SessionOptions { - /// Use the given credentials when/if the server requests digest authentication. + /// Uses the given credentials when/if the server requests digest authentication. pub fn creds(mut self, creds: Option) -> Self { self.creds = creds; self @@ -482,6 +522,11 @@ impl SessionOptions { self.teardown = teardown; self } + + pub fn unassigned_channel_data(mut self, policy: UnassignedChannelDataPolicy) -> Self { + self.unassigned_channel_data = policy; + self + } } /// Options which must be decided at `PLAY` time. @@ -781,6 +826,9 @@ struct RtspConnection { /// The next `CSeq` header value to use when sending an RTSP request. next_cseq: u32, + + /// If Retina has received data on an unassigned RTSP interleaved data channel. + seen_unassigned: bool, } /// Mode to use in `RtspConnection::send` when looking for a response. @@ -789,6 +837,7 @@ enum ResponseMode { Normal, /// Silently discard data messages on assigned channels. + /// /// This is a workaround for recent Reolink cameras which appear to send /// RTCP sender reports immediately *before* the `PLAY` response when /// using interleaved data. It's simplest to discard them rather than @@ -872,6 +921,7 @@ impl RtspConnection { inner, channels: ChannelMappings::default(), next_cseq: 1, + seen_unassigned: false, }) } @@ -892,7 +942,7 @@ impl RtspConnection { .ok_or_else(|| format!("Must specify host in rtsp url {}", &url)) } - /// Sends a request and expects the next message from the peer to be its response. + /// Sends a request and expects an upcoming message from the peer to be its response. /// Takes care of authorization and `CSeq`. Returns `Error` if not successful. async fn send( &mut self, @@ -938,7 +988,7 @@ impl RtspConnection { } rtsp_types::Message::Data(d) => { if matches!(mode, ResponseMode::Teardown { .. }) { - debug!("ignoring RTSP data during TEARDOWN"); + debug!("ignoring RTSP interleaved data during TEARDOWN"); continue; } else if let (ResponseMode::Play, Some(m)) = (&mode, self.channels.lookup(d.channel_id())) @@ -953,18 +1003,8 @@ impl RtspConnection { ); continue; } - - note_stale_live555_data( - tokio::runtime::Handle::try_current().ok(), - tool, - options, - ); - - format!( - "{}-byte interleaved data message on channel {}", - d.len(), - d.channel_id() - ) + self.handle_unassigned_data(msg_ctx, options, tool, d)?; + continue; } rtsp_types::Message::Request(r) => format!("{:?} request", r.method()), }; @@ -1039,6 +1079,63 @@ impl RtspConnection { } } + /// Handles data on an unassigned RTSP channel. + fn handle_unassigned_data( + &mut self, + msg_ctx: RtspMessageContext, + options: &SessionOptions, + tool: Option<&Tool>, + data: Data, + ) -> Result<(), Error> { + let live555 = match options.unassigned_channel_data { + UnassignedChannelDataPolicy::Auto + if tool.map(Tool::has_live555_tcp_bug).unwrap_or(false) => + { + true + } + UnassignedChannelDataPolicy::AssumeStaleSession => true, + UnassignedChannelDataPolicy::Error => false, + UnassignedChannelDataPolicy::Ignore | UnassignedChannelDataPolicy::Auto => { + if !self.seen_unassigned { + log::warn!( + "Ignoring data on unassigned RTSP interleaved data channel {}. \ + This is the first such message. Following messages will be logged \ + at trace priority only.\n\n\ + conn: {}\nmsg: {}\ndata: {:#?}", + data.channel_id(), + self.inner.ctx(), + &msg_ctx, + crate::hex::LimitedHex::new(data.as_slice(), 128), + ); + self.seen_unassigned = true; + } else { + log::trace!( + "Ignoring data on unassigned RTSP interleaved data channel {}.\n\n\ + conn: {}\nmsg: {}\ndata: {:#?}", + data.channel_id(), + self.inner.ctx(), + &msg_ctx, + crate::hex::LimitedHex::new(data.as_slice(), 128), + ); + } + return Ok(()); + } + }; + + if live555 { + note_stale_live555_data(tokio::runtime::Handle::try_current().ok(), tool, options); + } + + let channel_id = data.channel_id(); + let data = data.into_body(); + bail!(ErrorInt::RtspUnassignedChannelError { + conn_ctx: *self.inner.ctx(), + msg_ctx, + channel_id, + data, + }); + } + /// Fills out `req` with authorization and `CSeq` headers. fn fill_req( &mut self, @@ -1804,17 +1901,13 @@ impl Session { let m = match conn.channels.lookup(channel_id) { Some(m) => m, None => { - note_stale_live555_data( - tokio::runtime::Handle::try_current().ok(), - inner.presentation.tool.as_ref(), + conn.handle_unassigned_data( + *msg_ctx, inner.options, - ); - bail!(ErrorInt::RtspUnassignedChannelError { - conn_ctx: *conn.inner.ctx(), - msg_ctx: *msg_ctx, - channel_id, - data: data.into_body(), - }); + inner.presentation.tool.as_ref(), + data, + )?; + return Ok(None); } }; let stream = &mut inner.presentation.streams[m.stream_i]; @@ -2268,6 +2361,7 @@ mod tests { inner: client, channels: ChannelMappings::default(), next_cseq: 1, + seen_unassigned: false, }; (client, server) } @@ -2318,7 +2412,9 @@ mod tests { let (session, _) = tokio::join!( Session::describe_with_conn( conn, - SessionOptions::default().session_group(group.clone()), + SessionOptions::default() + .session_group(group.clone()) + .unassigned_channel_data(UnassignedChannelDataPolicy::Ignore), url ), req_response( @@ -2356,7 +2452,7 @@ mod tests { let session = session.unwrap(); tokio::pin!(session); - // Packet. + // Packets: first ignored one (unassigned channel), then one passed through. tokio::join!( async { match session.next().await { @@ -2369,16 +2465,25 @@ mod tests { } }, async { - let pkt = b"\x80\x60\x41\xd4\x00\x00\x00\x00\xdc\xc4\xa0\xd8hello world"; + let bad_pkt = b"data on unassigned channel"; + server + .send(rtsp_types::Message::Data(rtsp_types::Data::new( + 2, + Bytes::from_static(bad_pkt), + ))) + .await + .unwrap(); + let good_pkt = b"\x80\x60\x41\xd4\x00\x00\x00\x00\xdc\xc4\xa0\xd8hello world"; server .send(rtsp_types::Message::Data(rtsp_types::Data::new( 0, - Bytes::from_static(pkt), + Bytes::from_static(good_pkt), ))) .await .unwrap(); }, ); + drop_time = tokio::time::Instant::now(); }; @@ -2526,13 +2631,15 @@ mod tests { async { let e = Session::describe_with_conn( conn, - SessionOptions::default().session_group(group.clone()), + SessionOptions::default() + .session_group(group.clone()) + .unassigned_channel_data(UnassignedChannelDataPolicy::AssumeStaleSession), url, ) .await .map(|_s| ()) .unwrap_err(); - assert!(matches!(*e.0, ErrorInt::RtspFramingError { .. })); + assert!(matches!(*e.0, ErrorInt::RtspUnassignedChannelError { .. })); }, async { server.send(bogus_rtp).await.unwrap() }, );