workaround for Reolink spurious data

Fixes #17

I'm accumulating unit testing debt, but I think I'd rather get this
quickly out in the field to test against scenarios I haven't anticipated
than get my unit tests in order for scenarios I have anticiapted.
This commit is contained in:
Scott Lamb 2021-08-19 12:56:54 -07:00
parent 7d2fb0def1
commit d8aa4c6565
9 changed files with 227 additions and 72 deletions

View File

@ -1,3 +1,11 @@
## `v0.2.0` (unreleased)
* BREAKING CHANGE: `retina::client::Session::describe` now takes a new
`options: SessionOptions`. The `creds` has moved into the `options`, along
with some new options.
* BREAKING CHANGE: renamed `PlayPolicy` to `PlayOptions` for consistency.
* Added options to work around bugs found in Reolink cameras.
## v0.1.0 (2021-08-13)
* use `SET_PARAMETERS` rather than `GET_PARAMETERS` for keepalives.

2
Cargo.lock generated
View File

@ -989,7 +989,7 @@ checksum = "f497285884f3fcff424ffc933e56d7cbca511def0c9831a7f9b5f6153e3cc89b"
[[package]]
name = "retina"
version = "0.1.0"
version = "0.2.0"
dependencies = [
"anyhow",
"base64",

View File

@ -1,6 +1,6 @@
[package]
name = "retina"
version = "0.1.0"
version = "0.2.0"
authors = ["Scott Lamb <slamb@slamb.org>"]
license = "MIT/Apache-2.0"
edition = "2018"

View File

@ -8,7 +8,7 @@ use std::{io::ErrorKind, net::SocketAddr, num::NonZeroU32};
use bytes::Bytes;
use criterion::{criterion_group, criterion_main, Criterion, Throughput};
use futures::StreamExt;
use retina::{client::PlayPolicy, codec::CodecItem};
use retina::{client::PlayOptions, codec::CodecItem};
use std::convert::TryFrom;
use tokio::io::AsyncWriteExt;
use url::Url;
@ -107,10 +107,13 @@ fn make_test_data(max_payload_size: u16) -> Bytes {
async fn read_to_eof(addr: SocketAddr) {
let url = Url::parse(&format!("rtsp://{}/", &addr)).unwrap();
let mut session = retina::client::Session::describe(url, None).await.unwrap();
let mut session =
retina::client::Session::describe(url, retina::client::SessionOptions::default())
.await
.unwrap();
session.setup(0).await.unwrap();
let session = session
.play(PlayPolicy::default())
.play(PlayOptions::default())
.await
.unwrap()
.demuxed()

View File

@ -48,6 +48,7 @@ fn h264_aac<F: FnMut(CodecItem) -> ()>(mut f: F) {
_ => unreachable!(),
};
let pkt = match rtps[stream_id].rtp(
&retina::client::SessionOptions::default(),
&conn_ctx,
&msg_ctx,
&mut timelines[stream_id],
@ -55,7 +56,7 @@ fn h264_aac<F: FnMut(CodecItem) -> ()>(mut f: F) {
stream_id,
data,
) {
Ok(retina::client::PacketItem::RtpPacket(rtp)) => rtp,
Ok(Some(retina::client::PacketItem::RtpPacket(rtp))) => rtp,
_ => unreachable!(),
};
depacketizers[stream_id].push(pkt).unwrap();

View File

@ -16,7 +16,13 @@ pub async fn run(opts: Opts) -> Result<(), Error> {
let stop = tokio::signal::ctrl_c();
let creds = super::creds(opts.src.username, opts.src.password);
let mut session = retina::client::Session::describe(opts.src.url, creds).await?;
let mut session = retina::client::Session::describe(
opts.src.url,
retina::client::SessionOptions::default()
.creds(creds)
.user_agent("Retina metadata example".to_owned()),
)
.await?;
let onvif_stream_i = session
.streams()
.iter()
@ -24,7 +30,7 @@ pub async fn run(opts: Opts) -> Result<(), Error> {
.ok_or_else(|| anyhow!("couldn't find onvif stream"))?;
session.setup(onvif_stream_i).await?;
let session = session
.play(retina::client::PlayPolicy::default().ignore_zero_seq(true))
.play(retina::client::PlayOptions::default().ignore_zero_seq(true))
.await?
.demuxed()?;

View File

@ -43,6 +43,9 @@ pub struct Opts {
#[structopt(long)]
no_audio: bool,
#[structopt(long)]
ignore_spurious_data: bool,
#[structopt(parse(try_from_str))]
out: PathBuf,
}
@ -534,7 +537,14 @@ impl<W: AsyncWrite + AsyncSeek + Send + Unpin> Mp4Writer<W> {
pub async fn run(opts: Opts) -> Result<(), Error> {
let creds = super::creds(opts.src.username, opts.src.password);
let stop = tokio::signal::ctrl_c();
let mut session = retina::client::Session::describe(opts.src.url, creds).await?;
let mut session = retina::client::Session::describe(
opts.src.url,
retina::client::SessionOptions::default()
.creds(creds)
.user_agent("Retina mp4 example".to_owned())
.ignore_spurious_data(opts.ignore_spurious_data),
)
.await?;
let video_stream = if !opts.no_video {
session
.streams()
@ -567,7 +577,7 @@ pub async fn run(opts: Opts) -> Result<(), Error> {
}
let session = session
.play(
retina::client::PlayPolicy::default()
retina::client::PlayOptions::default()
.initial_timestamp(opts.initial_timestamp)
.enforce_timestamps_with_max_jump_secs(NonZeroU32::new(10).unwrap()),
)

View File

@ -85,17 +85,70 @@ impl std::str::FromStr for InitialTimestampPolicy {
}
}
/// Policy decisions to make on `PLAY`.
/// Options which must be known right as a session is created.
///
/// Decisions which can be deferred are in [PlayOptions] instead.
#[derive(Default)]
pub struct SessionOptions {
creds: Option<Credentials>,
user_agent: String,
ignore_spurious_data: bool,
}
impl SessionOptions {
/// Ignores RTSP interleaved data packets for channels that aren't assigned,
/// aren't in PLAY state, or already have a different SSRC in use.
///
/// This still assumes that for assigned channels, the packet's protocol
/// (RTP or RTCP) matches the assignment. All known RTSP implementations
/// only use RTP on even channels and RTCP on odd channels, so this seems
/// reasonably safe.
///
/// ``ignore_spurious_data` is necessary with some Reolink cameras for at
/// least two reasons:
/// * Reolink RLC-410 (IPC_3816M) firmware version v2.0.0.1441_19032101:
/// the camera sent interleaved data that apparently belonged to a
/// previous RTSP session. This happened immediately on connection
/// establishment and continued for some time after the following PLAY
/// request.
/// * Reolink RLC-822A (IPC_523128M8MP) firmware v3.0.0.124_20112601):
/// the camera sent RTCP SR packets immediately *before* its PLAY
/// response rather than afterward. It's easiest to treat them similarly
/// to the above case and discard them. (An alternative workaround would
/// be to buffer them until after Retina has examined the server's
/// PLAY response.)
///
/// Currently each packet is logged at debug priority. This may change.
pub fn ignore_spurious_data(mut self, ignore_spurious_data: bool) -> Self {
self.ignore_spurious_data = ignore_spurious_data;
self
}
/// Use the given credentials when/if the server requests digest authentication.
pub fn creds(mut self, creds: Option<Credentials>) -> Self {
self.creds = creds;
self
}
/// Sends the given user agent string with each request.
pub fn user_agent(mut self, user_agent: String) -> Self {
self.user_agent = user_agent;
self
}
}
/// Options which must be decided at `PLAY` time.
///
/// These are mostly adjustments for non-compliant server implementations.
/// See also [SessionOptions] for options which must be decided earlier.
#[derive(Default)]
pub struct PlayPolicy {
pub struct PlayOptions {
initial_timestamp: InitialTimestampPolicy,
ignore_zero_seq: bool,
enforce_timestamps_with_max_jump_secs: Option<NonZeroU32>,
}
impl PlayPolicy {
impl PlayOptions {
pub fn initial_timestamp(self, initial_timestamp: InitialTimestampPolicy) -> Self {
Self {
initial_timestamp,
@ -291,10 +344,9 @@ impl State for Playing {}
/// The raw connection, without tracking session state.
struct RtspConnection {
creds: Option<Credentials>,
options: SessionOptions,
requested_auth: Option<digest_auth::WwwAuthenticateHeader>,
inner: crate::tokio::Connection,
user_agent: String,
/// The next `CSeq` header value to use when sending an RTSP request.
next_cseq: u32,
@ -311,7 +363,7 @@ pub struct Session<S: State> {
}
impl RtspConnection {
async fn connect(url: &Url, creds: Option<Credentials>) -> Result<Self, Error> {
async fn connect(url: &Url, options: SessionOptions) -> Result<Self, Error> {
let host =
RtspConnection::validate_url(url).map_err(|e| wrap!(ErrorInt::InvalidArgument(e)))?;
let port = url.port().unwrap_or(554);
@ -320,9 +372,8 @@ impl RtspConnection {
.map_err(|e| wrap!(ErrorInt::ConnectError(e)))?;
Ok(Self {
inner,
creds,
options,
requested_auth: None,
user_agent: "moonfire-rtsp test".to_string(),
next_cseq: 1,
})
}
@ -357,23 +408,40 @@ impl RtspConnection {
.await
.map_err(|e| wrap!(e))?;
let method: &str = req.method().into();
let msg = self.inner.next().await.unwrap_or_else(|| {
bail!(ErrorInt::ReadError {
conn_ctx: *self.inner.ctx(),
msg_ctx: self.inner.eof_ctx(),
source: std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
format!("EOF while expecting reply to {} CSeq {}", method, cseq),
),
})
})?;
let resp = match msg.msg {
rtsp_types::Message::Response(r) if parse::get_cseq(&r) == Some(cseq) => r,
o => bail!(ErrorInt::RtspFramingError {
conn_ctx: *self.inner.ctx(),
msg_ctx: msg.ctx,
description: format!("Expected reply to {} CSeq {}, got {:?}", method, cseq, o,),
}),
let (resp, msg_ctx) = loop {
let msg = self.inner.next().await.unwrap_or_else(|| {
bail!(ErrorInt::ReadError {
conn_ctx: *self.inner.ctx(),
msg_ctx: self.inner.eof_ctx(),
source: std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
format!("EOF while expecting reply to {} CSeq {}", method, cseq),
),
})
})?;
match msg.msg {
rtsp_types::Message::Response(r) if parse::get_cseq(&r) == Some(cseq) => {
break (r, msg.ctx)
}
rtsp_types::Message::Data(d) if self.options.ignore_spurious_data => {
debug!(
"ignoring interleaved data message on channel {} while waiting \
for reply to {} CSeq {}",
d.channel_id(),
method,
cseq
);
continue;
}
o => bail!(ErrorInt::RtspFramingError {
conn_ctx: *self.inner.ctx(),
msg_ctx: msg.ctx,
description: format!(
"Expected reply to {} CSeq {}, got {:?}",
method, cseq, o,
),
}),
};
};
if resp.status() == rtsp_types::StatusCode::Unauthorized {
if self.requested_auth.is_some() {
@ -381,7 +449,7 @@ impl RtspConnection {
// In that case, we should retry rather than returning error.
bail!(ErrorInt::RtspResponseError {
conn_ctx: *self.inner.ctx(),
msg_ctx: msg.ctx,
msg_ctx,
method: req.method().clone(),
cseq,
status: resp.status(),
@ -391,7 +459,7 @@ impl RtspConnection {
let www_authenticate = match resp.header(&rtsp_types::headers::WWW_AUTHENTICATE) {
None => bail!(ErrorInt::RtspResponseError {
conn_ctx: *self.inner.ctx(),
msg_ctx: msg.ctx,
msg_ctx,
method: req.method().clone(),
cseq,
status: resp.status(),
@ -405,7 +473,7 @@ impl RtspConnection {
// error or not based on ordering.
bail!(ErrorInt::RtspResponseError {
conn_ctx: *self.inner.ctx(),
msg_ctx: msg.ctx,
msg_ctx,
method: req.method().clone(),
cseq,
status: resp.status(),
@ -415,10 +483,10 @@ impl RtspConnection {
),
})
}
if self.creds.is_none() {
if self.options.creds.is_none() {
bail!(ErrorInt::RtspResponseError {
conn_ctx: *self.inner.ctx(),
msg_ctx: msg.ctx,
msg_ctx,
method: req.method().clone(),
cseq,
status: resp.status(),
@ -426,7 +494,6 @@ impl RtspConnection {
.to_owned(),
})
}
let msg_ctx = msg.ctx;
let www_authenticate = digest_auth::WwwAuthenticateHeader::parse(www_authenticate)
.map_err(|e| {
wrap!(ErrorInt::RtspResponseError {
@ -446,14 +513,14 @@ impl RtspConnection {
} else if !resp.status().is_success() {
bail!(ErrorInt::RtspResponseError {
conn_ctx: *self.inner.ctx(),
msg_ctx: msg.ctx,
msg_ctx,
method: req.method().clone(),
cseq,
status: resp.status(),
description: "Unexpected RTSP response status".into(),
});
}
return Ok((msg.ctx, cseq, resp));
return Ok((msg_ctx, cseq, resp));
}
}
@ -463,6 +530,7 @@ impl RtspConnection {
self.next_cseq += 1;
if let Some(ref mut auth) = self.requested_auth {
let creds = self
.options
.creds
.as_ref()
.expect("creds were checked when filling request_auth");
@ -484,7 +552,12 @@ impl RtspConnection {
req.insert_header(rtsp_types::headers::AUTHORIZATION, authorization);
}
req.insert_header(rtsp_types::headers::CSEQ, cseq.to_string());
req.insert_header(rtsp_types::headers::USER_AGENT, self.user_agent.clone());
if !self.options.user_agent.is_empty() {
req.insert_header(
rtsp_types::headers::USER_AGENT,
self.options.user_agent.clone(),
);
}
Ok(cseq)
}
}
@ -497,8 +570,8 @@ impl Session<Described> {
/// depacketized correctly. If those streams are setup via
/// `Session<Described>::setup`, the erorrs in question will be ultimately
/// returned from `Stream<Playing>::demuxed`.
pub async fn describe(url: Url, creds: Option<Credentials>) -> Result<Self, Error> {
let mut conn = RtspConnection::connect(&url, creds).await?;
pub async fn describe(url: Url, options: SessionOptions) -> Result<Self, Error> {
let mut conn = RtspConnection::connect(&url, options).await?;
let mut req =
rtsp_types::Request::builder(rtsp_types::Method::Describe, rtsp_types::Version::V1_0)
.header(rtsp_types::headers::ACCEPT, "application/sdp")
@ -628,7 +701,7 @@ impl Session<Described> {
///
/// The presentation must support aggregate control, as defined in [RFC 2326
/// section 1.3](https://tools.ietf.org/html/rfc2326#section-1.3).
pub async fn play(mut self, policy: PlayPolicy) -> Result<Session<Playing>, Error> {
pub async fn play(mut self, policy: PlayOptions) -> Result<Session<Playing>, Error> {
let session_id = self.state.session_id.take().ok_or_else(|| {
wrap!(ErrorInt::FailedPrecondition(
"must SETUP before PLAY".into()
@ -892,6 +965,13 @@ impl Session<Playing> {
let channel_id = data.channel_id();
let m = match state.channels.lookup(channel_id) {
Some(m) => m,
None if conn.options.ignore_spurious_data => {
log::debug!(
"Ignoring interleaved data on unassigned channel id {}",
channel_id
);
return Ok(None);
}
None => bail!(ErrorInt::RtspUnassignedChannelError {
conn_ctx: *conn.inner.ctx(),
msg_ctx: *msg_ctx,
@ -910,16 +990,23 @@ impl Session<Playing> {
),
};
match m.channel_type {
ChannelType::Rtp => Ok(Some(rtp_handler.rtp(
ChannelType::Rtp => Ok(rtp_handler.rtp(
&conn.options,
conn.inner.ctx(),
msg_ctx,
&mut timeline,
channel_id,
m.stream_i,
data.into_body(),
)?)),
)?),
ChannelType::Rtcp => rtp_handler
.rtcp(msg_ctx, &mut timeline, m.stream_i, data.into_body())
.rtcp(
&conn.options,
msg_ctx,
&mut timeline,
m.stream_i,
data.into_body(),
)
.map_err(|description| {
wrap!(ErrorInt::RtspDataMessageError {
conn_ctx: *conn.inner.ctx(),
@ -1078,6 +1165,8 @@ impl futures::Stream for Demuxed {
mod tests {
use super::*;
// TODO: tests of Reolink ignore_spurious_data scenarios.
// See with: cargo test -- --nocapture client::tests::print_sizes
#[test]
fn print_sizes() {

View File

@ -87,13 +87,14 @@ impl StrictSequenceChecker {
pub fn rtp(
&mut self,
session_options: &super::SessionOptions,
conn_ctx: &crate::ConnectionContext,
msg_ctx: &crate::RtspMessageContext,
timeline: &mut super::Timeline,
channel_id: u8,
stream_id: usize,
mut data: Bytes,
) -> Result<PacketItem, Error> {
) -> Result<Option<PacketItem>, Error> {
// Terrible hack to try to make sense of the GW Security GW4089IP's audio stream.
// It appears to have one RTSP interleaved message wrapped in another. RTP and RTCP
// packets can never start with '$', so this shouldn't interfere with well-behaved
@ -123,6 +124,47 @@ impl StrictSequenceChecker {
})?;
let sequence_number = u16::from_be_bytes([data[2], data[3]]); // I don't like rtsp_rs::Seq.
let ssrc = reader.ssrc();
let loss = sequence_number.wrapping_sub(self.next_seq.unwrap_or(sequence_number));
if matches!(self.ssrc, Some(s) if s != ssrc) {
if session_options.ignore_spurious_data {
log::debug!(
"Ignoring spurious RTP data with ssrc={:08x} seq={:04x} while expecting \
ssrc={:08x?} seq={:04x?}",
ssrc,
sequence_number,
self.ssrc,
self.next_seq
);
return Ok(None);
} else {
bail!(ErrorInt::RtpPacketError {
conn_ctx: *conn_ctx,
msg_ctx: *msg_ctx,
channel_id,
stream_id,
ssrc,
sequence_number,
description: format!(
"Wrong ssrc; expecting ssrc={:08x?} seq={:04x?}",
self.ssrc, self.next_seq
),
});
}
}
if loss > 0x80_00 {
bail!(ErrorInt::RtpPacketError {
conn_ctx: *conn_ctx,
msg_ctx: *msg_ctx,
channel_id,
stream_id,
ssrc,
sequence_number,
description: format!(
"Out-of-order packet or large loss; expecting ssrc={:08x?} seq={:04x?}",
self.ssrc, self.next_seq
),
});
}
let timestamp = match timeline.advance_to(reader.timestamp()) {
Ok(ts) => ts,
Err(description) => bail!(ErrorInt::RtpPacketError {
@ -135,21 +177,6 @@ impl StrictSequenceChecker {
description,
}),
};
let loss = sequence_number.wrapping_sub(self.next_seq.unwrap_or(sequence_number));
if matches!(self.ssrc, Some(s) if s != ssrc) || loss > 0x80_00 {
bail!(ErrorInt::RtpPacketError {
conn_ctx: *conn_ctx,
msg_ctx: *msg_ctx,
channel_id,
stream_id,
ssrc,
sequence_number,
description: format!(
"Expected ssrc={:08x?} seq={:04x?}",
self.ssrc, self.next_seq
),
});
}
self.ssrc = Some(ssrc);
let mark = reader.mark();
let payload_range = crate::as_range(&data, reader.payload()).ok_or_else(|| {
@ -166,7 +193,7 @@ impl StrictSequenceChecker {
data.truncate(payload_range.end);
data.advance(payload_range.start);
self.next_seq = Some(sequence_number.wrapping_add(1));
Ok(PacketItem::RtpPacket(Packet {
Ok(Some(PacketItem::RtpPacket(Packet {
ctx: *msg_ctx,
channel_id,
stream_id,
@ -176,11 +203,12 @@ impl StrictSequenceChecker {
loss,
mark,
payload: data,
}))
})))
}
pub fn rtcp(
&mut self,
session_options: &super::SessionOptions,
msg_ctx: &crate::RtspMessageContext,
timeline: &mut super::Timeline,
stream_id: usize,
@ -207,10 +235,20 @@ impl StrictSequenceChecker {
let ssrc = pkt.ssrc();
if matches!(self.ssrc, Some(s) if s != ssrc) {
return Err(format!(
"Expected ssrc={:08x?}, got RTCP SR ssrc={:08x}",
self.ssrc, ssrc
));
if session_options.ignore_spurious_data {
log::debug!(
"Ignoring spurious RTCP data with ssrc={:08x} while \
expecting ssrc={:08x?}",
ssrc,
self.ssrc
);
return Ok(None);
} else {
return Err(format!(
"Expected ssrc={:08x?}, got RTCP SR ssrc={:08x}",
self.ssrc, ssrc
));
}
}
self.ssrc = Some(ssrc);