overhaul error type

*   stop using deprecated failure crate and its deps

*   more uniformly detailed error messages

*   an enum of the type of error, currently internal-only. I'm not
    confident enough I understand what cases a caller might want to
    distinguish, so I added a comment inviting input instead of
    exposing it now.

*   while I'm at it, separate connection context and message contexts.
    This shrinks some of the data memmoved around in PacketItem and
    CodecItem.
This commit is contained in:
Scott Lamb 2021-07-08 10:52:39 -07:00
parent 265b6b9490
commit e9a5e4e34a
20 changed files with 1157 additions and 1014 deletions

View File

@ -1,5 +1,8 @@
## unreleased
* New opaque error type with more uniform, richer error messages. No more
`failure` dependency.
## v0.0.4 (2021-06-28)
* bugfix: Retina stopped receiving packets after receiving a keepalive response.

112
Cargo.lock generated
View File

@ -2,21 +2,6 @@
# It is not intended for manual editing.
version = 3
[[package]]
name = "addr2line"
version = "0.15.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "03345e98af8f3d786b6d9f656ccfa6ac316d954e92bc4841f0bba20789d5fb5a"
dependencies = [
"gimli",
]
[[package]]
name = "adler"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
[[package]]
name = "ansi_term"
version = "0.11.0"
@ -26,6 +11,12 @@ dependencies = [
"winapi",
]
[[package]]
name = "anyhow"
version = "1.0.41"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "15af2628f6890fe2609a3b91bef4c83450512802e59489f9c1cb1fa5df064a61"
[[package]]
name = "arrayvec"
version = "0.5.2"
@ -49,21 +40,6 @@ version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a"
[[package]]
name = "backtrace"
version = "0.3.60"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b7815ea54e4d821e791162e078acbebfd6d8c8939cd559c9335dceb1c8ca7282"
dependencies = [
"addr2line",
"cc",
"cfg-if",
"libc",
"miniz_oxide",
"object",
"rustc-demangle",
]
[[package]]
name = "base64"
version = "0.13.0"
@ -139,12 +115,6 @@ dependencies = [
"rustc_version",
]
[[package]]
name = "cc"
version = "1.0.68"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4a72c244c1ff497a746a7e1fb3d14bd08420ecda70c8f25c7112f2781652d787"
[[package]]
name = "cfg-if"
version = "1.0.0"
@ -326,28 +296,6 @@ version = "1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457"
[[package]]
name = "failure"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d32e9bd16cc02eae7db7ef620b392808b89f6a5e16bb3497d159c6b92a0f4f86"
dependencies = [
"backtrace",
"failure_derive",
]
[[package]]
name = "failure_derive"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aa4da3c766cd7a0db8242e326e9e4e081edd567072893ed320008189715366a4"
dependencies = [
"proc-macro2",
"quote",
"syn",
"synstructure",
]
[[package]]
name = "form_urlencoded"
version = "1.0.1"
@ -485,12 +433,6 @@ dependencies = [
"wasi",
]
[[package]]
name = "gimli"
version = "0.24.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0e4075386626662786ddb0ec9081e7c7eeb1ba31951f447ca780ef9f5d568189"
[[package]]
name = "h264-reader"
version = "0.5.0"
@ -661,16 +603,6 @@ dependencies = [
"autocfg",
]
[[package]]
name = "miniz_oxide"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a92518e98c078586bc6c934028adcca4c92a53d6a958196de835170a01d84e4b"
dependencies = [
"adler",
"autocfg",
]
[[package]]
name = "mio"
version = "0.7.11"
@ -770,15 +702,6 @@ dependencies = [
"libc",
]
[[package]]
name = "object"
version = "0.25.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9023c1c0973b327f073c7f2fceb9bcc049862f93a7d14c6feb46c8a56460a0d5"
dependencies = [
"memchr",
]
[[package]]
name = "once_cell"
version = "1.7.2"
@ -1066,14 +989,14 @@ checksum = "f497285884f3fcff424ffc933e56d7cbca511def0c9831a7f9b5f6153e3cc89b"
[[package]]
name = "retina"
version = "0.0.4"
version = "0.0.5-dev"
dependencies = [
"anyhow",
"base64",
"bitreader",
"bytes",
"criterion",
"digest_auth",
"failure",
"futures",
"h264-reader",
"hex",
@ -1088,6 +1011,7 @@ dependencies = [
"sdp",
"smallvec",
"structopt",
"thiserror",
"time",
"tokio",
"tokio-util",
@ -1133,12 +1057,6 @@ dependencies = [
"url",
]
[[package]]
name = "rustc-demangle"
version = "0.1.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "410f7acf3cb3a44527c5d9546bad4bf4e6c460915d5f9f2fc524498bfe8f70ce"
[[package]]
name = "rustc_version"
version = "0.3.3"
@ -1317,18 +1235,6 @@ dependencies = [
"unicode-xid",
]
[[package]]
name = "synstructure"
version = "0.12.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b834f2d66f734cb897113e34aaff2f1ab4719ca946f9a7358dba8f8064148701"
dependencies = [
"proc-macro2",
"quote",
"syn",
"unicode-xid",
]
[[package]]
name = "tap"
version = "1.0.1"

View File

@ -1,6 +1,6 @@
[package]
name = "retina"
version = "0.0.4"
version = "0.0.5-dev"
authors = ["Scott Lamb <slamb@slamb.org>"]
license = "MIT/Apache-2.0"
edition = "2018"
@ -15,7 +15,6 @@ base64 = "0.13.0"
bitreader = "0.3.3"
bytes = "1.0.1"
digest_auth = "0.3.0"
failure = "0.1.8"
futures = "0.3.14"
hex = "0.4.3"
h264-reader = "0.5.0"
@ -28,12 +27,14 @@ rtp-rs = "0.6.0"
rtsp-types = "0.0.2"
sdp = "0.1.4"
smallvec = { version = "1.6.1", features = ["union"] }
thiserror = "1.0.25"
time = "0.1.43"
tokio = { version = "1.5.0", features = ["macros", "net", "time"] }
tokio-util = { version = "0.6.6", features = ["codec"] }
url = "2.2.1"
[dev-dependencies]
anyhow = "1.0.41"
criterion = { version = "0.3.4", features = ["async_tokio"] }
mylog = { git = "https://github.com/scottlamb/mylog" }
structopt = "0.3.21"

View File

@ -44,8 +44,9 @@ Progress:
* [x] application: ONVIF metadata
* [ ] uniform, documented API. (Currently haphazard in terms of naming, what
fields are exposed directly vs use an accessors, etc.)
* [ ] rich errors. (Currently uses untyped errors with the deprecated
`failure` crate; some error messages are quite detailed, others aren't.)
* quality errors
* * [x] detailed error description text.
* * [ ] programmatically inspectable error type.
* [ ] good functional testing coverage. (Currently lightly / unevenly tested.)
Most depacketizers have no tests.)
* [ ] fuzz testing. (In progress.)

View File

@ -31,7 +31,8 @@ fn h264_aac<F: FnMut(CodecItem) -> ()>(mut f: F) {
Depacketizer::new("audio", "mpeg4-generic", 12_000, NonZeroU16::new(2), Some("profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3;config=1490")).unwrap(),
Depacketizer::new("video", "h264", 90_000, None, Some("packetization-mode=1;profile-level-id=42C01E;sprop-parameter-sets=Z0LAHtkDxWhAAAADAEAAAAwDxYuS,aMuMsg==")).unwrap(),
];
let ctx = retina::Context::dummy();
let conn_ctx = retina::ConnectionContext::dummy();
let msg_ctx = retina::RtspMessageContext::dummy();
while !remaining.is_empty() {
assert!(remaining.len() > 4);
assert_eq!(remaining[0], b'$');
@ -46,12 +47,19 @@ fn h264_aac<F: FnMut(CodecItem) -> ()>(mut f: F) {
1 | 3 => continue, // RTCP
_ => unreachable!(),
};
let pkt = match rtps[stream_id].rtp(ctx, &mut timelines[stream_id], stream_id, data) {
let pkt = match rtps[stream_id].rtp(
&conn_ctx,
&msg_ctx,
&mut timelines[stream_id],
channel_id,
stream_id,
data,
) {
Ok(retina::client::PacketItem::RtpPacket(rtp)) => rtp,
_ => unreachable!(),
};
depacketizers[stream_id].push(pkt).unwrap();
while let Some(pkt) = depacketizers[stream_id].pull().unwrap() {
while let Some(pkt) = depacketizers[stream_id].pull(&conn_ctx).unwrap() {
f(pkt);
}
}

View File

@ -6,9 +6,9 @@
mod metadata;
mod mp4;
use failure::Error;
use anyhow::Error;
use log::{error, info};
use std::{fmt::Write, str::FromStr};
use std::str::FromStr;
use structopt::StructOpt;
#[derive(StructOpt)]
@ -29,24 +29,6 @@ enum Cmd {
Metadata(metadata::Opts),
}
/// Returns a pretty-and-informative version of `e`.
pub fn prettify_failure(e: &failure::Error) -> String {
let mut msg = e.to_string();
for cause in e.iter_causes() {
write!(&mut msg, "\ncaused by: {}", cause).unwrap();
}
if e.backtrace().is_empty() {
write!(
&mut msg,
"\n\n(set environment variable RUST_BACKTRACE=1 to see backtraces)"
)
.unwrap();
} else {
write!(&mut msg, "\n\nBacktrace:\n{}", e.backtrace()).unwrap();
}
msg
}
fn init_logging() -> mylog::Handle {
let h = mylog::Builder::new()
.set_format(
@ -68,7 +50,7 @@ async fn main() {
let _a = h.async_scope();
main_inner().await
} {
error!("Fatal: {}", prettify_failure(&e));
error!("Fatal: {}", e);
std::process::exit(1);
}
info!("Done");

View File

@ -1,7 +1,7 @@
// Copyright (C) 2021 Scott Lamb <slamb@slamb.org>
// SPDX-License-Identifier: MIT OR Apache-2.0
use failure::{format_err, Error};
use anyhow::{anyhow, Error};
use futures::StreamExt;
use log::info;
use retina::codec::CodecItem;
@ -21,7 +21,7 @@ pub async fn run(opts: Opts) -> Result<(), Error> {
.streams()
.iter()
.position(|s| matches!(s.parameters(), Some(retina::codec::Parameters::Message(..))))
.ok_or_else(|| format_err!("couldn't find onvif stream"))?;
.ok_or_else(|| anyhow!("couldn't find onvif stream"))?;
session.setup(onvif_stream_i).await?;
let session = session
.play(retina::client::PlayPolicy::default().ignore_zero_seq(true))
@ -33,7 +33,7 @@ pub async fn run(opts: Opts) -> Result<(), Error> {
loop {
tokio::select! {
item = session.next() => {
match item.ok_or_else(|| format_err!("EOF"))?? {
match item.ok_or_else(|| anyhow!("EOF"))?? {
CodecItem::MessageFrame(m) => {
info!("{}: {}\n", &m.timestamp, std::str::from_utf8(&m.data[..]).unwrap());
},

View File

@ -17,8 +17,8 @@
//! https://github.com/scottlamb/moonfire-nvr/wiki/Standards-and-specifications
//! https://standards.iso.org/ittf/PubliclyAvailableStandards/c068960_ISO_IEC_14496-12_2015.zip
use anyhow::{anyhow, bail, Error};
use bytes::{Buf, BufMut, BytesMut};
use failure::{bail, format_err, Error};
use futures::StreamExt;
use log::info;
use retina::codec::{AudioParameters, CodecItem, VideoParameters};
@ -485,7 +485,7 @@ impl<W: AsyncWrite + AsyncSeek + Send + Unpin> Mp4Writer<W> {
Ok(())
}
async fn video(&mut self, frame: retina::codec::VideoFrame) -> Result<(), failure::Error> {
async fn video(&mut self, frame: retina::codec::VideoFrame) -> Result<(), Error> {
println!(
"{}: {}-byte video frame",
&frame.timestamp,
@ -500,7 +500,7 @@ impl<W: AsyncWrite + AsyncSeek + Send + Unpin> Mp4Writer<W> {
self.mdat_pos = self
.mdat_pos
.checked_add(size)
.ok_or_else(|| format_err!("mdat_pos overflow"))?;
.ok_or_else(|| anyhow!("mdat_pos overflow"))?;
if frame.is_random_access_point {
self.video_sync_sample_nums
.push(u32::try_from(self.video_trak.samples)?);
@ -510,7 +510,7 @@ impl<W: AsyncWrite + AsyncSeek + Send + Unpin> Mp4Writer<W> {
Ok(())
}
async fn audio(&mut self, mut frame: retina::codec::AudioFrame) -> Result<(), failure::Error> {
async fn audio(&mut self, mut frame: retina::codec::AudioFrame) -> Result<(), Error> {
println!(
"{}: {}-byte audio frame",
&frame.timestamp,
@ -522,7 +522,7 @@ impl<W: AsyncWrite + AsyncSeek + Send + Unpin> Mp4Writer<W> {
self.mdat_pos = self
.mdat_pos
.checked_add(size)
.ok_or_else(|| format_err!("mdat_pos overflow"))?;
.ok_or_else(|| anyhow!("mdat_pos overflow"))?;
write_all_buf(&mut self.inner, &mut frame.data).await?;
Ok(())
}
@ -585,7 +585,7 @@ pub async fn run(opts: Opts) -> Result<(), Error> {
loop {
tokio::select! {
pkt = session.next() => {
match pkt.ok_or_else(|| format_err!("EOF"))?? {
match pkt.ok_or_else(|| anyhow!("EOF"))?? {
CodecItem::VideoFrame(f) => mp4.video(f).await?,
CodecItem::AudioFrame(f) => mp4.audio(f).await?,
_ => continue,

View File

@ -5,8 +5,6 @@
use std::num::NonZeroU8;
use failure::{bail, Error};
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum ChannelType {
Rtp,
@ -39,27 +37,27 @@ pub struct ChannelMapping {
pub struct ChannelMappings(smallvec::SmallVec<[Option<NonZeroU8>; 16]>);
impl ChannelMappings {
/// Returns the next unassigned even channel id, or errors.
pub fn next_unassigned(&self) -> Result<u8, Error> {
/// Returns the next unassigned even channel id, or `None` if all assigned.
pub fn next_unassigned(&self) -> Option<u8> {
if let Some(i) = self.0.iter().position(Option::is_none) {
return Ok((i as u8) << 1);
return Some((i as u8) << 1);
}
if self.0.len() < 128 {
return Ok((self.0.len() as u8) << 1);
return Some((self.0.len() as u8) << 1);
}
bail!("all RTSP channels have been assigned");
None
}
/// Assigns an even channel id (to RTP) and its odd successor (to RTCP) or errors.
pub fn assign(&mut self, channel_id: u8, stream_i: usize) -> Result<(), Error> {
pub fn assign(&mut self, channel_id: u8, stream_i: usize) -> Result<(), String> {
if (channel_id & 1) != 0 {
bail!("Can't assign odd channel id {}", channel_id);
return Err(format!("Can't assign odd channel id {}", channel_id));
}
if stream_i >= 255 {
bail!(
return Err(format!(
"Can't assign channel to stream id {} because it's >= 255",
stream_i
);
));
}
let i = usize::from(channel_id >> 1);
if i >= self.0.len() {
@ -67,12 +65,12 @@ impl ChannelMappings {
}
let c = &mut self.0[i];
if let Some(c) = c {
bail!(
return Err(format!(
"Channel id {} is already assigned to stream {}; won't reassign to stream {}",
channel_id,
c.get() - 1,
channel_id
);
));
}
*c = Some(NonZeroU8::new((stream_i + 1) as u8).expect("[0, 255) + 1 is non-zero"));
Ok(())

View File

@ -8,16 +8,15 @@ use std::{borrow::Cow, fmt::Debug, num::NonZeroU16, pin::Pin};
use self::channel_mapping::*;
pub use self::timeline::Timeline;
use bytes::Bytes;
use failure::{bail, format_err, Error};
use futures::{ready, Future, SinkExt, StreamExt};
use log::{debug, trace, warn};
use pin_project::pin_project;
use sdp::session_description::SessionDescription;
use tokio::pin;
use tokio_util::codec::Framed;
use url::Url;
use crate::{codec::CodecItem, Context};
use crate::codec::CodecItem;
use crate::{Error, ErrorInt, RtspMessageContext};
mod channel_mapping;
mod parse;
@ -59,12 +58,12 @@ impl Default for InitialTimestampPolicy {
impl std::fmt::Display for InitialTimestampPolicy {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
InitialTimestampPolicy::Default => f.pad("default"),
InitialTimestampPolicy::Require => f.pad("require"),
InitialTimestampPolicy::Ignore => f.pad("ignore"),
InitialTimestampPolicy::Permissive => f.pad("permissive"),
}
f.pad(match self {
InitialTimestampPolicy::Default => "default",
InitialTimestampPolicy::Require => "require",
InitialTimestampPolicy::Ignore => "ignore",
InitialTimestampPolicy::Permissive => "permissive",
})
}
}
@ -77,7 +76,11 @@ impl std::str::FromStr for InitialTimestampPolicy {
"require" => InitialTimestampPolicy::Require,
"ignore" => InitialTimestampPolicy::Ignore,
"permissive" => InitialTimestampPolicy::Permissive,
_ => bail!("Initial timestamp mode {:?} not understood", s),
_ => bail!(ErrorInt::InvalidArgument(format!(
"bad InitialTimestampPolicy {}; \
expected default, require, ignore or permissive",
s
))),
})
}
}
@ -165,7 +168,7 @@ pub struct Stream {
/// Number of audio channels, if applicable (`media` is `audio`) and known.
pub channels: Option<NonZeroU16>,
depacketizer: Result<crate::codec::Depacketizer, Error>,
depacketizer: Result<crate::codec::Depacketizer, String>,
/// The specified control URL.
/// This is needed with multiple streams to send `SETUP` requests and
@ -240,15 +243,26 @@ pub struct Credentials {
/// `DESCRIBE` changes the connection's state such that another one will fail,
/// before assigning a session id. Thus [Session] represents something more like
/// an RTSP connection than an RTSP session.
#[doc(hidden)]
pub trait State {}
/// Initial state after a `DESCRIBE`.
/// Initial state after a `DESCRIBE`; use via `Session<Described>`.
/// One or more `SETUP`s may have also been issued, in which case a `session_id`
/// will be assigned.
#[doc(hidden)]
pub struct Described {
presentation: Presentation,
session_id: Option<String>,
channels: ChannelMappings,
// Keep some information about the DESCRIBE response. If a depacketizer
// couldn't be constructed correctly for one or more streams, this will be
// used to create a RtspResponseError on `State<Playing>::demuxed()`.
// We defer such errors from DESCRIBE time until then because they only
// matter if the stream is setup and the caller wants depacketization.
describe_ctx: RtspMessageContext,
describe_cseq: u32,
describe_status: rtsp_types::StatusCode,
}
impl State for Described {}
@ -258,13 +272,17 @@ enum KeepaliveState {
Waiting(u32),
}
/// State after a `PLAY`.
/// State after a `PLAY`; use via `Session<Playing>`.
#[doc(hidden)]
#[pin_project(project = PlayingProj)]
pub struct Playing {
presentation: Presentation,
session_id: String,
channels: ChannelMappings,
keepalive_state: KeepaliveState,
describe_ctx: RtspMessageContext,
describe_cseq: u32,
describe_status: rtsp_types::StatusCode,
#[pin]
keepalive_timer: tokio::time::Sleep,
@ -275,7 +293,7 @@ impl State for Playing {}
struct RtspConnection {
creds: Option<Credentials>,
requested_auth: Option<digest_auth::WwwAuthenticateHeader>,
stream: Framed<tokio::net::TcpStream, crate::Codec>,
inner: crate::tokio::Connection,
user_agent: String,
/// The next `CSeq` header value to use when sending an RTSP request.
@ -294,44 +312,36 @@ pub struct Session<S: State> {
impl RtspConnection {
async fn connect(url: &Url, creds: Option<Credentials>) -> Result<Self, Error> {
let host =
RtspConnection::validate_url(url).map_err(|e| wrap!(ErrorInt::InvalidArgument(e)))?;
let port = url.port().unwrap_or(554);
let inner = crate::tokio::Connection::connect(host, port)
.await
.map_err(|e| wrap!(ErrorInt::ConnectError(e)))?;
Ok(Self {
inner,
creds,
requested_auth: None,
user_agent: "moonfire-rtsp test".to_string(),
next_cseq: 1,
})
}
fn validate_url(url: &Url) -> Result<url::Host<&str>, String> {
if url.scheme() != "rtsp" {
bail!("Only rtsp urls supported (no rtsps yet)");
return Err(format!(
"Bad URL {}; only scheme rtsp supported",
url.as_str()
));
}
if url.username() != "" || url.password().is_some() {
// Url apparently doesn't even have a way to clear the credentials,
// so this has to be an error.
bail!("URL must not contain credentials");
// TODO: that's not true; revisit this.
return Err("URL must not contain credentials".to_owned());
}
let host = url
.host_str()
.ok_or_else(|| format_err!("Must specify host in rtsp url {}", &url))?;
let port = url.port().unwrap_or(554);
let stream = tokio::net::TcpStream::connect((host, port)).await?;
let conn_established_wall = time::get_time();
let conn_established = std::time::Instant::now();
let conn_local_addr = stream.local_addr()?;
let conn_peer_addr = stream.peer_addr()?;
let stream = Framed::new(
stream,
crate::Codec {
ctx: crate::Context {
conn_established_wall,
conn_established,
conn_local_addr,
conn_peer_addr,
msg_pos: 0,
msg_received_wall: conn_established_wall,
msg_received: conn_established,
},
},
);
Ok(Self {
creds,
requested_auth: None,
stream,
user_agent: "moonfire-rtsp test".to_string(),
next_cseq: 1,
})
url.host()
.ok_or_else(|| format!("Must specify host in rtsp url {}", &url))
}
/// Sends a request and expects the next message from the peer to be its response.
@ -339,63 +349,111 @@ impl RtspConnection {
async fn send(
&mut self,
req: &mut rtsp_types::Request<Bytes>,
) -> Result<rtsp_types::Response<Bytes>, Error> {
) -> Result<(RtspMessageContext, u32, rtsp_types::Response<Bytes>), Error> {
loop {
let cseq = self.fill_req(req)?;
self.stream
self.inner
.send(rtsp_types::Message::Request(req.clone()))
.await?;
let msg = self
.stream
.next()
.await
.ok_or_else(|| format_err!("unexpected EOF while waiting for reply"))??;
.map_err(|e| wrap!(e))?;
let method: &str = req.method().into();
let msg = self.inner.next().await.unwrap_or_else(|| {
bail!(ErrorInt::ReadError {
conn_ctx: *self.inner.ctx(),
msg_ctx: self.inner.eof_ctx(),
source: std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
format!("EOF while expecting reply to {} CSeq {}", method, cseq),
),
})
})?;
let resp = match msg.msg {
rtsp_types::Message::Response(r) => r,
o => bail!("Unexpected RTSP message {:?}", &o),
rtsp_types::Message::Response(r) if parse::get_cseq(&r) == Some(cseq) => r,
o => bail!(ErrorInt::RtspFramingError {
conn_ctx: *self.inner.ctx(),
msg_ctx: msg.ctx,
description: format!("Expected reply to {} CSeq {}, got {:?}", method, cseq, o,),
}),
};
if parse::get_cseq(&resp) != Some(cseq) {
bail!(
"didn't get expected CSeq {:?} on {:?} at {:#?}",
&cseq,
&resp,
&msg.ctx
);
}
if resp.status() == rtsp_types::StatusCode::Unauthorized {
if self.requested_auth.is_some() {
bail!(
"Received Unauthorized after trying digest auth at {:#?}",
&msg.ctx
);
// TODO: the WWW-Authenticate might indicate a new domain or nonce.
// In that case, we should retry rather than returning error.
bail!(ErrorInt::RtspResponseError {
conn_ctx: *self.inner.ctx(),
msg_ctx: msg.ctx,
method: req.method().clone(),
cseq,
status: resp.status(),
description: "Received Unauthorized after trying digest auth".into(),
})
}
let www_authenticate = match resp.header(&rtsp_types::headers::WWW_AUTHENTICATE) {
None => bail!(
"Unauthorized without WWW-Authenticate header at {:#?}",
&msg.ctx
),
None => bail!(ErrorInt::RtspResponseError {
conn_ctx: *self.inner.ctx(),
msg_ctx: msg.ctx,
method: req.method().clone(),
cseq,
status: resp.status(),
description: "Unauthorized without WWW-Authenticate header".into(),
}),
Some(h) => h,
};
let www_authenticate = www_authenticate.as_str();
if !www_authenticate.starts_with("Digest ") {
bail!(
"Non-digest authentication requested at {:#?}: {}",
&msg.ctx,
www_authenticate
);
// TODO: the header(s) might also indicate both Basic and Digest; we shouldn't
// error or not based on ordering.
bail!(ErrorInt::RtspResponseError {
conn_ctx: *self.inner.ctx(),
msg_ctx: msg.ctx,
method: req.method().clone(),
cseq,
status: resp.status(),
description: format!(
"Non-digest authentication requested: {}",
www_authenticate
),
})
}
let www_authenticate = digest_auth::WwwAuthenticateHeader::parse(www_authenticate)?;
if self.creds.is_none() {
bail!(ErrorInt::RtspResponseError {
conn_ctx: *self.inner.ctx(),
msg_ctx: msg.ctx,
method: req.method().clone(),
cseq,
status: resp.status(),
description: "Authentication requested and no credentials supplied"
.to_owned(),
})
}
let msg_ctx = msg.ctx;
let www_authenticate = digest_auth::WwwAuthenticateHeader::parse(www_authenticate)
.map_err(|e| {
wrap!(ErrorInt::RtspResponseError {
conn_ctx: *self.inner.ctx(),
msg_ctx,
method: req.method().clone(),
cseq,
status: resp.status(),
description: format!(
"Bad WWW-Authenticate header {:?}: {}",
www_authenticate, e
),
})
})?;
self.requested_auth = Some(www_authenticate);
continue;
} else if !resp.status().is_success() {
bail!(
"RTSP {:?} request returned {} at {:#?}",
req.method(),
resp.status(),
&msg.ctx
);
bail!(ErrorInt::RtspResponseError {
conn_ctx: *self.inner.ctx(),
msg_ctx: msg.ctx,
method: req.method().clone(),
cseq,
status: resp.status(),
description: "Unexpected RTSP response status".into(),
});
}
return Ok(resp);
return Ok((msg.ctx, cseq, resp));
}
}
@ -403,22 +461,27 @@ impl RtspConnection {
fn fill_req(&mut self, req: &mut rtsp_types::Request<Bytes>) -> Result<u32, Error> {
let cseq = self.next_cseq;
self.next_cseq += 1;
match (self.requested_auth.as_mut(), self.creds.as_ref()) {
(None, _) => {}
(Some(auth), Some(creds)) => {
let uri = req.request_uri().map(|u| u.as_str()).unwrap_or("*");
let method = digest_auth::HttpMethod(Cow::Borrowed(req.method().into()));
let ctx = digest_auth::AuthContext::new_with_method(
&creds.username,
&creds.password,
uri,
Option::<&'static [u8]>::None,
method,
);
let authorization = auth.respond(&ctx)?.to_string();
req.insert_header(rtsp_types::headers::AUTHORIZATION, authorization);
}
(Some(_), None) => bail!("Authentication required; no credentials supplied"),
if let Some(ref mut auth) = self.requested_auth {
let creds = self
.creds
.as_ref()
.expect("creds were checked when filling request_auth");
let uri = req.request_uri().map(|u| u.as_str()).unwrap_or("*");
let method = digest_auth::HttpMethod(Cow::Borrowed(req.method().into()));
let ctx = digest_auth::AuthContext::new_with_method(
&creds.username,
&creds.password,
uri,
Option::<&'static [u8]>::None,
method,
);
// digest_auth's comments seem to say 'respond' failing means a parser bug.
let authorization = auth
.respond(&ctx)
.map_err(|e| wrap!(ErrorInt::Internal(e.into())))?
.to_string();
req.insert_header(rtsp_types::headers::AUTHORIZATION, authorization);
}
req.insert_header(rtsp_types::headers::CSEQ, cseq.to_string());
req.insert_header(rtsp_types::headers::USER_AGENT, self.user_agent.clone());
@ -427,6 +490,13 @@ impl RtspConnection {
}
impl Session<Described> {
/// Creates a new session from a `DESCRIBE` request on the given URL.
///
/// This method is permissive; it will return success even if there are
/// errors in the SDP that would prevent one or more streams from being
/// depacketized correctly. If those streams are setup via
/// `Session<Described>::setup`, the erorrs in question will be ultimately
/// returned from `Stream<Playing>::demuxed`.
pub async fn describe(url: Url, creds: Option<Credentials>) -> Result<Self, Error> {
let mut conn = RtspConnection::connect(&url, creds).await?;
let mut req =
@ -434,14 +504,26 @@ impl Session<Described> {
.header(rtsp_types::headers::ACCEPT, "application/sdp")
.request_uri(url.clone())
.build(Bytes::new());
let response = conn.send(&mut req).await?;
let presentation = parse::parse_describe(url, response)?;
let (msg_ctx, cseq, response) = conn.send(&mut req).await?;
let presentation = parse::parse_describe(url, &response).map_err(|description| {
wrap!(ErrorInt::RtspResponseError {
conn_ctx: *conn.inner.ctx(),
msg_ctx,
method: rtsp_types::Method::Describe,
cseq,
status: response.status(),
description,
})
})?;
Ok(Session {
conn,
state: Described {
presentation,
session_id: None,
channels: ChannelMappings::default(),
describe_ctx: msg_ctx,
describe_cseq: cseq,
describe_status: response.status(),
},
})
}
@ -451,6 +533,7 @@ impl Session<Described> {
}
/// Sends a `SETUP` request for a stream.
///
/// Note these can't reasonably be pipelined because subsequent requests
/// are expected to adopt the previous response's `Session`. Likewise,
/// the server may override the preferred interleaved channel id and it
@ -461,9 +544,13 @@ impl Session<Described> {
pub async fn setup(&mut self, stream_i: usize) -> Result<(), Error> {
let stream = &mut self.state.presentation.streams[stream_i];
if !matches!(stream.state, StreamState::Uninit) {
bail!("stream already set up");
bail!(ErrorInt::FailedPrecondition("stream already set up".into()));
}
let proposed_channel_id = self.state.channels.next_unassigned()?;
let proposed_channel_id = self.state.channels.next_unassigned().ok_or_else(|| {
wrap!(ErrorInt::FailedPrecondition(
"no unassigned channels".into()
))
})?;
let url = stream
.control
.as_ref()
@ -484,21 +571,51 @@ impl Session<Described> {
if let Some(ref s) = self.state.session_id {
req = req.header(rtsp_types::headers::SESSION, s.clone());
}
let response = self.conn.send(&mut req.build(Bytes::new())).await?;
let (msg_ctx, cseq, response) = self.conn.send(&mut req.build(Bytes::new())).await?;
debug!("SETUP response: {:#?}", &response);
let response = parse::parse_setup(&response)?;
let conn_ctx = self.conn.inner.ctx();
let status = response.status();
let response = parse::parse_setup(&response).map_err(|description| {
wrap!(ErrorInt::RtspResponseError {
conn_ctx: *conn_ctx,
msg_ctx,
method: rtsp_types::Method::Setup,
cseq,
status,
description,
})
})?;
match self.state.session_id.as_ref() {
Some(old) if old != response.session_id => {
bail!(
"SETUP response changed session id from {:?} to {:?}",
old,
response.session_id
);
bail!(ErrorInt::RtspResponseError {
conn_ctx: *self.conn.inner.ctx(),
msg_ctx,
method: rtsp_types::Method::Setup,
cseq,
status,
description: format!(
"session id changed from {:?} to {:?}",
old, response.session_id,
),
});
}
Some(_) => {}
None => self.state.session_id = Some(response.session_id.to_owned()),
};
self.state.channels.assign(response.channel_id, stream_i)?;
let conn_ctx = self.conn.inner.ctx();
self.state
.channels
.assign(response.channel_id, stream_i)
.map_err(|description| {
wrap!(ErrorInt::RtspResponseError {
conn_ctx: *conn_ctx,
msg_ctx,
method: rtsp_types::Method::Setup,
cseq,
status,
description,
})
})?;
stream.state = StreamState::Init(StreamStateInit {
ssrc: response.ssrc,
initial_seq: None,
@ -508,16 +625,17 @@ impl Session<Described> {
}
/// Sends a `PLAY` request for the entire presentation.
///
/// The presentation must support aggregate control, as defined in [RFC 2326
/// section 1.3](https://tools.ietf.org/html/rfc2326#section-1.3).
pub async fn play(mut self, policy: PlayPolicy) -> Result<Session<Playing>, Error> {
let session_id = self
.state
.session_id
.take()
.ok_or_else(|| format_err!("must SETUP before PLAY"))?;
let session_id = self.state.session_id.take().ok_or_else(|| {
wrap!(ErrorInt::FailedPrecondition(
"must SETUP before PLAY".into()
))
})?;
trace!("PLAY with channel mappings: {:#?}", &self.state.channels);
let response = self
let (msg_ctx, cseq, response) = self
.conn
.send(
&mut rtsp_types::Request::builder(
@ -530,7 +648,16 @@ impl Session<Described> {
.build(Bytes::new()),
)
.await?;
parse::parse_play(response, &mut self.state.presentation)?;
parse::parse_play(&response, &mut self.state.presentation).map_err(|description| {
wrap!(ErrorInt::RtspResponseError {
conn_ctx: *self.conn.inner.ctx(),
msg_ctx,
method: rtsp_types::Method::Play,
cseq,
status: response.status(),
description,
})
})?;
// Count how many streams have been setup (not how many are in the presentation).
let setup_streams = self
@ -563,27 +690,34 @@ impl Session<Described> {
ssrc,
..
}) => {
let initial_rtptime =
match policy.initial_timestamp {
InitialTimestampPolicy::Require | InitialTimestampPolicy::Default
if setup_streams > 1 =>
{
if initial_rtptime.is_none() {
bail!(
"Expected rtptime on PLAY with mode {:?}, missing on stream \
{} ({:?}). Consider setting initial timestamp mode \
use-if-all-present.",
policy.initial_timestamp, i, &s.control);
}
initial_rtptime
let initial_rtptime = match policy.initial_timestamp {
InitialTimestampPolicy::Require | InitialTimestampPolicy::Default
if setup_streams > 1 =>
{
if initial_rtptime.is_none() {
bail!(ErrorInt::RtspResponseError {
conn_ctx: *self.conn.inner.ctx(),
msg_ctx,
method: rtsp_types::Method::Play,
cseq,
status: response.status(),
description: format!(
"Expected rtptime on PLAY with mode {:?}, missing on \
stream {} ({:?}). Consider setting initial timestamp \
mode use-if-all-present.",
policy.initial_timestamp, i, &s.control
),
});
}
InitialTimestampPolicy::Permissive
if setup_streams > 1 && all_have_time =>
{
initial_rtptime
}
_ => None,
};
initial_rtptime
}
InitialTimestampPolicy::Permissive
if setup_streams > 1 && all_have_time =>
{
initial_rtptime
}
_ => None,
};
let initial_seq = match initial_seq {
Some(0) if policy.ignore_zero_seq => {
log::info!("Ignoring seq=0 on stream {}", i);
@ -591,12 +725,23 @@ impl Session<Described> {
}
o => o,
};
let conn_ctx = self.conn.inner.ctx();
s.state = StreamState::Playing {
timeline: Timeline::new(
initial_rtptime,
s.clock_rate,
policy.enforce_timestamps_with_max_jump_secs,
)?,
)
.map_err(|description| {
wrap!(ErrorInt::RtspResponseError {
conn_ctx: *conn_ctx,
msg_ctx,
method: rtsp_types::Method::Play,
cseq,
status: response.status(),
description,
})
})?,
rtp_handler: rtp::StrictSequenceChecker::new(ssrc, initial_seq),
};
}
@ -612,6 +757,9 @@ impl Session<Described> {
channels: self.state.channels,
keepalive_state: KeepaliveState::Idle,
keepalive_timer: tokio::time::sleep(KEEPALIVE_DURATION),
describe_ctx: self.state.describe_ctx,
describe_cseq: self.state.describe_cseq,
describe_status: self.state.describe_status,
},
})
}
@ -624,11 +772,20 @@ pub enum PacketItem {
impl Session<Playing> {
/// Returns a wrapper which demuxes/depacketizes into frames.
///
/// Fails if a stream that has been setup can't be depacketized.
pub fn demuxed(mut self) -> Result<Demuxed, Error> {
for s in &mut self.state.presentation.streams {
if matches!(s.state, StreamState::Playing { .. }) {
if let Err(ref mut e) = s.depacketizer {
return Err(std::mem::replace(e, format_err!("(placeholder)")));
if let Err(ref description) = s.depacketizer {
bail!(ErrorInt::RtspResponseError {
conn_ctx: *self.conn.inner.ctx(),
msg_ctx: self.state.describe_ctx,
method: rtsp_types::Method::Describe,
cseq: self.state.describe_cseq,
status: self.state.describe_status,
description: description.clone(),
});
}
}
}
@ -645,23 +802,36 @@ impl Session<Playing> {
) -> Result<(), Error> {
// Expect the previous keepalive request to have finished.
match state.keepalive_state {
KeepaliveState::Flushing(cseq) => bail!(
"Unable to write keepalive {} within {:?}",
cseq,
KEEPALIVE_DURATION,
),
KeepaliveState::Waiting(cseq) => bail!(
"Server failed to respond to keepalive {} within {:?}",
cseq,
KEEPALIVE_DURATION,
),
KeepaliveState::Flushing(cseq) => bail!(ErrorInt::WriteError {
conn_ctx: *conn.inner.ctx(),
source: std::io::Error::new(
std::io::ErrorKind::TimedOut,
format!(
"Unable to write keepalive {} within {:?}",
cseq, KEEPALIVE_DURATION,
),
),
}),
KeepaliveState::Waiting(cseq) => bail!(ErrorInt::ReadError {
conn_ctx: *conn.inner.ctx(),
msg_ctx: conn.inner.eof_ctx(),
source: std::io::Error::new(
std::io::ErrorKind::TimedOut,
format!(
"Server failed to respond to keepalive {} within {:?}",
cseq, KEEPALIVE_DURATION,
),
),
}),
KeepaliveState::Idle => {}
}
// Currently the only outbound data should be keepalives, and the previous one
// has already been flushed, so there's no reason the Sink shouldn't be ready.
if matches!(conn.stream.poll_ready_unpin(cx), Poll::Pending) {
bail!("Unexpectedly not ready to send keepalive");
if matches!(conn.inner.poll_ready_unpin(cx), Poll::Pending) {
bail!(ErrorInt::Internal(
"Unexpectedly not ready to send keepalive".into()
));
}
// Send a new one and reset the timer.
@ -673,12 +843,12 @@ impl Session<Playing> {
.header(rtsp_types::headers::SESSION, state.session_id.clone())
.build(Bytes::new());
let cseq = conn.fill_req(&mut req)?;
conn.stream
conn.inner
.start_send_unpin(rtsp_types::Message::Request(req))
.expect("encoding is infallible");
*state.keepalive_state = match conn.stream.poll_flush_unpin(cx) {
*state.keepalive_state = match conn.inner.poll_flush_unpin(cx) {
Poll::Ready(Ok(())) => KeepaliveState::Waiting(cseq),
Poll::Ready(Err(e)) => return Err(e),
Poll::Ready(Err(e)) => bail!(e),
Poll::Pending => KeepaliveState::Flushing(cseq),
};
@ -691,6 +861,8 @@ impl Session<Playing> {
fn handle_response(
state: &mut PlayingProj<'_>,
conn: &RtspConnection,
msg_ctx: &crate::RtspMessageContext,
response: rtsp_types::Response<Bytes>,
) -> Result<(), Error> {
if matches!(*state.keepalive_state,
@ -702,18 +874,27 @@ impl Session<Playing> {
}
// The only response we expect in this state is to our keepalive request.
bail!("Unexpected RTSP response {:#?}", response);
bail!(ErrorInt::RtspFramingError {
conn_ctx: *conn.inner.ctx(),
msg_ctx: *msg_ctx,
description: format!("Unexpected RTSP response {:#?}", response),
})
}
fn handle_data(
state: &mut PlayingProj<'_>,
ctx: Context,
conn: &RtspConnection,
msg_ctx: &RtspMessageContext,
data: rtsp_types::Data<Bytes>,
) -> Result<Option<PacketItem>, Error> {
let c = data.channel_id();
let m = match state.channels.lookup(c) {
let channel_id = data.channel_id();
let m = match state.channels.lookup(channel_id) {
Some(m) => m,
None => bail!("Data message on unexpected channel {} at {:#?}", c, &ctx),
None => bail!(ErrorInt::RtspUnassignedChannelError {
conn_ctx: *conn.inner.ctx(),
msg_ctx: *msg_ctx,
channel_id,
}),
};
let stream = &mut state.presentation.streams[m.stream_i];
let (mut timeline, rtp_handler) = match &mut stream.state {
@ -721,18 +902,31 @@ impl Session<Playing> {
timeline,
rtp_handler,
} => (timeline, rtp_handler),
_ => unreachable!("Session<Playing>'s {}->{:?} not in Playing state", c, m),
_ => unreachable!(
"Session<Playing>'s {}->{:?} not in Playing state",
channel_id, m
),
};
match m.channel_type {
ChannelType::Rtp => Ok(Some(rtp_handler.rtp(
ctx,
conn.inner.ctx(),
msg_ctx,
&mut timeline,
channel_id,
m.stream_i,
data.into_body(),
)?)),
ChannelType::Rtcp => {
Ok(rtp_handler.rtcp(ctx, &mut timeline, m.stream_i, data.into_body())?)
}
ChannelType::Rtcp => rtp_handler
.rtcp(msg_ctx, &mut timeline, m.stream_i, data.into_body())
.map_err(|description| {
wrap!(ErrorInt::RtspDataMessageError {
conn_ctx: *conn.inner.ctx(),
msg_ctx: *msg_ctx,
channel_id,
stream_id: m.stream_i,
description,
})
}),
}
}
@ -753,17 +947,19 @@ impl futures::Stream for Session<Playing> {
loop {
// First try receiving data. Let this starve keepalive handling; if we can't keep up,
// the server should probably drop us.
match Pin::new(&mut this.conn.stream).poll_next(cx) {
match Pin::new(&mut this.conn.inner).poll_next(cx) {
Poll::Ready(Some(Ok(msg))) => match msg.msg {
rtsp_types::Message::Data(data) => {
match Session::handle_data(&mut state, msg.ctx, data) {
match Session::handle_data(&mut state, &this.conn, &msg.ctx, data) {
Err(e) => return Poll::Ready(Some(Err(e))),
Ok(Some(pkt)) => return Poll::Ready(Some(Ok(pkt))),
Ok(None) => continue,
};
}
rtsp_types::Message::Response(response) => {
if let Err(e) = Session::handle_response(&mut state, response) {
if let Err(e) =
Session::handle_response(&mut state, &this.conn, &msg.ctx, response)
{
return Poll::Ready(Some(Err(e)));
}
continue;
@ -785,9 +981,9 @@ impl futures::Stream for Session<Playing> {
// Then finish flushing the current keepalive if necessary.
if let KeepaliveState::Flushing(cseq) = state.keepalive_state {
match this.conn.stream.poll_flush_unpin(cx) {
match this.conn.inner.poll_flush_unpin(cx) {
Poll::Ready(Ok(())) => *state.keepalive_state = KeepaliveState::Waiting(*cseq),
Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e))),
Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(Error(Box::new(e))))),
Poll::Pending => {}
}
}
@ -801,6 +997,7 @@ impl futures::Stream for Session<Playing> {
enum DemuxedState {
Waiting,
Pulling(usize),
Fused,
}
/// Wrapper returned by [`Session<Playing>::demuxed`] which demuxes/depacketizes into frames.
@ -830,6 +1027,7 @@ impl futures::Stream for Demuxed {
None => return Poll::Ready(None),
},
DemuxedState::Pulling(stream_id) => (*stream_id, None),
DemuxedState::Fused => return Poll::Ready(None),
};
let session = this.session.as_mut().project();
let playing = session.state.project();
@ -837,10 +1035,26 @@ impl futures::Stream for Demuxed {
Ok(d) => d,
Err(_) => unreachable!("depacketizer was Ok"),
};
let conn_ctx = session.conn.inner.ctx();
if let Some(p) = pkt {
depacketizer.push(p)?;
let msg_ctx = p.ctx;
let channel_id = p.channel_id;
let stream_id = p.stream_id;
let ssrc = p.ssrc;
let sequence_number = p.sequence_number;
depacketizer.push(p).map_err(|description| {
wrap!(ErrorInt::RtpPacketError {
conn_ctx: *conn_ctx,
msg_ctx,
channel_id,
stream_id,
ssrc,
sequence_number,
description,
})
})?;
}
match depacketizer.pull() {
match depacketizer.pull(conn_ctx) {
Ok(Some(item)) => {
*this.state = DemuxedState::Pulling(stream_id);
return Poll::Ready(Some(Ok(item)));
@ -849,7 +1063,10 @@ impl futures::Stream for Demuxed {
*this.state = DemuxedState::Waiting;
continue;
}
Err(e) => return Poll::Ready(Some(Err(e))),
Err(e) => {
*this.state = DemuxedState::Fused;
return Poll::Ready(Some(Err(e)));
}
}
}
}

View File

@ -2,8 +2,8 @@
// SPDX-License-Identifier: MIT OR Apache-2.0
use bytes::{Buf, Bytes};
use failure::{bail, format_err, Error, ResultExt};
use log::debug;
use pretty_hex::PrettyHex;
use sdp::media_description::MediaDescription;
use std::{convert::TryFrom, num::NonZeroU16};
use url::Url;
@ -184,17 +184,16 @@ static STATIC_PAYLOAD_TYPES: [Option<StaticPayloadType>; 35] = [
}),
];
fn join_control(base_url: &Url, control: &str) -> Result<Url, Error> {
fn join_control(base_url: &Url, control: &str) -> Result<Url, String> {
if control == "*" {
return Ok(base_url.clone());
}
Ok(base_url.join(control).with_context(|_| {
format_err!(
"unable to join base url {} with control url {:?}",
base_url,
control
base_url.join(control).map_err(|e| {
format!(
"unable to join base url {} with control url {:?}: {}",
base_url, control, e
)
})?)
})
}
/// Returns the `CSeq` from an RTSP response as a `u32`, or `None` if missing/unparseable.
@ -206,12 +205,12 @@ pub(crate) fn get_cseq(response: &rtsp_types::Response<Bytes>) -> Option<u32> {
/// Parses a [MediaDescription] to a [Stream].
/// On failure, returns an error which is expected to be supplemented with
/// the [MediaDescription] debug string.
/// the [MediaDescription] debug string and packed into a `RtspResponseError`.
fn parse_media(
base_url: &Url,
alt_base_url: &Url,
media_description: &MediaDescription,
) -> Result<Stream, Error> {
) -> Result<Stream, String> {
let media = media_description.media_name.media.clone();
// https://tools.ietf.org/html/rfc8866#section-5.14 says "If the <proto>
@ -226,7 +225,7 @@ fn parse_media(
.iter()
.any(|p| p == "RTP")
{
bail!("Expected RTP-based proto");
return Err("Expected RTP-based proto".into());
}
// RFC 8866 continues: "When a list of payload type numbers is given,
@ -238,11 +237,11 @@ fn parse_media(
.media_name
.formats
.first()
.ok_or_else(|| format_err!("missing RTP payload type"))?;
.ok_or_else(|| "missing RTP payload type".to_string())?;
let rtp_payload_type = u8::from_str_radix(rtp_payload_type_str, 10)
.map_err(|_| format_err!("invalid RTP payload type"))?;
.map_err(|_| format!("invalid RTP payload type {:?}", rtp_payload_type_str))?;
if (rtp_payload_type & 0x80) != 0 {
bail!("invalid RTP payload type");
return Err(format!("invalid RTP payload type {}", rtp_payload_type));
}
// Capture interesting attributes.
@ -260,7 +259,7 @@ fn parse_media(
let v = a
.value
.as_ref()
.ok_or_else(|| format_err!("rtpmap attribute with no value"))?;
.ok_or_else(|| "rtpmap attribute with no value".to_string())?;
// https://tools.ietf.org/html/rfc8866#section-6.6
// rtpmap-value = payload-type SP encoding-name
// "/" clock-rate [ "/" encoding-params ]
@ -271,7 +270,7 @@ fn parse_media(
// channels = integer
let (rtpmap_payload_type, v) = v
.split_once(' ')
.ok_or_else(|| format_err!("invalid rtmap attribute"))?;
.ok_or_else(|| "invalid rtmap attribute".to_string())?;
if rtpmap_payload_type == rtp_payload_type_str {
rtpmap = Some(v);
}
@ -280,10 +279,10 @@ fn parse_media(
let v = a
.value
.as_ref()
.ok_or_else(|| format_err!("rtpmap attribute with no value"))?;
.ok_or_else(|| "fmtp attribute with no value".to_string())?;
let (fmtp_payload_type, v) = v
.split_once(' ')
.ok_or_else(|| format_err!("invalid rtmap attribute"))?;
.ok_or_else(|| "invalid fmtp attribute".to_string())?;
if fmtp_payload_type == rtp_payload_type_str {
fmtp = Some(v);
}
@ -308,20 +307,20 @@ fn parse_media(
Some(rtpmap) => {
let (e, rtpmap) = rtpmap
.split_once('/')
.ok_or_else(|| format_err!("invalid rtpmap attribute"))?;
.ok_or_else(|| "invalid rtpmap attribute".to_string())?;
encoding_name = e;
let (clock_rate_str, channels_str) = match rtpmap.find('/') {
None => (rtpmap, None),
Some(i) => (&rtpmap[..i], Some(&rtpmap[i + 1..])),
};
clock_rate = u32::from_str_radix(clock_rate_str, 10)
.map_err(|_| format_err!("bad clockrate in rtpmap"))?;
.map_err(|_| "bad clockrate in rtpmap".to_string())?;
channels = channels_str
.map(|c| {
u16::from_str_radix(c, 10)
.ok()
.and_then(NonZeroU16::new)
.ok_or_else(|| format_err!("Invalid channels specification {:?}", c))
.ok_or_else(|| format!("Invalid channels specification {:?}", c))
})
.transpose()?;
}
@ -330,7 +329,7 @@ fn parse_media(
.get(usize::from(rtp_payload_type))
.and_then(Option::as_ref)
.ok_or_else(|| {
format_err!(
format!(
"Expected rtpmap parameter or assigned static payload type (got {})",
rtp_payload_type
)
@ -339,11 +338,10 @@ fn parse_media(
clock_rate = type_.clock_rate;
channels = type_.channels;
if type_.media != media {
bail!(
return Err(format!(
"SDP media type {} must match RTP payload type {:#?}",
&media,
type_
);
&media, type_
));
}
}
}
@ -366,35 +364,48 @@ fn parse_media(
}
/// Parses a successful RTSP `DESCRIBE` response into a [Presentation].
/// On error, returns a string which is expected to be packed into an `RtspProtocolError`.
pub(crate) fn parse_describe(
request_url: Url,
response: rtsp_types::Response<Bytes>,
) -> Result<Presentation, Error> {
response: &rtsp_types::Response<Bytes>,
) -> Result<Presentation, String> {
if !matches!(response.header(&rtsp_types::headers::CONTENT_TYPE), Some(v) if v.as_str() == "application/sdp")
{
bail!(
return Err(format!(
"Describe response not of expected application/sdp content type: {:#?}",
&response
);
));
}
let sdp;
{
let mut cursor = std::io::Cursor::new(&response.body()[..]);
sdp = sdp::session_description::SessionDescription::unmarshal(&mut cursor)?;
sdp =
sdp::session_description::SessionDescription::unmarshal(&mut cursor).map_err(|e| {
format!(
"Unable to parse SDP: {}\n\n{:#?}",
e,
response.body().hex_dump()
)
})?;
if cursor.has_remaining() {
bail!(
return Err(format!(
"garbage after sdp: {:?}",
&response.body()[usize::try_from(cursor.position()).unwrap()..]
);
));
}
}
// https://tools.ietf.org/html/rfc2326#appendix-C.1.1
let base_url = response
.header(&rtsp_types::headers::CONTENT_BASE)
.or_else(|| response.header(&rtsp_types::headers::CONTENT_LOCATION))
.map(|v| Url::parse(v.as_str()))
.map(|v| (rtsp_types::headers::CONTENT_BASE, v))
.or_else(|| {
response
.header(&rtsp_types::headers::CONTENT_LOCATION)
.map(|v| (rtsp_types::headers::CONTENT_LOCATION, v))
})
.map(|(h, v)| Url::parse(v.as_str()).map_err(|e| format!("bad {} {:?}: {}", h, v, e)))
.unwrap_or(Ok(request_url))?;
let mut alt_base_url = base_url.clone();
alt_base_url.set_path(&format!("{}/", base_url.path()));
@ -410,7 +421,7 @@ pub(crate) fn parse_describe(
break;
}
}
let control = control.ok_or_else(|| format_err!("no control url"))?;
let control = control.ok_or_else(|| "no control url".to_string())?;
let streams = sdp
.media_descriptions
@ -418,10 +429,9 @@ pub(crate) fn parse_describe(
.enumerate()
.map(|(i, m)| {
parse_media(&base_url, &alt_base_url, &m)
.with_context(|_| format!("Unable to parse stream {}: {:#?}", i, &m))
.map_err(Error::from)
.map_err(|e| format!("Unable to parse stream {}: {}\n\n{:#?}", i, &e, &m))
})
.collect::<Result<Vec<Stream>, Error>>()?;
.collect::<Result<Vec<Stream>, String>>()?;
let accept_dynamic_rate =
matches!(response.header(&crate::X_ACCEPT_DYNAMIC_RATE), Some(h) if h.as_str() == "1");
@ -445,43 +455,40 @@ pub(crate) struct SetupResponse<'a> {
/// `session_id` is checked for assignment or reassignment.
/// Returns an assigned interleaved channel id (implying the next channel id
/// is also assigned) or errors.
pub(crate) fn parse_setup(response: &rtsp_types::Response<Bytes>) -> Result<SetupResponse, Error> {
pub(crate) fn parse_setup(response: &rtsp_types::Response<Bytes>) -> Result<SetupResponse, String> {
let session = response
.header(&rtsp_types::headers::SESSION)
.ok_or_else(|| format_err!("SETUP response has no Session header"))?;
.ok_or_else(|| "Missing Session header".to_string())?;
let session_id = match session.as_str().find(';') {
None => session.as_str(),
Some(i) => &session.as_str()[..i],
};
let transport = response
.header(&rtsp_types::headers::TRANSPORT)
.ok_or_else(|| format_err!("SETUP response has no Transport header"))?;
.ok_or_else(|| "Missing Transport header".to_string())?;
let mut channel_id = None;
let mut ssrc = None;
for part in transport.as_str().split(';') {
if let Some(v) = part.strip_prefix("ssrc=") {
let v =
u32::from_str_radix(v, 16).map_err(|_| format_err!("Unparseable ssrc {}", v))?;
let v = u32::from_str_radix(v, 16).map_err(|_| format!("Unparseable ssrc {}", v))?;
ssrc = Some(v);
break;
} else if let Some(interleaved) = part.strip_prefix("interleaved=") {
let mut channels = interleaved.splitn(2, '-');
let n = channels.next().expect("splitn returns at least one part");
let n =
u8::from_str_radix(n, 10).map_err(|_| format_err!("bad channel number {}", n))?;
let n = u8::from_str_radix(n, 10).map_err(|_| format!("bad channel number {}", n))?;
if let Some(m) = channels.next() {
let m = u8::from_str_radix(m, 10)
.map_err(|_| format_err!("bad second channel number {}", m))?;
.map_err(|_| format!("bad second channel number {}", m))?;
if n.checked_add(1) != Some(m) {
bail!("Expected adjacent channels; got {}-{}", n, m);
format!("Expected adjacent channels; got {}-{}", n, m);
}
}
channel_id = Some(n);
}
}
let channel_id = channel_id.ok_or_else(|| {
format_err!("SETUP response Transport header has no interleaved parameter")
})?;
let channel_id =
channel_id.ok_or_else(|| "Transport header has no interleaved parameter".to_string())?;
Ok(SetupResponse {
session_id,
ssrc,
@ -489,14 +496,15 @@ pub(crate) fn parse_setup(response: &rtsp_types::Response<Bytes>) -> Result<Setu
})
}
/// Parses a `PLAY` response. The error should always be packed into a `RtspProtocolError`.
pub(crate) fn parse_play(
response: rtsp_types::Response<Bytes>,
response: &rtsp_types::Response<Bytes>,
presentation: &mut Presentation,
) -> Result<(), Error> {
) -> Result<(), String> {
// https://tools.ietf.org/html/rfc2326#section-12.33
let rtp_info = response
.header(&rtsp_types::headers::RTP_INFO)
.ok_or_else(|| format_err!("PLAY response has no RTP-Info header"))?;
.ok_or_else(|| "PLAY response has no RTP-Info header".to_string())?;
for s in rtp_info.as_str().split(',') {
let s = s.trim();
let mut parts = s.split(';');
@ -504,7 +512,7 @@ pub(crate) fn parse_play(
.next()
.expect("split always returns at least one part")
.strip_prefix("url=")
.ok_or_else(|| format_err!("RTP-Info missing stream URL"))?;
.ok_or_else(|| "RTP-Info missing stream URL".to_string())?;
let url = join_control(&presentation.base_url, url)?;
let mut stream;
if presentation.streams.len() == 1 {
@ -533,7 +541,7 @@ pub(crate) fn parse_play(
.find(|s| matches!(&s.alt_control, Some(u) if u == &url));
}
}
let stream = stream.ok_or_else(|| format_err!("can't find RTP-Info stream {}", url))?;
let stream = stream.ok_or_else(|| format!("RTP-Info contains unknown stream {}", url))?;
let state = match &mut stream.state {
super::StreamState::Uninit => {
// This appears to happen for Reolink devices when we did not send a SETUP request
@ -551,21 +559,21 @@ pub(crate) fn parse_play(
for part in parts {
let (key, value) = part
.split_once('=')
.ok_or_else(|| format_err!("RTP-Info param has no ="))?;
.ok_or_else(|| "RTP-Info param has no =".to_string())?;
match key {
"seq" => {
let seq = u16::from_str_radix(value, 10)
.map_err(|_| format_err!("bad seq {:?}", value))?;
.map_err(|_| format!("bad seq {:?}", value))?;
state.initial_seq = Some(seq);
}
"rtptime" => {
let rtptime = u32::from_str_radix(value, 10)
.map_err(|_| format_err!("bad rtptime {:?}", value))?;
.map_err(|_| format!("bad rtptime {:?}", value))?;
state.initial_rtptime = Some(rtptime);
}
"ssrc" => {
let ssrc = u32::from_str_radix(value, 16)
.map_err(|_| format_err!("Unparseable ssrc {}", value))?;
.map_err(|_| format!("Unparseable ssrc {}", value))?;
state.ssrc = Some(ssrc);
}
_ => {}
@ -580,7 +588,6 @@ mod tests {
use std::num::NonZeroU16;
use bytes::Bytes;
use failure::Error;
use url::Url;
use crate::{client::StreamStateInit, codec::Parameters};
@ -599,9 +606,9 @@ mod tests {
fn parse_describe(
raw_url: &str,
raw_response: &'static [u8],
) -> Result<super::Presentation, Error> {
) -> Result<super::Presentation, String> {
let url = Url::parse(raw_url).unwrap();
super::parse_describe(url, response(raw_response))
super::parse_describe(url, &response(raw_response))
}
#[test]
@ -681,7 +688,7 @@ mod tests {
});
// PLAY.
super::parse_play(response(include_bytes!("testdata/dahua_play.txt")), &mut p).unwrap();
super::parse_play(&response(include_bytes!("testdata/dahua_play.txt")), &mut p).unwrap();
match &p.streams[0].state {
StreamState::Init(s) => {
assert_eq!(s.initial_seq, Some(47121));
@ -779,7 +786,7 @@ mod tests {
// PLAY.
super::parse_play(
response(include_bytes!("testdata/hikvision_play.txt")),
&response(include_bytes!("testdata/hikvision_play.txt")),
&mut p,
)
.unwrap();
@ -851,7 +858,7 @@ mod tests {
// PLAY.
super::parse_play(
response(include_bytes!("testdata/reolink_play.txt")),
&response(include_bytes!("testdata/reolink_play.txt")),
&mut p,
)
.unwrap();
@ -928,7 +935,7 @@ mod tests {
p.streams[1].state = StreamState::Init(StreamStateInit::default());
// PLAY.
super::parse_play(response(include_bytes!("testdata/bunny_play.txt")), &mut p).unwrap();
super::parse_play(&response(include_bytes!("testdata/bunny_play.txt")), &mut p).unwrap();
match p.streams[1].state {
StreamState::Init(state) => {
assert_eq!(state.initial_rtptime, Some(0));
@ -1055,7 +1062,7 @@ mod tests {
// PLAY.
super::parse_play(
response(include_bytes!("testdata/gw_main_play.txt")),
&response(include_bytes!("testdata/gw_main_play.txt")),
&mut p,
)
.unwrap();
@ -1119,7 +1126,11 @@ mod tests {
});
// PLAY.
super::parse_play(response(include_bytes!("testdata/gw_sub_play.txt")), &mut p).unwrap();
super::parse_play(
&response(include_bytes!("testdata/gw_sub_play.txt")),
&mut p,
)
.unwrap();
match &p.streams[0].state {
StreamState::Init(s) => {
assert_eq!(s.initial_seq, Some(273));

View File

@ -4,17 +4,19 @@
//! RTP and RTCP handling; see [RFC 3550](https://datatracker.ietf.org/doc/html/rfc3550).
use bytes::{Buf, Bytes};
use failure::{bail, format_err, Error};
use log::{debug, trace};
use log::debug;
use pretty_hex::PrettyHex;
use crate::client::PacketItem;
use crate::{Error, ErrorInt};
/// An RTP packet.
/// A received RTP packet.
pub struct Packet {
pub rtsp_ctx: crate::Context,
pub ctx: crate::RtspMessageContext,
pub channel_id: u8,
pub stream_id: usize,
pub timestamp: crate::Timestamp,
pub ssrc: u32,
pub sequence_number: u16,
/// Number of skipped sequence numbers since the last packet.
@ -22,7 +24,7 @@ pub struct Packet {
/// In the case of the first packet on the stream, this may also report loss
/// packets since the `RTP-Info` header's `seq` value. However, currently
/// that header is not required to be present and may be ignored (see
/// [`crate::client::PlayPolicy::ignore_zero_seq()`].)
/// [`retina::client::PlayPolicy::ignore_zero_seq()`].)
pub loss: u16,
pub mark: bool,
@ -34,9 +36,11 @@ pub struct Packet {
impl std::fmt::Debug for Packet {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Packet")
.field("rtsp_ctx", &self.rtsp_ctx)
.field("ctx", &self.ctx)
.field("channel_id", &self.channel_id)
.field("stream_id", &self.stream_id)
.field("timestamp", &self.timestamp)
.field("ssrc", &self.ssrc)
.field("sequence_number", &self.sequence_number)
.field("loss", &self.loss)
.field("mark", &self.mark)
@ -49,7 +53,7 @@ impl std::fmt::Debug for Packet {
#[derive(Debug)]
pub struct SenderReport {
pub stream_id: usize,
pub rtsp_ctx: crate::Context,
pub ctx: crate::RtspMessageContext,
pub timestamp: crate::Timestamp,
pub ntp_timestamp: crate::NtpTimestamp,
}
@ -83,8 +87,10 @@ impl StrictSequenceChecker {
pub fn rtp(
&mut self,
rtsp_ctx: crate::Context,
conn_ctx: &crate::ConnectionContext,
msg_ctx: &crate::RtspMessageContext,
timeline: &mut super::Timeline,
channel_id: u8,
stream_id: usize,
mut data: Bytes,
) -> Result<PacketItem, Error> {
@ -102,58 +108,70 @@ impl StrictSequenceChecker {
}
let reader = rtp_rs::RtpReader::new(&data[..]).map_err(|e| {
format_err!(
"corrupt RTP header while expecting seq={:04x?} at {:#?}: {:?}\n{:#?}",
self.next_seq,
&rtsp_ctx,
e,
data.hex_dump()
)
wrap!(ErrorInt::RtspDataMessageError {
conn_ctx: *conn_ctx,
msg_ctx: *msg_ctx,
channel_id,
stream_id,
description: format!(
"corrupt RTP header while expecting seq={:04x?}: {:?}\n{:#?}",
&self.next_seq,
e,
data.hex_dump(),
),
})
})?;
let sequence_number = u16::from_be_bytes([data[2], data[3]]); // I don't like rtsp_rs::Seq.
let ssrc = reader.ssrc();
let timestamp = match timeline.advance_to(reader.timestamp()) {
Ok(ts) => ts,
Err(e) => {
return Err(e
.context(format!(
"timestamp error in stream {} seq={:04x} {:#?}",
stream_id, sequence_number, &rtsp_ctx
))
.into())
}
};
let ssrc = reader.ssrc();
let loss = sequence_number.wrapping_sub(self.next_seq.unwrap_or(sequence_number));
if matches!(self.ssrc, Some(s) if s != ssrc) || loss > 0x80_00 {
bail!(
"Expected ssrc={:08x?} seq={:04x?} got ssrc={:08x} seq={:04x} ts={} at {:#?}",
self.ssrc,
self.next_seq,
Err(description) => bail!(ErrorInt::RtpPacketError {
conn_ctx: *conn_ctx,
msg_ctx: *msg_ctx,
channel_id,
stream_id,
ssrc,
sequence_number,
timestamp,
&rtsp_ctx
);
description,
}),
};
let loss = sequence_number.wrapping_sub(self.next_seq.unwrap_or(sequence_number));
if matches!(self.ssrc, Some(s) if s != ssrc) || loss > 0x80_00 {
bail!(ErrorInt::RtpPacketError {
conn_ctx: *conn_ctx,
msg_ctx: *msg_ctx,
channel_id,
stream_id,
ssrc,
sequence_number,
description: format!(
"Expected ssrc={:08x?} seq={:04x?}",
self.ssrc, self.next_seq
),
});
}
self.ssrc = Some(ssrc);
let mark = reader.mark();
let payload_range = crate::as_range(&data, reader.payload())
.ok_or_else(|| format_err!("empty payload at {:#?}", &rtsp_ctx))?;
trace!(
"{:?} pkt {:04x}{} ts={} len={}",
&rtsp_ctx,
sequence_number,
if mark { " " } else { "(M)" },
&timestamp,
payload_range.len()
);
let payload_range = crate::as_range(&data, reader.payload()).ok_or_else(|| {
wrap!(ErrorInt::RtpPacketError {
conn_ctx: *conn_ctx,
msg_ctx: *msg_ctx,
channel_id,
stream_id,
ssrc,
sequence_number,
description: "empty payload".into(),
})
})?;
data.truncate(payload_range.end);
data.advance(payload_range.start);
self.next_seq = Some(sequence_number.wrapping_add(1));
Ok(PacketItem::RtpPacket(Packet {
ctx: *msg_ctx,
channel_id,
stream_id,
rtsp_ctx,
timestamp,
ssrc,
sequence_number,
loss,
mark,
@ -163,56 +181,45 @@ impl StrictSequenceChecker {
pub fn rtcp(
&mut self,
rtsp_ctx: crate::Context,
msg_ctx: &crate::RtspMessageContext,
timeline: &mut super::Timeline,
stream_id: usize,
mut data: Bytes,
) -> Result<Option<PacketItem>, Error> {
) -> Result<Option<PacketItem>, String> {
use rtcp::packet::Packet;
let mut sr = None;
let mut i = 0;
while !data.is_empty() {
let h = match rtcp::header::Header::unmarshal(&data) {
Err(e) => bail!("corrupt RTCP header at {:#?}: {}", &rtsp_ctx, e),
Err(e) => return Err(format!("corrupt RTCP header: {}", e)),
Ok(h) => h,
};
let pkt_len = (usize::from(h.length) + 1) * 4;
if pkt_len > data.len() {
bail!(
"rtcp pkt len {} vs remaining body len {} at {:#?}",
return Err(format!(
"RTCP packet length {} exceeds remaining data message length {}",
pkt_len,
data.len(),
&rtsp_ctx
);
));
}
let pkt = data.split_to(pkt_len);
match h.packet_type {
rtcp::header::PacketType::SenderReport => {
if i > 0 {
bail!("RTCP SR must be first in packet");
return Err("RTCP SR must be first in packet".into());
}
let pkt = match rtcp::sender_report::SenderReport::unmarshal(&pkt) {
Err(e) => bail!("corrupt RTCP SR at {:#?}: {}", &rtsp_ctx, e),
Ok(p) => p,
};
let timestamp = match timeline.place(pkt.rtp_time) {
Ok(ts) => ts,
Err(e) => {
return Err(e
.context(format!(
"bad RTP timestamp in RTCP SR {:#?} at {:#?}",
&pkt, &rtsp_ctx
))
.into())
}
};
let pkt = rtcp::sender_report::SenderReport::unmarshal(&pkt)
.map_err(|e| format!("corrupt RTCP SR: {}", e))?;
let timestamp = timeline.place(pkt.rtp_time).map_err(|mut description| {
description.push_str(" in RTCP SR");
description
})?;
// TODO: verify ssrc.
sr = Some(SenderReport {
stream_id,
rtsp_ctx,
ctx: *msg_ctx,
timestamp,
ntp_timestamp: crate::NtpTimestamp(pkt.ntp_time),
});

View File

@ -1,11 +1,8 @@
// Copyright (C) 2021 Scott Lamb <slamb@slamb.org>
// SPDX-License-Identifier: MIT OR Apache-2.0
use failure::{bail, format_err, Error};
use std::{
convert::TryFrom,
num::{NonZeroI32, NonZeroU32},
};
use std::convert::TryFrom;
use std::num::{NonZeroI32, NonZeroU32};
use crate::Timestamp;
@ -34,19 +31,19 @@ impl Timeline {
start: Option<u32>,
clock_rate: u32,
enforce_with_max_forward_jump_secs: Option<NonZeroU32>,
) -> Result<Self, Error> {
) -> Result<Self, String> {
let clock_rate = NonZeroU32::new(clock_rate)
.ok_or_else(|| format_err!("clock_rate=0 rejected to prevent division by zero"))?;
let max_forward_jump =
enforce_with_max_forward_jump_secs
.map(|j| i32::try_from(u64::from(j.get()) * u64::from(clock_rate.get())))
.transpose()
.map_err(|_| {
format_err!(
"clock_rate={} rejected because max forward jump of {} sec exceeds i32::MAX",
clock_rate, MAX_FORWARD_TIME_JUMP_SECS)
})?
.map(|j| NonZeroI32::new(j).expect("non-zero times non-zero must be non-zero"));
.ok_or_else(|| "clock_rate=0 rejected to prevent division by zero".to_string())?;
let max_forward_jump = enforce_with_max_forward_jump_secs
.map(|j| i32::try_from(u64::from(j.get()) * u64::from(clock_rate.get())))
.transpose()
.map_err(|_| {
format!(
"clock_rate={} rejected because max forward jump of {} sec exceeds i32::MAX",
clock_rate, MAX_FORWARD_TIME_JUMP_SECS
)
})?
.map(|j| NonZeroI32::new(j).expect("non-zero times non-zero must be non-zero"));
Ok(Timeline {
timestamp: i64::from(start.unwrap_or(0)),
start,
@ -62,10 +59,10 @@ impl Timeline {
///
/// If enforcement was enabled, this produces a monotonically increasing
/// [Timestamp], erroring on excessive or backward time jumps.
pub fn advance_to(&mut self, rtp_timestamp: u32) -> Result<Timestamp, Error> {
pub fn advance_to(&mut self, rtp_timestamp: u32) -> Result<Timestamp, String> {
let (timestamp, delta) = self.ts_and_delta(rtp_timestamp)?;
if matches!(self.max_forward_jump, Some(j) if !(0..j.get()).contains(&delta)) {
bail!(
return Err(format!(
"Timestamp jumped {} ({:.03} sec) from {} to {}; \
policy is to allow 0..{} sec only",
delta,
@ -73,7 +70,7 @@ impl Timeline {
self.timestamp,
timestamp,
self.max_forward_jump_secs
);
));
}
self.timestamp = timestamp.timestamp;
Ok(timestamp)
@ -84,11 +81,11 @@ impl Timeline {
///
/// This is useful for RTP timestamps in RTCP packets. They commonly refer
/// to time slightly before the most timestamp of the matching RTP stream.
pub fn place(&mut self, rtp_timestamp: u32) -> Result<Timestamp, Error> {
pub fn place(&mut self, rtp_timestamp: u32) -> Result<Timestamp, String> {
Ok(self.ts_and_delta(rtp_timestamp)?.0)
}
fn ts_and_delta(&mut self, rtp_timestamp: u32) -> Result<(Timestamp, i32), Error> {
fn ts_and_delta(&mut self, rtp_timestamp: u32) -> Result<(Timestamp, i32), String> {
let start = match self.start {
None => {
self.start = Some(rtp_timestamp);
@ -106,21 +103,18 @@ impl Timeline {
// take ~2^31 packets (~ 4 billion) to advance the time this far
// forward or backward even with no limits on time jump per
// packet.
format_err!(
format!(
"timestamp {} + delta {} won't fit in i64!",
self.timestamp,
delta
self.timestamp, delta
)
})?;
// Also error in similarly-unlikely NPT underflow.
if timestamp.checked_sub(i64::from(start)).is_none() {
bail!(
return Err(format!(
"timestamp {} + delta {} - start {} underflows i64!",
self.timestamp,
delta,
start
);
self.timestamp, delta, start
));
}
Ok((
Timestamp {

View File

@ -15,14 +15,13 @@
//! * ISO/IEC 14496-14: MP4 File Format.
use bytes::{Buf, BufMut, Bytes, BytesMut};
use failure::{bail, format_err, Error};
use std::{
convert::TryFrom,
fmt::Debug,
num::{NonZeroU16, NonZeroU32},
};
use crate::client::rtp::Packet;
use crate::{client::rtp::Packet, error::ErrorInt, ConnectionContext, Error};
use super::CodecItem;
@ -64,15 +63,25 @@ const CHANNEL_CONFIGS: [Option<ChannelConfig>; 8] = [
impl AudioSpecificConfig {
/// Parses from raw bytes.
fn parse(config: &[u8]) -> Result<Self, Error> {
fn parse(config: &[u8]) -> Result<Self, String> {
let mut r = bitreader::BitReader::new(config);
let audio_object_type = match r.read_u8(5)? {
31 => 32 + r.read_u8(6)?,
let audio_object_type = match r
.read_u8(5)
.map_err(|e| format!("unable to read audio_object_type: {}", e))?
{
31 => {
32 + r
.read_u8(6)
.map_err(|e| format!("unable to read audio_object_type ext: {}", e))?
}
o => o,
};
// ISO/IEC 14496-3 section 1.6.3.4.
let sampling_frequency = match r.read_u8(4)? {
let sampling_frequency = match r
.read_u8(4)
.map_err(|e| format!("unable to read sampling_frequency: {}", e))?
{
0x0 => 96_000,
0x1 => 88_200,
0x2 => 64_000,
@ -85,39 +94,58 @@ impl AudioSpecificConfig {
0xa => 11_025,
0xb => 8_000,
0xc => 7_350,
v @ 0xd | v @ 0xe => bail!("reserved sampling_frequency_index value 0x{:x}", v),
0xf => r.read_u32(24)?,
v @ 0xd | v @ 0xe => {
return Err(format!("reserved sampling_frequency_index value 0x{:x}", v))
}
0xf => r
.read_u32(24)
.map_err(|e| format!("unable to read sampling_frequency ext: {}", e))?,
_ => unreachable!(),
};
let channels = {
let c = r.read_u8(4)?;
let c = r
.read_u8(4)
.map_err(|e| format!("unable to read channels: {}", e))?;
CHANNEL_CONFIGS
.get(usize::from(c))
.ok_or_else(|| format_err!("reserved channelConfiguration 0x{:x}", c))?
.ok_or_else(|| format!("reserved channelConfiguration 0x{:x}", c))?
.as_ref()
.ok_or_else(|| format_err!("program_config_element parsing unimplemented"))?
.ok_or_else(|| "program_config_element parsing unimplemented".to_string())?
};
if audio_object_type == 5 || audio_object_type == 29 {
// extensionSamplingFrequencyIndex + extensionSamplingFrequency.
if r.read_u8(4)? == 0xf {
r.skip(24)?;
if r.read_u8(4)
.map_err(|e| format!("unable to read extensionSamplingFrequencyIndex: {}", e))?
== 0xf
{
r.skip(24)
.map_err(|e| format!("unable to read extensionSamplingFrequency: {}", e))?;
}
// audioObjectType (a different one) + extensionChannelConfiguration.
if r.read_u8(5)? == 22 {
r.skip(4)?;
if r.read_u8(5)
.map_err(|e| format!("unable to read second audioObjectType: {}", e))?
== 22
{
r.skip(4)
.map_err(|e| format!("unable to read extensionChannelConfiguration: {}", e))?;
}
}
// The supported types here are the ones that use GASpecificConfig.
match audio_object_type {
1 | 2 | 3 | 4 | 6 | 7 | 17 | 19 | 20 | 21 | 22 | 23 => {}
o => bail!("unsupported audio_object_type {}", o),
o => return Err(format!("unsupported audio_object_type {}", o)),
}
// GASpecificConfig, ISO/IEC 14496-3 section 4.4.1.
let frame_length = match (audio_object_type, r.read_bool()?) {
let frame_length_flag = r
.read_bool()
.map_err(|e| format!("unable to read frame_length_flag: {}", e))?;
let frame_length = match (audio_object_type, frame_length_flag) {
(3 /* AAC SR */, false) => NonZeroU16::new(256).expect("non-zero"),
(3 /* AAC SR */, true) => bail!("frame_length_flag must be false for AAC SSR"),
(3 /* AAC SR */, true) => {
return Err("frame_length_flag must be false for AAC SSR".into())
}
(23 /* ER AAC LD */, false) => NonZeroU16::new(512).expect("non-zero"),
(23 /* ER AAC LD */, true) => NonZeroU16::new(480).expect("non-zero"),
(_, false) => NonZeroU16::new(1024).expect("non-zero"),
@ -135,7 +163,7 @@ impl AudioSpecificConfig {
/// Overwrites a buffer with a varint length, returning the length of the length.
/// See ISO/IEC 14496-1 section 8.3.3.
fn set_length(len: usize, data: &mut [u8]) -> Result<usize, Error> {
fn set_length(len: usize, data: &mut [u8]) -> Result<usize, String> {
if len < 1 << 7 {
data[0] = len as u8;
Ok(1)
@ -156,7 +184,7 @@ fn set_length(len: usize, data: &mut [u8]) -> Result<usize, Error> {
Ok(4)
} else {
// BaseDescriptor sets a maximum length of 2**28 - 1.
bail!("length {} too long", len);
return Err(format!("length {} too long", len));
}
}
@ -173,7 +201,11 @@ macro_rules! write_box {
};
let pos_end = $buf.len();
let len = pos_end.checked_sub(pos_start).unwrap();
$buf[pos_start..pos_start + 4].copy_from_slice(&u32::try_from(len)?.to_be_bytes()[..]);
$buf[pos_start..pos_start + 4].copy_from_slice(
&u32::try_from(len)
.map_err(|_| format!("box length {} exceeds u32::MAX", len))?
.to_be_bytes()[..],
);
r
}};
}
@ -208,7 +240,7 @@ macro_rules! write_descriptor {
/// Returns an MP4AudioSampleEntry (`mp4a`) box as in ISO/IEC 14496-14 section 5.6.1.
/// `config` should be a raw AudioSpecificConfig (matching `parsed`).
pub(super) fn get_mp4a_box(parameters: &super::AudioParameters) -> Result<Bytes, Error> {
pub(super) fn get_mp4a_box(parameters: &super::AudioParameters) -> Result<Bytes, String> {
let parsed = match parameters.config {
super::AudioCodecConfig::Aac(ref c) => c,
_ => unreachable!(),
@ -238,7 +270,7 @@ pub(super) fn get_mp4a_box(parameters: &super::AudioParameters) -> Result<Bytes,
// version/structure of the AudioSampleEntryBox and the version of the
// stsd box. Just support the former for now.
let sampling_frequency = u16::try_from(parsed.sampling_frequency).map_err(|_| {
format_err!(
format!(
"aac sampling_frequency={} unsupported",
parsed.sampling_frequency
)
@ -307,7 +339,7 @@ pub(super) fn get_mp4a_box(parameters: &super::AudioParameters) -> Result<Bytes,
fn parse_format_specific_params(
clock_rate: u32,
format_specific_params: &str,
) -> Result<super::AudioParameters, Error> {
) -> Result<super::AudioParameters, String> {
let mut mode = None;
let mut config = None;
let mut size_length = None;
@ -321,29 +353,28 @@ fn parse_format_specific_params(
}
let (key, value) = p
.split_once('=')
.ok_or_else(|| format_err!("bad format-specific-param {}", p))?;
.ok_or_else(|| format!("bad format-specific-param {}", p))?;
match &key.to_ascii_lowercase()[..] {
"config" => {
config = Some(
hex::decode(value)
.map_err(|_| format_err!("config has invalid hex encoding"))?,
.map_err(|_| "config has invalid hex encoding".to_string())?,
);
}
"mode" => mode = Some(value),
"sizelength" => {
size_length = Some(
u16::from_str_radix(value, 10).map_err(|_| format_err!("bad sizeLength"))?,
);
size_length =
Some(u16::from_str_radix(value, 10).map_err(|_| "bad sizeLength".to_string())?);
}
"indexlength" => {
index_length = Some(
u16::from_str_radix(value, 10).map_err(|_| format_err!("bad indexLength"))?,
u16::from_str_radix(value, 10).map_err(|_| "bad indexLength".to_string())?,
);
}
"indexdeltalength" => {
index_delta_length = Some(
u16::from_str_radix(value, 10)
.map_err(|_| format_err!("bad indexDeltaLength"))?,
.map_err(|_| "bad indexDeltaLength".to_string())?,
);
}
_ => {}
@ -351,27 +382,24 @@ fn parse_format_specific_params(
}
// https://datatracker.ietf.org/doc/html/rfc3640#section-3.3.6 AAC-hbr
if mode != Some("AAC-hbr") {
bail!("Expected mode AAC-hbr, got {:#?}", mode);
return Err(format!("Expected mode AAC-hbr, got {:#?}", mode));
}
let config = config.ok_or_else(|| format_err!("config must be specified"))?;
let config = config.ok_or_else(|| "config must be specified".to_string())?;
if size_length != Some(13) || index_length != Some(3) || index_delta_length != Some(3) {
bail!(
return Err(format!(
"Unexpected sizeLength={:?} indexLength={:?} indexDeltaLength={:?}",
size_length,
index_length,
index_delta_length
);
size_length, index_length, index_delta_length
));
}
let parsed = AudioSpecificConfig::parse(&config[..])?;
// TODO: is this a requirement? I might have read somewhere one can be a multiple of the other.
if clock_rate != parsed.sampling_frequency {
bail!(
return Err(format!(
"Expected RTP clock rate {} and AAC sampling frequency {} to match",
clock_rate,
parsed.sampling_frequency
);
clock_rate, parsed.sampling_frequency
));
}
// https://datatracker.ietf.org/doc/html/rfc6381#section-3.3
@ -397,7 +425,7 @@ pub(crate) struct Depacketizer {
#[derive(Debug)]
struct Aggregate {
ctx: crate::Context,
ctx: crate::RtspMessageContext,
/// RTP packets lost before the next frame in this aggregate. Includes old
/// loss that caused a previous fragment to be too short.
@ -410,7 +438,10 @@ struct Aggregate {
/// to be too short.
loss_since_mark: bool,
channel_id: u8,
stream_id: usize,
ssrc: u32,
sequence_number: u16,
/// The RTP-level timestamp; frame `i` is at timestamp `timestamp + frame_length*i`.
timestamp: crate::Timestamp,
@ -463,20 +494,19 @@ impl Depacketizer {
clock_rate: u32,
channels: Option<NonZeroU16>,
format_specific_params: Option<&str>,
) -> Result<Self, Error> {
) -> Result<Self, String> {
let format_specific_params = format_specific_params
.ok_or_else(|| format_err!("AAC requires format specific params"))?;
.ok_or_else(|| "AAC requires format specific params".to_string())?;
let parameters = parse_format_specific_params(clock_rate, format_specific_params)?;
let parsed = match parameters.config {
super::AudioCodecConfig::Aac(ref c) => c,
_ => unreachable!(),
};
if matches!(channels, Some(c) if c.get() != parsed.channels.channels) {
bail!(
return Err(format!(
"Expected RTP channels {:?} and AAC channels {:?} to match",
channels,
parsed.channels
);
channels, parsed.channels
));
}
let frame_length = parsed.frame_length;
Ok(Self {
@ -490,7 +520,7 @@ impl Depacketizer {
Some(&self.parameters)
}
pub(super) fn push(&mut self, mut pkt: Packet) -> Result<(), Error> {
pub(super) fn push(&mut self, mut pkt: Packet) -> Result<(), String> {
if pkt.loss > 0 && matches!(self.state, DepacketizerState::Fragmented(_)) {
log::debug!(
"Discarding fragmented AAC frame due to loss of {} RTP packets.",
@ -501,38 +531,37 @@ impl Depacketizer {
// Read the AU headers.
if pkt.payload.len() < 2 {
bail!("packet too short for au-header-length");
return Err("packet too short for au-header-length".to_string());
}
let au_headers_length_bits = pkt.payload.get_u16();
// AAC-hbr requires 16-bit AU headers: 13-bit size, 3-bit index.
if (au_headers_length_bits & 0x7) != 0 {
bail!("bad au-headers-length {}", au_headers_length_bits);
return Err(format!("bad au-headers-length {}", au_headers_length_bits));
}
let au_headers_count = au_headers_length_bits >> 4;
let data_off = usize::from(au_headers_count) << 1;
if pkt.payload.len() < (usize::from(au_headers_count) << 1) {
bail!("packet too short for au-headers");
return Err("packet too short for au-headers".to_string());
}
match &mut self.state {
DepacketizerState::Fragmented(ref mut frag) => {
if au_headers_count != 1 {
bail!(
return Err(format!(
"Got {}-AU packet while fragment in progress",
au_headers_count
);
));
}
if (pkt.timestamp.timestamp as u16) != frag.rtp_timestamp {
bail!(
return Err(format!(
"Timestamp changed from 0x{:04x} to 0x{:04x} mid-fragment",
frag.rtp_timestamp,
pkt.timestamp.timestamp as u16
);
frag.rtp_timestamp, pkt.timestamp.timestamp as u16
));
}
let au_header = u16::from_be_bytes([pkt.payload[0], pkt.payload[1]]);
let size = usize::from(au_header >> 3);
if size != usize::from(frag.size) {
bail!("size changed {}->{} mid-fragment", frag.size, size);
return Err(format!("size changed {}->{} mid-fragment", frag.size, size));
}
let data = &pkt.payload[data_off..];
match (frag.buf.len() + data.len()).cmp(&size) {
@ -544,22 +573,24 @@ impl Depacketizer {
};
return Ok(());
}
bail!(
return Err(format!(
"frag marked complete when {}+{}<{}",
frag.buf.len(),
data.len(),
size
);
));
}
}
std::cmp::Ordering::Equal => {
if !pkt.mark {
bail!("frag not marked complete when full data present");
return Err(
"frag not marked complete when full data present".to_string()
);
}
frag.buf.extend_from_slice(data);
println!("au {}: len-{}, fragmented", &pkt.timestamp, size);
self.state = DepacketizerState::Ready(super::AudioFrame {
ctx: pkt.rtsp_ctx,
ctx: pkt.ctx,
loss: frag.loss,
frame_length: NonZeroU32::from(self.frame_length),
stream_id: pkt.stream_id,
@ -567,19 +598,22 @@ impl Depacketizer {
data: std::mem::take(&mut frag.buf).freeze(),
});
}
std::cmp::Ordering::Greater => bail!("too much data in fragment"),
std::cmp::Ordering::Greater => return Err("too much data in fragment".into()),
}
}
DepacketizerState::Aggregated(_) => panic!("push when already in state aggregated"),
DepacketizerState::Idle { prev_loss } => {
if au_headers_count == 0 {
bail!("aggregate with no headers");
return Err("aggregate with no headers".to_string());
}
self.state = DepacketizerState::Aggregated(Aggregate {
ctx: pkt.rtsp_ctx,
ctx: pkt.ctx,
loss: *prev_loss + pkt.loss,
loss_since_mark: pkt.loss > 0,
channel_id: pkt.channel_id,
stream_id: pkt.stream_id,
ssrc: pkt.ssrc,
sequence_number: pkt.sequence_number,
timestamp: pkt.timestamp,
buf: pkt.payload,
frame_i: 0,
@ -593,7 +627,10 @@ impl Depacketizer {
Ok(())
}
pub(super) fn pull(&mut self) -> Result<Option<super::CodecItem>, Error> {
pub(super) fn pull(
&mut self,
conn_ctx: &ConnectionContext,
) -> Result<Option<super::CodecItem>, Error> {
match std::mem::replace(&mut self.state, DepacketizerState::Idle { prev_loss: 0 }) {
s @ DepacketizerState::Idle { .. } | s @ DepacketizerState::Fragmented(..) => {
self.state = s;
@ -613,15 +650,27 @@ impl Depacketizer {
// indicate interleaving, which we don't support.
// TODO: https://datatracker.ietf.org/doc/html/rfc3640#section-3.3.6
// says "receivers MUST support de-interleaving".
bail!("interleaving not yet supported");
return Err(error(
*conn_ctx,
agg,
"interleaving not yet supported".to_owned(),
));
}
if size > agg.buf.len() - agg.data_off {
// start of fragment
if agg.frame_count != 1 {
bail!("fragmented AUs must not share packets");
return Err(error(
*conn_ctx,
agg,
"fragmented AUs must not share packets".to_owned(),
));
}
if agg.mark {
bail!("mark can't be set on beginning of fragment");
return Err(error(
*conn_ctx,
agg,
"mark can't be set on beginning of fragment".to_owned(),
));
}
let mut buf = BytesMut::with_capacity(size);
buf.extend_from_slice(&agg.buf[agg.data_off..]);
@ -635,8 +684,15 @@ impl Depacketizer {
return Ok(None);
}
if !agg.mark {
bail!("mark must be set on non-fragmented au");
return Err(error(
*conn_ctx,
agg,
"mark must be set on non-fragmented au".to_owned(),
));
}
let delta = u32::from(agg.frame_i) * u32::from(self.frame_length.get());
let agg_timestamp = agg.timestamp;
let frame = super::AudioFrame {
ctx: agg.ctx,
loss: agg.loss,
@ -644,9 +700,19 @@ impl Depacketizer {
frame_length: NonZeroU32::from(self.frame_length),
// u16 * u16 can't overflow u32, but i64 + u32 can overflow i64.
timestamp: agg
.timestamp
.try_add(u32::from(agg.frame_i) * u32::from(self.frame_length.get()))?,
timestamp: match agg_timestamp.try_add(delta) {
Some(t) => t,
None => {
return Err(error(
*conn_ctx,
agg,
format!(
"aggregate timestamp {} + {} overflows",
agg_timestamp, delta
),
))
}
},
data: agg.buf.slice(agg.data_off..agg.data_off + size),
};
agg.loss = 0;
@ -661,6 +727,18 @@ impl Depacketizer {
}
}
fn error(conn_ctx: ConnectionContext, agg: Aggregate, description: String) -> Error {
Error(Box::new(ErrorInt::RtpPacketError {
conn_ctx,
msg_ctx: agg.ctx,
channel_id: agg.channel_id,
stream_id: agg.stream_id,
ssrc: agg.ssrc,
sequence_number: agg.sequence_number,
description,
}))
}
#[cfg(test)]
mod tests {
#[test]

View File

@ -6,7 +6,6 @@
use std::num::NonZeroU32;
use bytes::Bytes;
use failure::{bail, Error};
use pretty_hex::PrettyHex;
#[derive(Debug)]
@ -17,9 +16,12 @@ pub(crate) struct Depacketizer {
impl Depacketizer {
/// Creates a new Depacketizer.
pub(super) fn new(clock_rate: u32) -> Result<Self, Error> {
pub(super) fn new(clock_rate: u32) -> Result<Self, String> {
if clock_rate != 8_000 {
bail!("Expected clock rate of 8000 for G.723, got {}", clock_rate);
return Err(format!(
"Expected clock rate of 8000 for G.723, got {}",
clock_rate
));
}
Ok(Self {
parameters: super::Parameters::Audio(super::AudioParameters {
@ -48,13 +50,16 @@ impl Depacketizer {
actual_hdr_bits == expected_hdr_bits
}
pub(super) fn push(&mut self, pkt: crate::client::rtp::Packet) -> Result<(), Error> {
pub(super) fn push(&mut self, pkt: crate::client::rtp::Packet) -> Result<(), String> {
assert!(self.pending.is_none());
if !Self::validate(&pkt) {
bail!("Invalid G.723 packet: {:#?}", pkt.payload.hex_dump());
return Err(format!(
"Invalid G.723 packet: {:#?}",
pkt.payload.hex_dump()
));
}
self.pending = Some(super::AudioFrame {
ctx: pkt.rtsp_ctx,
ctx: pkt.ctx,
loss: pkt.loss,
stream_id: pkt.stream_id,
timestamp: pkt.timestamp,
@ -64,7 +69,7 @@ impl Depacketizer {
Ok(())
}
pub(super) fn pull(&mut self) -> Result<Option<super::CodecItem>, Error> {
Ok(self.pending.take().map(super::CodecItem::AudioFrame))
pub(super) fn pull(&mut self) -> Option<super::CodecItem> {
self.pending.take().map(super::CodecItem::AudioFrame)
}
}

View File

@ -6,11 +6,10 @@
use std::convert::TryFrom;
use bytes::{Buf, BufMut, Bytes, BytesMut};
use failure::{bail, format_err, Error};
use h264_reader::nal::{NalHeader, UnitType};
use log::debug;
use crate::{client::rtp::Packet, Timestamp};
use crate::{client::rtp::Packet, Error, Timestamp};
use super::VideoFrame;
@ -56,8 +55,8 @@ struct Nal {
/// An access unit that is currently being accumulated during `PreMark` state.
#[derive(Debug)]
struct AccessUnit {
start_ctx: crate::Context,
end_ctx: crate::Context,
start_ctx: crate::RtspMessageContext,
end_ctx: crate::RtspMessageContext,
timestamp: crate::Timestamp,
stream_id: usize,
@ -69,7 +68,7 @@ struct AccessUnit {
}
#[derive(Debug)]
#[allow(clippy::clippy::large_enum_variant)]
#[allow(clippy::large_enum_variant)]
enum DepacketizerInputState {
/// Not yet processing an access unit.
New,
@ -95,14 +94,17 @@ impl Depacketizer {
pub(super) fn new(
clock_rate: u32,
format_specific_params: Option<&str>,
) -> Result<Self, Error> {
) -> Result<Self, String> {
if clock_rate != 90_000 {
bail!("H.264 clock rate must always be 90000");
return Err(format!(
"invalid H.264 clock rate {}; must always be 90000",
clock_rate
));
}
// TODO: the spec doesn't require out-of-band parameters, so we shouldn't either.
let format_specific_params = format_specific_params
.ok_or_else(|| format_err!("H.264 depacketizer expects out-of-band parameters"))?;
.ok_or_else(|| "H.264 depacketizer expects out-of-band parameters".to_owned())?;
Ok(Depacketizer {
input_state: DepacketizerInputState::New,
pending: None,
@ -116,105 +118,100 @@ impl Depacketizer {
Some(&self.parameters.generic_parameters)
}
pub(super) fn push(&mut self, pkt: Packet) -> Result<(), Error> {
pub(super) fn push(&mut self, pkt: Packet) -> Result<(), String> {
// Push shouldn't be called until pull is exhausted.
if let Some(p) = self.pending.as_ref() {
panic!("push with data already pending: {:?}", p);
}
let seq = pkt.sequence_number;
let mut access_unit = match std::mem::replace(
&mut self.input_state,
DepacketizerInputState::New,
) {
DepacketizerInputState::New => {
debug_assert!(self.nals.is_empty());
debug_assert!(self.pieces.is_empty());
AccessUnit::start(&pkt, 0)
}
DepacketizerInputState::PreMark(mut access_unit) => {
if pkt.loss > 0 {
self.nals.clear();
self.pieces.clear();
if access_unit.timestamp.timestamp == pkt.timestamp.timestamp {
// Loss within this access unit. Ignore until mark or new timestamp.
self.input_state = if pkt.mark {
DepacketizerInputState::PostMark {
timestamp: pkt.timestamp,
loss: pkt.loss,
}
} else {
self.pieces.clear();
self.nals.clear();
DepacketizerInputState::Loss {
timestamp: pkt.timestamp,
pkts: pkt.loss,
}
};
let mut access_unit =
match std::mem::replace(&mut self.input_state, DepacketizerInputState::New) {
DepacketizerInputState::New => {
debug_assert!(self.nals.is_empty());
debug_assert!(self.pieces.is_empty());
AccessUnit::start(&pkt, 0)
}
DepacketizerInputState::PreMark(mut access_unit) => {
if pkt.loss > 0 {
self.nals.clear();
self.pieces.clear();
if access_unit.timestamp.timestamp == pkt.timestamp.timestamp {
// Loss within this access unit. Ignore until mark or new timestamp.
self.input_state = if pkt.mark {
DepacketizerInputState::PostMark {
timestamp: pkt.timestamp,
loss: pkt.loss,
}
} else {
self.pieces.clear();
self.nals.clear();
DepacketizerInputState::Loss {
timestamp: pkt.timestamp,
pkts: pkt.loss,
}
};
return Ok(());
}
// A suffix of a previous access unit was lost; discard it.
// A prefix of the new one may have been lost; try parsing.
AccessUnit::start(&pkt, 0)
} else if access_unit.timestamp.timestamp != pkt.timestamp.timestamp {
if access_unit.in_fu_a {
return Err(format!(
"Timestamp changed from {} to {} in the middle of a fragmented NAL",
access_unit.timestamp, pkt.timestamp
));
}
access_unit.end_ctx = pkt.ctx;
self.pending = Some(self.finalize_access_unit(access_unit)?);
AccessUnit::start(&pkt, 0)
} else {
access_unit
}
}
DepacketizerInputState::PostMark {
timestamp: state_ts,
loss,
} => {
debug_assert!(self.nals.is_empty());
debug_assert!(self.pieces.is_empty());
if state_ts.timestamp == pkt.timestamp.timestamp {
return Err("packet follows marked packet with same timestamp".into());
}
AccessUnit::start(&pkt, loss)
}
DepacketizerInputState::Loss {
timestamp,
mut pkts,
} => {
debug_assert!(self.nals.is_empty());
debug_assert!(self.pieces.is_empty());
if pkt.timestamp.timestamp == timestamp.timestamp {
pkts += pkt.loss;
self.input_state = DepacketizerInputState::Loss { timestamp, pkts };
return Ok(());
}
// A suffix of a previous access unit was lost; discard it.
// A prefix of the new one may have been lost; try parsing.
AccessUnit::start(&pkt, 0)
} else if access_unit.timestamp.timestamp != pkt.timestamp.timestamp {
if access_unit.in_fu_a {
bail!("Timestamp changed from {} to {} in the middle of a fragmented NAL at seq={:04x} {:#?}", access_unit.timestamp, pkt.timestamp, seq, &pkt.rtsp_ctx);
}
access_unit.end_ctx = pkt.rtsp_ctx;
self.pending = Some(self.finalize_access_unit(access_unit)?);
AccessUnit::start(&pkt, 0)
} else {
access_unit
AccessUnit::start(&pkt, pkts)
}
}
DepacketizerInputState::PostMark {
timestamp: state_ts,
loss,
} => {
debug_assert!(self.nals.is_empty());
debug_assert!(self.pieces.is_empty());
if state_ts.timestamp == pkt.timestamp.timestamp {
bail!("Received packet with timestamp {} after marked packet with same timestamp at seq={:04x} {:#?}", pkt.timestamp, seq, &pkt.rtsp_ctx);
}
AccessUnit::start(&pkt, loss)
}
DepacketizerInputState::Loss {
timestamp,
mut pkts,
} => {
debug_assert!(self.nals.is_empty());
debug_assert!(self.pieces.is_empty());
if pkt.timestamp.timestamp == timestamp.timestamp {
pkts += pkt.loss;
self.input_state = DepacketizerInputState::Loss { timestamp, pkts };
return Ok(());
}
AccessUnit::start(&pkt, pkts)
}
};
};
let mut data = pkt.payload;
if data.is_empty() {
bail!("Empty NAL at RTP seq {:04x}, {:#?}", seq, &pkt.rtsp_ctx);
return Err("Empty NAL".into());
}
// https://tools.ietf.org/html/rfc6184#section-5.2
let nal_header = data[0];
if (nal_header >> 7) != 0 {
bail!(
"NAL header has F bit set at seq {:04x} {:#?}",
seq,
&pkt.rtsp_ctx
);
return Err(format!("NAL header {:02x} has F bit set", nal_header));
}
data.advance(1); // skip the header byte.
match nal_header & 0b11111 {
1..=23 => {
if access_unit.in_fu_a {
bail!(
"Non-fragmented NAL while fragment in progress seq {:04x} {:#?}",
seq,
&pkt.rtsp_ctx
);
return Err(format!(
"Non-fragmented NAL {:02x} while fragment in progress",
nal_header
));
}
let len = u32::try_from(data.len()).expect("data len < u16::MAX") + 1;
let next_piece_idx = self.add_piece(data)?;
@ -228,24 +225,25 @@ impl Depacketizer {
// STAP-A. https://tools.ietf.org/html/rfc6184#section-5.7.1
loop {
if data.remaining() < 3 {
bail!(
return Err(format!(
"STAP-A has {} remaining bytes; expecting 2-byte length, non-empty NAL",
data.remaining()
);
));
}
let len = data.get_u16();
//let len = usize::from(data.get_u16());
if len == 0 {
bail!("zero length in STAP-A");
return Err("zero length in STAP-A".into());
}
let hdr =
NalHeader::new(data[0]).map_err(|_| format_err!("bad header in STAP-A"))?;
let hdr = NalHeader::new(data[0])
.map_err(|_| format!("bad header {:02x} in STAP-A", data[0]))?;
match data.remaining().cmp(&usize::from(len)) {
std::cmp::Ordering::Less => bail!(
"STAP-A too short: {} bytes remaining, expecting {}-byte NAL",
data.remaining(),
len
),
std::cmp::Ordering::Less => {
return Err(format!(
"STAP-A too short: {} bytes remaining, expecting {}-byte NAL",
data.remaining(),
len
))
}
std::cmp::Ordering::Equal => {
data.advance(1);
let next_piece_idx = self.add_piece(data)?;
@ -269,21 +267,16 @@ impl Depacketizer {
}
}
}
25..=27 | 29 => bail!(
"unimplemented NAL (header 0x{:02x}) at seq {:04x} {:#?}",
nal_header,
seq,
&pkt.rtsp_ctx
),
25..=27 | 29 => {
return Err(format!(
"unimplemented/unexpected interleaved mode NAL ({:02x})",
nal_header,
))
}
28 => {
// FU-A. https://tools.ietf.org/html/rfc6184#section-5.8
if data.len() < 2 {
bail!(
"FU-A len {} too short at seq {:04x} {:#?}",
data.len(),
seq,
&pkt.rtsp_ctx
);
return Err(format!("FU-A len {} too short", data.len()));
}
let fu_header = data[0];
let start = (fu_header & 0b10000000) != 0;
@ -294,27 +287,14 @@ impl Depacketizer {
.expect("NalHeader is valid");
data.advance(1);
if (start && end) || reserved {
bail!(
"Invalid FU-A header {:08b} at seq {:04x} {:#?}",
fu_header,
seq,
&pkt.rtsp_ctx
);
return Err(format!("Invalid FU-A header {:02x}", fu_header));
}
if !end && pkt.mark {
bail!(
"FU-A pkt with MARK && !END at seq {:04x} {:#?}",
seq,
&pkt.rtsp_ctx
);
return Err("FU-A pkt with MARK && !END".into());
}
let u32_len = u32::try_from(data.len()).expect("RTP packet len must be < u16::MAX");
match (start, access_unit.in_fu_a) {
(true, true) => bail!(
"FU-A with start bit while frag in progress at seq {:04x} {:#?}",
seq,
&pkt.rtsp_ctx
),
(true, true) => return Err("FU-A with start bit while frag in progress".into()),
(true, false) => {
self.add_piece(data)?;
self.nals.push(Nal {
@ -328,24 +308,17 @@ impl Depacketizer {
let pieces = self.add_piece(data)?;
let nal = self.nals.last_mut().expect("nals non-empty while in fu-a");
if u8::from(nal_header) != u8::from(nal.hdr) {
bail!(
"FU-A has inconsistent NAL type: {:?} then {:?} at {:02x} {:?}",
nal.hdr,
nal_header,
seq,
&pkt.rtsp_ctx
);
return Err(format!(
"FU-A has inconsistent NAL type: {:?} then {:?}",
nal.hdr, nal_header,
));
}
nal.len += u32_len;
if end {
nal.next_piece_idx = pieces;
access_unit.in_fu_a = false;
} else if pkt.mark {
bail!(
"FU-A with MARK and no END at seq {:04x} {:#?}",
seq,
pkt.rtsp_ctx
);
return Err("FU-A has MARK and no END".into());
}
}
(false, false) => {
@ -358,23 +331,14 @@ impl Depacketizer {
};
return Ok(());
}
bail!(
"FU-A with start bit unset while no frag in progress at {:04x} {:#?}",
seq,
&pkt.rtsp_ctx
);
return Err("FU-A has start bit unset while no frag in progress".into());
}
}
}
_ => bail!(
"bad nal header {:0x} at seq {:04x} {:#?}",
nal_header,
seq,
&pkt.rtsp_ctx
),
_ => return Err(format!("bad nal header {:02x}", nal_header)),
}
self.input_state = if pkt.mark {
access_unit.end_ctx = pkt.rtsp_ctx;
access_unit.end_ctx = pkt.ctx;
self.pending = Some(self.finalize_access_unit(access_unit)?);
DepacketizerInputState::PostMark {
timestamp: pkt.timestamp,
@ -386,17 +350,17 @@ impl Depacketizer {
Ok(())
}
pub(super) fn pull(&mut self) -> Result<Option<super::CodecItem>, Error> {
Ok(self.pending.take().map(super::CodecItem::VideoFrame))
pub(super) fn pull(&mut self) -> Option<super::CodecItem> {
self.pending.take().map(super::CodecItem::VideoFrame)
}
/// Adds a piece to `self.pieces`, erroring if it becomes absurdly large.
fn add_piece(&mut self, piece: Bytes) -> Result<u32, Error> {
fn add_piece(&mut self, piece: Bytes) -> Result<u32, String> {
self.pieces.push(piece);
u32::try_from(self.pieces.len()).map_err(|_| format_err!("more than u32::MAX pieces!"))
u32::try_from(self.pieces.len()).map_err(|_| "more than u32::MAX pieces!".to_string())
}
fn finalize_access_unit(&mut self, au: AccessUnit) -> Result<VideoFrame, Error> {
fn finalize_access_unit(&mut self, au: AccessUnit) -> Result<VideoFrame, String> {
let mut piece_idx = 0;
let mut retained_len = 0usize;
let mut is_random_access_point = false;
@ -453,6 +417,7 @@ impl Depacketizer {
let new_parameters = if new_sps.is_some() || new_pps.is_some() {
let sps_nal = new_sps.as_deref().unwrap_or(&self.parameters.sps_nal);
let pps_nal = new_pps.as_deref().unwrap_or(&self.parameters.pps_nal);
// TODO: could map this to a RtpPacketError more accurately.
self.parameters = InternalParameters::parse_sps_and_pps(sps_nal, pps_nal)?;
match self.parameters.generic_parameters {
super::Parameters::Video(ref p) => Some(p.clone()),
@ -478,8 +443,8 @@ impl Depacketizer {
impl AccessUnit {
fn start(pkt: &crate::client::rtp::Packet, additional_loss: u16) -> Self {
AccessUnit {
start_ctx: pkt.rtsp_ctx,
end_ctx: pkt.rtsp_ctx,
start_ctx: pkt.ctx,
end_ctx: pkt.ctx,
timestamp: pkt.timestamp,
stream_id: pkt.stream_id,
in_fu_a: false,
@ -503,7 +468,7 @@ struct InternalParameters {
impl InternalParameters {
/// Parses metadata from the `format-specific-params` of a SDP `fmtp` media attribute.
fn parse_format_specific_params(format_specific_params: &str) -> Result<Self, Error> {
fn parse_format_specific_params(format_specific_params: &str) -> Result<Self, String> {
let mut sprop_parameter_sets = None;
for p in format_specific_params.split(';') {
let (key, value) = p.trim().split_once('=').unwrap();
@ -511,38 +476,37 @@ impl InternalParameters {
sprop_parameter_sets = Some(value);
}
}
let sprop_parameter_sets = sprop_parameter_sets.ok_or_else(|| {
format_err!("no sprop-parameter-sets in H.264 format-specific-params")
})?;
let sprop_parameter_sets = sprop_parameter_sets
.ok_or_else(|| "no sprop-parameter-sets in H.264 format-specific-params".to_string())?;
let mut sps_nal = None;
let mut pps_nal = None;
for nal in sprop_parameter_sets.split(',') {
let nal =
base64::decode(nal).map_err(|_| format_err!("NAL has invalid base64 encoding"))?;
base64::decode(nal).map_err(|_| "NAL has invalid base64 encoding".to_string())?;
if nal.is_empty() {
bail!("empty NAL");
return Err("empty NAL".into());
}
let header = h264_reader::nal::NalHeader::new(nal[0])
.map_err(|_| format_err!("bad NAL header {:0x}", nal[0]))?;
.map_err(|_| format!("bad NAL header {:0x}", nal[0]))?;
match header.nal_unit_type() {
UnitType::SeqParameterSet => {
if sps_nal.is_some() {
bail!("multiple SPSs");
return Err("multiple SPSs".into());
}
sps_nal = Some(nal);
}
UnitType::PicParameterSet => {
if pps_nal.is_some() {
bail!("multiple PPSs");
return Err("multiple PPSs".into());
}
pps_nal = Some(nal);
}
_ => bail!("only SPS and PPS expected in parameter sets"),
_ => return Err("only SPS and PPS expected in parameter sets".into()),
}
}
let sps_nal = sps_nal.ok_or_else(|| format_err!("no sps"))?;
let pps_nal = pps_nal.ok_or_else(|| format_err!("no pps"))?;
let sps_nal = sps_nal.ok_or_else(|| "no sps".to_string())?;
let pps_nal = pps_nal.ok_or_else(|| "no pps".to_string())?;
// GW security GW4089IP leaves Annex B start codes at the end of both
// SPS and PPS in the sprop-parameter-sets. Leaving them in means
@ -557,22 +521,22 @@ impl InternalParameters {
Self::parse_sps_and_pps(sps_nal, pps_nal)
}
fn parse_sps_and_pps(sps_nal: &[u8], pps_nal: &[u8]) -> Result<InternalParameters, Error> {
fn parse_sps_and_pps(sps_nal: &[u8], pps_nal: &[u8]) -> Result<InternalParameters, String> {
let sps_rbsp = h264_reader::rbsp::decode_nal(&sps_nal[1..]);
if sps_rbsp.len() < 4 {
bail!("bad sps");
return Err("bad sps".into());
}
let rfc6381_codec = format!(
"avc1.{:02X}{:02X}{:02X}",
sps_rbsp[0], sps_rbsp[1], sps_rbsp[2]
);
let sps = h264_reader::nal::sps::SeqParameterSet::from_bytes(&sps_rbsp)
.map_err(|e| format_err!("Bad SPS: {:?}", e))?;
.map_err(|e| format!("Bad SPS: {:?}", e))?;
debug!("sps: {:#?}", &sps);
let pixel_dimensions = sps
.pixel_dimensions()
.map_err(|e| format_err!("SPS has invalid pixel dimensions: {:?}", e))?;
.map_err(|e| format!("SPS has invalid pixel dimensions: {:?}", e))?;
// Create the AVCDecoderConfiguration, ISO/IEC 14496-15 section 5.2.4.1.
// The beginning of the AVCDecoderConfiguration takes a few values from
@ -591,12 +555,20 @@ impl InternalParameters {
// ffmpeg's ff_isom_write_avcc has the same limitation, so it's probably
// fine. This next byte is a reserved 0b111 + a 5-bit # of SPSs (1).
avc_decoder_config.put_u8(0xe1);
avc_decoder_config.extend(&u16::try_from(sps_nal.len())?.to_be_bytes()[..]);
avc_decoder_config.extend(
&u16::try_from(sps_nal.len())
.map_err(|_| format!("SPS NAL is {} bytes long; must fit in u16", sps_nal.len()))?
.to_be_bytes()[..],
);
let sps_nal_start = avc_decoder_config.len();
avc_decoder_config.extend_from_slice(sps_nal);
let sps_nal_end = avc_decoder_config.len();
avc_decoder_config.put_u8(1); // # of PPSs.
avc_decoder_config.extend(&u16::try_from(pps_nal.len())?.to_be_bytes()[..]);
avc_decoder_config.extend(
&u16::try_from(pps_nal.len())
.map_err(|_| format!("PPS NAL is {} bytes long; must fit in u16", pps_nal.len()))?
.to_be_bytes()[..],
);
let pps_nal_start = avc_decoder_config.len();
avc_decoder_config.extend_from_slice(pps_nal);
let pps_nal_end = avc_decoder_config.len();
@ -651,7 +623,7 @@ fn matches(nal: &[u8], hdr: NalHeader, pieces: &[Bytes]) -> bool {
if nal.len() < new_pos {
return false;
}
if &piece[..] != &nal[nal_pos..new_pos] {
if piece[..] != nal[nal_pos..new_pos] {
return false;
}
nal_pos = new_pos;
@ -688,10 +660,10 @@ impl Packetizer {
max_payload_size: u16,
stream_id: usize,
initial_sequence_number: u16,
) -> Result<Self, Error> {
) -> Result<Self, String> {
if max_payload_size < 3 {
// minimum size to make progress with FU-A packets.
bail!("max_payload_size must be > 3");
return Err("max_payload_size must be > 3".into());
}
Ok(Self {
max_payload_size,
@ -707,39 +679,43 @@ impl Packetizer {
Ok(())
}
pub fn pull(&mut self) -> Result<Option<Packet>, Error> {
// TODO: better error type?
pub fn pull(&mut self) -> Result<Option<Packet>, String> {
let max_payload_size = usize::from(self.max_payload_size);
match std::mem::replace(&mut self.state, PacketizerState::Idle) {
PacketizerState::Idle => return Ok(None),
PacketizerState::Idle => Ok(None),
PacketizerState::HaveData {
timestamp,
mut data,
} => {
if data.len() < 5 {
bail!(
return Err(format!(
"have only {} bytes; expected 4-byte length + non-empty NAL",
data.len()
);
));
}
let len = data.get_u32();
let usize_len = usize::try_from(len).expect("u32 fits in usize");
if data.len() < usize_len || len == 0 {
bail!("bad length of {} bytes; expected [1, {}]", len, data.len());
return Err(format!(
"bad length of {} bytes; expected [1, {}]",
len,
data.len()
));
}
let sequence_number = self.next_sequence_number;
self.next_sequence_number = self.next_sequence_number.wrapping_add(1);
let hdr =
NalHeader::new(data[0]).map_err(|_| format_err!("F bit in NAL header"))?;
let hdr = NalHeader::new(data[0]).map_err(|_| "F bit in NAL header".to_owned())?;
if matches!(hdr.nal_unit_type(), UnitType::Unspecified(_)) {
// This can clash with fragmentation/aggregation NAL types.
bail!("bad NAL header {:?}", hdr);
return Err(format!("bad NAL header {:?}", hdr));
}
if usize_len > max_payload_size {
// start a FU-A.
data.advance(1);
let mut payload = Vec::with_capacity(max_payload_size);
let fu_indicator = (hdr.nal_ref_idc() << 5) | 28;
let fu_header = 0b100_00000 | hdr.nal_unit_type().id(); // START bit set.
let fu_header = 0b1000_0000 | hdr.nal_unit_type().id(); // START bit set.
payload.extend_from_slice(&[fu_indicator, fu_header]);
payload.extend_from_slice(&data[..max_payload_size - 2]);
data.advance(max_payload_size - 2);
@ -749,10 +725,13 @@ impl Packetizer {
left: len + 1 - u32::from(self.max_payload_size),
data,
};
// TODO: ctx, channel_id, and ssrc are placeholders.
return Ok(Some(Packet {
rtsp_ctx: crate::Context::dummy(),
ctx: crate::RtspMessageContext::dummy(),
channel_id: 0,
stream_id: self.stream_id,
timestamp,
ssrc: 0,
sequence_number,
loss: 0,
mark: false,
@ -772,9 +751,11 @@ impl Packetizer {
mark = false;
}
Ok(Some(Packet {
rtsp_ctx: crate::Context::dummy(),
ctx: crate::RtspMessageContext::dummy(),
channel_id: 0,
stream_id: self.stream_id,
timestamp,
ssrc: 0,
sequence_number,
loss: 0,
mark,
@ -809,7 +790,7 @@ impl Packetizer {
let usize_left = usize::try_from(left).expect("u32 fits in usize");
payload = Vec::with_capacity(usize_left + 2);
let fu_indicator = (hdr.nal_ref_idc() << 5) | 28;
let fu_header = 0b010_00000 | hdr.nal_unit_type().id(); // END bit set.
let fu_header = 0b0100_0000 | hdr.nal_unit_type().id(); // END bit set.
payload.extend_from_slice(&[fu_indicator, fu_header]);
payload.extend_from_slice(&data[..usize_left]);
if data.len() == usize_left {
@ -821,10 +802,13 @@ impl Packetizer {
self.state = PacketizerState::HaveData { timestamp, data };
}
}
// TODO: placeholders.
Ok(Some(Packet {
rtsp_ctx: crate::Context::dummy(),
ctx: crate::RtspMessageContext::dummy(),
channel_id: 0,
stream_id: self.stream_id,
timestamp,
ssrc: 0,
sequence_number,
loss: 0,
mark,
@ -927,57 +911,67 @@ mod tests {
};
d.push(Packet {
// plain SEI packet.
rtsp_ctx: crate::Context::dummy(),
ctx: crate::RtspMessageContext::dummy(),
channel_id: 0,
stream_id: 0,
timestamp,
ssrc: 0,
sequence_number: 0,
loss: 0,
mark: false,
payload: Bytes::from_static(b"\x06plain"),
})
.unwrap();
assert!(d.pull().unwrap().is_none());
assert!(d.pull().is_none());
d.push(Packet {
// STAP-A packet.
rtsp_ctx: crate::Context::dummy(),
ctx: crate::RtspMessageContext::dummy(),
channel_id: 0,
stream_id: 0,
timestamp,
ssrc: 0,
sequence_number: 1,
loss: 0,
mark: false,
payload: Bytes::from_static(b"\x18\x00\x09\x06stap-a 1\x00\x09\x06stap-a 2"),
})
.unwrap();
assert!(d.pull().unwrap().is_none());
assert!(d.pull().is_none());
d.push(Packet {
// FU-A packet, start.
rtsp_ctx: crate::Context::dummy(),
ctx: crate::RtspMessageContext::dummy(),
channel_id: 0,
stream_id: 0,
timestamp,
ssrc: 0,
sequence_number: 2,
loss: 0,
mark: false,
payload: Bytes::from_static(b"\x7c\x86fu-a start, "),
})
.unwrap();
assert!(d.pull().unwrap().is_none());
assert!(d.pull().is_none());
d.push(Packet {
// FU-A packet, middle.
rtsp_ctx: crate::Context::dummy(),
ctx: crate::RtspMessageContext::dummy(),
channel_id: 0,
stream_id: 0,
timestamp,
ssrc: 0,
sequence_number: 3,
loss: 0,
mark: false,
payload: Bytes::from_static(b"\x7c\x06fu-a middle, "),
})
.unwrap();
assert!(d.pull().unwrap().is_none());
assert!(d.pull().is_none());
d.push(Packet {
// FU-A packet, end.
rtsp_ctx: crate::Context::dummy(),
ctx: crate::RtspMessageContext::dummy(),
channel_id: 0,
stream_id: 0,
timestamp,
ssrc: 0,
sequence_number: 4,
loss: 0,
mark: true,
@ -985,7 +979,7 @@ mod tests {
})
.unwrap();
let frame = match d.pull() {
Ok(Some(CodecItem::VideoFrame(frame))) => frame,
Some(CodecItem::VideoFrame(frame)) => frame,
_ => panic!(),
};
assert_eq!(
@ -1012,20 +1006,24 @@ mod tests {
start: 0,
};
d.push(Packet { // new SPS.
rtsp_ctx: crate::Context::dummy(),
ctx: crate::RtspMessageContext::dummy(),
channel_id: 0,
stream_id: 0,
timestamp,
ssrc: 0,
sequence_number: 0,
loss: 0,
mark: false,
payload: Bytes::from_static(b"\x67\x4d\x40\x1e\x9a\x64\x05\x01\xef\xf3\x50\x10\x10\x14\x00\x00\x0f\xa0\x00\x01\x38\x80\x10"),
}).unwrap();
assert!(d.pull().unwrap().is_none());
assert!(d.pull().is_none());
d.push(Packet {
// same PPS again.
rtsp_ctx: crate::Context::dummy(),
ctx: crate::RtspMessageContext::dummy(),
channel_id: 0,
stream_id: 0,
timestamp,
ssrc: 0,
sequence_number: 1,
loss: 0,
mark: true,
@ -1033,7 +1031,7 @@ mod tests {
})
.unwrap();
let frame = match d.pull() {
Ok(Some(CodecItem::VideoFrame(frame))) => frame,
Some(CodecItem::VideoFrame(frame)) => frame,
_ => panic!(),
};
assert!(frame.new_parameters.is_some());

View File

@ -10,8 +10,10 @@
use std::num::{NonZeroU16, NonZeroU32};
use crate::client::rtp;
use crate::error::ErrorInt;
use crate::ConnectionContext;
use crate::Error;
use bytes::{Buf, Bytes};
use failure::{bail, Error};
use pretty_hex::PrettyHex;
pub(crate) mod aac;
@ -155,13 +157,15 @@ impl AudioParameters {
/// Not all codecs can be placed into a `.mp4` file, and even for supported codecs there
/// may be unsupported edge cases.
pub fn sample_entry(&self) -> Result<Bytes, Error> {
aac::get_mp4a_box(self)
// TODO: InvalidArgument doesn't seem quite right. We probably should
// produce the mp4a eagerly anyway.
aac::get_mp4a_box(self).map_err(|description| wrap!(ErrorInt::InvalidArgument(description)))
}
}
/// An audio frame, which consists of one or more samples.
pub struct AudioFrame {
pub ctx: crate::Context,
pub ctx: crate::RtspMessageContext,
pub stream_id: usize,
pub timestamp: crate::Timestamp,
pub frame_length: NonZeroU32,
@ -206,7 +210,7 @@ impl Buf for AudioFrame {
pub struct MessageParameters(onvif::CompressionType);
pub struct MessageFrame {
pub ctx: crate::Context,
pub ctx: crate::RtspMessageContext,
pub timestamp: crate::Timestamp,
pub stream_id: usize,
@ -245,8 +249,8 @@ pub struct VideoFrame {
// A pair of contexts: for the start and for the end.
// Having both can be useful to measure the total time elapsed while receiving the frame.
start_ctx: crate::Context,
end_ctx: crate::Context,
start_ctx: crate::RtspMessageContext,
end_ctx: crate::RtspMessageContext,
/// This picture's timestamp in the time base associated with the stream.
pub timestamp: crate::Timestamp,
@ -268,12 +272,12 @@ pub struct VideoFrame {
impl VideoFrame {
#[inline]
pub fn start_ctx(&self) -> crate::Context {
pub fn start_ctx(&self) -> crate::RtspMessageContext {
self.start_ctx
}
#[inline]
pub fn end_ctx(&self) -> crate::Context {
pub fn end_ctx(&self) -> crate::RtspMessageContext {
self.end_ctx
}
@ -327,7 +331,7 @@ impl Depacketizer {
clock_rate: u32,
channels: Option<NonZeroU16>,
format_specific_params: Option<&str>,
) -> Result<Self, Error> {
) -> Result<Self, String> {
use onvif::CompressionType;
// RTP Payload Format Media Types
@ -382,11 +386,10 @@ impl Depacketizer {
media,
encoding_name
);
bail!(
return Err(format!(
"no depacketizer for media/encoding_name {}/{}",
media,
encoding_name
);
media, encoding_name
));
}
}))
}
@ -401,7 +404,7 @@ impl Depacketizer {
}
}
pub fn push(&mut self, input: rtp::Packet) -> Result<(), Error> {
pub fn push(&mut self, input: rtp::Packet) -> Result<(), String> {
match &mut self.0 {
DepacketizerInner::Aac(d) => d.push(input),
DepacketizerInner::G723(d) => d.push(input),
@ -411,13 +414,13 @@ impl Depacketizer {
}
}
pub fn pull(&mut self) -> Result<Option<CodecItem>, Error> {
pub fn pull(&mut self, conn_ctx: &ConnectionContext) -> Result<Option<CodecItem>, Error> {
match &mut self.0 {
DepacketizerInner::Aac(d) => d.pull(),
DepacketizerInner::G723(d) => d.pull(),
DepacketizerInner::H264(d) => d.pull(),
DepacketizerInner::Onvif(d) => d.pull(),
DepacketizerInner::SimpleAudio(d) => d.pull(),
DepacketizerInner::Aac(d) => d.pull(conn_ctx),
DepacketizerInner::G723(d) => Ok(d.pull()),
DepacketizerInner::H264(d) => Ok(d.pull()),
DepacketizerInner::Onvif(d) => Ok(d.pull()),
DepacketizerInner::SimpleAudio(d) => Ok(d.pull()),
}
}
}

View File

@ -9,7 +9,6 @@
//! bit set end messages.
use bytes::{Buf, BufMut, BytesMut};
use failure::{bail, Error};
use super::CodecItem;
@ -37,7 +36,7 @@ enum State {
#[derive(Debug)]
struct InProgress {
ctx: crate::Context,
ctx: crate::RtspMessageContext,
timestamp: crate::Timestamp,
data: BytesMut,
loss: u16,
@ -56,7 +55,7 @@ impl Depacketizer {
Some(&self.parameters)
}
pub(super) fn push(&mut self, pkt: crate::client::rtp::Packet) -> Result<(), failure::Error> {
pub(super) fn push(&mut self, pkt: crate::client::rtp::Packet) -> Result<(), String> {
if pkt.loss > 0 {
if let State::InProgress(in_progress) = &self.state {
log::debug!(
@ -70,12 +69,10 @@ impl Depacketizer {
let mut in_progress = match std::mem::replace(&mut self.state, State::Idle) {
State::InProgress(in_progress) => {
if in_progress.timestamp.timestamp != pkt.timestamp.timestamp {
bail!(
"Timestamp changed from {} to {} (@ seq {:04x}) with message in progress",
&in_progress.timestamp,
&pkt.timestamp,
pkt.sequence_number
);
return Err(format!(
"Timestamp changed from {} to {} with message in progress",
&in_progress.timestamp, &pkt.timestamp,
));
}
in_progress
}
@ -86,7 +83,7 @@ impl Depacketizer {
self.state = State::Ready(super::MessageFrame {
stream_id: pkt.stream_id,
loss: pkt.loss,
ctx: pkt.rtsp_ctx,
ctx: pkt.ctx,
timestamp: pkt.timestamp,
data: pkt.payload,
});
@ -94,7 +91,7 @@ impl Depacketizer {
}
InProgress {
loss: pkt.loss,
ctx: pkt.rtsp_ctx,
ctx: pkt.ctx,
timestamp: pkt.timestamp,
data: BytesMut::with_capacity(self.high_water_size),
}
@ -117,13 +114,13 @@ impl Depacketizer {
Ok(())
}
pub(super) fn pull(&mut self) -> Result<Option<CodecItem>, Error> {
Ok(match std::mem::replace(&mut self.state, State::Idle) {
pub(super) fn pull(&mut self) -> Option<CodecItem> {
match std::mem::replace(&mut self.state, State::Idle) {
State::Ready(message) => Some(CodecItem::MessageFrame(message)),
s => {
self.state = s;
None
}
})
}
}
}

View File

@ -7,8 +7,6 @@
use std::num::NonZeroU32;
use bytes::Bytes;
use failure::format_err;
use failure::Error;
use super::CodecItem;
@ -50,10 +48,10 @@ impl Depacketizer {
}
}
pub(super) fn push(&mut self, pkt: crate::client::rtp::Packet) -> Result<(), Error> {
pub(super) fn push(&mut self, pkt: crate::client::rtp::Packet) -> Result<(), String> {
assert!(self.pending.is_none());
let frame_length = self.frame_length(pkt.payload.len()).ok_or_else(|| {
format_err!(
format!(
"invalid length {} for payload of {}-bit audio samples",
pkt.payload.len(),
self.bits_per_sample
@ -61,7 +59,7 @@ impl Depacketizer {
})?;
self.pending = Some(super::AudioFrame {
loss: pkt.loss,
ctx: pkt.rtsp_ctx,
ctx: pkt.ctx,
stream_id: pkt.stream_id,
timestamp: pkt.timestamp,
frame_length,
@ -70,7 +68,7 @@ impl Depacketizer {
Ok(())
}
pub(super) fn pull(&mut self) -> Result<Option<super::CodecItem>, Error> {
Ok(self.pending.take().map(CodecItem::AudioFrame))
pub(super) fn pull(&mut self) -> Option<super::CodecItem> {
self.pending.take().map(CodecItem::AudioFrame)
}
}

View File

@ -1,17 +1,36 @@
// Copyright (C) 2021 Scott Lamb <slamb@slamb.org>
// SPDX-License-Identifier: MIT OR Apache-2.0
use bytes::{Buf, BufMut, Bytes, BytesMut};
use failure::{bail, format_err, Error};
use bytes::Bytes;
//use failure::{bail, format_err, Error};
use once_cell::sync::Lazy;
use pretty_hex::PrettyHex;
use rtsp_types::{Data, Message};
use std::convert::TryFrom;
use rtsp_types::Message;
use std::fmt::{Debug, Display};
use std::num::NonZeroU32;
mod error;
pub use error::Error;
/// Wraps the supplied `ErrorInt` and returns it as an `Err`.
macro_rules! bail {
($e:expr) => {
return Err(crate::error::Error(Box::new($e)))
};
}
macro_rules! wrap {
($e:expr) => {
crate::error::Error(Box::new($e))
};
}
pub mod client;
pub mod codec;
//mod error;
mod tokio;
use error::ErrorInt;
pub static X_ACCEPT_DYNAMIC_RATE: Lazy<rtsp_types::HeaderName> = Lazy::new(|| {
rtsp_types::HeaderName::from_static_str("x-Accept-Dynamic-Rate").expect("is ascii")
@ -19,10 +38,11 @@ pub static X_ACCEPT_DYNAMIC_RATE: Lazy<rtsp_types::HeaderName> = Lazy::new(|| {
pub static X_DYNAMIC_RATE: Lazy<rtsp_types::HeaderName> =
Lazy::new(|| rtsp_types::HeaderName::from_static_str("x-Dynamic-Rate").expect("is ascii"));
/// A received RTSP message.
#[derive(Debug)]
pub struct ReceivedMessage {
pub ctx: Context,
pub msg: Message<Bytes>,
struct ReceivedMessage {
ctx: RtspMessageContext,
msg: Message<Bytes>,
}
/// A monotonically increasing timestamp within an RTP stream.
@ -47,16 +67,14 @@ pub struct Timestamp {
}
impl Timestamp {
/// Creates a new timestamp, ensuring `timestamp - start` doesn't underflow.
pub fn new(timestamp: i64, clock_rate: NonZeroU32, start: u32) -> Result<Self, Error> {
match timestamp.checked_sub(i64::from(start)) {
None => bail!("timestamp - start must not underflow"),
Some(_) => Ok(Timestamp {
timestamp,
clock_rate,
start,
}),
}
/// Creates a new timestamp unless `timestamp - start` underflows.
#[inline]
pub fn new(timestamp: i64, clock_rate: NonZeroU32, start: u32) -> Option<Self> {
timestamp.checked_sub(i64::from(start)).map(|_| Timestamp {
timestamp,
clock_rate,
start,
})
}
/// Returns time since some arbitrary point before the stream started.
@ -90,17 +108,17 @@ impl Timestamp {
(self.elapsed() as f64) / (self.clock_rate.get() as f64)
}
pub fn try_add(&self, delta: u32) -> Result<Self, Error> {
/// Returns `self + delta` unless it would overflow.
pub fn try_add(&self, delta: u32) -> Option<Self> {
// Check for `timestamp` overflow only. We don't need to check for
// `timestamp - start` underflow because delta is non-negative.
Ok(Timestamp {
timestamp: self
.timestamp
.checked_add(i64::from(delta))
.ok_or_else(|| format_err!("overflow on {:?} + {}", &self, delta))?,
clock_rate: self.clock_rate,
start: self.start,
})
self.timestamp
.checked_add(i64::from(delta))
.map(|timestamp| Timestamp {
timestamp,
clock_rate: self.clock_rate,
start: self.start,
})
}
}
@ -159,78 +177,106 @@ impl std::fmt::Debug for NtpTimestamp {
}
}
/// Context of a received message within an RTSP stream.
/// This is meant to help find the correct TCP stream and packet in a matching
/// packet capture.
#[derive(Copy, Clone)]
pub struct Context {
conn_local_addr: std::net::SocketAddr,
conn_peer_addr: std::net::SocketAddr,
conn_established_wall: time::Timespec,
conn_established: std::time::Instant,
/// A wall time taken from the local machine's realtime clock, used in error reporting.
///
/// Currently this just allows formatting via `Debug` and `Display`.
#[derive(Copy, Clone, Debug)]
pub struct WallTime(time::Timespec);
/// The byte position within the input stream. The bottom 32 bits can be
/// compared to the TCP sequence number.
msg_pos: u64,
/// Time when the application parsed the message. Caveat: this may not
/// closely match the time on a packet capture if the application is
/// overloaded (or `CLOCK_REALTIME` jumps).
msg_received_wall: time::Timespec,
msg_received: std::time::Instant,
impl WallTime {
fn now() -> Self {
Self(time::get_time())
}
}
impl Context {
impl Display for WallTime {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
std::fmt::Display::fmt(
&time::at(self.0)
.strftime("%FT%T")
.map_err(|_| std::fmt::Error)?,
f,
)
}
}
/// RTSP connection context.
///
/// This gives enough information to pick out the flow in a packet capture.
#[derive(Copy, Clone, Debug)]
pub struct ConnectionContext {
local_addr: std::net::SocketAddr,
peer_addr: std::net::SocketAddr,
established_wall: WallTime,
established: std::time::Instant,
}
impl ConnectionContext {
#[doc(hidden)]
pub fn dummy() -> Self {
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0);
Self {
conn_local_addr: addr,
conn_peer_addr: addr,
conn_established_wall: time::get_time(),
conn_established: std::time::Instant::now(),
msg_pos: 0,
msg_received_wall: time::get_time(),
msg_received: std::time::Instant::now(),
local_addr: addr,
peer_addr: addr,
established_wall: WallTime::now(),
established: std::time::Instant::now(),
}
}
pub fn conn_established(&self) -> std::time::Instant {
self.conn_established
}
pub fn msg_received(&self) -> std::time::Instant {
self.msg_received
}
pub fn msg_pos(&self) -> u64 {
self.msg_pos
}
}
impl Debug for Context {
impl Display for ConnectionContext {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
// TODO: this current hardcodes the assumption we are the client.
// Change if/when adding server code.
write!(
f,
"[{}(me)->{}@{} pos={}@{}]",
&self.conn_local_addr,
&self.conn_peer_addr,
time::at(self.conn_established_wall)
.strftime("%FT%T")
.map_err(|_| std::fmt::Error)?,
self.msg_pos,
time::at(self.msg_received_wall)
.strftime("%FT%T")
.map_err(|_| std::fmt::Error)?
"{}(me)->{}@{}",
&self.local_addr, &self.peer_addr, &self.established_wall,
)
}
}
struct Codec {
ctx: Context,
/// Context of a received message (or read error) within an RTSP connection.
///
/// When paired with a [`ConnectionContext`], this should allow picking the
/// message out of a packet capture.
#[derive(Copy, Clone, Debug)]
pub struct RtspMessageContext {
/// The starting byte position within the input stream. The bottom 32 bits
/// can be compared to the relative TCP sequence number.
pos: u64,
/// Time when the application parsed the message. Caveat: this may not
/// closely match the time on a packet capture if the application is
/// overloaded (or if `CLOCK_REALTIME` jumps).
received_wall: WallTime,
received: std::time::Instant,
}
impl RtspMessageContext {
#[doc(hidden)]
pub fn dummy() -> Self {
Self {
pos: 0,
received_wall: WallTime::now(),
received: std::time::Instant::now(),
}
}
pub fn received(&self) -> std::time::Instant {
self.received
}
pub fn pos(&self) -> u64 {
self.pos
}
}
impl Display for RtspMessageContext {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}@{}", self.pos, &self.received_wall)
}
}
/// Returns the range within `buf` that represents `subset`.
@ -253,113 +299,3 @@ pub(crate) fn as_range(buf: &[u8], subset: &[u8]) -> Option<std::ops::Range<usiz
assert!(end <= buf.len());
Some(off..end)
}
impl Codec {
fn parse_msg(&self, src: &mut BytesMut) -> Result<Option<(usize, Message<Bytes>)>, Error> {
if !src.is_empty() && src[0] == b'$' {
// Fast path for interleaved data, avoiding MessageRef -> Message<&[u8]> ->
// Message<Bytes> conversion. This speeds things up quite a bit in practice,
// avoiding a bunch of memmove calls.
if src.len() < 4 {
return Ok(None);
}
let channel_id = src[1];
let len = 4 + usize::from(u16::from_be_bytes([src[2], src[3]]));
if src.len() < len {
src.reserve(len - src.len());
return Ok(None);
}
let mut msg = src.split_to(len);
msg.advance(4);
return Ok(Some((
len,
Message::Data(Data::new(channel_id, msg.freeze())),
)));
}
let (msg, len): (Message<&[u8]>, _) = match rtsp_types::Message::parse(src) {
Ok((m, l)) => (m, l),
Err(rtsp_types::ParseError::Error) => {
let snippet = &src[0..std::cmp::min(128, src.len())];
bail!(
"RTSP parse error at {:#?}: next bytes are {:#?}",
&self.ctx,
snippet.hex_dump()
)
}
Err(rtsp_types::ParseError::Incomplete) => return Ok(None),
};
// Map msg's body to a Bytes representation and advance `src`. Awkward:
// 1. lifetime concerns require mapping twice: first so the message
// doesn't depend on the BytesMut, which needs to be split/advanced;
// then to get the proper Bytes body in place post-split.
// 2. rtsp_types messages must be AsRef<[u8]>, so we can't use the
// range as an intermediate body.
// 3. within a match because the rtsp_types::Message enum itself
// doesn't have body/replace_body/map_body methods.
let msg = match msg {
Message::Request(msg) => {
let body_range = as_range(src, msg.body());
let msg = msg.replace_body(rtsp_types::Empty);
if let Some(r) = body_range {
let mut raw_msg = src.split_to(len);
raw_msg.advance(r.start);
raw_msg.truncate(r.len());
Message::Request(msg.replace_body(raw_msg.freeze()))
} else {
src.advance(len);
Message::Request(msg.replace_body(Bytes::new()))
}
}
Message::Response(msg) => {
let body_range = as_range(src, msg.body());
let msg = msg.replace_body(rtsp_types::Empty);
if let Some(r) = body_range {
let mut raw_msg = src.split_to(len);
raw_msg.advance(r.start);
raw_msg.truncate(r.len());
Message::Response(msg.replace_body(raw_msg.freeze()))
} else {
src.advance(len);
Message::Response(msg.replace_body(Bytes::new()))
}
}
Message::Data(_) => unreachable!(),
};
Ok(Some((len, msg)))
}
}
impl tokio_util::codec::Decoder for Codec {
type Item = ReceivedMessage;
type Error = failure::Error;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
let (len, msg) = match self.parse_msg(src) {
Err(e) => return Err(e),
Ok(None) => return Ok(None),
Ok(Some((len, msg))) => (len, msg),
};
self.ctx.msg_received_wall = time::get_time();
self.ctx.msg_received = std::time::Instant::now();
let msg = ReceivedMessage { ctx: self.ctx, msg };
self.ctx.msg_pos += u64::try_from(len).expect("usize fits in u64");
Ok(Some(msg))
}
}
impl tokio_util::codec::Encoder<rtsp_types::Message<bytes::Bytes>> for Codec {
type Error = failure::Error;
fn encode(
&mut self,
item: rtsp_types::Message<bytes::Bytes>,
dst: &mut BytesMut,
) -> Result<(), Self::Error> {
let mut w = std::mem::replace(dst, BytesMut::new()).writer();
item.write(&mut w).expect("bytes Writer is infallible");
*dst = w.into_inner();
Ok(())
}
}