revamp Packet type

As described in #47 and #58: revamp this type to keep the full raw
packet, and provide accessors rather than raw fields. It's also
smaller now, which is nice.
This commit is contained in:
Scott Lamb 2022-04-28 15:00:36 -07:00
parent 7ea09dde3c
commit 2e34bf927e
17 changed files with 1134 additions and 721 deletions

View File

@ -14,6 +14,9 @@
with each packet.
* BREAKING: `PacketItem` and `CodecItem` are now `#[non_exhaustive]` for
future expansion.
* BREAKING: `retina::client::rtp::Packet` is now
`retina::rtp::ReceivedPacket`, and field access has been removed in favor
of accessors.
## `v0.3.9` (2022-04-12)

7
Cargo.lock generated
View File

@ -928,7 +928,6 @@ dependencies = [
"pin-project",
"pretty-hex",
"rand",
"rtp-rs",
"rtsp-types",
"sdp-types",
"smallvec",
@ -951,12 +950,6 @@ dependencies = [
"mpeg4-audio-const",
]
[[package]]
name = "rtp-rs"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d4ed274a5b3d36c4434cff6a4de1b42f43e64ae326b1cfa72d13d9037a314355"
[[package]]
name = "rtsp-types"
version = "0.0.3"

View File

@ -23,7 +23,6 @@ once_cell = "1.7.2"
pin-project = "1.0.7"
pretty-hex = "0.2.1"
rand = "0.8.3"
rtp-rs = "0.6.0"
rtsp-types = "0.0.3"
sdp-types = "0.1.4"
smallvec = { version = "1.6.1", features = ["union"] }

View File

@ -79,28 +79,20 @@ fn make_test_data(max_payload_size: u16) -> Bytes {
];
let mut dummy_frame = vec![0; 1048576];
dummy_frame[4] = h264_reader::nal::UnitType::SliceLayerWithoutPartitioningIdr.id();
let mut p = retina::codec::h264::Packetizer::new(max_payload_size, 0, 24104).unwrap();
let mut p =
retina::codec::h264::Packetizer::new(max_payload_size, 0, 24104, 96, 0x4cacc3d1).unwrap();
let mut timestamp = retina::Timestamp::new(0, NonZeroU32::new(90_000).unwrap(), 0).unwrap();
let mut pkt_buf = vec![0; 65536];
for _ in 0..30 {
for &f in &frame_sizes {
dummy_frame[0..4].copy_from_slice(&f.to_be_bytes()[..]);
let frame = Bytes::copy_from_slice(&dummy_frame[..(usize::try_from(f).unwrap() + 4)]);
p.push(timestamp, frame).unwrap();
while let Some(pkt) = p.pull().unwrap() {
let pkt_len = rtp_rs::RtpPacketBuilder::new()
.payload_type(96)
.marked(pkt.mark)
.sequence(rtp_rs::Seq::from(pkt.sequence_number))
.ssrc(0x4cacc3d1)
.timestamp(pkt.timestamp.timestamp() as u32)
.payload(&pkt.payload)
.build_into(&mut pkt_buf)
.unwrap();
let pkt = pkt.raw();
data.push(b'$'); // interleaved data
data.push(0); // channel 0
data.extend_from_slice(&u16::try_from(pkt_len).unwrap().to_be_bytes()[..]);
data.extend_from_slice(&pkt_buf[..pkt_len]);
data.extend_from_slice(&u16::try_from(pkt.len()).unwrap().to_be_bytes()[..]);
data.extend_from_slice(&pkt);
}
timestamp = timestamp.try_add(3000).unwrap();
}

7
fuzz/Cargo.lock generated
View File

@ -544,7 +544,6 @@ dependencies = [
"pin-project",
"pretty-hex",
"rand",
"rtp-rs",
"rtsp-types",
"sdp-types",
"smallvec",
@ -575,12 +574,6 @@ dependencies = [
"mpeg4-audio-const",
]
[[package]]
name = "rtp-rs"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d4ed274a5b3d36c4434cff6a4de1b42f43e64ae326b1cfa72d13d9037a314355"
[[package]]
name = "rtsp-types"
version = "0.0.3"

View File

@ -2,12 +2,11 @@
// SPDX-License-Identifier: MIT OR Apache-2.0
#![no_main]
use bytes::{Buf, Bytes};
use libfuzzer_sys::fuzz_target;
use std::num::NonZeroU32;
fuzz_target!(|data: &[u8]| {
let mut data = Bytes::copy_from_slice(data);
let mut data = data;
let mut depacketizer = retina::codec::Depacketizer::new(
"video", "h264", 90_000, None, Some("packetization-mode=1;profile-level-id=64001E;sprop-parameter-sets=Z2QAHqwsaoLA9puCgIKgAAADACAAAAMD0IAA,aO4xshsA")).unwrap();
let mut timestamp = retina::Timestamp::new(0, NonZeroU32::new(90_000).unwrap(), 0).unwrap();
@ -15,31 +14,38 @@ fuzz_target!(|data: &[u8]| {
let conn_ctx = retina::ConnectionContext::dummy();
let stream_ctx = retina::StreamContextRef::dummy();
let pkt_ctx = retina::PacketContext::dummy();
while data.has_remaining() {
let hdr = data.get_u8();
loop {
let (hdr, rest) = match data.split_first() {
Some(r) => r,
None => return,
};
let ts_change = (hdr & 0b001) != 0;
let mark = (hdr & 0b010) != 0;
let loss = (hdr & 0b100) != 0;
let len = usize::from(hdr >> 3);
if len > data.remaining() {
if rest.len() < len {
return;
}
let (payload, rest) = rest.split_at(len);
data = rest;
if loss {
sequence_number = sequence_number.wrapping_add(1);
}
if ts_change {
timestamp = timestamp.try_add(1).unwrap();
}
let pkt = retina::client::rtp::Packet {
let pkt = retina::rtp::ReceivedPacketBuilder {
ctx: pkt_ctx,
stream_id: 0,
timestamp,
ssrc: 0,
sequence_number,
loss: u16::from(loss),
payload_type: 96,
mark,
payload: data.split_off(len),
};
}
.build(payload.iter().copied())
.unwrap();
//println!("pkt: {:#?}", pkt);
if depacketizer.push(pkt).is_err() {
return;

View File

@ -18,7 +18,7 @@ fuzz_target!(|data: &[u8]| {
let conn_ctx = retina::ConnectionContext::dummy();
let stream_ctx = retina::StreamContextRef::dummy();
let max_payload_size = u16::from_be_bytes([data[0], data[1]]);
let mut p = match retina::codec::h264::Packetizer::new(max_payload_size, 0, 0) {
let mut p = match retina::codec::h264::Packetizer::new(max_payload_size, 0, 0, 0, 0) {
Ok(p) => p,
Err(_) => return,
};
@ -39,7 +39,7 @@ fuzz_target!(|data: &[u8]| {
let frame = loop {
match p.pull() {
Ok(Some(pkt)) => {
let mark = pkt.mark;
let mark = pkt.mark();
if d.push(pkt).is_err() {
return;
}

View File

@ -1798,7 +1798,7 @@ async fn punch_firewall_hole(
#[derive(Debug)]
#[non_exhaustive]
pub enum PacketItem {
RtpPacket(rtp::Packet),
RtpPacket(crate::rtp::ReceivedPacket),
SenderReport(rtp::SenderReport),
}
@ -2315,7 +2315,7 @@ impl futures::Stream for Demuxed {
loop {
let (stream_id, pkt) = match self.state {
DemuxedState::Waiting => match ready!(Pin::new(&mut self.session).poll_next(cx)) {
Some(Ok(PacketItem::RtpPacket(p))) => (p.stream_id, Some(p)),
Some(Ok(PacketItem::RtpPacket(p))) => (p.stream_id(), Some(p)),
Some(Ok(PacketItem::SenderReport(p))) => {
return Poll::Ready(Some(Ok(CodecItem::SenderReport(p))))
}
@ -2342,10 +2342,10 @@ impl futures::Stream for Demuxed {
.inner
.ctx();
if let Some(p) = pkt {
let pkt_ctx = p.ctx;
let stream_id = p.stream_id;
let ssrc = p.ssrc;
let sequence_number = p.sequence_number;
let pkt_ctx = *p.ctx();
let stream_id = p.stream_id();
let ssrc = p.ssrc();
let sequence_number = p.sequence_number();
depacketizer.push(p).map_err(|description| {
wrap!(ErrorInt::RtpPacketError {
conn_ctx: *conn_ctx,
@ -2498,9 +2498,9 @@ mod tests {
async {
match session.next().await {
Some(Ok(PacketItem::RtpPacket(p))) => {
assert_eq!(p.ssrc, 0xdcc4a0d8);
assert_eq!(p.sequence_number, 0x41d4);
assert_eq!(&p.payload[..], b"hello world");
assert_eq!(p.ssrc(), 0xdcc4a0d8);
assert_eq!(p.sequence_number(), 0x41d4);
assert_eq!(&p.payload()[..], b"hello world");
}
o => panic!("unexpected item: {:#?}", o),
}
@ -2608,9 +2608,9 @@ mod tests {
async {
match session.next().await {
Some(Ok(PacketItem::RtpPacket(p))) => {
assert_eq!(p.ssrc, 0xdcc4a0d8);
assert_eq!(p.sequence_number, 0x41d4);
assert_eq!(&p.payload[..], b"hello world");
assert_eq!(p.ssrc(), 0xdcc4a0d8);
assert_eq!(p.sequence_number(), 0x41d4);
assert_eq!(&p.payload()[..], b"hello world");
}
o => panic!("unexpected item: {:#?}", o),
}
@ -2771,7 +2771,6 @@ mod tests {
("SessionOptions", std::mem::size_of::<SessionOptions>()),
("Demuxed", std::mem::size_of::<Demuxed>()),
("Stream", std::mem::size_of::<Stream>()),
("rtp::Packet", std::mem::size_of::<rtp::Packet>()),
(
"rtp::SenderReport",
std::mem::size_of::<rtp::SenderReport>(),

View File

@ -3,53 +3,17 @@
//! RTP and RTCP handling; see [RFC 3550](https://datatracker.ietf.org/doc/html/rfc3550).
use bytes::{Buf, Bytes};
use bytes::Bytes;
use log::{debug, trace};
use crate::client::PacketItem;
use crate::rtp::{RawPacket, ReceivedPacket};
use crate::{
ConnectionContext, Error, ErrorInt, PacketContext, StreamContextRef, StreamContextRefInner,
};
use super::{SessionOptions, Timeline};
/// A received RTP packet.
pub struct Packet {
pub ctx: PacketContext,
pub stream_id: usize,
pub timestamp: crate::Timestamp,
pub ssrc: u32,
pub sequence_number: u16,
/// Number of skipped sequence numbers since the last packet.
///
/// In the case of the first packet on the stream, this may also report loss
/// packets since the `RTP-Info` header's `seq` value. However, currently
/// that header is not required to be present and may be ignored (see
/// [`super::PlayOptions::ignore_zero_seq()`].)
pub loss: u16,
pub mark: bool,
/// Guaranteed to be less than u16::MAX bytes.
pub payload: Bytes,
}
impl std::fmt::Debug for Packet {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Packet")
.field("ctx", &self.ctx)
.field("stream_id", &self.stream_id)
.field("timestamp", &self.timestamp)
.field("ssrc", &self.ssrc)
.field("sequence_number", &self.sequence_number)
.field("loss", &self.loss)
.field("mark", &self.mark)
.field("payload", &crate::hex::LimitedHex::new(&self.payload, 64))
.finish()
}
}
/// An RTCP sender report.
#[derive(Debug)]
pub struct SenderReport {
@ -65,7 +29,7 @@ pub struct SenderReport {
/// When using UDP, skips and logs out-of-order packets. When using TCP,
/// fails on them.
///
/// This reports packet loss (via [Packet::loss]) but doesn't prohibit it
/// This reports packet loss (via [ReceivedPacket::loss]) but doesn't prohibit it
/// of more than `i16::MAX` which would be indistinguishable from non-monotonic sequence numbers.
/// Servers sometimes drop packets internally even when sending data via TCP.
///
@ -110,9 +74,9 @@ impl InorderParser {
pkt_ctx: &PacketContext,
timeline: &mut Timeline,
stream_id: usize,
mut data: Bytes,
data: Bytes,
) -> Result<Option<PacketItem>, Error> {
let reader = rtp_rs::RtpReader::new(&data[..]).map_err(|e| {
let (raw, payload_range) = RawPacket::new(data).map_err(|e| {
wrap!(ErrorInt::PacketError {
conn_ctx: *conn_ctx,
stream_ctx: stream_ctx.to_owned(),
@ -121,8 +85,8 @@ impl InorderParser {
description: format!(
"corrupt RTP header while expecting seq={:04x?}: {:?}\n{:#?}",
&self.next_seq,
e,
crate::hex::LimitedHex::new(&data, 64),
e.reason,
crate::hex::LimitedHex::new(&e.data[..], 64),
),
})
})?;
@ -133,13 +97,13 @@ impl InorderParser {
// "Out-of-order packet or large loss" error. In UDP streams, if these
// are delivered out of order, they will cause the more important other
// packet with the same sequence number to be skipped.
if reader.payload_type() == 50 {
if raw.payload_type() == 50 {
debug!("skipping pkt with invalid payload type 50");
return Ok(None);
}
let sequence_number = u16::from_be_bytes([data[2], data[3]]); // I don't like rtsp_rs::Seq.
let ssrc = reader.ssrc();
let sequence_number = raw.sequence_number();
let ssrc = raw.ssrc();
let loss = sequence_number.wrapping_sub(self.next_seq.unwrap_or(sequence_number));
if matches!(self.ssrc, Some(s) if s != ssrc) {
if matches!(stream_ctx.0, StreamContextRefInner::Udp(_)) {
@ -178,12 +142,12 @@ impl InorderParser {
"Skipping out-of-order seq={:04x} when expecting ssrc={:08x?} seq={:04x?}",
sequence_number,
self.ssrc,
self.next_seq
self.next_seq,
);
return Ok(None);
}
}
let timestamp = match timeline.advance_to(reader.timestamp()) {
let timestamp = match timeline.advance_to(raw.timestamp()) {
Ok(ts) => ts,
Err(description) => bail!(ErrorInt::RtpPacketError {
conn_ctx: *conn_ctx,
@ -196,31 +160,15 @@ impl InorderParser {
}),
};
self.ssrc = Some(ssrc);
let mark = reader.mark();
let payload_range = crate::as_range(&data, reader.payload()).ok_or_else(|| {
wrap!(ErrorInt::RtpPacketError {
conn_ctx: *conn_ctx,
stream_ctx: stream_ctx.to_owned(),
pkt_ctx: *pkt_ctx,
stream_id,
ssrc,
sequence_number,
description: "empty payload".into(),
})
})?;
data.truncate(payload_range.end);
data.advance(payload_range.start);
self.next_seq = Some(sequence_number.wrapping_add(1));
self.seen_packets += 1;
Ok(Some(PacketItem::RtpPacket(Packet {
Ok(Some(PacketItem::RtpPacket(ReceivedPacket {
ctx: *pkt_ctx,
stream_id,
timestamp,
ssrc,
sequence_number,
raw,
payload_range,
loss,
mark,
payload: data,
})))
}
@ -300,6 +248,15 @@ mod tests {
let stream_ctx = StreamContextRef::dummy();
// Normal packet.
let (pkt, _payload_range) = crate::rtp::RawPacketBuilder {
sequence_number: 0x1234,
timestamp: 141000,
payload_type: 105,
ssrc: 0xd25614e,
mark: true,
}
.build(*b"foo")
.unwrap();
match parser.rtp(
&SessionOptions::default(),
stream_ctx,
@ -308,22 +265,22 @@ mod tests {
&PacketContext::dummy(),
&mut timeline,
0,
rtp_rs::RtpPacketBuilder::new()
.payload_type(105)
.ssrc(0xd25614e)
.sequence(0x1234.into())
.timestamp(141000)
.marked(true)
.payload(b"foo")
.build()
.unwrap()
.into(),
pkt.0,
) {
Ok(Some(PacketItem::RtpPacket(_))) => {}
o => panic!("unexpected packet 1 result: {:#?}", o),
}
// Mystery pt=50 packet with same sequence number.
let (pkt, _payload_range) = crate::rtp::RawPacketBuilder {
sequence_number: 0x1234,
timestamp: 141000,
payload_type: 50,
ssrc: 0xd25614e,
mark: true,
}
.build(*b"bar")
.unwrap();
match parser.rtp(
&SessionOptions::default(),
stream_ctx,
@ -332,16 +289,7 @@ mod tests {
&PacketContext::dummy(),
&mut timeline,
0,
rtp_rs::RtpPacketBuilder::new()
.payload_type(50)
.ssrc(0xd25614e)
.sequence(0x1234.into())
.timestamp(141000)
.marked(true)
.payload(b"bar")
.build()
.unwrap()
.into(),
pkt.0,
) {
Ok(None) => {}
o => panic!("unexpected packet 2 result: {:#?}", o),
@ -360,6 +308,15 @@ mod tests {
};
let stream_ctx = StreamContextRef(StreamContextRefInner::Udp(&udp));
let session_options = SessionOptions::default();
let (pkt, _payload_range) = crate::rtp::RawPacketBuilder {
sequence_number: 2,
timestamp: 2,
payload_type: 96,
ssrc: 0xd25614e,
mark: true,
}
.build(*b"pkt 2")
.unwrap();
match parser.rtp(
&session_options,
stream_ctx,
@ -368,23 +325,23 @@ mod tests {
&PacketContext::dummy(),
&mut timeline,
0,
rtp_rs::RtpPacketBuilder::new()
.payload_type(96)
.ssrc(0xd25614e)
.sequence(2.into())
.timestamp(2)
.marked(true)
.payload(b"pkt 2")
.build()
.unwrap()
.into(),
pkt.0,
) {
Ok(Some(PacketItem::RtpPacket(p))) => {
assert_eq!(p.timestamp.elapsed(), 0);
assert_eq!(p.timestamp().elapsed(), 0);
}
o => panic!("unexpected packet 2 result: {:#?}", o),
}
let (pkt, _payload_range) = crate::rtp::RawPacketBuilder {
sequence_number: 1,
timestamp: 1,
payload_type: 96,
ssrc: 0xd25614e,
mark: true,
}
.build(*b"pkt 1")
.unwrap();
match parser.rtp(
&session_options,
stream_ctx,
@ -393,21 +350,21 @@ mod tests {
&PacketContext::dummy(),
&mut timeline,
0,
rtp_rs::RtpPacketBuilder::new()
.payload_type(96)
.ssrc(0xd25614e)
.sequence(1.into())
.timestamp(1)
.marked(true)
.payload(b"pkt 1")
.build()
.unwrap()
.into(),
pkt.0,
) {
Ok(None) => {}
o => panic!("unexpected packet 1 result: {:#?}", o),
}
let (pkt, _payload_range) = crate::rtp::RawPacketBuilder {
sequence_number: 3,
timestamp: 3,
payload_type: 96,
ssrc: 0xd25614e,
mark: true,
}
.build(*b"pkt 3")
.unwrap();
match parser.rtp(
&session_options,
stream_ctx,
@ -416,20 +373,11 @@ mod tests {
&PacketContext::dummy(),
&mut timeline,
0,
rtp_rs::RtpPacketBuilder::new()
.payload_type(96)
.ssrc(0xd25614e)
.sequence(3.into())
.timestamp(3)
.marked(true)
.payload(b"pkt 3")
.build()
.unwrap()
.into(),
pkt.0,
) {
Ok(Some(PacketItem::RtpPacket(p))) => {
// The missing timestamp shouldn't have adjusted time.
assert_eq!(p.timestamp.elapsed(), 1);
assert_eq!(p.timestamp().elapsed(), 1);
}
o => panic!("unexpected packet 2 result: {:#?}", o),
}

View File

@ -14,14 +14,14 @@
//! ISO base media file format.
//! * ISO/IEC 14496-14: MP4 File Format.
use bytes::{Buf, BufMut, Bytes, BytesMut};
use bytes::{BufMut, Bytes, BytesMut};
use std::{
convert::TryFrom,
fmt::Debug,
num::{NonZeroU16, NonZeroU32},
};
use crate::{client::rtp::Packet, error::ErrorInt, ConnectionContext, Error, StreamContextRef};
use crate::{error::ErrorInt, rtp::ReceivedPacket, ConnectionContext, Error, StreamContextRef};
use super::CodecItem;
@ -442,7 +442,7 @@ pub(crate) struct Depacketizer {
/// beginning of a fragment.
#[derive(Debug)]
struct Aggregate {
ctx: crate::PacketContext,
pkt: ReceivedPacket,
/// RTP packets lost before the next frame in this aggregate. Includes old
/// loss that caused a previous fragment to be too short.
@ -455,16 +455,6 @@ struct Aggregate {
/// to be too short.
loss_since_mark: bool,
stream_id: usize,
ssrc: u32,
sequence_number: u16,
/// The RTP-level timestamp; frame `i` is at timestamp `timestamp + frame_length*i`.
timestamp: crate::Timestamp,
/// The buffer, positioned at frame 0's header.
buf: Bytes,
/// The index in range `[0, frame_count)` of the next frame to return from `pull`.
frame_i: u16,
@ -472,13 +462,8 @@ struct Aggregate {
/// been returned by `pull`).
frame_count: u16,
/// The starting byte offset of `frame_i`'s data within `buf`.
/// The starting byte offset of `frame_i`'s data within `pkt.payload()`.
data_off: usize,
/// If a mark was set on this packet. When this is false, this should
/// actually be the start of a fragmented frame, but that conversion is
/// currently deferred until `pull`.
mark: bool,
}
/// The received prefix of a single access unit which has been spread across multiple packets.
@ -552,12 +537,12 @@ impl Depacketizer {
Some(super::Parameters::Audio(self.config.to_parameters()))
}
pub(super) fn push(&mut self, mut pkt: Packet) -> Result<(), String> {
if pkt.loss > 0 {
pub(super) fn push(&mut self, pkt: ReceivedPacket) -> Result<(), String> {
if pkt.loss() > 0 {
if let DepacketizerState::Fragmented(ref mut f) = self.state {
log::debug!(
"Discarding in-progress fragmented AAC frame due to loss of {} RTP packets.",
pkt.loss
pkt.loss(),
);
self.state = DepacketizerState::Idle {
prev_loss: f.loss, // note this packet's loss will be added in later.
@ -567,18 +552,19 @@ impl Depacketizer {
}
// Read the AU headers.
if pkt.payload.len() < 2 {
let payload = pkt.payload();
if payload.len() < 2 {
return Err("packet too short for au-header-length".to_string());
}
let au_headers_length_bits = pkt.payload.get_u16();
let au_headers_length_bits = u16::from_be_bytes([payload[0], payload[1]]);
// AAC-hbr requires 16-bit AU headers: 13-bit size, 3-bit index.
if (au_headers_length_bits & 0x7) != 0 {
return Err(format!("bad au-headers-length {}", au_headers_length_bits));
}
let au_headers_count = au_headers_length_bits >> 4;
let data_off = usize::from(au_headers_count) << 1;
if pkt.payload.len() < (usize::from(au_headers_count) << 1) {
let data_off = 2 + (usize::from(au_headers_count) << 1);
if payload.len() < data_off {
return Err("packet too short for au-headers".to_string());
}
match &mut self.state {
@ -589,21 +575,22 @@ impl Depacketizer {
au_headers_count
));
}
if (pkt.timestamp.timestamp as u16) != frag.rtp_timestamp {
if (pkt.timestamp().timestamp as u16) != frag.rtp_timestamp {
return Err(format!(
"Timestamp changed from 0x{:04x} to 0x{:04x} mid-fragment",
frag.rtp_timestamp, pkt.timestamp.timestamp as u16
frag.rtp_timestamp,
pkt.timestamp().timestamp as u16
));
}
let au_header = u16::from_be_bytes([pkt.payload[0], pkt.payload[1]]);
let au_header = u16::from_be_bytes([payload[2], payload[3]]);
let size = usize::from(au_header >> 3);
if size != usize::from(frag.size) {
return Err(format!("size changed {}->{} mid-fragment", frag.size, size));
}
let data = &pkt.payload[data_off..];
let data = &payload[data_off..];
match (frag.buf.len() + data.len()).cmp(&size) {
std::cmp::Ordering::Less => {
if pkt.mark {
if pkt.mark() {
if frag.loss_since_mark {
self.state = DepacketizerState::Idle {
prev_loss: frag.loss,
@ -621,18 +608,18 @@ impl Depacketizer {
frag.buf.extend_from_slice(data);
}
std::cmp::Ordering::Equal => {
if !pkt.mark {
if !pkt.mark() {
return Err(
"frag not marked complete when full data present".to_string()
);
}
frag.buf.extend_from_slice(data);
self.state = DepacketizerState::Ready(super::AudioFrame {
ctx: pkt.ctx,
ctx: *pkt.ctx(),
loss: frag.loss,
frame_length: NonZeroU32::from(self.config.frame_length),
stream_id: pkt.stream_id,
timestamp: pkt.timestamp,
stream_id: pkt.stream_id(),
timestamp: pkt.timestamp(),
data: std::mem::take(&mut frag.buf).freeze(),
});
}
@ -647,19 +634,14 @@ impl Depacketizer {
if au_headers_count == 0 {
return Err("aggregate with no headers".to_string());
}
let loss = pkt.loss();
self.state = DepacketizerState::Aggregated(Aggregate {
ctx: pkt.ctx,
loss: *prev_loss + pkt.loss,
loss_since_mark: *loss_since_mark || pkt.loss > 0,
stream_id: pkt.stream_id,
ssrc: pkt.ssrc,
sequence_number: pkt.sequence_number,
timestamp: pkt.timestamp,
buf: pkt.payload,
pkt,
loss: *prev_loss + loss,
loss_since_mark: *loss_since_mark || loss > 0,
frame_i: 0,
frame_count: au_headers_count,
data_off,
mark: pkt.mark,
});
}
DepacketizerState::Ready(..) => panic!("push when in state ready"),
@ -683,7 +665,9 @@ impl Depacketizer {
}
DepacketizerState::Aggregated(mut agg) => {
let i = usize::from(agg.frame_i);
let au_header = u16::from_be_bytes([agg.buf[i << 1], agg.buf[(i << 1) + 1]]);
let payload = agg.pkt.payload();
let mark = agg.pkt.mark();
let au_header = u16::from_be_bytes([payload[2 + (i << 1)], payload[3 + (i << 1)]]);
let size = usize::from(au_header >> 3);
let index = au_header & 0b111;
if index != 0 {
@ -698,7 +682,7 @@ impl Depacketizer {
"interleaving not yet supported".to_owned(),
));
}
if size > agg.buf.len() - agg.data_off {
if size > payload.len() - agg.data_off {
// start of fragment
if agg.frame_count != 1 {
return Err(error(
@ -708,7 +692,7 @@ impl Depacketizer {
"fragmented AUs must not share packets".to_owned(),
));
}
if agg.mark {
if mark {
if agg.loss_since_mark {
log::debug!(
"Discarding in-progress fragmented AAC frame due to loss of {} RTP packets.",
@ -728,9 +712,9 @@ impl Depacketizer {
));
}
let mut buf = BytesMut::with_capacity(size);
buf.extend_from_slice(&agg.buf[agg.data_off..]);
buf.extend_from_slice(&payload[agg.data_off..]);
self.state = DepacketizerState::Fragmented(Fragment {
rtp_timestamp: agg.timestamp.timestamp as u16,
rtp_timestamp: agg.pkt.timestamp().timestamp as u16,
loss: agg.loss,
loss_since_mark: agg.loss_since_mark,
size: size as u16,
@ -738,7 +722,7 @@ impl Depacketizer {
});
return Ok(None);
}
if !agg.mark {
if !mark {
return Err(error(
*conn_ctx,
stream_ctx,
@ -748,11 +732,11 @@ impl Depacketizer {
}
let delta = u32::from(agg.frame_i) * u32::from(self.config.frame_length.get());
let agg_timestamp = agg.timestamp;
let agg_timestamp = agg.pkt.timestamp();
let frame = super::AudioFrame {
ctx: agg.ctx,
ctx: *agg.pkt.ctx(),
loss: agg.loss,
stream_id: agg.stream_id,
stream_id: agg.pkt.stream_id(),
frame_length: NonZeroU32::from(self.config.frame_length),
// u16 * u16 can't overflow u32, but i64 + u32 can overflow i64.
@ -770,7 +754,7 @@ impl Depacketizer {
))
}
},
data: agg.buf.slice(agg.data_off..agg.data_off + size),
data: Bytes::copy_from_slice(&payload[agg.data_off..agg.data_off + size]),
};
agg.loss = 0;
agg.data_off += size;
@ -793,16 +777,18 @@ fn error(
Error(std::sync::Arc::new(ErrorInt::RtpPacketError {
conn_ctx,
stream_ctx: stream_ctx.to_owned(),
pkt_ctx: agg.ctx,
stream_id: agg.stream_id,
ssrc: agg.ssrc,
sequence_number: agg.sequence_number,
pkt_ctx: *agg.pkt.ctx(),
stream_id: agg.pkt.stream_id(),
ssrc: agg.pkt.ssrc(),
sequence_number: agg.pkt.sequence_number(),
description,
}))
}
#[cfg(test)]
mod tests {
use crate::{rtp::ReceivedPacketBuilder, PacketContext};
use super::*;
#[test]
@ -834,23 +820,26 @@ mod tests {
};
// Single frame.
d.push(Packet {
// single frame.
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp,
ssrc: 0,
sequence_number: 0,
loss: 0,
mark: true,
payload: Bytes::from_static(&[
d.push(
ReceivedPacketBuilder {
ctx: PacketContext::dummy(),
stream_id: 0,
sequence_number: 0,
timestamp,
payload_type: 0,
ssrc: 0,
mark: true,
loss: 0,
}
.build([
// https://datatracker.ietf.org/doc/html/rfc3640#section-3.2.1
0x00,
0x10, // AU-headers-length: 16 bits (13-bit size + 3-bit index) => 1 header
0x00, 0x20, // AU-header: AU-size=4 + AU-index=0
b'a', b's', b'd', b'f',
]),
})
])
.unwrap(),
)
.unwrap();
let a = match d
.pull(&ConnectionContext::dummy(), StreamContextRef::dummy())
@ -867,15 +856,18 @@ mod tests {
.is_none());
// Aggregate of 3 frames.
d.push(Packet {
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp,
ssrc: 0,
sequence_number: 0,
loss: 0,
mark: true,
payload: Bytes::from_static(&[
d.push(
ReceivedPacketBuilder {
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp,
ssrc: 0,
sequence_number: 0,
loss: 0,
mark: true,
payload_type: 0,
}
.build([
// https://datatracker.ietf.org/doc/html/rfc3640#section-3.2.1
0x00,
0x30, // AU-headers-length: 16 bits (13-bit size + 3-bit index) => 3 headers
@ -883,8 +875,9 @@ mod tests {
0x00, 0x18, // AU-header: AU-size=3 + AU-index-delta=0
0x00, 0x18, // AU-header: AU-size=3 + AU-index-delta=0
b'f', b'o', b'o', b'b', b'a', b'r', b'b', b'a', b'z',
]),
})
])
.unwrap(),
)
.unwrap();
let a = match d
.pull(&ConnectionContext::dummy(), StreamContextRef::dummy())
@ -919,67 +912,79 @@ mod tests {
.is_none());
// Fragment across 3 packets.
d.push(Packet {
// fragment 1/3.
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp,
ssrc: 0,
sequence_number: 0,
loss: 0,
mark: false,
payload: Bytes::from_static(&[
d.push(
ReceivedPacketBuilder {
// fragment 1/3.
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp,
ssrc: 0,
sequence_number: 0,
loss: 0,
mark: false,
payload_type: 0,
}
.build([
// https://datatracker.ietf.org/doc/html/rfc3640#section-3.2.1
0x00,
0x10, // AU-headers-length: 16 bits (13-bit size + 3-bit index) => 1 header
0x00, 0x48, // AU-header: AU-size=9 + AU-index=0
b'f', b'o', b'o',
]),
})
])
.unwrap(),
)
.unwrap();
assert!(d
.pull(&ConnectionContext::dummy(), StreamContextRef::dummy())
.unwrap()
.is_none());
d.push(Packet {
// fragment 2/3.
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp,
ssrc: 0,
sequence_number: 0,
loss: 0,
mark: false,
payload: Bytes::from_static(&[
d.push(
ReceivedPacketBuilder {
// fragment 2/3.
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp,
ssrc: 0,
sequence_number: 0,
loss: 0,
mark: false,
payload_type: 0,
}
.build([
// https://datatracker.ietf.org/doc/html/rfc3640#section-3.2.1
0x00,
0x10, // AU-headers-length: 16 bits (13-bit size + 3-bit index) => 1 header
0x00, 0x48, // AU-header: AU-size=9 + AU-index=0
b'b', b'a', b'r',
]),
})
])
.unwrap(),
)
.unwrap();
assert!(d
.pull(&ConnectionContext::dummy(), StreamContextRef::dummy())
.unwrap()
.is_none());
d.push(Packet {
// fragment 3/3.
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp,
ssrc: 0,
sequence_number: 0,
loss: 0,
mark: true,
payload: Bytes::from_static(&[
d.push(
ReceivedPacketBuilder {
// fragment 3/3.
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp,
ssrc: 0,
sequence_number: 0,
loss: 0,
mark: true,
payload_type: 0,
}
.build([
// https://datatracker.ietf.org/doc/html/rfc3640#section-3.2.1
0x00,
0x10, // AU-headers-length: 16 bits (13-bit size + 3-bit index) => 1 header
0x00, 0x48, // AU-header: AU-size=9 + AU-index=0
b'b', b'a', b'z',
]),
})
])
.unwrap(),
)
.unwrap();
let a = match d
.pull(&ConnectionContext::dummy(), StreamContextRef::dummy())
@ -1011,43 +1016,51 @@ mod tests {
};
// Fragment
d.push(Packet {
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp,
ssrc: 0,
sequence_number: 0,
loss: 1,
mark: false,
payload: Bytes::from_static(&[
d.push(
ReceivedPacketBuilder {
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp,
ssrc: 0,
sequence_number: 0,
loss: 1,
mark: false,
payload_type: 0,
}
.build([
// https://datatracker.ietf.org/doc/html/rfc3640#section-3.2.1
0x00,
0x10, // AU-headers-length: 16 bits (13-bit size + 3-bit index) => 1 header
0x00, 0x48, // AU-header: AU-size=9 + AU-index=0
b'b', b'a', b'r',
]),
})
])
.unwrap(),
)
.unwrap();
assert!(d
.pull(&ConnectionContext::dummy(), StreamContextRef::dummy())
.unwrap()
.is_none());
d.push(Packet {
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp,
ssrc: 0,
sequence_number: 0,
loss: 0,
mark: true,
payload: Bytes::from_static(&[
d.push(
ReceivedPacketBuilder {
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp,
ssrc: 0,
sequence_number: 0,
loss: 0,
mark: true,
payload_type: 0,
}
.build([
// https://datatracker.ietf.org/doc/html/rfc3640#section-3.2.1
0x00,
0x10, // AU-headers-length: 16 bits (13-bit size + 3-bit index) => 1 header
0x00, 0x48, // AU-header: AU-size=9 + AU-index=0
b'b', b'a', b'z',
]),
})
])
.unwrap(),
)
.unwrap();
assert!(d
.pull(&ConnectionContext::dummy(), StreamContextRef::dummy())
@ -1055,23 +1068,27 @@ mod tests {
.is_none());
// Following frame reports the loss.
d.push(Packet {
// single frame.
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp,
ssrc: 0,
sequence_number: 0,
loss: 0,
mark: true,
payload: Bytes::from_static(&[
d.push(
ReceivedPacketBuilder {
// single frame.
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp,
ssrc: 0,
sequence_number: 0,
loss: 0,
mark: true,
payload_type: 0,
}
.build([
// https://datatracker.ietf.org/doc/html/rfc3640#section-3.2.1
0x00,
0x10, // AU-headers-length: 16 bits (13-bit size + 3-bit index) => 1 header
0x00, 0x20, // AU-header: AU-size=4 + AU-index=0
b'a', b's', b'd', b'f',
]),
})
])
.unwrap(),
)
.unwrap();
let a = match d
.pull(&ConnectionContext::dummy(), StreamContextRef::dummy())
@ -1102,46 +1119,54 @@ mod tests {
start: 0,
};
d.push(Packet {
// 1/3
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp,
ssrc: 0,
sequence_number: 0,
loss: 0,
mark: false,
payload: Bytes::from_static(&[
d.push(
ReceivedPacketBuilder {
// 1/3
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp,
ssrc: 0,
sequence_number: 0,
loss: 0,
mark: false,
payload_type: 0,
}
.build([
// https://datatracker.ietf.org/doc/html/rfc3640#section-3.2.1
0x00,
0x10, // AU-headers-length: 16 bits (13-bit size + 3-bit index) => 1 header
0x00, 0x48, // AU-header: AU-size=9 + AU-index=0
b'f', b'o', b'o',
]),
})
])
.unwrap(),
)
.unwrap();
assert!(d
.pull(&ConnectionContext::dummy(), StreamContextRef::dummy())
.unwrap()
.is_none());
// Fragment 2/3 is lost
d.push(Packet {
// 3/3 reports the loss
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp,
ssrc: 0,
sequence_number: 0,
loss: 1,
mark: true,
payload: Bytes::from_static(&[
d.push(
ReceivedPacketBuilder {
// 3/3 reports the loss
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp,
ssrc: 0,
sequence_number: 0,
loss: 1,
mark: true,
payload_type: 0,
}
.build([
// https://datatracker.ietf.org/doc/html/rfc3640#section-3.2.1
0x00,
0x10, // AU-headers-length: 16 bits (13-bit size + 3-bit index) => 1 header
0x00, 0x48, // AU-header: AU-size=9 + AU-index=0
b'b', b'a', b'z',
]),
})
])
.unwrap(),
)
.unwrap();
assert!(d
.pull(&ConnectionContext::dummy(), StreamContextRef::dummy())
@ -1149,23 +1174,27 @@ mod tests {
.is_none());
// Following frame reports the loss.
d.push(Packet {
// single frame.
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp,
ssrc: 0,
sequence_number: 0,
loss: 0,
mark: true,
payload: Bytes::from_static(&[
d.push(
ReceivedPacketBuilder {
// single frame.
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp,
ssrc: 0,
sequence_number: 0,
loss: 0,
mark: true,
payload_type: 0,
}
.build([
// https://datatracker.ietf.org/doc/html/rfc3640#section-3.2.1
0x00,
0x10, // AU-headers-length: 16 bits (13-bit size + 3-bit index) => 1 header
0x00, 0x20, // AU-header: AU-size=4 + AU-index=0
b'a', b's', b'd', b'f',
]),
})
])
.unwrap(),
)
.unwrap();
let a = match d
.pull(&ConnectionContext::dummy(), StreamContextRef::dummy())
@ -1196,46 +1225,54 @@ mod tests {
start: 0,
};
d.push(Packet {
// 1/3
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp,
ssrc: 0,
sequence_number: 0,
loss: 0,
mark: false,
payload: Bytes::from_static(&[
d.push(
ReceivedPacketBuilder {
// 1/3
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp,
ssrc: 0,
sequence_number: 0,
loss: 0,
mark: false,
payload_type: 0,
}
.build([
// https://datatracker.ietf.org/doc/html/rfc3640#section-3.2.1
0x00,
0x10, // AU-headers-length: 16 bits (13-bit size + 3-bit index) => 1 header
0x00, 0x48, // AU-header: AU-size=9 + AU-index=0
b'f', b'o', b'o',
]),
})
])
.unwrap(),
)
.unwrap();
assert!(d
.pull(&ConnectionContext::dummy(), StreamContextRef::dummy())
.unwrap()
.is_none());
// Fragment 2/3 is lost
d.push(Packet {
// 3/3 reports the loss
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp,
ssrc: 0,
sequence_number: 0,
loss: 1,
mark: true,
payload: Bytes::from_static(&[
d.push(
ReceivedPacketBuilder {
// 3/3 reports the loss
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp,
ssrc: 0,
sequence_number: 0,
loss: 1,
mark: true,
payload_type: 0,
}
.build([
// https://datatracker.ietf.org/doc/html/rfc3640#section-3.2.1
0x00,
0x10, // AU-headers-length: 16 bits (13-bit size + 3-bit index) => 1 header
0x00, 0x48, // AU-header: AU-size=9 + AU-index=0
b'b', b'a', b'z',
]),
})
])
.unwrap(),
)
.unwrap();
assert!(d
.pull(&ConnectionContext::dummy(), StreamContextRef::dummy())
@ -1243,23 +1280,27 @@ mod tests {
.is_none());
// Following frame reports the loss.
d.push(Packet {
// single frame.
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp,
ssrc: 0,
sequence_number: 0,
loss: 0,
mark: true,
payload: Bytes::from_static(&[
d.push(
ReceivedPacketBuilder {
// single frame.
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp,
ssrc: 0,
sequence_number: 0,
loss: 0,
mark: true,
payload_type: 0,
}
.build([
// https://datatracker.ietf.org/doc/html/rfc3640#section-3.2.1
0x00,
0x10, // AU-headers-length: 16 bits (13-bit size + 3-bit index) => 1 header
0x00, 0x20, // AU-header: AU-size=4 + AU-index=0
b'a', b's', b'd', b'f',
]),
})
])
.unwrap(),
)
.unwrap();
let a = match d
.pull(&ConnectionContext::dummy(), StreamContextRef::dummy())
@ -1293,23 +1334,27 @@ mod tests {
start: 0,
};
d.push(Packet {
// end of previous fragment, first parts missing.
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp,
ssrc: 0,
sequence_number: 0,
loss: 1,
mark: true,
payload: Bytes::from_static(&[
d.push(
ReceivedPacketBuilder {
// end of previous fragment, first parts missing.
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp,
ssrc: 0,
sequence_number: 0,
loss: 1,
mark: true,
payload_type: 0,
}
.build([
// https://datatracker.ietf.org/doc/html/rfc3640#section-3.2.1
0x00,
0x10, // AU-headers-length: 16 bits (13-bit size + 3-bit index) => 1 header
0x00, 0x48, // AU-header: AU-size=9 + AU-index=0
b'b', b'a', b'r',
]),
})
])
.unwrap(),
)
.unwrap();
assert!(d
.pull(&ConnectionContext::dummy(), StreamContextRef::dummy())
@ -1317,23 +1362,27 @@ mod tests {
.is_none());
// Incomplete fragment with no reported loss.
d.push(Packet {
// end of previous fragment, first parts missing.
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp,
ssrc: 0,
sequence_number: 0,
loss: 0,
mark: true,
payload: Bytes::from_static(&[
d.push(
ReceivedPacketBuilder {
// end of previous fragment, first parts missing.
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp,
ssrc: 0,
sequence_number: 0,
loss: 0,
mark: true,
payload_type: 0,
}
.build([
// https://datatracker.ietf.org/doc/html/rfc3640#section-3.2.1
0x00,
0x10, // AU-headers-length: 16 bits (13-bit size + 3-bit index) => 1 header
0x00, 0x48, // AU-header: AU-size=9 + AU-index=0
b'b', b'a', b'r',
]),
})
])
.unwrap(),
)
.unwrap();
let e = d
.pull(&ConnectionContext::dummy(), StreamContextRef::dummy())

View File

@ -36,32 +36,33 @@ impl Depacketizer {
}))
}
fn validate(pkt: &crate::client::rtp::Packet) -> bool {
let expected_hdr_bits = match pkt.payload.len() {
fn validate(pkt: &crate::rtp::ReceivedPacket) -> bool {
let payload = pkt.payload();
let expected_hdr_bits = match payload.len() {
24 => 0b00,
20 => 0b01,
4 => 0b10,
_ => return false,
};
let actual_hdr_bits = pkt.payload[0] & 0b11;
let actual_hdr_bits = payload[0] & 0b11;
actual_hdr_bits == expected_hdr_bits
}
pub(super) fn push(&mut self, pkt: crate::client::rtp::Packet) -> Result<(), String> {
pub(super) fn push(&mut self, pkt: crate::rtp::ReceivedPacket) -> Result<(), String> {
assert!(self.pending.is_none());
if !Self::validate(&pkt) {
return Err(format!(
"Invalid G.723 packet: {:#?}",
crate::hex::LimitedHex::new(&pkt.payload, 64),
crate::hex::LimitedHex::new(pkt.payload(), 64),
));
}
self.pending = Some(super::AudioFrame {
ctx: pkt.ctx,
loss: pkt.loss,
stream_id: pkt.stream_id,
timestamp: pkt.timestamp,
ctx: *pkt.ctx(),
loss: pkt.loss(),
stream_id: pkt.stream_id(),
timestamp: pkt.timestamp(),
frame_length: NonZeroU32::new(240).unwrap(),
data: pkt.payload,
data: pkt.into_payload_bytes(),
});
Ok(())
}

View File

@ -10,7 +10,10 @@ use bytes::{Buf, BufMut, Bytes, BytesMut};
use h264_reader::nal::{NalHeader, UnitType};
use log::{debug, log_enabled, trace};
use crate::{client::rtp::Packet, Error, Timestamp};
use crate::{
rtp::{ReceivedPacket, ReceivedPacketBuilder},
Error, Timestamp,
};
use super::VideoFrame;
@ -130,7 +133,7 @@ impl Depacketizer {
.map(|p| super::Parameters::Video(p.generic_parameters.clone()))
}
pub(super) fn push(&mut self, pkt: Packet) -> Result<(), String> {
pub(super) fn push(&mut self, pkt: ReceivedPacket) -> Result<(), String> {
// Push shouldn't be called until pull is exhausted.
if let Some(p) = self.pending.as_ref() {
panic!("push with data already pending: {:?}", p);
@ -144,22 +147,23 @@ impl Depacketizer {
AccessUnit::start(&pkt, 0, false)
}
DepacketizerInputState::PreMark(mut access_unit) => {
if pkt.loss > 0 {
let loss = pkt.loss();
if loss > 0 {
self.nals.clear();
self.pieces.clear();
if access_unit.timestamp.timestamp == pkt.timestamp.timestamp {
if access_unit.timestamp.timestamp == pkt.timestamp().timestamp {
// Loss within this access unit. Ignore until mark or new timestamp.
self.input_state = if pkt.mark {
self.input_state = if pkt.mark() {
DepacketizerInputState::PostMark {
timestamp: pkt.timestamp,
loss: pkt.loss,
timestamp: pkt.timestamp(),
loss,
}
} else {
self.pieces.clear();
self.nals.clear();
DepacketizerInputState::Loss {
timestamp: pkt.timestamp,
pkts: pkt.loss,
timestamp: pkt.timestamp(),
pkts: loss,
}
};
return Ok(());
@ -167,16 +171,17 @@ impl Depacketizer {
// A suffix of a previous access unit was lost; discard it.
// A prefix of the new one may have been lost; try parsing.
AccessUnit::start(&pkt, 0, false)
} else if access_unit.timestamp.timestamp != pkt.timestamp.timestamp {
} else if access_unit.timestamp.timestamp != pkt.timestamp().timestamp {
if access_unit.in_fu_a {
return Err(format!(
"Timestamp changed from {} to {} in the middle of a fragmented NAL",
access_unit.timestamp, pkt.timestamp
access_unit.timestamp,
pkt.timestamp()
));
}
let last_nal_hdr = self.nals.last().unwrap().hdr;
if can_end_au(last_nal_hdr.nal_unit_type()) {
access_unit.end_ctx = pkt.ctx;
access_unit.end_ctx = *pkt.ctx();
self.pending =
Some(self.finalize_access_unit(access_unit, "ts change")?);
AccessUnit::start(&pkt, 0, false)
@ -185,7 +190,7 @@ impl Depacketizer {
"Bogus mid-access unit timestamp change after {:?}",
last_nal_hdr
);
access_unit.timestamp.timestamp = pkt.timestamp.timestamp;
access_unit.timestamp.timestamp = pkt.timestamp().timestamp;
access_unit
}
} else {
@ -198,7 +203,7 @@ impl Depacketizer {
} => {
debug_assert!(self.nals.is_empty());
debug_assert!(self.pieces.is_empty());
AccessUnit::start(&pkt, loss, state_ts.timestamp == pkt.timestamp.timestamp)
AccessUnit::start(&pkt, loss, state_ts.timestamp == pkt.timestamp().timestamp)
}
DepacketizerInputState::Loss {
timestamp,
@ -206,8 +211,8 @@ impl Depacketizer {
} => {
debug_assert!(self.nals.is_empty());
debug_assert!(self.pieces.is_empty());
if pkt.timestamp.timestamp == timestamp.timestamp {
pkts += pkt.loss;
if pkt.timestamp().timestamp == timestamp.timestamp {
pkts += pkt.loss();
self.input_state = DepacketizerInputState::Loss { timestamp, pkts };
return Ok(());
}
@ -215,7 +220,11 @@ impl Depacketizer {
}
};
let mut data = pkt.payload;
let ctx = *pkt.ctx();
let mark = pkt.mark();
let loss = pkt.loss();
let timestamp = pkt.timestamp();
let mut data = pkt.into_payload_bytes();
if data.is_empty() {
return Err("Empty NAL".into());
}
@ -309,7 +318,7 @@ impl Depacketizer {
if (start && end) || reserved {
return Err(format!("Invalid FU-A header {:02x}", fu_header));
}
if !end && pkt.mark {
if !end && mark {
return Err("FU-A pkt with MARK && !END".into());
}
let u32_len = u32::try_from(data.len()).expect("RTP packet len must be < u16::MAX");
@ -337,17 +346,17 @@ impl Depacketizer {
if end {
nal.next_piece_idx = pieces;
access_unit.in_fu_a = false;
} else if pkt.mark {
} else if mark {
return Err("FU-A has MARK and no END".into());
}
}
(false, false) => {
if pkt.loss > 0 {
if loss > 0 {
self.pieces.clear();
self.nals.clear();
self.input_state = DepacketizerInputState::Loss {
timestamp: pkt.timestamp,
pkts: pkt.loss,
timestamp,
pkts: loss,
};
return Ok(());
}
@ -357,21 +366,18 @@ impl Depacketizer {
}
_ => return Err(format!("bad nal header {:02x}", nal_header)),
}
self.input_state = if pkt.mark {
self.input_state = if mark {
let last_nal_hdr = self.nals.last().unwrap().hdr;
if can_end_au(last_nal_hdr.nal_unit_type()) {
access_unit.end_ctx = pkt.ctx;
access_unit.end_ctx = ctx;
self.pending = Some(self.finalize_access_unit(access_unit, "mark")?);
DepacketizerInputState::PostMark {
timestamp: pkt.timestamp,
loss: 0,
}
DepacketizerInputState::PostMark { timestamp, loss: 0 }
} else {
log::debug!(
"Bogus mid-access unit timestamp change after {:?}",
last_nal_hdr
);
access_unit.timestamp.timestamp = pkt.timestamp.timestamp;
access_unit.timestamp.timestamp = timestamp.timestamp;
DepacketizerInputState::PreMark(access_unit)
}
} else {
@ -543,19 +549,19 @@ fn can_end_au(nal_unit_type: UnitType) -> bool {
impl AccessUnit {
fn start(
pkt: &crate::client::rtp::Packet,
pkt: &crate::rtp::ReceivedPacket,
additional_loss: u16,
same_ts_as_prev: bool,
) -> Self {
AccessUnit {
start_ctx: pkt.ctx,
end_ctx: pkt.ctx,
timestamp: pkt.timestamp,
stream_id: pkt.stream_id,
start_ctx: *pkt.ctx(),
end_ctx: *pkt.ctx(),
timestamp: pkt.timestamp(),
stream_id: pkt.stream_id(),
in_fu_a: false,
// TODO: overflow?
loss: pkt.loss + additional_loss,
loss: pkt.loss() + additional_loss,
same_ts_as_prev,
}
}
@ -806,6 +812,8 @@ pub struct Packetizer {
max_payload_size: u16,
next_sequence_number: u16,
stream_id: usize,
ssrc: u32,
payload_type: u8,
state: PacketizerState,
}
@ -814,6 +822,8 @@ impl Packetizer {
max_payload_size: u16,
stream_id: usize,
initial_sequence_number: u16,
payload_type: u8,
ssrc: u32,
) -> Result<Self, String> {
if max_payload_size < 3 {
// minimum size to make progress with FU-A packets.
@ -823,6 +833,8 @@ impl Packetizer {
max_payload_size,
stream_id,
next_sequence_number: initial_sequence_number,
ssrc,
payload_type,
state: PacketizerState::Idle,
})
}
@ -834,7 +846,7 @@ impl Packetizer {
}
// TODO: better error type?
pub fn pull(&mut self) -> Result<Option<Packet>, String> {
pub fn pull(&mut self) -> Result<Option<ReceivedPacket>, String> {
let max_payload_size = usize::from(self.max_payload_size);
match std::mem::replace(&mut self.state, PacketizerState::Idle) {
PacketizerState::Idle => Ok(None),
@ -867,11 +879,22 @@ impl Packetizer {
if usize_len > max_payload_size {
// start a FU-A.
data.advance(1);
let mut payload = Vec::with_capacity(max_payload_size);
let fu_indicator = (hdr.nal_ref_idc() << 5) | 28;
let fu_header = 0b1000_0000 | hdr.nal_unit_type().id(); // START bit set.
payload.extend_from_slice(&[fu_indicator, fu_header]);
payload.extend_from_slice(&data[..max_payload_size - 2]);
let payload = IntoIterator::into_iter([fu_indicator, fu_header])
.chain(data[..max_payload_size - 2].iter().copied());
// TODO: ctx and channel_id are placeholders.
let pkt = ReceivedPacketBuilder {
ctx: crate::PacketContext::dummy(),
stream_id: self.stream_id,
timestamp,
ssrc: self.ssrc,
sequence_number,
loss: 0,
mark: false,
payload_type: self.payload_type,
}
.build(payload)?;
data.advance(max_payload_size - 2);
self.state = PacketizerState::InFragment {
timestamp,
@ -879,17 +902,7 @@ impl Packetizer {
left: len + 1 - u32::from(self.max_payload_size),
data,
};
// TODO: ctx, channel_id, and ssrc are placeholders.
return Ok(Some(Packet {
ctx: crate::PacketContext::dummy(),
stream_id: self.stream_id,
timestamp,
ssrc: 0,
sequence_number,
loss: 0,
mark: false,
payload: Bytes::from(payload),
}));
return Ok(Some(pkt));
}
// Send a plain NAL packet. (TODO: consider using STAP-A.)
@ -903,16 +916,19 @@ impl Packetizer {
};
mark = false;
}
Ok(Some(Packet {
ctx: crate::PacketContext::dummy(),
stream_id: self.stream_id,
timestamp,
ssrc: 0,
sequence_number,
loss: 0,
mark,
payload: data,
}))
Ok(Some(
ReceivedPacketBuilder {
ctx: crate::PacketContext::dummy(),
stream_id: self.stream_id,
timestamp,
ssrc: self.ssrc,
sequence_number,
loss: 0,
mark,
payload_type: self.payload_type,
}
.build(data)?,
))
}
PacketizerState::InFragment {
timestamp,
@ -955,16 +971,19 @@ impl Packetizer {
}
}
// TODO: placeholders.
Ok(Some(Packet {
ctx: crate::PacketContext::dummy(),
stream_id: self.stream_id,
timestamp,
ssrc: 0,
sequence_number,
loss: 0,
mark,
payload: Bytes::from(payload),
}))
Ok(Some(
ReceivedPacketBuilder {
ctx: crate::PacketContext::dummy(),
stream_id: self.stream_id,
timestamp,
ssrc: self.ssrc,
sequence_number,
loss: 0,
mark,
payload_type: self.payload_type,
}
.build(payload)?,
))
}
}
}
@ -994,10 +1013,9 @@ enum PacketizerState {
#[cfg(test)]
mod tests {
use bytes::Bytes;
use std::num::NonZeroU32;
use crate::{client::rtp::Packet, codec::CodecItem};
use crate::{codec::CodecItem, rtp::ReceivedPacketBuilder};
/*
* This test requires
@ -1060,69 +1078,89 @@ mod tests {
clock_rate: NonZeroU32::new(90_000).unwrap(),
start: 0,
};
d.push(Packet {
// plain SEI packet.
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp,
ssrc: 0,
sequence_number: 0,
loss: 0,
mark: false,
payload: Bytes::from_static(b"\x06plain"),
})
d.push(
ReceivedPacketBuilder {
// plain SEI packet.
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp,
ssrc: 0,
sequence_number: 0,
loss: 0,
mark: false,
payload_type: 0,
}
.build(b"\x06plain".iter().copied())
.unwrap(),
)
.unwrap();
assert!(d.pull().is_none());
d.push(Packet {
// STAP-A packet.
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp,
ssrc: 0,
sequence_number: 1,
loss: 0,
mark: false,
payload: Bytes::from_static(b"\x18\x00\x09\x06stap-a 1\x00\x09\x06stap-a 2"),
})
d.push(
ReceivedPacketBuilder {
// STAP-A packet.
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp,
ssrc: 0,
sequence_number: 1,
loss: 0,
mark: false,
payload_type: 0,
}
.build(*b"\x18\x00\x09\x06stap-a 1\x00\x09\x06stap-a 2")
.unwrap(),
)
.unwrap();
assert!(d.pull().is_none());
d.push(Packet {
// FU-A packet, start.
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp,
ssrc: 0,
sequence_number: 2,
loss: 0,
mark: false,
payload: Bytes::from_static(b"\x7c\x86fu-a start, "),
})
d.push(
ReceivedPacketBuilder {
// FU-A packet, start.
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp,
ssrc: 0,
sequence_number: 2,
loss: 0,
mark: false,
payload_type: 0,
}
.build(*b"\x7c\x86fu-a start, ")
.unwrap(),
)
.unwrap();
assert!(d.pull().is_none());
d.push(Packet {
// FU-A packet, middle.
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp,
ssrc: 0,
sequence_number: 3,
loss: 0,
mark: false,
payload: Bytes::from_static(b"\x7c\x06fu-a middle, "),
})
d.push(
ReceivedPacketBuilder {
// FU-A packet, middle.
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp,
ssrc: 0,
sequence_number: 3,
loss: 0,
mark: false,
payload_type: 0,
}
.build(*b"\x7c\x06fu-a middle, ")
.unwrap(),
)
.unwrap();
assert!(d.pull().is_none());
d.push(Packet {
// FU-A packet, end.
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp,
ssrc: 0,
sequence_number: 4,
loss: 0,
mark: true,
payload: Bytes::from_static(b"\x7c\x46fu-a end"),
})
d.push(
ReceivedPacketBuilder {
// FU-A packet, end.
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp,
ssrc: 0,
sequence_number: 4,
loss: 0,
mark: true,
payload_type: 0,
}
.build(*b"\x7c\x46fu-a end")
.unwrap(),
)
.unwrap();
let frame = match d.pull() {
Some(CodecItem::VideoFrame(frame)) => frame,
@ -1153,47 +1191,59 @@ mod tests {
clock_rate: NonZeroU32::new(90_000).unwrap(),
start: 0,
};
d.push(Packet {
// SPS with (incorrect) mark
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp: ts1,
ssrc: 0,
sequence_number: 0,
loss: 0,
mark: true,
payload: Bytes::from_static(b"\x67\x64\x00\x33\xac\x15\x14\xa0\xa0\x2f\xf9\x50"),
})
d.push(
ReceivedPacketBuilder {
// SPS with (incorrect) mark
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp: ts1,
ssrc: 0,
sequence_number: 0,
loss: 0,
mark: true,
payload_type: 0,
}
.build(*b"\x67\x64\x00\x33\xac\x15\x14\xa0\xa0\x2f\xf9\x50")
.unwrap(),
)
.unwrap();
assert!(d.pull().is_none());
d.push(Packet {
// PPS
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp: ts1,
ssrc: 0,
sequence_number: 1,
loss: 0,
mark: false,
payload: Bytes::from_static(b"\x68\xee\x3c\xb0"),
})
d.push(
ReceivedPacketBuilder {
// PPS
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp: ts1,
ssrc: 0,
sequence_number: 1,
loss: 0,
mark: false,
payload_type: 0,
}
.build(*b"\x68\xee\x3c\xb0")
.unwrap(),
)
.unwrap();
assert!(d.pull().is_none());
d.push(Packet {
// Slice layer without partitioning IDR.
// This has a different timestamp than the SPS and PPS, even though
// RFC 6184 section 5.1 says that "the timestamp must match that of
// the primary coded picture of the access unit and that the marker
// bit can only be set on the final packet of the access unit.""
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp: ts2,
ssrc: 0,
sequence_number: 2,
loss: 0,
mark: true,
payload: Bytes::from_static(b"\x65slice"),
})
d.push(
ReceivedPacketBuilder {
// Slice layer without partitioning IDR.
// This has a different timestamp than the SPS and PPS, even though
// RFC 6184 section 5.1 says that "the timestamp must match that of
// the primary coded picture of the access unit and that the marker
// bit can only be set on the final packet of the access unit.""
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp: ts2,
ssrc: 0,
sequence_number: 2,
loss: 0,
mark: true,
payload_type: 0,
}
.build(*b"\x65slice")
.unwrap(),
)
.unwrap();
let frame = match d.pull() {
Some(CodecItem::VideoFrame(frame)) => frame,
@ -1224,18 +1274,22 @@ mod tests {
clock_rate: NonZeroU32::new(90_000).unwrap(),
start: 0,
};
d.push(Packet {
// Slice layer without partitioning non-IDR, representing the
// last frame of the previous GOP.
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp: ts1,
ssrc: 0,
sequence_number: 0,
loss: 0,
mark: true,
payload: Bytes::from_static(b"\x01slice"),
})
d.push(
ReceivedPacketBuilder {
// Slice layer without partitioning non-IDR, representing the
// last frame of the previous GOP.
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp: ts1,
ssrc: 0,
sequence_number: 0,
loss: 0,
mark: true,
payload_type: 0,
}
.build(*b"\x01slice")
.unwrap(),
)
.unwrap();
let frame = match d.pull() {
Some(CodecItem::VideoFrame(frame)) => frame,
@ -1243,43 +1297,55 @@ mod tests {
};
assert_eq!(&frame.data()[..], b"\x00\x00\x00\x06\x01slice");
assert_eq!(frame.timestamp, ts1);
d.push(Packet {
// SPS with (incorrect) timestamp matching last frame.
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp: ts1,
ssrc: 0,
sequence_number: 1,
loss: 0,
mark: false, // correctly has no mark, unlike first SPS in stream.
payload: Bytes::from_static(b"\x67\x64\x00\x33\xac\x15\x14\xa0\xa0\x2f\xf9\x50"),
})
d.push(
ReceivedPacketBuilder {
// SPS with (incorrect) timestamp matching last frame.
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp: ts1,
ssrc: 0,
sequence_number: 1,
loss: 0,
mark: false, // correctly has no mark, unlike first SPS in stream.
payload_type: 0,
}
.build(*b"\x67\x64\x00\x33\xac\x15\x14\xa0\xa0\x2f\xf9\x50")
.unwrap(),
)
.unwrap();
assert!(d.pull().is_none());
d.push(Packet {
// PPS, again with timestamp matching last frame.
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp: ts1,
ssrc: 0,
sequence_number: 2,
loss: 0,
mark: false,
payload: Bytes::from_static(b"\x68\xee\x3c\xb0"),
})
d.push(
ReceivedPacketBuilder {
// PPS, again with timestamp matching last frame.
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp: ts1,
ssrc: 0,
sequence_number: 2,
loss: 0,
mark: false,
payload_type: 0,
}
.build(*b"\x68\xee\x3c\xb0")
.unwrap(),
)
.unwrap();
assert!(d.pull().is_none());
d.push(Packet {
// Slice layer without partitioning IDR. Now correct timestamp.
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp: ts2,
ssrc: 0,
sequence_number: 3,
loss: 0,
mark: true,
payload: Bytes::from_static(b"\x65slice"),
})
d.push(
ReceivedPacketBuilder {
// Slice layer without partitioning IDR. Now correct timestamp.
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp: ts2,
ssrc: 0,
sequence_number: 3,
loss: 0,
mark: true,
payload_type: 0,
}
.build(*b"\x65slice")
.unwrap(),
)
.unwrap();
let frame = match d.pull() {
Some(CodecItem::VideoFrame(frame)) => frame,
@ -1308,7 +1374,7 @@ mod tests {
clock_rate: NonZeroU32::new(90_000).unwrap(),
start: 0,
};
d.push(Packet { // new SPS.
d.push(ReceivedPacketBuilder { // new SPS.
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp,
@ -1316,33 +1382,41 @@ mod tests {
sequence_number: 0,
loss: 0,
mark: false,
payload: Bytes::from_static(b"\x67\x4d\x40\x1e\x9a\x64\x05\x01\xef\xf3\x50\x10\x10\x14\x00\x00\x0f\xa0\x00\x01\x38\x80\x10"),
}).unwrap();
payload_type: 0,
}.build(*b"\x67\x4d\x40\x1e\x9a\x64\x05\x01\xef\xf3\x50\x10\x10\x14\x00\x00\x0f\xa0\x00\x01\x38\x80\x10").unwrap()).unwrap();
assert!(d.pull().is_none());
d.push(Packet {
// same PPS again.
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp,
ssrc: 0,
sequence_number: 1,
loss: 0,
mark: false,
payload: Bytes::from_static(b"\x68\xee\x3c\x80"),
})
d.push(
ReceivedPacketBuilder {
// same PPS again.
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp,
ssrc: 0,
sequence_number: 1,
loss: 0,
mark: false,
payload_type: 0,
}
.build(*b"\x68\xee\x3c\x80")
.unwrap(),
)
.unwrap();
assert!(d.pull().is_none());
d.push(Packet {
// dummy slice NAL to end the AU.
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp,
ssrc: 0,
sequence_number: 2,
loss: 0,
mark: true,
payload: Bytes::from_static(b"\x65slice"),
})
d.push(
ReceivedPacketBuilder {
// dummy slice NAL to end the AU.
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp,
ssrc: 0,
sequence_number: 2,
loss: 0,
mark: true,
payload_type: 0,
}
.build(*b"\x65slice")
.unwrap(),
)
.unwrap();
// By codec::Depacketizer::parameters's contract, it's unspecified what the depacketizer
@ -1412,7 +1486,7 @@ mod tests {
clock_rate: NonZeroU32::new(90_000).unwrap(),
start: 0,
};
d.push(Packet {
d.push(ReceivedPacketBuilder {
// SPS
ctx: crate::PacketContext::dummy(),
stream_id: 0,
@ -1421,36 +1495,43 @@ mod tests {
sequence_number: 0,
loss: 0,
mark: false,
payload: Bytes::from_static(
b"\x67\x4d\x00\x28\xe9\x00\xf0\x04\x4f\xcb\x08\x00\x00\x1f\x48\x00\x07\x54\xe0\x20",
),
})
payload_type: 0,
}.build(
*b"\x67\x4d\x00\x28\xe9\x00\xf0\x04\x4f\xcb\x08\x00\x00\x1f\x48\x00\x07\x54\xe0\x20",
).unwrap()).unwrap();
assert!(d.pull().is_none());
d.push(
ReceivedPacketBuilder {
// PPS
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp,
ssrc: 0,
sequence_number: 1,
loss: 0,
mark: false,
payload_type: 0,
}
.build(*b"\x68\xea\x8f\x20")
.unwrap(),
)
.unwrap();
assert!(d.pull().is_none());
d.push(Packet {
// PPS
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp,
ssrc: 0,
sequence_number: 1,
loss: 0,
mark: false,
payload: Bytes::from_static(b"\x68\xea\x8f\x20"),
})
.unwrap();
assert!(d.pull().is_none());
d.push(Packet {
// IDR slice
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp,
ssrc: 0,
sequence_number: 2,
loss: 0,
mark: true,
payload: Bytes::from_static(b"\x65idr slice"),
})
d.push(
ReceivedPacketBuilder {
// IDR slice
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp,
ssrc: 0,
sequence_number: 2,
loss: 0,
mark: true,
payload_type: 0,
}
.build(*b"\x65idr slice")
.unwrap(),
)
.unwrap();
let frame = match d.pull() {
Some(CodecItem::VideoFrame(frame)) => frame,

View File

@ -9,7 +9,7 @@
use std::num::{NonZeroU16, NonZeroU32};
use crate::client::rtp;
use crate::rtp::ReceivedPacket;
use crate::ConnectionContext;
use crate::Error;
use crate::StreamContextRef;
@ -191,7 +191,7 @@ pub struct AudioFrame {
pub timestamp: crate::Timestamp,
pub frame_length: NonZeroU32,
/// Number of lost RTP packets before this audio frame. See [crate::client::rtp::Packet::loss].
/// Number of lost RTP packets before this audio frame. See [crate::rtp::ReceivedPacket::loss].
/// Note that if loss occurs during a fragmented frame, more than this number of packets' worth
/// of data may be skipped.
pub loss: u16,
@ -237,8 +237,9 @@ pub struct MessageFrame {
pub timestamp: crate::Timestamp,
pub stream_id: usize,
/// Number of lost RTP packets before this message frame. See [crate::client::rtp::Packet::loss].
/// If this is non-zero, a prefix of the message may be missing.
/// Number of lost RTP packets before this message frame. See
/// [crate::rtp::ReceivedPacket::loss]. If this is non-zero, a prefix of the
/// message may be missing.
pub loss: u16,
// TODO: expose bytes or Buf (for zero-copy)?
@ -273,7 +274,7 @@ pub struct VideoFrame {
// parameters, see [`crate::client::Stream::parameters`].
pub new_parameters: Option<Box<VideoParameters>>,
/// Number of lost RTP packets before this video frame. See [crate::client::rtp::Packet::loss].
/// Number of lost RTP packets before this video frame. See [crate::rtp::ReceivedPacket::loss].
/// Note that if loss occurs during a fragmented frame, more than this number of packets' worth
/// of data may be skipped.
pub loss: u16,
@ -458,7 +459,7 @@ impl Depacketizer {
/// Depacketizers are not required to buffer unbounded numbers of packets. Between any two
/// calls to `push`, the caller must call `pull` until `pull` returns `Ok(None)`. The later
/// `push` call may panic or drop data if this expectation is violated.
pub fn push(&mut self, input: rtp::Packet) -> Result<(), String> {
pub fn push(&mut self, input: ReceivedPacket) -> Result<(), String> {
match &mut self.0 {
DepacketizerInner::Aac(d) => d.push(input),
DepacketizerInner::G723(d) => d.push(input),

View File

@ -57,54 +57,55 @@ impl Depacketizer {
)))
}
pub(super) fn push(&mut self, pkt: crate::client::rtp::Packet) -> Result<(), String> {
if pkt.loss > 0 {
pub(super) fn push(&mut self, pkt: crate::rtp::ReceivedPacket) -> Result<(), String> {
if pkt.loss() > 0 {
if let State::InProgress(in_progress) = &self.state {
log::debug!(
"Discarding {}-byte message prefix due to loss of {} RTP packets",
in_progress.data.len(),
pkt.loss
pkt.loss(),
);
self.state = State::Idle;
}
}
let mut in_progress = match std::mem::replace(&mut self.state, State::Idle) {
State::InProgress(in_progress) => {
if in_progress.timestamp.timestamp != pkt.timestamp.timestamp {
if in_progress.timestamp.timestamp != pkt.timestamp().timestamp {
return Err(format!(
"Timestamp changed from {} to {} with message in progress",
&in_progress.timestamp, &pkt.timestamp,
&in_progress.timestamp,
&pkt.timestamp(),
));
}
in_progress
}
State::Ready(..) => panic!("push while in state ready"),
State::Idle => {
if pkt.mark {
if pkt.mark() {
// fast-path: avoid copy.
self.state = State::Ready(super::MessageFrame {
stream_id: pkt.stream_id,
loss: pkt.loss,
ctx: pkt.ctx,
timestamp: pkt.timestamp,
data: pkt.payload,
stream_id: pkt.stream_id(),
loss: pkt.loss(),
ctx: *pkt.ctx(),
timestamp: pkt.timestamp(),
data: pkt.into_payload_bytes(),
});
return Ok(());
}
InProgress {
loss: pkt.loss,
ctx: pkt.ctx,
timestamp: pkt.timestamp,
loss: pkt.loss(),
ctx: *pkt.ctx(),
timestamp: pkt.timestamp(),
data: BytesMut::with_capacity(self.high_water_size),
}
}
};
in_progress.data.put(pkt.payload);
if pkt.mark {
in_progress.data.put(pkt.payload());
if pkt.mark() {
self.high_water_size =
std::cmp::max(self.high_water_size, in_progress.data.remaining());
self.state = State::Ready(super::MessageFrame {
stream_id: pkt.stream_id,
stream_id: pkt.stream_id(),
ctx: in_progress.ctx,
timestamp: in_progress.timestamp,
data: in_progress.data.freeze(),

View File

@ -48,22 +48,23 @@ impl Depacketizer {
}
}
pub(super) fn push(&mut self, pkt: crate::client::rtp::Packet) -> Result<(), String> {
pub(super) fn push(&mut self, pkt: crate::rtp::ReceivedPacket) -> Result<(), String> {
assert!(self.pending.is_none());
let frame_length = self.frame_length(pkt.payload.len()).ok_or_else(|| {
let payload = pkt.payload();
let frame_length = self.frame_length(payload.len()).ok_or_else(|| {
format!(
"invalid length {} for payload of {}-bit audio samples",
pkt.payload.len(),
payload.len(),
self.bits_per_sample
)
})?;
self.pending = Some(super::AudioFrame {
loss: pkt.loss,
ctx: pkt.ctx,
stream_id: pkt.stream_id,
timestamp: pkt.timestamp,
loss: pkt.loss(),
ctx: *pkt.ctx(),
stream_id: pkt.stream_id(),
timestamp: pkt.timestamp(),
frame_length,
data: pkt.payload,
data: pkt.into_payload_bytes(),
});
Ok(())
}

View File

@ -23,6 +23,8 @@ mod error;
mod rtcp;
mod hex;
pub mod rtp;
#[cfg(test)]
mod testutil;

344
src/rtp.rs Normal file
View File

@ -0,0 +1,344 @@
// Copyright (C) 2021 Scott Lamb <slamb@slamb.org>
// SPDX-License-Identifier: MIT OR Apache-2.0
//! Handles RTP data as described in
//! [RFC 3550 section 5.1](https://datatracker.ietf.org/doc/html/rfc3550#section-5.1).
use std::convert::TryFrom;
use std::ops::Range;
use bytes::{Buf, Bytes};
use crate::{PacketContext, Timestamp};
/// The minimum length of an RTP header (no CSRCs or extensions).
const MIN_HEADER_LEN: u16 = 12;
/// Raw packet without state-specific interpretation or metadata.
///
/// This design is inspired by [`rtp-rs`](https://crates.io/crates/rtp-rs) in
/// that it primarily validates a raw buffer then provides accessors for it.
/// Some differences though:
///
/// * allows keeping around the payload range (determined during
/// construction/validation) as a `Range<u16>` , rather than reconstructing
/// on later accesses.
/// * currently owns the `Bytes`, although this design may change when/if we
/// [redo the buffering
/// model](https://github.com/scottlamb/retina/issues/6).
/// * directly exposes the sequence number as a `u16`, rather than having an
/// extra type that I find awkward to work with.
pub(crate) struct RawPacket(
/// Full packet data, including headers.
///
/// ```text
/// 0 1 2 3
/// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
/// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/// |V=2|P|X| CC |M| PT | sequence number |
/// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/// | timestamp |
/// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/// | synchronization source (SSRC) identifier |
/// +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
/// | contributing source (CSRC) identifiers |
/// | .... |
/// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/// ```
pub Bytes,
);
impl RawPacket {
/// Validates an RTP packet, returning a wrapper and the payload range.
///
/// The payload range is not part of the `RawPacket` to avoid extra padding
/// bytes within the containing `ReceivedPacket`.
pub fn new(data: Bytes) -> Result<(Self, Range<u16>), RawPacketError> {
// RTP doesn't have a defined maximum size but it's implied by the transport:
// * UDP packets (even with fragmentation) are at most 65,536 (minus IP/UDP headers).
// * interleaved RTSP data messages have at most 65,536 bytes of data.
let len = match u16::try_from(data.len()) {
Ok(l) => l,
Err(_) => {
return Err(RawPacketError {
reason: "too long",
data,
})
}
};
if len < MIN_HEADER_LEN {
return Err(RawPacketError {
reason: "too short",
data,
});
}
if (data[0] & 0b1100_0000) != 2 << 6 {
return Err(RawPacketError {
reason: "must be version 2",
data,
});
}
let has_padding = (data[0] & 0b0010_0000) != 0;
let has_extension = (data[0] & 0b0001_0000) != 0;
let csrc_count = data[0] & 0b0000_1111;
let csrc_end = MIN_HEADER_LEN + (4 * u16::from(csrc_count));
let payload_start = if has_extension {
if data.len() < usize::from(csrc_end + 4) {
return Err(RawPacketError {
reason: "extension is after end of packet",
data,
});
}
let extension_len = u16::from_be_bytes([
data[usize::from(csrc_end) + 1],
data[usize::from(csrc_end) + 2],
]);
match csrc_end.checked_add(extension_len) {
Some(s) => s,
None => {
return Err(RawPacketError {
reason: "extension extends beyond maximum packet size",
data,
})
}
}
} else {
csrc_end
};
if len < payload_start {
return Err(RawPacketError {
reason: "payload start is after end of packet",
data,
});
}
let payload_end = if has_padding {
if len == payload_start {
return Err(RawPacketError {
reason: "missing padding",
data,
});
}
let padding_len = u16::from(data[data.len() - 1]);
if padding_len == 0 {
return Err(RawPacketError {
reason: "invalid padding length 0",
data,
});
}
let payload_end = match len.checked_sub(padding_len) {
Some(e) => e,
None => {
return Err(RawPacketError {
reason: "padding larger than packet",
data,
})
}
};
if payload_end < payload_start {
return Err(RawPacketError {
reason: "bad padding",
data,
});
}
payload_end
} else {
len
};
Ok((Self(data), payload_start..payload_end))
}
#[inline]
pub fn mark(&self) -> bool {
(self.0[1] & 0b1000_0000) != 0
}
#[inline]
pub fn sequence_number(&self) -> u16 {
assert!(self.0.len() >= usize::from(MIN_HEADER_LEN));
u16::from_be_bytes([self.0[2], self.0[3]])
}
#[inline]
pub fn ssrc(&self) -> u32 {
assert!(self.0.len() >= usize::from(MIN_HEADER_LEN));
u32::from_be_bytes([self.0[8], self.0[9], self.0[10], self.0[11]])
}
#[inline]
pub fn payload_type(&self) -> u8 {
self.0[1] & 0b0111_1111
}
#[inline]
pub fn timestamp(&self) -> u32 {
assert!(self.0.len() >= usize::from(MIN_HEADER_LEN));
u32::from_be_bytes([self.0[4], self.0[5], self.0[6], self.0[7]])
}
}
#[derive(Debug)]
#[doc(hidden)]
pub struct RawPacketError {
pub reason: &'static str,
pub data: Bytes,
}
pub(crate) struct RawPacketBuilder {
pub sequence_number: u16,
pub timestamp: u32,
pub payload_type: u8,
pub ssrc: u32,
pub mark: bool,
}
impl RawPacketBuilder {
pub(crate) fn build<P: IntoIterator<Item = u8>>(
self,
payload: P,
) -> Result<(RawPacket, Range<u16>), &'static str> {
if self.payload_type >= 0x80 {
return Err("payload type too large");
}
let data: Bytes = IntoIterator::into_iter([
2 << 6, // version=2, no padding, no extensions, no CSRCs.
if self.mark { 0b1000_0000 } else { 0 } | self.payload_type,
])
.chain(self.sequence_number.to_be_bytes())
.chain(self.timestamp.to_be_bytes())
.chain(self.ssrc.to_be_bytes())
.chain(payload)
.collect();
let len = u16::try_from(data.len()).map_err(|_| "payload too long")?;
Ok((RawPacket(data), MIN_HEADER_LEN..len))
}
}
/// A received RTP packet.
///
/// This holds more information than the packet itself: also a
/// [`PacketContext`], the stream, and extended timestamp.
pub struct ReceivedPacket {
// Currently this is constructed from crate::client::rtp, so everything here
// is pub(crate).
pub(crate) ctx: PacketContext,
pub(crate) stream_id: usize,
pub(crate) timestamp: crate::Timestamp,
pub(crate) raw: RawPacket,
pub(crate) payload_range: Range<u16>,
// TODO: consider dropping this field in favor of a PacketItem::Loss.
// https://github.com/scottlamb/retina/issues/47
pub(crate) loss: u16,
}
impl std::fmt::Debug for ReceivedPacket {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ReceivedPacket")
.field("ctx", &self.ctx)
.field("stream_id", &self.stream_id)
.field("timestamp", &self.timestamp)
.field("ssrc", &self.raw.ssrc())
.field("sequence_number", &self.raw.sequence_number())
.field("mark", &self.raw.mark())
.field("payload", &crate::hex::LimitedHex::new(&self.payload(), 64))
.finish()
}
}
impl ReceivedPacket {
#[inline]
pub fn timestamp(&self) -> crate::Timestamp {
self.timestamp
}
#[inline]
pub fn mark(&self) -> bool {
self.raw.mark()
}
#[inline]
pub fn ctx(&self) -> &PacketContext {
&self.ctx
}
#[inline]
pub fn stream_id(&self) -> usize {
self.stream_id
}
#[inline]
pub fn ssrc(&self) -> u32 {
self.raw.ssrc()
}
#[inline]
pub fn sequence_number(&self) -> u16 {
self.raw.sequence_number()
}
/// Returns the raw bytes, including the RTP headers.
#[inline]
pub fn raw(&self) -> &[u8] {
&self.raw.0[..]
}
/// Returns only the payload bytes.
#[inline]
pub fn payload(&self) -> &[u8] {
&self.raw.0[usize::from(self.payload_range.start)..usize::from(self.payload_range.end)]
}
#[inline]
pub fn loss(&self) -> u16 {
self.loss
}
/// Consumes the `ReceivedPacket` and returns the `Payload` as a [`Bytes`].
///
/// This is currently is very efficient (no copying or reference-counting),
/// although that is not an API guarantee.
#[inline]
pub fn into_payload_bytes(self) -> Bytes {
let mut data = self.raw.0;
data.truncate(usize::from(self.payload_range.end));
data.advance(usize::from(self.payload_range.start));
data
}
}
/// Testing API; exposed for fuzz tests.
#[doc(hidden)]
pub struct ReceivedPacketBuilder {
pub ctx: PacketContext,
pub stream_id: usize,
pub sequence_number: u16,
pub timestamp: Timestamp,
pub payload_type: u8,
pub ssrc: u32,
pub mark: bool,
pub loss: u16,
}
impl ReceivedPacketBuilder {
pub fn build<P: IntoIterator<Item = u8>>(
self,
payload: P,
) -> Result<ReceivedPacket, &'static str> {
let (raw, payload_range) = RawPacketBuilder {
sequence_number: self.sequence_number,
timestamp: self.timestamp.timestamp as u32,
payload_type: self.payload_type,
ssrc: self.ssrc,
mark: self.mark,
}
.build(payload)?;
Ok(ReceivedPacket {
ctx: self.ctx,
stream_id: self.stream_id,
timestamp: self.timestamp,
raw,
payload_range,
loss: self.loss,
})
}
}