diff --git a/Cargo.lock b/Cargo.lock index 803ab87..ff3b2a1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3094,3 +3094,19 @@ dependencies = [ "syn", "synstructure", ] + +[[patch.unused]] +name = "messagebus" +version = "0.10.0" + +[[patch.unused]] +name = "mpeg2ts" +version = "0.1.1" + +[[patch.unused]] +name = "metrix" +version = "0.2.1" + +[[patch.unused]] +name = "metrix-macros" +version = "0.2.1" diff --git a/examples/client/out.ts b/examples/client/out.ts new file mode 100644 index 0000000..2ff8dd8 Binary files /dev/null and b/examples/client/out.ts differ diff --git a/examples/client/src/mp4.rs b/examples/client/src/mp4.rs index 9c052f4..85ae329 100644 --- a/examples/client/src/mp4.rs +++ b/examples/client/src/mp4.rs @@ -572,7 +572,7 @@ impl Mp4Writer { self.mdat_pos, size, frame.timestamp(), - frame.loss(), + frame.loss() as _, self.allow_loss, )?; @@ -601,7 +601,7 @@ impl Mp4Writer { self.mdat_pos, size, frame.timestamp(), - frame.loss(), + frame.loss() as _, self.allow_loss, )?; self.mdat_pos = self diff --git a/examples/client/src/ts.rs b/examples/client/src/ts.rs index fe15619..3207620 100644 --- a/examples/client/src/ts.rs +++ b/examples/client/src/ts.rs @@ -541,6 +541,11 @@ pub async fn run(opts: Opts) -> Result<(), Error> { return true; } + if s.encoding_name() == "h265" { + log::info!("Using h265 video stream"); + return true; + } + log::info!( "Ignoring {} video stream because it's unsupported", s.encoding_name(), @@ -679,7 +684,7 @@ fn default_pmt_packet() -> TsPacket { version_number: VersionNumber::default(), table: vec![ EsInfo { - stream_type: StreamType::H264, + stream_type: StreamType::H265, elementary_pid: Pid::new(VIDEO_ES_PID).unwrap(), descriptors: vec![], }, diff --git a/src/codec/aac.rs b/src/codec/aac.rs index aaa4bc4..2501fc6 100644 --- a/src/codec/aac.rs +++ b/src/codec/aac.rs @@ -26,6 +26,9 @@ use crate::{error::ErrorInt, rtp::ReceivedPacket, ConnectionContext, Error, Stre use super::{AudioParameters, CodecItem}; +#[derive(Clone, PartialEq, Eq, Hash)] +pub struct Params {} + /// An AudioSpecificConfig as in ISO/IEC 14496-3 section 1.6.2.1. /// /// Currently stores the raw form and a few fields of interest. diff --git a/src/codec/g723.rs b/src/codec/g723.rs index 6b6fa21..be04d21 100644 --- a/src/codec/g723.rs +++ b/src/codec/g723.rs @@ -9,6 +9,9 @@ use super::AudioParameters; const FIXED_CLOCK_RATE: u32 = 8_000; +#[derive(Clone, PartialEq, Eq, Hash)] +pub struct Params {} + #[derive(Debug)] pub(crate) struct Depacketizer { pending: Option, diff --git a/src/codec/h264.rs b/src/codec/h264.rs index 8fe4ecb..f1197c6 100644 --- a/src/codec/h264.rs +++ b/src/codec/h264.rs @@ -17,6 +17,12 @@ use crate::{ use super::VideoFrame; +#[derive(Clone, PartialEq, Eq, Hash)] +pub struct Params { + pub sps: Bytes, + pub pps: Bytes, +} + /// A [super::Depacketizer] implementation which finds access unit boundaries /// and produces unfragmented NAL units as specified in [RFC /// 6184](https://tools.ietf.org/html/rfc6184). @@ -478,7 +484,10 @@ impl Depacketizer { for nal in &self.nals { let next_piece_idx = usize::try_from(nal.next_piece_idx).expect("u32 fits in usize"); let nal_pieces = &self.pieces[piece_idx..next_piece_idx]; - data.extend_from_slice(&nal.len.to_be_bytes()[..]); + + // data.extend_from_slice(&nal.len.to_be_bytes()[..]); + data.extend_from_slice(&[0, 0, 0, 1]); + data.push(nal.hdr.into()); let mut actual_len = 1; for piece in nal_pieces { @@ -761,6 +770,10 @@ impl InternalParameters { pixel_dimensions, pixel_aspect_ratio, frame_rate, + codec_specific_params: crate::codec::CodecSpecificParams::Avc(Params { + sps: sps_nal.clone(), + pps: pps_nal.clone(), + }), extra_data: avc_decoder_config, }, sps_nal, diff --git a/src/codec/h265.rs b/src/codec/h265.rs index 9e27dac..180cfcd 100644 --- a/src/codec/h265.rs +++ b/src/codec/h265.rs @@ -6,7 +6,6 @@ use rtp::codecs::h265::{H265NALUHeader, H265Packet, H265Payload}; use rtp::packetizer::Depacketizer as _; use std::collections::VecDeque; -use std::time::{Duration, Instant}; use super::{VideoFrame, VideoParameters}; use crate::rtp::ReceivedPacket; @@ -17,9 +16,10 @@ const H265NALU_BLA_N_LP: u8 = 18; const H265NALU_IDR_W_RADL: u8 = 19; const H265NALU_IDR_N_LP: u8 = 20; const H265NALU_CRA_NUT: u8 = 21; -const H265NALU_VPS: u8 = 32; -const H265NALU_SPS: u8 = 33; -const H265NALU_PPS: u8 = 34; + +// const H265NALU_VPS: u8 = 32; +// const H265NALU_SPS: u8 = 33; +// const H265NALU_PPS: u8 = 34; #[derive(Debug)] #[allow(clippy::large_enum_variant)] @@ -32,20 +32,19 @@ enum DepacketizerInputState { Process(AccessUnit), } -#[derive(Debug, Default)] -struct Params { - vps: Option>, - pps: Option>, - sps: Option>, - sei: Option>, - profile_id: Option, - using_donl_field: bool, +#[derive(Debug, Default, Clone, PartialEq, Eq, Hash)] +pub struct Params { + pub vps: Option, + pub pps: Option, + pub sps: Option, + pub sei: Option, + pub profile_id: Option, + pub using_donl_field: bool, } impl From<&str> for Params { fn from(input: &str) -> Self { let mut params = Params::default(); - for mut item in input.split(';').map(|x| x.split('=')) { let Some(key) = item.next() else {continue}; let Some(value) = item.next() else {continue}; @@ -58,22 +57,22 @@ impl From<&str> for Params { } } else if key.eq_ignore_ascii_case("sprop-vps") { match base64::decode(value.trim()) { - Ok(val) => params.vps = Some(val), + Ok(val) => params.vps = Some(val.into()), Err(err) => log::error!("attr[sprop-vps] parse error: {}", err), } } else if key.eq_ignore_ascii_case("sprop-pps") { match base64::decode(value.trim()) { - Ok(val) => params.pps = Some(val), + Ok(val) => params.pps = Some(val.into()), Err(err) => log::error!("attr[sprop-pps] parse error: {}", err), } } else if key.eq_ignore_ascii_case("sprop-sps") { match base64::decode(value.trim()) { - Ok(val) => params.sps = Some(val), + Ok(val) => params.sps = Some(val.into()), Err(err) => log::error!("attr[sprop-sps] parse error: {}", err), } } else if key.eq_ignore_ascii_case("sprop-sei") { match base64::decode(value.trim()) { - Ok(val) => params.sei = Some(val), + Ok(val) => params.sei = Some(val.into()), Err(err) => log::error!("attr[sprop-sei] parse error: {}", err), } } else if key.eq_ignore_ascii_case("sprop-max-don-diff") { @@ -111,7 +110,6 @@ pub(crate) struct Depacketizer { /// A complete video frames ready for pull. pending: VecDeque, parameters: Params, - parameters_sent_time: Option, generic_params: Option, } @@ -127,87 +125,27 @@ impl Depacketizer { )); } + let parameters = format_specific_params.map(Params::from).unwrap_or_default(); + // TODO proper parse SPS, PPS for `generic_params` let generic_params = Some(VideoParameters { pixel_dimensions: (0, 0), rfc6381_codec: "hevc".to_string(), pixel_aspect_ratio: None, frame_rate: None, + codec_specific_params: crate::codec::CodecSpecificParams::Hevc(parameters.clone()), extra_data: Vec::new().into(), }); - log::debug!(">>> FSP: {:?}", format_specific_params); - Ok(Depacketizer { pkts_loss: 0, input_state: DepacketizerInputState::Idle, pending: VecDeque::new(), - parameters: format_specific_params.map(Into::into).unwrap_or_default(), - parameters_sent_time: None, + parameters, generic_params, }) } - fn send_params( - &mut self, - ctx: crate::PacketContext, - stream_id: usize, - timestamp: crate::Timestamp, - ) { - if let Some(vps) = &self.parameters.vps { - let mut data = Vec::with_capacity(vps.len() + 4); - data.put_u32(vps.len() as u32); - data.extend_from_slice(&vps); - - self.pending.push_back(VideoFrame { - start_ctx: ctx, - end_ctx: ctx, - has_new_parameters: false, - loss: 0, - timestamp, - stream_id, - is_random_access_point: false, - is_disposable: false, - data, - }); - } - - if let Some(sps) = &self.parameters.sps { - let mut data = Vec::with_capacity(sps.len() + 4); - data.put_u32(sps.len() as u32); - data.extend_from_slice(&sps); - - self.pending.push_back(VideoFrame { - start_ctx: ctx, - end_ctx: ctx, - has_new_parameters: false, - loss: 0, - timestamp, - stream_id, - is_random_access_point: false, - is_disposable: false, - data, - }); - } - if let Some(pps) = &self.parameters.pps { - let mut data = Vec::with_capacity(pps.len() + 4); - data.put_u32(pps.len() as u32); - data.extend_from_slice(&pps); - - self.pending.push_back(VideoFrame { - start_ctx: ctx, - end_ctx: ctx, - has_new_parameters: false, - loss: 0, - timestamp, - stream_id, - is_random_access_point: false, - is_disposable: false, - data, - }); - } - } - pub(super) fn parameters(&self) -> Option { self.generic_params .as_ref() @@ -224,19 +162,12 @@ impl Depacketizer { let stream_id = pkt.stream_id(); let timestamp = pkt.timestamp(); - if matches!(self.parameters_sent_time, None) - | matches!(self.parameters_sent_time, Some(inst) if inst.elapsed() > Duration::from_secs(5)) - { - self.parameters_sent_time = Some(Instant::now()); - self.send_params(ctx, stream_id, timestamp); - } - self.pkts_loss += loss as u32; let payload = pkt.into_payload_bytes(); let mut pkt = H265Packet::default(); - pkt.with_donl(self.parameters.using_donl_field); + pkt.with_donl(self.parameters.using_donl_field); pkt.depacketize(&payload) .map_err(|err| format!("{}", err))?; @@ -246,7 +177,7 @@ impl Depacketizer { H265Payload::H265SingleNALUnitPacket(pkt) => { let mut au = AccessUnit::start(ctx, timestamp, stream_id); au.push(pkt.payload(), Some(pkt.payload_header())); - au.finsh(&mut self.pending, ctx, self.pkts_loss); + au.finish(&mut self.pending, ctx, self.pkts_loss); self.pkts_loss = 0; DepacketizerInputState::Idle @@ -256,14 +187,14 @@ impl Depacketizer { let mut au = AccessUnit::start(ctx, timestamp, stream_id); let first = pkt.first_unit().unwrap(); au.push(first.nal_unit(), None); - au.finsh(&mut self.pending, ctx, self.pkts_loss); + au.finish(&mut self.pending, ctx, self.pkts_loss); self.pkts_loss = 0; let others = pkt.other_units(); for u in others { let mut au = AccessUnit::start(ctx, timestamp, stream_id); au.push(u.nal_unit(), None); - au.finsh(&mut self.pending, ctx, self.pkts_loss); + au.finish(&mut self.pending, ctx, self.pkts_loss); } DepacketizerInputState::Idle @@ -304,7 +235,7 @@ impl Depacketizer { au.push(pkt.payload(), Some(H265NALUHeader(header))); if fu_header.e() { - au.finsh(&mut self.pending, ctx, self.pkts_loss); + au.finish(&mut self.pending, ctx, self.pkts_loss); self.pkts_loss = 0; DepacketizerInputState::Idle @@ -340,7 +271,7 @@ struct AccessUnit { impl AccessUnit { fn start(ctx: crate::PacketContext, timestamp: crate::Timestamp, stream_id: usize) -> Self { let mut data = BytesMut::new(); - data.put_u32(0); + data.put_slice(&[0, 0, 0, 1]); data.put_u16(0); AccessUnit { @@ -359,10 +290,10 @@ impl AccessUnit { self.data.put(data); } - fn finsh(self, dst: &mut VecDeque, ctx: crate::PacketContext, loss: u32) { + fn finish(self, dst: &mut VecDeque, ctx: crate::PacketContext, loss: u32) { let mut data = self.data.to_vec(); - let length = data.len() as u32 - 4; - (&mut data[0..4]).put_u32(length); + // let length = data.len() as u32 - 4; + // data[0..4].copy_from_slice(); if let Some(hdr) = self.header { (&mut data[4..6]).put_u16(hdr.0); diff --git a/src/codec/mod.rs b/src/codec/mod.rs index a845b66..50ced59 100644 --- a/src/codec/mod.rs +++ b/src/codec/mod.rs @@ -71,6 +71,7 @@ pub struct VideoParameters { rfc6381_codec: String, pixel_aspect_ratio: Option<(u32, u32)>, frame_rate: Option<(u32, u32)>, + codec_specific_params: CodecSpecificParams, extra_data: Bytes, } @@ -119,6 +120,11 @@ impl VideoParameters { pub fn extra_data(&self) -> &[u8] { &self.extra_data } + + /// Codec-specific parameters (like SPS, PPS..) + pub fn codec_specific_params(&self) -> &CodecSpecificParams { + &self.codec_specific_params + } } impl std::fmt::Debug for VideoParameters { @@ -420,6 +426,36 @@ impl std::fmt::Debug for VideoFrame { } } +#[derive(Clone, PartialEq, Eq, Hash)] +pub enum CodecSpecificParams { + Avc(h264::Params), + Hevc(h265::Params), + Aac(aac::Params), + SimpleAudio(simple_audio::Params), + G723(g723::Params), + Onvif(onvif::Params), +} + +impl CodecSpecificParams { + pub fn as_hevc(&self) -> Option<&h265::Params> { + if let Self::Hevc(v) = self { + Some(v) + } else { + None + } + } + + pub fn as_avc(&self) -> Option<&h264::Params> { + if let Self::Avc(v) = self { + Some(v) + } else { + None + } + } +} + +// impl CodecSpecificParams {} + /// Turns RTP packets into [`CodecItem`]s. /// /// This interface unstable and for internal use; it's exposed for direct fuzzing and benchmarking. diff --git a/src/codec/onvif.rs b/src/codec/onvif.rs index 75a4622..4a72c0f 100644 --- a/src/codec/onvif.rs +++ b/src/codec/onvif.rs @@ -12,6 +12,9 @@ use bytes::{Buf, BufMut, BytesMut}; use super::{CodecItem, MessageParameters}; +#[derive(Clone, PartialEq, Eq, Hash)] +pub struct Params {} + #[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)] pub enum CompressionType { Uncompressed, diff --git a/src/codec/simple_audio.rs b/src/codec/simple_audio.rs index f8908ec..5f41ae8 100644 --- a/src/codec/simple_audio.rs +++ b/src/codec/simple_audio.rs @@ -8,6 +8,9 @@ use std::num::NonZeroU32; use super::{AudioParameters, CodecItem}; +#[derive(Clone, PartialEq, Eq, Hash)] +pub struct Params {} + #[derive(Debug)] pub(crate) struct Depacketizer { parameters: AudioParameters,