From d8aa4c65655fb1af36d25f8c3b855b843cdd5b09 Mon Sep 17 00:00:00 2001 From: Scott Lamb Date: Thu, 19 Aug 2021 12:56:54 -0700 Subject: [PATCH] workaround for Reolink spurious data Fixes #17 I'm accumulating unit testing debt, but I think I'd rather get this quickly out in the field to test against scenarios I haven't anticipated than get my unit tests in order for scenarios I have anticiapted. --- CHANGELOG.md | 8 ++ Cargo.lock | 2 +- Cargo.toml | 2 +- benches/client.rs | 9 +- benches/depacketize.rs | 3 +- examples/client/metadata.rs | 10 ++- examples/client/mp4.rs | 14 ++- src/client/mod.rs | 169 +++++++++++++++++++++++++++--------- src/client/rtp.rs | 82 ++++++++++++----- 9 files changed, 227 insertions(+), 72 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0efc11d..9c84f91 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,11 @@ +## `v0.2.0` (unreleased) + +* BREAKING CHANGE: `retina::client::Session::describe` now takes a new + `options: SessionOptions`. The `creds` has moved into the `options`, along + with some new options. +* BREAKING CHANGE: renamed `PlayPolicy` to `PlayOptions` for consistency. +* Added options to work around bugs found in Reolink cameras. + ## v0.1.0 (2021-08-13) * use `SET_PARAMETERS` rather than `GET_PARAMETERS` for keepalives. diff --git a/Cargo.lock b/Cargo.lock index 874a08e..e1a4eba 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -989,7 +989,7 @@ checksum = "f497285884f3fcff424ffc933e56d7cbca511def0c9831a7f9b5f6153e3cc89b" [[package]] name = "retina" -version = "0.1.0" +version = "0.2.0" dependencies = [ "anyhow", "base64", diff --git a/Cargo.toml b/Cargo.toml index 8a659a3..a455006 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "retina" -version = "0.1.0" +version = "0.2.0" authors = ["Scott Lamb "] license = "MIT/Apache-2.0" edition = "2018" diff --git a/benches/client.rs b/benches/client.rs index da142cb..3a712c5 100644 --- a/benches/client.rs +++ b/benches/client.rs @@ -8,7 +8,7 @@ use std::{io::ErrorKind, net::SocketAddr, num::NonZeroU32}; use bytes::Bytes; use criterion::{criterion_group, criterion_main, Criterion, Throughput}; use futures::StreamExt; -use retina::{client::PlayPolicy, codec::CodecItem}; +use retina::{client::PlayOptions, codec::CodecItem}; use std::convert::TryFrom; use tokio::io::AsyncWriteExt; use url::Url; @@ -107,10 +107,13 @@ fn make_test_data(max_payload_size: u16) -> Bytes { async fn read_to_eof(addr: SocketAddr) { let url = Url::parse(&format!("rtsp://{}/", &addr)).unwrap(); - let mut session = retina::client::Session::describe(url, None).await.unwrap(); + let mut session = + retina::client::Session::describe(url, retina::client::SessionOptions::default()) + .await + .unwrap(); session.setup(0).await.unwrap(); let session = session - .play(PlayPolicy::default()) + .play(PlayOptions::default()) .await .unwrap() .demuxed() diff --git a/benches/depacketize.rs b/benches/depacketize.rs index 1f209f0..6750edc 100644 --- a/benches/depacketize.rs +++ b/benches/depacketize.rs @@ -48,6 +48,7 @@ fn h264_aac ()>(mut f: F) { _ => unreachable!(), }; let pkt = match rtps[stream_id].rtp( + &retina::client::SessionOptions::default(), &conn_ctx, &msg_ctx, &mut timelines[stream_id], @@ -55,7 +56,7 @@ fn h264_aac ()>(mut f: F) { stream_id, data, ) { - Ok(retina::client::PacketItem::RtpPacket(rtp)) => rtp, + Ok(Some(retina::client::PacketItem::RtpPacket(rtp))) => rtp, _ => unreachable!(), }; depacketizers[stream_id].push(pkt).unwrap(); diff --git a/examples/client/metadata.rs b/examples/client/metadata.rs index cba563a..1512392 100644 --- a/examples/client/metadata.rs +++ b/examples/client/metadata.rs @@ -16,7 +16,13 @@ pub async fn run(opts: Opts) -> Result<(), Error> { let stop = tokio::signal::ctrl_c(); let creds = super::creds(opts.src.username, opts.src.password); - let mut session = retina::client::Session::describe(opts.src.url, creds).await?; + let mut session = retina::client::Session::describe( + opts.src.url, + retina::client::SessionOptions::default() + .creds(creds) + .user_agent("Retina metadata example".to_owned()), + ) + .await?; let onvif_stream_i = session .streams() .iter() @@ -24,7 +30,7 @@ pub async fn run(opts: Opts) -> Result<(), Error> { .ok_or_else(|| anyhow!("couldn't find onvif stream"))?; session.setup(onvif_stream_i).await?; let session = session - .play(retina::client::PlayPolicy::default().ignore_zero_seq(true)) + .play(retina::client::PlayOptions::default().ignore_zero_seq(true)) .await? .demuxed()?; diff --git a/examples/client/mp4.rs b/examples/client/mp4.rs index be0cc45..799318d 100644 --- a/examples/client/mp4.rs +++ b/examples/client/mp4.rs @@ -43,6 +43,9 @@ pub struct Opts { #[structopt(long)] no_audio: bool, + #[structopt(long)] + ignore_spurious_data: bool, + #[structopt(parse(try_from_str))] out: PathBuf, } @@ -534,7 +537,14 @@ impl Mp4Writer { pub async fn run(opts: Opts) -> Result<(), Error> { let creds = super::creds(opts.src.username, opts.src.password); let stop = tokio::signal::ctrl_c(); - let mut session = retina::client::Session::describe(opts.src.url, creds).await?; + let mut session = retina::client::Session::describe( + opts.src.url, + retina::client::SessionOptions::default() + .creds(creds) + .user_agent("Retina mp4 example".to_owned()) + .ignore_spurious_data(opts.ignore_spurious_data), + ) + .await?; let video_stream = if !opts.no_video { session .streams() @@ -567,7 +577,7 @@ pub async fn run(opts: Opts) -> Result<(), Error> { } let session = session .play( - retina::client::PlayPolicy::default() + retina::client::PlayOptions::default() .initial_timestamp(opts.initial_timestamp) .enforce_timestamps_with_max_jump_secs(NonZeroU32::new(10).unwrap()), ) diff --git a/src/client/mod.rs b/src/client/mod.rs index 536175e..2ff6677 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -85,17 +85,70 @@ impl std::str::FromStr for InitialTimestampPolicy { } } -/// Policy decisions to make on `PLAY`. +/// Options which must be known right as a session is created. +/// +/// Decisions which can be deferred are in [PlayOptions] instead. +#[derive(Default)] +pub struct SessionOptions { + creds: Option, + user_agent: String, + ignore_spurious_data: bool, +} + +impl SessionOptions { + /// Ignores RTSP interleaved data packets for channels that aren't assigned, + /// aren't in PLAY state, or already have a different SSRC in use. + /// + /// This still assumes that for assigned channels, the packet's protocol + /// (RTP or RTCP) matches the assignment. All known RTSP implementations + /// only use RTP on even channels and RTCP on odd channels, so this seems + /// reasonably safe. + /// + /// ``ignore_spurious_data` is necessary with some Reolink cameras for at + /// least two reasons: + /// * Reolink RLC-410 (IPC_3816M) firmware version v2.0.0.1441_19032101: + /// the camera sent interleaved data that apparently belonged to a + /// previous RTSP session. This happened immediately on connection + /// establishment and continued for some time after the following PLAY + /// request. + /// * Reolink RLC-822A (IPC_523128M8MP) firmware v3.0.0.124_20112601): + /// the camera sent RTCP SR packets immediately *before* its PLAY + /// response rather than afterward. It's easiest to treat them similarly + /// to the above case and discard them. (An alternative workaround would + /// be to buffer them until after Retina has examined the server's + /// PLAY response.) + /// + /// Currently each packet is logged at debug priority. This may change. + pub fn ignore_spurious_data(mut self, ignore_spurious_data: bool) -> Self { + self.ignore_spurious_data = ignore_spurious_data; + self + } + + /// Use the given credentials when/if the server requests digest authentication. + pub fn creds(mut self, creds: Option) -> Self { + self.creds = creds; + self + } + + /// Sends the given user agent string with each request. + pub fn user_agent(mut self, user_agent: String) -> Self { + self.user_agent = user_agent; + self + } +} + +/// Options which must be decided at `PLAY` time. /// /// These are mostly adjustments for non-compliant server implementations. +/// See also [SessionOptions] for options which must be decided earlier. #[derive(Default)] -pub struct PlayPolicy { +pub struct PlayOptions { initial_timestamp: InitialTimestampPolicy, ignore_zero_seq: bool, enforce_timestamps_with_max_jump_secs: Option, } -impl PlayPolicy { +impl PlayOptions { pub fn initial_timestamp(self, initial_timestamp: InitialTimestampPolicy) -> Self { Self { initial_timestamp, @@ -291,10 +344,9 @@ impl State for Playing {} /// The raw connection, without tracking session state. struct RtspConnection { - creds: Option, + options: SessionOptions, requested_auth: Option, inner: crate::tokio::Connection, - user_agent: String, /// The next `CSeq` header value to use when sending an RTSP request. next_cseq: u32, @@ -311,7 +363,7 @@ pub struct Session { } impl RtspConnection { - async fn connect(url: &Url, creds: Option) -> Result { + async fn connect(url: &Url, options: SessionOptions) -> Result { let host = RtspConnection::validate_url(url).map_err(|e| wrap!(ErrorInt::InvalidArgument(e)))?; let port = url.port().unwrap_or(554); @@ -320,9 +372,8 @@ impl RtspConnection { .map_err(|e| wrap!(ErrorInt::ConnectError(e)))?; Ok(Self { inner, - creds, + options, requested_auth: None, - user_agent: "moonfire-rtsp test".to_string(), next_cseq: 1, }) } @@ -357,23 +408,40 @@ impl RtspConnection { .await .map_err(|e| wrap!(e))?; let method: &str = req.method().into(); - let msg = self.inner.next().await.unwrap_or_else(|| { - bail!(ErrorInt::ReadError { - conn_ctx: *self.inner.ctx(), - msg_ctx: self.inner.eof_ctx(), - source: std::io::Error::new( - std::io::ErrorKind::UnexpectedEof, - format!("EOF while expecting reply to {} CSeq {}", method, cseq), - ), - }) - })?; - let resp = match msg.msg { - rtsp_types::Message::Response(r) if parse::get_cseq(&r) == Some(cseq) => r, - o => bail!(ErrorInt::RtspFramingError { - conn_ctx: *self.inner.ctx(), - msg_ctx: msg.ctx, - description: format!("Expected reply to {} CSeq {}, got {:?}", method, cseq, o,), - }), + let (resp, msg_ctx) = loop { + let msg = self.inner.next().await.unwrap_or_else(|| { + bail!(ErrorInt::ReadError { + conn_ctx: *self.inner.ctx(), + msg_ctx: self.inner.eof_ctx(), + source: std::io::Error::new( + std::io::ErrorKind::UnexpectedEof, + format!("EOF while expecting reply to {} CSeq {}", method, cseq), + ), + }) + })?; + match msg.msg { + rtsp_types::Message::Response(r) if parse::get_cseq(&r) == Some(cseq) => { + break (r, msg.ctx) + } + rtsp_types::Message::Data(d) if self.options.ignore_spurious_data => { + debug!( + "ignoring interleaved data message on channel {} while waiting \ + for reply to {} CSeq {}", + d.channel_id(), + method, + cseq + ); + continue; + } + o => bail!(ErrorInt::RtspFramingError { + conn_ctx: *self.inner.ctx(), + msg_ctx: msg.ctx, + description: format!( + "Expected reply to {} CSeq {}, got {:?}", + method, cseq, o, + ), + }), + }; }; if resp.status() == rtsp_types::StatusCode::Unauthorized { if self.requested_auth.is_some() { @@ -381,7 +449,7 @@ impl RtspConnection { // In that case, we should retry rather than returning error. bail!(ErrorInt::RtspResponseError { conn_ctx: *self.inner.ctx(), - msg_ctx: msg.ctx, + msg_ctx, method: req.method().clone(), cseq, status: resp.status(), @@ -391,7 +459,7 @@ impl RtspConnection { let www_authenticate = match resp.header(&rtsp_types::headers::WWW_AUTHENTICATE) { None => bail!(ErrorInt::RtspResponseError { conn_ctx: *self.inner.ctx(), - msg_ctx: msg.ctx, + msg_ctx, method: req.method().clone(), cseq, status: resp.status(), @@ -405,7 +473,7 @@ impl RtspConnection { // error or not based on ordering. bail!(ErrorInt::RtspResponseError { conn_ctx: *self.inner.ctx(), - msg_ctx: msg.ctx, + msg_ctx, method: req.method().clone(), cseq, status: resp.status(), @@ -415,10 +483,10 @@ impl RtspConnection { ), }) } - if self.creds.is_none() { + if self.options.creds.is_none() { bail!(ErrorInt::RtspResponseError { conn_ctx: *self.inner.ctx(), - msg_ctx: msg.ctx, + msg_ctx, method: req.method().clone(), cseq, status: resp.status(), @@ -426,7 +494,6 @@ impl RtspConnection { .to_owned(), }) } - let msg_ctx = msg.ctx; let www_authenticate = digest_auth::WwwAuthenticateHeader::parse(www_authenticate) .map_err(|e| { wrap!(ErrorInt::RtspResponseError { @@ -446,14 +513,14 @@ impl RtspConnection { } else if !resp.status().is_success() { bail!(ErrorInt::RtspResponseError { conn_ctx: *self.inner.ctx(), - msg_ctx: msg.ctx, + msg_ctx, method: req.method().clone(), cseq, status: resp.status(), description: "Unexpected RTSP response status".into(), }); } - return Ok((msg.ctx, cseq, resp)); + return Ok((msg_ctx, cseq, resp)); } } @@ -463,6 +530,7 @@ impl RtspConnection { self.next_cseq += 1; if let Some(ref mut auth) = self.requested_auth { let creds = self + .options .creds .as_ref() .expect("creds were checked when filling request_auth"); @@ -484,7 +552,12 @@ impl RtspConnection { req.insert_header(rtsp_types::headers::AUTHORIZATION, authorization); } req.insert_header(rtsp_types::headers::CSEQ, cseq.to_string()); - req.insert_header(rtsp_types::headers::USER_AGENT, self.user_agent.clone()); + if !self.options.user_agent.is_empty() { + req.insert_header( + rtsp_types::headers::USER_AGENT, + self.options.user_agent.clone(), + ); + } Ok(cseq) } } @@ -497,8 +570,8 @@ impl Session { /// depacketized correctly. If those streams are setup via /// `Session::setup`, the erorrs in question will be ultimately /// returned from `Stream::demuxed`. - pub async fn describe(url: Url, creds: Option) -> Result { - let mut conn = RtspConnection::connect(&url, creds).await?; + pub async fn describe(url: Url, options: SessionOptions) -> Result { + let mut conn = RtspConnection::connect(&url, options).await?; let mut req = rtsp_types::Request::builder(rtsp_types::Method::Describe, rtsp_types::Version::V1_0) .header(rtsp_types::headers::ACCEPT, "application/sdp") @@ -628,7 +701,7 @@ impl Session { /// /// The presentation must support aggregate control, as defined in [RFC 2326 /// section 1.3](https://tools.ietf.org/html/rfc2326#section-1.3). - pub async fn play(mut self, policy: PlayPolicy) -> Result, Error> { + pub async fn play(mut self, policy: PlayOptions) -> Result, Error> { let session_id = self.state.session_id.take().ok_or_else(|| { wrap!(ErrorInt::FailedPrecondition( "must SETUP before PLAY".into() @@ -892,6 +965,13 @@ impl Session { let channel_id = data.channel_id(); let m = match state.channels.lookup(channel_id) { Some(m) => m, + None if conn.options.ignore_spurious_data => { + log::debug!( + "Ignoring interleaved data on unassigned channel id {}", + channel_id + ); + return Ok(None); + } None => bail!(ErrorInt::RtspUnassignedChannelError { conn_ctx: *conn.inner.ctx(), msg_ctx: *msg_ctx, @@ -910,16 +990,23 @@ impl Session { ), }; match m.channel_type { - ChannelType::Rtp => Ok(Some(rtp_handler.rtp( + ChannelType::Rtp => Ok(rtp_handler.rtp( + &conn.options, conn.inner.ctx(), msg_ctx, &mut timeline, channel_id, m.stream_i, data.into_body(), - )?)), + )?), ChannelType::Rtcp => rtp_handler - .rtcp(msg_ctx, &mut timeline, m.stream_i, data.into_body()) + .rtcp( + &conn.options, + msg_ctx, + &mut timeline, + m.stream_i, + data.into_body(), + ) .map_err(|description| { wrap!(ErrorInt::RtspDataMessageError { conn_ctx: *conn.inner.ctx(), @@ -1078,6 +1165,8 @@ impl futures::Stream for Demuxed { mod tests { use super::*; + // TODO: tests of Reolink ignore_spurious_data scenarios. + // See with: cargo test -- --nocapture client::tests::print_sizes #[test] fn print_sizes() { diff --git a/src/client/rtp.rs b/src/client/rtp.rs index 21386a2..8fb9eae 100644 --- a/src/client/rtp.rs +++ b/src/client/rtp.rs @@ -87,13 +87,14 @@ impl StrictSequenceChecker { pub fn rtp( &mut self, + session_options: &super::SessionOptions, conn_ctx: &crate::ConnectionContext, msg_ctx: &crate::RtspMessageContext, timeline: &mut super::Timeline, channel_id: u8, stream_id: usize, mut data: Bytes, - ) -> Result { + ) -> Result, Error> { // Terrible hack to try to make sense of the GW Security GW4089IP's audio stream. // It appears to have one RTSP interleaved message wrapped in another. RTP and RTCP // packets can never start with '$', so this shouldn't interfere with well-behaved @@ -123,6 +124,47 @@ impl StrictSequenceChecker { })?; let sequence_number = u16::from_be_bytes([data[2], data[3]]); // I don't like rtsp_rs::Seq. let ssrc = reader.ssrc(); + let loss = sequence_number.wrapping_sub(self.next_seq.unwrap_or(sequence_number)); + if matches!(self.ssrc, Some(s) if s != ssrc) { + if session_options.ignore_spurious_data { + log::debug!( + "Ignoring spurious RTP data with ssrc={:08x} seq={:04x} while expecting \ + ssrc={:08x?} seq={:04x?}", + ssrc, + sequence_number, + self.ssrc, + self.next_seq + ); + return Ok(None); + } else { + bail!(ErrorInt::RtpPacketError { + conn_ctx: *conn_ctx, + msg_ctx: *msg_ctx, + channel_id, + stream_id, + ssrc, + sequence_number, + description: format!( + "Wrong ssrc; expecting ssrc={:08x?} seq={:04x?}", + self.ssrc, self.next_seq + ), + }); + } + } + if loss > 0x80_00 { + bail!(ErrorInt::RtpPacketError { + conn_ctx: *conn_ctx, + msg_ctx: *msg_ctx, + channel_id, + stream_id, + ssrc, + sequence_number, + description: format!( + "Out-of-order packet or large loss; expecting ssrc={:08x?} seq={:04x?}", + self.ssrc, self.next_seq + ), + }); + } let timestamp = match timeline.advance_to(reader.timestamp()) { Ok(ts) => ts, Err(description) => bail!(ErrorInt::RtpPacketError { @@ -135,21 +177,6 @@ impl StrictSequenceChecker { description, }), }; - let loss = sequence_number.wrapping_sub(self.next_seq.unwrap_or(sequence_number)); - if matches!(self.ssrc, Some(s) if s != ssrc) || loss > 0x80_00 { - bail!(ErrorInt::RtpPacketError { - conn_ctx: *conn_ctx, - msg_ctx: *msg_ctx, - channel_id, - stream_id, - ssrc, - sequence_number, - description: format!( - "Expected ssrc={:08x?} seq={:04x?}", - self.ssrc, self.next_seq - ), - }); - } self.ssrc = Some(ssrc); let mark = reader.mark(); let payload_range = crate::as_range(&data, reader.payload()).ok_or_else(|| { @@ -166,7 +193,7 @@ impl StrictSequenceChecker { data.truncate(payload_range.end); data.advance(payload_range.start); self.next_seq = Some(sequence_number.wrapping_add(1)); - Ok(PacketItem::RtpPacket(Packet { + Ok(Some(PacketItem::RtpPacket(Packet { ctx: *msg_ctx, channel_id, stream_id, @@ -176,11 +203,12 @@ impl StrictSequenceChecker { loss, mark, payload: data, - })) + }))) } pub fn rtcp( &mut self, + session_options: &super::SessionOptions, msg_ctx: &crate::RtspMessageContext, timeline: &mut super::Timeline, stream_id: usize, @@ -207,10 +235,20 @@ impl StrictSequenceChecker { let ssrc = pkt.ssrc(); if matches!(self.ssrc, Some(s) if s != ssrc) { - return Err(format!( - "Expected ssrc={:08x?}, got RTCP SR ssrc={:08x}", - self.ssrc, ssrc - )); + if session_options.ignore_spurious_data { + log::debug!( + "Ignoring spurious RTCP data with ssrc={:08x} while \ + expecting ssrc={:08x?}", + ssrc, + self.ssrc + ); + return Ok(None); + } else { + return Err(format!( + "Expected ssrc={:08x?}, got RTCP SR ssrc={:08x}", + self.ssrc, ssrc + )); + } } self.ssrc = Some(ssrc);