expose RTCP compound packets

...and rename `PacketItem` variants to `Rtp` and `Rtcp` for brevity.

Fixes #59
This commit is contained in:
Scott Lamb 2022-05-10 15:52:01 -07:00
parent 06d7150989
commit 27c5ed5f04
8 changed files with 193 additions and 90 deletions

View File

@ -17,6 +17,11 @@
* BREAKING: `retina::client::rtp::Packet` is now * BREAKING: `retina::client::rtp::Packet` is now
`retina::rtp::ReceivedPacket`, and field access has been removed in favor `retina::rtp::ReceivedPacket`, and field access has been removed in favor
of accessors. of accessors.
* BREAKING: `retina::client::PacketItem::RtpPacket` has been renamed to
`retina::client::PacketItem::Rtp`.
* BREAKING: `retina::client::PacketItem::SenderReport` has been replaced with
`retina::client::PacketItem::Rtcp`, to expose full RTCP compound packets.
Likewise `retina::codec::CodecItem`.
* minimum Rust version is now 1.56. * minimum Rust version is now 1.56.
## `v0.3.9` (2022-04-12) ## `v0.3.9` (2022-04-12)

View File

@ -58,7 +58,7 @@ fn h264_aac<F: FnMut(CodecItem) -> ()>(mut f: F) {
stream_id, stream_id,
data, data,
) { ) {
Ok(Some(retina::client::PacketItem::RtpPacket(rtp))) => rtp, Ok(Some(retina::client::PacketItem::Rtp(rtp))) => rtp,
_ => unreachable!(), _ => unreachable!(),
}; };
depacketizers[stream_id].push(pkt).unwrap(); depacketizers[stream_id].push(pkt).unwrap();

View File

@ -654,8 +654,10 @@ async fn copy<'a>(
mp4.audio(f).await.with_context( mp4.audio(f).await.with_context(
|| format!("Error processing audio frame, {}", ctx))?; || format!("Error processing audio frame, {}", ctx))?;
}, },
CodecItem::SenderReport(sr) => { CodecItem::Rtcp(rtcp) => {
println!("{}: SR ts={}", sr.timestamp, sr.ntp_timestamp); if let (Some(t), Some(Ok(Some(sr)))) = (rtcp.rtp_timestamp(), rtcp.pkts().next().map(retina::rtcp::PacketRef::as_sender_report)) {
println!("{}: SR ts={}", t, sr.ntp_timestamp());
}
}, },
_ => continue, _ => continue,
}; };

View File

