experimental UDP support (fixes #30)

* support setting transport to UDP
* breaking change: use new PacketContext rather than RtspMessageContext
  in places where an RTP/RTCP packet is expected, as it can now be over
  UDP instead of via RTSP interleaved data
* send teardown, which is important with UDP sessions. (It also will
  help somewhat with Reolink TCP.) Along the way, make Session Unpin
  so that teardown can consume it without tripping over tokio::pin!().
* refactor to shrink the amount of data used by Session and how much
  gets memmoved around on the stack, instead of further growing it.

Because of missing RTCP RR and reorder buffer support, this is only
appropriate for using on a LAN. That's enough for me right now.
This commit is contained in:
Scott Lamb 2021-08-27 16:30:55 -07:00
parent 785b63ff4a
commit 9e9366f115
19 changed files with 933 additions and 367 deletions

View File

@ -1,5 +1,7 @@
## unreleased
## `v0.3.0` (unreleased)
* [#30](https://github.com/scottlamb/retina/issues/30): experimental UDP
support.
* [#22](https://github.com/scottlamb/retina/issues/22): fix handling of
44.1 kHz AAC audio.

3
Cargo.lock generated
View File

@ -989,7 +989,7 @@ checksum = "f497285884f3fcff424ffc933e56d7cbca511def0c9831a7f9b5f6153e3cc89b"
[[package]]
name = "retina"
version = "0.2.0"
version = "0.3.0"
dependencies = [
"anyhow",
"base64",
@ -1005,6 +1005,7 @@ dependencies = [
"once_cell",
"pin-project",
"pretty-hex",
"rand",
"rtp-rs",
"rtsp-types",
"sdp",

View File

@ -1,6 +1,6 @@
[package]
name = "retina"
version = "0.2.0"
version = "0.3.0"
authors = ["Scott Lamb <slamb@slamb.org>"]
license = "MIT/Apache-2.0"
edition = "2018"
@ -22,6 +22,7 @@ log = "0.4.8"
once_cell = "1.7.2"
pin-project = "1.0.7"
pretty-hex = "0.2.1"
rand = "0.8.3"
rtp-rs = "0.6.0"
rtsp-types = "0.0.2"
sdp = "0.1.4"

View File

@ -14,7 +14,8 @@ Progress:
* [x] client support
* [x] digest authentication.
* [x] RTP over TCP via RTSP interleaved channels.
* [ ] RTP over UDP.
* [x] RTP over UDP (experimental).
* * [ ] re-order buffer. (Out-of-order packets are dropped now.)
* [x] RTSP/1.0.
* [ ] RTSP/2.0.
* [ ] SRTP.

View File

@ -4,7 +4,7 @@
use std::num::NonZeroU16;
use criterion::{criterion_group, criterion_main, Criterion};
use retina::client::{rtp::StrictSequenceChecker, Timeline};
use retina::client::{rtp::InorderParser, Timeline};
use retina::codec::{CodecItem, Depacketizer};
use std::convert::TryFrom;
use std::io::Write;
@ -24,15 +24,15 @@ fn h264_aac<F: FnMut(CodecItem) -> ()>(mut f: F) {
Timeline::new(Some(0), 90_000, None).unwrap(),
];
let mut rtps = [
StrictSequenceChecker::new(None, Some(1)),
StrictSequenceChecker::new(None, Some(1)),
InorderParser::new(None, Some(1)),
InorderParser::new(None, Some(1)),
];
let mut depacketizers = [
Depacketizer::new("audio", "mpeg4-generic", 12_000, NonZeroU16::new(2), Some("profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3;config=1490")).unwrap(),
Depacketizer::new("video", "h264", 90_000, None, Some("packetization-mode=1;profile-level-id=42C01E;sprop-parameter-sets=Z0LAHtkDxWhAAAADAEAAAAwDxYuS,aMuMsg==")).unwrap(),
];
let conn_ctx = retina::ConnectionContext::dummy();
let msg_ctx = retina::RtspMessageContext::dummy();
let pkt_ctx = retina::PacketContext::dummy();
while !remaining.is_empty() {
assert!(remaining.len() > 4);
assert_eq!(remaining[0], b'$');
@ -50,9 +50,8 @@ fn h264_aac<F: FnMut(CodecItem) -> ()>(mut f: F) {
let pkt = match rtps[stream_id].rtp(
&retina::client::SessionOptions::default(),
&conn_ctx,
&msg_ctx,
&pkt_ctx,
&mut timelines[stream_id],
channel_id,
stream_id,
data,
) {

View File

@ -29,12 +29,11 @@ pub async fn run(opts: Opts) -> Result<(), Error> {
.position(|s| matches!(s.parameters(), Some(retina::codec::Parameters::Message(..))))
.ok_or_else(|| anyhow!("couldn't find onvif stream"))?;
session.setup(onvif_stream_i).await?;
let session = session
let mut session = session
.play(retina::client::PlayOptions::default().ignore_zero_seq(true))
.await?
.demuxed()?;
tokio::pin!(session);
tokio::pin!(stop);
loop {
tokio::select! {
@ -51,5 +50,6 @@ pub async fn run(opts: Opts) -> Result<(), Error> {
},
}
}
session.teardown().await?;
Ok(())
}

View File

@ -20,8 +20,11 @@
use anyhow::{anyhow, bail, Error};
use bytes::{Buf, BufMut, BytesMut};
use futures::StreamExt;
use log::info;
use retina::codec::{AudioParameters, CodecItem, VideoParameters};
use log::{info, warn};
use retina::{
client::Transport,
codec::{AudioParameters, CodecItem, VideoParameters},
};
use std::convert::TryFrom;
use std::io::SeekFrom;
@ -60,6 +63,12 @@ pub struct Opts {
#[structopt(long, name = "secs")]
duration: Option<u64>,
/// The transport to use: `tcp` or `udp` (experimental).
///
/// Note: `--allow-loss` is strongly recommended with `udp`.
#[structopt(default_value, long)]
transport: retina::client::Transport,
/// Path to `.mp4` file to write.
#[structopt(parse(try_from_str))]
out: PathBuf,
@ -564,6 +573,10 @@ impl<W: AsyncWrite + AsyncSeek + Send + Unpin> Mp4Writer<W> {
}
pub async fn run(opts: Opts) -> Result<(), Error> {
if matches!(opts.transport, Transport::Udp) && !opts.allow_loss {
warn!("Using --transport=udp without strongly recommended --allow-loss!");
}
let creds = super::creds(opts.src.username, opts.src.password);
let stop_signal = tokio::signal::ctrl_c();
let mut session = retina::client::Session::describe(
@ -571,6 +584,7 @@ pub async fn run(opts: Opts) -> Result<(), Error> {
retina::client::SessionOptions::default()
.creds(creds)
.user_agent("Retina mp4 example".to_owned())
.transport(opts.transport)
.ignore_spurious_data(opts.ignore_spurious_data),
)
.await?;
@ -604,7 +618,7 @@ pub async fn run(opts: Opts) -> Result<(), Error> {
if let Some((i, _)) = audio_stream {
session.setup(i).await?;
}
let session = session
let mut session = session
.play(
retina::client::PlayOptions::default()
.initial_timestamp(opts.initial_timestamp)
@ -629,7 +643,6 @@ pub async fn run(opts: Opts) -> Result<(), Error> {
}
None => futures::future::Either::Right(futures::future::pending()),
};
tokio::pin!(session);
tokio::pin!(stop_signal);
tokio::pin!(sleep);
loop {
@ -654,6 +667,7 @@ pub async fn run(opts: Opts) -> Result<(), Error> {
},
}
}
session.teardown().await?;
mp4.finish().await?;
Ok(())
}

3
fuzz/Cargo.lock generated
View File

@ -544,7 +544,7 @@ dependencies = [
[[package]]
name = "retina"
version = "0.2.0"
version = "0.3.0"
dependencies = [
"base64",
"bitreader",
@ -557,6 +557,7 @@ dependencies = [
"once_cell",
"pin-project",
"pretty-hex",
"rand",
"rtp-rs",
"rtsp-types",
"sdp",

View File

@ -13,7 +13,7 @@ fuzz_target!(|data: &[u8]| {
let mut timestamp = retina::Timestamp::new(0, NonZeroU32::new(90_000).unwrap(), 0).unwrap();
let mut sequence_number: u16 = 0;
let conn_ctx = retina::ConnectionContext::dummy();
let msg_ctx = retina::RtspMessageContext::dummy();
let pkt_ctx = retina::PacketContext::dummy();
while data.has_remaining() {
let hdr = data.get_u8();
let ts_change = (hdr & 0b001) != 0;
@ -30,8 +30,7 @@ fuzz_target!(|data: &[u8]| {
timestamp = timestamp.try_add(1).unwrap();
}
let pkt = retina::client::rtp::Packet {
ctx: msg_ctx,
channel_id: 0,
ctx: pkt_ctx,
stream_id: 0,
timestamp,
ssrc: 0,

File diff suppressed because it is too large Load Diff

View File

@ -5,7 +5,7 @@ use bytes::{Buf, Bytes};
use log::debug;
use pretty_hex::PrettyHex;
use sdp::media_description::MediaDescription;
use std::{convert::TryFrom, num::NonZeroU16};
use std::{convert::TryFrom, net::IpAddr, num::NonZeroU16};
use url::Url;
use super::{Presentation, Stream};
@ -365,6 +365,7 @@ fn parse_media(base_url: &Url, media_description: &MediaDescription) -> Result<S
rtp_payload_type,
depacketizer,
control,
sockets: None,
channels,
state: super::StreamState::Uninit,
})
@ -446,14 +447,15 @@ pub(crate) fn parse_describe(
base_url,
control,
accept_dynamic_rate,
sdp,
})
}
pub(crate) struct SetupResponse<'a> {
pub(crate) session_id: &'a str,
pub(crate) ssrc: Option<u32>,
pub(crate) channel_id: u8,
pub(crate) channel_id: Option<u8>,
pub(crate) source: Option<IpAddr>,
pub(crate) server_port: Option<(u16, u16)>,
}
/// Parses a `SETUP` response.
@ -473,6 +475,8 @@ pub(crate) fn parse_setup(response: &rtsp_types::Response<Bytes>) -> Result<Setu
.ok_or_else(|| "Missing Transport header".to_string())?;
let mut channel_id = None;
let mut ssrc = None;
let mut source = None;
let mut server_port = None;
for part in transport.as_str().split(';') {
if let Some(v) = part.strip_prefix("ssrc=") {
let v = u32::from_str_radix(v, 16).map_err(|_| format!("Unparseable ssrc {}", v))?;
@ -490,14 +494,33 @@ pub(crate) fn parse_setup(response: &rtsp_types::Response<Bytes>) -> Result<Setu
}
}
channel_id = Some(n);
} else if let Some(s) = part.strip_prefix("source=") {
source = Some(
s.parse()
.map_err(|_| format!("Transport header has unparseable source {:?}", s))?,
);
} else if let Some(s) = part.strip_prefix("server_port=") {
let mut ports = s.splitn(2, '-');
let n = ports.next().expect("splitn returns at least one part");
let n = u16::from_str_radix(n, 10)
.map_err(|_| format!("bad port in Transport: {}", transport.as_str()))?;
if let Some(m) = ports.next() {
let m = u16::from_str_radix(m, 10).map_err(|_| format!("bad second port {}", m))?;
server_port = Some((n, m))
} else {
// TODO: this is allowed by RFC 2326's grammar, but I'm not sure
// what it means. Does it use the same port for both RTP and
// RTCP, or is it implied the second is one more than the first?
return Err("Transport header specifies a single server_port".to_owned());
}
}
}
let channel_id =
channel_id.ok_or_else(|| "Transport header has no interleaved parameter".to_string())?;
Ok(SetupResponse {
session_id,
ssrc,
channel_id,
source,
server_port,
})
}
@ -670,7 +693,7 @@ mod tests {
let setup_response = response(include_bytes!("testdata/dahua_setup.txt"));
let setup_response = super::parse_setup(&setup_response).unwrap();
assert_eq!(setup_response.session_id, "634214675641");
assert_eq!(setup_response.channel_id, 0);
assert_eq!(setup_response.channel_id, Some(0));
assert_eq!(setup_response.ssrc, Some(0x30a98ee7));
p.streams[0].state = StreamState::Init(StreamStateInit {
ssrc: setup_response.ssrc,
@ -767,7 +790,7 @@ mod tests {
let setup_response = response(include_bytes!("testdata/hikvision_setup.txt"));
let setup_response = super::parse_setup(&setup_response).unwrap();
assert_eq!(setup_response.session_id, "708345999");
assert_eq!(setup_response.channel_id, 0);
assert_eq!(setup_response.channel_id, Some(0));
assert_eq!(setup_response.ssrc, Some(0x4cacc3d1));
p.streams[0].state = StreamState::Init(StreamStateInit {
ssrc: setup_response.ssrc,
@ -842,7 +865,7 @@ mod tests {
let setup_response = response(include_bytes!("testdata/reolink_setup.txt"));
let setup_response = super::parse_setup(&setup_response).unwrap();
assert_eq!(setup_response.session_id, "F8F8E425");
assert_eq!(setup_response.channel_id, 0);
assert_eq!(setup_response.channel_id, Some(0));
assert_eq!(setup_response.ssrc, None);
p.streams[0].state = StreamState::Init(StreamStateInit::default());
p.streams[1].state = StreamState::Init(StreamStateInit::default());
@ -920,7 +943,7 @@ mod tests {
let setup_response = response(include_bytes!("testdata/bunny_setup.txt"));
let setup_response = super::parse_setup(&setup_response).unwrap();
assert_eq!(setup_response.session_id, "1642021126");
assert_eq!(setup_response.channel_id, 0);
assert_eq!(setup_response.channel_id, Some(0));
assert_eq!(setup_response.ssrc, None);
p.streams[0].state = StreamState::Init(StreamStateInit::default());
p.streams[1].state = StreamState::Init(StreamStateInit::default());
@ -1081,7 +1104,7 @@ mod tests {
let setup_response = response(include_bytes!("testdata/gw_main_setup_video.txt"));
let setup_response = super::parse_setup(&setup_response).unwrap();
assert_eq!(setup_response.session_id, "9a90de54");
assert_eq!(setup_response.channel_id, 0);
assert_eq!(setup_response.channel_id, Some(0));
assert_eq!(setup_response.ssrc, None);
p.streams[0].state = StreamState::Init(StreamStateInit {
ssrc: None,
@ -1092,7 +1115,7 @@ mod tests {
let setup_response = response(include_bytes!("testdata/gw_main_setup_audio.txt"));
let setup_response = super::parse_setup(&setup_response).unwrap();
assert_eq!(setup_response.session_id, "9a90de54");
assert_eq!(setup_response.channel_id, 2);
assert_eq!(setup_response.channel_id, Some(2));
assert_eq!(setup_response.ssrc, None);
p.streams[1].state = StreamState::Init(StreamStateInit {
ssrc: None,
@ -1161,7 +1184,7 @@ mod tests {
let setup_response = response(include_bytes!("testdata/gw_sub_setup.txt"));
let setup_response = super::parse_setup(&setup_response).unwrap();
assert_eq!(setup_response.session_id, "9b0d0e54");
assert_eq!(setup_response.channel_id, 0);
assert_eq!(setup_response.channel_id, Some(0));
assert_eq!(setup_response.ssrc, None);
p.streams[0].state = StreamState::Init(StreamStateInit {
ssrc: None,

View File

@ -12,8 +12,7 @@ use crate::{Error, ErrorInt};
/// A received RTP packet.
pub struct Packet {
pub ctx: crate::RtspMessageContext,
pub channel_id: u8,
pub ctx: crate::PacketContext,
pub stream_id: usize,
pub timestamp: crate::Timestamp,
pub ssrc: u32,
@ -37,7 +36,6 @@ impl std::fmt::Debug for Packet {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Packet")
.field("ctx", &self.ctx)
.field("channel_id", &self.channel_id)
.field("stream_id", &self.stream_id)
.field("timestamp", &self.timestamp)
.field("ssrc", &self.ssrc)
@ -53,7 +51,7 @@ impl std::fmt::Debug for Packet {
#[derive(Debug)]
pub struct SenderReport {
pub stream_id: usize,
pub ctx: crate::RtspMessageContext,
pub ctx: crate::PacketContext,
pub timestamp: crate::Timestamp,
pub ntp_timestamp: crate::NtpTimestamp,
}
@ -61,13 +59,16 @@ pub struct SenderReport {
/// RTP/RTCP demarshaller which ensures packets have the correct SSRC and
/// monotonically increasing SEQ. Unstable; exposed for benchmark.
///
/// This reports packet loss (via [Packet::loss]) but doesn't prohibit it, except for losses
/// When using UDP, skips and logs out-of-order packets. When using TCP,
/// fails on them.
///
/// This reports packet loss (via [Packet::loss]) but doesn't prohibit it
/// of more than `i16::MAX` which would be indistinguishable from non-monotonic sequence numbers.
/// Servers sometimes drop packets internally even when sending data via TCP.
///
/// At least [one camera](https://github.com/scottlamb/moonfire-nvr/wiki/Cameras:-Reolink#reolink-rlc-410-hardware-version-ipc_3816m)
/// sometimes sends data from old RTSP sessions over new ones. This seems like a
/// serious bug, and currently `StrictSequenceChecker` will error in this case,
/// serious bug, and currently `InorderRtpParser` will error in this case,
/// although it'd be possible to discard the incorrect SSRC instead.
///
/// [RFC 3550 section 8.2](https://tools.ietf.org/html/rfc3550#section-8.2) says that SSRC
@ -75,12 +76,12 @@ pub struct SenderReport {
/// not sure it will ever come up with IP cameras.
#[doc(hidden)]
#[derive(Debug)]
pub struct StrictSequenceChecker {
pub struct InorderParser {
ssrc: Option<u32>,
next_seq: Option<u16>,
}
impl StrictSequenceChecker {
impl InorderParser {
pub fn new(ssrc: Option<u32>, next_seq: Option<u16>) -> Self {
Self { ssrc, next_seq }
}
@ -89,9 +90,8 @@ impl StrictSequenceChecker {
&mut self,
session_options: &super::SessionOptions,
conn_ctx: &crate::ConnectionContext,
msg_ctx: &crate::RtspMessageContext,
pkt_ctx: &crate::PacketContext,
timeline: &mut super::Timeline,
channel_id: u8,
stream_id: usize,
mut data: Bytes,
) -> Result<Option<PacketItem>, Error> {
@ -109,10 +109,9 @@ impl StrictSequenceChecker {
}
let reader = rtp_rs::RtpReader::new(&data[..]).map_err(|e| {
wrap!(ErrorInt::RtspDataMessageError {
wrap!(ErrorInt::PacketError {
conn_ctx: *conn_ctx,
msg_ctx: *msg_ctx,
channel_id,
pkt_ctx: *pkt_ctx,
stream_id,
description: format!(
"corrupt RTP header while expecting seq={:04x?}: {:?}\n{:#?}",
@ -139,8 +138,7 @@ impl StrictSequenceChecker {
} else {
bail!(ErrorInt::RtpPacketError {
conn_ctx: *conn_ctx,
msg_ctx: *msg_ctx,
channel_id,
pkt_ctx: *pkt_ctx,
stream_id,
ssrc,
sequence_number,
@ -152,25 +150,32 @@ impl StrictSequenceChecker {
}
}
if loss > 0x80_00 {
bail!(ErrorInt::RtpPacketError {
conn_ctx: *conn_ctx,
msg_ctx: *msg_ctx,
channel_id,
stream_id,
ssrc,
sequence_number,
description: format!(
"Out-of-order packet or large loss; expecting ssrc={:08x?} seq={:04x?}",
self.ssrc, self.next_seq
),
});
if matches!(session_options.transport, super::Transport::Tcp) {
bail!(ErrorInt::RtpPacketError {
conn_ctx: *conn_ctx,
pkt_ctx: *pkt_ctx,
stream_id,
ssrc,
sequence_number,
description: format!(
"Out-of-order packet or large loss; expecting ssrc={:08x?} seq={:04x?}",
self.ssrc, self.next_seq
),
});
} else {
log::info!(
"Skipping out-of-order seq={:04x} when expecting ssrc={:08x?} seq={:04x?}",
sequence_number,
self.ssrc,
self.next_seq
);
}
}
let timestamp = match timeline.advance_to(reader.timestamp()) {
Ok(ts) => ts,
Err(description) => bail!(ErrorInt::RtpPacketError {
conn_ctx: *conn_ctx,
msg_ctx: *msg_ctx,
channel_id,
pkt_ctx: *pkt_ctx,
stream_id,
ssrc,
sequence_number,
@ -182,8 +187,7 @@ impl StrictSequenceChecker {
let payload_range = crate::as_range(&data, reader.payload()).ok_or_else(|| {
wrap!(ErrorInt::RtpPacketError {
conn_ctx: *conn_ctx,
msg_ctx: *msg_ctx,
channel_id,
pkt_ctx: *pkt_ctx,
stream_id,
ssrc,
sequence_number,
@ -194,8 +198,7 @@ impl StrictSequenceChecker {
data.advance(payload_range.start);
self.next_seq = Some(sequence_number.wrapping_add(1));
Ok(Some(PacketItem::RtpPacket(Packet {
ctx: *msg_ctx,
channel_id,
ctx: *pkt_ctx,
stream_id,
timestamp,
ssrc,
@ -209,7 +212,7 @@ impl StrictSequenceChecker {
pub fn rtcp(
&mut self,
session_options: &super::SessionOptions,
msg_ctx: &crate::RtspMessageContext,
pkt_ctx: &crate::PacketContext,
timeline: &mut super::Timeline,
stream_id: usize,
data: Bytes,
@ -254,12 +257,12 @@ impl StrictSequenceChecker {
sr = Some(SenderReport {
stream_id,
ctx: *msg_ctx,
ctx: *pkt_ctx,
timestamp,
ntp_timestamp: pkt.ntp_timestamp(),
});
}
crate::rtcp::Packet::Unknown(pkt) => debug!("rtcp: {:?}", pkt.payload_type()),
crate::rtcp::Packet::Unknown(pkt) => debug!("rtcp: pt {:?}", pkt.payload_type()),
}
i += 1;
}

View File

@ -427,7 +427,7 @@ pub(crate) struct Depacketizer {
#[derive(Debug)]
struct Aggregate {
ctx: crate::RtspMessageContext,
ctx: crate::PacketContext,
/// RTP packets lost before the next frame in this aggregate. Includes old
/// loss that caused a previous fragment to be too short.
@ -440,7 +440,6 @@ struct Aggregate {
/// to be too short.
loss_since_mark: bool,
channel_id: u8,
stream_id: usize,
ssrc: u32,
sequence_number: u16,
@ -606,7 +605,6 @@ impl Depacketizer {
ctx: pkt.ctx,
loss: *prev_loss + pkt.loss,
loss_since_mark: pkt.loss > 0,
channel_id: pkt.channel_id,
stream_id: pkt.stream_id,
ssrc: pkt.ssrc,
sequence_number: pkt.sequence_number,
@ -726,8 +724,7 @@ impl Depacketizer {
fn error(conn_ctx: ConnectionContext, agg: Aggregate, description: String) -> Error {
Error(Box::new(ErrorInt::RtpPacketError {
conn_ctx,
msg_ctx: agg.ctx,
channel_id: agg.channel_id,
pkt_ctx: agg.ctx,
stream_id: agg.stream_id,
ssrc: agg.ssrc,
sequence_number: agg.sequence_number,

View File

@ -56,8 +56,8 @@ struct Nal {
/// An access unit that is currently being accumulated during `PreMark` state.
#[derive(Debug)]
struct AccessUnit {
start_ctx: crate::RtspMessageContext,
end_ctx: crate::RtspMessageContext,
start_ctx: crate::PacketContext,
end_ctx: crate::PacketContext,
timestamp: crate::Timestamp,
stream_id: usize,
@ -846,8 +846,7 @@ impl Packetizer {
};
// TODO: ctx, channel_id, and ssrc are placeholders.
return Ok(Some(Packet {
ctx: crate::RtspMessageContext::dummy(),
channel_id: 0,
ctx: crate::PacketContext::dummy(),
stream_id: self.stream_id,
timestamp,
ssrc: 0,
@ -870,8 +869,7 @@ impl Packetizer {
mark = false;
}
Ok(Some(Packet {
ctx: crate::RtspMessageContext::dummy(),
channel_id: 0,
ctx: crate::PacketContext::dummy(),
stream_id: self.stream_id,
timestamp,
ssrc: 0,
@ -923,8 +921,7 @@ impl Packetizer {
}
// TODO: placeholders.
Ok(Some(Packet {
ctx: crate::RtspMessageContext::dummy(),
channel_id: 0,
ctx: crate::PacketContext::dummy(),
stream_id: self.stream_id,
timestamp,
ssrc: 0,
@ -1030,8 +1027,7 @@ mod tests {
};
d.push(Packet {
// plain SEI packet.
ctx: crate::RtspMessageContext::dummy(),
channel_id: 0,
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp,
ssrc: 0,
@ -1044,8 +1040,7 @@ mod tests {
assert!(d.pull().is_none());
d.push(Packet {
// STAP-A packet.
ctx: crate::RtspMessageContext::dummy(),
channel_id: 0,
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp,
ssrc: 0,
@ -1058,8 +1053,7 @@ mod tests {
assert!(d.pull().is_none());
d.push(Packet {
// FU-A packet, start.
ctx: crate::RtspMessageContext::dummy(),
channel_id: 0,
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp,
ssrc: 0,
@ -1072,8 +1066,7 @@ mod tests {
assert!(d.pull().is_none());
d.push(Packet {
// FU-A packet, middle.
ctx: crate::RtspMessageContext::dummy(),
channel_id: 0,
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp,
ssrc: 0,
@ -1086,8 +1079,7 @@ mod tests {
assert!(d.pull().is_none());
d.push(Packet {
// FU-A packet, end.
ctx: crate::RtspMessageContext::dummy(),
channel_id: 0,
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp,
ssrc: 0,
@ -1128,8 +1120,7 @@ mod tests {
};
d.push(Packet {
// SPS with (incorrect) mark
ctx: crate::RtspMessageContext::dummy(),
channel_id: 0,
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp: ts1,
ssrc: 0,
@ -1142,8 +1133,7 @@ mod tests {
assert!(d.pull().is_none());
d.push(Packet {
// PPS
ctx: crate::RtspMessageContext::dummy(),
channel_id: 0,
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp: ts1,
ssrc: 0,
@ -1160,8 +1150,7 @@ mod tests {
// RFC 6184 section 5.1 says that "the timestamp must match that of
// the primary coded picture of the access unit and that the marker
// bit can only be set on the final packet of the access unit.""
ctx: crate::RtspMessageContext::dummy(),
channel_id: 0,
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp: ts2,
ssrc: 0,
@ -1203,8 +1192,7 @@ mod tests {
d.push(Packet {
// Slice layer without partitioning non-IDR, representing the
// last frame of the previous GOP.
ctx: crate::RtspMessageContext::dummy(),
channel_id: 0,
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp: ts1,
ssrc: 0,
@ -1222,8 +1210,7 @@ mod tests {
assert_eq!(frame.timestamp, ts1);
d.push(Packet {
// SPS with (incorrect) timestamp matching last frame.
ctx: crate::RtspMessageContext::dummy(),
channel_id: 0,
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp: ts1,
ssrc: 0,
@ -1236,8 +1223,7 @@ mod tests {
assert!(d.pull().is_none());
d.push(Packet {
// PPS, again with timestamp matching last frame.
ctx: crate::RtspMessageContext::dummy(),
channel_id: 0,
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp: ts1,
ssrc: 0,
@ -1250,8 +1236,7 @@ mod tests {
assert!(d.pull().is_none());
d.push(Packet {
// Slice layer without partitioning IDR. Now correct timestamp.
ctx: crate::RtspMessageContext::dummy(),
channel_id: 0,
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp: ts2,
ssrc: 0,
@ -1289,8 +1274,7 @@ mod tests {
start: 0,
};
d.push(Packet { // new SPS.
ctx: crate::RtspMessageContext::dummy(),
channel_id: 0,
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp,
ssrc: 0,
@ -1302,8 +1286,7 @@ mod tests {
assert!(d.pull().is_none());
d.push(Packet {
// same PPS again.
ctx: crate::RtspMessageContext::dummy(),
channel_id: 0,
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp,
ssrc: 0,
@ -1316,8 +1299,7 @@ mod tests {
assert!(d.pull().is_none());
d.push(Packet {
// dummy slice NAL to end the AU.
ctx: crate::RtspMessageContext::dummy(),
channel_id: 0,
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp,
ssrc: 0,

View File

@ -157,7 +157,7 @@ impl AudioParameters {
/// An audio frame, which consists of one or more samples.
pub struct AudioFrame {
pub ctx: crate::RtspMessageContext,
pub ctx: crate::PacketContext,
pub stream_id: usize,
pub timestamp: crate::Timestamp,
pub frame_length: NonZeroU32,
@ -202,7 +202,7 @@ impl Buf for AudioFrame {
pub struct MessageParameters(onvif::CompressionType);
pub struct MessageFrame {
pub ctx: crate::RtspMessageContext,
pub ctx: crate::PacketContext,
pub timestamp: crate::Timestamp,
pub stream_id: usize,
@ -242,8 +242,8 @@ pub struct VideoFrame {
// A pair of contexts: for the start and for the end.
// Having both can be useful to measure the total time elapsed while receiving the frame.
start_ctx: crate::RtspMessageContext,
end_ctx: crate::RtspMessageContext,
start_ctx: crate::PacketContext,
end_ctx: crate::PacketContext,
/// This picture's timestamp in the time base associated with the stream.
pub timestamp: crate::Timestamp,
@ -265,12 +265,12 @@ pub struct VideoFrame {
impl VideoFrame {
#[inline]
pub fn start_ctx(&self) -> crate::RtspMessageContext {
pub fn start_ctx(&self) -> crate::PacketContext {
self.start_ctx
}
#[inline]
pub fn end_ctx(&self) -> crate::RtspMessageContext {
pub fn end_ctx(&self) -> crate::PacketContext {
self.end_ctx
}

View File

@ -36,7 +36,7 @@ enum State {
#[derive(Debug)]
struct InProgress {
ctx: crate::RtspMessageContext,
ctx: crate::PacketContext,
timestamp: crate::Timestamp,
data: BytesMut,
loss: u16,

View File

@ -3,7 +3,7 @@
use std::fmt::Display;
use crate::{ConnectionContext, RtspMessageContext};
use crate::{ConnectionContext, PacketContext, RtspMessageContext};
use thiserror::Error;
/// An opaque `std::error::Error + Send + Sync + 'static` implementation.
@ -65,23 +65,21 @@ pub(crate) enum ErrorInt {
channel_id: u8,
},
#[error("[{conn_ctx}, {msg_ctx} channel {channel_id} stream {stream_id}] RTSP data message: {description}")]
RtspDataMessageError {
#[error("[{conn_ctx}, {pkt_ctx} stream {stream_id}]: {description}")]
PacketError {
conn_ctx: ConnectionContext,
msg_ctx: RtspMessageContext,
channel_id: u8,
pkt_ctx: PacketContext,
stream_id: usize,
description: String,
},
#[error(
"[{conn_ctx}, {msg_ctx}, channel={channel_id}, stream={stream_id}, ssrc={ssrc:08x}, \
"[{conn_ctx}, {pkt_ctx}, stream={stream_id}, ssrc={ssrc:08x}, \
seq={sequence_number:08x}] {description}"
)]
RtpPacketError {
conn_ctx: ConnectionContext,
msg_ctx: RtspMessageContext,
channel_id: u8,
pkt_ctx: crate::PacketContext,
stream_id: usize,
ssrc: u32,
sequence_number: u16,
@ -92,12 +90,19 @@ pub(crate) enum ErrorInt {
ConnectError(#[source] std::io::Error),
#[error("[{conn_ctx}, {msg_ctx}] Error reading from RTSP peer: {source}")]
ReadError {
RtspReadError {
conn_ctx: ConnectionContext,
msg_ctx: RtspMessageContext,
source: std::io::Error,
},
#[error("[{conn_ctx}, {pkt_ctx}] Error receiving UDP packet: {source}")]
UdpRecvError {
conn_ctx: ConnectionContext,
pkt_ctx: PacketContext,
source: std::io::Error,
},
#[error("[{conn_ctx}] Error writing to RTSP peer: {source}")]
WriteError {
conn_ctx: ConnectionContext,

View File

@ -2,11 +2,15 @@
// SPDX-License-Identifier: MIT OR Apache-2.0
use bytes::Bytes;
use log::trace;
//use failure::{bail, format_err, Error};
use once_cell::sync::Lazy;
use rand::Rng;
use rtsp_types::Message;
use std::fmt::{Debug, Display};
use std::net::{IpAddr, SocketAddr, UdpSocket};
use std::num::NonZeroU32;
use std::ops::Range;
mod error;
mod rtcp;
@ -218,8 +222,7 @@ pub struct ConnectionContext {
impl ConnectionContext {
#[doc(hidden)]
pub fn dummy() -> Self {
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0);
let addr = SocketAddr::new(IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED), 0);
Self {
local_addr: addr,
peer_addr: addr,
@ -283,6 +286,58 @@ impl Display for RtspMessageContext {
}
}
/// Context for an RTP or RTCP packet, received either via RTSP interleaved data or UDP.
///
/// Should be paired with an [`RtspConnectionContext`] of the RTSP connection that started
/// the session. In the interleaved data case, it's assumed the packet was received over
/// that same connection.
#[derive(Copy, Clone, Debug)]
pub struct PacketContext(PacketContextInner);
impl PacketContext {
#[doc(hidden)]
pub fn dummy() -> PacketContext {
Self(PacketContextInner::Dummy)
}
}
#[derive(Copy, Clone, Debug)]
enum PacketContextInner {
Tcp {
msg_ctx: RtspMessageContext,
channel_id: u8,
},
Udp {
local_addr: SocketAddr,
peer_addr: SocketAddr,
received_wall: WallTime,
received: std::time::Instant,
},
Dummy,
}
impl Display for PacketContext {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self.0 {
PacketContextInner::Udp {
local_addr,
peer_addr,
received_wall,
..
} => {
write!(f, "{}->{}@{}", peer_addr, local_addr, received_wall)
}
PacketContextInner::Tcp {
msg_ctx,
channel_id,
} => {
write!(f, "{} ch={}", msg_ctx, channel_id)
}
PacketContextInner::Dummy => write!(f, "dummy"),
}
}
}
/// Returns the range within `buf` that represents `subset`.
/// If `subset` is empty, returns None; otherwise panics if `subset` is not within `buf`.
pub(crate) fn as_range(buf: &[u8], subset: &[u8]) -> Option<std::ops::Range<usize>> {
@ -303,3 +358,77 @@ pub(crate) fn as_range(buf: &[u8], subset: &[u8]) -> Option<std::ops::Range<usiz
assert!(end <= buf.len());
Some(off..end)
}
/// A pair of local UDP sockets used for RTP and RTCP transmission.
///
/// The RTP port is always even, and the RTCP port is always the following (odd) integer.
struct UdpPair {
rtp_port: u16,
rtp_socket: UdpSocket,
rtcp_socket: UdpSocket,
}
impl UdpPair {
fn for_ip(ip_addr: IpAddr) -> Result<Self, std::io::Error> {
const MAX_TRIES: usize = 10;
const ALLOWED_RTP_RANGE: Range<u16> = 5000..65000; // stolen from ffmpeg's defaults.
let mut rng = rand::thread_rng();
for i in 0..MAX_TRIES {
let rtp_port = rng.gen_range(ALLOWED_RTP_RANGE) & !0b1;
debug_assert!(ALLOWED_RTP_RANGE.contains(&rtp_port));
let rtp_addr = SocketAddr::new(ip_addr, rtp_port);
let rtp_socket = match UdpSocket::bind(rtp_addr) {
Ok(s) => s,
Err(e) if e.kind() == std::io::ErrorKind::AddrInUse => {
trace!(
"Try {}/{}: unable to bind RTP addr {:?}",
i,
MAX_TRIES,
rtp_addr
);
continue;
}
Err(e) => return Err(e),
};
let rtcp_addr = SocketAddr::new(ip_addr, rtp_port + 1);
let rtcp_socket = match UdpSocket::bind(rtcp_addr) {
Ok(s) => s,
Err(e) if e.kind() == std::io::ErrorKind::AddrInUse => {
trace!(
"Try {}/{}: unable to bind RTCP addr {:?}",
i,
MAX_TRIES,
rtcp_addr
);
continue;
}
Err(e) => return Err(e),
};
return Ok(Self {
rtp_port,
rtp_socket,
rtcp_socket,
});
}
Err(std::io::Error::new(
std::io::ErrorKind::AddrInUse,
format!(
"Unable to find even/odd pair in {}:{}..{} after {} tries",
ip_addr, ALLOWED_RTP_RANGE.start, ALLOWED_RTP_RANGE.end, MAX_TRIES
),
))
}
}
#[cfg(test)]
mod test {
use std::net::Ipv4Addr;
use super::*;
#[test]
fn local_udp_pair() {
// Just test that it succeeds.
UdpPair::for_ip(IpAddr::V4(Ipv4Addr::LOCALHOST)).unwrap();
}
}

View File

@ -11,7 +11,7 @@ use pretty_hex::PrettyHex;
use rtsp_types::{Data, Message};
use std::convert::TryFrom;
use std::time::Instant;
use tokio::net::TcpStream;
use tokio::net::{TcpStream, UdpSocket};
use tokio_util::codec::Framed;
use url::Host;
@ -84,7 +84,7 @@ impl Stream for Connection {
) -> std::task::Poll<Option<Self::Item>> {
self.0.poll_next_unpin(cx).map_err(|e| {
wrap!(match e {
CodecError::IoError(error) => ErrorInt::ReadError {
CodecError::IoError(error) => ErrorInt::RtspReadError {
conn_ctx: *self.ctx(),
msg_ctx: self.eof_ctx(),
source: error,
@ -296,3 +296,23 @@ impl tokio_util::codec::Encoder<rtsp_types::Message<Bytes>> for Codec {
Ok(())
}
}
/// tokio-specific version of [`crate::UdpPair`].
pub(crate) struct UdpPair {
pub(crate) rtp_port: u16,
pub(crate) rtp_socket: UdpSocket,
pub(crate) rtcp_socket: UdpSocket,
}
impl UdpPair {
pub(crate) fn for_ip(ip_addr: std::net::IpAddr) -> Result<Self, std::io::Error> {
let inner = crate::UdpPair::for_ip(ip_addr)?;
inner.rtp_socket.set_nonblocking(true)?;
inner.rtcp_socket.set_nonblocking(true)?;
Ok(Self {
rtp_port: inner.rtp_port,
rtp_socket: UdpSocket::from_std(inner.rtp_socket)?,
rtcp_socket: UdpSocket::from_std(inner.rtcp_socket)?,
})
}
}