refactor h264 depacketizer

* add a couple tests

* fix #4: put the whole video frame (including multiple NALs) into
  a single Bytes. This is simpler to use and speeds up the
  depacketize/h264_aac_writevideo benchmark. It's also a behavior
  change; now I include non-VUI data such as SPS/PPS/SEI into the
  data. I think this is a better default but it might be worth
  making customizable.
This commit is contained in:
Scott Lamb 2021-06-24 19:18:27 -07:00
parent 4fe885fd6a
commit af7e8a77fb
4 changed files with 388 additions and 224 deletions

View File

@ -3,7 +3,6 @@
use std::num::NonZeroU16;
use bytes::Buf;
use criterion::{criterion_group, criterion_main, Criterion};
use retina::client::{rtp::StrictSequenceChecker, Timeline};
use retina::codec::{CodecItem, Depacketizer};
@ -78,10 +77,7 @@ fn criterion_benchmark(c: &mut Criterion) {
CodecItem::VideoFrame(v) => v,
_ => return,
};
let mut slices = [std::io::IoSlice::new(b""); 2];
let n = v.chunks_vectored(&mut slices);
assert_eq!(n, 2);
assert_eq!(w.write_vectored(&slices[..]).unwrap(), v.remaining());
w.write_all(&v.data()[..]).unwrap();
})
})
});

View File

