diff --git a/CHANGELOG.md b/CHANGELOG.md index 30bc9de..008a88f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,8 @@ ## unreleased +* New opaque error type with more uniform, richer error messages. No more + `failure` dependency. + ## v0.0.4 (2021-06-28) * bugfix: Retina stopped receiving packets after receiving a keepalive response. diff --git a/Cargo.lock b/Cargo.lock index fcf9f69..bd36cff 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,21 +2,6 @@ # It is not intended for manual editing. version = 3 -[[package]] -name = "addr2line" -version = "0.15.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "03345e98af8f3d786b6d9f656ccfa6ac316d954e92bc4841f0bba20789d5fb5a" -dependencies = [ - "gimli", -] - -[[package]] -name = "adler" -version = "1.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" - [[package]] name = "ansi_term" version = "0.11.0" @@ -26,6 +11,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "anyhow" +version = "1.0.41" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15af2628f6890fe2609a3b91bef4c83450512802e59489f9c1cb1fa5df064a61" + [[package]] name = "arrayvec" version = "0.5.2" @@ -49,21 +40,6 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a" -[[package]] -name = "backtrace" -version = "0.3.60" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b7815ea54e4d821e791162e078acbebfd6d8c8939cd559c9335dceb1c8ca7282" -dependencies = [ - "addr2line", - "cc", - "cfg-if", - "libc", - "miniz_oxide", - "object", - "rustc-demangle", -] - [[package]] name = "base64" version = "0.13.0" @@ -139,12 +115,6 @@ dependencies = [ "rustc_version", ] -[[package]] -name = "cc" -version = "1.0.68" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4a72c244c1ff497a746a7e1fb3d14bd08420ecda70c8f25c7112f2781652d787" - [[package]] name = "cfg-if" version = "1.0.0" @@ -326,28 +296,6 @@ version = "1.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457" -[[package]] -name = "failure" -version = "0.1.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d32e9bd16cc02eae7db7ef620b392808b89f6a5e16bb3497d159c6b92a0f4f86" -dependencies = [ - "backtrace", - "failure_derive", -] - -[[package]] -name = "failure_derive" -version = "0.1.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aa4da3c766cd7a0db8242e326e9e4e081edd567072893ed320008189715366a4" -dependencies = [ - "proc-macro2", - "quote", - "syn", - "synstructure", -] - [[package]] name = "form_urlencoded" version = "1.0.1" @@ -485,12 +433,6 @@ dependencies = [ "wasi", ] -[[package]] -name = "gimli" -version = "0.24.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e4075386626662786ddb0ec9081e7c7eeb1ba31951f447ca780ef9f5d568189" - [[package]] name = "h264-reader" version = "0.5.0" @@ -661,16 +603,6 @@ dependencies = [ "autocfg", ] -[[package]] -name = "miniz_oxide" -version = "0.4.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a92518e98c078586bc6c934028adcca4c92a53d6a958196de835170a01d84e4b" -dependencies = [ - "adler", - "autocfg", -] - [[package]] name = "mio" version = "0.7.11" @@ -770,15 +702,6 @@ dependencies = [ "libc", ] -[[package]] -name = "object" -version = "0.25.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9023c1c0973b327f073c7f2fceb9bcc049862f93a7d14c6feb46c8a56460a0d5" -dependencies = [ - "memchr", -] - [[package]] name = "once_cell" version = "1.7.2" @@ -1066,14 +989,14 @@ checksum = "f497285884f3fcff424ffc933e56d7cbca511def0c9831a7f9b5f6153e3cc89b" [[package]] name = "retina" -version = "0.0.4" +version = "0.0.5-dev" dependencies = [ + "anyhow", "base64", "bitreader", "bytes", "criterion", "digest_auth", - "failure", "futures", "h264-reader", "hex", @@ -1088,6 +1011,7 @@ dependencies = [ "sdp", "smallvec", "structopt", + "thiserror", "time", "tokio", "tokio-util", @@ -1133,12 +1057,6 @@ dependencies = [ "url", ] -[[package]] -name = "rustc-demangle" -version = "0.1.19" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "410f7acf3cb3a44527c5d9546bad4bf4e6c460915d5f9f2fc524498bfe8f70ce" - [[package]] name = "rustc_version" version = "0.3.3" @@ -1317,18 +1235,6 @@ dependencies = [ "unicode-xid", ] -[[package]] -name = "synstructure" -version = "0.12.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b834f2d66f734cb897113e34aaff2f1ab4719ca946f9a7358dba8f8064148701" -dependencies = [ - "proc-macro2", - "quote", - "syn", - "unicode-xid", -] - [[package]] name = "tap" version = "1.0.1" diff --git a/Cargo.toml b/Cargo.toml index 64faf1c..dbca0c1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "retina" -version = "0.0.4" +version = "0.0.5-dev" authors = ["Scott Lamb "] license = "MIT/Apache-2.0" edition = "2018" @@ -15,7 +15,6 @@ base64 = "0.13.0" bitreader = "0.3.3" bytes = "1.0.1" digest_auth = "0.3.0" -failure = "0.1.8" futures = "0.3.14" hex = "0.4.3" h264-reader = "0.5.0" @@ -28,12 +27,14 @@ rtp-rs = "0.6.0" rtsp-types = "0.0.2" sdp = "0.1.4" smallvec = { version = "1.6.1", features = ["union"] } +thiserror = "1.0.25" time = "0.1.43" tokio = { version = "1.5.0", features = ["macros", "net", "time"] } tokio-util = { version = "0.6.6", features = ["codec"] } url = "2.2.1" [dev-dependencies] +anyhow = "1.0.41" criterion = { version = "0.3.4", features = ["async_tokio"] } mylog = { git = "https://github.com/scottlamb/mylog" } structopt = "0.3.21" diff --git a/README.md b/README.md index 624320e..5f9172b 100644 --- a/README.md +++ b/README.md @@ -44,8 +44,9 @@ Progress: * [x] application: ONVIF metadata * [ ] uniform, documented API. (Currently haphazard in terms of naming, what fields are exposed directly vs use an accessors, etc.) -* [ ] rich errors. (Currently uses untyped errors with the deprecated - `failure` crate; some error messages are quite detailed, others aren't.) +* quality errors +* * [x] detailed error description text. +* * [ ] programmatically inspectable error type. * [ ] good functional testing coverage. (Currently lightly / unevenly tested.) Most depacketizers have no tests.) * [ ] fuzz testing. (In progress.) diff --git a/benches/depacketize.rs b/benches/depacketize.rs index 915b7d8..1f209f0 100644 --- a/benches/depacketize.rs +++ b/benches/depacketize.rs @@ -31,7 +31,8 @@ fn h264_aac ()>(mut f: F) { Depacketizer::new("audio", "mpeg4-generic", 12_000, NonZeroU16::new(2), Some("profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3;config=1490")).unwrap(), Depacketizer::new("video", "h264", 90_000, None, Some("packetization-mode=1;profile-level-id=42C01E;sprop-parameter-sets=Z0LAHtkDxWhAAAADAEAAAAwDxYuS,aMuMsg==")).unwrap(), ]; - let ctx = retina::Context::dummy(); + let conn_ctx = retina::ConnectionContext::dummy(); + let msg_ctx = retina::RtspMessageContext::dummy(); while !remaining.is_empty() { assert!(remaining.len() > 4); assert_eq!(remaining[0], b'$'); @@ -46,12 +47,19 @@ fn h264_aac ()>(mut f: F) { 1 | 3 => continue, // RTCP _ => unreachable!(), }; - let pkt = match rtps[stream_id].rtp(ctx, &mut timelines[stream_id], stream_id, data) { + let pkt = match rtps[stream_id].rtp( + &conn_ctx, + &msg_ctx, + &mut timelines[stream_id], + channel_id, + stream_id, + data, + ) { Ok(retina::client::PacketItem::RtpPacket(rtp)) => rtp, _ => unreachable!(), }; depacketizers[stream_id].push(pkt).unwrap(); - while let Some(pkt) = depacketizers[stream_id].pull().unwrap() { + while let Some(pkt) = depacketizers[stream_id].pull(&conn_ctx).unwrap() { f(pkt); } } diff --git a/examples/client/main.rs b/examples/client/main.rs index d24c834..f426386 100644 --- a/examples/client/main.rs +++ b/examples/client/main.rs @@ -6,9 +6,9 @@ mod metadata; mod mp4; -use failure::Error; +use anyhow::Error; use log::{error, info}; -use std::{fmt::Write, str::FromStr}; +use std::str::FromStr; use structopt::StructOpt; #[derive(StructOpt)] @@ -29,24 +29,6 @@ enum Cmd { Metadata(metadata::Opts), } -/// Returns a pretty-and-informative version of `e`. -pub fn prettify_failure(e: &failure::Error) -> String { - let mut msg = e.to_string(); - for cause in e.iter_causes() { - write!(&mut msg, "\ncaused by: {}", cause).unwrap(); - } - if e.backtrace().is_empty() { - write!( - &mut msg, - "\n\n(set environment variable RUST_BACKTRACE=1 to see backtraces)" - ) - .unwrap(); - } else { - write!(&mut msg, "\n\nBacktrace:\n{}", e.backtrace()).unwrap(); - } - msg -} - fn init_logging() -> mylog::Handle { let h = mylog::Builder::new() .set_format( @@ -68,7 +50,7 @@ async fn main() { let _a = h.async_scope(); main_inner().await } { - error!("Fatal: {}", prettify_failure(&e)); + error!("Fatal: {}", e); std::process::exit(1); } info!("Done"); diff --git a/examples/client/metadata.rs b/examples/client/metadata.rs index 87b55a7..cba563a 100644 --- a/examples/client/metadata.rs +++ b/examples/client/metadata.rs @@ -1,7 +1,7 @@ // Copyright (C) 2021 Scott Lamb // SPDX-License-Identifier: MIT OR Apache-2.0 -use failure::{format_err, Error}; +use anyhow::{anyhow, Error}; use futures::StreamExt; use log::info; use retina::codec::CodecItem; @@ -21,7 +21,7 @@ pub async fn run(opts: Opts) -> Result<(), Error> { .streams() .iter() .position(|s| matches!(s.parameters(), Some(retina::codec::Parameters::Message(..)))) - .ok_or_else(|| format_err!("couldn't find onvif stream"))?; + .ok_or_else(|| anyhow!("couldn't find onvif stream"))?; session.setup(onvif_stream_i).await?; let session = session .play(retina::client::PlayPolicy::default().ignore_zero_seq(true)) @@ -33,7 +33,7 @@ pub async fn run(opts: Opts) -> Result<(), Error> { loop { tokio::select! { item = session.next() => { - match item.ok_or_else(|| format_err!("EOF"))?? { + match item.ok_or_else(|| anyhow!("EOF"))?? { CodecItem::MessageFrame(m) => { info!("{}: {}\n", &m.timestamp, std::str::from_utf8(&m.data[..]).unwrap()); }, diff --git a/examples/client/mp4.rs b/examples/client/mp4.rs index dcd3df2..369efa0 100644 --- a/examples/client/mp4.rs +++ b/examples/client/mp4.rs @@ -17,8 +17,8 @@ //! https://github.com/scottlamb/moonfire-nvr/wiki/Standards-and-specifications //! https://standards.iso.org/ittf/PubliclyAvailableStandards/c068960_ISO_IEC_14496-12_2015.zip +use anyhow::{anyhow, bail, Error}; use bytes::{Buf, BufMut, BytesMut}; -use failure::{bail, format_err, Error}; use futures::StreamExt; use log::info; use retina::codec::{AudioParameters, CodecItem, VideoParameters}; @@ -485,7 +485,7 @@ impl Mp4Writer { Ok(()) } - async fn video(&mut self, frame: retina::codec::VideoFrame) -> Result<(), failure::Error> { + async fn video(&mut self, frame: retina::codec::VideoFrame) -> Result<(), Error> { println!( "{}: {}-byte video frame", &frame.timestamp, @@ -500,7 +500,7 @@ impl Mp4Writer { self.mdat_pos = self .mdat_pos .checked_add(size) - .ok_or_else(|| format_err!("mdat_pos overflow"))?; + .ok_or_else(|| anyhow!("mdat_pos overflow"))?; if frame.is_random_access_point { self.video_sync_sample_nums .push(u32::try_from(self.video_trak.samples)?); @@ -510,7 +510,7 @@ impl Mp4Writer { Ok(()) } - async fn audio(&mut self, mut frame: retina::codec::AudioFrame) -> Result<(), failure::Error> { + async fn audio(&mut self, mut frame: retina::codec::AudioFrame) -> Result<(), Error> { println!( "{}: {}-byte audio frame", &frame.timestamp, @@ -522,7 +522,7 @@ impl Mp4Writer { self.mdat_pos = self .mdat_pos .checked_add(size) - .ok_or_else(|| format_err!("mdat_pos overflow"))?; + .ok_or_else(|| anyhow!("mdat_pos overflow"))?; write_all_buf(&mut self.inner, &mut frame.data).await?; Ok(()) } @@ -585,7 +585,7 @@ pub async fn run(opts: Opts) -> Result<(), Error> { loop { tokio::select! { pkt = session.next() => { - match pkt.ok_or_else(|| format_err!("EOF"))?? { + match pkt.ok_or_else(|| anyhow!("EOF"))?? { CodecItem::VideoFrame(f) => mp4.video(f).await?, CodecItem::AudioFrame(f) => mp4.audio(f).await?, _ => continue, diff --git a/src/client/channel_mapping.rs b/src/client/channel_mapping.rs index d55d6e3..8a2f450 100644 --- a/src/client/channel_mapping.rs +++ b/src/client/channel_mapping.rs @@ -5,8 +5,6 @@ use std::num::NonZeroU8; -use failure::{bail, Error}; - #[derive(Copy, Clone, Debug, PartialEq, Eq)] pub enum ChannelType { Rtp, @@ -39,27 +37,27 @@ pub struct ChannelMapping { pub struct ChannelMappings(smallvec::SmallVec<[Option; 16]>); impl ChannelMappings { - /// Returns the next unassigned even channel id, or errors. - pub fn next_unassigned(&self) -> Result { + /// Returns the next unassigned even channel id, or `None` if all assigned. + pub fn next_unassigned(&self) -> Option { if let Some(i) = self.0.iter().position(Option::is_none) { - return Ok((i as u8) << 1); + return Some((i as u8) << 1); } if self.0.len() < 128 { - return Ok((self.0.len() as u8) << 1); + return Some((self.0.len() as u8) << 1); } - bail!("all RTSP channels have been assigned"); + None } /// Assigns an even channel id (to RTP) and its odd successor (to RTCP) or errors. - pub fn assign(&mut self, channel_id: u8, stream_i: usize) -> Result<(), Error> { + pub fn assign(&mut self, channel_id: u8, stream_i: usize) -> Result<(), String> { if (channel_id & 1) != 0 { - bail!("Can't assign odd channel id {}", channel_id); + return Err(format!("Can't assign odd channel id {}", channel_id)); } if stream_i >= 255 { - bail!( + return Err(format!( "Can't assign channel to stream id {} because it's >= 255", stream_i - ); + )); } let i = usize::from(channel_id >> 1); if i >= self.0.len() { @@ -67,12 +65,12 @@ impl ChannelMappings { } let c = &mut self.0[i]; if let Some(c) = c { - bail!( + return Err(format!( "Channel id {} is already assigned to stream {}; won't reassign to stream {}", channel_id, c.get() - 1, channel_id - ); + )); } *c = Some(NonZeroU8::new((stream_i + 1) as u8).expect("[0, 255) + 1 is non-zero")); Ok(()) diff --git a/src/client/mod.rs b/src/client/mod.rs index 6078716..cb9d8ad 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -8,16 +8,15 @@ use std::{borrow::Cow, fmt::Debug, num::NonZeroU16, pin::Pin}; use self::channel_mapping::*; pub use self::timeline::Timeline; use bytes::Bytes; -use failure::{bail, format_err, Error}; use futures::{ready, Future, SinkExt, StreamExt}; use log::{debug, trace, warn}; use pin_project::pin_project; use sdp::session_description::SessionDescription; use tokio::pin; -use tokio_util::codec::Framed; use url::Url; -use crate::{codec::CodecItem, Context}; +use crate::codec::CodecItem; +use crate::{Error, ErrorInt, RtspMessageContext}; mod channel_mapping; mod parse; @@ -59,12 +58,12 @@ impl Default for InitialTimestampPolicy { impl std::fmt::Display for InitialTimestampPolicy { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - InitialTimestampPolicy::Default => f.pad("default"), - InitialTimestampPolicy::Require => f.pad("require"), - InitialTimestampPolicy::Ignore => f.pad("ignore"), - InitialTimestampPolicy::Permissive => f.pad("permissive"), - } + f.pad(match self { + InitialTimestampPolicy::Default => "default", + InitialTimestampPolicy::Require => "require", + InitialTimestampPolicy::Ignore => "ignore", + InitialTimestampPolicy::Permissive => "permissive", + }) } } @@ -77,7 +76,11 @@ impl std::str::FromStr for InitialTimestampPolicy { "require" => InitialTimestampPolicy::Require, "ignore" => InitialTimestampPolicy::Ignore, "permissive" => InitialTimestampPolicy::Permissive, - _ => bail!("Initial timestamp mode {:?} not understood", s), + _ => bail!(ErrorInt::InvalidArgument(format!( + "bad InitialTimestampPolicy {}; \ + expected default, require, ignore or permissive", + s + ))), }) } } @@ -165,7 +168,7 @@ pub struct Stream { /// Number of audio channels, if applicable (`media` is `audio`) and known. pub channels: Option, - depacketizer: Result, + depacketizer: Result, /// The specified control URL. /// This is needed with multiple streams to send `SETUP` requests and @@ -240,15 +243,26 @@ pub struct Credentials { /// `DESCRIBE` changes the connection's state such that another one will fail, /// before assigning a session id. Thus [Session] represents something more like /// an RTSP connection than an RTSP session. +#[doc(hidden)] pub trait State {} -/// Initial state after a `DESCRIBE`. +/// Initial state after a `DESCRIBE`; use via `Session`. /// One or more `SETUP`s may have also been issued, in which case a `session_id` /// will be assigned. +#[doc(hidden)] pub struct Described { presentation: Presentation, session_id: Option, channels: ChannelMappings, + + // Keep some information about the DESCRIBE response. If a depacketizer + // couldn't be constructed correctly for one or more streams, this will be + // used to create a RtspResponseError on `State::demuxed()`. + // We defer such errors from DESCRIBE time until then because they only + // matter if the stream is setup and the caller wants depacketization. + describe_ctx: RtspMessageContext, + describe_cseq: u32, + describe_status: rtsp_types::StatusCode, } impl State for Described {} @@ -258,13 +272,17 @@ enum KeepaliveState { Waiting(u32), } -/// State after a `PLAY`. +/// State after a `PLAY`; use via `Session`. +#[doc(hidden)] #[pin_project(project = PlayingProj)] pub struct Playing { presentation: Presentation, session_id: String, channels: ChannelMappings, keepalive_state: KeepaliveState, + describe_ctx: RtspMessageContext, + describe_cseq: u32, + describe_status: rtsp_types::StatusCode, #[pin] keepalive_timer: tokio::time::Sleep, @@ -275,7 +293,7 @@ impl State for Playing {} struct RtspConnection { creds: Option, requested_auth: Option, - stream: Framed, + inner: crate::tokio::Connection, user_agent: String, /// The next `CSeq` header value to use when sending an RTSP request. @@ -294,44 +312,36 @@ pub struct Session { impl RtspConnection { async fn connect(url: &Url, creds: Option) -> Result { + let host = + RtspConnection::validate_url(url).map_err(|e| wrap!(ErrorInt::InvalidArgument(e)))?; + let port = url.port().unwrap_or(554); + let inner = crate::tokio::Connection::connect(host, port) + .await + .map_err(|e| wrap!(ErrorInt::ConnectError(e)))?; + Ok(Self { + inner, + creds, + requested_auth: None, + user_agent: "moonfire-rtsp test".to_string(), + next_cseq: 1, + }) + } + + fn validate_url(url: &Url) -> Result, String> { if url.scheme() != "rtsp" { - bail!("Only rtsp urls supported (no rtsps yet)"); + return Err(format!( + "Bad URL {}; only scheme rtsp supported", + url.as_str() + )); } if url.username() != "" || url.password().is_some() { // Url apparently doesn't even have a way to clear the credentials, // so this has to be an error. - bail!("URL must not contain credentials"); + // TODO: that's not true; revisit this. + return Err("URL must not contain credentials".to_owned()); } - let host = url - .host_str() - .ok_or_else(|| format_err!("Must specify host in rtsp url {}", &url))?; - let port = url.port().unwrap_or(554); - let stream = tokio::net::TcpStream::connect((host, port)).await?; - let conn_established_wall = time::get_time(); - let conn_established = std::time::Instant::now(); - let conn_local_addr = stream.local_addr()?; - let conn_peer_addr = stream.peer_addr()?; - let stream = Framed::new( - stream, - crate::Codec { - ctx: crate::Context { - conn_established_wall, - conn_established, - conn_local_addr, - conn_peer_addr, - msg_pos: 0, - msg_received_wall: conn_established_wall, - msg_received: conn_established, - }, - }, - ); - Ok(Self { - creds, - requested_auth: None, - stream, - user_agent: "moonfire-rtsp test".to_string(), - next_cseq: 1, - }) + url.host() + .ok_or_else(|| format!("Must specify host in rtsp url {}", &url)) } /// Sends a request and expects the next message from the peer to be its response. @@ -339,63 +349,111 @@ impl RtspConnection { async fn send( &mut self, req: &mut rtsp_types::Request, - ) -> Result, Error> { + ) -> Result<(RtspMessageContext, u32, rtsp_types::Response), Error> { loop { let cseq = self.fill_req(req)?; - self.stream + self.inner .send(rtsp_types::Message::Request(req.clone())) - .await?; - let msg = self - .stream - .next() .await - .ok_or_else(|| format_err!("unexpected EOF while waiting for reply"))??; + .map_err(|e| wrap!(e))?; + let method: &str = req.method().into(); + let msg = self.inner.next().await.unwrap_or_else(|| { + bail!(ErrorInt::ReadError { + conn_ctx: *self.inner.ctx(), + msg_ctx: self.inner.eof_ctx(), + source: std::io::Error::new( + std::io::ErrorKind::UnexpectedEof, + format!("EOF while expecting reply to {} CSeq {}", method, cseq), + ), + }) + })?; let resp = match msg.msg { - rtsp_types::Message::Response(r) => r, - o => bail!("Unexpected RTSP message {:?}", &o), + rtsp_types::Message::Response(r) if parse::get_cseq(&r) == Some(cseq) => r, + o => bail!(ErrorInt::RtspFramingError { + conn_ctx: *self.inner.ctx(), + msg_ctx: msg.ctx, + description: format!("Expected reply to {} CSeq {}, got {:?}", method, cseq, o,), + }), }; - if parse::get_cseq(&resp) != Some(cseq) { - bail!( - "didn't get expected CSeq {:?} on {:?} at {:#?}", - &cseq, - &resp, - &msg.ctx - ); - } if resp.status() == rtsp_types::StatusCode::Unauthorized { if self.requested_auth.is_some() { - bail!( - "Received Unauthorized after trying digest auth at {:#?}", - &msg.ctx - ); + // TODO: the WWW-Authenticate might indicate a new domain or nonce. + // In that case, we should retry rather than returning error. + bail!(ErrorInt::RtspResponseError { + conn_ctx: *self.inner.ctx(), + msg_ctx: msg.ctx, + method: req.method().clone(), + cseq, + status: resp.status(), + description: "Received Unauthorized after trying digest auth".into(), + }) } let www_authenticate = match resp.header(&rtsp_types::headers::WWW_AUTHENTICATE) { - None => bail!( - "Unauthorized without WWW-Authenticate header at {:#?}", - &msg.ctx - ), + None => bail!(ErrorInt::RtspResponseError { + conn_ctx: *self.inner.ctx(), + msg_ctx: msg.ctx, + method: req.method().clone(), + cseq, + status: resp.status(), + description: "Unauthorized without WWW-Authenticate header".into(), + }), Some(h) => h, }; let www_authenticate = www_authenticate.as_str(); if !www_authenticate.starts_with("Digest ") { - bail!( - "Non-digest authentication requested at {:#?}: {}", - &msg.ctx, - www_authenticate - ); + // TODO: the header(s) might also indicate both Basic and Digest; we shouldn't + // error or not based on ordering. + bail!(ErrorInt::RtspResponseError { + conn_ctx: *self.inner.ctx(), + msg_ctx: msg.ctx, + method: req.method().clone(), + cseq, + status: resp.status(), + description: format!( + "Non-digest authentication requested: {}", + www_authenticate + ), + }) } - let www_authenticate = digest_auth::WwwAuthenticateHeader::parse(www_authenticate)?; + if self.creds.is_none() { + bail!(ErrorInt::RtspResponseError { + conn_ctx: *self.inner.ctx(), + msg_ctx: msg.ctx, + method: req.method().clone(), + cseq, + status: resp.status(), + description: "Authentication requested and no credentials supplied" + .to_owned(), + }) + } + let msg_ctx = msg.ctx; + let www_authenticate = digest_auth::WwwAuthenticateHeader::parse(www_authenticate) + .map_err(|e| { + wrap!(ErrorInt::RtspResponseError { + conn_ctx: *self.inner.ctx(), + msg_ctx, + method: req.method().clone(), + cseq, + status: resp.status(), + description: format!( + "Bad WWW-Authenticate header {:?}: {}", + www_authenticate, e + ), + }) + })?; self.requested_auth = Some(www_authenticate); continue; } else if !resp.status().is_success() { - bail!( - "RTSP {:?} request returned {} at {:#?}", - req.method(), - resp.status(), - &msg.ctx - ); + bail!(ErrorInt::RtspResponseError { + conn_ctx: *self.inner.ctx(), + msg_ctx: msg.ctx, + method: req.method().clone(), + cseq, + status: resp.status(), + description: "Unexpected RTSP response status".into(), + }); } - return Ok(resp); + return Ok((msg.ctx, cseq, resp)); } } @@ -403,22 +461,27 @@ impl RtspConnection { fn fill_req(&mut self, req: &mut rtsp_types::Request) -> Result { let cseq = self.next_cseq; self.next_cseq += 1; - match (self.requested_auth.as_mut(), self.creds.as_ref()) { - (None, _) => {} - (Some(auth), Some(creds)) => { - let uri = req.request_uri().map(|u| u.as_str()).unwrap_or("*"); - let method = digest_auth::HttpMethod(Cow::Borrowed(req.method().into())); - let ctx = digest_auth::AuthContext::new_with_method( - &creds.username, - &creds.password, - uri, - Option::<&'static [u8]>::None, - method, - ); - let authorization = auth.respond(&ctx)?.to_string(); - req.insert_header(rtsp_types::headers::AUTHORIZATION, authorization); - } - (Some(_), None) => bail!("Authentication required; no credentials supplied"), + if let Some(ref mut auth) = self.requested_auth { + let creds = self + .creds + .as_ref() + .expect("creds were checked when filling request_auth"); + let uri = req.request_uri().map(|u| u.as_str()).unwrap_or("*"); + let method = digest_auth::HttpMethod(Cow::Borrowed(req.method().into())); + let ctx = digest_auth::AuthContext::new_with_method( + &creds.username, + &creds.password, + uri, + Option::<&'static [u8]>::None, + method, + ); + + // digest_auth's comments seem to say 'respond' failing means a parser bug. + let authorization = auth + .respond(&ctx) + .map_err(|e| wrap!(ErrorInt::Internal(e.into())))? + .to_string(); + req.insert_header(rtsp_types::headers::AUTHORIZATION, authorization); } req.insert_header(rtsp_types::headers::CSEQ, cseq.to_string()); req.insert_header(rtsp_types::headers::USER_AGENT, self.user_agent.clone()); @@ -427,6 +490,13 @@ impl RtspConnection { } impl Session { + /// Creates a new session from a `DESCRIBE` request on the given URL. + /// + /// This method is permissive; it will return success even if there are + /// errors in the SDP that would prevent one or more streams from being + /// depacketized correctly. If those streams are setup via + /// `Session::setup`, the erorrs in question will be ultimately + /// returned from `Stream::demuxed`. pub async fn describe(url: Url, creds: Option) -> Result { let mut conn = RtspConnection::connect(&url, creds).await?; let mut req = @@ -434,14 +504,26 @@ impl Session { .header(rtsp_types::headers::ACCEPT, "application/sdp") .request_uri(url.clone()) .build(Bytes::new()); - let response = conn.send(&mut req).await?; - let presentation = parse::parse_describe(url, response)?; + let (msg_ctx, cseq, response) = conn.send(&mut req).await?; + let presentation = parse::parse_describe(url, &response).map_err(|description| { + wrap!(ErrorInt::RtspResponseError { + conn_ctx: *conn.inner.ctx(), + msg_ctx, + method: rtsp_types::Method::Describe, + cseq, + status: response.status(), + description, + }) + })?; Ok(Session { conn, state: Described { presentation, session_id: None, channels: ChannelMappings::default(), + describe_ctx: msg_ctx, + describe_cseq: cseq, + describe_status: response.status(), }, }) } @@ -451,6 +533,7 @@ impl Session { } /// Sends a `SETUP` request for a stream. + /// /// Note these can't reasonably be pipelined because subsequent requests /// are expected to adopt the previous response's `Session`. Likewise, /// the server may override the preferred interleaved channel id and it @@ -461,9 +544,13 @@ impl Session { pub async fn setup(&mut self, stream_i: usize) -> Result<(), Error> { let stream = &mut self.state.presentation.streams[stream_i]; if !matches!(stream.state, StreamState::Uninit) { - bail!("stream already set up"); + bail!(ErrorInt::FailedPrecondition("stream already set up".into())); } - let proposed_channel_id = self.state.channels.next_unassigned()?; + let proposed_channel_id = self.state.channels.next_unassigned().ok_or_else(|| { + wrap!(ErrorInt::FailedPrecondition( + "no unassigned channels".into() + )) + })?; let url = stream .control .as_ref() @@ -484,21 +571,51 @@ impl Session { if let Some(ref s) = self.state.session_id { req = req.header(rtsp_types::headers::SESSION, s.clone()); } - let response = self.conn.send(&mut req.build(Bytes::new())).await?; + let (msg_ctx, cseq, response) = self.conn.send(&mut req.build(Bytes::new())).await?; debug!("SETUP response: {:#?}", &response); - let response = parse::parse_setup(&response)?; + let conn_ctx = self.conn.inner.ctx(); + let status = response.status(); + let response = parse::parse_setup(&response).map_err(|description| { + wrap!(ErrorInt::RtspResponseError { + conn_ctx: *conn_ctx, + msg_ctx, + method: rtsp_types::Method::Setup, + cseq, + status, + description, + }) + })?; match self.state.session_id.as_ref() { Some(old) if old != response.session_id => { - bail!( - "SETUP response changed session id from {:?} to {:?}", - old, - response.session_id - ); + bail!(ErrorInt::RtspResponseError { + conn_ctx: *self.conn.inner.ctx(), + msg_ctx, + method: rtsp_types::Method::Setup, + cseq, + status, + description: format!( + "session id changed from {:?} to {:?}", + old, response.session_id, + ), + }); } Some(_) => {} None => self.state.session_id = Some(response.session_id.to_owned()), }; - self.state.channels.assign(response.channel_id, stream_i)?; + let conn_ctx = self.conn.inner.ctx(); + self.state + .channels + .assign(response.channel_id, stream_i) + .map_err(|description| { + wrap!(ErrorInt::RtspResponseError { + conn_ctx: *conn_ctx, + msg_ctx, + method: rtsp_types::Method::Setup, + cseq, + status, + description, + }) + })?; stream.state = StreamState::Init(StreamStateInit { ssrc: response.ssrc, initial_seq: None, @@ -508,16 +625,17 @@ impl Session { } /// Sends a `PLAY` request for the entire presentation. + /// /// The presentation must support aggregate control, as defined in [RFC 2326 /// section 1.3](https://tools.ietf.org/html/rfc2326#section-1.3). pub async fn play(mut self, policy: PlayPolicy) -> Result, Error> { - let session_id = self - .state - .session_id - .take() - .ok_or_else(|| format_err!("must SETUP before PLAY"))?; + let session_id = self.state.session_id.take().ok_or_else(|| { + wrap!(ErrorInt::FailedPrecondition( + "must SETUP before PLAY".into() + )) + })?; trace!("PLAY with channel mappings: {:#?}", &self.state.channels); - let response = self + let (msg_ctx, cseq, response) = self .conn .send( &mut rtsp_types::Request::builder( @@ -530,7 +648,16 @@ impl Session { .build(Bytes::new()), ) .await?; - parse::parse_play(response, &mut self.state.presentation)?; + parse::parse_play(&response, &mut self.state.presentation).map_err(|description| { + wrap!(ErrorInt::RtspResponseError { + conn_ctx: *self.conn.inner.ctx(), + msg_ctx, + method: rtsp_types::Method::Play, + cseq, + status: response.status(), + description, + }) + })?; // Count how many streams have been setup (not how many are in the presentation). let setup_streams = self @@ -563,27 +690,34 @@ impl Session { ssrc, .. }) => { - let initial_rtptime = - match policy.initial_timestamp { - InitialTimestampPolicy::Require | InitialTimestampPolicy::Default - if setup_streams > 1 => - { - if initial_rtptime.is_none() { - bail!( - "Expected rtptime on PLAY with mode {:?}, missing on stream \ - {} ({:?}). Consider setting initial timestamp mode \ - use-if-all-present.", - policy.initial_timestamp, i, &s.control); - } - initial_rtptime + let initial_rtptime = match policy.initial_timestamp { + InitialTimestampPolicy::Require | InitialTimestampPolicy::Default + if setup_streams > 1 => + { + if initial_rtptime.is_none() { + bail!(ErrorInt::RtspResponseError { + conn_ctx: *self.conn.inner.ctx(), + msg_ctx, + method: rtsp_types::Method::Play, + cseq, + status: response.status(), + description: format!( + "Expected rtptime on PLAY with mode {:?}, missing on \ + stream {} ({:?}). Consider setting initial timestamp \ + mode use-if-all-present.", + policy.initial_timestamp, i, &s.control + ), + }); } - InitialTimestampPolicy::Permissive - if setup_streams > 1 && all_have_time => - { - initial_rtptime - } - _ => None, - }; + initial_rtptime + } + InitialTimestampPolicy::Permissive + if setup_streams > 1 && all_have_time => + { + initial_rtptime + } + _ => None, + }; let initial_seq = match initial_seq { Some(0) if policy.ignore_zero_seq => { log::info!("Ignoring seq=0 on stream {}", i); @@ -591,12 +725,23 @@ impl Session { } o => o, }; + let conn_ctx = self.conn.inner.ctx(); s.state = StreamState::Playing { timeline: Timeline::new( initial_rtptime, s.clock_rate, policy.enforce_timestamps_with_max_jump_secs, - )?, + ) + .map_err(|description| { + wrap!(ErrorInt::RtspResponseError { + conn_ctx: *conn_ctx, + msg_ctx, + method: rtsp_types::Method::Play, + cseq, + status: response.status(), + description, + }) + })?, rtp_handler: rtp::StrictSequenceChecker::new(ssrc, initial_seq), }; } @@ -612,6 +757,9 @@ impl Session { channels: self.state.channels, keepalive_state: KeepaliveState::Idle, keepalive_timer: tokio::time::sleep(KEEPALIVE_DURATION), + describe_ctx: self.state.describe_ctx, + describe_cseq: self.state.describe_cseq, + describe_status: self.state.describe_status, }, }) } @@ -624,11 +772,20 @@ pub enum PacketItem { impl Session { /// Returns a wrapper which demuxes/depacketizes into frames. + /// + /// Fails if a stream that has been setup can't be depacketized. pub fn demuxed(mut self) -> Result { for s in &mut self.state.presentation.streams { if matches!(s.state, StreamState::Playing { .. }) { - if let Err(ref mut e) = s.depacketizer { - return Err(std::mem::replace(e, format_err!("(placeholder)"))); + if let Err(ref description) = s.depacketizer { + bail!(ErrorInt::RtspResponseError { + conn_ctx: *self.conn.inner.ctx(), + msg_ctx: self.state.describe_ctx, + method: rtsp_types::Method::Describe, + cseq: self.state.describe_cseq, + status: self.state.describe_status, + description: description.clone(), + }); } } } @@ -645,23 +802,36 @@ impl Session { ) -> Result<(), Error> { // Expect the previous keepalive request to have finished. match state.keepalive_state { - KeepaliveState::Flushing(cseq) => bail!( - "Unable to write keepalive {} within {:?}", - cseq, - KEEPALIVE_DURATION, - ), - KeepaliveState::Waiting(cseq) => bail!( - "Server failed to respond to keepalive {} within {:?}", - cseq, - KEEPALIVE_DURATION, - ), + KeepaliveState::Flushing(cseq) => bail!(ErrorInt::WriteError { + conn_ctx: *conn.inner.ctx(), + source: std::io::Error::new( + std::io::ErrorKind::TimedOut, + format!( + "Unable to write keepalive {} within {:?}", + cseq, KEEPALIVE_DURATION, + ), + ), + }), + KeepaliveState::Waiting(cseq) => bail!(ErrorInt::ReadError { + conn_ctx: *conn.inner.ctx(), + msg_ctx: conn.inner.eof_ctx(), + source: std::io::Error::new( + std::io::ErrorKind::TimedOut, + format!( + "Server failed to respond to keepalive {} within {:?}", + cseq, KEEPALIVE_DURATION, + ), + ), + }), KeepaliveState::Idle => {} } // Currently the only outbound data should be keepalives, and the previous one // has already been flushed, so there's no reason the Sink shouldn't be ready. - if matches!(conn.stream.poll_ready_unpin(cx), Poll::Pending) { - bail!("Unexpectedly not ready to send keepalive"); + if matches!(conn.inner.poll_ready_unpin(cx), Poll::Pending) { + bail!(ErrorInt::Internal( + "Unexpectedly not ready to send keepalive".into() + )); } // Send a new one and reset the timer. @@ -673,12 +843,12 @@ impl Session { .header(rtsp_types::headers::SESSION, state.session_id.clone()) .build(Bytes::new()); let cseq = conn.fill_req(&mut req)?; - conn.stream + conn.inner .start_send_unpin(rtsp_types::Message::Request(req)) .expect("encoding is infallible"); - *state.keepalive_state = match conn.stream.poll_flush_unpin(cx) { + *state.keepalive_state = match conn.inner.poll_flush_unpin(cx) { Poll::Ready(Ok(())) => KeepaliveState::Waiting(cseq), - Poll::Ready(Err(e)) => return Err(e), + Poll::Ready(Err(e)) => bail!(e), Poll::Pending => KeepaliveState::Flushing(cseq), }; @@ -691,6 +861,8 @@ impl Session { fn handle_response( state: &mut PlayingProj<'_>, + conn: &RtspConnection, + msg_ctx: &crate::RtspMessageContext, response: rtsp_types::Response, ) -> Result<(), Error> { if matches!(*state.keepalive_state, @@ -702,18 +874,27 @@ impl Session { } // The only response we expect in this state is to our keepalive request. - bail!("Unexpected RTSP response {:#?}", response); + bail!(ErrorInt::RtspFramingError { + conn_ctx: *conn.inner.ctx(), + msg_ctx: *msg_ctx, + description: format!("Unexpected RTSP response {:#?}", response), + }) } fn handle_data( state: &mut PlayingProj<'_>, - ctx: Context, + conn: &RtspConnection, + msg_ctx: &RtspMessageContext, data: rtsp_types::Data, ) -> Result, Error> { - let c = data.channel_id(); - let m = match state.channels.lookup(c) { + let channel_id = data.channel_id(); + let m = match state.channels.lookup(channel_id) { Some(m) => m, - None => bail!("Data message on unexpected channel {} at {:#?}", c, &ctx), + None => bail!(ErrorInt::RtspUnassignedChannelError { + conn_ctx: *conn.inner.ctx(), + msg_ctx: *msg_ctx, + channel_id, + }), }; let stream = &mut state.presentation.streams[m.stream_i]; let (mut timeline, rtp_handler) = match &mut stream.state { @@ -721,18 +902,31 @@ impl Session { timeline, rtp_handler, } => (timeline, rtp_handler), - _ => unreachable!("Session's {}->{:?} not in Playing state", c, m), + _ => unreachable!( + "Session's {}->{:?} not in Playing state", + channel_id, m + ), }; match m.channel_type { ChannelType::Rtp => Ok(Some(rtp_handler.rtp( - ctx, + conn.inner.ctx(), + msg_ctx, &mut timeline, + channel_id, m.stream_i, data.into_body(), )?)), - ChannelType::Rtcp => { - Ok(rtp_handler.rtcp(ctx, &mut timeline, m.stream_i, data.into_body())?) - } + ChannelType::Rtcp => rtp_handler + .rtcp(msg_ctx, &mut timeline, m.stream_i, data.into_body()) + .map_err(|description| { + wrap!(ErrorInt::RtspDataMessageError { + conn_ctx: *conn.inner.ctx(), + msg_ctx: *msg_ctx, + channel_id, + stream_id: m.stream_i, + description, + }) + }), } } @@ -753,17 +947,19 @@ impl futures::Stream for Session { loop { // First try receiving data. Let this starve keepalive handling; if we can't keep up, // the server should probably drop us. - match Pin::new(&mut this.conn.stream).poll_next(cx) { + match Pin::new(&mut this.conn.inner).poll_next(cx) { Poll::Ready(Some(Ok(msg))) => match msg.msg { rtsp_types::Message::Data(data) => { - match Session::handle_data(&mut state, msg.ctx, data) { + match Session::handle_data(&mut state, &this.conn, &msg.ctx, data) { Err(e) => return Poll::Ready(Some(Err(e))), Ok(Some(pkt)) => return Poll::Ready(Some(Ok(pkt))), Ok(None) => continue, }; } rtsp_types::Message::Response(response) => { - if let Err(e) = Session::handle_response(&mut state, response) { + if let Err(e) = + Session::handle_response(&mut state, &this.conn, &msg.ctx, response) + { return Poll::Ready(Some(Err(e))); } continue; @@ -785,9 +981,9 @@ impl futures::Stream for Session { // Then finish flushing the current keepalive if necessary. if let KeepaliveState::Flushing(cseq) = state.keepalive_state { - match this.conn.stream.poll_flush_unpin(cx) { + match this.conn.inner.poll_flush_unpin(cx) { Poll::Ready(Ok(())) => *state.keepalive_state = KeepaliveState::Waiting(*cseq), - Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e))), + Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(Error(Box::new(e))))), Poll::Pending => {} } } @@ -801,6 +997,7 @@ impl futures::Stream for Session { enum DemuxedState { Waiting, Pulling(usize), + Fused, } /// Wrapper returned by [`Session::demuxed`] which demuxes/depacketizes into frames. @@ -830,6 +1027,7 @@ impl futures::Stream for Demuxed { None => return Poll::Ready(None), }, DemuxedState::Pulling(stream_id) => (*stream_id, None), + DemuxedState::Fused => return Poll::Ready(None), }; let session = this.session.as_mut().project(); let playing = session.state.project(); @@ -837,10 +1035,26 @@ impl futures::Stream for Demuxed { Ok(d) => d, Err(_) => unreachable!("depacketizer was Ok"), }; + let conn_ctx = session.conn.inner.ctx(); if let Some(p) = pkt { - depacketizer.push(p)?; + let msg_ctx = p.ctx; + let channel_id = p.channel_id; + let stream_id = p.stream_id; + let ssrc = p.ssrc; + let sequence_number = p.sequence_number; + depacketizer.push(p).map_err(|description| { + wrap!(ErrorInt::RtpPacketError { + conn_ctx: *conn_ctx, + msg_ctx, + channel_id, + stream_id, + ssrc, + sequence_number, + description, + }) + })?; } - match depacketizer.pull() { + match depacketizer.pull(conn_ctx) { Ok(Some(item)) => { *this.state = DemuxedState::Pulling(stream_id); return Poll::Ready(Some(Ok(item))); @@ -849,7 +1063,10 @@ impl futures::Stream for Demuxed { *this.state = DemuxedState::Waiting; continue; } - Err(e) => return Poll::Ready(Some(Err(e))), + Err(e) => { + *this.state = DemuxedState::Fused; + return Poll::Ready(Some(Err(e))); + } } } } diff --git a/src/client/parse.rs b/src/client/parse.rs index 70df8dc..aabc0db 100644 --- a/src/client/parse.rs +++ b/src/client/parse.rs @@ -2,8 +2,8 @@ // SPDX-License-Identifier: MIT OR Apache-2.0 use bytes::{Buf, Bytes}; -use failure::{bail, format_err, Error, ResultExt}; use log::debug; +use pretty_hex::PrettyHex; use sdp::media_description::MediaDescription; use std::{convert::TryFrom, num::NonZeroU16}; use url::Url; @@ -184,17 +184,16 @@ static STATIC_PAYLOAD_TYPES: [Option; 35] = [ }), ]; -fn join_control(base_url: &Url, control: &str) -> Result { +fn join_control(base_url: &Url, control: &str) -> Result { if control == "*" { return Ok(base_url.clone()); } - Ok(base_url.join(control).with_context(|_| { - format_err!( - "unable to join base url {} with control url {:?}", - base_url, - control + base_url.join(control).map_err(|e| { + format!( + "unable to join base url {} with control url {:?}: {}", + base_url, control, e ) - })?) + }) } /// Returns the `CSeq` from an RTSP response as a `u32`, or `None` if missing/unparseable. @@ -206,12 +205,12 @@ pub(crate) fn get_cseq(response: &rtsp_types::Response) -> Option { /// Parses a [MediaDescription] to a [Stream]. /// On failure, returns an error which is expected to be supplemented with -/// the [MediaDescription] debug string. +/// the [MediaDescription] debug string and packed into a `RtspResponseError`. fn parse_media( base_url: &Url, alt_base_url: &Url, media_description: &MediaDescription, -) -> Result { +) -> Result { let media = media_description.media_name.media.clone(); // https://tools.ietf.org/html/rfc8866#section-5.14 says "If the @@ -226,7 +225,7 @@ fn parse_media( .iter() .any(|p| p == "RTP") { - bail!("Expected RTP-based proto"); + return Err("Expected RTP-based proto".into()); } // RFC 8866 continues: "When a list of payload type numbers is given, @@ -238,11 +237,11 @@ fn parse_media( .media_name .formats .first() - .ok_or_else(|| format_err!("missing RTP payload type"))?; + .ok_or_else(|| "missing RTP payload type".to_string())?; let rtp_payload_type = u8::from_str_radix(rtp_payload_type_str, 10) - .map_err(|_| format_err!("invalid RTP payload type"))?; + .map_err(|_| format!("invalid RTP payload type {:?}", rtp_payload_type_str))?; if (rtp_payload_type & 0x80) != 0 { - bail!("invalid RTP payload type"); + return Err(format!("invalid RTP payload type {}", rtp_payload_type)); } // Capture interesting attributes. @@ -260,7 +259,7 @@ fn parse_media( let v = a .value .as_ref() - .ok_or_else(|| format_err!("rtpmap attribute with no value"))?; + .ok_or_else(|| "rtpmap attribute with no value".to_string())?; // https://tools.ietf.org/html/rfc8866#section-6.6 // rtpmap-value = payload-type SP encoding-name // "/" clock-rate [ "/" encoding-params ] @@ -271,7 +270,7 @@ fn parse_media( // channels = integer let (rtpmap_payload_type, v) = v .split_once(' ') - .ok_or_else(|| format_err!("invalid rtmap attribute"))?; + .ok_or_else(|| "invalid rtmap attribute".to_string())?; if rtpmap_payload_type == rtp_payload_type_str { rtpmap = Some(v); } @@ -280,10 +279,10 @@ fn parse_media( let v = a .value .as_ref() - .ok_or_else(|| format_err!("rtpmap attribute with no value"))?; + .ok_or_else(|| "fmtp attribute with no value".to_string())?; let (fmtp_payload_type, v) = v .split_once(' ') - .ok_or_else(|| format_err!("invalid rtmap attribute"))?; + .ok_or_else(|| "invalid fmtp attribute".to_string())?; if fmtp_payload_type == rtp_payload_type_str { fmtp = Some(v); } @@ -308,20 +307,20 @@ fn parse_media( Some(rtpmap) => { let (e, rtpmap) = rtpmap .split_once('/') - .ok_or_else(|| format_err!("invalid rtpmap attribute"))?; + .ok_or_else(|| "invalid rtpmap attribute".to_string())?; encoding_name = e; let (clock_rate_str, channels_str) = match rtpmap.find('/') { None => (rtpmap, None), Some(i) => (&rtpmap[..i], Some(&rtpmap[i + 1..])), }; clock_rate = u32::from_str_radix(clock_rate_str, 10) - .map_err(|_| format_err!("bad clockrate in rtpmap"))?; + .map_err(|_| "bad clockrate in rtpmap".to_string())?; channels = channels_str .map(|c| { u16::from_str_radix(c, 10) .ok() .and_then(NonZeroU16::new) - .ok_or_else(|| format_err!("Invalid channels specification {:?}", c)) + .ok_or_else(|| format!("Invalid channels specification {:?}", c)) }) .transpose()?; } @@ -330,7 +329,7 @@ fn parse_media( .get(usize::from(rtp_payload_type)) .and_then(Option::as_ref) .ok_or_else(|| { - format_err!( + format!( "Expected rtpmap parameter or assigned static payload type (got {})", rtp_payload_type ) @@ -339,11 +338,10 @@ fn parse_media( clock_rate = type_.clock_rate; channels = type_.channels; if type_.media != media { - bail!( + return Err(format!( "SDP media type {} must match RTP payload type {:#?}", - &media, - type_ - ); + &media, type_ + )); } } } @@ -366,35 +364,48 @@ fn parse_media( } /// Parses a successful RTSP `DESCRIBE` response into a [Presentation]. +/// On error, returns a string which is expected to be packed into an `RtspProtocolError`. pub(crate) fn parse_describe( request_url: Url, - response: rtsp_types::Response, -) -> Result { + response: &rtsp_types::Response, +) -> Result { if !matches!(response.header(&rtsp_types::headers::CONTENT_TYPE), Some(v) if v.as_str() == "application/sdp") { - bail!( + return Err(format!( "Describe response not of expected application/sdp content type: {:#?}", &response - ); + )); } let sdp; { let mut cursor = std::io::Cursor::new(&response.body()[..]); - sdp = sdp::session_description::SessionDescription::unmarshal(&mut cursor)?; + sdp = + sdp::session_description::SessionDescription::unmarshal(&mut cursor).map_err(|e| { + format!( + "Unable to parse SDP: {}\n\n{:#?}", + e, + response.body().hex_dump() + ) + })?; if cursor.has_remaining() { - bail!( + return Err(format!( "garbage after sdp: {:?}", &response.body()[usize::try_from(cursor.position()).unwrap()..] - ); + )); } } // https://tools.ietf.org/html/rfc2326#appendix-C.1.1 let base_url = response .header(&rtsp_types::headers::CONTENT_BASE) - .or_else(|| response.header(&rtsp_types::headers::CONTENT_LOCATION)) - .map(|v| Url::parse(v.as_str())) + .map(|v| (rtsp_types::headers::CONTENT_BASE, v)) + .or_else(|| { + response + .header(&rtsp_types::headers::CONTENT_LOCATION) + .map(|v| (rtsp_types::headers::CONTENT_LOCATION, v)) + }) + .map(|(h, v)| Url::parse(v.as_str()).map_err(|e| format!("bad {} {:?}: {}", h, v, e))) .unwrap_or(Ok(request_url))?; let mut alt_base_url = base_url.clone(); alt_base_url.set_path(&format!("{}/", base_url.path())); @@ -410,7 +421,7 @@ pub(crate) fn parse_describe( break; } } - let control = control.ok_or_else(|| format_err!("no control url"))?; + let control = control.ok_or_else(|| "no control url".to_string())?; let streams = sdp .media_descriptions @@ -418,10 +429,9 @@ pub(crate) fn parse_describe( .enumerate() .map(|(i, m)| { parse_media(&base_url, &alt_base_url, &m) - .with_context(|_| format!("Unable to parse stream {}: {:#?}", i, &m)) - .map_err(Error::from) + .map_err(|e| format!("Unable to parse stream {}: {}\n\n{:#?}", i, &e, &m)) }) - .collect::, Error>>()?; + .collect::, String>>()?; let accept_dynamic_rate = matches!(response.header(&crate::X_ACCEPT_DYNAMIC_RATE), Some(h) if h.as_str() == "1"); @@ -445,43 +455,40 @@ pub(crate) struct SetupResponse<'a> { /// `session_id` is checked for assignment or reassignment. /// Returns an assigned interleaved channel id (implying the next channel id /// is also assigned) or errors. -pub(crate) fn parse_setup(response: &rtsp_types::Response) -> Result { +pub(crate) fn parse_setup(response: &rtsp_types::Response) -> Result { let session = response .header(&rtsp_types::headers::SESSION) - .ok_or_else(|| format_err!("SETUP response has no Session header"))?; + .ok_or_else(|| "Missing Session header".to_string())?; let session_id = match session.as_str().find(';') { None => session.as_str(), Some(i) => &session.as_str()[..i], }; let transport = response .header(&rtsp_types::headers::TRANSPORT) - .ok_or_else(|| format_err!("SETUP response has no Transport header"))?; + .ok_or_else(|| "Missing Transport header".to_string())?; let mut channel_id = None; let mut ssrc = None; for part in transport.as_str().split(';') { if let Some(v) = part.strip_prefix("ssrc=") { - let v = - u32::from_str_radix(v, 16).map_err(|_| format_err!("Unparseable ssrc {}", v))?; + let v = u32::from_str_radix(v, 16).map_err(|_| format!("Unparseable ssrc {}", v))?; ssrc = Some(v); break; } else if let Some(interleaved) = part.strip_prefix("interleaved=") { let mut channels = interleaved.splitn(2, '-'); let n = channels.next().expect("splitn returns at least one part"); - let n = - u8::from_str_radix(n, 10).map_err(|_| format_err!("bad channel number {}", n))?; + let n = u8::from_str_radix(n, 10).map_err(|_| format!("bad channel number {}", n))?; if let Some(m) = channels.next() { let m = u8::from_str_radix(m, 10) - .map_err(|_| format_err!("bad second channel number {}", m))?; + .map_err(|_| format!("bad second channel number {}", m))?; if n.checked_add(1) != Some(m) { - bail!("Expected adjacent channels; got {}-{}", n, m); + format!("Expected adjacent channels; got {}-{}", n, m); } } channel_id = Some(n); } } - let channel_id = channel_id.ok_or_else(|| { - format_err!("SETUP response Transport header has no interleaved parameter") - })?; + let channel_id = + channel_id.ok_or_else(|| "Transport header has no interleaved parameter".to_string())?; Ok(SetupResponse { session_id, ssrc, @@ -489,14 +496,15 @@ pub(crate) fn parse_setup(response: &rtsp_types::Response) -> Result, + response: &rtsp_types::Response, presentation: &mut Presentation, -) -> Result<(), Error> { +) -> Result<(), String> { // https://tools.ietf.org/html/rfc2326#section-12.33 let rtp_info = response .header(&rtsp_types::headers::RTP_INFO) - .ok_or_else(|| format_err!("PLAY response has no RTP-Info header"))?; + .ok_or_else(|| "PLAY response has no RTP-Info header".to_string())?; for s in rtp_info.as_str().split(',') { let s = s.trim(); let mut parts = s.split(';'); @@ -504,7 +512,7 @@ pub(crate) fn parse_play( .next() .expect("split always returns at least one part") .strip_prefix("url=") - .ok_or_else(|| format_err!("RTP-Info missing stream URL"))?; + .ok_or_else(|| "RTP-Info missing stream URL".to_string())?; let url = join_control(&presentation.base_url, url)?; let mut stream; if presentation.streams.len() == 1 { @@ -533,7 +541,7 @@ pub(crate) fn parse_play( .find(|s| matches!(&s.alt_control, Some(u) if u == &url)); } } - let stream = stream.ok_or_else(|| format_err!("can't find RTP-Info stream {}", url))?; + let stream = stream.ok_or_else(|| format!("RTP-Info contains unknown stream {}", url))?; let state = match &mut stream.state { super::StreamState::Uninit => { // This appears to happen for Reolink devices when we did not send a SETUP request @@ -551,21 +559,21 @@ pub(crate) fn parse_play( for part in parts { let (key, value) = part .split_once('=') - .ok_or_else(|| format_err!("RTP-Info param has no ="))?; + .ok_or_else(|| "RTP-Info param has no =".to_string())?; match key { "seq" => { let seq = u16::from_str_radix(value, 10) - .map_err(|_| format_err!("bad seq {:?}", value))?; + .map_err(|_| format!("bad seq {:?}", value))?; state.initial_seq = Some(seq); } "rtptime" => { let rtptime = u32::from_str_radix(value, 10) - .map_err(|_| format_err!("bad rtptime {:?}", value))?; + .map_err(|_| format!("bad rtptime {:?}", value))?; state.initial_rtptime = Some(rtptime); } "ssrc" => { let ssrc = u32::from_str_radix(value, 16) - .map_err(|_| format_err!("Unparseable ssrc {}", value))?; + .map_err(|_| format!("Unparseable ssrc {}", value))?; state.ssrc = Some(ssrc); } _ => {} @@ -580,7 +588,6 @@ mod tests { use std::num::NonZeroU16; use bytes::Bytes; - use failure::Error; use url::Url; use crate::{client::StreamStateInit, codec::Parameters}; @@ -599,9 +606,9 @@ mod tests { fn parse_describe( raw_url: &str, raw_response: &'static [u8], - ) -> Result { + ) -> Result { let url = Url::parse(raw_url).unwrap(); - super::parse_describe(url, response(raw_response)) + super::parse_describe(url, &response(raw_response)) } #[test] @@ -681,7 +688,7 @@ mod tests { }); // PLAY. - super::parse_play(response(include_bytes!("testdata/dahua_play.txt")), &mut p).unwrap(); + super::parse_play(&response(include_bytes!("testdata/dahua_play.txt")), &mut p).unwrap(); match &p.streams[0].state { StreamState::Init(s) => { assert_eq!(s.initial_seq, Some(47121)); @@ -779,7 +786,7 @@ mod tests { // PLAY. super::parse_play( - response(include_bytes!("testdata/hikvision_play.txt")), + &response(include_bytes!("testdata/hikvision_play.txt")), &mut p, ) .unwrap(); @@ -851,7 +858,7 @@ mod tests { // PLAY. super::parse_play( - response(include_bytes!("testdata/reolink_play.txt")), + &response(include_bytes!("testdata/reolink_play.txt")), &mut p, ) .unwrap(); @@ -928,7 +935,7 @@ mod tests { p.streams[1].state = StreamState::Init(StreamStateInit::default()); // PLAY. - super::parse_play(response(include_bytes!("testdata/bunny_play.txt")), &mut p).unwrap(); + super::parse_play(&response(include_bytes!("testdata/bunny_play.txt")), &mut p).unwrap(); match p.streams[1].state { StreamState::Init(state) => { assert_eq!(state.initial_rtptime, Some(0)); @@ -1055,7 +1062,7 @@ mod tests { // PLAY. super::parse_play( - response(include_bytes!("testdata/gw_main_play.txt")), + &response(include_bytes!("testdata/gw_main_play.txt")), &mut p, ) .unwrap(); @@ -1119,7 +1126,11 @@ mod tests { }); // PLAY. - super::parse_play(response(include_bytes!("testdata/gw_sub_play.txt")), &mut p).unwrap(); + super::parse_play( + &response(include_bytes!("testdata/gw_sub_play.txt")), + &mut p, + ) + .unwrap(); match &p.streams[0].state { StreamState::Init(s) => { assert_eq!(s.initial_seq, Some(273)); diff --git a/src/client/rtp.rs b/src/client/rtp.rs index cd21f7c..cc52546 100644 --- a/src/client/rtp.rs +++ b/src/client/rtp.rs @@ -4,17 +4,19 @@ //! RTP and RTCP handling; see [RFC 3550](https://datatracker.ietf.org/doc/html/rfc3550). use bytes::{Buf, Bytes}; -use failure::{bail, format_err, Error}; -use log::{debug, trace}; +use log::debug; use pretty_hex::PrettyHex; use crate::client::PacketItem; +use crate::{Error, ErrorInt}; -/// An RTP packet. +/// A received RTP packet. pub struct Packet { - pub rtsp_ctx: crate::Context, + pub ctx: crate::RtspMessageContext, + pub channel_id: u8, pub stream_id: usize, pub timestamp: crate::Timestamp, + pub ssrc: u32, pub sequence_number: u16, /// Number of skipped sequence numbers since the last packet. @@ -22,7 +24,7 @@ pub struct Packet { /// In the case of the first packet on the stream, this may also report loss /// packets since the `RTP-Info` header's `seq` value. However, currently /// that header is not required to be present and may be ignored (see - /// [`crate::client::PlayPolicy::ignore_zero_seq()`].) + /// [`retina::client::PlayPolicy::ignore_zero_seq()`].) pub loss: u16, pub mark: bool, @@ -34,9 +36,11 @@ pub struct Packet { impl std::fmt::Debug for Packet { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("Packet") - .field("rtsp_ctx", &self.rtsp_ctx) + .field("ctx", &self.ctx) + .field("channel_id", &self.channel_id) .field("stream_id", &self.stream_id) .field("timestamp", &self.timestamp) + .field("ssrc", &self.ssrc) .field("sequence_number", &self.sequence_number) .field("loss", &self.loss) .field("mark", &self.mark) @@ -49,7 +53,7 @@ impl std::fmt::Debug for Packet { #[derive(Debug)] pub struct SenderReport { pub stream_id: usize, - pub rtsp_ctx: crate::Context, + pub ctx: crate::RtspMessageContext, pub timestamp: crate::Timestamp, pub ntp_timestamp: crate::NtpTimestamp, } @@ -83,8 +87,10 @@ impl StrictSequenceChecker { pub fn rtp( &mut self, - rtsp_ctx: crate::Context, + conn_ctx: &crate::ConnectionContext, + msg_ctx: &crate::RtspMessageContext, timeline: &mut super::Timeline, + channel_id: u8, stream_id: usize, mut data: Bytes, ) -> Result { @@ -102,58 +108,70 @@ impl StrictSequenceChecker { } let reader = rtp_rs::RtpReader::new(&data[..]).map_err(|e| { - format_err!( - "corrupt RTP header while expecting seq={:04x?} at {:#?}: {:?}\n{:#?}", - self.next_seq, - &rtsp_ctx, - e, - data.hex_dump() - ) + wrap!(ErrorInt::RtspDataMessageError { + conn_ctx: *conn_ctx, + msg_ctx: *msg_ctx, + channel_id, + stream_id, + description: format!( + "corrupt RTP header while expecting seq={:04x?}: {:?}\n{:#?}", + &self.next_seq, + e, + data.hex_dump(), + ), + }) })?; let sequence_number = u16::from_be_bytes([data[2], data[3]]); // I don't like rtsp_rs::Seq. + let ssrc = reader.ssrc(); let timestamp = match timeline.advance_to(reader.timestamp()) { Ok(ts) => ts, - Err(e) => { - return Err(e - .context(format!( - "timestamp error in stream {} seq={:04x} {:#?}", - stream_id, sequence_number, &rtsp_ctx - )) - .into()) - } - }; - let ssrc = reader.ssrc(); - let loss = sequence_number.wrapping_sub(self.next_seq.unwrap_or(sequence_number)); - if matches!(self.ssrc, Some(s) if s != ssrc) || loss > 0x80_00 { - bail!( - "Expected ssrc={:08x?} seq={:04x?} got ssrc={:08x} seq={:04x} ts={} at {:#?}", - self.ssrc, - self.next_seq, + Err(description) => bail!(ErrorInt::RtpPacketError { + conn_ctx: *conn_ctx, + msg_ctx: *msg_ctx, + channel_id, + stream_id, ssrc, sequence_number, - timestamp, - &rtsp_ctx - ); + description, + }), + }; + let loss = sequence_number.wrapping_sub(self.next_seq.unwrap_or(sequence_number)); + if matches!(self.ssrc, Some(s) if s != ssrc) || loss > 0x80_00 { + bail!(ErrorInt::RtpPacketError { + conn_ctx: *conn_ctx, + msg_ctx: *msg_ctx, + channel_id, + stream_id, + ssrc, + sequence_number, + description: format!( + "Expected ssrc={:08x?} seq={:04x?}", + self.ssrc, self.next_seq + ), + }); } self.ssrc = Some(ssrc); let mark = reader.mark(); - let payload_range = crate::as_range(&data, reader.payload()) - .ok_or_else(|| format_err!("empty payload at {:#?}", &rtsp_ctx))?; - trace!( - "{:?} pkt {:04x}{} ts={} len={}", - &rtsp_ctx, - sequence_number, - if mark { " " } else { "(M)" }, - ×tamp, - payload_range.len() - ); + let payload_range = crate::as_range(&data, reader.payload()).ok_or_else(|| { + wrap!(ErrorInt::RtpPacketError { + conn_ctx: *conn_ctx, + msg_ctx: *msg_ctx, + channel_id, + stream_id, + ssrc, + sequence_number, + description: "empty payload".into(), + }) + })?; data.truncate(payload_range.end); data.advance(payload_range.start); self.next_seq = Some(sequence_number.wrapping_add(1)); Ok(PacketItem::RtpPacket(Packet { + ctx: *msg_ctx, + channel_id, stream_id, - rtsp_ctx, timestamp, + ssrc, sequence_number, loss, mark, @@ -163,56 +181,45 @@ impl StrictSequenceChecker { pub fn rtcp( &mut self, - rtsp_ctx: crate::Context, + msg_ctx: &crate::RtspMessageContext, timeline: &mut super::Timeline, stream_id: usize, mut data: Bytes, - ) -> Result, Error> { + ) -> Result, String> { use rtcp::packet::Packet; let mut sr = None; let mut i = 0; while !data.is_empty() { let h = match rtcp::header::Header::unmarshal(&data) { - Err(e) => bail!("corrupt RTCP header at {:#?}: {}", &rtsp_ctx, e), + Err(e) => return Err(format!("corrupt RTCP header: {}", e)), Ok(h) => h, }; let pkt_len = (usize::from(h.length) + 1) * 4; if pkt_len > data.len() { - bail!( - "rtcp pkt len {} vs remaining body len {} at {:#?}", + return Err(format!( + "RTCP packet length {} exceeds remaining data message length {}", pkt_len, data.len(), - &rtsp_ctx - ); + )); } let pkt = data.split_to(pkt_len); match h.packet_type { rtcp::header::PacketType::SenderReport => { if i > 0 { - bail!("RTCP SR must be first in packet"); + return Err("RTCP SR must be first in packet".into()); } - let pkt = match rtcp::sender_report::SenderReport::unmarshal(&pkt) { - Err(e) => bail!("corrupt RTCP SR at {:#?}: {}", &rtsp_ctx, e), - Ok(p) => p, - }; - - let timestamp = match timeline.place(pkt.rtp_time) { - Ok(ts) => ts, - Err(e) => { - return Err(e - .context(format!( - "bad RTP timestamp in RTCP SR {:#?} at {:#?}", - &pkt, &rtsp_ctx - )) - .into()) - } - }; + let pkt = rtcp::sender_report::SenderReport::unmarshal(&pkt) + .map_err(|e| format!("corrupt RTCP SR: {}", e))?; + let timestamp = timeline.place(pkt.rtp_time).map_err(|mut description| { + description.push_str(" in RTCP SR"); + description + })?; // TODO: verify ssrc. sr = Some(SenderReport { stream_id, - rtsp_ctx, + ctx: *msg_ctx, timestamp, ntp_timestamp: crate::NtpTimestamp(pkt.ntp_time), }); diff --git a/src/client/timeline.rs b/src/client/timeline.rs index 6fad322..4a36916 100644 --- a/src/client/timeline.rs +++ b/src/client/timeline.rs @@ -1,11 +1,8 @@ // Copyright (C) 2021 Scott Lamb // SPDX-License-Identifier: MIT OR Apache-2.0 -use failure::{bail, format_err, Error}; -use std::{ - convert::TryFrom, - num::{NonZeroI32, NonZeroU32}, -}; +use std::convert::TryFrom; +use std::num::{NonZeroI32, NonZeroU32}; use crate::Timestamp; @@ -34,19 +31,19 @@ impl Timeline { start: Option, clock_rate: u32, enforce_with_max_forward_jump_secs: Option, - ) -> Result { + ) -> Result { let clock_rate = NonZeroU32::new(clock_rate) - .ok_or_else(|| format_err!("clock_rate=0 rejected to prevent division by zero"))?; - let max_forward_jump = - enforce_with_max_forward_jump_secs - .map(|j| i32::try_from(u64::from(j.get()) * u64::from(clock_rate.get()))) - .transpose() - .map_err(|_| { - format_err!( - "clock_rate={} rejected because max forward jump of {} sec exceeds i32::MAX", - clock_rate, MAX_FORWARD_TIME_JUMP_SECS) - })? - .map(|j| NonZeroI32::new(j).expect("non-zero times non-zero must be non-zero")); + .ok_or_else(|| "clock_rate=0 rejected to prevent division by zero".to_string())?; + let max_forward_jump = enforce_with_max_forward_jump_secs + .map(|j| i32::try_from(u64::from(j.get()) * u64::from(clock_rate.get()))) + .transpose() + .map_err(|_| { + format!( + "clock_rate={} rejected because max forward jump of {} sec exceeds i32::MAX", + clock_rate, MAX_FORWARD_TIME_JUMP_SECS + ) + })? + .map(|j| NonZeroI32::new(j).expect("non-zero times non-zero must be non-zero")); Ok(Timeline { timestamp: i64::from(start.unwrap_or(0)), start, @@ -62,10 +59,10 @@ impl Timeline { /// /// If enforcement was enabled, this produces a monotonically increasing /// [Timestamp], erroring on excessive or backward time jumps. - pub fn advance_to(&mut self, rtp_timestamp: u32) -> Result { + pub fn advance_to(&mut self, rtp_timestamp: u32) -> Result { let (timestamp, delta) = self.ts_and_delta(rtp_timestamp)?; if matches!(self.max_forward_jump, Some(j) if !(0..j.get()).contains(&delta)) { - bail!( + return Err(format!( "Timestamp jumped {} ({:.03} sec) from {} to {}; \ policy is to allow 0..{} sec only", delta, @@ -73,7 +70,7 @@ impl Timeline { self.timestamp, timestamp, self.max_forward_jump_secs - ); + )); } self.timestamp = timestamp.timestamp; Ok(timestamp) @@ -84,11 +81,11 @@ impl Timeline { /// /// This is useful for RTP timestamps in RTCP packets. They commonly refer /// to time slightly before the most timestamp of the matching RTP stream. - pub fn place(&mut self, rtp_timestamp: u32) -> Result { + pub fn place(&mut self, rtp_timestamp: u32) -> Result { Ok(self.ts_and_delta(rtp_timestamp)?.0) } - fn ts_and_delta(&mut self, rtp_timestamp: u32) -> Result<(Timestamp, i32), Error> { + fn ts_and_delta(&mut self, rtp_timestamp: u32) -> Result<(Timestamp, i32), String> { let start = match self.start { None => { self.start = Some(rtp_timestamp); @@ -106,21 +103,18 @@ impl Timeline { // take ~2^31 packets (~ 4 billion) to advance the time this far // forward or backward even with no limits on time jump per // packet. - format_err!( + format!( "timestamp {} + delta {} won't fit in i64!", - self.timestamp, - delta + self.timestamp, delta ) })?; // Also error in similarly-unlikely NPT underflow. if timestamp.checked_sub(i64::from(start)).is_none() { - bail!( + return Err(format!( "timestamp {} + delta {} - start {} underflows i64!", - self.timestamp, - delta, - start - ); + self.timestamp, delta, start + )); } Ok(( Timestamp { diff --git a/src/codec/aac.rs b/src/codec/aac.rs index 7e9a897..6f78ad6 100644 --- a/src/codec/aac.rs +++ b/src/codec/aac.rs @@ -15,14 +15,13 @@ //! * ISO/IEC 14496-14: MP4 File Format. use bytes::{Buf, BufMut, Bytes, BytesMut}; -use failure::{bail, format_err, Error}; use std::{ convert::TryFrom, fmt::Debug, num::{NonZeroU16, NonZeroU32}, }; -use crate::client::rtp::Packet; +use crate::{client::rtp::Packet, error::ErrorInt, ConnectionContext, Error}; use super::CodecItem; @@ -64,15 +63,25 @@ const CHANNEL_CONFIGS: [Option; 8] = [ impl AudioSpecificConfig { /// Parses from raw bytes. - fn parse(config: &[u8]) -> Result { + fn parse(config: &[u8]) -> Result { let mut r = bitreader::BitReader::new(config); - let audio_object_type = match r.read_u8(5)? { - 31 => 32 + r.read_u8(6)?, + let audio_object_type = match r + .read_u8(5) + .map_err(|e| format!("unable to read audio_object_type: {}", e))? + { + 31 => { + 32 + r + .read_u8(6) + .map_err(|e| format!("unable to read audio_object_type ext: {}", e))? + } o => o, }; // ISO/IEC 14496-3 section 1.6.3.4. - let sampling_frequency = match r.read_u8(4)? { + let sampling_frequency = match r + .read_u8(4) + .map_err(|e| format!("unable to read sampling_frequency: {}", e))? + { 0x0 => 96_000, 0x1 => 88_200, 0x2 => 64_000, @@ -85,39 +94,58 @@ impl AudioSpecificConfig { 0xa => 11_025, 0xb => 8_000, 0xc => 7_350, - v @ 0xd | v @ 0xe => bail!("reserved sampling_frequency_index value 0x{:x}", v), - 0xf => r.read_u32(24)?, + v @ 0xd | v @ 0xe => { + return Err(format!("reserved sampling_frequency_index value 0x{:x}", v)) + } + 0xf => r + .read_u32(24) + .map_err(|e| format!("unable to read sampling_frequency ext: {}", e))?, _ => unreachable!(), }; let channels = { - let c = r.read_u8(4)?; + let c = r + .read_u8(4) + .map_err(|e| format!("unable to read channels: {}", e))?; CHANNEL_CONFIGS .get(usize::from(c)) - .ok_or_else(|| format_err!("reserved channelConfiguration 0x{:x}", c))? + .ok_or_else(|| format!("reserved channelConfiguration 0x{:x}", c))? .as_ref() - .ok_or_else(|| format_err!("program_config_element parsing unimplemented"))? + .ok_or_else(|| "program_config_element parsing unimplemented".to_string())? }; if audio_object_type == 5 || audio_object_type == 29 { // extensionSamplingFrequencyIndex + extensionSamplingFrequency. - if r.read_u8(4)? == 0xf { - r.skip(24)?; + if r.read_u8(4) + .map_err(|e| format!("unable to read extensionSamplingFrequencyIndex: {}", e))? + == 0xf + { + r.skip(24) + .map_err(|e| format!("unable to read extensionSamplingFrequency: {}", e))?; } // audioObjectType (a different one) + extensionChannelConfiguration. - if r.read_u8(5)? == 22 { - r.skip(4)?; + if r.read_u8(5) + .map_err(|e| format!("unable to read second audioObjectType: {}", e))? + == 22 + { + r.skip(4) + .map_err(|e| format!("unable to read extensionChannelConfiguration: {}", e))?; } } // The supported types here are the ones that use GASpecificConfig. match audio_object_type { 1 | 2 | 3 | 4 | 6 | 7 | 17 | 19 | 20 | 21 | 22 | 23 => {} - o => bail!("unsupported audio_object_type {}", o), + o => return Err(format!("unsupported audio_object_type {}", o)), } // GASpecificConfig, ISO/IEC 14496-3 section 4.4.1. - let frame_length = match (audio_object_type, r.read_bool()?) { + let frame_length_flag = r + .read_bool() + .map_err(|e| format!("unable to read frame_length_flag: {}", e))?; + let frame_length = match (audio_object_type, frame_length_flag) { (3 /* AAC SR */, false) => NonZeroU16::new(256).expect("non-zero"), - (3 /* AAC SR */, true) => bail!("frame_length_flag must be false for AAC SSR"), + (3 /* AAC SR */, true) => { + return Err("frame_length_flag must be false for AAC SSR".into()) + } (23 /* ER AAC LD */, false) => NonZeroU16::new(512).expect("non-zero"), (23 /* ER AAC LD */, true) => NonZeroU16::new(480).expect("non-zero"), (_, false) => NonZeroU16::new(1024).expect("non-zero"), @@ -135,7 +163,7 @@ impl AudioSpecificConfig { /// Overwrites a buffer with a varint length, returning the length of the length. /// See ISO/IEC 14496-1 section 8.3.3. -fn set_length(len: usize, data: &mut [u8]) -> Result { +fn set_length(len: usize, data: &mut [u8]) -> Result { if len < 1 << 7 { data[0] = len as u8; Ok(1) @@ -156,7 +184,7 @@ fn set_length(len: usize, data: &mut [u8]) -> Result { Ok(4) } else { // BaseDescriptor sets a maximum length of 2**28 - 1. - bail!("length {} too long", len); + return Err(format!("length {} too long", len)); } } @@ -173,7 +201,11 @@ macro_rules! write_box { }; let pos_end = $buf.len(); let len = pos_end.checked_sub(pos_start).unwrap(); - $buf[pos_start..pos_start + 4].copy_from_slice(&u32::try_from(len)?.to_be_bytes()[..]); + $buf[pos_start..pos_start + 4].copy_from_slice( + &u32::try_from(len) + .map_err(|_| format!("box length {} exceeds u32::MAX", len))? + .to_be_bytes()[..], + ); r }}; } @@ -208,7 +240,7 @@ macro_rules! write_descriptor { /// Returns an MP4AudioSampleEntry (`mp4a`) box as in ISO/IEC 14496-14 section 5.6.1. /// `config` should be a raw AudioSpecificConfig (matching `parsed`). -pub(super) fn get_mp4a_box(parameters: &super::AudioParameters) -> Result { +pub(super) fn get_mp4a_box(parameters: &super::AudioParameters) -> Result { let parsed = match parameters.config { super::AudioCodecConfig::Aac(ref c) => c, _ => unreachable!(), @@ -238,7 +270,7 @@ pub(super) fn get_mp4a_box(parameters: &super::AudioParameters) -> Result Result Result { +) -> Result { let mut mode = None; let mut config = None; let mut size_length = None; @@ -321,29 +353,28 @@ fn parse_format_specific_params( } let (key, value) = p .split_once('=') - .ok_or_else(|| format_err!("bad format-specific-param {}", p))?; + .ok_or_else(|| format!("bad format-specific-param {}", p))?; match &key.to_ascii_lowercase()[..] { "config" => { config = Some( hex::decode(value) - .map_err(|_| format_err!("config has invalid hex encoding"))?, + .map_err(|_| "config has invalid hex encoding".to_string())?, ); } "mode" => mode = Some(value), "sizelength" => { - size_length = Some( - u16::from_str_radix(value, 10).map_err(|_| format_err!("bad sizeLength"))?, - ); + size_length = + Some(u16::from_str_radix(value, 10).map_err(|_| "bad sizeLength".to_string())?); } "indexlength" => { index_length = Some( - u16::from_str_radix(value, 10).map_err(|_| format_err!("bad indexLength"))?, + u16::from_str_radix(value, 10).map_err(|_| "bad indexLength".to_string())?, ); } "indexdeltalength" => { index_delta_length = Some( u16::from_str_radix(value, 10) - .map_err(|_| format_err!("bad indexDeltaLength"))?, + .map_err(|_| "bad indexDeltaLength".to_string())?, ); } _ => {} @@ -351,27 +382,24 @@ fn parse_format_specific_params( } // https://datatracker.ietf.org/doc/html/rfc3640#section-3.3.6 AAC-hbr if mode != Some("AAC-hbr") { - bail!("Expected mode AAC-hbr, got {:#?}", mode); + return Err(format!("Expected mode AAC-hbr, got {:#?}", mode)); } - let config = config.ok_or_else(|| format_err!("config must be specified"))?; + let config = config.ok_or_else(|| "config must be specified".to_string())?; if size_length != Some(13) || index_length != Some(3) || index_delta_length != Some(3) { - bail!( + return Err(format!( "Unexpected sizeLength={:?} indexLength={:?} indexDeltaLength={:?}", - size_length, - index_length, - index_delta_length - ); + size_length, index_length, index_delta_length + )); } let parsed = AudioSpecificConfig::parse(&config[..])?; // TODO: is this a requirement? I might have read somewhere one can be a multiple of the other. if clock_rate != parsed.sampling_frequency { - bail!( + return Err(format!( "Expected RTP clock rate {} and AAC sampling frequency {} to match", - clock_rate, - parsed.sampling_frequency - ); + clock_rate, parsed.sampling_frequency + )); } // https://datatracker.ietf.org/doc/html/rfc6381#section-3.3 @@ -397,7 +425,7 @@ pub(crate) struct Depacketizer { #[derive(Debug)] struct Aggregate { - ctx: crate::Context, + ctx: crate::RtspMessageContext, /// RTP packets lost before the next frame in this aggregate. Includes old /// loss that caused a previous fragment to be too short. @@ -410,7 +438,10 @@ struct Aggregate { /// to be too short. loss_since_mark: bool, + channel_id: u8, stream_id: usize, + ssrc: u32, + sequence_number: u16, /// The RTP-level timestamp; frame `i` is at timestamp `timestamp + frame_length*i`. timestamp: crate::Timestamp, @@ -463,20 +494,19 @@ impl Depacketizer { clock_rate: u32, channels: Option, format_specific_params: Option<&str>, - ) -> Result { + ) -> Result { let format_specific_params = format_specific_params - .ok_or_else(|| format_err!("AAC requires format specific params"))?; + .ok_or_else(|| "AAC requires format specific params".to_string())?; let parameters = parse_format_specific_params(clock_rate, format_specific_params)?; let parsed = match parameters.config { super::AudioCodecConfig::Aac(ref c) => c, _ => unreachable!(), }; if matches!(channels, Some(c) if c.get() != parsed.channels.channels) { - bail!( + return Err(format!( "Expected RTP channels {:?} and AAC channels {:?} to match", - channels, - parsed.channels - ); + channels, parsed.channels + )); } let frame_length = parsed.frame_length; Ok(Self { @@ -490,7 +520,7 @@ impl Depacketizer { Some(&self.parameters) } - pub(super) fn push(&mut self, mut pkt: Packet) -> Result<(), Error> { + pub(super) fn push(&mut self, mut pkt: Packet) -> Result<(), String> { if pkt.loss > 0 && matches!(self.state, DepacketizerState::Fragmented(_)) { log::debug!( "Discarding fragmented AAC frame due to loss of {} RTP packets.", @@ -501,38 +531,37 @@ impl Depacketizer { // Read the AU headers. if pkt.payload.len() < 2 { - bail!("packet too short for au-header-length"); + return Err("packet too short for au-header-length".to_string()); } let au_headers_length_bits = pkt.payload.get_u16(); // AAC-hbr requires 16-bit AU headers: 13-bit size, 3-bit index. if (au_headers_length_bits & 0x7) != 0 { - bail!("bad au-headers-length {}", au_headers_length_bits); + return Err(format!("bad au-headers-length {}", au_headers_length_bits)); } let au_headers_count = au_headers_length_bits >> 4; let data_off = usize::from(au_headers_count) << 1; if pkt.payload.len() < (usize::from(au_headers_count) << 1) { - bail!("packet too short for au-headers"); + return Err("packet too short for au-headers".to_string()); } match &mut self.state { DepacketizerState::Fragmented(ref mut frag) => { if au_headers_count != 1 { - bail!( + return Err(format!( "Got {}-AU packet while fragment in progress", au_headers_count - ); + )); } if (pkt.timestamp.timestamp as u16) != frag.rtp_timestamp { - bail!( + return Err(format!( "Timestamp changed from 0x{:04x} to 0x{:04x} mid-fragment", - frag.rtp_timestamp, - pkt.timestamp.timestamp as u16 - ); + frag.rtp_timestamp, pkt.timestamp.timestamp as u16 + )); } let au_header = u16::from_be_bytes([pkt.payload[0], pkt.payload[1]]); let size = usize::from(au_header >> 3); if size != usize::from(frag.size) { - bail!("size changed {}->{} mid-fragment", frag.size, size); + return Err(format!("size changed {}->{} mid-fragment", frag.size, size)); } let data = &pkt.payload[data_off..]; match (frag.buf.len() + data.len()).cmp(&size) { @@ -544,22 +573,24 @@ impl Depacketizer { }; return Ok(()); } - bail!( + return Err(format!( "frag marked complete when {}+{}<{}", frag.buf.len(), data.len(), size - ); + )); } } std::cmp::Ordering::Equal => { if !pkt.mark { - bail!("frag not marked complete when full data present"); + return Err( + "frag not marked complete when full data present".to_string() + ); } frag.buf.extend_from_slice(data); println!("au {}: len-{}, fragmented", &pkt.timestamp, size); self.state = DepacketizerState::Ready(super::AudioFrame { - ctx: pkt.rtsp_ctx, + ctx: pkt.ctx, loss: frag.loss, frame_length: NonZeroU32::from(self.frame_length), stream_id: pkt.stream_id, @@ -567,19 +598,22 @@ impl Depacketizer { data: std::mem::take(&mut frag.buf).freeze(), }); } - std::cmp::Ordering::Greater => bail!("too much data in fragment"), + std::cmp::Ordering::Greater => return Err("too much data in fragment".into()), } } DepacketizerState::Aggregated(_) => panic!("push when already in state aggregated"), DepacketizerState::Idle { prev_loss } => { if au_headers_count == 0 { - bail!("aggregate with no headers"); + return Err("aggregate with no headers".to_string()); } self.state = DepacketizerState::Aggregated(Aggregate { - ctx: pkt.rtsp_ctx, + ctx: pkt.ctx, loss: *prev_loss + pkt.loss, loss_since_mark: pkt.loss > 0, + channel_id: pkt.channel_id, stream_id: pkt.stream_id, + ssrc: pkt.ssrc, + sequence_number: pkt.sequence_number, timestamp: pkt.timestamp, buf: pkt.payload, frame_i: 0, @@ -593,7 +627,10 @@ impl Depacketizer { Ok(()) } - pub(super) fn pull(&mut self) -> Result, Error> { + pub(super) fn pull( + &mut self, + conn_ctx: &ConnectionContext, + ) -> Result, Error> { match std::mem::replace(&mut self.state, DepacketizerState::Idle { prev_loss: 0 }) { s @ DepacketizerState::Idle { .. } | s @ DepacketizerState::Fragmented(..) => { self.state = s; @@ -613,15 +650,27 @@ impl Depacketizer { // indicate interleaving, which we don't support. // TODO: https://datatracker.ietf.org/doc/html/rfc3640#section-3.3.6 // says "receivers MUST support de-interleaving". - bail!("interleaving not yet supported"); + return Err(error( + *conn_ctx, + agg, + "interleaving not yet supported".to_owned(), + )); } if size > agg.buf.len() - agg.data_off { // start of fragment if agg.frame_count != 1 { - bail!("fragmented AUs must not share packets"); + return Err(error( + *conn_ctx, + agg, + "fragmented AUs must not share packets".to_owned(), + )); } if agg.mark { - bail!("mark can't be set on beginning of fragment"); + return Err(error( + *conn_ctx, + agg, + "mark can't be set on beginning of fragment".to_owned(), + )); } let mut buf = BytesMut::with_capacity(size); buf.extend_from_slice(&agg.buf[agg.data_off..]); @@ -635,8 +684,15 @@ impl Depacketizer { return Ok(None); } if !agg.mark { - bail!("mark must be set on non-fragmented au"); + return Err(error( + *conn_ctx, + agg, + "mark must be set on non-fragmented au".to_owned(), + )); } + + let delta = u32::from(agg.frame_i) * u32::from(self.frame_length.get()); + let agg_timestamp = agg.timestamp; let frame = super::AudioFrame { ctx: agg.ctx, loss: agg.loss, @@ -644,9 +700,19 @@ impl Depacketizer { frame_length: NonZeroU32::from(self.frame_length), // u16 * u16 can't overflow u32, but i64 + u32 can overflow i64. - timestamp: agg - .timestamp - .try_add(u32::from(agg.frame_i) * u32::from(self.frame_length.get()))?, + timestamp: match agg_timestamp.try_add(delta) { + Some(t) => t, + None => { + return Err(error( + *conn_ctx, + agg, + format!( + "aggregate timestamp {} + {} overflows", + agg_timestamp, delta + ), + )) + } + }, data: agg.buf.slice(agg.data_off..agg.data_off + size), }; agg.loss = 0; @@ -661,6 +727,18 @@ impl Depacketizer { } } +fn error(conn_ctx: ConnectionContext, agg: Aggregate, description: String) -> Error { + Error(Box::new(ErrorInt::RtpPacketError { + conn_ctx, + msg_ctx: agg.ctx, + channel_id: agg.channel_id, + stream_id: agg.stream_id, + ssrc: agg.ssrc, + sequence_number: agg.sequence_number, + description, + })) +} + #[cfg(test)] mod tests { #[test] diff --git a/src/codec/g723.rs b/src/codec/g723.rs index 4f50676..10f4777 100644 --- a/src/codec/g723.rs +++ b/src/codec/g723.rs @@ -6,7 +6,6 @@ use std::num::NonZeroU32; use bytes::Bytes; -use failure::{bail, Error}; use pretty_hex::PrettyHex; #[derive(Debug)] @@ -17,9 +16,12 @@ pub(crate) struct Depacketizer { impl Depacketizer { /// Creates a new Depacketizer. - pub(super) fn new(clock_rate: u32) -> Result { + pub(super) fn new(clock_rate: u32) -> Result { if clock_rate != 8_000 { - bail!("Expected clock rate of 8000 for G.723, got {}", clock_rate); + return Err(format!( + "Expected clock rate of 8000 for G.723, got {}", + clock_rate + )); } Ok(Self { parameters: super::Parameters::Audio(super::AudioParameters { @@ -48,13 +50,16 @@ impl Depacketizer { actual_hdr_bits == expected_hdr_bits } - pub(super) fn push(&mut self, pkt: crate::client::rtp::Packet) -> Result<(), Error> { + pub(super) fn push(&mut self, pkt: crate::client::rtp::Packet) -> Result<(), String> { assert!(self.pending.is_none()); if !Self::validate(&pkt) { - bail!("Invalid G.723 packet: {:#?}", pkt.payload.hex_dump()); + return Err(format!( + "Invalid G.723 packet: {:#?}", + pkt.payload.hex_dump() + )); } self.pending = Some(super::AudioFrame { - ctx: pkt.rtsp_ctx, + ctx: pkt.ctx, loss: pkt.loss, stream_id: pkt.stream_id, timestamp: pkt.timestamp, @@ -64,7 +69,7 @@ impl Depacketizer { Ok(()) } - pub(super) fn pull(&mut self) -> Result, Error> { - Ok(self.pending.take().map(super::CodecItem::AudioFrame)) + pub(super) fn pull(&mut self) -> Option { + self.pending.take().map(super::CodecItem::AudioFrame) } } diff --git a/src/codec/h264.rs b/src/codec/h264.rs index 9ea8cdc..5134061 100644 --- a/src/codec/h264.rs +++ b/src/codec/h264.rs @@ -6,11 +6,10 @@ use std::convert::TryFrom; use bytes::{Buf, BufMut, Bytes, BytesMut}; -use failure::{bail, format_err, Error}; use h264_reader::nal::{NalHeader, UnitType}; use log::debug; -use crate::{client::rtp::Packet, Timestamp}; +use crate::{client::rtp::Packet, Error, Timestamp}; use super::VideoFrame; @@ -56,8 +55,8 @@ struct Nal { /// An access unit that is currently being accumulated during `PreMark` state. #[derive(Debug)] struct AccessUnit { - start_ctx: crate::Context, - end_ctx: crate::Context, + start_ctx: crate::RtspMessageContext, + end_ctx: crate::RtspMessageContext, timestamp: crate::Timestamp, stream_id: usize, @@ -69,7 +68,7 @@ struct AccessUnit { } #[derive(Debug)] -#[allow(clippy::clippy::large_enum_variant)] +#[allow(clippy::large_enum_variant)] enum DepacketizerInputState { /// Not yet processing an access unit. New, @@ -95,14 +94,17 @@ impl Depacketizer { pub(super) fn new( clock_rate: u32, format_specific_params: Option<&str>, - ) -> Result { + ) -> Result { if clock_rate != 90_000 { - bail!("H.264 clock rate must always be 90000"); + return Err(format!( + "invalid H.264 clock rate {}; must always be 90000", + clock_rate + )); } // TODO: the spec doesn't require out-of-band parameters, so we shouldn't either. let format_specific_params = format_specific_params - .ok_or_else(|| format_err!("H.264 depacketizer expects out-of-band parameters"))?; + .ok_or_else(|| "H.264 depacketizer expects out-of-band parameters".to_owned())?; Ok(Depacketizer { input_state: DepacketizerInputState::New, pending: None, @@ -116,105 +118,100 @@ impl Depacketizer { Some(&self.parameters.generic_parameters) } - pub(super) fn push(&mut self, pkt: Packet) -> Result<(), Error> { + pub(super) fn push(&mut self, pkt: Packet) -> Result<(), String> { // Push shouldn't be called until pull is exhausted. if let Some(p) = self.pending.as_ref() { panic!("push with data already pending: {:?}", p); } - let seq = pkt.sequence_number; - let mut access_unit = match std::mem::replace( - &mut self.input_state, - DepacketizerInputState::New, - ) { - DepacketizerInputState::New => { - debug_assert!(self.nals.is_empty()); - debug_assert!(self.pieces.is_empty()); - AccessUnit::start(&pkt, 0) - } - DepacketizerInputState::PreMark(mut access_unit) => { - if pkt.loss > 0 { - self.nals.clear(); - self.pieces.clear(); - if access_unit.timestamp.timestamp == pkt.timestamp.timestamp { - // Loss within this access unit. Ignore until mark or new timestamp. - self.input_state = if pkt.mark { - DepacketizerInputState::PostMark { - timestamp: pkt.timestamp, - loss: pkt.loss, - } - } else { - self.pieces.clear(); - self.nals.clear(); - DepacketizerInputState::Loss { - timestamp: pkt.timestamp, - pkts: pkt.loss, - } - }; + let mut access_unit = + match std::mem::replace(&mut self.input_state, DepacketizerInputState::New) { + DepacketizerInputState::New => { + debug_assert!(self.nals.is_empty()); + debug_assert!(self.pieces.is_empty()); + AccessUnit::start(&pkt, 0) + } + DepacketizerInputState::PreMark(mut access_unit) => { + if pkt.loss > 0 { + self.nals.clear(); + self.pieces.clear(); + if access_unit.timestamp.timestamp == pkt.timestamp.timestamp { + // Loss within this access unit. Ignore until mark or new timestamp. + self.input_state = if pkt.mark { + DepacketizerInputState::PostMark { + timestamp: pkt.timestamp, + loss: pkt.loss, + } + } else { + self.pieces.clear(); + self.nals.clear(); + DepacketizerInputState::Loss { + timestamp: pkt.timestamp, + pkts: pkt.loss, + } + }; + return Ok(()); + } + // A suffix of a previous access unit was lost; discard it. + // A prefix of the new one may have been lost; try parsing. + AccessUnit::start(&pkt, 0) + } else if access_unit.timestamp.timestamp != pkt.timestamp.timestamp { + if access_unit.in_fu_a { + return Err(format!( + "Timestamp changed from {} to {} in the middle of a fragmented NAL", + access_unit.timestamp, pkt.timestamp + )); + } + access_unit.end_ctx = pkt.ctx; + self.pending = Some(self.finalize_access_unit(access_unit)?); + AccessUnit::start(&pkt, 0) + } else { + access_unit + } + } + DepacketizerInputState::PostMark { + timestamp: state_ts, + loss, + } => { + debug_assert!(self.nals.is_empty()); + debug_assert!(self.pieces.is_empty()); + if state_ts.timestamp == pkt.timestamp.timestamp { + return Err("packet follows marked packet with same timestamp".into()); + } + AccessUnit::start(&pkt, loss) + } + DepacketizerInputState::Loss { + timestamp, + mut pkts, + } => { + debug_assert!(self.nals.is_empty()); + debug_assert!(self.pieces.is_empty()); + if pkt.timestamp.timestamp == timestamp.timestamp { + pkts += pkt.loss; + self.input_state = DepacketizerInputState::Loss { timestamp, pkts }; return Ok(()); } - // A suffix of a previous access unit was lost; discard it. - // A prefix of the new one may have been lost; try parsing. - AccessUnit::start(&pkt, 0) - } else if access_unit.timestamp.timestamp != pkt.timestamp.timestamp { - if access_unit.in_fu_a { - bail!("Timestamp changed from {} to {} in the middle of a fragmented NAL at seq={:04x} {:#?}", access_unit.timestamp, pkt.timestamp, seq, &pkt.rtsp_ctx); - } - access_unit.end_ctx = pkt.rtsp_ctx; - self.pending = Some(self.finalize_access_unit(access_unit)?); - AccessUnit::start(&pkt, 0) - } else { - access_unit + AccessUnit::start(&pkt, pkts) } - } - DepacketizerInputState::PostMark { - timestamp: state_ts, - loss, - } => { - debug_assert!(self.nals.is_empty()); - debug_assert!(self.pieces.is_empty()); - if state_ts.timestamp == pkt.timestamp.timestamp { - bail!("Received packet with timestamp {} after marked packet with same timestamp at seq={:04x} {:#?}", pkt.timestamp, seq, &pkt.rtsp_ctx); - } - AccessUnit::start(&pkt, loss) - } - DepacketizerInputState::Loss { - timestamp, - mut pkts, - } => { - debug_assert!(self.nals.is_empty()); - debug_assert!(self.pieces.is_empty()); - if pkt.timestamp.timestamp == timestamp.timestamp { - pkts += pkt.loss; - self.input_state = DepacketizerInputState::Loss { timestamp, pkts }; - return Ok(()); - } - AccessUnit::start(&pkt, pkts) - } - }; + }; let mut data = pkt.payload; if data.is_empty() { - bail!("Empty NAL at RTP seq {:04x}, {:#?}", seq, &pkt.rtsp_ctx); + return Err("Empty NAL".into()); } // https://tools.ietf.org/html/rfc6184#section-5.2 let nal_header = data[0]; if (nal_header >> 7) != 0 { - bail!( - "NAL header has F bit set at seq {:04x} {:#?}", - seq, - &pkt.rtsp_ctx - ); + return Err(format!("NAL header {:02x} has F bit set", nal_header)); } data.advance(1); // skip the header byte. match nal_header & 0b11111 { 1..=23 => { if access_unit.in_fu_a { - bail!( - "Non-fragmented NAL while fragment in progress seq {:04x} {:#?}", - seq, - &pkt.rtsp_ctx - ); + return Err(format!( + "Non-fragmented NAL {:02x} while fragment in progress", + nal_header + )); } let len = u32::try_from(data.len()).expect("data len < u16::MAX") + 1; let next_piece_idx = self.add_piece(data)?; @@ -228,24 +225,25 @@ impl Depacketizer { // STAP-A. https://tools.ietf.org/html/rfc6184#section-5.7.1 loop { if data.remaining() < 3 { - bail!( + return Err(format!( "STAP-A has {} remaining bytes; expecting 2-byte length, non-empty NAL", data.remaining() - ); + )); } let len = data.get_u16(); - //let len = usize::from(data.get_u16()); if len == 0 { - bail!("zero length in STAP-A"); + return Err("zero length in STAP-A".into()); } - let hdr = - NalHeader::new(data[0]).map_err(|_| format_err!("bad header in STAP-A"))?; + let hdr = NalHeader::new(data[0]) + .map_err(|_| format!("bad header {:02x} in STAP-A", data[0]))?; match data.remaining().cmp(&usize::from(len)) { - std::cmp::Ordering::Less => bail!( - "STAP-A too short: {} bytes remaining, expecting {}-byte NAL", - data.remaining(), - len - ), + std::cmp::Ordering::Less => { + return Err(format!( + "STAP-A too short: {} bytes remaining, expecting {}-byte NAL", + data.remaining(), + len + )) + } std::cmp::Ordering::Equal => { data.advance(1); let next_piece_idx = self.add_piece(data)?; @@ -269,21 +267,16 @@ impl Depacketizer { } } } - 25..=27 | 29 => bail!( - "unimplemented NAL (header 0x{:02x}) at seq {:04x} {:#?}", - nal_header, - seq, - &pkt.rtsp_ctx - ), + 25..=27 | 29 => { + return Err(format!( + "unimplemented/unexpected interleaved mode NAL ({:02x})", + nal_header, + )) + } 28 => { // FU-A. https://tools.ietf.org/html/rfc6184#section-5.8 if data.len() < 2 { - bail!( - "FU-A len {} too short at seq {:04x} {:#?}", - data.len(), - seq, - &pkt.rtsp_ctx - ); + return Err(format!("FU-A len {} too short", data.len())); } let fu_header = data[0]; let start = (fu_header & 0b10000000) != 0; @@ -294,27 +287,14 @@ impl Depacketizer { .expect("NalHeader is valid"); data.advance(1); if (start && end) || reserved { - bail!( - "Invalid FU-A header {:08b} at seq {:04x} {:#?}", - fu_header, - seq, - &pkt.rtsp_ctx - ); + return Err(format!("Invalid FU-A header {:02x}", fu_header)); } if !end && pkt.mark { - bail!( - "FU-A pkt with MARK && !END at seq {:04x} {:#?}", - seq, - &pkt.rtsp_ctx - ); + return Err("FU-A pkt with MARK && !END".into()); } let u32_len = u32::try_from(data.len()).expect("RTP packet len must be < u16::MAX"); match (start, access_unit.in_fu_a) { - (true, true) => bail!( - "FU-A with start bit while frag in progress at seq {:04x} {:#?}", - seq, - &pkt.rtsp_ctx - ), + (true, true) => return Err("FU-A with start bit while frag in progress".into()), (true, false) => { self.add_piece(data)?; self.nals.push(Nal { @@ -328,24 +308,17 @@ impl Depacketizer { let pieces = self.add_piece(data)?; let nal = self.nals.last_mut().expect("nals non-empty while in fu-a"); if u8::from(nal_header) != u8::from(nal.hdr) { - bail!( - "FU-A has inconsistent NAL type: {:?} then {:?} at {:02x} {:?}", - nal.hdr, - nal_header, - seq, - &pkt.rtsp_ctx - ); + return Err(format!( + "FU-A has inconsistent NAL type: {:?} then {:?}", + nal.hdr, nal_header, + )); } nal.len += u32_len; if end { nal.next_piece_idx = pieces; access_unit.in_fu_a = false; } else if pkt.mark { - bail!( - "FU-A with MARK and no END at seq {:04x} {:#?}", - seq, - pkt.rtsp_ctx - ); + return Err("FU-A has MARK and no END".into()); } } (false, false) => { @@ -358,23 +331,14 @@ impl Depacketizer { }; return Ok(()); } - bail!( - "FU-A with start bit unset while no frag in progress at {:04x} {:#?}", - seq, - &pkt.rtsp_ctx - ); + return Err("FU-A has start bit unset while no frag in progress".into()); } } } - _ => bail!( - "bad nal header {:0x} at seq {:04x} {:#?}", - nal_header, - seq, - &pkt.rtsp_ctx - ), + _ => return Err(format!("bad nal header {:02x}", nal_header)), } self.input_state = if pkt.mark { - access_unit.end_ctx = pkt.rtsp_ctx; + access_unit.end_ctx = pkt.ctx; self.pending = Some(self.finalize_access_unit(access_unit)?); DepacketizerInputState::PostMark { timestamp: pkt.timestamp, @@ -386,17 +350,17 @@ impl Depacketizer { Ok(()) } - pub(super) fn pull(&mut self) -> Result, Error> { - Ok(self.pending.take().map(super::CodecItem::VideoFrame)) + pub(super) fn pull(&mut self) -> Option { + self.pending.take().map(super::CodecItem::VideoFrame) } /// Adds a piece to `self.pieces`, erroring if it becomes absurdly large. - fn add_piece(&mut self, piece: Bytes) -> Result { + fn add_piece(&mut self, piece: Bytes) -> Result { self.pieces.push(piece); - u32::try_from(self.pieces.len()).map_err(|_| format_err!("more than u32::MAX pieces!")) + u32::try_from(self.pieces.len()).map_err(|_| "more than u32::MAX pieces!".to_string()) } - fn finalize_access_unit(&mut self, au: AccessUnit) -> Result { + fn finalize_access_unit(&mut self, au: AccessUnit) -> Result { let mut piece_idx = 0; let mut retained_len = 0usize; let mut is_random_access_point = false; @@ -453,6 +417,7 @@ impl Depacketizer { let new_parameters = if new_sps.is_some() || new_pps.is_some() { let sps_nal = new_sps.as_deref().unwrap_or(&self.parameters.sps_nal); let pps_nal = new_pps.as_deref().unwrap_or(&self.parameters.pps_nal); + // TODO: could map this to a RtpPacketError more accurately. self.parameters = InternalParameters::parse_sps_and_pps(sps_nal, pps_nal)?; match self.parameters.generic_parameters { super::Parameters::Video(ref p) => Some(p.clone()), @@ -478,8 +443,8 @@ impl Depacketizer { impl AccessUnit { fn start(pkt: &crate::client::rtp::Packet, additional_loss: u16) -> Self { AccessUnit { - start_ctx: pkt.rtsp_ctx, - end_ctx: pkt.rtsp_ctx, + start_ctx: pkt.ctx, + end_ctx: pkt.ctx, timestamp: pkt.timestamp, stream_id: pkt.stream_id, in_fu_a: false, @@ -503,7 +468,7 @@ struct InternalParameters { impl InternalParameters { /// Parses metadata from the `format-specific-params` of a SDP `fmtp` media attribute. - fn parse_format_specific_params(format_specific_params: &str) -> Result { + fn parse_format_specific_params(format_specific_params: &str) -> Result { let mut sprop_parameter_sets = None; for p in format_specific_params.split(';') { let (key, value) = p.trim().split_once('=').unwrap(); @@ -511,38 +476,37 @@ impl InternalParameters { sprop_parameter_sets = Some(value); } } - let sprop_parameter_sets = sprop_parameter_sets.ok_or_else(|| { - format_err!("no sprop-parameter-sets in H.264 format-specific-params") - })?; + let sprop_parameter_sets = sprop_parameter_sets + .ok_or_else(|| "no sprop-parameter-sets in H.264 format-specific-params".to_string())?; let mut sps_nal = None; let mut pps_nal = None; for nal in sprop_parameter_sets.split(',') { let nal = - base64::decode(nal).map_err(|_| format_err!("NAL has invalid base64 encoding"))?; + base64::decode(nal).map_err(|_| "NAL has invalid base64 encoding".to_string())?; if nal.is_empty() { - bail!("empty NAL"); + return Err("empty NAL".into()); } let header = h264_reader::nal::NalHeader::new(nal[0]) - .map_err(|_| format_err!("bad NAL header {:0x}", nal[0]))?; + .map_err(|_| format!("bad NAL header {:0x}", nal[0]))?; match header.nal_unit_type() { UnitType::SeqParameterSet => { if sps_nal.is_some() { - bail!("multiple SPSs"); + return Err("multiple SPSs".into()); } sps_nal = Some(nal); } UnitType::PicParameterSet => { if pps_nal.is_some() { - bail!("multiple PPSs"); + return Err("multiple PPSs".into()); } pps_nal = Some(nal); } - _ => bail!("only SPS and PPS expected in parameter sets"), + _ => return Err("only SPS and PPS expected in parameter sets".into()), } } - let sps_nal = sps_nal.ok_or_else(|| format_err!("no sps"))?; - let pps_nal = pps_nal.ok_or_else(|| format_err!("no pps"))?; + let sps_nal = sps_nal.ok_or_else(|| "no sps".to_string())?; + let pps_nal = pps_nal.ok_or_else(|| "no pps".to_string())?; // GW security GW4089IP leaves Annex B start codes at the end of both // SPS and PPS in the sprop-parameter-sets. Leaving them in means @@ -557,22 +521,22 @@ impl InternalParameters { Self::parse_sps_and_pps(sps_nal, pps_nal) } - fn parse_sps_and_pps(sps_nal: &[u8], pps_nal: &[u8]) -> Result { + fn parse_sps_and_pps(sps_nal: &[u8], pps_nal: &[u8]) -> Result { let sps_rbsp = h264_reader::rbsp::decode_nal(&sps_nal[1..]); if sps_rbsp.len() < 4 { - bail!("bad sps"); + return Err("bad sps".into()); } let rfc6381_codec = format!( "avc1.{:02X}{:02X}{:02X}", sps_rbsp[0], sps_rbsp[1], sps_rbsp[2] ); let sps = h264_reader::nal::sps::SeqParameterSet::from_bytes(&sps_rbsp) - .map_err(|e| format_err!("Bad SPS: {:?}", e))?; + .map_err(|e| format!("Bad SPS: {:?}", e))?; debug!("sps: {:#?}", &sps); let pixel_dimensions = sps .pixel_dimensions() - .map_err(|e| format_err!("SPS has invalid pixel dimensions: {:?}", e))?; + .map_err(|e| format!("SPS has invalid pixel dimensions: {:?}", e))?; // Create the AVCDecoderConfiguration, ISO/IEC 14496-15 section 5.2.4.1. // The beginning of the AVCDecoderConfiguration takes a few values from @@ -591,12 +555,20 @@ impl InternalParameters { // ffmpeg's ff_isom_write_avcc has the same limitation, so it's probably // fine. This next byte is a reserved 0b111 + a 5-bit # of SPSs (1). avc_decoder_config.put_u8(0xe1); - avc_decoder_config.extend(&u16::try_from(sps_nal.len())?.to_be_bytes()[..]); + avc_decoder_config.extend( + &u16::try_from(sps_nal.len()) + .map_err(|_| format!("SPS NAL is {} bytes long; must fit in u16", sps_nal.len()))? + .to_be_bytes()[..], + ); let sps_nal_start = avc_decoder_config.len(); avc_decoder_config.extend_from_slice(sps_nal); let sps_nal_end = avc_decoder_config.len(); avc_decoder_config.put_u8(1); // # of PPSs. - avc_decoder_config.extend(&u16::try_from(pps_nal.len())?.to_be_bytes()[..]); + avc_decoder_config.extend( + &u16::try_from(pps_nal.len()) + .map_err(|_| format!("PPS NAL is {} bytes long; must fit in u16", pps_nal.len()))? + .to_be_bytes()[..], + ); let pps_nal_start = avc_decoder_config.len(); avc_decoder_config.extend_from_slice(pps_nal); let pps_nal_end = avc_decoder_config.len(); @@ -651,7 +623,7 @@ fn matches(nal: &[u8], hdr: NalHeader, pieces: &[Bytes]) -> bool { if nal.len() < new_pos { return false; } - if &piece[..] != &nal[nal_pos..new_pos] { + if piece[..] != nal[nal_pos..new_pos] { return false; } nal_pos = new_pos; @@ -688,10 +660,10 @@ impl Packetizer { max_payload_size: u16, stream_id: usize, initial_sequence_number: u16, - ) -> Result { + ) -> Result { if max_payload_size < 3 { // minimum size to make progress with FU-A packets. - bail!("max_payload_size must be > 3"); + return Err("max_payload_size must be > 3".into()); } Ok(Self { max_payload_size, @@ -707,39 +679,43 @@ impl Packetizer { Ok(()) } - pub fn pull(&mut self) -> Result, Error> { + // TODO: better error type? + pub fn pull(&mut self) -> Result, String> { let max_payload_size = usize::from(self.max_payload_size); match std::mem::replace(&mut self.state, PacketizerState::Idle) { - PacketizerState::Idle => return Ok(None), + PacketizerState::Idle => Ok(None), PacketizerState::HaveData { timestamp, mut data, } => { if data.len() < 5 { - bail!( + return Err(format!( "have only {} bytes; expected 4-byte length + non-empty NAL", data.len() - ); + )); } let len = data.get_u32(); let usize_len = usize::try_from(len).expect("u32 fits in usize"); if data.len() < usize_len || len == 0 { - bail!("bad length of {} bytes; expected [1, {}]", len, data.len()); + return Err(format!( + "bad length of {} bytes; expected [1, {}]", + len, + data.len() + )); } let sequence_number = self.next_sequence_number; self.next_sequence_number = self.next_sequence_number.wrapping_add(1); - let hdr = - NalHeader::new(data[0]).map_err(|_| format_err!("F bit in NAL header"))?; + let hdr = NalHeader::new(data[0]).map_err(|_| "F bit in NAL header".to_owned())?; if matches!(hdr.nal_unit_type(), UnitType::Unspecified(_)) { // This can clash with fragmentation/aggregation NAL types. - bail!("bad NAL header {:?}", hdr); + return Err(format!("bad NAL header {:?}", hdr)); } if usize_len > max_payload_size { // start a FU-A. data.advance(1); let mut payload = Vec::with_capacity(max_payload_size); let fu_indicator = (hdr.nal_ref_idc() << 5) | 28; - let fu_header = 0b100_00000 | hdr.nal_unit_type().id(); // START bit set. + let fu_header = 0b1000_0000 | hdr.nal_unit_type().id(); // START bit set. payload.extend_from_slice(&[fu_indicator, fu_header]); payload.extend_from_slice(&data[..max_payload_size - 2]); data.advance(max_payload_size - 2); @@ -749,10 +725,13 @@ impl Packetizer { left: len + 1 - u32::from(self.max_payload_size), data, }; + // TODO: ctx, channel_id, and ssrc are placeholders. return Ok(Some(Packet { - rtsp_ctx: crate::Context::dummy(), + ctx: crate::RtspMessageContext::dummy(), + channel_id: 0, stream_id: self.stream_id, timestamp, + ssrc: 0, sequence_number, loss: 0, mark: false, @@ -772,9 +751,11 @@ impl Packetizer { mark = false; } Ok(Some(Packet { - rtsp_ctx: crate::Context::dummy(), + ctx: crate::RtspMessageContext::dummy(), + channel_id: 0, stream_id: self.stream_id, timestamp, + ssrc: 0, sequence_number, loss: 0, mark, @@ -809,7 +790,7 @@ impl Packetizer { let usize_left = usize::try_from(left).expect("u32 fits in usize"); payload = Vec::with_capacity(usize_left + 2); let fu_indicator = (hdr.nal_ref_idc() << 5) | 28; - let fu_header = 0b010_00000 | hdr.nal_unit_type().id(); // END bit set. + let fu_header = 0b0100_0000 | hdr.nal_unit_type().id(); // END bit set. payload.extend_from_slice(&[fu_indicator, fu_header]); payload.extend_from_slice(&data[..usize_left]); if data.len() == usize_left { @@ -821,10 +802,13 @@ impl Packetizer { self.state = PacketizerState::HaveData { timestamp, data }; } } + // TODO: placeholders. Ok(Some(Packet { - rtsp_ctx: crate::Context::dummy(), + ctx: crate::RtspMessageContext::dummy(), + channel_id: 0, stream_id: self.stream_id, timestamp, + ssrc: 0, sequence_number, loss: 0, mark, @@ -927,57 +911,67 @@ mod tests { }; d.push(Packet { // plain SEI packet. - rtsp_ctx: crate::Context::dummy(), + ctx: crate::RtspMessageContext::dummy(), + channel_id: 0, stream_id: 0, timestamp, + ssrc: 0, sequence_number: 0, loss: 0, mark: false, payload: Bytes::from_static(b"\x06plain"), }) .unwrap(); - assert!(d.pull().unwrap().is_none()); + assert!(d.pull().is_none()); d.push(Packet { // STAP-A packet. - rtsp_ctx: crate::Context::dummy(), + ctx: crate::RtspMessageContext::dummy(), + channel_id: 0, stream_id: 0, timestamp, + ssrc: 0, sequence_number: 1, loss: 0, mark: false, payload: Bytes::from_static(b"\x18\x00\x09\x06stap-a 1\x00\x09\x06stap-a 2"), }) .unwrap(); - assert!(d.pull().unwrap().is_none()); + assert!(d.pull().is_none()); d.push(Packet { // FU-A packet, start. - rtsp_ctx: crate::Context::dummy(), + ctx: crate::RtspMessageContext::dummy(), + channel_id: 0, stream_id: 0, timestamp, + ssrc: 0, sequence_number: 2, loss: 0, mark: false, payload: Bytes::from_static(b"\x7c\x86fu-a start, "), }) .unwrap(); - assert!(d.pull().unwrap().is_none()); + assert!(d.pull().is_none()); d.push(Packet { // FU-A packet, middle. - rtsp_ctx: crate::Context::dummy(), + ctx: crate::RtspMessageContext::dummy(), + channel_id: 0, stream_id: 0, timestamp, + ssrc: 0, sequence_number: 3, loss: 0, mark: false, payload: Bytes::from_static(b"\x7c\x06fu-a middle, "), }) .unwrap(); - assert!(d.pull().unwrap().is_none()); + assert!(d.pull().is_none()); d.push(Packet { // FU-A packet, end. - rtsp_ctx: crate::Context::dummy(), + ctx: crate::RtspMessageContext::dummy(), + channel_id: 0, stream_id: 0, timestamp, + ssrc: 0, sequence_number: 4, loss: 0, mark: true, @@ -985,7 +979,7 @@ mod tests { }) .unwrap(); let frame = match d.pull() { - Ok(Some(CodecItem::VideoFrame(frame))) => frame, + Some(CodecItem::VideoFrame(frame)) => frame, _ => panic!(), }; assert_eq!( @@ -1012,20 +1006,24 @@ mod tests { start: 0, }; d.push(Packet { // new SPS. - rtsp_ctx: crate::Context::dummy(), + ctx: crate::RtspMessageContext::dummy(), + channel_id: 0, stream_id: 0, timestamp, + ssrc: 0, sequence_number: 0, loss: 0, mark: false, payload: Bytes::from_static(b"\x67\x4d\x40\x1e\x9a\x64\x05\x01\xef\xf3\x50\x10\x10\x14\x00\x00\x0f\xa0\x00\x01\x38\x80\x10"), }).unwrap(); - assert!(d.pull().unwrap().is_none()); + assert!(d.pull().is_none()); d.push(Packet { // same PPS again. - rtsp_ctx: crate::Context::dummy(), + ctx: crate::RtspMessageContext::dummy(), + channel_id: 0, stream_id: 0, timestamp, + ssrc: 0, sequence_number: 1, loss: 0, mark: true, @@ -1033,7 +1031,7 @@ mod tests { }) .unwrap(); let frame = match d.pull() { - Ok(Some(CodecItem::VideoFrame(frame))) => frame, + Some(CodecItem::VideoFrame(frame)) => frame, _ => panic!(), }; assert!(frame.new_parameters.is_some()); diff --git a/src/codec/mod.rs b/src/codec/mod.rs index dd38053..ee70c8b 100644 --- a/src/codec/mod.rs +++ b/src/codec/mod.rs @@ -10,8 +10,10 @@ use std::num::{NonZeroU16, NonZeroU32}; use crate::client::rtp; +use crate::error::ErrorInt; +use crate::ConnectionContext; +use crate::Error; use bytes::{Buf, Bytes}; -use failure::{bail, Error}; use pretty_hex::PrettyHex; pub(crate) mod aac; @@ -155,13 +157,15 @@ impl AudioParameters { /// Not all codecs can be placed into a `.mp4` file, and even for supported codecs there /// may be unsupported edge cases. pub fn sample_entry(&self) -> Result { - aac::get_mp4a_box(self) + // TODO: InvalidArgument doesn't seem quite right. We probably should + // produce the mp4a eagerly anyway. + aac::get_mp4a_box(self).map_err(|description| wrap!(ErrorInt::InvalidArgument(description))) } } /// An audio frame, which consists of one or more samples. pub struct AudioFrame { - pub ctx: crate::Context, + pub ctx: crate::RtspMessageContext, pub stream_id: usize, pub timestamp: crate::Timestamp, pub frame_length: NonZeroU32, @@ -206,7 +210,7 @@ impl Buf for AudioFrame { pub struct MessageParameters(onvif::CompressionType); pub struct MessageFrame { - pub ctx: crate::Context, + pub ctx: crate::RtspMessageContext, pub timestamp: crate::Timestamp, pub stream_id: usize, @@ -245,8 +249,8 @@ pub struct VideoFrame { // A pair of contexts: for the start and for the end. // Having both can be useful to measure the total time elapsed while receiving the frame. - start_ctx: crate::Context, - end_ctx: crate::Context, + start_ctx: crate::RtspMessageContext, + end_ctx: crate::RtspMessageContext, /// This picture's timestamp in the time base associated with the stream. pub timestamp: crate::Timestamp, @@ -268,12 +272,12 @@ pub struct VideoFrame { impl VideoFrame { #[inline] - pub fn start_ctx(&self) -> crate::Context { + pub fn start_ctx(&self) -> crate::RtspMessageContext { self.start_ctx } #[inline] - pub fn end_ctx(&self) -> crate::Context { + pub fn end_ctx(&self) -> crate::RtspMessageContext { self.end_ctx } @@ -327,7 +331,7 @@ impl Depacketizer { clock_rate: u32, channels: Option, format_specific_params: Option<&str>, - ) -> Result { + ) -> Result { use onvif::CompressionType; // RTP Payload Format Media Types @@ -382,11 +386,10 @@ impl Depacketizer { media, encoding_name ); - bail!( + return Err(format!( "no depacketizer for media/encoding_name {}/{}", - media, - encoding_name - ); + media, encoding_name + )); } })) } @@ -401,7 +404,7 @@ impl Depacketizer { } } - pub fn push(&mut self, input: rtp::Packet) -> Result<(), Error> { + pub fn push(&mut self, input: rtp::Packet) -> Result<(), String> { match &mut self.0 { DepacketizerInner::Aac(d) => d.push(input), DepacketizerInner::G723(d) => d.push(input), @@ -411,13 +414,13 @@ impl Depacketizer { } } - pub fn pull(&mut self) -> Result, Error> { + pub fn pull(&mut self, conn_ctx: &ConnectionContext) -> Result, Error> { match &mut self.0 { - DepacketizerInner::Aac(d) => d.pull(), - DepacketizerInner::G723(d) => d.pull(), - DepacketizerInner::H264(d) => d.pull(), - DepacketizerInner::Onvif(d) => d.pull(), - DepacketizerInner::SimpleAudio(d) => d.pull(), + DepacketizerInner::Aac(d) => d.pull(conn_ctx), + DepacketizerInner::G723(d) => Ok(d.pull()), + DepacketizerInner::H264(d) => Ok(d.pull()), + DepacketizerInner::Onvif(d) => Ok(d.pull()), + DepacketizerInner::SimpleAudio(d) => Ok(d.pull()), } } } diff --git a/src/codec/onvif.rs b/src/codec/onvif.rs index 5ea8a81..17209ac 100644 --- a/src/codec/onvif.rs +++ b/src/codec/onvif.rs @@ -9,7 +9,6 @@ //! bit set end messages. use bytes::{Buf, BufMut, BytesMut}; -use failure::{bail, Error}; use super::CodecItem; @@ -37,7 +36,7 @@ enum State { #[derive(Debug)] struct InProgress { - ctx: crate::Context, + ctx: crate::RtspMessageContext, timestamp: crate::Timestamp, data: BytesMut, loss: u16, @@ -56,7 +55,7 @@ impl Depacketizer { Some(&self.parameters) } - pub(super) fn push(&mut self, pkt: crate::client::rtp::Packet) -> Result<(), failure::Error> { + pub(super) fn push(&mut self, pkt: crate::client::rtp::Packet) -> Result<(), String> { if pkt.loss > 0 { if let State::InProgress(in_progress) = &self.state { log::debug!( @@ -70,12 +69,10 @@ impl Depacketizer { let mut in_progress = match std::mem::replace(&mut self.state, State::Idle) { State::InProgress(in_progress) => { if in_progress.timestamp.timestamp != pkt.timestamp.timestamp { - bail!( - "Timestamp changed from {} to {} (@ seq {:04x}) with message in progress", - &in_progress.timestamp, - &pkt.timestamp, - pkt.sequence_number - ); + return Err(format!( + "Timestamp changed from {} to {} with message in progress", + &in_progress.timestamp, &pkt.timestamp, + )); } in_progress } @@ -86,7 +83,7 @@ impl Depacketizer { self.state = State::Ready(super::MessageFrame { stream_id: pkt.stream_id, loss: pkt.loss, - ctx: pkt.rtsp_ctx, + ctx: pkt.ctx, timestamp: pkt.timestamp, data: pkt.payload, }); @@ -94,7 +91,7 @@ impl Depacketizer { } InProgress { loss: pkt.loss, - ctx: pkt.rtsp_ctx, + ctx: pkt.ctx, timestamp: pkt.timestamp, data: BytesMut::with_capacity(self.high_water_size), } @@ -117,13 +114,13 @@ impl Depacketizer { Ok(()) } - pub(super) fn pull(&mut self) -> Result, Error> { - Ok(match std::mem::replace(&mut self.state, State::Idle) { + pub(super) fn pull(&mut self) -> Option { + match std::mem::replace(&mut self.state, State::Idle) { State::Ready(message) => Some(CodecItem::MessageFrame(message)), s => { self.state = s; None } - }) + } } } diff --git a/src/codec/simple_audio.rs b/src/codec/simple_audio.rs index eede742..1042123 100644 --- a/src/codec/simple_audio.rs +++ b/src/codec/simple_audio.rs @@ -7,8 +7,6 @@ use std::num::NonZeroU32; use bytes::Bytes; -use failure::format_err; -use failure::Error; use super::CodecItem; @@ -50,10 +48,10 @@ impl Depacketizer { } } - pub(super) fn push(&mut self, pkt: crate::client::rtp::Packet) -> Result<(), Error> { + pub(super) fn push(&mut self, pkt: crate::client::rtp::Packet) -> Result<(), String> { assert!(self.pending.is_none()); let frame_length = self.frame_length(pkt.payload.len()).ok_or_else(|| { - format_err!( + format!( "invalid length {} for payload of {}-bit audio samples", pkt.payload.len(), self.bits_per_sample @@ -61,7 +59,7 @@ impl Depacketizer { })?; self.pending = Some(super::AudioFrame { loss: pkt.loss, - ctx: pkt.rtsp_ctx, + ctx: pkt.ctx, stream_id: pkt.stream_id, timestamp: pkt.timestamp, frame_length, @@ -70,7 +68,7 @@ impl Depacketizer { Ok(()) } - pub(super) fn pull(&mut self) -> Result, Error> { - Ok(self.pending.take().map(CodecItem::AudioFrame)) + pub(super) fn pull(&mut self) -> Option { + self.pending.take().map(CodecItem::AudioFrame) } } diff --git a/src/lib.rs b/src/lib.rs index c7a1201..1915a1e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,17 +1,36 @@ // Copyright (C) 2021 Scott Lamb // SPDX-License-Identifier: MIT OR Apache-2.0 -use bytes::{Buf, BufMut, Bytes, BytesMut}; -use failure::{bail, format_err, Error}; +use bytes::Bytes; +//use failure::{bail, format_err, Error}; use once_cell::sync::Lazy; -use pretty_hex::PrettyHex; -use rtsp_types::{Data, Message}; -use std::convert::TryFrom; +use rtsp_types::Message; use std::fmt::{Debug, Display}; use std::num::NonZeroU32; +mod error; + +pub use error::Error; + +/// Wraps the supplied `ErrorInt` and returns it as an `Err`. +macro_rules! bail { + ($e:expr) => { + return Err(crate::error::Error(Box::new($e))) + }; +} + +macro_rules! wrap { + ($e:expr) => { + crate::error::Error(Box::new($e)) + }; +} + pub mod client; pub mod codec; +//mod error; +mod tokio; + +use error::ErrorInt; pub static X_ACCEPT_DYNAMIC_RATE: Lazy = Lazy::new(|| { rtsp_types::HeaderName::from_static_str("x-Accept-Dynamic-Rate").expect("is ascii") @@ -19,10 +38,11 @@ pub static X_ACCEPT_DYNAMIC_RATE: Lazy = Lazy::new(|| { pub static X_DYNAMIC_RATE: Lazy = Lazy::new(|| rtsp_types::HeaderName::from_static_str("x-Dynamic-Rate").expect("is ascii")); +/// A received RTSP message. #[derive(Debug)] -pub struct ReceivedMessage { - pub ctx: Context, - pub msg: Message, +struct ReceivedMessage { + ctx: RtspMessageContext, + msg: Message, } /// A monotonically increasing timestamp within an RTP stream. @@ -47,16 +67,14 @@ pub struct Timestamp { } impl Timestamp { - /// Creates a new timestamp, ensuring `timestamp - start` doesn't underflow. - pub fn new(timestamp: i64, clock_rate: NonZeroU32, start: u32) -> Result { - match timestamp.checked_sub(i64::from(start)) { - None => bail!("timestamp - start must not underflow"), - Some(_) => Ok(Timestamp { - timestamp, - clock_rate, - start, - }), - } + /// Creates a new timestamp unless `timestamp - start` underflows. + #[inline] + pub fn new(timestamp: i64, clock_rate: NonZeroU32, start: u32) -> Option { + timestamp.checked_sub(i64::from(start)).map(|_| Timestamp { + timestamp, + clock_rate, + start, + }) } /// Returns time since some arbitrary point before the stream started. @@ -90,17 +108,17 @@ impl Timestamp { (self.elapsed() as f64) / (self.clock_rate.get() as f64) } - pub fn try_add(&self, delta: u32) -> Result { + /// Returns `self + delta` unless it would overflow. + pub fn try_add(&self, delta: u32) -> Option { // Check for `timestamp` overflow only. We don't need to check for // `timestamp - start` underflow because delta is non-negative. - Ok(Timestamp { - timestamp: self - .timestamp - .checked_add(i64::from(delta)) - .ok_or_else(|| format_err!("overflow on {:?} + {}", &self, delta))?, - clock_rate: self.clock_rate, - start: self.start, - }) + self.timestamp + .checked_add(i64::from(delta)) + .map(|timestamp| Timestamp { + timestamp, + clock_rate: self.clock_rate, + start: self.start, + }) } } @@ -159,78 +177,106 @@ impl std::fmt::Debug for NtpTimestamp { } } -/// Context of a received message within an RTSP stream. -/// This is meant to help find the correct TCP stream and packet in a matching -/// packet capture. -#[derive(Copy, Clone)] -pub struct Context { - conn_local_addr: std::net::SocketAddr, - conn_peer_addr: std::net::SocketAddr, - conn_established_wall: time::Timespec, - conn_established: std::time::Instant, +/// A wall time taken from the local machine's realtime clock, used in error reporting. +/// +/// Currently this just allows formatting via `Debug` and `Display`. +#[derive(Copy, Clone, Debug)] +pub struct WallTime(time::Timespec); - /// The byte position within the input stream. The bottom 32 bits can be - /// compared to the TCP sequence number. - msg_pos: u64, - - /// Time when the application parsed the message. Caveat: this may not - /// closely match the time on a packet capture if the application is - /// overloaded (or `CLOCK_REALTIME` jumps). - msg_received_wall: time::Timespec, - msg_received: std::time::Instant, +impl WallTime { + fn now() -> Self { + Self(time::get_time()) + } } -impl Context { +impl Display for WallTime { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + std::fmt::Display::fmt( + &time::at(self.0) + .strftime("%FT%T") + .map_err(|_| std::fmt::Error)?, + f, + ) + } +} + +/// RTSP connection context. +/// +/// This gives enough information to pick out the flow in a packet capture. +#[derive(Copy, Clone, Debug)] +pub struct ConnectionContext { + local_addr: std::net::SocketAddr, + peer_addr: std::net::SocketAddr, + established_wall: WallTime, + established: std::time::Instant, +} + +impl ConnectionContext { #[doc(hidden)] pub fn dummy() -> Self { use std::net::{IpAddr, Ipv4Addr, SocketAddr}; let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0); Self { - conn_local_addr: addr, - conn_peer_addr: addr, - conn_established_wall: time::get_time(), - conn_established: std::time::Instant::now(), - msg_pos: 0, - msg_received_wall: time::get_time(), - msg_received: std::time::Instant::now(), + local_addr: addr, + peer_addr: addr, + established_wall: WallTime::now(), + established: std::time::Instant::now(), } } - - pub fn conn_established(&self) -> std::time::Instant { - self.conn_established - } - - pub fn msg_received(&self) -> std::time::Instant { - self.msg_received - } - - pub fn msg_pos(&self) -> u64 { - self.msg_pos - } } -impl Debug for Context { +impl Display for ConnectionContext { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { // TODO: this current hardcodes the assumption we are the client. // Change if/when adding server code. write!( f, - "[{}(me)->{}@{} pos={}@{}]", - &self.conn_local_addr, - &self.conn_peer_addr, - time::at(self.conn_established_wall) - .strftime("%FT%T") - .map_err(|_| std::fmt::Error)?, - self.msg_pos, - time::at(self.msg_received_wall) - .strftime("%FT%T") - .map_err(|_| std::fmt::Error)? + "{}(me)->{}@{}", + &self.local_addr, &self.peer_addr, &self.established_wall, ) } } -struct Codec { - ctx: Context, +/// Context of a received message (or read error) within an RTSP connection. +/// +/// When paired with a [`ConnectionContext`], this should allow picking the +/// message out of a packet capture. +#[derive(Copy, Clone, Debug)] +pub struct RtspMessageContext { + /// The starting byte position within the input stream. The bottom 32 bits + /// can be compared to the relative TCP sequence number. + pos: u64, + + /// Time when the application parsed the message. Caveat: this may not + /// closely match the time on a packet capture if the application is + /// overloaded (or if `CLOCK_REALTIME` jumps). + received_wall: WallTime, + received: std::time::Instant, +} + +impl RtspMessageContext { + #[doc(hidden)] + pub fn dummy() -> Self { + Self { + pos: 0, + received_wall: WallTime::now(), + received: std::time::Instant::now(), + } + } + + pub fn received(&self) -> std::time::Instant { + self.received + } + + pub fn pos(&self) -> u64 { + self.pos + } +} + +impl Display for RtspMessageContext { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}@{}", self.pos, &self.received_wall) + } } /// Returns the range within `buf` that represents `subset`. @@ -253,113 +299,3 @@ pub(crate) fn as_range(buf: &[u8], subset: &[u8]) -> Option Result)>, Error> { - if !src.is_empty() && src[0] == b'$' { - // Fast path for interleaved data, avoiding MessageRef -> Message<&[u8]> -> - // Message conversion. This speeds things up quite a bit in practice, - // avoiding a bunch of memmove calls. - if src.len() < 4 { - return Ok(None); - } - let channel_id = src[1]; - let len = 4 + usize::from(u16::from_be_bytes([src[2], src[3]])); - if src.len() < len { - src.reserve(len - src.len()); - return Ok(None); - } - let mut msg = src.split_to(len); - msg.advance(4); - return Ok(Some(( - len, - Message::Data(Data::new(channel_id, msg.freeze())), - ))); - } - - let (msg, len): (Message<&[u8]>, _) = match rtsp_types::Message::parse(src) { - Ok((m, l)) => (m, l), - Err(rtsp_types::ParseError::Error) => { - let snippet = &src[0..std::cmp::min(128, src.len())]; - bail!( - "RTSP parse error at {:#?}: next bytes are {:#?}", - &self.ctx, - snippet.hex_dump() - ) - } - Err(rtsp_types::ParseError::Incomplete) => return Ok(None), - }; - - // Map msg's body to a Bytes representation and advance `src`. Awkward: - // 1. lifetime concerns require mapping twice: first so the message - // doesn't depend on the BytesMut, which needs to be split/advanced; - // then to get the proper Bytes body in place post-split. - // 2. rtsp_types messages must be AsRef<[u8]>, so we can't use the - // range as an intermediate body. - // 3. within a match because the rtsp_types::Message enum itself - // doesn't have body/replace_body/map_body methods. - let msg = match msg { - Message::Request(msg) => { - let body_range = as_range(src, msg.body()); - let msg = msg.replace_body(rtsp_types::Empty); - if let Some(r) = body_range { - let mut raw_msg = src.split_to(len); - raw_msg.advance(r.start); - raw_msg.truncate(r.len()); - Message::Request(msg.replace_body(raw_msg.freeze())) - } else { - src.advance(len); - Message::Request(msg.replace_body(Bytes::new())) - } - } - Message::Response(msg) => { - let body_range = as_range(src, msg.body()); - let msg = msg.replace_body(rtsp_types::Empty); - if let Some(r) = body_range { - let mut raw_msg = src.split_to(len); - raw_msg.advance(r.start); - raw_msg.truncate(r.len()); - Message::Response(msg.replace_body(raw_msg.freeze())) - } else { - src.advance(len); - Message::Response(msg.replace_body(Bytes::new())) - } - } - Message::Data(_) => unreachable!(), - }; - Ok(Some((len, msg))) - } -} - -impl tokio_util::codec::Decoder for Codec { - type Item = ReceivedMessage; - type Error = failure::Error; - - fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { - let (len, msg) = match self.parse_msg(src) { - Err(e) => return Err(e), - Ok(None) => return Ok(None), - Ok(Some((len, msg))) => (len, msg), - }; - self.ctx.msg_received_wall = time::get_time(); - self.ctx.msg_received = std::time::Instant::now(); - let msg = ReceivedMessage { ctx: self.ctx, msg }; - self.ctx.msg_pos += u64::try_from(len).expect("usize fits in u64"); - Ok(Some(msg)) - } -} - -impl tokio_util::codec::Encoder> for Codec { - type Error = failure::Error; - - fn encode( - &mut self, - item: rtsp_types::Message, - dst: &mut BytesMut, - ) -> Result<(), Self::Error> { - let mut w = std::mem::replace(dst, BytesMut::new()).writer(); - item.write(&mut w).expect("bytes Writer is infallible"); - *dst = w.into_inner(); - Ok(()) - } -}