From 27c5ed5f04843418dad6c109926bad00e49ea4a6 Mon Sep 17 00:00:00 2001 From: Scott Lamb Date: Tue, 10 May 2022 15:52:01 -0700 Subject: [PATCH] expose RTCP compound packets ...and rename `PacketItem` variants to `Rtp` and `Rtcp` for brevity. Fixes #59 --- CHANGELOG.md | 5 ++ benches/depacketize.rs | 2 +- examples/client/mp4.rs | 6 +- src/client/mod.rs | 14 ++-- src/client/rtp.rs | 74 +++++++---------- src/codec/mod.rs | 2 +- src/lib.rs | 2 +- src/rtcp.rs | 178 +++++++++++++++++++++++++++++++++-------- 8 files changed, 193 insertions(+), 90 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4a30d62..f9c24a4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,11 @@ * BREAKING: `retina::client::rtp::Packet` is now `retina::rtp::ReceivedPacket`, and field access has been removed in favor of accessors. +* BREAKING: `retina::client::PacketItem::RtpPacket` has been renamed to + `retina::client::PacketItem::Rtp`. +* BREAKING: `retina::client::PacketItem::SenderReport` has been replaced with + `retina::client::PacketItem::Rtcp`, to expose full RTCP compound packets. + Likewise `retina::codec::CodecItem`. * minimum Rust version is now 1.56. ## `v0.3.9` (2022-04-12) diff --git a/benches/depacketize.rs b/benches/depacketize.rs index 9c9fabe..e4814bb 100644 --- a/benches/depacketize.rs +++ b/benches/depacketize.rs @@ -58,7 +58,7 @@ fn h264_aac ()>(mut f: F) { stream_id, data, ) { - Ok(Some(retina::client::PacketItem::RtpPacket(rtp))) => rtp, + Ok(Some(retina::client::PacketItem::Rtp(rtp))) => rtp, _ => unreachable!(), }; depacketizers[stream_id].push(pkt).unwrap(); diff --git a/examples/client/mp4.rs b/examples/client/mp4.rs index 33c8e32..5506e19 100644 --- a/examples/client/mp4.rs +++ b/examples/client/mp4.rs @@ -654,8 +654,10 @@ async fn copy<'a>( mp4.audio(f).await.with_context( || format!("Error processing audio frame, {}", ctx))?; }, - CodecItem::SenderReport(sr) => { - println!("{}: SR ts={}", sr.timestamp, sr.ntp_timestamp); + CodecItem::Rtcp(rtcp) => { + if let (Some(t), Some(Ok(Some(sr)))) = (rtcp.rtp_timestamp(), rtcp.pkts().next().map(retina::rtcp::PacketRef::as_sender_report)) { + println!("{}: SR ts={}", t, sr.ntp_timestamp()); + } }, _ => continue, }; diff --git a/src/client/mod.rs b/src/client/mod.rs index 7353fc8..20498e0 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -1828,8 +1828,8 @@ async fn punch_firewall_hole( #[derive(Debug)] #[non_exhaustive] pub enum PacketItem { - RtpPacket(crate::rtp::ReceivedPacket), - SenderReport(rtp::SenderReport), + Rtp(crate::rtp::ReceivedPacket), + Rtcp(crate::rtcp::ReceivedCompoundPacket), } impl Session { @@ -2345,9 +2345,9 @@ impl futures::Stream for Demuxed { loop { let (stream_id, pkt) = match self.state { DemuxedState::Waiting => match ready!(Pin::new(&mut self.session).poll_next(cx)) { - Some(Ok(PacketItem::RtpPacket(p))) => (p.stream_id(), Some(p)), - Some(Ok(PacketItem::SenderReport(p))) => { - return Poll::Ready(Some(Ok(CodecItem::SenderReport(p)))) + Some(Ok(PacketItem::Rtp(p))) => (p.stream_id(), Some(p)), + Some(Ok(PacketItem::Rtcp(p))) => { + return Poll::Ready(Some(Ok(CodecItem::Rtcp(p)))) } Some(Err(e)) => return Poll::Ready(Some(Err(e))), None => return Poll::Ready(None), @@ -2536,7 +2536,7 @@ mod tests { tokio::join!( async { match session.next().await { - Some(Ok(PacketItem::RtpPacket(p))) => { + Some(Ok(PacketItem::Rtp(p))) => { assert_eq!(p.ssrc(), 0xdcc4a0d8); assert_eq!(p.sequence_number(), 0x41d4); assert_eq!(&p.payload()[..], b"hello world"); @@ -2646,7 +2646,7 @@ mod tests { tokio::join!( async { match session.next().await { - Some(Ok(PacketItem::RtpPacket(p))) => { + Some(Ok(PacketItem::Rtp(p))) => { assert_eq!(p.ssrc(), 0xdcc4a0d8); assert_eq!(p.sequence_number(), 0x41d4); assert_eq!(&p.payload()[..], b"hello world"); diff --git a/src/client/rtp.rs b/src/client/rtp.rs index 4105c9d..a4167e6 100644 --- a/src/client/rtp.rs +++ b/src/client/rtp.rs @@ -4,9 +4,10 @@ //! RTP and RTCP handling; see [RFC 3550](https://datatracker.ietf.org/doc/html/rfc3550). use bytes::Bytes; -use log::{debug, trace}; +use log::debug; use crate::client::PacketItem; +use crate::rtcp::ReceivedCompoundPacket; use crate::rtp::{RawPacket, ReceivedPacket}; use crate::{ ConnectionContext, Error, ErrorInt, PacketContext, StreamContextRef, StreamContextRefInner, @@ -162,7 +163,7 @@ impl InorderParser { self.ssrc = Some(ssrc); self.next_seq = Some(sequence_number.wrapping_add(1)); self.seen_packets += 1; - Ok(Some(PacketItem::RtpPacket(ReceivedPacket { + Ok(Some(PacketItem::Rtp(ReceivedPacket { ctx: *pkt_ctx, stream_id, timestamp, @@ -183,49 +184,34 @@ impl InorderParser { stream_id: usize, data: Bytes, ) -> Result, String> { - let mut sr = None; - let mut i = 0; - let mut data = &data[..]; - while !data.is_empty() { - let (pkt, rest) = crate::rtcp::Packet::parse(data)?; - data = rest; - match pkt { - crate::rtcp::Packet::SenderReport(pkt) => { - if i > 0 { - return Err("RTCP SR must be first in packet".into()); - } - let timestamp = - timeline - .place(pkt.rtp_timestamp()) - .map_err(|mut description| { - description.push_str(" in RTCP SR"); - description - })?; + let first_pkt = crate::rtcp::ReceivedCompoundPacket::validate(&data[..])?; + let mut rtp_timestamp = None; + if let Ok(Some(sr)) = first_pkt.as_sender_report() { + rtp_timestamp = Some(timeline.place(sr.rtp_timestamp()).map_err( + |mut description| { + description.push_str(" in RTCP SR"); + description + }, + )?); - let ssrc = pkt.ssrc(); - if matches!(self.ssrc, Some(s) if s != ssrc) { - if matches!(stream_ctx.0, StreamContextRefInner::Tcp { .. }) { - super::note_stale_live555_data(tool, session_options); - } - return Err(format!( - "Expected ssrc={:08x?}, got RTCP SR ssrc={:08x}", - self.ssrc, ssrc - )); - } - self.ssrc = Some(ssrc); - - sr = Some(SenderReport { - stream_id, - ctx: *pkt_ctx, - timestamp, - ntp_timestamp: pkt.ntp_timestamp(), - }); + let ssrc = sr.ssrc(); + if matches!(self.ssrc, Some(s) if s != ssrc) { + if matches!(stream_ctx.0, StreamContextRefInner::Tcp { .. }) { + super::note_stale_live555_data(tool, session_options); } - crate::rtcp::Packet::Unknown(pkt) => trace!("rtcp: pt {:?}", pkt.payload_type()), + return Err(format!( + "Expected ssrc={:08x?}, got RTCP SR ssrc={:08x}", + self.ssrc, ssrc + )); } - i += 1; + self.ssrc = Some(ssrc); } - Ok(sr.map(PacketItem::SenderReport)) + Ok(Some(PacketItem::Rtcp(ReceivedCompoundPacket { + ctx: *pkt_ctx, + stream_id, + rtp_timestamp, + raw: data, + }))) } } @@ -267,7 +253,7 @@ mod tests { 0, pkt.0, ) { - Ok(Some(PacketItem::RtpPacket(_))) => {} + Ok(Some(PacketItem::Rtp(_))) => {} o => panic!("unexpected packet 1 result: {:#?}", o), } @@ -327,7 +313,7 @@ mod tests { 0, pkt.0, ) { - Ok(Some(PacketItem::RtpPacket(p))) => { + Ok(Some(PacketItem::Rtp(p))) => { assert_eq!(p.timestamp().elapsed(), 0); } o => panic!("unexpected packet 2 result: {:#?}", o), @@ -375,7 +361,7 @@ mod tests { 0, pkt.0, ) { - Ok(Some(PacketItem::RtpPacket(p))) => { + Ok(Some(PacketItem::Rtp(p))) => { // The missing timestamp shouldn't have adjusted time. assert_eq!(p.timestamp().elapsed(), 1); } diff --git a/src/codec/mod.rs b/src/codec/mod.rs index 0a5b4c3..ae8b996 100644 --- a/src/codec/mod.rs +++ b/src/codec/mod.rs @@ -31,7 +31,7 @@ pub enum CodecItem { VideoFrame(VideoFrame), AudioFrame(AudioFrame), MessageFrame(MessageFrame), - SenderReport(crate::client::rtp::SenderReport), + Rtcp(crate::rtcp::ReceivedCompoundPacket), } /// Parameters which describe a stream. diff --git a/src/lib.rs b/src/lib.rs index 226f3a8..44d78c5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -20,9 +20,9 @@ use std::num::NonZeroU32; use std::ops::Range; mod error; -mod rtcp; mod hex; +pub mod rtcp; pub mod rtp; #[cfg(test)] diff --git a/src/rtcp.rs b/src/rtcp.rs index b22e7ec..8bde578 100644 --- a/src/rtcp.rs +++ b/src/rtcp.rs @@ -5,20 +5,110 @@ /// [RFC 3550 section 6](https://datatracker.ietf.org/doc/html/rfc3550#section-6). use std::convert::TryInto; -pub enum Packet<'a> { - SenderReport(SenderReport<'a>), - Unknown(GenericPacket<'a>), +use bytes::Bytes; + +use crate::PacketContext; + +/// A single received RTCP compound packet. +/// +/// The contents have been validated at least as specified in [RFC 3550 appendix +/// A.2](https://datatracker.ietf.org/doc/html/rfc3550#appendix-A.2), updated +/// by [RFC 5506](https://datatracker.ietf.org/doc/html/rfc5506)): +/// +/// * There is at least one RTCP packet within the compound packet. +/// * All packets are RTCP version 2. +/// * Non-final packets have no padding. +/// * The packets' lengths add up to the compound packet's length. +/// +/// Contained packets may additionally have been validated via payload +/// type-specific rules. +pub struct ReceivedCompoundPacket { + pub(crate) ctx: PacketContext, + pub(crate) stream_id: usize, + pub(crate) rtp_timestamp: Option, + pub(crate) raw: Bytes, } -impl<'a> Packet<'a> { - pub fn parse(buf: &'a [u8]) -> Result<(Self, &'a [u8]), String> { - let (pkt, rest) = GenericPacket::parse(buf)?; - let pkt = match pkt.payload_type() { - 200 => Packet::SenderReport(SenderReport::validate(pkt)?), - _ => Packet::Unknown(pkt), - }; - Ok((pkt, rest)) +impl ReceivedCompoundPacket { + /// Validates the supplied compound packet. + /// + /// Returns the first packet on success so the caller doesn't need to + /// recaculate its lengths. + pub(crate) fn validate(raw: &[u8]) -> Result { + let (first_pkt, mut rest) = PacketRef::parse(raw)?; + let mut pkt = first_pkt; + loop { + if rest.is_empty() { + break; + } else if pkt.has_padding() { + return Err("padding on non-final packet within RTCP compound packet".to_owned()); + } + (pkt, rest) = PacketRef::parse(rest)?; + } + Ok(first_pkt) } + + #[inline] + pub fn context(&self) -> &PacketContext { + &self.ctx + } + + #[inline] + pub fn stream_id(&self) -> usize { + self.stream_id + } + + /// Returns an RTP timestamp iff this compound packet begins with a valid Sender Report. + #[inline] + pub fn rtp_timestamp(&self) -> Option { + self.rtp_timestamp + } + + /// Returns the full raw compound packet, including headers of all packets. + #[inline] + pub fn raw(&self) -> &[u8] { + &self.raw[..] + } + + /// Returns an iterator through all contained packets. + #[inline] + pub fn pkts(&self) -> impl Iterator { + CompoundPacketIterator(&self.raw[..]) + } +} + +impl std::fmt::Debug for ReceivedCompoundPacket { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ReceivedCompoundPacket") + .field("ctx", &self.ctx) + .field("stream_id", &self.stream_id) + .field("rtp_timestamp", &self.rtp_timestamp) + .field("raw", &crate::hex::LimitedHex::new(&self.raw[..], 64)) + .finish() + } +} + +/// Internal type returned from [`ReceivedCompoundPacket::pkts`]. +struct CompoundPacketIterator<'a>(&'a [u8]); + +impl<'a> Iterator for CompoundPacketIterator<'a> { + type Item = PacketRef<'a>; + + fn next(&mut self) -> Option { + if self.0.is_empty() { + return None; + } + + let (pkt, rest) = + PacketRef::parse(self.0).expect("failed to parse previously validated packet"); + self.0 = rest; + Some(pkt) + } +} + +#[non_exhaustive] +pub enum TypedPacketRef<'a> { + SenderReport(SenderReportRef<'a>), } /// A RTCP sender report, as defined in @@ -61,10 +151,10 @@ impl<'a> Packet<'a> { /// | profile-specific extensions | /// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ /// ``` -pub struct SenderReport<'a>(GenericPacket<'a>); +pub struct SenderReportRef<'a>(PacketRef<'a>); -impl<'a> SenderReport<'a> { - fn validate(pkt: GenericPacket<'a>) -> Result { +impl<'a> SenderReportRef<'a> { + fn validate(pkt: PacketRef<'a>) -> Result { let count = usize::from(pkt.count()); const HEADER_LEN: usize = 8; const SENDER_INFO_LEN: usize = 20; @@ -76,7 +166,7 @@ impl<'a> SenderReport<'a> { count, pkt.payload_end )); } - Ok(SenderReport(pkt)) + Ok(SenderReportRef(pkt)) } pub fn ssrc(&self) -> u32 { @@ -94,7 +184,7 @@ impl<'a> SenderReport<'a> { /// A generic packet, not parsed as any particular payload type. /// -/// This only inteprets the leading four bytes: +/// This only interprets the leading four bytes: /// /// ```text /// 0 1 2 3 @@ -103,14 +193,15 @@ impl<'a> SenderReport<'a> { /// |V=2|P| | PT | length | /// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ /// ``` -pub struct GenericPacket<'a> { +#[derive(Copy, Clone)] +pub struct PacketRef<'a> { buf: &'a [u8], payload_end: usize, } const COMMON_HEADER_LEN: usize = 4; -impl<'a> GenericPacket<'a> { +impl<'a> PacketRef<'a> { /// Parses a buffer into this packet and rest, doing only basic validation /// of the version, padding, and length. pub fn parse(buf: &'a [u8]) -> Result<(Self, &'a [u8]), String> { @@ -151,7 +242,7 @@ impl<'a> GenericPacket<'a> { )); } Ok(( - GenericPacket { + PacketRef { buf: this, payload_end: len - padding_bytes, }, @@ -159,7 +250,7 @@ impl<'a> GenericPacket<'a> { )) } else { Ok(( - GenericPacket { + PacketRef { buf: this, payload_end: len, }, @@ -169,12 +260,38 @@ impl<'a> GenericPacket<'a> { } /// Returns the uninterpreted payload type of this RTCP packet. + #[inline] pub fn payload_type(&self) -> u8 { self.buf[1] } + /// Parses to a `TypedPacketRef` if the payload type is supported. + pub fn as_typed(self) -> Result>, String> { + match self.payload_type() { + 200 => Ok(Some(TypedPacketRef::SenderReport( + SenderReportRef::validate(self)?, + ))), + _ => Ok(None), + } + } + + /// Parses as a sender report, if the type matches. + pub fn as_sender_report(self) -> Result>, String> { + if self.payload_type() == 200 { + return Ok(Some(SenderReportRef::validate(self)?)); + } + Ok(None) + } + + /// Returns true iff this packet has padding. + #[inline] + pub fn has_padding(&self) -> bool { + (self.buf[0] & 0b0010_0000) != 0 + } + /// Returns the low 5 bits of the first octet, which is typically a count /// or subtype. + #[inline] pub fn count(&self) -> u8 { self.buf[0] & 0b0001_1111 } @@ -194,26 +311,19 @@ mod tests { \x81\xca\x00\x04\x66\x42\x6a\xe1\ \x01\x06\x28\x6e\x6f\x6e\x65\x29\ \x00\x00\x00\x00"; - let (sr, buf) = Packet::parse(buf).unwrap(); - match sr { - Packet::SenderReport(p) => { - assert_eq!(p.ntp_timestamp(), crate::NtpTimestamp(0xe4362f99cccccccc)); - assert_eq!(p.rtp_timestamp(), 0x852ef807); - } - _ => panic!(), - } - let (sdes, buf) = Packet::parse(buf).unwrap(); - match sdes { - Packet::Unknown(p) => assert_eq!(p.payload_type(), 202), - _ => panic!(), - } + let (pkt, buf) = PacketRef::parse(buf).unwrap(); + let sr = pkt.as_sender_report().unwrap().unwrap(); + assert_eq!(sr.ntp_timestamp(), crate::NtpTimestamp(0xe4362f99cccccccc)); + assert_eq!(sr.rtp_timestamp(), 0x852ef807); + let (pkt, buf) = PacketRef::parse(buf).unwrap(); + assert_eq!(pkt.payload_type(), 202); assert_eq!(buf.len(), 0); } #[test] fn padding() { let buf = b"\xa7\x00\x00\x02asdf\x00\x00\x00\x04rest"; - let (pkt, rest) = GenericPacket::parse(buf).unwrap(); + let (pkt, rest) = PacketRef::parse(buf).unwrap(); assert_eq!(pkt.count(), 7); assert_eq!(&pkt.buf[4..pkt.payload_end], b"asdf"); assert_eq!(b"rest", rest);