@ -485,16 +485,16 @@ impl<W: AsyncWrite + AsyncSeek + Send + Unpin> Mp4Writer<W> {
Ok(())
}
async fn video(&mut self, mut frame: retina::codec::VideoFrame) -> Result<(), failure::Error> {
async fn video(&mut self, frame: retina::codec::VideoFrame) -> Result<(), failure::Error> {
println!(
"{}: {}-byte video frame",
&frame.timestamp,
frame.remaining()
frame.data().remaining(),
);
if let Some(ref p) = frame.new_parameters {
bail!("parameters change unimplemented. new parameters: {:#?}", p);
}
let size = u32::try_from(frame.remaining())?;
let size = u32::try_from(frame.data().remaining())?;
self.video_trak
.add_sample(self.mdat_pos, size, frame.timestamp, frame.loss)?;
self.mdat_pos = self
@ -505,7 +505,8 @@ impl<W: AsyncWrite + AsyncSeek + Send + Unpin> Mp4Writer<W> {
self.video_sync_sample_nums
.push(u32::try_from(self.video_trak.samples)?);
}
write_all_buf(&mut self.inner, &mut frame).await?;
let mut data = frame.into_data();
write_all_buf(&mut self.inner, &mut data).await?;
Ok(())
}

View File

@ -7,11 +7,13 @@ use std::convert::TryFrom;
use bytes::{Buf, BufMut, Bytes, BytesMut};
use failure::{bail, format_err, Error};
use h264_reader::nal::UnitType;
use h264_reader::nal::{NalHeader, UnitType};
use log::debug;
use crate::client::rtp::Packet;
use super::VideoFrame;
/// A [super::Depacketizer] implementation which finds access unit boundaries
/// and produces unfragmented NAL units as specified in [RFC
/// 6184](https://tools.ietf.org/html/rfc6184).
@ -20,39 +22,50 @@ use crate::client::rtp::Packet;
/// verify compliance with H.264 section 7.4.1.2.3 "Order of NAL units and coded
/// pictures and association to access units".
///
/// Currently expects that the stream starts at an access unit boundary and has no lost packets.
/// Currently expects that the stream starts at an access unit boundary unless
/// packet loss is indicated.
#[derive(Debug)]
pub(crate) struct Depacketizer {
input_state: DepacketizerInputState,
pending: Option<AccessUnit>,
/// A complete video frame ready for pull.
pending: Option<VideoFrame>,
parameters: InternalParameters,
/// The largest fragment used. This is used for the buffer capacity on subsequent fragments, minimizing reallocation.
frag_high_water: usize,
/// In state `PreMark`, pieces of NALs, excluding their header bytes.
/// Kept around (empty) in other states to re-use the backing allocation.
pieces: Vec<Bytes>,
/// In state `PreMark`, an entry for each NAL.
/// Kept around (empty) in other states to re-use the backing allocation.
nals: Vec<Nal>,
}
#[derive(Debug)]
struct Nal {
hdr: h264_reader::nal::NalHeader,
/// The length of `Depacketizer::pieces` as this NAL finishes.
next_piece_idx: u32,
/// The total length of this NAL, including the header byte.
len: u32,
}
/// An access unit that is currently being accumulated during `PreMark` state.
#[derive(Debug)]
struct AccessUnit {
start_ctx: crate::Context,
end_ctx: crate::Context,
timestamp: crate::Timestamp,
stream_id: usize,
new_sps: Option<Bytes>,
new_pps: Option<Bytes>,
/// True iff currently processing a FU-A.
in_fu_a: bool,
/// RTP packets lost as this access unit was starting.
loss: u16,
/// Currently we expect only a single slice NAL.
picture: Option<Bytes>,
}
#[derive(Debug)]
struct PreMark {
/// If a FU-A fragment is in progress, the buffer used to accumulate the NAL.
frag_buf: Option<BytesMut>,
access_unit: AccessUnit,
}
#[derive(Debug)]
@ -61,6 +74,7 @@ enum DepacketizerInputState {
/// Not yet processing an access unit.
New,
/// Ignoring the remainder of an access unit because of interior packet loss.
Loss {
timestamp: crate::Timestamp,
pkts: u16,
@ -68,7 +82,7 @@ enum DepacketizerInputState {
/// Currently processing an access unit.
/// This will be flushed after a marked packet or when receiving a later timestamp.
PreMark(PreMark),
PreMark(AccessUnit),
/// Finished processing the given packet. It's an error to receive the same timestamp again.
PostMark {
@ -92,7 +106,8 @@ impl Depacketizer {
Ok(Depacketizer {
input_state: DepacketizerInputState::New,
pending: None,
frag_high_water: 0,
pieces: Vec::new(),
nals: Vec::new(),
parameters: InternalParameters::parse_format_specific_params(format_specific_params)?,
})
}
@ -107,23 +122,22 @@ impl Depacketizer {
panic!("push with data already pending: {:?}", p);
}
// The rtp crate also has [H.264 depacketization
// logic](https://docs.rs/rtp/0.2.2/rtp/codecs/h264/struct.H264Packet.html),
// but it doesn't seem to match my use case. I want to iterate the NALs,
// not re-encode them in Annex B format.
let seq = pkt.sequence_number;
let mut premark = match std::mem::replace(
let mut access_unit = match std::mem::replace(
&mut self.input_state,
DepacketizerInputState::New,
) {
DepacketizerInputState::New => PreMark {
access_unit: AccessUnit::start(&pkt),
frag_buf: None,
},
DepacketizerInputState::PreMark(mut premark) => {
DepacketizerInputState::New => {
debug_assert!(self.nals.is_empty());
debug_assert!(self.pieces.is_empty());
AccessUnit::start(&pkt, 0)
}
DepacketizerInputState::PreMark(mut access_unit) => {
if pkt.loss > 0 {
if premark.access_unit.timestamp.timestamp == pkt.timestamp.timestamp {
if access_unit.timestamp.timestamp == pkt.timestamp.timestamp {
// Loss within this access unit. Ignore until mark or new timestamp.
self.nals.clear();
self.pieces.clear();
self.input_state = if pkt.mark {
DepacketizerInputState::PostMark {
timestamp: pkt.timestamp,
@ -139,53 +153,41 @@ impl Depacketizer {
}
// A suffix of a previous access unit was lost; discard it.
// A prefix of the new one may have been lost; try parsing.
PreMark {
access_unit: AccessUnit::start(&pkt),
frag_buf: None,
AccessUnit::start(&pkt, 0)
} else if access_unit.timestamp.timestamp != pkt.timestamp.timestamp {
if !access_unit.in_fu_a {
bail!("Timestamp changed from {} to {} in the middle of a fragmented NAL at seq={:04x} {:#?}", access_unit.timestamp, pkt.timestamp, seq, &pkt.rtsp_ctx);
}
access_unit.end_ctx = pkt.rtsp_ctx;
self.pending = Some(self.finalize_access_unit(access_unit)?);
AccessUnit::start(&pkt, 0)
} else {
if premark.access_unit.timestamp.timestamp != pkt.timestamp.timestamp {
if premark.frag_buf.is_some() {
bail!("Timestamp changed from {} to {} in the middle of a fragmented NAL at seq={:04x} {:#?}", premark.access_unit.timestamp, pkt.timestamp, seq, &pkt.rtsp_ctx);
}
premark.access_unit.end_ctx = pkt.rtsp_ctx;
self.pending = Some(std::mem::replace(
&mut premark.access_unit,
AccessUnit::start(&pkt),
));
}
premark
access_unit
}
}
DepacketizerInputState::PostMark {
timestamp: state_ts,
loss,
} => {
debug_assert!(self.nals.is_empty());
debug_assert!(self.pieces.is_empty());
if state_ts.timestamp == pkt.timestamp.timestamp {
bail!("Received packet with timestamp {} after marked packet with same timestamp at seq={:04x} {:#?}", pkt.timestamp, seq, &pkt.rtsp_ctx);
}
let mut access_unit = AccessUnit::start(&pkt);
access_unit.loss += loss;
PreMark {
access_unit,
frag_buf: None,
}
AccessUnit::start(&pkt, loss)
}
DepacketizerInputState::Loss {
timestamp,
mut pkts,
} => {
debug_assert!(self.nals.is_empty());
debug_assert!(self.pieces.is_empty());
if pkt.timestamp.timestamp == timestamp.timestamp {
pkts += pkt.loss;
self.input_state = DepacketizerInputState::Loss { timestamp, pkts };
return Ok(());
}
let mut access_unit = AccessUnit::start(&pkt);
access_unit.loss += pkts;
PreMark {
access_unit,
frag_buf: None,
}
AccessUnit::start(&pkt, pkts)
}
};
@ -202,20 +204,26 @@ impl Depacketizer {
&pkt.rtsp_ctx
);
}
data.advance(1); // skip the header byte.
match nal_header & 0b11111 {
1..=23 => {
if premark.frag_buf.is_some() {
if access_unit.in_fu_a {
bail!(
"Non-fragmented NAL while fragment in progress seq {:04x} {:#?}",
seq,
&pkt.rtsp_ctx
);
}
premark.access_unit.nal(&mut self.parameters, data)?;
let len = u32::try_from(data.len()).expect("data len < u16::MAX") + 1;
let next_piece_idx = self.add_piece(data)?;
self.nals.push(Nal {
hdr: NalHeader::new(nal_header).expect("header w/o F bit set is valid"),
next_piece_idx,
len,
});
}
24 => {
// STAP-A. https://tools.ietf.org/html/rfc6184#section-5.7.1
data.advance(1); // skip the header byte.
loop {
if data.remaining() < 2 {
bail!(
@ -223,20 +231,39 @@ impl Depacketizer {
data.remaining()
);
}
let len = usize::from(data.get_u16());
match data.remaining().cmp(&len) {
let len = data.get_u16();
//let len = usize::from(data.get_u16());
if len == 0 {
bail!("zero length in STAP-A");
}
let hdr =
NalHeader::new(data[0]).map_err(|_| format_err!("bad header in STAP-A"))?;
match data.remaining().cmp(&usize::from(len)) {
std::cmp::Ordering::Less => bail!(
"STAP-A too short: {} bytes remaining, expecting {}-byte NAL",
data.remaining(),
len
),
std::cmp::Ordering::Equal => {
premark.access_unit.nal(&mut self.parameters, data)?;
data.advance(1);
let next_piece_idx = self.add_piece(data)?;
self.nals.push(Nal {
hdr,
next_piece_idx,
len: u32::from(len),
});
break;
}
std::cmp::Ordering::Greater => premark
.access_unit
.nal(&mut self.parameters, data.split_to(len))?,
std::cmp::Ordering::Greater => {
let mut piece = data.split_to(usize::from(len));
piece.advance(1);
let next_piece_idx = self.add_piece(piece)?;
self.nals.push(Nal {
hdr,
next_piece_idx,
len: u32::from(len),
});
}
}
}
}
@ -248,14 +275,17 @@ impl Depacketizer {
),
28 => {
// FU-A. https://tools.ietf.org/html/rfc6184#section-5.8
if data.len() < 3 {
if data.len() < 2 {
bail!("FU-A is too short at seq {:04x} {:#?}", seq, &pkt.rtsp_ctx);
}
let fu_header = data[1];
let fu_header = data[0];
let start = (fu_header & 0b10000000) != 0;
let end = (fu_header & 0b01000000) != 0;
let reserved = (fu_header & 0b00100000) != 0;
let nal_header = (nal_header & 0b011100000) | (fu_header & 0b00011111);
let nal_header =
NalHeader::new((nal_header & 0b011100000) | (fu_header & 0b00011111))
.expect("NalHeader is valid");
data.advance(1);
if (start && end) || reserved {
bail!(
"Invalid FU-A header {:08b} at seq {:04x} {:#?}",
@ -264,44 +294,47 @@ impl Depacketizer {
&pkt.rtsp_ctx
);
}
match (start, premark.frag_buf.take()) {
(true, Some(_)) => bail!(
let u32_len = u32::try_from(data.len()).expect("RTP packet len must be < u16::MAX");
match (start, access_unit.in_fu_a) {
(true, true) => bail!(
"FU-A with start bit while frag in progress at seq {:04x} {:#?}",
seq,
&pkt.rtsp_ctx
),
(true, None) => {
let mut frag_buf = BytesMut::with_capacity(std::cmp::max(
self.frag_high_water,
data.len() - 1,
));
frag_buf.put_u8(nal_header);
data.advance(2);
frag_buf.put(data);
premark.frag_buf = Some(frag_buf);
(true, false) => {
self.add_piece(data)?;
self.nals.push(Nal {
hdr: nal_header,
next_piece_idx: u32::MAX, // should be overwritten later.
len: 1 + u32_len,
});
access_unit.in_fu_a = true;
}
(false, Some(mut frag_buf)) => {
if frag_buf[0] != nal_header {
bail!("FU-A has inconsistent NAL type: {:08b} then {:08b} at seq {:04x} {:#?}", frag_buf[0], nal_header, seq, &pkt.rtsp_ctx);
(false, true) => {
let pieces = self.add_piece(data)?;
let nal = self.nals.last_mut().expect("nals non-empty while in fu-a");
if u8::from(nal_header) != u8::from(nal.hdr) {
bail!(
"FU-A has inconsistent NAL type: {:?} then {:?} at {:02x} {:?}",
nal.hdr,
nal_header,
seq,
&pkt.rtsp_ctx
);
}
data.advance(2);
frag_buf.put(data);
nal.len += u32_len;
if end {
self.frag_high_water = frag_buf.len();
premark
.access_unit
.nal(&mut self.parameters, frag_buf.freeze())?;
nal.next_piece_idx = pieces;
access_unit.in_fu_a = false;
} else if pkt.mark {
bail!(
"FU-A with MARK and no END at seq {:04x} {:#?}",
seq,
pkt.rtsp_ctx
);
} else {
premark.frag_buf = Some(frag_buf);
}
}
(false, None) => {
(false, false) => {
if pkt.loss > 0 {
self.input_state = DepacketizerInputState::Loss {
timestamp: pkt.timestamp,
@ -325,32 +358,85 @@ impl Depacketizer {
),
}
self.input_state = if pkt.mark {
premark.access_unit.end_ctx = pkt.rtsp_ctx;
self.pending = Some(premark.access_unit);
access_unit.end_ctx = pkt.rtsp_ctx;
self.pending = Some(self.finalize_access_unit(access_unit)?);
DepacketizerInputState::PostMark {
timestamp: pkt.timestamp,
loss: 0,
}
} else {
DepacketizerInputState::PreMark(premark)
DepacketizerInputState::PreMark(access_unit)
};
Ok(())
}
pub(super) fn pull(&mut self) -> Result<Option<super::CodecItem>, Error> {
let pending = match self.pending.take() {
None => return Ok(None),
Some(p) => p,
};
let new_parameters = if pending.new_sps.is_some() || pending.new_pps.is_some() {
let sps_nal = pending
.new_sps
.as_deref()
.unwrap_or(&self.parameters.sps_nal);
let pps_nal = pending
.new_pps
.as_deref()
.unwrap_or(&self.parameters.pps_nal);
Ok(self.pending.take().map(super::CodecItem::VideoFrame))
}
/// Adds a piece to `self.pieces`, erroring if it becomes absurdly large.
fn add_piece(&mut self, piece: Bytes) -> Result<u32, Error> {
self.pieces.push(piece);
u32::try_from(self.pieces.len()).map_err(|_| format_err!("more than u32::MAX pieces!"))
}
fn finalize_access_unit(&mut self, au: AccessUnit) -> Result<VideoFrame, Error> {
let mut piece_idx = 0;
let mut retained_len = 0usize;
let mut is_random_access_point = false;
let mut is_disposable = true;
let mut new_sps = None;
let mut new_pps = None;
for nal in &self.nals {
let next_piece_idx = usize::try_from(nal.next_piece_idx).expect("u32 fits in usize");
let nal_pieces = &self.pieces[piece_idx..next_piece_idx];
match nal.hdr.nal_unit_type() {
UnitType::SeqParameterSet => {
if !matches(&self.parameters.sps_nal[..], nal.hdr, nal_pieces) {
new_sps = Some(to_bytes(nal.hdr, nal.len, nal_pieces));
}
}
UnitType::PicParameterSet => {
if !matches(&self.parameters.pps_nal[..], nal.hdr, nal_pieces) {
new_pps = Some(to_bytes(nal.hdr, nal.len, nal_pieces));
}
}
UnitType::SliceLayerWithoutPartitioningIdr => is_random_access_point = true,
_ => {}
}
if nal.hdr.nal_ref_idc() != 0 {
is_disposable = false;
}
// TODO: support optionally filtering non-VUI NALs.
retained_len += 4usize + usize::try_from(nal.len).expect("u32 fits in usize");
piece_idx = next_piece_idx;
}
let mut data = Vec::with_capacity(retained_len);
piece_idx = 0;
for nal in &self.nals {
let next_piece_idx = usize::try_from(nal.next_piece_idx).expect("u32 fits in usize");
let nal_pieces = &self.pieces[piece_idx..next_piece_idx];
data.extend_from_slice(&nal.len.to_be_bytes()[..]);
data.push(nal.hdr.into());
let mut actual_len = 1;
for piece in nal_pieces {
data.extend_from_slice(&piece[..]);
actual_len += piece.len();
}
debug_assert_eq!(
usize::try_from(nal.len).expect("u32 fits in usize"),
actual_len
);
piece_idx = next_piece_idx;
}
debug_assert_eq!(retained_len, data.len());
let data = Bytes::from(data);
self.nals.clear();
self.pieces.clear();
let new_parameters = if new_sps.is_some() || new_pps.is_some() {
let sps_nal = new_sps.as_deref().unwrap_or(&self.parameters.sps_nal);
let pps_nal = new_pps.as_deref().unwrap_or(&self.parameters.pps_nal);
self.parameters = InternalParameters::parse_sps_and_pps(sps_nal, pps_nal)?;
match self.parameters.generic_parameters {
super::Parameters::Video(ref p) => Some(p.clone()),
@ -359,76 +445,32 @@ impl Depacketizer {
} else {
None
};
let picture = pending
.picture
.ok_or_else(|| format_err!("access unit has no picture"))?;
let nal_header =
h264_reader::nal::NalHeader::new(picture[0]).expect("nal header was previously valid");
Ok(Some(super::CodecItem::VideoFrame(super::VideoFrame {
start_ctx: pending.start_ctx,
end_ctx: pending.end_ctx,
loss: pending.loss,
Ok(VideoFrame {
new_parameters,
timestamp: pending.timestamp,
stream_id: pending.stream_id,
is_random_access_point: nal_header.nal_unit_type()
== UnitType::SliceLayerWithoutPartitioningIdr,
is_disposable: nal_header.nal_ref_idc() == 0,
pos: 0,
data_prefix: u32::try_from(picture.len()).unwrap().to_be_bytes(),
data: picture,
})))
loss: au.loss,
start_ctx: au.start_ctx,
end_ctx: au.end_ctx,
timestamp: au.timestamp,
stream_id: au.stream_id,
is_random_access_point,
is_disposable,
data,
})
}
}
impl AccessUnit {
fn start(pkt: &crate::client::rtp::Packet) -> Self {
fn start(pkt: &crate::client::rtp::Packet, additional_loss: u16) -> Self {
AccessUnit {
start_ctx: pkt.rtsp_ctx,
end_ctx: pkt.rtsp_ctx,
timestamp: pkt.timestamp,
stream_id: pkt.stream_id,
loss: pkt.loss,
new_sps: None,
new_pps: None,
picture: None,
}
}
in_fu_a: false,
fn nal(&mut self, parameters: &mut InternalParameters, nal: Bytes) -> Result<(), Error> {
if !nal.has_remaining() {
bail!("empty NAL");
// TODO: overflow?
loss: pkt.loss + additional_loss,
}
let nal_header = h264_reader::nal::NalHeader::new(nal[0])
.map_err(|e| format_err!("bad NAL header 0x{:x}: {:#?}", nal[0], e))?;
let unit_type = nal_header.nal_unit_type();
match unit_type {
UnitType::SeqParameterSet => {
if self.new_sps.is_some() {
bail!("multiple SPSs in access unit");
}
if nal != parameters.sps_nal {
self.new_sps = Some(nal);
}
}
UnitType::PicParameterSet => {
if self.new_pps.is_some() {
bail!("multiple PPSs in access unit");
}
if nal != parameters.pps_nal {
self.new_pps = Some(nal);
}
}
UnitType::SliceLayerWithoutPartitioningIdr
| UnitType::SliceLayerWithoutPartitioningNonIdr => {
if self.picture.is_some() {
bail!("currently expect only one picture NAL per access unit");
}
self.picture = Some(nal);
}
_ => {}
}
Ok(())
}
}
@ -582,8 +624,169 @@ impl InternalParameters {
}
}
/// Returns true iff the bytes of `nal` equal the bytes of `[hdr, ..data]`.
fn matches(nal: &[u8], hdr: NalHeader, pieces: &[Bytes]) -> bool {
if nal.is_empty() || nal[0] != u8::from(hdr) {
return false;
}
let mut nal_pos = 1;
for piece in pieces {
let new_pos = nal_pos + piece.len();
if nal.len() < new_pos {
return false;
}
if &piece[..] != &nal[nal_pos..new_pos] {
return false;
}
nal_pos = new_pos;
}
nal_pos == nal.len()
}
/// Saves the given NAL to a contiguous Bytes.
fn to_bytes(hdr: NalHeader, len: u32, pieces: &[Bytes]) -> Bytes {
let len = usize::try_from(len).expect("u32 fits in usize");
let mut out = Vec::with_capacity(len);
out.push(hdr.into());
for piece in pieces {
out.extend_from_slice(&piece[..]);
}
debug_assert_eq!(len, out.len());
out.into()
}
#[cfg(test)]
mod tests {
use std::num::NonZeroU32;
use bytes::Bytes;
use crate::{client::rtp::Packet, codec::CodecItem};
#[test]
fn depacketize() {
let mut d = super::Depacketizer::new(90_000, Some("packetization-mode=1;profile-level-id=64001E;sprop-parameter-sets=Z2QAHqwsaoLA9puCgIKgAAADACAAAAMD0IAA,aO4xshsA")).unwrap();
let timestamp = crate::Timestamp {
timestamp: 0,
clock_rate: NonZeroU32::new(90_000).unwrap(),
start: 0,
};
d.push(Packet {
// plain SEI packet.
rtsp_ctx: crate::Context::dummy(),
stream_id: 0,
timestamp,
sequence_number: 0,
loss: 0,
mark: false,
payload: Bytes::from_static(b"\x06plain"),
})
.unwrap();
assert!(d.pull().unwrap().is_none());
d.push(Packet {
// STAP-A packet.
rtsp_ctx: crate::Context::dummy(),
stream_id: 0,
timestamp,
sequence_number: 1,
loss: 0,
mark: false,
payload: Bytes::from_static(b"\x18\x00\x09\x06stap-a 1\x00\x09\x06stap-a 2"),
})
.unwrap();
assert!(d.pull().unwrap().is_none());
d.push(Packet {
// FU-A packet, start.
rtsp_ctx: crate::Context::dummy(),
stream_id: 0,
timestamp,
sequence_number: 2,
loss: 0,
mark: false,
payload: Bytes::from_static(b"\x7c\x86fu-a start, "),
})
.unwrap();
assert!(d.pull().unwrap().is_none());
d.push(Packet {
// FU-A packet, middle.
rtsp_ctx: crate::Context::dummy(),
stream_id: 0,
timestamp,
sequence_number: 3,
loss: 0,
mark: false,
payload: Bytes::from_static(b"\x7c\x06fu-a middle, "),
})
.unwrap();
assert!(d.pull().unwrap().is_none());
d.push(Packet {
// FU-A packet, end.
rtsp_ctx: crate::Context::dummy(),
stream_id: 0,
timestamp,
sequence_number: 4,
loss: 0,
mark: true,
payload: Bytes::from_static(b"\x7c\x46fu-a end"),
})
.unwrap();
let frame = match d.pull() {
Ok(Some(CodecItem::VideoFrame(frame))) => frame,
_ => panic!(),
};
assert_eq!(
&frame.data()[..],
b"\x00\x00\x00\x06\x06plain\
\x00\x00\x00\x09\x06stap-a 1\
\x00\x00\x00\x09\x06stap-a 2\
\x00\x00\x00\x22\x66fu-a start, fu-a middle, fu-a end"
);
}
#[test]
fn depacketize_parameter_change() {
let mut d = super::Depacketizer::new(90_000, Some("a=fmtp:96 profile-level-id=420029; packetization-mode=1; sprop-parameter-sets=Z01AHppkBYHv/lBgYGQAAA+gAAE4gBA=,aO48gA==")).unwrap();
match d.parameters() {
Some(crate::codec::Parameters::Video(v)) => {
assert_eq!(v.pixel_dimensions(), (704, 480));
}
_ => unreachable!(),
}
let timestamp = crate::Timestamp {
timestamp: 0,
clock_rate: NonZeroU32::new(90_000).unwrap(),
start: 0,
};
d.push(Packet { // new SPS.
rtsp_ctx: crate::Context::dummy(),
stream_id: 0,
timestamp,
sequence_number: 0,
loss: 0,
mark: false,
payload: Bytes::from_static(b"\x67\x4d\x40\x1e\x9a\x64\x05\x01\xef\xf3\x50\x10\x10\x14\x00\x00\x0f\xa0\x00\x01\x38\x80\x10"),
}).unwrap();
assert!(d.pull().unwrap().is_none());
d.push(Packet {
// same PPS again.
rtsp_ctx: crate::Context::dummy(),
stream_id: 0,
timestamp,
sequence_number: 1,
loss: 0,
mark: true,
payload: Bytes::from_static(b"\x68\xee\x3c\x80"),
})
.unwrap();
let frame = match d.pull() {
Ok(Some(CodecItem::VideoFrame(frame))) => frame,
_ => panic!(),
};
assert!(frame.new_parameters.is_some());
let p = frame.new_parameters.unwrap();
assert_eq!(p.pixel_dimensions(), (640, 480));
}
#[test]
fn gw_security() {
let params = super::InternalParameters::parse_format_specific_params(

View File

@ -259,25 +259,29 @@ pub struct VideoFrame {
/// In H.264 terms, this is a frame with `nal_ref_idc == 0`.
pub is_disposable: bool,
/// Position within `concat(data_prefix, data)`.
pos: u32,
data_prefix: [u8; 4],
/// Frame content in the requested format. Currently in a single [bytes::Bytes]
/// allocation, but this may change when supporting H.264 partitioned slices
/// or if we revise the fragmentation implementation.
data: bytes::Bytes,
}
impl VideoFrame {
#[inline]
pub fn start_ctx(&self) -> crate::Context {
self.start_ctx
}
#[inline]
pub fn end_ctx(&self) -> crate::Context {
self.end_ctx
}
#[inline]
pub fn data(&self) -> &Bytes {
&self.data
}
#[inline]
pub fn into_data(self) -> Bytes {
self.data
}
}
impl std::fmt::Debug for VideoFrame {
@ -291,52 +295,12 @@ impl std::fmt::Debug for VideoFrame {
.field("new_parameters", &self.new_parameters)
.field("is_random_access_point", &self.is_random_access_point)
.field("is_disposable", &self.is_disposable)
.field("pos", &self.pos)
.field("data_len", &(self.data.len() + 4))
.field("data_len", &self.data.len())
//.field("data", &self.data.hex_dump())
.finish()
}
}
impl bytes::Buf for VideoFrame {
fn remaining(&self) -> usize {
self.data.len() + 4 - (self.pos as usize)
}
fn chunk(&self) -> &[u8] {
let pos = self.pos as usize;
if let Some(pos_within_data) = pos.checked_sub(4) {
&self.data[pos_within_data..]
} else {
&self.data_prefix[pos..]
}
}
fn advance(&mut self, cnt: usize) {
assert!((self.pos as usize) + cnt <= 4 + self.data.len());
self.pos += cnt as u32;
}
fn chunks_vectored<'a>(&'a self, dst: &mut [std::io::IoSlice<'a>]) -> usize {
match dst.len() {
0 => 0,
1 => {
dst[0] = std::io::IoSlice::new(self.chunk());
1
}
_ if self.pos < 4 => {
dst[0] = std::io::IoSlice::new(&self.data_prefix[self.pos as usize..]);
dst[1] = std::io::IoSlice::new(&self.data);
2
}
_ => {
dst[0] = std::io::IoSlice::new(&self.data[(self.pos - 4) as usize..]);
1
}
}
}
}
/// Turns RTP packets into [CodecItem]s.
/// This interface unstable and for internal use; it's exposed for direct fuzzing and benchmarking.
#[doc(hidden)]