diff --git a/CHANGELOG.md b/CHANGELOG.md index 28c3762..f321fed 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,9 @@ with each packet. * BREAKING: `PacketItem` and `CodecItem` are now `#[non_exhaustive]` for future expansion. +* BREAKING: `retina::client::rtp::Packet` is now + `retina::rtp::ReceivedPacket`, and field access has been removed in favor + of accessors. ## `v0.3.9` (2022-04-12) diff --git a/Cargo.lock b/Cargo.lock index 95ee9ea..bf20ff4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -928,7 +928,6 @@ dependencies = [ "pin-project", "pretty-hex", "rand", - "rtp-rs", "rtsp-types", "sdp-types", "smallvec", @@ -951,12 +950,6 @@ dependencies = [ "mpeg4-audio-const", ] -[[package]] -name = "rtp-rs" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d4ed274a5b3d36c4434cff6a4de1b42f43e64ae326b1cfa72d13d9037a314355" - [[package]] name = "rtsp-types" version = "0.0.3" diff --git a/Cargo.toml b/Cargo.toml index 5141338..ee533ad 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,7 +23,6 @@ once_cell = "1.7.2" pin-project = "1.0.7" pretty-hex = "0.2.1" rand = "0.8.3" -rtp-rs = "0.6.0" rtsp-types = "0.0.3" sdp-types = "0.1.4" smallvec = { version = "1.6.1", features = ["union"] } diff --git a/benches/client.rs b/benches/client.rs index eccd4e4..04c767e 100644 --- a/benches/client.rs +++ b/benches/client.rs @@ -79,28 +79,20 @@ fn make_test_data(max_payload_size: u16) -> Bytes { ]; let mut dummy_frame = vec![0; 1048576]; dummy_frame[4] = h264_reader::nal::UnitType::SliceLayerWithoutPartitioningIdr.id(); - let mut p = retina::codec::h264::Packetizer::new(max_payload_size, 0, 24104).unwrap(); + let mut p = + retina::codec::h264::Packetizer::new(max_payload_size, 0, 24104, 96, 0x4cacc3d1).unwrap(); let mut timestamp = retina::Timestamp::new(0, NonZeroU32::new(90_000).unwrap(), 0).unwrap(); - let mut pkt_buf = vec![0; 65536]; for _ in 0..30 { for &f in &frame_sizes { dummy_frame[0..4].copy_from_slice(&f.to_be_bytes()[..]); let frame = Bytes::copy_from_slice(&dummy_frame[..(usize::try_from(f).unwrap() + 4)]); p.push(timestamp, frame).unwrap(); while let Some(pkt) = p.pull().unwrap() { - let pkt_len = rtp_rs::RtpPacketBuilder::new() - .payload_type(96) - .marked(pkt.mark) - .sequence(rtp_rs::Seq::from(pkt.sequence_number)) - .ssrc(0x4cacc3d1) - .timestamp(pkt.timestamp.timestamp() as u32) - .payload(&pkt.payload) - .build_into(&mut pkt_buf) - .unwrap(); + let pkt = pkt.raw(); data.push(b'$'); // interleaved data data.push(0); // channel 0 - data.extend_from_slice(&u16::try_from(pkt_len).unwrap().to_be_bytes()[..]); - data.extend_from_slice(&pkt_buf[..pkt_len]); + data.extend_from_slice(&u16::try_from(pkt.len()).unwrap().to_be_bytes()[..]); + data.extend_from_slice(&pkt); } timestamp = timestamp.try_add(3000).unwrap(); } diff --git a/fuzz/Cargo.lock b/fuzz/Cargo.lock index d448292..1b94b8c 100644 --- a/fuzz/Cargo.lock +++ b/fuzz/Cargo.lock @@ -544,7 +544,6 @@ dependencies = [ "pin-project", "pretty-hex", "rand", - "rtp-rs", "rtsp-types", "sdp-types", "smallvec", @@ -575,12 +574,6 @@ dependencies = [ "mpeg4-audio-const", ] -[[package]] -name = "rtp-rs" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d4ed274a5b3d36c4434cff6a4de1b42f43e64ae326b1cfa72d13d9037a314355" - [[package]] name = "rtsp-types" version = "0.0.3" diff --git a/fuzz/fuzz_targets/depacketize_h264.rs b/fuzz/fuzz_targets/depacketize_h264.rs index 52f7793..908e8a0 100644 --- a/fuzz/fuzz_targets/depacketize_h264.rs +++ b/fuzz/fuzz_targets/depacketize_h264.rs @@ -2,12 +2,11 @@ // SPDX-License-Identifier: MIT OR Apache-2.0 #![no_main] -use bytes::{Buf, Bytes}; use libfuzzer_sys::fuzz_target; use std::num::NonZeroU32; fuzz_target!(|data: &[u8]| { - let mut data = Bytes::copy_from_slice(data); + let mut data = data; let mut depacketizer = retina::codec::Depacketizer::new( "video", "h264", 90_000, None, Some("packetization-mode=1;profile-level-id=64001E;sprop-parameter-sets=Z2QAHqwsaoLA9puCgIKgAAADACAAAAMD0IAA,aO4xshsA")).unwrap(); let mut timestamp = retina::Timestamp::new(0, NonZeroU32::new(90_000).unwrap(), 0).unwrap(); @@ -15,31 +14,38 @@ fuzz_target!(|data: &[u8]| { let conn_ctx = retina::ConnectionContext::dummy(); let stream_ctx = retina::StreamContextRef::dummy(); let pkt_ctx = retina::PacketContext::dummy(); - while data.has_remaining() { - let hdr = data.get_u8(); + loop { + let (hdr, rest) = match data.split_first() { + Some(r) => r, + None => return, + }; let ts_change = (hdr & 0b001) != 0; let mark = (hdr & 0b010) != 0; let loss = (hdr & 0b100) != 0; let len = usize::from(hdr >> 3); - if len > data.remaining() { + if rest.len() < len { return; } + let (payload, rest) = rest.split_at(len); + data = rest; if loss { sequence_number = sequence_number.wrapping_add(1); } if ts_change { timestamp = timestamp.try_add(1).unwrap(); } - let pkt = retina::client::rtp::Packet { + let pkt = retina::rtp::ReceivedPacketBuilder { ctx: pkt_ctx, stream_id: 0, timestamp, ssrc: 0, sequence_number, loss: u16::from(loss), + payload_type: 96, mark, - payload: data.split_off(len), - }; + } + .build(payload.iter().copied()) + .unwrap(); //println!("pkt: {:#?}", pkt); if depacketizer.push(pkt).is_err() { return; diff --git a/fuzz/fuzz_targets/roundtrip_h264.rs b/fuzz/fuzz_targets/roundtrip_h264.rs index 04b984c..f3c649f 100644 --- a/fuzz/fuzz_targets/roundtrip_h264.rs +++ b/fuzz/fuzz_targets/roundtrip_h264.rs @@ -18,7 +18,7 @@ fuzz_target!(|data: &[u8]| { let conn_ctx = retina::ConnectionContext::dummy(); let stream_ctx = retina::StreamContextRef::dummy(); let max_payload_size = u16::from_be_bytes([data[0], data[1]]); - let mut p = match retina::codec::h264::Packetizer::new(max_payload_size, 0, 0) { + let mut p = match retina::codec::h264::Packetizer::new(max_payload_size, 0, 0, 0, 0) { Ok(p) => p, Err(_) => return, }; @@ -39,7 +39,7 @@ fuzz_target!(|data: &[u8]| { let frame = loop { match p.pull() { Ok(Some(pkt)) => { - let mark = pkt.mark; + let mark = pkt.mark(); if d.push(pkt).is_err() { return; } diff --git a/src/client/mod.rs b/src/client/mod.rs index f67e645..5705b22 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -1798,7 +1798,7 @@ async fn punch_firewall_hole( #[derive(Debug)] #[non_exhaustive] pub enum PacketItem { - RtpPacket(rtp::Packet), + RtpPacket(crate::rtp::ReceivedPacket), SenderReport(rtp::SenderReport), } @@ -2315,7 +2315,7 @@ impl futures::Stream for Demuxed { loop { let (stream_id, pkt) = match self.state { DemuxedState::Waiting => match ready!(Pin::new(&mut self.session).poll_next(cx)) { - Some(Ok(PacketItem::RtpPacket(p))) => (p.stream_id, Some(p)), + Some(Ok(PacketItem::RtpPacket(p))) => (p.stream_id(), Some(p)), Some(Ok(PacketItem::SenderReport(p))) => { return Poll::Ready(Some(Ok(CodecItem::SenderReport(p)))) } @@ -2342,10 +2342,10 @@ impl futures::Stream for Demuxed { .inner .ctx(); if let Some(p) = pkt { - let pkt_ctx = p.ctx; - let stream_id = p.stream_id; - let ssrc = p.ssrc; - let sequence_number = p.sequence_number; + let pkt_ctx = *p.ctx(); + let stream_id = p.stream_id(); + let ssrc = p.ssrc(); + let sequence_number = p.sequence_number(); depacketizer.push(p).map_err(|description| { wrap!(ErrorInt::RtpPacketError { conn_ctx: *conn_ctx, @@ -2498,9 +2498,9 @@ mod tests { async { match session.next().await { Some(Ok(PacketItem::RtpPacket(p))) => { - assert_eq!(p.ssrc, 0xdcc4a0d8); - assert_eq!(p.sequence_number, 0x41d4); - assert_eq!(&p.payload[..], b"hello world"); + assert_eq!(p.ssrc(), 0xdcc4a0d8); + assert_eq!(p.sequence_number(), 0x41d4); + assert_eq!(&p.payload()[..], b"hello world"); } o => panic!("unexpected item: {:#?}", o), } @@ -2608,9 +2608,9 @@ mod tests { async { match session.next().await { Some(Ok(PacketItem::RtpPacket(p))) => { - assert_eq!(p.ssrc, 0xdcc4a0d8); - assert_eq!(p.sequence_number, 0x41d4); - assert_eq!(&p.payload[..], b"hello world"); + assert_eq!(p.ssrc(), 0xdcc4a0d8); + assert_eq!(p.sequence_number(), 0x41d4); + assert_eq!(&p.payload()[..], b"hello world"); } o => panic!("unexpected item: {:#?}", o), } @@ -2771,7 +2771,6 @@ mod tests { ("SessionOptions", std::mem::size_of::()), ("Demuxed", std::mem::size_of::()), ("Stream", std::mem::size_of::()), - ("rtp::Packet", std::mem::size_of::()), ( "rtp::SenderReport", std::mem::size_of::(), diff --git a/src/client/rtp.rs b/src/client/rtp.rs index ffd7519..4105c9d 100644 --- a/src/client/rtp.rs +++ b/src/client/rtp.rs @@ -3,53 +3,17 @@ //! RTP and RTCP handling; see [RFC 3550](https://datatracker.ietf.org/doc/html/rfc3550). -use bytes::{Buf, Bytes}; +use bytes::Bytes; use log::{debug, trace}; use crate::client::PacketItem; +use crate::rtp::{RawPacket, ReceivedPacket}; use crate::{ ConnectionContext, Error, ErrorInt, PacketContext, StreamContextRef, StreamContextRefInner, }; use super::{SessionOptions, Timeline}; -/// A received RTP packet. -pub struct Packet { - pub ctx: PacketContext, - pub stream_id: usize, - pub timestamp: crate::Timestamp, - pub ssrc: u32, - pub sequence_number: u16, - - /// Number of skipped sequence numbers since the last packet. - /// - /// In the case of the first packet on the stream, this may also report loss - /// packets since the `RTP-Info` header's `seq` value. However, currently - /// that header is not required to be present and may be ignored (see - /// [`super::PlayOptions::ignore_zero_seq()`].) - pub loss: u16, - - pub mark: bool, - - /// Guaranteed to be less than u16::MAX bytes. - pub payload: Bytes, -} - -impl std::fmt::Debug for Packet { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("Packet") - .field("ctx", &self.ctx) - .field("stream_id", &self.stream_id) - .field("timestamp", &self.timestamp) - .field("ssrc", &self.ssrc) - .field("sequence_number", &self.sequence_number) - .field("loss", &self.loss) - .field("mark", &self.mark) - .field("payload", &crate::hex::LimitedHex::new(&self.payload, 64)) - .finish() - } -} - /// An RTCP sender report. #[derive(Debug)] pub struct SenderReport { @@ -65,7 +29,7 @@ pub struct SenderReport { /// When using UDP, skips and logs out-of-order packets. When using TCP, /// fails on them. /// -/// This reports packet loss (via [Packet::loss]) but doesn't prohibit it +/// This reports packet loss (via [ReceivedPacket::loss]) but doesn't prohibit it /// of more than `i16::MAX` which would be indistinguishable from non-monotonic sequence numbers. /// Servers sometimes drop packets internally even when sending data via TCP. /// @@ -110,9 +74,9 @@ impl InorderParser { pkt_ctx: &PacketContext, timeline: &mut Timeline, stream_id: usize, - mut data: Bytes, + data: Bytes, ) -> Result, Error> { - let reader = rtp_rs::RtpReader::new(&data[..]).map_err(|e| { + let (raw, payload_range) = RawPacket::new(data).map_err(|e| { wrap!(ErrorInt::PacketError { conn_ctx: *conn_ctx, stream_ctx: stream_ctx.to_owned(), @@ -121,8 +85,8 @@ impl InorderParser { description: format!( "corrupt RTP header while expecting seq={:04x?}: {:?}\n{:#?}", &self.next_seq, - e, - crate::hex::LimitedHex::new(&data, 64), + e.reason, + crate::hex::LimitedHex::new(&e.data[..], 64), ), }) })?; @@ -133,13 +97,13 @@ impl InorderParser { // "Out-of-order packet or large loss" error. In UDP streams, if these // are delivered out of order, they will cause the more important other // packet with the same sequence number to be skipped. - if reader.payload_type() == 50 { + if raw.payload_type() == 50 { debug!("skipping pkt with invalid payload type 50"); return Ok(None); } - let sequence_number = u16::from_be_bytes([data[2], data[3]]); // I don't like rtsp_rs::Seq. - let ssrc = reader.ssrc(); + let sequence_number = raw.sequence_number(); + let ssrc = raw.ssrc(); let loss = sequence_number.wrapping_sub(self.next_seq.unwrap_or(sequence_number)); if matches!(self.ssrc, Some(s) if s != ssrc) { if matches!(stream_ctx.0, StreamContextRefInner::Udp(_)) { @@ -178,12 +142,12 @@ impl InorderParser { "Skipping out-of-order seq={:04x} when expecting ssrc={:08x?} seq={:04x?}", sequence_number, self.ssrc, - self.next_seq + self.next_seq, ); return Ok(None); } } - let timestamp = match timeline.advance_to(reader.timestamp()) { + let timestamp = match timeline.advance_to(raw.timestamp()) { Ok(ts) => ts, Err(description) => bail!(ErrorInt::RtpPacketError { conn_ctx: *conn_ctx, @@ -196,31 +160,15 @@ impl InorderParser { }), }; self.ssrc = Some(ssrc); - let mark = reader.mark(); - let payload_range = crate::as_range(&data, reader.payload()).ok_or_else(|| { - wrap!(ErrorInt::RtpPacketError { - conn_ctx: *conn_ctx, - stream_ctx: stream_ctx.to_owned(), - pkt_ctx: *pkt_ctx, - stream_id, - ssrc, - sequence_number, - description: "empty payload".into(), - }) - })?; - data.truncate(payload_range.end); - data.advance(payload_range.start); self.next_seq = Some(sequence_number.wrapping_add(1)); self.seen_packets += 1; - Ok(Some(PacketItem::RtpPacket(Packet { + Ok(Some(PacketItem::RtpPacket(ReceivedPacket { ctx: *pkt_ctx, stream_id, timestamp, - ssrc, - sequence_number, + raw, + payload_range, loss, - mark, - payload: data, }))) } @@ -300,6 +248,15 @@ mod tests { let stream_ctx = StreamContextRef::dummy(); // Normal packet. + let (pkt, _payload_range) = crate::rtp::RawPacketBuilder { + sequence_number: 0x1234, + timestamp: 141000, + payload_type: 105, + ssrc: 0xd25614e, + mark: true, + } + .build(*b"foo") + .unwrap(); match parser.rtp( &SessionOptions::default(), stream_ctx, @@ -308,22 +265,22 @@ mod tests { &PacketContext::dummy(), &mut timeline, 0, - rtp_rs::RtpPacketBuilder::new() - .payload_type(105) - .ssrc(0xd25614e) - .sequence(0x1234.into()) - .timestamp(141000) - .marked(true) - .payload(b"foo") - .build() - .unwrap() - .into(), + pkt.0, ) { Ok(Some(PacketItem::RtpPacket(_))) => {} o => panic!("unexpected packet 1 result: {:#?}", o), } // Mystery pt=50 packet with same sequence number. + let (pkt, _payload_range) = crate::rtp::RawPacketBuilder { + sequence_number: 0x1234, + timestamp: 141000, + payload_type: 50, + ssrc: 0xd25614e, + mark: true, + } + .build(*b"bar") + .unwrap(); match parser.rtp( &SessionOptions::default(), stream_ctx, @@ -332,16 +289,7 @@ mod tests { &PacketContext::dummy(), &mut timeline, 0, - rtp_rs::RtpPacketBuilder::new() - .payload_type(50) - .ssrc(0xd25614e) - .sequence(0x1234.into()) - .timestamp(141000) - .marked(true) - .payload(b"bar") - .build() - .unwrap() - .into(), + pkt.0, ) { Ok(None) => {} o => panic!("unexpected packet 2 result: {:#?}", o), @@ -360,6 +308,15 @@ mod tests { }; let stream_ctx = StreamContextRef(StreamContextRefInner::Udp(&udp)); let session_options = SessionOptions::default(); + let (pkt, _payload_range) = crate::rtp::RawPacketBuilder { + sequence_number: 2, + timestamp: 2, + payload_type: 96, + ssrc: 0xd25614e, + mark: true, + } + .build(*b"pkt 2") + .unwrap(); match parser.rtp( &session_options, stream_ctx, @@ -368,23 +325,23 @@ mod tests { &PacketContext::dummy(), &mut timeline, 0, - rtp_rs::RtpPacketBuilder::new() - .payload_type(96) - .ssrc(0xd25614e) - .sequence(2.into()) - .timestamp(2) - .marked(true) - .payload(b"pkt 2") - .build() - .unwrap() - .into(), + pkt.0, ) { Ok(Some(PacketItem::RtpPacket(p))) => { - assert_eq!(p.timestamp.elapsed(), 0); + assert_eq!(p.timestamp().elapsed(), 0); } o => panic!("unexpected packet 2 result: {:#?}", o), } + let (pkt, _payload_range) = crate::rtp::RawPacketBuilder { + sequence_number: 1, + timestamp: 1, + payload_type: 96, + ssrc: 0xd25614e, + mark: true, + } + .build(*b"pkt 1") + .unwrap(); match parser.rtp( &session_options, stream_ctx, @@ -393,21 +350,21 @@ mod tests { &PacketContext::dummy(), &mut timeline, 0, - rtp_rs::RtpPacketBuilder::new() - .payload_type(96) - .ssrc(0xd25614e) - .sequence(1.into()) - .timestamp(1) - .marked(true) - .payload(b"pkt 1") - .build() - .unwrap() - .into(), + pkt.0, ) { Ok(None) => {} o => panic!("unexpected packet 1 result: {:#?}", o), } + let (pkt, _payload_range) = crate::rtp::RawPacketBuilder { + sequence_number: 3, + timestamp: 3, + payload_type: 96, + ssrc: 0xd25614e, + mark: true, + } + .build(*b"pkt 3") + .unwrap(); match parser.rtp( &session_options, stream_ctx, @@ -416,20 +373,11 @@ mod tests { &PacketContext::dummy(), &mut timeline, 0, - rtp_rs::RtpPacketBuilder::new() - .payload_type(96) - .ssrc(0xd25614e) - .sequence(3.into()) - .timestamp(3) - .marked(true) - .payload(b"pkt 3") - .build() - .unwrap() - .into(), + pkt.0, ) { Ok(Some(PacketItem::RtpPacket(p))) => { // The missing timestamp shouldn't have adjusted time. - assert_eq!(p.timestamp.elapsed(), 1); + assert_eq!(p.timestamp().elapsed(), 1); } o => panic!("unexpected packet 2 result: {:#?}", o), } diff --git a/src/codec/aac.rs b/src/codec/aac.rs index 9d87536..13a98f3 100644 --- a/src/codec/aac.rs +++ b/src/codec/aac.rs @@ -14,14 +14,14 @@ //! ISO base media file format. //! * ISO/IEC 14496-14: MP4 File Format. -use bytes::{Buf, BufMut, Bytes, BytesMut}; +use bytes::{BufMut, Bytes, BytesMut}; use std::{ convert::TryFrom, fmt::Debug, num::{NonZeroU16, NonZeroU32}, }; -use crate::{client::rtp::Packet, error::ErrorInt, ConnectionContext, Error, StreamContextRef}; +use crate::{error::ErrorInt, rtp::ReceivedPacket, ConnectionContext, Error, StreamContextRef}; use super::CodecItem; @@ -442,7 +442,7 @@ pub(crate) struct Depacketizer { /// beginning of a fragment. #[derive(Debug)] struct Aggregate { - ctx: crate::PacketContext, + pkt: ReceivedPacket, /// RTP packets lost before the next frame in this aggregate. Includes old /// loss that caused a previous fragment to be too short. @@ -455,16 +455,6 @@ struct Aggregate { /// to be too short. loss_since_mark: bool, - stream_id: usize, - ssrc: u32, - sequence_number: u16, - - /// The RTP-level timestamp; frame `i` is at timestamp `timestamp + frame_length*i`. - timestamp: crate::Timestamp, - - /// The buffer, positioned at frame 0's header. - buf: Bytes, - /// The index in range `[0, frame_count)` of the next frame to return from `pull`. frame_i: u16, @@ -472,13 +462,8 @@ struct Aggregate { /// been returned by `pull`). frame_count: u16, - /// The starting byte offset of `frame_i`'s data within `buf`. + /// The starting byte offset of `frame_i`'s data within `pkt.payload()`. data_off: usize, - - /// If a mark was set on this packet. When this is false, this should - /// actually be the start of a fragmented frame, but that conversion is - /// currently deferred until `pull`. - mark: bool, } /// The received prefix of a single access unit which has been spread across multiple packets. @@ -552,12 +537,12 @@ impl Depacketizer { Some(super::Parameters::Audio(self.config.to_parameters())) } - pub(super) fn push(&mut self, mut pkt: Packet) -> Result<(), String> { - if pkt.loss > 0 { + pub(super) fn push(&mut self, pkt: ReceivedPacket) -> Result<(), String> { + if pkt.loss() > 0 { if let DepacketizerState::Fragmented(ref mut f) = self.state { log::debug!( "Discarding in-progress fragmented AAC frame due to loss of {} RTP packets.", - pkt.loss + pkt.loss(), ); self.state = DepacketizerState::Idle { prev_loss: f.loss, // note this packet's loss will be added in later. @@ -567,18 +552,19 @@ impl Depacketizer { } // Read the AU headers. - if pkt.payload.len() < 2 { + let payload = pkt.payload(); + if payload.len() < 2 { return Err("packet too short for au-header-length".to_string()); } - let au_headers_length_bits = pkt.payload.get_u16(); + let au_headers_length_bits = u16::from_be_bytes([payload[0], payload[1]]); // AAC-hbr requires 16-bit AU headers: 13-bit size, 3-bit index. if (au_headers_length_bits & 0x7) != 0 { return Err(format!("bad au-headers-length {}", au_headers_length_bits)); } let au_headers_count = au_headers_length_bits >> 4; - let data_off = usize::from(au_headers_count) << 1; - if pkt.payload.len() < (usize::from(au_headers_count) << 1) { + let data_off = 2 + (usize::from(au_headers_count) << 1); + if payload.len() < data_off { return Err("packet too short for au-headers".to_string()); } match &mut self.state { @@ -589,21 +575,22 @@ impl Depacketizer { au_headers_count )); } - if (pkt.timestamp.timestamp as u16) != frag.rtp_timestamp { + if (pkt.timestamp().timestamp as u16) != frag.rtp_timestamp { return Err(format!( "Timestamp changed from 0x{:04x} to 0x{:04x} mid-fragment", - frag.rtp_timestamp, pkt.timestamp.timestamp as u16 + frag.rtp_timestamp, + pkt.timestamp().timestamp as u16 )); } - let au_header = u16::from_be_bytes([pkt.payload[0], pkt.payload[1]]); + let au_header = u16::from_be_bytes([payload[2], payload[3]]); let size = usize::from(au_header >> 3); if size != usize::from(frag.size) { return Err(format!("size changed {}->{} mid-fragment", frag.size, size)); } - let data = &pkt.payload[data_off..]; + let data = &payload[data_off..]; match (frag.buf.len() + data.len()).cmp(&size) { std::cmp::Ordering::Less => { - if pkt.mark { + if pkt.mark() { if frag.loss_since_mark { self.state = DepacketizerState::Idle { prev_loss: frag.loss, @@ -621,18 +608,18 @@ impl Depacketizer { frag.buf.extend_from_slice(data); } std::cmp::Ordering::Equal => { - if !pkt.mark { + if !pkt.mark() { return Err( "frag not marked complete when full data present".to_string() ); } frag.buf.extend_from_slice(data); self.state = DepacketizerState::Ready(super::AudioFrame { - ctx: pkt.ctx, + ctx: *pkt.ctx(), loss: frag.loss, frame_length: NonZeroU32::from(self.config.frame_length), - stream_id: pkt.stream_id, - timestamp: pkt.timestamp, + stream_id: pkt.stream_id(), + timestamp: pkt.timestamp(), data: std::mem::take(&mut frag.buf).freeze(), }); } @@ -647,19 +634,14 @@ impl Depacketizer { if au_headers_count == 0 { return Err("aggregate with no headers".to_string()); } + let loss = pkt.loss(); self.state = DepacketizerState::Aggregated(Aggregate { - ctx: pkt.ctx, - loss: *prev_loss + pkt.loss, - loss_since_mark: *loss_since_mark || pkt.loss > 0, - stream_id: pkt.stream_id, - ssrc: pkt.ssrc, - sequence_number: pkt.sequence_number, - timestamp: pkt.timestamp, - buf: pkt.payload, + pkt, + loss: *prev_loss + loss, + loss_since_mark: *loss_since_mark || loss > 0, frame_i: 0, frame_count: au_headers_count, data_off, - mark: pkt.mark, }); } DepacketizerState::Ready(..) => panic!("push when in state ready"), @@ -683,7 +665,9 @@ impl Depacketizer { } DepacketizerState::Aggregated(mut agg) => { let i = usize::from(agg.frame_i); - let au_header = u16::from_be_bytes([agg.buf[i << 1], agg.buf[(i << 1) + 1]]); + let payload = agg.pkt.payload(); + let mark = agg.pkt.mark(); + let au_header = u16::from_be_bytes([payload[2 + (i << 1)], payload[3 + (i << 1)]]); let size = usize::from(au_header >> 3); let index = au_header & 0b111; if index != 0 { @@ -698,7 +682,7 @@ impl Depacketizer { "interleaving not yet supported".to_owned(), )); } - if size > agg.buf.len() - agg.data_off { + if size > payload.len() - agg.data_off { // start of fragment if agg.frame_count != 1 { return Err(error( @@ -708,7 +692,7 @@ impl Depacketizer { "fragmented AUs must not share packets".to_owned(), )); } - if agg.mark { + if mark { if agg.loss_since_mark { log::debug!( "Discarding in-progress fragmented AAC frame due to loss of {} RTP packets.", @@ -728,9 +712,9 @@ impl Depacketizer { )); } let mut buf = BytesMut::with_capacity(size); - buf.extend_from_slice(&agg.buf[agg.data_off..]); + buf.extend_from_slice(&payload[agg.data_off..]); self.state = DepacketizerState::Fragmented(Fragment { - rtp_timestamp: agg.timestamp.timestamp as u16, + rtp_timestamp: agg.pkt.timestamp().timestamp as u16, loss: agg.loss, loss_since_mark: agg.loss_since_mark, size: size as u16, @@ -738,7 +722,7 @@ impl Depacketizer { }); return Ok(None); } - if !agg.mark { + if !mark { return Err(error( *conn_ctx, stream_ctx, @@ -748,11 +732,11 @@ impl Depacketizer { } let delta = u32::from(agg.frame_i) * u32::from(self.config.frame_length.get()); - let agg_timestamp = agg.timestamp; + let agg_timestamp = agg.pkt.timestamp(); let frame = super::AudioFrame { - ctx: agg.ctx, + ctx: *agg.pkt.ctx(), loss: agg.loss, - stream_id: agg.stream_id, + stream_id: agg.pkt.stream_id(), frame_length: NonZeroU32::from(self.config.frame_length), // u16 * u16 can't overflow u32, but i64 + u32 can overflow i64. @@ -770,7 +754,7 @@ impl Depacketizer { )) } }, - data: agg.buf.slice(agg.data_off..agg.data_off + size), + data: Bytes::copy_from_slice(&payload[agg.data_off..agg.data_off + size]), }; agg.loss = 0; agg.data_off += size; @@ -793,16 +777,18 @@ fn error( Error(std::sync::Arc::new(ErrorInt::RtpPacketError { conn_ctx, stream_ctx: stream_ctx.to_owned(), - pkt_ctx: agg.ctx, - stream_id: agg.stream_id, - ssrc: agg.ssrc, - sequence_number: agg.sequence_number, + pkt_ctx: *agg.pkt.ctx(), + stream_id: agg.pkt.stream_id(), + ssrc: agg.pkt.ssrc(), + sequence_number: agg.pkt.sequence_number(), description, })) } #[cfg(test)] mod tests { + use crate::{rtp::ReceivedPacketBuilder, PacketContext}; + use super::*; #[test] @@ -834,23 +820,26 @@ mod tests { }; // Single frame. - d.push(Packet { - // single frame. - ctx: crate::PacketContext::dummy(), - stream_id: 0, - timestamp, - ssrc: 0, - sequence_number: 0, - loss: 0, - mark: true, - payload: Bytes::from_static(&[ + d.push( + ReceivedPacketBuilder { + ctx: PacketContext::dummy(), + stream_id: 0, + sequence_number: 0, + timestamp, + payload_type: 0, + ssrc: 0, + mark: true, + loss: 0, + } + .build([ // https://datatracker.ietf.org/doc/html/rfc3640#section-3.2.1 0x00, 0x10, // AU-headers-length: 16 bits (13-bit size + 3-bit index) => 1 header 0x00, 0x20, // AU-header: AU-size=4 + AU-index=0 b'a', b's', b'd', b'f', - ]), - }) + ]) + .unwrap(), + ) .unwrap(); let a = match d .pull(&ConnectionContext::dummy(), StreamContextRef::dummy()) @@ -867,15 +856,18 @@ mod tests { .is_none()); // Aggregate of 3 frames. - d.push(Packet { - ctx: crate::PacketContext::dummy(), - stream_id: 0, - timestamp, - ssrc: 0, - sequence_number: 0, - loss: 0, - mark: true, - payload: Bytes::from_static(&[ + d.push( + ReceivedPacketBuilder { + ctx: crate::PacketContext::dummy(), + stream_id: 0, + timestamp, + ssrc: 0, + sequence_number: 0, + loss: 0, + mark: true, + payload_type: 0, + } + .build([ // https://datatracker.ietf.org/doc/html/rfc3640#section-3.2.1 0x00, 0x30, // AU-headers-length: 16 bits (13-bit size + 3-bit index) => 3 headers @@ -883,8 +875,9 @@ mod tests { 0x00, 0x18, // AU-header: AU-size=3 + AU-index-delta=0 0x00, 0x18, // AU-header: AU-size=3 + AU-index-delta=0 b'f', b'o', b'o', b'b', b'a', b'r', b'b', b'a', b'z', - ]), - }) + ]) + .unwrap(), + ) .unwrap(); let a = match d .pull(&ConnectionContext::dummy(), StreamContextRef::dummy()) @@ -919,67 +912,79 @@ mod tests { .is_none()); // Fragment across 3 packets. - d.push(Packet { - // fragment 1/3. - ctx: crate::PacketContext::dummy(), - stream_id: 0, - timestamp, - ssrc: 0, - sequence_number: 0, - loss: 0, - mark: false, - payload: Bytes::from_static(&[ + d.push( + ReceivedPacketBuilder { + // fragment 1/3. + ctx: crate::PacketContext::dummy(), + stream_id: 0, + timestamp, + ssrc: 0, + sequence_number: 0, + loss: 0, + mark: false, + payload_type: 0, + } + .build([ // https://datatracker.ietf.org/doc/html/rfc3640#section-3.2.1 0x00, 0x10, // AU-headers-length: 16 bits (13-bit size + 3-bit index) => 1 header 0x00, 0x48, // AU-header: AU-size=9 + AU-index=0 b'f', b'o', b'o', - ]), - }) + ]) + .unwrap(), + ) .unwrap(); assert!(d .pull(&ConnectionContext::dummy(), StreamContextRef::dummy()) .unwrap() .is_none()); - d.push(Packet { - // fragment 2/3. - ctx: crate::PacketContext::dummy(), - stream_id: 0, - timestamp, - ssrc: 0, - sequence_number: 0, - loss: 0, - mark: false, - payload: Bytes::from_static(&[ + d.push( + ReceivedPacketBuilder { + // fragment 2/3. + ctx: crate::PacketContext::dummy(), + stream_id: 0, + timestamp, + ssrc: 0, + sequence_number: 0, + loss: 0, + mark: false, + payload_type: 0, + } + .build([ // https://datatracker.ietf.org/doc/html/rfc3640#section-3.2.1 0x00, 0x10, // AU-headers-length: 16 bits (13-bit size + 3-bit index) => 1 header 0x00, 0x48, // AU-header: AU-size=9 + AU-index=0 b'b', b'a', b'r', - ]), - }) + ]) + .unwrap(), + ) .unwrap(); assert!(d .pull(&ConnectionContext::dummy(), StreamContextRef::dummy()) .unwrap() .is_none()); - d.push(Packet { - // fragment 3/3. - ctx: crate::PacketContext::dummy(), - stream_id: 0, - timestamp, - ssrc: 0, - sequence_number: 0, - loss: 0, - mark: true, - payload: Bytes::from_static(&[ + d.push( + ReceivedPacketBuilder { + // fragment 3/3. + ctx: crate::PacketContext::dummy(), + stream_id: 0, + timestamp, + ssrc: 0, + sequence_number: 0, + loss: 0, + mark: true, + payload_type: 0, + } + .build([ // https://datatracker.ietf.org/doc/html/rfc3640#section-3.2.1 0x00, 0x10, // AU-headers-length: 16 bits (13-bit size + 3-bit index) => 1 header 0x00, 0x48, // AU-header: AU-size=9 + AU-index=0 b'b', b'a', b'z', - ]), - }) + ]) + .unwrap(), + ) .unwrap(); let a = match d .pull(&ConnectionContext::dummy(), StreamContextRef::dummy()) @@ -1011,43 +1016,51 @@ mod tests { }; // Fragment - d.push(Packet { - ctx: crate::PacketContext::dummy(), - stream_id: 0, - timestamp, - ssrc: 0, - sequence_number: 0, - loss: 1, - mark: false, - payload: Bytes::from_static(&[ + d.push( + ReceivedPacketBuilder { + ctx: crate::PacketContext::dummy(), + stream_id: 0, + timestamp, + ssrc: 0, + sequence_number: 0, + loss: 1, + mark: false, + payload_type: 0, + } + .build([ // https://datatracker.ietf.org/doc/html/rfc3640#section-3.2.1 0x00, 0x10, // AU-headers-length: 16 bits (13-bit size + 3-bit index) => 1 header 0x00, 0x48, // AU-header: AU-size=9 + AU-index=0 b'b', b'a', b'r', - ]), - }) + ]) + .unwrap(), + ) .unwrap(); assert!(d .pull(&ConnectionContext::dummy(), StreamContextRef::dummy()) .unwrap() .is_none()); - d.push(Packet { - ctx: crate::PacketContext::dummy(), - stream_id: 0, - timestamp, - ssrc: 0, - sequence_number: 0, - loss: 0, - mark: true, - payload: Bytes::from_static(&[ + d.push( + ReceivedPacketBuilder { + ctx: crate::PacketContext::dummy(), + stream_id: 0, + timestamp, + ssrc: 0, + sequence_number: 0, + loss: 0, + mark: true, + payload_type: 0, + } + .build([ // https://datatracker.ietf.org/doc/html/rfc3640#section-3.2.1 0x00, 0x10, // AU-headers-length: 16 bits (13-bit size + 3-bit index) => 1 header 0x00, 0x48, // AU-header: AU-size=9 + AU-index=0 b'b', b'a', b'z', - ]), - }) + ]) + .unwrap(), + ) .unwrap(); assert!(d .pull(&ConnectionContext::dummy(), StreamContextRef::dummy()) @@ -1055,23 +1068,27 @@ mod tests { .is_none()); // Following frame reports the loss. - d.push(Packet { - // single frame. - ctx: crate::PacketContext::dummy(), - stream_id: 0, - timestamp, - ssrc: 0, - sequence_number: 0, - loss: 0, - mark: true, - payload: Bytes::from_static(&[ + d.push( + ReceivedPacketBuilder { + // single frame. + ctx: crate::PacketContext::dummy(), + stream_id: 0, + timestamp, + ssrc: 0, + sequence_number: 0, + loss: 0, + mark: true, + payload_type: 0, + } + .build([ // https://datatracker.ietf.org/doc/html/rfc3640#section-3.2.1 0x00, 0x10, // AU-headers-length: 16 bits (13-bit size + 3-bit index) => 1 header 0x00, 0x20, // AU-header: AU-size=4 + AU-index=0 b'a', b's', b'd', b'f', - ]), - }) + ]) + .unwrap(), + ) .unwrap(); let a = match d .pull(&ConnectionContext::dummy(), StreamContextRef::dummy()) @@ -1102,46 +1119,54 @@ mod tests { start: 0, }; - d.push(Packet { - // 1/3 - ctx: crate::PacketContext::dummy(), - stream_id: 0, - timestamp, - ssrc: 0, - sequence_number: 0, - loss: 0, - mark: false, - payload: Bytes::from_static(&[ + d.push( + ReceivedPacketBuilder { + // 1/3 + ctx: crate::PacketContext::dummy(), + stream_id: 0, + timestamp, + ssrc: 0, + sequence_number: 0, + loss: 0, + mark: false, + payload_type: 0, + } + .build([ // https://datatracker.ietf.org/doc/html/rfc3640#section-3.2.1 0x00, 0x10, // AU-headers-length: 16 bits (13-bit size + 3-bit index) => 1 header 0x00, 0x48, // AU-header: AU-size=9 + AU-index=0 b'f', b'o', b'o', - ]), - }) + ]) + .unwrap(), + ) .unwrap(); assert!(d .pull(&ConnectionContext::dummy(), StreamContextRef::dummy()) .unwrap() .is_none()); // Fragment 2/3 is lost - d.push(Packet { - // 3/3 reports the loss - ctx: crate::PacketContext::dummy(), - stream_id: 0, - timestamp, - ssrc: 0, - sequence_number: 0, - loss: 1, - mark: true, - payload: Bytes::from_static(&[ + d.push( + ReceivedPacketBuilder { + // 3/3 reports the loss + ctx: crate::PacketContext::dummy(), + stream_id: 0, + timestamp, + ssrc: 0, + sequence_number: 0, + loss: 1, + mark: true, + payload_type: 0, + } + .build([ // https://datatracker.ietf.org/doc/html/rfc3640#section-3.2.1 0x00, 0x10, // AU-headers-length: 16 bits (13-bit size + 3-bit index) => 1 header 0x00, 0x48, // AU-header: AU-size=9 + AU-index=0 b'b', b'a', b'z', - ]), - }) + ]) + .unwrap(), + ) .unwrap(); assert!(d .pull(&ConnectionContext::dummy(), StreamContextRef::dummy()) @@ -1149,23 +1174,27 @@ mod tests { .is_none()); // Following frame reports the loss. - d.push(Packet { - // single frame. - ctx: crate::PacketContext::dummy(), - stream_id: 0, - timestamp, - ssrc: 0, - sequence_number: 0, - loss: 0, - mark: true, - payload: Bytes::from_static(&[ + d.push( + ReceivedPacketBuilder { + // single frame. + ctx: crate::PacketContext::dummy(), + stream_id: 0, + timestamp, + ssrc: 0, + sequence_number: 0, + loss: 0, + mark: true, + payload_type: 0, + } + .build([ // https://datatracker.ietf.org/doc/html/rfc3640#section-3.2.1 0x00, 0x10, // AU-headers-length: 16 bits (13-bit size + 3-bit index) => 1 header 0x00, 0x20, // AU-header: AU-size=4 + AU-index=0 b'a', b's', b'd', b'f', - ]), - }) + ]) + .unwrap(), + ) .unwrap(); let a = match d .pull(&ConnectionContext::dummy(), StreamContextRef::dummy()) @@ -1196,46 +1225,54 @@ mod tests { start: 0, }; - d.push(Packet { - // 1/3 - ctx: crate::PacketContext::dummy(), - stream_id: 0, - timestamp, - ssrc: 0, - sequence_number: 0, - loss: 0, - mark: false, - payload: Bytes::from_static(&[ + d.push( + ReceivedPacketBuilder { + // 1/3 + ctx: crate::PacketContext::dummy(), + stream_id: 0, + timestamp, + ssrc: 0, + sequence_number: 0, + loss: 0, + mark: false, + payload_type: 0, + } + .build([ // https://datatracker.ietf.org/doc/html/rfc3640#section-3.2.1 0x00, 0x10, // AU-headers-length: 16 bits (13-bit size + 3-bit index) => 1 header 0x00, 0x48, // AU-header: AU-size=9 + AU-index=0 b'f', b'o', b'o', - ]), - }) + ]) + .unwrap(), + ) .unwrap(); assert!(d .pull(&ConnectionContext::dummy(), StreamContextRef::dummy()) .unwrap() .is_none()); // Fragment 2/3 is lost - d.push(Packet { - // 3/3 reports the loss - ctx: crate::PacketContext::dummy(), - stream_id: 0, - timestamp, - ssrc: 0, - sequence_number: 0, - loss: 1, - mark: true, - payload: Bytes::from_static(&[ + d.push( + ReceivedPacketBuilder { + // 3/3 reports the loss + ctx: crate::PacketContext::dummy(), + stream_id: 0, + timestamp, + ssrc: 0, + sequence_number: 0, + loss: 1, + mark: true, + payload_type: 0, + } + .build([ // https://datatracker.ietf.org/doc/html/rfc3640#section-3.2.1 0x00, 0x10, // AU-headers-length: 16 bits (13-bit size + 3-bit index) => 1 header 0x00, 0x48, // AU-header: AU-size=9 + AU-index=0 b'b', b'a', b'z', - ]), - }) + ]) + .unwrap(), + ) .unwrap(); assert!(d .pull(&ConnectionContext::dummy(), StreamContextRef::dummy()) @@ -1243,23 +1280,27 @@ mod tests { .is_none()); // Following frame reports the loss. - d.push(Packet { - // single frame. - ctx: crate::PacketContext::dummy(), - stream_id: 0, - timestamp, - ssrc: 0, - sequence_number: 0, - loss: 0, - mark: true, - payload: Bytes::from_static(&[ + d.push( + ReceivedPacketBuilder { + // single frame. + ctx: crate::PacketContext::dummy(), + stream_id: 0, + timestamp, + ssrc: 0, + sequence_number: 0, + loss: 0, + mark: true, + payload_type: 0, + } + .build([ // https://datatracker.ietf.org/doc/html/rfc3640#section-3.2.1 0x00, 0x10, // AU-headers-length: 16 bits (13-bit size + 3-bit index) => 1 header 0x00, 0x20, // AU-header: AU-size=4 + AU-index=0 b'a', b's', b'd', b'f', - ]), - }) + ]) + .unwrap(), + ) .unwrap(); let a = match d .pull(&ConnectionContext::dummy(), StreamContextRef::dummy()) @@ -1293,23 +1334,27 @@ mod tests { start: 0, }; - d.push(Packet { - // end of previous fragment, first parts missing. - ctx: crate::PacketContext::dummy(), - stream_id: 0, - timestamp, - ssrc: 0, - sequence_number: 0, - loss: 1, - mark: true, - payload: Bytes::from_static(&[ + d.push( + ReceivedPacketBuilder { + // end of previous fragment, first parts missing. + ctx: crate::PacketContext::dummy(), + stream_id: 0, + timestamp, + ssrc: 0, + sequence_number: 0, + loss: 1, + mark: true, + payload_type: 0, + } + .build([ // https://datatracker.ietf.org/doc/html/rfc3640#section-3.2.1 0x00, 0x10, // AU-headers-length: 16 bits (13-bit size + 3-bit index) => 1 header 0x00, 0x48, // AU-header: AU-size=9 + AU-index=0 b'b', b'a', b'r', - ]), - }) + ]) + .unwrap(), + ) .unwrap(); assert!(d .pull(&ConnectionContext::dummy(), StreamContextRef::dummy()) @@ -1317,23 +1362,27 @@ mod tests { .is_none()); // Incomplete fragment with no reported loss. - d.push(Packet { - // end of previous fragment, first parts missing. - ctx: crate::PacketContext::dummy(), - stream_id: 0, - timestamp, - ssrc: 0, - sequence_number: 0, - loss: 0, - mark: true, - payload: Bytes::from_static(&[ + d.push( + ReceivedPacketBuilder { + // end of previous fragment, first parts missing. + ctx: crate::PacketContext::dummy(), + stream_id: 0, + timestamp, + ssrc: 0, + sequence_number: 0, + loss: 0, + mark: true, + payload_type: 0, + } + .build([ // https://datatracker.ietf.org/doc/html/rfc3640#section-3.2.1 0x00, 0x10, // AU-headers-length: 16 bits (13-bit size + 3-bit index) => 1 header 0x00, 0x48, // AU-header: AU-size=9 + AU-index=0 b'b', b'a', b'r', - ]), - }) + ]) + .unwrap(), + ) .unwrap(); let e = d .pull(&ConnectionContext::dummy(), StreamContextRef::dummy()) diff --git a/src/codec/g723.rs b/src/codec/g723.rs index 7fb3da6..9d1c6df 100644 --- a/src/codec/g723.rs +++ b/src/codec/g723.rs @@ -36,32 +36,33 @@ impl Depacketizer { })) } - fn validate(pkt: &crate::client::rtp::Packet) -> bool { - let expected_hdr_bits = match pkt.payload.len() { + fn validate(pkt: &crate::rtp::ReceivedPacket) -> bool { + let payload = pkt.payload(); + let expected_hdr_bits = match payload.len() { 24 => 0b00, 20 => 0b01, 4 => 0b10, _ => return false, }; - let actual_hdr_bits = pkt.payload[0] & 0b11; + let actual_hdr_bits = payload[0] & 0b11; actual_hdr_bits == expected_hdr_bits } - pub(super) fn push(&mut self, pkt: crate::client::rtp::Packet) -> Result<(), String> { + pub(super) fn push(&mut self, pkt: crate::rtp::ReceivedPacket) -> Result<(), String> { assert!(self.pending.is_none()); if !Self::validate(&pkt) { return Err(format!( "Invalid G.723 packet: {:#?}", - crate::hex::LimitedHex::new(&pkt.payload, 64), + crate::hex::LimitedHex::new(pkt.payload(), 64), )); } self.pending = Some(super::AudioFrame { - ctx: pkt.ctx, - loss: pkt.loss, - stream_id: pkt.stream_id, - timestamp: pkt.timestamp, + ctx: *pkt.ctx(), + loss: pkt.loss(), + stream_id: pkt.stream_id(), + timestamp: pkt.timestamp(), frame_length: NonZeroU32::new(240).unwrap(), - data: pkt.payload, + data: pkt.into_payload_bytes(), }); Ok(()) } diff --git a/src/codec/h264.rs b/src/codec/h264.rs index 6bd4495..a7f1070 100644 --- a/src/codec/h264.rs +++ b/src/codec/h264.rs @@ -10,7 +10,10 @@ use bytes::{Buf, BufMut, Bytes, BytesMut}; use h264_reader::nal::{NalHeader, UnitType}; use log::{debug, log_enabled, trace}; -use crate::{client::rtp::Packet, Error, Timestamp}; +use crate::{ + rtp::{ReceivedPacket, ReceivedPacketBuilder}, + Error, Timestamp, +}; use super::VideoFrame; @@ -130,7 +133,7 @@ impl Depacketizer { .map(|p| super::Parameters::Video(p.generic_parameters.clone())) } - pub(super) fn push(&mut self, pkt: Packet) -> Result<(), String> { + pub(super) fn push(&mut self, pkt: ReceivedPacket) -> Result<(), String> { // Push shouldn't be called until pull is exhausted. if let Some(p) = self.pending.as_ref() { panic!("push with data already pending: {:?}", p); @@ -144,22 +147,23 @@ impl Depacketizer { AccessUnit::start(&pkt, 0, false) } DepacketizerInputState::PreMark(mut access_unit) => { - if pkt.loss > 0 { + let loss = pkt.loss(); + if loss > 0 { self.nals.clear(); self.pieces.clear(); - if access_unit.timestamp.timestamp == pkt.timestamp.timestamp { + if access_unit.timestamp.timestamp == pkt.timestamp().timestamp { // Loss within this access unit. Ignore until mark or new timestamp. - self.input_state = if pkt.mark { + self.input_state = if pkt.mark() { DepacketizerInputState::PostMark { - timestamp: pkt.timestamp, - loss: pkt.loss, + timestamp: pkt.timestamp(), + loss, } } else { self.pieces.clear(); self.nals.clear(); DepacketizerInputState::Loss { - timestamp: pkt.timestamp, - pkts: pkt.loss, + timestamp: pkt.timestamp(), + pkts: loss, } }; return Ok(()); @@ -167,16 +171,17 @@ impl Depacketizer { // A suffix of a previous access unit was lost; discard it. // A prefix of the new one may have been lost; try parsing. AccessUnit::start(&pkt, 0, false) - } else if access_unit.timestamp.timestamp != pkt.timestamp.timestamp { + } else if access_unit.timestamp.timestamp != pkt.timestamp().timestamp { if access_unit.in_fu_a { return Err(format!( "Timestamp changed from {} to {} in the middle of a fragmented NAL", - access_unit.timestamp, pkt.timestamp + access_unit.timestamp, + pkt.timestamp() )); } let last_nal_hdr = self.nals.last().unwrap().hdr; if can_end_au(last_nal_hdr.nal_unit_type()) { - access_unit.end_ctx = pkt.ctx; + access_unit.end_ctx = *pkt.ctx(); self.pending = Some(self.finalize_access_unit(access_unit, "ts change")?); AccessUnit::start(&pkt, 0, false) @@ -185,7 +190,7 @@ impl Depacketizer { "Bogus mid-access unit timestamp change after {:?}", last_nal_hdr ); - access_unit.timestamp.timestamp = pkt.timestamp.timestamp; + access_unit.timestamp.timestamp = pkt.timestamp().timestamp; access_unit } } else { @@ -198,7 +203,7 @@ impl Depacketizer { } => { debug_assert!(self.nals.is_empty()); debug_assert!(self.pieces.is_empty()); - AccessUnit::start(&pkt, loss, state_ts.timestamp == pkt.timestamp.timestamp) + AccessUnit::start(&pkt, loss, state_ts.timestamp == pkt.timestamp().timestamp) } DepacketizerInputState::Loss { timestamp, @@ -206,8 +211,8 @@ impl Depacketizer { } => { debug_assert!(self.nals.is_empty()); debug_assert!(self.pieces.is_empty()); - if pkt.timestamp.timestamp == timestamp.timestamp { - pkts += pkt.loss; + if pkt.timestamp().timestamp == timestamp.timestamp { + pkts += pkt.loss(); self.input_state = DepacketizerInputState::Loss { timestamp, pkts }; return Ok(()); } @@ -215,7 +220,11 @@ impl Depacketizer { } }; - let mut data = pkt.payload; + let ctx = *pkt.ctx(); + let mark = pkt.mark(); + let loss = pkt.loss(); + let timestamp = pkt.timestamp(); + let mut data = pkt.into_payload_bytes(); if data.is_empty() { return Err("Empty NAL".into()); } @@ -309,7 +318,7 @@ impl Depacketizer { if (start && end) || reserved { return Err(format!("Invalid FU-A header {:02x}", fu_header)); } - if !end && pkt.mark { + if !end && mark { return Err("FU-A pkt with MARK && !END".into()); } let u32_len = u32::try_from(data.len()).expect("RTP packet len must be < u16::MAX"); @@ -337,17 +346,17 @@ impl Depacketizer { if end { nal.next_piece_idx = pieces; access_unit.in_fu_a = false; - } else if pkt.mark { + } else if mark { return Err("FU-A has MARK and no END".into()); } } (false, false) => { - if pkt.loss > 0 { + if loss > 0 { self.pieces.clear(); self.nals.clear(); self.input_state = DepacketizerInputState::Loss { - timestamp: pkt.timestamp, - pkts: pkt.loss, + timestamp, + pkts: loss, }; return Ok(()); } @@ -357,21 +366,18 @@ impl Depacketizer { } _ => return Err(format!("bad nal header {:02x}", nal_header)), } - self.input_state = if pkt.mark { + self.input_state = if mark { let last_nal_hdr = self.nals.last().unwrap().hdr; if can_end_au(last_nal_hdr.nal_unit_type()) { - access_unit.end_ctx = pkt.ctx; + access_unit.end_ctx = ctx; self.pending = Some(self.finalize_access_unit(access_unit, "mark")?); - DepacketizerInputState::PostMark { - timestamp: pkt.timestamp, - loss: 0, - } + DepacketizerInputState::PostMark { timestamp, loss: 0 } } else { log::debug!( "Bogus mid-access unit timestamp change after {:?}", last_nal_hdr ); - access_unit.timestamp.timestamp = pkt.timestamp.timestamp; + access_unit.timestamp.timestamp = timestamp.timestamp; DepacketizerInputState::PreMark(access_unit) } } else { @@ -543,19 +549,19 @@ fn can_end_au(nal_unit_type: UnitType) -> bool { impl AccessUnit { fn start( - pkt: &crate::client::rtp::Packet, + pkt: &crate::rtp::ReceivedPacket, additional_loss: u16, same_ts_as_prev: bool, ) -> Self { AccessUnit { - start_ctx: pkt.ctx, - end_ctx: pkt.ctx, - timestamp: pkt.timestamp, - stream_id: pkt.stream_id, + start_ctx: *pkt.ctx(), + end_ctx: *pkt.ctx(), + timestamp: pkt.timestamp(), + stream_id: pkt.stream_id(), in_fu_a: false, // TODO: overflow? - loss: pkt.loss + additional_loss, + loss: pkt.loss() + additional_loss, same_ts_as_prev, } } @@ -806,6 +812,8 @@ pub struct Packetizer { max_payload_size: u16, next_sequence_number: u16, stream_id: usize, + ssrc: u32, + payload_type: u8, state: PacketizerState, } @@ -814,6 +822,8 @@ impl Packetizer { max_payload_size: u16, stream_id: usize, initial_sequence_number: u16, + payload_type: u8, + ssrc: u32, ) -> Result { if max_payload_size < 3 { // minimum size to make progress with FU-A packets. @@ -823,6 +833,8 @@ impl Packetizer { max_payload_size, stream_id, next_sequence_number: initial_sequence_number, + ssrc, + payload_type, state: PacketizerState::Idle, }) } @@ -834,7 +846,7 @@ impl Packetizer { } // TODO: better error type? - pub fn pull(&mut self) -> Result, String> { + pub fn pull(&mut self) -> Result, String> { let max_payload_size = usize::from(self.max_payload_size); match std::mem::replace(&mut self.state, PacketizerState::Idle) { PacketizerState::Idle => Ok(None), @@ -867,11 +879,22 @@ impl Packetizer { if usize_len > max_payload_size { // start a FU-A. data.advance(1); - let mut payload = Vec::with_capacity(max_payload_size); let fu_indicator = (hdr.nal_ref_idc() << 5) | 28; let fu_header = 0b1000_0000 | hdr.nal_unit_type().id(); // START bit set. - payload.extend_from_slice(&[fu_indicator, fu_header]); - payload.extend_from_slice(&data[..max_payload_size - 2]); + let payload = IntoIterator::into_iter([fu_indicator, fu_header]) + .chain(data[..max_payload_size - 2].iter().copied()); + // TODO: ctx and channel_id are placeholders. + let pkt = ReceivedPacketBuilder { + ctx: crate::PacketContext::dummy(), + stream_id: self.stream_id, + timestamp, + ssrc: self.ssrc, + sequence_number, + loss: 0, + mark: false, + payload_type: self.payload_type, + } + .build(payload)?; data.advance(max_payload_size - 2); self.state = PacketizerState::InFragment { timestamp, @@ -879,17 +902,7 @@ impl Packetizer { left: len + 1 - u32::from(self.max_payload_size), data, }; - // TODO: ctx, channel_id, and ssrc are placeholders. - return Ok(Some(Packet { - ctx: crate::PacketContext::dummy(), - stream_id: self.stream_id, - timestamp, - ssrc: 0, - sequence_number, - loss: 0, - mark: false, - payload: Bytes::from(payload), - })); + return Ok(Some(pkt)); } // Send a plain NAL packet. (TODO: consider using STAP-A.) @@ -903,16 +916,19 @@ impl Packetizer { }; mark = false; } - Ok(Some(Packet { - ctx: crate::PacketContext::dummy(), - stream_id: self.stream_id, - timestamp, - ssrc: 0, - sequence_number, - loss: 0, - mark, - payload: data, - })) + Ok(Some( + ReceivedPacketBuilder { + ctx: crate::PacketContext::dummy(), + stream_id: self.stream_id, + timestamp, + ssrc: self.ssrc, + sequence_number, + loss: 0, + mark, + payload_type: self.payload_type, + } + .build(data)?, + )) } PacketizerState::InFragment { timestamp, @@ -955,16 +971,19 @@ impl Packetizer { } } // TODO: placeholders. - Ok(Some(Packet { - ctx: crate::PacketContext::dummy(), - stream_id: self.stream_id, - timestamp, - ssrc: 0, - sequence_number, - loss: 0, - mark, - payload: Bytes::from(payload), - })) + Ok(Some( + ReceivedPacketBuilder { + ctx: crate::PacketContext::dummy(), + stream_id: self.stream_id, + timestamp, + ssrc: self.ssrc, + sequence_number, + loss: 0, + mark, + payload_type: self.payload_type, + } + .build(payload)?, + )) } } } @@ -994,10 +1013,9 @@ enum PacketizerState { #[cfg(test)] mod tests { - use bytes::Bytes; use std::num::NonZeroU32; - use crate::{client::rtp::Packet, codec::CodecItem}; + use crate::{codec::CodecItem, rtp::ReceivedPacketBuilder}; /* * This test requires @@ -1060,69 +1078,89 @@ mod tests { clock_rate: NonZeroU32::new(90_000).unwrap(), start: 0, }; - d.push(Packet { - // plain SEI packet. - ctx: crate::PacketContext::dummy(), - stream_id: 0, - timestamp, - ssrc: 0, - sequence_number: 0, - loss: 0, - mark: false, - payload: Bytes::from_static(b"\x06plain"), - }) + d.push( + ReceivedPacketBuilder { + // plain SEI packet. + ctx: crate::PacketContext::dummy(), + stream_id: 0, + timestamp, + ssrc: 0, + sequence_number: 0, + loss: 0, + mark: false, + payload_type: 0, + } + .build(b"\x06plain".iter().copied()) + .unwrap(), + ) .unwrap(); assert!(d.pull().is_none()); - d.push(Packet { - // STAP-A packet. - ctx: crate::PacketContext::dummy(), - stream_id: 0, - timestamp, - ssrc: 0, - sequence_number: 1, - loss: 0, - mark: false, - payload: Bytes::from_static(b"\x18\x00\x09\x06stap-a 1\x00\x09\x06stap-a 2"), - }) + d.push( + ReceivedPacketBuilder { + // STAP-A packet. + ctx: crate::PacketContext::dummy(), + stream_id: 0, + timestamp, + ssrc: 0, + sequence_number: 1, + loss: 0, + mark: false, + payload_type: 0, + } + .build(*b"\x18\x00\x09\x06stap-a 1\x00\x09\x06stap-a 2") + .unwrap(), + ) .unwrap(); assert!(d.pull().is_none()); - d.push(Packet { - // FU-A packet, start. - ctx: crate::PacketContext::dummy(), - stream_id: 0, - timestamp, - ssrc: 0, - sequence_number: 2, - loss: 0, - mark: false, - payload: Bytes::from_static(b"\x7c\x86fu-a start, "), - }) + d.push( + ReceivedPacketBuilder { + // FU-A packet, start. + ctx: crate::PacketContext::dummy(), + stream_id: 0, + timestamp, + ssrc: 0, + sequence_number: 2, + loss: 0, + mark: false, + payload_type: 0, + } + .build(*b"\x7c\x86fu-a start, ") + .unwrap(), + ) .unwrap(); assert!(d.pull().is_none()); - d.push(Packet { - // FU-A packet, middle. - ctx: crate::PacketContext::dummy(), - stream_id: 0, - timestamp, - ssrc: 0, - sequence_number: 3, - loss: 0, - mark: false, - payload: Bytes::from_static(b"\x7c\x06fu-a middle, "), - }) + d.push( + ReceivedPacketBuilder { + // FU-A packet, middle. + ctx: crate::PacketContext::dummy(), + stream_id: 0, + timestamp, + ssrc: 0, + sequence_number: 3, + loss: 0, + mark: false, + payload_type: 0, + } + .build(*b"\x7c\x06fu-a middle, ") + .unwrap(), + ) .unwrap(); assert!(d.pull().is_none()); - d.push(Packet { - // FU-A packet, end. - ctx: crate::PacketContext::dummy(), - stream_id: 0, - timestamp, - ssrc: 0, - sequence_number: 4, - loss: 0, - mark: true, - payload: Bytes::from_static(b"\x7c\x46fu-a end"), - }) + d.push( + ReceivedPacketBuilder { + // FU-A packet, end. + ctx: crate::PacketContext::dummy(), + stream_id: 0, + timestamp, + ssrc: 0, + sequence_number: 4, + loss: 0, + mark: true, + payload_type: 0, + } + .build(*b"\x7c\x46fu-a end") + .unwrap(), + ) .unwrap(); let frame = match d.pull() { Some(CodecItem::VideoFrame(frame)) => frame, @@ -1153,47 +1191,59 @@ mod tests { clock_rate: NonZeroU32::new(90_000).unwrap(), start: 0, }; - d.push(Packet { - // SPS with (incorrect) mark - ctx: crate::PacketContext::dummy(), - stream_id: 0, - timestamp: ts1, - ssrc: 0, - sequence_number: 0, - loss: 0, - mark: true, - payload: Bytes::from_static(b"\x67\x64\x00\x33\xac\x15\x14\xa0\xa0\x2f\xf9\x50"), - }) + d.push( + ReceivedPacketBuilder { + // SPS with (incorrect) mark + ctx: crate::PacketContext::dummy(), + stream_id: 0, + timestamp: ts1, + ssrc: 0, + sequence_number: 0, + loss: 0, + mark: true, + payload_type: 0, + } + .build(*b"\x67\x64\x00\x33\xac\x15\x14\xa0\xa0\x2f\xf9\x50") + .unwrap(), + ) .unwrap(); assert!(d.pull().is_none()); - d.push(Packet { - // PPS - ctx: crate::PacketContext::dummy(), - stream_id: 0, - timestamp: ts1, - ssrc: 0, - sequence_number: 1, - loss: 0, - mark: false, - payload: Bytes::from_static(b"\x68\xee\x3c\xb0"), - }) + d.push( + ReceivedPacketBuilder { + // PPS + ctx: crate::PacketContext::dummy(), + stream_id: 0, + timestamp: ts1, + ssrc: 0, + sequence_number: 1, + loss: 0, + mark: false, + payload_type: 0, + } + .build(*b"\x68\xee\x3c\xb0") + .unwrap(), + ) .unwrap(); assert!(d.pull().is_none()); - d.push(Packet { - // Slice layer without partitioning IDR. - // This has a different timestamp than the SPS and PPS, even though - // RFC 6184 section 5.1 says that "the timestamp must match that of - // the primary coded picture of the access unit and that the marker - // bit can only be set on the final packet of the access unit."" - ctx: crate::PacketContext::dummy(), - stream_id: 0, - timestamp: ts2, - ssrc: 0, - sequence_number: 2, - loss: 0, - mark: true, - payload: Bytes::from_static(b"\x65slice"), - }) + d.push( + ReceivedPacketBuilder { + // Slice layer without partitioning IDR. + // This has a different timestamp than the SPS and PPS, even though + // RFC 6184 section 5.1 says that "the timestamp must match that of + // the primary coded picture of the access unit and that the marker + // bit can only be set on the final packet of the access unit."" + ctx: crate::PacketContext::dummy(), + stream_id: 0, + timestamp: ts2, + ssrc: 0, + sequence_number: 2, + loss: 0, + mark: true, + payload_type: 0, + } + .build(*b"\x65slice") + .unwrap(), + ) .unwrap(); let frame = match d.pull() { Some(CodecItem::VideoFrame(frame)) => frame, @@ -1224,18 +1274,22 @@ mod tests { clock_rate: NonZeroU32::new(90_000).unwrap(), start: 0, }; - d.push(Packet { - // Slice layer without partitioning non-IDR, representing the - // last frame of the previous GOP. - ctx: crate::PacketContext::dummy(), - stream_id: 0, - timestamp: ts1, - ssrc: 0, - sequence_number: 0, - loss: 0, - mark: true, - payload: Bytes::from_static(b"\x01slice"), - }) + d.push( + ReceivedPacketBuilder { + // Slice layer without partitioning non-IDR, representing the + // last frame of the previous GOP. + ctx: crate::PacketContext::dummy(), + stream_id: 0, + timestamp: ts1, + ssrc: 0, + sequence_number: 0, + loss: 0, + mark: true, + payload_type: 0, + } + .build(*b"\x01slice") + .unwrap(), + ) .unwrap(); let frame = match d.pull() { Some(CodecItem::VideoFrame(frame)) => frame, @@ -1243,43 +1297,55 @@ mod tests { }; assert_eq!(&frame.data()[..], b"\x00\x00\x00\x06\x01slice"); assert_eq!(frame.timestamp, ts1); - d.push(Packet { - // SPS with (incorrect) timestamp matching last frame. - ctx: crate::PacketContext::dummy(), - stream_id: 0, - timestamp: ts1, - ssrc: 0, - sequence_number: 1, - loss: 0, - mark: false, // correctly has no mark, unlike first SPS in stream. - payload: Bytes::from_static(b"\x67\x64\x00\x33\xac\x15\x14\xa0\xa0\x2f\xf9\x50"), - }) + d.push( + ReceivedPacketBuilder { + // SPS with (incorrect) timestamp matching last frame. + ctx: crate::PacketContext::dummy(), + stream_id: 0, + timestamp: ts1, + ssrc: 0, + sequence_number: 1, + loss: 0, + mark: false, // correctly has no mark, unlike first SPS in stream. + payload_type: 0, + } + .build(*b"\x67\x64\x00\x33\xac\x15\x14\xa0\xa0\x2f\xf9\x50") + .unwrap(), + ) .unwrap(); assert!(d.pull().is_none()); - d.push(Packet { - // PPS, again with timestamp matching last frame. - ctx: crate::PacketContext::dummy(), - stream_id: 0, - timestamp: ts1, - ssrc: 0, - sequence_number: 2, - loss: 0, - mark: false, - payload: Bytes::from_static(b"\x68\xee\x3c\xb0"), - }) + d.push( + ReceivedPacketBuilder { + // PPS, again with timestamp matching last frame. + ctx: crate::PacketContext::dummy(), + stream_id: 0, + timestamp: ts1, + ssrc: 0, + sequence_number: 2, + loss: 0, + mark: false, + payload_type: 0, + } + .build(*b"\x68\xee\x3c\xb0") + .unwrap(), + ) .unwrap(); assert!(d.pull().is_none()); - d.push(Packet { - // Slice layer without partitioning IDR. Now correct timestamp. - ctx: crate::PacketContext::dummy(), - stream_id: 0, - timestamp: ts2, - ssrc: 0, - sequence_number: 3, - loss: 0, - mark: true, - payload: Bytes::from_static(b"\x65slice"), - }) + d.push( + ReceivedPacketBuilder { + // Slice layer without partitioning IDR. Now correct timestamp. + ctx: crate::PacketContext::dummy(), + stream_id: 0, + timestamp: ts2, + ssrc: 0, + sequence_number: 3, + loss: 0, + mark: true, + payload_type: 0, + } + .build(*b"\x65slice") + .unwrap(), + ) .unwrap(); let frame = match d.pull() { Some(CodecItem::VideoFrame(frame)) => frame, @@ -1308,7 +1374,7 @@ mod tests { clock_rate: NonZeroU32::new(90_000).unwrap(), start: 0, }; - d.push(Packet { // new SPS. + d.push(ReceivedPacketBuilder { // new SPS. ctx: crate::PacketContext::dummy(), stream_id: 0, timestamp, @@ -1316,33 +1382,41 @@ mod tests { sequence_number: 0, loss: 0, mark: false, - payload: Bytes::from_static(b"\x67\x4d\x40\x1e\x9a\x64\x05\x01\xef\xf3\x50\x10\x10\x14\x00\x00\x0f\xa0\x00\x01\x38\x80\x10"), - }).unwrap(); + payload_type: 0, + }.build(*b"\x67\x4d\x40\x1e\x9a\x64\x05\x01\xef\xf3\x50\x10\x10\x14\x00\x00\x0f\xa0\x00\x01\x38\x80\x10").unwrap()).unwrap(); assert!(d.pull().is_none()); - d.push(Packet { - // same PPS again. - ctx: crate::PacketContext::dummy(), - stream_id: 0, - timestamp, - ssrc: 0, - sequence_number: 1, - loss: 0, - mark: false, - payload: Bytes::from_static(b"\x68\xee\x3c\x80"), - }) + d.push( + ReceivedPacketBuilder { + // same PPS again. + ctx: crate::PacketContext::dummy(), + stream_id: 0, + timestamp, + ssrc: 0, + sequence_number: 1, + loss: 0, + mark: false, + payload_type: 0, + } + .build(*b"\x68\xee\x3c\x80") + .unwrap(), + ) .unwrap(); assert!(d.pull().is_none()); - d.push(Packet { - // dummy slice NAL to end the AU. - ctx: crate::PacketContext::dummy(), - stream_id: 0, - timestamp, - ssrc: 0, - sequence_number: 2, - loss: 0, - mark: true, - payload: Bytes::from_static(b"\x65slice"), - }) + d.push( + ReceivedPacketBuilder { + // dummy slice NAL to end the AU. + ctx: crate::PacketContext::dummy(), + stream_id: 0, + timestamp, + ssrc: 0, + sequence_number: 2, + loss: 0, + mark: true, + payload_type: 0, + } + .build(*b"\x65slice") + .unwrap(), + ) .unwrap(); // By codec::Depacketizer::parameters's contract, it's unspecified what the depacketizer @@ -1412,7 +1486,7 @@ mod tests { clock_rate: NonZeroU32::new(90_000).unwrap(), start: 0, }; - d.push(Packet { + d.push(ReceivedPacketBuilder { // SPS ctx: crate::PacketContext::dummy(), stream_id: 0, @@ -1421,36 +1495,43 @@ mod tests { sequence_number: 0, loss: 0, mark: false, - payload: Bytes::from_static( - b"\x67\x4d\x00\x28\xe9\x00\xf0\x04\x4f\xcb\x08\x00\x00\x1f\x48\x00\x07\x54\xe0\x20", - ), - }) + payload_type: 0, + }.build( + *b"\x67\x4d\x00\x28\xe9\x00\xf0\x04\x4f\xcb\x08\x00\x00\x1f\x48\x00\x07\x54\xe0\x20", + ).unwrap()).unwrap(); + assert!(d.pull().is_none()); + d.push( + ReceivedPacketBuilder { + // PPS + ctx: crate::PacketContext::dummy(), + stream_id: 0, + timestamp, + ssrc: 0, + sequence_number: 1, + loss: 0, + mark: false, + payload_type: 0, + } + .build(*b"\x68\xea\x8f\x20") + .unwrap(), + ) .unwrap(); assert!(d.pull().is_none()); - d.push(Packet { - // PPS - ctx: crate::PacketContext::dummy(), - stream_id: 0, - timestamp, - ssrc: 0, - sequence_number: 1, - loss: 0, - mark: false, - payload: Bytes::from_static(b"\x68\xea\x8f\x20"), - }) - .unwrap(); - assert!(d.pull().is_none()); - d.push(Packet { - // IDR slice - ctx: crate::PacketContext::dummy(), - stream_id: 0, - timestamp, - ssrc: 0, - sequence_number: 2, - loss: 0, - mark: true, - payload: Bytes::from_static(b"\x65idr slice"), - }) + d.push( + ReceivedPacketBuilder { + // IDR slice + ctx: crate::PacketContext::dummy(), + stream_id: 0, + timestamp, + ssrc: 0, + sequence_number: 2, + loss: 0, + mark: true, + payload_type: 0, + } + .build(*b"\x65idr slice") + .unwrap(), + ) .unwrap(); let frame = match d.pull() { Some(CodecItem::VideoFrame(frame)) => frame, diff --git a/src/codec/mod.rs b/src/codec/mod.rs index d969d95..48fe05c 100644 --- a/src/codec/mod.rs +++ b/src/codec/mod.rs @@ -9,7 +9,7 @@ use std::num::{NonZeroU16, NonZeroU32}; -use crate::client::rtp; +use crate::rtp::ReceivedPacket; use crate::ConnectionContext; use crate::Error; use crate::StreamContextRef; @@ -191,7 +191,7 @@ pub struct AudioFrame { pub timestamp: crate::Timestamp, pub frame_length: NonZeroU32, - /// Number of lost RTP packets before this audio frame. See [crate::client::rtp::Packet::loss]. + /// Number of lost RTP packets before this audio frame. See [crate::rtp::ReceivedPacket::loss]. /// Note that if loss occurs during a fragmented frame, more than this number of packets' worth /// of data may be skipped. pub loss: u16, @@ -237,8 +237,9 @@ pub struct MessageFrame { pub timestamp: crate::Timestamp, pub stream_id: usize, - /// Number of lost RTP packets before this message frame. See [crate::client::rtp::Packet::loss]. - /// If this is non-zero, a prefix of the message may be missing. + /// Number of lost RTP packets before this message frame. See + /// [crate::rtp::ReceivedPacket::loss]. If this is non-zero, a prefix of the + /// message may be missing. pub loss: u16, // TODO: expose bytes or Buf (for zero-copy)? @@ -273,7 +274,7 @@ pub struct VideoFrame { // parameters, see [`crate::client::Stream::parameters`]. pub new_parameters: Option>, - /// Number of lost RTP packets before this video frame. See [crate::client::rtp::Packet::loss]. + /// Number of lost RTP packets before this video frame. See [crate::rtp::ReceivedPacket::loss]. /// Note that if loss occurs during a fragmented frame, more than this number of packets' worth /// of data may be skipped. pub loss: u16, @@ -458,7 +459,7 @@ impl Depacketizer { /// Depacketizers are not required to buffer unbounded numbers of packets. Between any two /// calls to `push`, the caller must call `pull` until `pull` returns `Ok(None)`. The later /// `push` call may panic or drop data if this expectation is violated. - pub fn push(&mut self, input: rtp::Packet) -> Result<(), String> { + pub fn push(&mut self, input: ReceivedPacket) -> Result<(), String> { match &mut self.0 { DepacketizerInner::Aac(d) => d.push(input), DepacketizerInner::G723(d) => d.push(input), diff --git a/src/codec/onvif.rs b/src/codec/onvif.rs index dfef3d1..b312dba 100644 --- a/src/codec/onvif.rs +++ b/src/codec/onvif.rs @@ -57,54 +57,55 @@ impl Depacketizer { ))) } - pub(super) fn push(&mut self, pkt: crate::client::rtp::Packet) -> Result<(), String> { - if pkt.loss > 0 { + pub(super) fn push(&mut self, pkt: crate::rtp::ReceivedPacket) -> Result<(), String> { + if pkt.loss() > 0 { if let State::InProgress(in_progress) = &self.state { log::debug!( "Discarding {}-byte message prefix due to loss of {} RTP packets", in_progress.data.len(), - pkt.loss + pkt.loss(), ); self.state = State::Idle; } } let mut in_progress = match std::mem::replace(&mut self.state, State::Idle) { State::InProgress(in_progress) => { - if in_progress.timestamp.timestamp != pkt.timestamp.timestamp { + if in_progress.timestamp.timestamp != pkt.timestamp().timestamp { return Err(format!( "Timestamp changed from {} to {} with message in progress", - &in_progress.timestamp, &pkt.timestamp, + &in_progress.timestamp, + &pkt.timestamp(), )); } in_progress } State::Ready(..) => panic!("push while in state ready"), State::Idle => { - if pkt.mark { + if pkt.mark() { // fast-path: avoid copy. self.state = State::Ready(super::MessageFrame { - stream_id: pkt.stream_id, - loss: pkt.loss, - ctx: pkt.ctx, - timestamp: pkt.timestamp, - data: pkt.payload, + stream_id: pkt.stream_id(), + loss: pkt.loss(), + ctx: *pkt.ctx(), + timestamp: pkt.timestamp(), + data: pkt.into_payload_bytes(), }); return Ok(()); } InProgress { - loss: pkt.loss, - ctx: pkt.ctx, - timestamp: pkt.timestamp, + loss: pkt.loss(), + ctx: *pkt.ctx(), + timestamp: pkt.timestamp(), data: BytesMut::with_capacity(self.high_water_size), } } }; - in_progress.data.put(pkt.payload); - if pkt.mark { + in_progress.data.put(pkt.payload()); + if pkt.mark() { self.high_water_size = std::cmp::max(self.high_water_size, in_progress.data.remaining()); self.state = State::Ready(super::MessageFrame { - stream_id: pkt.stream_id, + stream_id: pkt.stream_id(), ctx: in_progress.ctx, timestamp: in_progress.timestamp, data: in_progress.data.freeze(), diff --git a/src/codec/simple_audio.rs b/src/codec/simple_audio.rs index 3a3b72d..74ac405 100644 --- a/src/codec/simple_audio.rs +++ b/src/codec/simple_audio.rs @@ -48,22 +48,23 @@ impl Depacketizer { } } - pub(super) fn push(&mut self, pkt: crate::client::rtp::Packet) -> Result<(), String> { + pub(super) fn push(&mut self, pkt: crate::rtp::ReceivedPacket) -> Result<(), String> { assert!(self.pending.is_none()); - let frame_length = self.frame_length(pkt.payload.len()).ok_or_else(|| { + let payload = pkt.payload(); + let frame_length = self.frame_length(payload.len()).ok_or_else(|| { format!( "invalid length {} for payload of {}-bit audio samples", - pkt.payload.len(), + payload.len(), self.bits_per_sample ) })?; self.pending = Some(super::AudioFrame { - loss: pkt.loss, - ctx: pkt.ctx, - stream_id: pkt.stream_id, - timestamp: pkt.timestamp, + loss: pkt.loss(), + ctx: *pkt.ctx(), + stream_id: pkt.stream_id(), + timestamp: pkt.timestamp(), frame_length, - data: pkt.payload, + data: pkt.into_payload_bytes(), }); Ok(()) } diff --git a/src/lib.rs b/src/lib.rs index ec2f567..226f3a8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -23,6 +23,8 @@ mod error; mod rtcp; mod hex; +pub mod rtp; + #[cfg(test)] mod testutil; diff --git a/src/rtp.rs b/src/rtp.rs new file mode 100644 index 0000000..e998bc8 --- /dev/null +++ b/src/rtp.rs @@ -0,0 +1,344 @@ +// Copyright (C) 2021 Scott Lamb +// SPDX-License-Identifier: MIT OR Apache-2.0 + +//! Handles RTP data as described in +//! [RFC 3550 section 5.1](https://datatracker.ietf.org/doc/html/rfc3550#section-5.1). + +use std::convert::TryFrom; +use std::ops::Range; + +use bytes::{Buf, Bytes}; + +use crate::{PacketContext, Timestamp}; + +/// The minimum length of an RTP header (no CSRCs or extensions). +const MIN_HEADER_LEN: u16 = 12; + +/// Raw packet without state-specific interpretation or metadata. +/// +/// This design is inspired by [`rtp-rs`](https://crates.io/crates/rtp-rs) in +/// that it primarily validates a raw buffer then provides accessors for it. +/// Some differences though: +/// +/// * allows keeping around the payload range (determined during +/// construction/validation) as a `Range` , rather than reconstructing +/// on later accesses. +/// * currently owns the `Bytes`, although this design may change when/if we +/// [redo the buffering +/// model](https://github.com/scottlamb/retina/issues/6). +/// * directly exposes the sequence number as a `u16`, rather than having an +/// extra type that I find awkward to work with. +pub(crate) struct RawPacket( + /// Full packet data, including headers. + /// + /// ```text + /// 0 1 2 3 + /// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + /// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + /// |V=2|P|X| CC |M| PT | sequence number | + /// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + /// | timestamp | + /// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + /// | synchronization source (SSRC) identifier | + /// +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ + /// | contributing source (CSRC) identifiers | + /// | .... | + /// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + /// ``` + pub Bytes, +); + +impl RawPacket { + /// Validates an RTP packet, returning a wrapper and the payload range. + /// + /// The payload range is not part of the `RawPacket` to avoid extra padding + /// bytes within the containing `ReceivedPacket`. + pub fn new(data: Bytes) -> Result<(Self, Range), RawPacketError> { + // RTP doesn't have a defined maximum size but it's implied by the transport: + // * UDP packets (even with fragmentation) are at most 65,536 (minus IP/UDP headers). + // * interleaved RTSP data messages have at most 65,536 bytes of data. + let len = match u16::try_from(data.len()) { + Ok(l) => l, + Err(_) => { + return Err(RawPacketError { + reason: "too long", + data, + }) + } + }; + if len < MIN_HEADER_LEN { + return Err(RawPacketError { + reason: "too short", + data, + }); + } + if (data[0] & 0b1100_0000) != 2 << 6 { + return Err(RawPacketError { + reason: "must be version 2", + data, + }); + } + let has_padding = (data[0] & 0b0010_0000) != 0; + let has_extension = (data[0] & 0b0001_0000) != 0; + let csrc_count = data[0] & 0b0000_1111; + let csrc_end = MIN_HEADER_LEN + (4 * u16::from(csrc_count)); + let payload_start = if has_extension { + if data.len() < usize::from(csrc_end + 4) { + return Err(RawPacketError { + reason: "extension is after end of packet", + data, + }); + } + let extension_len = u16::from_be_bytes([ + data[usize::from(csrc_end) + 1], + data[usize::from(csrc_end) + 2], + ]); + match csrc_end.checked_add(extension_len) { + Some(s) => s, + None => { + return Err(RawPacketError { + reason: "extension extends beyond maximum packet size", + data, + }) + } + } + } else { + csrc_end + }; + if len < payload_start { + return Err(RawPacketError { + reason: "payload start is after end of packet", + data, + }); + } + let payload_end = if has_padding { + if len == payload_start { + return Err(RawPacketError { + reason: "missing padding", + data, + }); + } + let padding_len = u16::from(data[data.len() - 1]); + if padding_len == 0 { + return Err(RawPacketError { + reason: "invalid padding length 0", + data, + }); + } + let payload_end = match len.checked_sub(padding_len) { + Some(e) => e, + None => { + return Err(RawPacketError { + reason: "padding larger than packet", + data, + }) + } + }; + if payload_end < payload_start { + return Err(RawPacketError { + reason: "bad padding", + data, + }); + } + payload_end + } else { + len + }; + Ok((Self(data), payload_start..payload_end)) + } + + #[inline] + pub fn mark(&self) -> bool { + (self.0[1] & 0b1000_0000) != 0 + } + + #[inline] + pub fn sequence_number(&self) -> u16 { + assert!(self.0.len() >= usize::from(MIN_HEADER_LEN)); + u16::from_be_bytes([self.0[2], self.0[3]]) + } + + #[inline] + pub fn ssrc(&self) -> u32 { + assert!(self.0.len() >= usize::from(MIN_HEADER_LEN)); + u32::from_be_bytes([self.0[8], self.0[9], self.0[10], self.0[11]]) + } + + #[inline] + pub fn payload_type(&self) -> u8 { + self.0[1] & 0b0111_1111 + } + + #[inline] + pub fn timestamp(&self) -> u32 { + assert!(self.0.len() >= usize::from(MIN_HEADER_LEN)); + u32::from_be_bytes([self.0[4], self.0[5], self.0[6], self.0[7]]) + } +} + +#[derive(Debug)] +#[doc(hidden)] +pub struct RawPacketError { + pub reason: &'static str, + pub data: Bytes, +} + +pub(crate) struct RawPacketBuilder { + pub sequence_number: u16, + pub timestamp: u32, + pub payload_type: u8, + pub ssrc: u32, + pub mark: bool, +} + +impl RawPacketBuilder { + pub(crate) fn build>( + self, + payload: P, + ) -> Result<(RawPacket, Range), &'static str> { + if self.payload_type >= 0x80 { + return Err("payload type too large"); + } + let data: Bytes = IntoIterator::into_iter([ + 2 << 6, // version=2, no padding, no extensions, no CSRCs. + if self.mark { 0b1000_0000 } else { 0 } | self.payload_type, + ]) + .chain(self.sequence_number.to_be_bytes()) + .chain(self.timestamp.to_be_bytes()) + .chain(self.ssrc.to_be_bytes()) + .chain(payload) + .collect(); + let len = u16::try_from(data.len()).map_err(|_| "payload too long")?; + Ok((RawPacket(data), MIN_HEADER_LEN..len)) + } +} + +/// A received RTP packet. +/// +/// This holds more information than the packet itself: also a +/// [`PacketContext`], the stream, and extended timestamp. +pub struct ReceivedPacket { + // Currently this is constructed from crate::client::rtp, so everything here + // is pub(crate). + pub(crate) ctx: PacketContext, + pub(crate) stream_id: usize, + pub(crate) timestamp: crate::Timestamp, + pub(crate) raw: RawPacket, + pub(crate) payload_range: Range, + + // TODO: consider dropping this field in favor of a PacketItem::Loss. + // https://github.com/scottlamb/retina/issues/47 + pub(crate) loss: u16, +} + +impl std::fmt::Debug for ReceivedPacket { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ReceivedPacket") + .field("ctx", &self.ctx) + .field("stream_id", &self.stream_id) + .field("timestamp", &self.timestamp) + .field("ssrc", &self.raw.ssrc()) + .field("sequence_number", &self.raw.sequence_number()) + .field("mark", &self.raw.mark()) + .field("payload", &crate::hex::LimitedHex::new(&self.payload(), 64)) + .finish() + } +} + +impl ReceivedPacket { + #[inline] + pub fn timestamp(&self) -> crate::Timestamp { + self.timestamp + } + + #[inline] + pub fn mark(&self) -> bool { + self.raw.mark() + } + + #[inline] + pub fn ctx(&self) -> &PacketContext { + &self.ctx + } + + #[inline] + pub fn stream_id(&self) -> usize { + self.stream_id + } + + #[inline] + pub fn ssrc(&self) -> u32 { + self.raw.ssrc() + } + + #[inline] + pub fn sequence_number(&self) -> u16 { + self.raw.sequence_number() + } + + /// Returns the raw bytes, including the RTP headers. + #[inline] + pub fn raw(&self) -> &[u8] { + &self.raw.0[..] + } + + /// Returns only the payload bytes. + #[inline] + pub fn payload(&self) -> &[u8] { + &self.raw.0[usize::from(self.payload_range.start)..usize::from(self.payload_range.end)] + } + + #[inline] + pub fn loss(&self) -> u16 { + self.loss + } + + /// Consumes the `ReceivedPacket` and returns the `Payload` as a [`Bytes`]. + /// + /// This is currently is very efficient (no copying or reference-counting), + /// although that is not an API guarantee. + #[inline] + pub fn into_payload_bytes(self) -> Bytes { + let mut data = self.raw.0; + data.truncate(usize::from(self.payload_range.end)); + data.advance(usize::from(self.payload_range.start)); + data + } +} + +/// Testing API; exposed for fuzz tests. +#[doc(hidden)] +pub struct ReceivedPacketBuilder { + pub ctx: PacketContext, + pub stream_id: usize, + pub sequence_number: u16, + pub timestamp: Timestamp, + pub payload_type: u8, + pub ssrc: u32, + pub mark: bool, + pub loss: u16, +} + +impl ReceivedPacketBuilder { + pub fn build>( + self, + payload: P, + ) -> Result { + let (raw, payload_range) = RawPacketBuilder { + sequence_number: self.sequence_number, + timestamp: self.timestamp.timestamp as u32, + payload_type: self.payload_type, + ssrc: self.ssrc, + mark: self.mark, + } + .build(payload)?; + Ok(ReceivedPacket { + ctx: self.ctx, + stream_id: self.stream_id, + timestamp: self.timestamp, + raw, + payload_range, + loss: self.loss, + }) + } +}