From af7e8a77fb02185519eb9764ac43612e45569943 Mon Sep 17 00:00:00 2001 From: Scott Lamb Date: Thu, 24 Jun 2021 19:18:27 -0700 Subject: [PATCH] refactor h264 depacketizer * add a couple tests * fix #4: put the whole video frame (including multiple NALs) into a single Bytes. This is simpler to use and speeds up the depacketize/h264_aac_writevideo benchmark. It's also a behavior change; now I include non-VUI data such as SPS/PPS/SEI into the data. I think this is a better default but it might be worth making customizable. --- benches/depacketize.rs | 6 +- examples/client/mp4.rs | 9 +- src/codec/h264.rs | 535 ++++++++++++++++++++++++++++------------- src/codec/mod.rs | 62 +---- 4 files changed, 388 insertions(+), 224 deletions(-) diff --git a/benches/depacketize.rs b/benches/depacketize.rs index d180868..915b7d8 100644 --- a/benches/depacketize.rs +++ b/benches/depacketize.rs @@ -3,7 +3,6 @@ use std::num::NonZeroU16; -use bytes::Buf; use criterion::{criterion_group, criterion_main, Criterion}; use retina::client::{rtp::StrictSequenceChecker, Timeline}; use retina::codec::{CodecItem, Depacketizer}; @@ -78,10 +77,7 @@ fn criterion_benchmark(c: &mut Criterion) { CodecItem::VideoFrame(v) => v, _ => return, }; - let mut slices = [std::io::IoSlice::new(b""); 2]; - let n = v.chunks_vectored(&mut slices); - assert_eq!(n, 2); - assert_eq!(w.write_vectored(&slices[..]).unwrap(), v.remaining()); + w.write_all(&v.data()[..]).unwrap(); }) }) }); diff --git a/examples/client/mp4.rs b/examples/client/mp4.rs index 2319146..dcd3df2 100644 --- a/examples/client/mp4.rs +++ b/examples/client/mp4.rs @@ -485,16 +485,16 @@ impl Mp4Writer { Ok(()) } - async fn video(&mut self, mut frame: retina::codec::VideoFrame) -> Result<(), failure::Error> { + async fn video(&mut self, frame: retina::codec::VideoFrame) -> Result<(), failure::Error> { println!( "{}: {}-byte video frame", &frame.timestamp, - frame.remaining() + frame.data().remaining(), ); if let Some(ref p) = frame.new_parameters { bail!("parameters change unimplemented. new parameters: {:#?}", p); } - let size = u32::try_from(frame.remaining())?; + let size = u32::try_from(frame.data().remaining())?; self.video_trak .add_sample(self.mdat_pos, size, frame.timestamp, frame.loss)?; self.mdat_pos = self @@ -505,7 +505,8 @@ impl Mp4Writer { self.video_sync_sample_nums .push(u32::try_from(self.video_trak.samples)?); } - write_all_buf(&mut self.inner, &mut frame).await?; + let mut data = frame.into_data(); + write_all_buf(&mut self.inner, &mut data).await?; Ok(()) } diff --git a/src/codec/h264.rs b/src/codec/h264.rs index 80db1ae..eedf66d 100644 --- a/src/codec/h264.rs +++ b/src/codec/h264.rs @@ -7,11 +7,13 @@ use std::convert::TryFrom; use bytes::{Buf, BufMut, Bytes, BytesMut}; use failure::{bail, format_err, Error}; -use h264_reader::nal::UnitType; +use h264_reader::nal::{NalHeader, UnitType}; use log::debug; use crate::client::rtp::Packet; +use super::VideoFrame; + /// A [super::Depacketizer] implementation which finds access unit boundaries /// and produces unfragmented NAL units as specified in [RFC /// 6184](https://tools.ietf.org/html/rfc6184). @@ -20,39 +22,50 @@ use crate::client::rtp::Packet; /// verify compliance with H.264 section 7.4.1.2.3 "Order of NAL units and coded /// pictures and association to access units". /// -/// Currently expects that the stream starts at an access unit boundary and has no lost packets. +/// Currently expects that the stream starts at an access unit boundary unless +/// packet loss is indicated. #[derive(Debug)] pub(crate) struct Depacketizer { input_state: DepacketizerInputState, - pending: Option, + + /// A complete video frame ready for pull. + pending: Option, + parameters: InternalParameters, - /// The largest fragment used. This is used for the buffer capacity on subsequent fragments, minimizing reallocation. - frag_high_water: usize, + /// In state `PreMark`, pieces of NALs, excluding their header bytes. + /// Kept around (empty) in other states to re-use the backing allocation. + pieces: Vec, + + /// In state `PreMark`, an entry for each NAL. + /// Kept around (empty) in other states to re-use the backing allocation. + nals: Vec, } +#[derive(Debug)] +struct Nal { + hdr: h264_reader::nal::NalHeader, + + /// The length of `Depacketizer::pieces` as this NAL finishes. + next_piece_idx: u32, + + /// The total length of this NAL, including the header byte. + len: u32, +} + +/// An access unit that is currently being accumulated during `PreMark` state. #[derive(Debug)] struct AccessUnit { start_ctx: crate::Context, end_ctx: crate::Context, timestamp: crate::Timestamp, stream_id: usize, - new_sps: Option, - new_pps: Option, + + /// True iff currently processing a FU-A. + in_fu_a: bool, /// RTP packets lost as this access unit was starting. loss: u16, - - /// Currently we expect only a single slice NAL. - picture: Option, -} - -#[derive(Debug)] -struct PreMark { - /// If a FU-A fragment is in progress, the buffer used to accumulate the NAL. - frag_buf: Option, - - access_unit: AccessUnit, } #[derive(Debug)] @@ -61,6 +74,7 @@ enum DepacketizerInputState { /// Not yet processing an access unit. New, + /// Ignoring the remainder of an access unit because of interior packet loss. Loss { timestamp: crate::Timestamp, pkts: u16, @@ -68,7 +82,7 @@ enum DepacketizerInputState { /// Currently processing an access unit. /// This will be flushed after a marked packet or when receiving a later timestamp. - PreMark(PreMark), + PreMark(AccessUnit), /// Finished processing the given packet. It's an error to receive the same timestamp again. PostMark { @@ -92,7 +106,8 @@ impl Depacketizer { Ok(Depacketizer { input_state: DepacketizerInputState::New, pending: None, - frag_high_water: 0, + pieces: Vec::new(), + nals: Vec::new(), parameters: InternalParameters::parse_format_specific_params(format_specific_params)?, }) } @@ -107,23 +122,22 @@ impl Depacketizer { panic!("push with data already pending: {:?}", p); } - // The rtp crate also has [H.264 depacketization - // logic](https://docs.rs/rtp/0.2.2/rtp/codecs/h264/struct.H264Packet.html), - // but it doesn't seem to match my use case. I want to iterate the NALs, - // not re-encode them in Annex B format. let seq = pkt.sequence_number; - let mut premark = match std::mem::replace( + let mut access_unit = match std::mem::replace( &mut self.input_state, DepacketizerInputState::New, ) { - DepacketizerInputState::New => PreMark { - access_unit: AccessUnit::start(&pkt), - frag_buf: None, - }, - DepacketizerInputState::PreMark(mut premark) => { + DepacketizerInputState::New => { + debug_assert!(self.nals.is_empty()); + debug_assert!(self.pieces.is_empty()); + AccessUnit::start(&pkt, 0) + } + DepacketizerInputState::PreMark(mut access_unit) => { if pkt.loss > 0 { - if premark.access_unit.timestamp.timestamp == pkt.timestamp.timestamp { + if access_unit.timestamp.timestamp == pkt.timestamp.timestamp { // Loss within this access unit. Ignore until mark or new timestamp. + self.nals.clear(); + self.pieces.clear(); self.input_state = if pkt.mark { DepacketizerInputState::PostMark { timestamp: pkt.timestamp, @@ -139,53 +153,41 @@ impl Depacketizer { } // A suffix of a previous access unit was lost; discard it. // A prefix of the new one may have been lost; try parsing. - PreMark { - access_unit: AccessUnit::start(&pkt), - frag_buf: None, + AccessUnit::start(&pkt, 0) + } else if access_unit.timestamp.timestamp != pkt.timestamp.timestamp { + if !access_unit.in_fu_a { + bail!("Timestamp changed from {} to {} in the middle of a fragmented NAL at seq={:04x} {:#?}", access_unit.timestamp, pkt.timestamp, seq, &pkt.rtsp_ctx); } + access_unit.end_ctx = pkt.rtsp_ctx; + self.pending = Some(self.finalize_access_unit(access_unit)?); + AccessUnit::start(&pkt, 0) } else { - if premark.access_unit.timestamp.timestamp != pkt.timestamp.timestamp { - if premark.frag_buf.is_some() { - bail!("Timestamp changed from {} to {} in the middle of a fragmented NAL at seq={:04x} {:#?}", premark.access_unit.timestamp, pkt.timestamp, seq, &pkt.rtsp_ctx); - } - premark.access_unit.end_ctx = pkt.rtsp_ctx; - self.pending = Some(std::mem::replace( - &mut premark.access_unit, - AccessUnit::start(&pkt), - )); - } - premark + access_unit } } DepacketizerInputState::PostMark { timestamp: state_ts, loss, } => { + debug_assert!(self.nals.is_empty()); + debug_assert!(self.pieces.is_empty()); if state_ts.timestamp == pkt.timestamp.timestamp { bail!("Received packet with timestamp {} after marked packet with same timestamp at seq={:04x} {:#?}", pkt.timestamp, seq, &pkt.rtsp_ctx); } - let mut access_unit = AccessUnit::start(&pkt); - access_unit.loss += loss; - PreMark { - access_unit, - frag_buf: None, - } + AccessUnit::start(&pkt, loss) } DepacketizerInputState::Loss { timestamp, mut pkts, } => { + debug_assert!(self.nals.is_empty()); + debug_assert!(self.pieces.is_empty()); if pkt.timestamp.timestamp == timestamp.timestamp { pkts += pkt.loss; self.input_state = DepacketizerInputState::Loss { timestamp, pkts }; return Ok(()); } - let mut access_unit = AccessUnit::start(&pkt); - access_unit.loss += pkts; - PreMark { - access_unit, - frag_buf: None, - } + AccessUnit::start(&pkt, pkts) } }; @@ -202,20 +204,26 @@ impl Depacketizer { &pkt.rtsp_ctx ); } + data.advance(1); // skip the header byte. match nal_header & 0b11111 { 1..=23 => { - if premark.frag_buf.is_some() { + if access_unit.in_fu_a { bail!( "Non-fragmented NAL while fragment in progress seq {:04x} {:#?}", seq, &pkt.rtsp_ctx ); } - premark.access_unit.nal(&mut self.parameters, data)?; + let len = u32::try_from(data.len()).expect("data len < u16::MAX") + 1; + let next_piece_idx = self.add_piece(data)?; + self.nals.push(Nal { + hdr: NalHeader::new(nal_header).expect("header w/o F bit set is valid"), + next_piece_idx, + len, + }); } 24 => { // STAP-A. https://tools.ietf.org/html/rfc6184#section-5.7.1 - data.advance(1); // skip the header byte. loop { if data.remaining() < 2 { bail!( @@ -223,20 +231,39 @@ impl Depacketizer { data.remaining() ); } - let len = usize::from(data.get_u16()); - match data.remaining().cmp(&len) { + let len = data.get_u16(); + //let len = usize::from(data.get_u16()); + if len == 0 { + bail!("zero length in STAP-A"); + } + let hdr = + NalHeader::new(data[0]).map_err(|_| format_err!("bad header in STAP-A"))?; + match data.remaining().cmp(&usize::from(len)) { std::cmp::Ordering::Less => bail!( "STAP-A too short: {} bytes remaining, expecting {}-byte NAL", data.remaining(), len ), std::cmp::Ordering::Equal => { - premark.access_unit.nal(&mut self.parameters, data)?; + data.advance(1); + let next_piece_idx = self.add_piece(data)?; + self.nals.push(Nal { + hdr, + next_piece_idx, + len: u32::from(len), + }); break; } - std::cmp::Ordering::Greater => premark - .access_unit - .nal(&mut self.parameters, data.split_to(len))?, + std::cmp::Ordering::Greater => { + let mut piece = data.split_to(usize::from(len)); + piece.advance(1); + let next_piece_idx = self.add_piece(piece)?; + self.nals.push(Nal { + hdr, + next_piece_idx, + len: u32::from(len), + }); + } } } } @@ -248,14 +275,17 @@ impl Depacketizer { ), 28 => { // FU-A. https://tools.ietf.org/html/rfc6184#section-5.8 - if data.len() < 3 { + if data.len() < 2 { bail!("FU-A is too short at seq {:04x} {:#?}", seq, &pkt.rtsp_ctx); } - let fu_header = data[1]; + let fu_header = data[0]; let start = (fu_header & 0b10000000) != 0; let end = (fu_header & 0b01000000) != 0; let reserved = (fu_header & 0b00100000) != 0; - let nal_header = (nal_header & 0b011100000) | (fu_header & 0b00011111); + let nal_header = + NalHeader::new((nal_header & 0b011100000) | (fu_header & 0b00011111)) + .expect("NalHeader is valid"); + data.advance(1); if (start && end) || reserved { bail!( "Invalid FU-A header {:08b} at seq {:04x} {:#?}", @@ -264,44 +294,47 @@ impl Depacketizer { &pkt.rtsp_ctx ); } - match (start, premark.frag_buf.take()) { - (true, Some(_)) => bail!( + let u32_len = u32::try_from(data.len()).expect("RTP packet len must be < u16::MAX"); + match (start, access_unit.in_fu_a) { + (true, true) => bail!( "FU-A with start bit while frag in progress at seq {:04x} {:#?}", seq, &pkt.rtsp_ctx ), - (true, None) => { - let mut frag_buf = BytesMut::with_capacity(std::cmp::max( - self.frag_high_water, - data.len() - 1, - )); - frag_buf.put_u8(nal_header); - data.advance(2); - frag_buf.put(data); - premark.frag_buf = Some(frag_buf); + (true, false) => { + self.add_piece(data)?; + self.nals.push(Nal { + hdr: nal_header, + next_piece_idx: u32::MAX, // should be overwritten later. + len: 1 + u32_len, + }); + access_unit.in_fu_a = true; } - (false, Some(mut frag_buf)) => { - if frag_buf[0] != nal_header { - bail!("FU-A has inconsistent NAL type: {:08b} then {:08b} at seq {:04x} {:#?}", frag_buf[0], nal_header, seq, &pkt.rtsp_ctx); + (false, true) => { + let pieces = self.add_piece(data)?; + let nal = self.nals.last_mut().expect("nals non-empty while in fu-a"); + if u8::from(nal_header) != u8::from(nal.hdr) { + bail!( + "FU-A has inconsistent NAL type: {:?} then {:?} at {:02x} {:?}", + nal.hdr, + nal_header, + seq, + &pkt.rtsp_ctx + ); } - data.advance(2); - frag_buf.put(data); + nal.len += u32_len; if end { - self.frag_high_water = frag_buf.len(); - premark - .access_unit - .nal(&mut self.parameters, frag_buf.freeze())?; + nal.next_piece_idx = pieces; + access_unit.in_fu_a = false; } else if pkt.mark { bail!( "FU-A with MARK and no END at seq {:04x} {:#?}", seq, pkt.rtsp_ctx ); - } else { - premark.frag_buf = Some(frag_buf); } } - (false, None) => { + (false, false) => { if pkt.loss > 0 { self.input_state = DepacketizerInputState::Loss { timestamp: pkt.timestamp, @@ -325,32 +358,85 @@ impl Depacketizer { ), } self.input_state = if pkt.mark { - premark.access_unit.end_ctx = pkt.rtsp_ctx; - self.pending = Some(premark.access_unit); + access_unit.end_ctx = pkt.rtsp_ctx; + self.pending = Some(self.finalize_access_unit(access_unit)?); DepacketizerInputState::PostMark { timestamp: pkt.timestamp, loss: 0, } } else { - DepacketizerInputState::PreMark(premark) + DepacketizerInputState::PreMark(access_unit) }; Ok(()) } pub(super) fn pull(&mut self) -> Result, Error> { - let pending = match self.pending.take() { - None => return Ok(None), - Some(p) => p, - }; - let new_parameters = if pending.new_sps.is_some() || pending.new_pps.is_some() { - let sps_nal = pending - .new_sps - .as_deref() - .unwrap_or(&self.parameters.sps_nal); - let pps_nal = pending - .new_pps - .as_deref() - .unwrap_or(&self.parameters.pps_nal); + Ok(self.pending.take().map(super::CodecItem::VideoFrame)) + } + + /// Adds a piece to `self.pieces`, erroring if it becomes absurdly large. + fn add_piece(&mut self, piece: Bytes) -> Result { + self.pieces.push(piece); + u32::try_from(self.pieces.len()).map_err(|_| format_err!("more than u32::MAX pieces!")) + } + + fn finalize_access_unit(&mut self, au: AccessUnit) -> Result { + let mut piece_idx = 0; + let mut retained_len = 0usize; + let mut is_random_access_point = false; + let mut is_disposable = true; + let mut new_sps = None; + let mut new_pps = None; + for nal in &self.nals { + let next_piece_idx = usize::try_from(nal.next_piece_idx).expect("u32 fits in usize"); + let nal_pieces = &self.pieces[piece_idx..next_piece_idx]; + match nal.hdr.nal_unit_type() { + UnitType::SeqParameterSet => { + if !matches(&self.parameters.sps_nal[..], nal.hdr, nal_pieces) { + new_sps = Some(to_bytes(nal.hdr, nal.len, nal_pieces)); + } + } + UnitType::PicParameterSet => { + if !matches(&self.parameters.pps_nal[..], nal.hdr, nal_pieces) { + new_pps = Some(to_bytes(nal.hdr, nal.len, nal_pieces)); + } + } + UnitType::SliceLayerWithoutPartitioningIdr => is_random_access_point = true, + _ => {} + } + if nal.hdr.nal_ref_idc() != 0 { + is_disposable = false; + } + // TODO: support optionally filtering non-VUI NALs. + retained_len += 4usize + usize::try_from(nal.len).expect("u32 fits in usize"); + piece_idx = next_piece_idx; + } + let mut data = Vec::with_capacity(retained_len); + piece_idx = 0; + for nal in &self.nals { + let next_piece_idx = usize::try_from(nal.next_piece_idx).expect("u32 fits in usize"); + let nal_pieces = &self.pieces[piece_idx..next_piece_idx]; + data.extend_from_slice(&nal.len.to_be_bytes()[..]); + data.push(nal.hdr.into()); + let mut actual_len = 1; + for piece in nal_pieces { + data.extend_from_slice(&piece[..]); + actual_len += piece.len(); + } + debug_assert_eq!( + usize::try_from(nal.len).expect("u32 fits in usize"), + actual_len + ); + piece_idx = next_piece_idx; + } + debug_assert_eq!(retained_len, data.len()); + let data = Bytes::from(data); + self.nals.clear(); + self.pieces.clear(); + + let new_parameters = if new_sps.is_some() || new_pps.is_some() { + let sps_nal = new_sps.as_deref().unwrap_or(&self.parameters.sps_nal); + let pps_nal = new_pps.as_deref().unwrap_or(&self.parameters.pps_nal); self.parameters = InternalParameters::parse_sps_and_pps(sps_nal, pps_nal)?; match self.parameters.generic_parameters { super::Parameters::Video(ref p) => Some(p.clone()), @@ -359,76 +445,32 @@ impl Depacketizer { } else { None }; - let picture = pending - .picture - .ok_or_else(|| format_err!("access unit has no picture"))?; - let nal_header = - h264_reader::nal::NalHeader::new(picture[0]).expect("nal header was previously valid"); - Ok(Some(super::CodecItem::VideoFrame(super::VideoFrame { - start_ctx: pending.start_ctx, - end_ctx: pending.end_ctx, - loss: pending.loss, + Ok(VideoFrame { new_parameters, - timestamp: pending.timestamp, - stream_id: pending.stream_id, - is_random_access_point: nal_header.nal_unit_type() - == UnitType::SliceLayerWithoutPartitioningIdr, - is_disposable: nal_header.nal_ref_idc() == 0, - pos: 0, - data_prefix: u32::try_from(picture.len()).unwrap().to_be_bytes(), - data: picture, - }))) + loss: au.loss, + start_ctx: au.start_ctx, + end_ctx: au.end_ctx, + timestamp: au.timestamp, + stream_id: au.stream_id, + is_random_access_point, + is_disposable, + data, + }) } } impl AccessUnit { - fn start(pkt: &crate::client::rtp::Packet) -> Self { + fn start(pkt: &crate::client::rtp::Packet, additional_loss: u16) -> Self { AccessUnit { start_ctx: pkt.rtsp_ctx, end_ctx: pkt.rtsp_ctx, timestamp: pkt.timestamp, stream_id: pkt.stream_id, - loss: pkt.loss, - new_sps: None, - new_pps: None, - picture: None, - } - } + in_fu_a: false, - fn nal(&mut self, parameters: &mut InternalParameters, nal: Bytes) -> Result<(), Error> { - if !nal.has_remaining() { - bail!("empty NAL"); + // TODO: overflow? + loss: pkt.loss + additional_loss, } - let nal_header = h264_reader::nal::NalHeader::new(nal[0]) - .map_err(|e| format_err!("bad NAL header 0x{:x}: {:#?}", nal[0], e))?; - let unit_type = nal_header.nal_unit_type(); - match unit_type { - UnitType::SeqParameterSet => { - if self.new_sps.is_some() { - bail!("multiple SPSs in access unit"); - } - if nal != parameters.sps_nal { - self.new_sps = Some(nal); - } - } - UnitType::PicParameterSet => { - if self.new_pps.is_some() { - bail!("multiple PPSs in access unit"); - } - if nal != parameters.pps_nal { - self.new_pps = Some(nal); - } - } - UnitType::SliceLayerWithoutPartitioningIdr - | UnitType::SliceLayerWithoutPartitioningNonIdr => { - if self.picture.is_some() { - bail!("currently expect only one picture NAL per access unit"); - } - self.picture = Some(nal); - } - _ => {} - } - Ok(()) } } @@ -582,8 +624,169 @@ impl InternalParameters { } } +/// Returns true iff the bytes of `nal` equal the bytes of `[hdr, ..data]`. +fn matches(nal: &[u8], hdr: NalHeader, pieces: &[Bytes]) -> bool { + if nal.is_empty() || nal[0] != u8::from(hdr) { + return false; + } + let mut nal_pos = 1; + for piece in pieces { + let new_pos = nal_pos + piece.len(); + if nal.len() < new_pos { + return false; + } + if &piece[..] != &nal[nal_pos..new_pos] { + return false; + } + nal_pos = new_pos; + } + nal_pos == nal.len() +} + +/// Saves the given NAL to a contiguous Bytes. +fn to_bytes(hdr: NalHeader, len: u32, pieces: &[Bytes]) -> Bytes { + let len = usize::try_from(len).expect("u32 fits in usize"); + let mut out = Vec::with_capacity(len); + out.push(hdr.into()); + for piece in pieces { + out.extend_from_slice(&piece[..]); + } + debug_assert_eq!(len, out.len()); + out.into() +} + #[cfg(test)] mod tests { + use std::num::NonZeroU32; + + use bytes::Bytes; + + use crate::{client::rtp::Packet, codec::CodecItem}; + + #[test] + fn depacketize() { + let mut d = super::Depacketizer::new(90_000, Some("packetization-mode=1;profile-level-id=64001E;sprop-parameter-sets=Z2QAHqwsaoLA9puCgIKgAAADACAAAAMD0IAA,aO4xshsA")).unwrap(); + let timestamp = crate::Timestamp { + timestamp: 0, + clock_rate: NonZeroU32::new(90_000).unwrap(), + start: 0, + }; + d.push(Packet { + // plain SEI packet. + rtsp_ctx: crate::Context::dummy(), + stream_id: 0, + timestamp, + sequence_number: 0, + loss: 0, + mark: false, + payload: Bytes::from_static(b"\x06plain"), + }) + .unwrap(); + assert!(d.pull().unwrap().is_none()); + d.push(Packet { + // STAP-A packet. + rtsp_ctx: crate::Context::dummy(), + stream_id: 0, + timestamp, + sequence_number: 1, + loss: 0, + mark: false, + payload: Bytes::from_static(b"\x18\x00\x09\x06stap-a 1\x00\x09\x06stap-a 2"), + }) + .unwrap(); + assert!(d.pull().unwrap().is_none()); + d.push(Packet { + // FU-A packet, start. + rtsp_ctx: crate::Context::dummy(), + stream_id: 0, + timestamp, + sequence_number: 2, + loss: 0, + mark: false, + payload: Bytes::from_static(b"\x7c\x86fu-a start, "), + }) + .unwrap(); + assert!(d.pull().unwrap().is_none()); + d.push(Packet { + // FU-A packet, middle. + rtsp_ctx: crate::Context::dummy(), + stream_id: 0, + timestamp, + sequence_number: 3, + loss: 0, + mark: false, + payload: Bytes::from_static(b"\x7c\x06fu-a middle, "), + }) + .unwrap(); + assert!(d.pull().unwrap().is_none()); + d.push(Packet { + // FU-A packet, end. + rtsp_ctx: crate::Context::dummy(), + stream_id: 0, + timestamp, + sequence_number: 4, + loss: 0, + mark: true, + payload: Bytes::from_static(b"\x7c\x46fu-a end"), + }) + .unwrap(); + let frame = match d.pull() { + Ok(Some(CodecItem::VideoFrame(frame))) => frame, + _ => panic!(), + }; + assert_eq!( + &frame.data()[..], + b"\x00\x00\x00\x06\x06plain\ + \x00\x00\x00\x09\x06stap-a 1\ + \x00\x00\x00\x09\x06stap-a 2\ + \x00\x00\x00\x22\x66fu-a start, fu-a middle, fu-a end" + ); + } + + #[test] + fn depacketize_parameter_change() { + let mut d = super::Depacketizer::new(90_000, Some("a=fmtp:96 profile-level-id=420029; packetization-mode=1; sprop-parameter-sets=Z01AHppkBYHv/lBgYGQAAA+gAAE4gBA=,aO48gA==")).unwrap(); + match d.parameters() { + Some(crate::codec::Parameters::Video(v)) => { + assert_eq!(v.pixel_dimensions(), (704, 480)); + } + _ => unreachable!(), + } + let timestamp = crate::Timestamp { + timestamp: 0, + clock_rate: NonZeroU32::new(90_000).unwrap(), + start: 0, + }; + d.push(Packet { // new SPS. + rtsp_ctx: crate::Context::dummy(), + stream_id: 0, + timestamp, + sequence_number: 0, + loss: 0, + mark: false, + payload: Bytes::from_static(b"\x67\x4d\x40\x1e\x9a\x64\x05\x01\xef\xf3\x50\x10\x10\x14\x00\x00\x0f\xa0\x00\x01\x38\x80\x10"), + }).unwrap(); + assert!(d.pull().unwrap().is_none()); + d.push(Packet { + // same PPS again. + rtsp_ctx: crate::Context::dummy(), + stream_id: 0, + timestamp, + sequence_number: 1, + loss: 0, + mark: true, + payload: Bytes::from_static(b"\x68\xee\x3c\x80"), + }) + .unwrap(); + let frame = match d.pull() { + Ok(Some(CodecItem::VideoFrame(frame))) => frame, + _ => panic!(), + }; + assert!(frame.new_parameters.is_some()); + let p = frame.new_parameters.unwrap(); + assert_eq!(p.pixel_dimensions(), (640, 480)); + } + #[test] fn gw_security() { let params = super::InternalParameters::parse_format_specific_params( diff --git a/src/codec/mod.rs b/src/codec/mod.rs index 8504def..b65b4aa 100644 --- a/src/codec/mod.rs +++ b/src/codec/mod.rs @@ -259,25 +259,29 @@ pub struct VideoFrame { /// In H.264 terms, this is a frame with `nal_ref_idc == 0`. pub is_disposable: bool, - /// Position within `concat(data_prefix, data)`. - pos: u32, - - data_prefix: [u8; 4], - - /// Frame content in the requested format. Currently in a single [bytes::Bytes] - /// allocation, but this may change when supporting H.264 partitioned slices - /// or if we revise the fragmentation implementation. data: bytes::Bytes, } impl VideoFrame { + #[inline] pub fn start_ctx(&self) -> crate::Context { self.start_ctx } + #[inline] pub fn end_ctx(&self) -> crate::Context { self.end_ctx } + + #[inline] + pub fn data(&self) -> &Bytes { + &self.data + } + + #[inline] + pub fn into_data(self) -> Bytes { + self.data + } } impl std::fmt::Debug for VideoFrame { @@ -291,52 +295,12 @@ impl std::fmt::Debug for VideoFrame { .field("new_parameters", &self.new_parameters) .field("is_random_access_point", &self.is_random_access_point) .field("is_disposable", &self.is_disposable) - .field("pos", &self.pos) - .field("data_len", &(self.data.len() + 4)) + .field("data_len", &self.data.len()) //.field("data", &self.data.hex_dump()) .finish() } } -impl bytes::Buf for VideoFrame { - fn remaining(&self) -> usize { - self.data.len() + 4 - (self.pos as usize) - } - - fn chunk(&self) -> &[u8] { - let pos = self.pos as usize; - if let Some(pos_within_data) = pos.checked_sub(4) { - &self.data[pos_within_data..] - } else { - &self.data_prefix[pos..] - } - } - - fn advance(&mut self, cnt: usize) { - assert!((self.pos as usize) + cnt <= 4 + self.data.len()); - self.pos += cnt as u32; - } - - fn chunks_vectored<'a>(&'a self, dst: &mut [std::io::IoSlice<'a>]) -> usize { - match dst.len() { - 0 => 0, - 1 => { - dst[0] = std::io::IoSlice::new(self.chunk()); - 1 - } - _ if self.pos < 4 => { - dst[0] = std::io::IoSlice::new(&self.data_prefix[self.pos as usize..]); - dst[1] = std::io::IoSlice::new(&self.data); - 2 - } - _ => { - dst[0] = std::io::IoSlice::new(&self.data[(self.pos - 4) as usize..]); - 1 - } - } - } -} - /// Turns RTP packets into [CodecItem]s. /// This interface unstable and for internal use; it's exposed for direct fuzzing and benchmarking. #[doc(hidden)]