support ignoring msgs on unassigned RTSP channels

https://github.com/scottlamb/moonfire-nvr/issues/213#issuecomment-1089411093
This commit is contained in:
Scott Lamb 2022-04-08 15:37:28 -07:00
parent ba8b00b4fd
commit 7cbe39213d
2 changed files with 147 additions and 32 deletions

View File

@ -1,3 +1,11 @@
## unreleased
* camera interop: eliminate `bad clockrate in rtpmap` errors with cameras that
(incorrectly) add trailing spaces to this SDP parameter, as described at
[scottlamb/moonfire-nvr#213](https://github.com/scottlamb/moonfire-nvr/issues/213#issue-1190760423).
* camera interop: allow ignoring RTSP interleaved data messages on unassigned
channels, also described at [scottlamb-moonfire-nvr#213](https://github.com/scottlamb/moonfire-nvr/issues/213#issuecomment-1089411093).
## `v0.3.8` (2022-03-08) ## `v0.3.8` (2022-03-08)
* fix depacketization of fragmented AAC frames * fix depacketization of fragmented AAC frames

View File

@ -17,7 +17,7 @@ use bytes::Bytes;
use futures::{ready, Future, SinkExt, StreamExt}; use futures::{ready, Future, SinkExt, StreamExt};
use log::{debug, trace, warn}; use log::{debug, trace, warn};
use pin_project::pin_project; use pin_project::pin_project;
use rtsp_types::Method; use rtsp_types::{Data, Method};
use tokio::net::UdpSocket; use tokio::net::UdpSocket;
use tokio::sync::Notify; use tokio::sync::Notify;
use url::Url; use url::Url;
@ -399,6 +399,46 @@ pub struct SessionOptions {
transport: Transport, transport: Transport,
session_group: Option<Arc<SessionGroup>>, session_group: Option<Arc<SessionGroup>>,
teardown: TeardownPolicy, teardown: TeardownPolicy,
unassigned_channel_data: UnassignedChannelDataPolicy,
}
/// Policy for handling data received on unassigned RTSP interleaved channels.
#[derive(Copy, Clone)]
pub enum UnassignedChannelDataPolicy {
/// Automatic (default).
///
/// The current policy (which may change) is as follows:
///
/// * if the server has sent a SDP `tool` attribute for which
/// [`Tool::has_live555_tcp_bug`] is true, use `AssumeStaleSession`.
/// * otherwise (prior to receiving the `DESCRIBE` response, if there was
/// no tool attribute, or if it does not match the known pattern),
/// use `Ignore`.
Auto,
/// Assume the data is due to the live555 stale TCP session bug described
/// in "Stale sessions" under [`SessionGroup`].
///
/// This session will return error, and the `SessionGroup` will track the
/// expiration of a stale session.
AssumeStaleSession,
/// Returns an error.
Error,
/// Ignores the data.
///
/// Some broken IP cameras appear to have some default assignment of streams
/// to interleaved channels. If there is no `SETUP` for that stream before
/// `PLAY`, they will send data anyway, on this channel. In this mode, such
/// data messages are ignored.
Ignore,
}
impl Default for UnassignedChannelDataPolicy {
fn default() -> Self {
Self::Auto
}
} }
/// The RTP packet transport to request. /// The RTP packet transport to request.
@ -451,7 +491,7 @@ impl std::str::FromStr for Transport {
} }
impl SessionOptions { impl SessionOptions {
/// Use the given credentials when/if the server requests digest authentication. /// Uses the given credentials when/if the server requests digest authentication.
pub fn creds(mut self, creds: Option<Credentials>) -> Self { pub fn creds(mut self, creds: Option<Credentials>) -> Self {
self.creds = creds; self.creds = creds;
self self
@ -482,6 +522,11 @@ impl SessionOptions {
self.teardown = teardown; self.teardown = teardown;
self self
} }
pub fn unassigned_channel_data(mut self, policy: UnassignedChannelDataPolicy) -> Self {
self.unassigned_channel_data = policy;
self
}
} }
/// Options which must be decided at `PLAY` time. /// Options which must be decided at `PLAY` time.
@ -781,6 +826,9 @@ struct RtspConnection {
/// The next `CSeq` header value to use when sending an RTSP request. /// The next `CSeq` header value to use when sending an RTSP request.
next_cseq: u32, next_cseq: u32,
/// If Retina has received data on an unassigned RTSP interleaved data channel.
seen_unassigned: bool,
} }
/// Mode to use in `RtspConnection::send` when looking for a response. /// Mode to use in `RtspConnection::send` when looking for a response.
@ -789,6 +837,7 @@ enum ResponseMode {
Normal, Normal,
/// Silently discard data messages on assigned channels. /// Silently discard data messages on assigned channels.
///
/// This is a workaround for recent Reolink cameras which appear to send /// This is a workaround for recent Reolink cameras which appear to send
/// RTCP sender reports immediately *before* the `PLAY` response when /// RTCP sender reports immediately *before* the `PLAY` response when
/// using interleaved data. It's simplest to discard them rather than /// using interleaved data. It's simplest to discard them rather than
@ -872,6 +921,7 @@ impl RtspConnection {
inner, inner,
channels: ChannelMappings::default(), channels: ChannelMappings::default(),
next_cseq: 1, next_cseq: 1,
seen_unassigned: false,
}) })
} }
@ -892,7 +942,7 @@ impl RtspConnection {
.ok_or_else(|| format!("Must specify host in rtsp url {}", &url)) .ok_or_else(|| format!("Must specify host in rtsp url {}", &url))
} }
/// Sends a request and expects the next message from the peer to be its response. /// Sends a request and expects an upcoming message from the peer to be its response.
/// Takes care of authorization and `CSeq`. Returns `Error` if not successful. /// Takes care of authorization and `CSeq`. Returns `Error` if not successful.
async fn send( async fn send(
&mut self, &mut self,
@ -938,7 +988,7 @@ impl RtspConnection {
} }
rtsp_types::Message::Data(d) => { rtsp_types::Message::Data(d) => {
if matches!(mode, ResponseMode::Teardown { .. }) { if matches!(mode, ResponseMode::Teardown { .. }) {
debug!("ignoring RTSP data during TEARDOWN"); debug!("ignoring RTSP interleaved data during TEARDOWN");
continue; continue;
} else if let (ResponseMode::Play, Some(m)) = } else if let (ResponseMode::Play, Some(m)) =
(&mode, self.channels.lookup(d.channel_id())) (&mode, self.channels.lookup(d.channel_id()))
@ -953,18 +1003,8 @@ impl RtspConnection {
); );
continue; continue;
} }
self.handle_unassigned_data(msg_ctx, options, tool, d)?;
note_stale_live555_data( continue;
tokio::runtime::Handle::try_current().ok(),
tool,
options,
);
format!(
"{}-byte interleaved data message on channel {}",
d.len(),
d.channel_id()
)
} }
rtsp_types::Message::Request(r) => format!("{:?} request", r.method()), rtsp_types::Message::Request(r) => format!("{:?} request", r.method()),
}; };
@ -1039,6 +1079,63 @@ impl RtspConnection {
} }
} }
/// Handles data on an unassigned RTSP channel.
fn handle_unassigned_data(
&mut self,
msg_ctx: RtspMessageContext,
options: &SessionOptions,
tool: Option<&Tool>,
data: Data<Bytes>,
) -> Result<(), Error> {
let live555 = match options.unassigned_channel_data {
UnassignedChannelDataPolicy::Auto
if tool.map(Tool::has_live555_tcp_bug).unwrap_or(false) =>
{
true
}
UnassignedChannelDataPolicy::AssumeStaleSession => true,
UnassignedChannelDataPolicy::Error => false,
UnassignedChannelDataPolicy::Ignore | UnassignedChannelDataPolicy::Auto => {
if !self.seen_unassigned {
log::warn!(
"Ignoring data on unassigned RTSP interleaved data channel {}. \
This is the first such message. Following messages will be logged \
at trace priority only.\n\n\
conn: {}\nmsg: {}\ndata: {:#?}",
data.channel_id(),
self.inner.ctx(),
&msg_ctx,
crate::hex::LimitedHex::new(data.as_slice(), 128),
);
self.seen_unassigned = true;
} else {
log::trace!(
"Ignoring data on unassigned RTSP interleaved data channel {}.\n\n\
conn: {}\nmsg: {}\ndata: {:#?}",
data.channel_id(),
self.inner.ctx(),
&msg_ctx,
crate::hex::LimitedHex::new(data.as_slice(), 128),
);
}
return Ok(());
}
};
if live555 {
note_stale_live555_data(tokio::runtime::Handle::try_current().ok(), tool, options);
}
let channel_id = data.channel_id();
let data = data.into_body();
bail!(ErrorInt::RtspUnassignedChannelError {
conn_ctx: *self.inner.ctx(),
msg_ctx,
channel_id,
data,
});
}
/// Fills out `req` with authorization and `CSeq` headers. /// Fills out `req` with authorization and `CSeq` headers.
fn fill_req( fn fill_req(
&mut self, &mut self,
@ -1804,17 +1901,13 @@ impl Session<Playing> {
let m = match conn.channels.lookup(channel_id) { let m = match conn.channels.lookup(channel_id) {
Some(m) => m, Some(m) => m,
None => { None => {
note_stale_live555_data( conn.handle_unassigned_data(
tokio::runtime::Handle::try_current().ok(), *msg_ctx,
inner.presentation.tool.as_ref(),
inner.options, inner.options,
); inner.presentation.tool.as_ref(),
bail!(ErrorInt::RtspUnassignedChannelError { data,
conn_ctx: *conn.inner.ctx(), )?;
msg_ctx: *msg_ctx, return Ok(None);
channel_id,
data: data.into_body(),
});
} }
}; };
let stream = &mut inner.presentation.streams[m.stream_i]; let stream = &mut inner.presentation.streams[m.stream_i];
@ -2268,6 +2361,7 @@ mod tests {
inner: client, inner: client,
channels: ChannelMappings::default(), channels: ChannelMappings::default(),
next_cseq: 1, next_cseq: 1,
seen_unassigned: false,
}; };
(client, server) (client, server)
} }
@ -2318,7 +2412,9 @@ mod tests {
let (session, _) = tokio::join!( let (session, _) = tokio::join!(
Session::describe_with_conn( Session::describe_with_conn(
conn, conn,
SessionOptions::default().session_group(group.clone()), SessionOptions::default()
.session_group(group.clone())
.unassigned_channel_data(UnassignedChannelDataPolicy::Ignore),
url url
), ),
req_response( req_response(
@ -2356,7 +2452,7 @@ mod tests {
let session = session.unwrap(); let session = session.unwrap();
tokio::pin!(session); tokio::pin!(session);
// Packet. // Packets: first ignored one (unassigned channel), then one passed through.
tokio::join!( tokio::join!(
async { async {
match session.next().await { match session.next().await {
@ -2369,16 +2465,25 @@ mod tests {
} }
}, },
async { async {
let pkt = b"\x80\x60\x41\xd4\x00\x00\x00\x00\xdc\xc4\xa0\xd8hello world"; let bad_pkt = b"data on unassigned channel";
server
.send(rtsp_types::Message::Data(rtsp_types::Data::new(
2,
Bytes::from_static(bad_pkt),
)))
.await
.unwrap();
let good_pkt = b"\x80\x60\x41\xd4\x00\x00\x00\x00\xdc\xc4\xa0\xd8hello world";
server server
.send(rtsp_types::Message::Data(rtsp_types::Data::new( .send(rtsp_types::Message::Data(rtsp_types::Data::new(
0, 0,
Bytes::from_static(pkt), Bytes::from_static(good_pkt),
))) )))
.await .await
.unwrap(); .unwrap();
}, },
); );
drop_time = tokio::time::Instant::now(); drop_time = tokio::time::Instant::now();
}; };
@ -2526,13 +2631,15 @@ mod tests {
async { async {
let e = Session::describe_with_conn( let e = Session::describe_with_conn(
conn, conn,
SessionOptions::default().session_group(group.clone()), SessionOptions::default()
.session_group(group.clone())
.unassigned_channel_data(UnassignedChannelDataPolicy::AssumeStaleSession),
url, url,
) )
.await .await
.map(|_s| ()) .map(|_s| ())
.unwrap_err(); .unwrap_err();
assert!(matches!(*e.0, ErrorInt::RtspFramingError { .. })); assert!(matches!(*e.0, ErrorInt::RtspUnassignedChannelError { .. }));
}, },
async { server.send(bogus_rtp).await.unwrap() }, async { server.send(bogus_rtp).await.unwrap() },
); );