@ -1828,8 +1828,8 @@ async fn punch_firewall_hole(
#[derive(Debug)] #[derive(Debug)]
#[non_exhaustive] #[non_exhaustive]
pub enum PacketItem { pub enum PacketItem {
RtpPacket(crate::rtp::ReceivedPacket), Rtp(crate::rtp::ReceivedPacket),
SenderReport(rtp::SenderReport), Rtcp(crate::rtcp::ReceivedCompoundPacket),
} }
impl Session<Playing> { impl Session<Playing> {
@ -2345,9 +2345,9 @@ impl futures::Stream for Demuxed {
loop { loop {
let (stream_id, pkt) = match self.state { let (stream_id, pkt) = match self.state {
DemuxedState::Waiting => match ready!(Pin::new(&mut self.session).poll_next(cx)) { DemuxedState::Waiting => match ready!(Pin::new(&mut self.session).poll_next(cx)) {
Some(Ok(PacketItem::RtpPacket(p))) => (p.stream_id(), Some(p)), Some(Ok(PacketItem::Rtp(p))) => (p.stream_id(), Some(p)),
Some(Ok(PacketItem::SenderReport(p))) => { Some(Ok(PacketItem::Rtcp(p))) => {
return Poll::Ready(Some(Ok(CodecItem::SenderReport(p)))) return Poll::Ready(Some(Ok(CodecItem::Rtcp(p))))
} }
Some(Err(e)) => return Poll::Ready(Some(Err(e))), Some(Err(e)) => return Poll::Ready(Some(Err(e))),
None => return Poll::Ready(None), None => return Poll::Ready(None),
@ -2536,7 +2536,7 @@ mod tests {
tokio::join!( tokio::join!(
async { async {
match session.next().await { match session.next().await {
Some(Ok(PacketItem::RtpPacket(p))) => { Some(Ok(PacketItem::Rtp(p))) => {
assert_eq!(p.ssrc(), 0xdcc4a0d8); assert_eq!(p.ssrc(), 0xdcc4a0d8);
assert_eq!(p.sequence_number(), 0x41d4); assert_eq!(p.sequence_number(), 0x41d4);
assert_eq!(&p.payload()[..], b"hello world"); assert_eq!(&p.payload()[..], b"hello world");
@ -2646,7 +2646,7 @@ mod tests {
tokio::join!( tokio::join!(
async { async {
match session.next().await { match session.next().await {
Some(Ok(PacketItem::RtpPacket(p))) => { Some(Ok(PacketItem::Rtp(p))) => {
assert_eq!(p.ssrc(), 0xdcc4a0d8); assert_eq!(p.ssrc(), 0xdcc4a0d8);
assert_eq!(p.sequence_number(), 0x41d4); assert_eq!(p.sequence_number(), 0x41d4);
assert_eq!(&p.payload()[..], b"hello world"); assert_eq!(&p.payload()[..], b"hello world");

View File

@ -4,9 +4,10 @@
//! RTP and RTCP handling; see [RFC 3550](https://datatracker.ietf.org/doc/html/rfc3550). //! RTP and RTCP handling; see [RFC 3550](https://datatracker.ietf.org/doc/html/rfc3550).
use bytes::Bytes; use bytes::Bytes;
use log::{debug, trace}; use log::debug;
use crate::client::PacketItem; use crate::client::PacketItem;
use crate::rtcp::ReceivedCompoundPacket;
use crate::rtp::{RawPacket, ReceivedPacket}; use crate::rtp::{RawPacket, ReceivedPacket};
use crate::{ use crate::{
ConnectionContext, Error, ErrorInt, PacketContext, StreamContextRef, StreamContextRefInner, ConnectionContext, Error, ErrorInt, PacketContext, StreamContextRef, StreamContextRefInner,
@ -162,7 +163,7 @@ impl InorderParser {
self.ssrc = Some(ssrc); self.ssrc = Some(ssrc);
self.next_seq = Some(sequence_number.wrapping_add(1)); self.next_seq = Some(sequence_number.wrapping_add(1));
self.seen_packets += 1; self.seen_packets += 1;
Ok(Some(PacketItem::RtpPacket(ReceivedPacket { Ok(Some(PacketItem::Rtp(ReceivedPacket {
ctx: *pkt_ctx, ctx: *pkt_ctx,
stream_id, stream_id,
timestamp, timestamp,
@ -183,26 +184,17 @@ impl InorderParser {
stream_id: usize, stream_id: usize,
data: Bytes, data: Bytes,
) -> Result<Option<PacketItem>, String> { ) -> Result<Option<PacketItem>, String> {
let mut sr = None; let first_pkt = crate::rtcp::ReceivedCompoundPacket::validate(&data[..])?;
let mut i = 0; let mut rtp_timestamp = None;
let mut data = &data[..]; if let Ok(Some(sr)) = first_pkt.as_sender_report() {
while !data.is_empty() { rtp_timestamp = Some(timeline.place(sr.rtp_timestamp()).map_err(
let (pkt, rest) = crate::rtcp::Packet::parse(data)?; |mut description| {
data = rest;
match pkt {
crate::rtcp::Packet::SenderReport(pkt) => {
if i > 0 {
return Err("RTCP SR must be first in packet".into());
}
let timestamp =
timeline
.place(pkt.rtp_timestamp())
.map_err(|mut description| {
description.push_str(" in RTCP SR"); description.push_str(" in RTCP SR");
description description
})?; },
)?);
let ssrc = pkt.ssrc(); let ssrc = sr.ssrc();
if matches!(self.ssrc, Some(s) if s != ssrc) { if matches!(self.ssrc, Some(s) if s != ssrc) {
if matches!(stream_ctx.0, StreamContextRefInner::Tcp { .. }) { if matches!(stream_ctx.0, StreamContextRefInner::Tcp { .. }) {
super::note_stale_live555_data(tool, session_options); super::note_stale_live555_data(tool, session_options);
@ -213,19 +205,13 @@ impl InorderParser {
)); ));
} }
self.ssrc = Some(ssrc); self.ssrc = Some(ssrc);
}
sr = Some(SenderReport { Ok(Some(PacketItem::Rtcp(ReceivedCompoundPacket {
stream_id,
ctx: *pkt_ctx, ctx: *pkt_ctx,
timestamp, stream_id,
ntp_timestamp: pkt.ntp_timestamp(), rtp_timestamp,
}); raw: data,
} })))
crate::rtcp::Packet::Unknown(pkt) => trace!("rtcp: pt {:?}", pkt.payload_type()),
}
i += 1;
}
Ok(sr.map(PacketItem::SenderReport))
} }
} }
@ -267,7 +253,7 @@ mod tests {
0, 0,
pkt.0, pkt.0,
) { ) {
Ok(Some(PacketItem::RtpPacket(_))) => {} Ok(Some(PacketItem::Rtp(_))) => {}
o => panic!("unexpected packet 1 result: {:#?}", o), o => panic!("unexpected packet 1 result: {:#?}", o),
} }
@ -327,7 +313,7 @@ mod tests {
0, 0,
pkt.0, pkt.0,
) { ) {
Ok(Some(PacketItem::RtpPacket(p))) => { Ok(Some(PacketItem::Rtp(p))) => {
assert_eq!(p.timestamp().elapsed(), 0); assert_eq!(p.timestamp().elapsed(), 0);
} }
o => panic!("unexpected packet 2 result: {:#?}", o), o => panic!("unexpected packet 2 result: {:#?}", o),
@ -375,7 +361,7 @@ mod tests {
0, 0,
pkt.0, pkt.0,
) { ) {
Ok(Some(PacketItem::RtpPacket(p))) => { Ok(Some(PacketItem::Rtp(p))) => {
// The missing timestamp shouldn't have adjusted time. // The missing timestamp shouldn't have adjusted time.
assert_eq!(p.timestamp().elapsed(), 1); assert_eq!(p.timestamp().elapsed(), 1);
} }

View File

@ -31,7 +31,7 @@ pub enum CodecItem {
VideoFrame(VideoFrame), VideoFrame(VideoFrame),
AudioFrame(AudioFrame), AudioFrame(AudioFrame),
MessageFrame(MessageFrame), MessageFrame(MessageFrame),
SenderReport(crate::client::rtp::SenderReport), Rtcp(crate::rtcp::ReceivedCompoundPacket),
} }
/// Parameters which describe a stream. /// Parameters which describe a stream.

View File

@ -20,9 +20,9 @@ use std::num::NonZeroU32;
use std::ops::Range; use std::ops::Range;
mod error; mod error;
mod rtcp;
mod hex; mod hex;
pub mod rtcp;
pub mod rtp; pub mod rtp;
#[cfg(test)] #[cfg(test)]

View File

@ -5,20 +5,110 @@
/// [RFC 3550 section 6](https://datatracker.ietf.org/doc/html/rfc3550#section-6). /// [RFC 3550 section 6](https://datatracker.ietf.org/doc/html/rfc3550#section-6).
use std::convert::TryInto; use std::convert::TryInto;
pub enum Packet<'a> { use bytes::Bytes;
SenderReport(SenderReport<'a>),
Unknown(GenericPacket<'a>), use crate::PacketContext;
/// A single received RTCP compound packet.
///
/// The contents have been validated at least as specified in [RFC 3550 appendix
/// A.2](https://datatracker.ietf.org/doc/html/rfc3550#appendix-A.2), updated
/// by [RFC 5506](https://datatracker.ietf.org/doc/html/rfc5506)):
///
/// * There is at least one RTCP packet within the compound packet.
/// * All packets are RTCP version 2.
/// * Non-final packets have no padding.
/// * The packets' lengths add up to the compound packet's length.
///
/// Contained packets may additionally have been validated via payload
/// type-specific rules.
pub struct ReceivedCompoundPacket {
pub(crate) ctx: PacketContext,
pub(crate) stream_id: usize,
pub(crate) rtp_timestamp: Option<crate::Timestamp>,
pub(crate) raw: Bytes,
} }
impl<'a> Packet<'a> { impl ReceivedCompoundPacket {
pub fn parse(buf: &'a [u8]) -> Result<(Self, &'a [u8]), String> { /// Validates the supplied compound packet.
let (pkt, rest) = GenericPacket::parse(buf)?; ///
let pkt = match pkt.payload_type() { /// Returns the first packet on success so the caller doesn't need to
200 => Packet::SenderReport(SenderReport::validate(pkt)?), /// recaculate its lengths.
_ => Packet::Unknown(pkt), pub(crate) fn validate(raw: &[u8]) -> Result<PacketRef, String> {
}; let (first_pkt, mut rest) = PacketRef::parse(raw)?;
Ok((pkt, rest)) let mut pkt = first_pkt;
loop {
if rest.is_empty() {
break;
} else if pkt.has_padding() {
return Err("padding on non-final packet within RTCP compound packet".to_owned());
} }
(pkt, rest) = PacketRef::parse(rest)?;
}
Ok(first_pkt)
}
#[inline]
pub fn context(&self) -> &PacketContext {
&self.ctx
}
#[inline]
pub fn stream_id(&self) -> usize {
self.stream_id
}
/// Returns an RTP timestamp iff this compound packet begins with a valid Sender Report.
#[inline]
pub fn rtp_timestamp(&self) -> Option<crate::Timestamp> {
self.rtp_timestamp
}
/// Returns the full raw compound packet, including headers of all packets.
#[inline]
pub fn raw(&self) -> &[u8] {
&self.raw[..]
}
/// Returns an iterator through all contained packets.
#[inline]
pub fn pkts(&self) -> impl Iterator<Item = PacketRef> {
CompoundPacketIterator(&self.raw[..])
}
}
impl std::fmt::Debug for ReceivedCompoundPacket {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ReceivedCompoundPacket")
.field("ctx", &self.ctx)
.field("stream_id", &self.stream_id)
.field("rtp_timestamp", &self.rtp_timestamp)
.field("raw", &crate::hex::LimitedHex::new(&self.raw[..], 64))
.finish()
}
}
/// Internal type returned from [`ReceivedCompoundPacket::pkts`].
struct CompoundPacketIterator<'a>(&'a [u8]);
impl<'a> Iterator for CompoundPacketIterator<'a> {
type Item = PacketRef<'a>;
fn next(&mut self) -> Option<Self::Item> {
if self.0.is_empty() {
return None;
}
let (pkt, rest) =
PacketRef::parse(self.0).expect("failed to parse previously validated packet");
self.0 = rest;
Some(pkt)
}
}
#[non_exhaustive]
pub enum TypedPacketRef<'a> {
SenderReport(SenderReportRef<'a>),
} }
/// A RTCP sender report, as defined in /// A RTCP sender report, as defined in
@ -61,10 +151,10 @@ impl<'a> Packet<'a> {
/// | profile-specific extensions | /// | profile-specific extensions |
/// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ /// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/// ``` /// ```
pub struct SenderReport<'a>(GenericPacket<'a>); pub struct SenderReportRef<'a>(PacketRef<'a>);
impl<'a> SenderReport<'a> { impl<'a> SenderReportRef<'a> {
fn validate(pkt: GenericPacket<'a>) -> Result<Self, String> { fn validate(pkt: PacketRef<'a>) -> Result<Self, String> {
let count = usize::from(pkt.count()); let count = usize::from(pkt.count());
const HEADER_LEN: usize = 8; const HEADER_LEN: usize = 8;
const SENDER_INFO_LEN: usize = 20; const SENDER_INFO_LEN: usize = 20;
@ -76,7 +166,7 @@ impl<'a> SenderReport<'a> {
count, pkt.payload_end count, pkt.payload_end
)); ));
} }
Ok(SenderReport(pkt)) Ok(SenderReportRef(pkt))
} }
pub fn ssrc(&self) -> u32 { pub fn ssrc(&self) -> u32 {
@ -94,7 +184,7 @@ impl<'a> SenderReport<'a> {
/// A generic packet, not parsed as any particular payload type. /// A generic packet, not parsed as any particular payload type.
/// ///
/// This only inteprets the leading four bytes: /// This only interprets the leading four bytes:
/// ///
/// ```text /// ```text
/// 0 1 2 3 /// 0 1 2 3
@ -103,14 +193,15 @@ impl<'a> SenderReport<'a> {
/// |V=2|P| | PT | length | /// |V=2|P| | PT | length |
/// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ /// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/// ``` /// ```
pub struct GenericPacket<'a> { #[derive(Copy, Clone)]
pub struct PacketRef<'a> {
buf: &'a [u8], buf: &'a [u8],
payload_end: usize, payload_end: usize,
} }
const COMMON_HEADER_LEN: usize = 4; const COMMON_HEADER_LEN: usize = 4;
impl<'a> GenericPacket<'a> { impl<'a> PacketRef<'a> {
/// Parses a buffer into this packet and rest, doing only basic validation /// Parses a buffer into this packet and rest, doing only basic validation
/// of the version, padding, and length. /// of the version, padding, and length.
pub fn parse(buf: &'a [u8]) -> Result<(Self, &'a [u8]), String> { pub fn parse(buf: &'a [u8]) -> Result<(Self, &'a [u8]), String> {
@ -151,7 +242,7 @@ impl<'a> GenericPacket<'a> {
)); ));
} }
Ok(( Ok((
GenericPacket { PacketRef {
buf: this, buf: this,
payload_end: len - padding_bytes, payload_end: len - padding_bytes,
}, },
@ -159,7 +250,7 @@ impl<'a> GenericPacket<'a> {
)) ))
} else { } else {
Ok(( Ok((
GenericPacket { PacketRef {
buf: this, buf: this,
payload_end: len, payload_end: len,
}, },
@ -169,12 +260,38 @@ impl<'a> GenericPacket<'a> {
} }
/// Returns the uninterpreted payload type of this RTCP packet. /// Returns the uninterpreted payload type of this RTCP packet.
#[inline]
pub fn payload_type(&self) -> u8 { pub fn payload_type(&self) -> u8 {
self.buf[1] self.buf[1]
} }
/// Parses to a `TypedPacketRef` if the payload type is supported.
pub fn as_typed(self) -> Result<Option<TypedPacketRef<'a>>, String> {
match self.payload_type() {
200 => Ok(Some(TypedPacketRef::SenderReport(
SenderReportRef::validate(self)?,
))),
_ => Ok(None),
}
}
/// Parses as a sender report, if the type matches.
pub fn as_sender_report(self) -> Result<Option<SenderReportRef<'a>>, String> {
if self.payload_type() == 200 {
return Ok(Some(SenderReportRef::validate(self)?));
}
Ok(None)
}
/// Returns true iff this packet has padding.
#[inline]
pub fn has_padding(&self) -> bool {
(self.buf[0] & 0b0010_0000) != 0
}
/// Returns the low 5 bits of the first octet, which is typically a count /// Returns the low 5 bits of the first octet, which is typically a count
/// or subtype. /// or subtype.
#[inline]
pub fn count(&self) -> u8 { pub fn count(&self) -> u8 {
self.buf[0] & 0b0001_1111 self.buf[0] & 0b0001_1111
} }
@ -194,26 +311,19 @@ mod tests {
\x81\xca\x00\x04\x66\x42\x6a\xe1\ \x81\xca\x00\x04\x66\x42\x6a\xe1\
\x01\x06\x28\x6e\x6f\x6e\x65\x29\ \x01\x06\x28\x6e\x6f\x6e\x65\x29\
\x00\x00\x00\x00"; \x00\x00\x00\x00";
let (sr, buf) = Packet::parse(buf).unwrap(); let (pkt, buf) = PacketRef::parse(buf).unwrap();
match sr { let sr = pkt.as_sender_report().unwrap().unwrap();
Packet::SenderReport(p) => { assert_eq!(sr.ntp_timestamp(), crate::NtpTimestamp(0xe4362f99cccccccc));
assert_eq!(p.ntp_timestamp(), crate::NtpTimestamp(0xe4362f99cccccccc)); assert_eq!(sr.rtp_timestamp(), 0x852ef807);
assert_eq!(p.rtp_timestamp(), 0x852ef807); let (pkt, buf) = PacketRef::parse(buf).unwrap();
} assert_eq!(pkt.payload_type(), 202);
_ => panic!(),
}
let (sdes, buf) = Packet::parse(buf).unwrap();
match sdes {
Packet::Unknown(p) => assert_eq!(p.payload_type(), 202),
_ => panic!(),
}
assert_eq!(buf.len(), 0); assert_eq!(buf.len(), 0);
} }
#[test] #[test]
fn padding() { fn padding() {
let buf = b"\xa7\x00\x00\x02asdf\x00\x00\x00\x04rest"; let buf = b"\xa7\x00\x00\x02asdf\x00\x00\x00\x04rest";
let (pkt, rest) = GenericPacket::parse(buf).unwrap(); let (pkt, rest) = PacketRef::parse(buf).unwrap();
assert_eq!(pkt.count(), 7); assert_eq!(pkt.count(), 7);
assert_eq!(&pkt.buf[4..pkt.payload_end], b"asdf"); assert_eq!(&pkt.buf[4..pkt.payload_end], b"asdf");
assert_eq!(b"rest", rest); assert_eq!(b"rest", rest);