From c887998636bbfc96213ce67b33c2a19e6a679136 Mon Sep 17 00:00:00 2001 From: Scott Lamb Date: Mon, 11 Apr 2022 21:47:46 -0700 Subject: [PATCH] mp4 example: mid-stream video param changes https://github.com/scottlamb/moonfire-nvr/issues/217 --- examples/client/mp4.rs | 163 +++++++++++++++++++++++------------------ src/client/mod.rs | 7 +- src/codec/mod.rs | 8 +- src/codec/onvif.rs | 2 +- 4 files changed, 103 insertions(+), 77 deletions(-) diff --git a/examples/client/mp4.rs b/examples/client/mp4.rs index 29622b4..bd2c92e 100644 --- a/examples/client/mp4.rs +++ b/examples/client/mp4.rs @@ -109,7 +109,10 @@ async fn write_all_buf( pub struct Mp4Writer { mdat_start: u32, mdat_pos: u32, - video_params: Option>, + video_params: Vec>, + + /// The most recently used 1-based index within `video_params`. + cur_video_params_sample_description_index: Option, audio_params: Option>, allow_loss: bool, @@ -121,12 +124,19 @@ pub struct Mp4Writer { inner: W, } +/// A chunk: a group of samples that have consecutive byte positions and same sample description. +struct Chunk { + first_sample_number: u32, // 1-based index + byte_pos: u32, // starting byte of first sample + sample_description_index: u32, +} + /// Tracks the parts of a `trak` atom which are common between video and audio samples. #[derive(Default)] struct TrakTracker { samples: u32, next_pos: Option, - chunks: Vec<(u32, u32)>, // (1-based sample_number, byte_pos) + chunks: Vec, sizes: Vec, /// The durations of samples in a run-length encoding form: (number of samples, duration). @@ -140,7 +150,8 @@ struct TrakTracker { impl TrakTracker { fn add_sample( &mut self, - pos: u32, + sample_description_index: u32, + byte_pos: u32, size: u32, timestamp: retina::Timestamp, loss: u16, @@ -150,11 +161,18 @@ impl TrakTracker { bail!("Lost {} RTP packets mid-stream", loss); } self.samples += 1; - if self.next_pos != Some(pos) { - self.chunks.push((self.samples, pos)); + if self.next_pos != Some(byte_pos) + || self.chunks.last().map(|c| c.sample_description_index) + != Some(sample_description_index) + { + self.chunks.push(Chunk { + first_sample_number: self.samples, + byte_pos, + sample_description_index, + }); } self.sizes.push(size); - self.next_pos = Some(pos + size); + self.next_pos = Some(byte_pos + size); if let Some(last_pts) = self.last_pts.replace(timestamp.timestamp()) { let duration = timestamp.timestamp().checked_sub(last_pts).unwrap(); self.tot_duration += u64::try_from(duration).unwrap(); @@ -197,11 +215,11 @@ impl TrakTracker { let mut prev_sample_number = 1; let mut chunk_number = 1; if !self.chunks.is_empty() { - for &(sample_number, _pos) in &self.chunks[1..] { + for c in &self.chunks[1..] { buf.put_u32(chunk_number); - buf.put_u32(sample_number - prev_sample_number); - buf.put_u32(1); // sample_description_index - prev_sample_number = sample_number; + buf.put_u32(c.first_sample_number - prev_sample_number); + buf.put_u32(c.sample_description_index); + prev_sample_number = c.first_sample_number; chunk_number += 1; } buf.put_u32(chunk_number); @@ -220,8 +238,8 @@ impl TrakTracker { write_box!(buf, b"stco", { buf.put_u32(0); // version buf.put_u32(u32::try_from(self.chunks.len())?); // entry_count - for &(_sample_number, pos) in &self.chunks { - buf.put_u32(pos); + for c in &self.chunks { + buf.put_u32(c.byte_pos); } }); Ok(()) @@ -230,7 +248,6 @@ impl TrakTracker { impl Mp4Writer { pub async fn new( - video_params: Option>, audio_params: Option>, allow_loss: bool, mut inner: W, @@ -248,7 +265,8 @@ impl Mp4Writer { write_all_buf(&mut inner, &mut buf).await?; Ok(Mp4Writer { inner, - video_params, + video_params: Vec::new(), + cur_video_params_sample_description_index: None, audio_params, allow_loss, video_trak: TrakTracker::default(), @@ -286,11 +304,11 @@ impl Mp4Writer { } buf.put_u32(2); // next_track_id }); - if let Some(p) = self.video_params.as_ref() { - self.write_video_trak(&mut buf, p)?; + if self.video_trak.samples > 0 { + self.write_video_trak(&mut buf)?; } - if let Some(p) = self.audio_params.as_ref() { - self.write_audio_trak(&mut buf, p)?; + if self.audio_trak.samples > 0 { + self.write_audio_trak(&mut buf, self.audio_params.as_ref().unwrap())?; } }); write_all_buf(&mut self.inner, &mut buf.freeze()).await?; @@ -303,11 +321,7 @@ impl Mp4Writer { Ok(()) } - fn write_video_trak( - &self, - buf: &mut BytesMut, - parameters: &VideoParameters, - ) -> Result<(), Error> { + fn write_video_trak(&self, buf: &mut BytesMut) -> Result<(), Error> { write_box!(buf, b"trak", { write_box!(buf, b"tkhd", { buf.put_u32((1 << 24) | 7); // version, flags @@ -324,11 +338,13 @@ impl Mp4Writer { for v in &[0x00010000, 0, 0, 0, 0x00010000, 0, 0, 0, 0x40000000] { buf.put_u32(*v); // matrix } - let dims = self - .video_params - .as_ref() - .map(|p| p.pixel_dimensions()) - .unwrap_or((0, 0)); + let dims = self.video_params.iter().fold((0, 0), |prev_dims, p| { + let dims = p.pixel_dimensions(); + ( + std::cmp::max(prev_dims.0, dims.0), + std::cmp::max(prev_dims.1, dims.1), + ) + }); let width = u32::from(u16::try_from(dims.0)?) << 16; let height = u32::from(u16::try_from(dims.1)?) << 16; buf.put_u32(width); @@ -371,8 +387,10 @@ impl Mp4Writer { write_box!(buf, b"stbl", { write_box!(buf, b"stsd", { buf.put_u32(0); // version - buf.put_u32(1); // entry_count - self.write_video_sample_entry(buf, parameters)?; + buf.put_u32(u32::try_from(self.video_params.len())?); // entry_count + for p in &self.video_params { + self.write_video_sample_entry(buf, p)?; + } }); self.video_trak.write_common_stbl_parts(buf)?; write_box!(buf, b"stss", { @@ -522,28 +540,46 @@ impl Mp4Writer { Ok(()) } - async fn video(&mut self, mut frame: retina::codec::VideoFrame) -> Result<(), Error> { + async fn video( + &mut self, + stream: &retina::client::Stream, + frame: retina::codec::VideoFrame, + ) -> Result<(), Error> { println!( "{}: {}-byte video frame", &frame.timestamp, frame.data().remaining(), ); - if let Some(p) = frame.new_parameters.take() { - if self.video_trak.samples > 0 { - let old = self.video_params.as_ref().unwrap(); - bail!( - "video parameters change unimplemented.\nold: {:#?}\nnew: {:#?}", - old, - p - ); + let sample_description_index = if let (Some(i), None) = ( + self.cur_video_params_sample_description_index, + frame.new_parameters.as_ref(), + ) { + // Use the most recent sample description index for most frames, without having to + // scan through self.video_sample_index. + i + } else { + match stream.parameters() { + Some(Parameters::Video(params)) => { + log::info!("new video params: {:?}", params); + let pos = self.video_params.iter().position(|p| **p == params); + if let Some(pos) = pos { + u32::try_from(pos + 1)? + } else { + self.video_params.push(Box::new(params)); + u32::try_from(self.video_params.len())? + } + } + None => { + debug!("Discarding video frame received before parameters"); + return Ok(()); + } + _ => unreachable!(), } - self.video_params = Some(p); - } else if self.video_params.is_none() { - debug!("Discarding video frame received before parameters"); - return Ok(()); - } + }; + self.cur_video_params_sample_description_index = Some(sample_description_index); let size = u32::try_from(frame.data().remaining())?; self.video_trak.add_sample( + sample_description_index, self.mdat_pos, size, frame.timestamp, @@ -571,6 +607,7 @@ impl Mp4Writer { ); let size = u32::try_from(frame.data.remaining())?; self.audio_trak.add_sample( + /* sample_description_index */ 1, self.mdat_pos, size, frame.timestamp, @@ -606,8 +643,9 @@ async fn copy<'a>( pkt = session.next() => { match pkt.ok_or_else(|| anyhow!("EOF"))?? { CodecItem::VideoFrame(f) => { + let stream = &session.streams()[f.stream_id]; let start_ctx = f.start_ctx(); - mp4.video(f).await.with_context( + mp4.video(stream, f).await.with_context( || format!("Error processing video frame starting with {}", start_ctx))?; }, CodecItem::AudioFrame(f) => { @@ -638,7 +676,6 @@ async fn copy<'a>( async fn write_mp4<'a>( opts: &'a Opts, session: retina::client::Session, - video_params: Option>, audio_params: Option>, stop_signal: Pin>>>, ) -> Result<(), Error> { @@ -658,7 +695,7 @@ async fn write_mp4<'a>( tmp_filename.push(PARTIAL_SUFFIX); // OsString::push doesn't put in a '/', unlike PathBuf::. let tmp_filename: PathBuf = tmp_filename.into(); let out = tokio::fs::File::create(&tmp_filename).await?; - let mut mp4 = Mp4Writer::new(video_params, audio_params, opts.allow_loss, out).await?; + let mut mp4 = Mp4Writer::new(audio_params, opts.allow_loss, out).await?; let result = copy(opts, &mut session, stop_signal, &mut mp4).await; if let Err(e) = result { // Log errors about finishing, returning the original error. @@ -706,36 +743,27 @@ pub async fn run(opts: Opts) -> Result<(), Error> { .teardown(opts.teardown), ) .await?; - let (video_stream_i, video_params) = if !opts.no_video { - let s = session.streams().iter().enumerate().find_map(|(i, s)| { + let video_stream_i = if !opts.no_video { + let s = session.streams().iter().position(|s| { if s.media == "video" { if s.encoding_name == "h264" { log::info!("Using h264 video stream"); - return Some(( - i, - match s.parameters() { - Some(Parameters::Video(v)) => Some(Box::new(v)), - Some(_) => panic!("expected parameters to match stream type video"), - None => None, - }, - )); + return true; } log::info!( "Ignoring {} video stream because it's unsupported", &s.encoding_name ); } - None + false }); - if let Some((i, p)) = s { - (Some(i), p) - } else { + if s.is_none() { log::info!("No suitable video stream found"); - (None, None) } + s } else { log::info!("Ignoring video streams (if any) because of --no-video"); - (None, None) + None }; if let Some(i) = video_stream_i { session.setup(i).await?; @@ -772,14 +800,7 @@ pub async fn run(opts: Opts) -> Result<(), Error> { if video_stream_i.is_none() && audio_stream.is_none() { bail!("Exiting because no video or audio stream was selected; see info log messages above"); } - let result = write_mp4( - &opts, - session, - video_params, - audio_stream.map(|(_i, p)| p), - stop_signal, - ) - .await; + let result = write_mp4(&opts, session, audio_stream.map(|(_i, p)| p), stop_signal).await; // Session has now been dropped, on success or failure. A TEARDOWN should // be pending if necessary. session_group.await_teardown() will wait for it. diff --git a/src/client/mod.rs b/src/client/mod.rs index a66f8e7..b55cc93 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -1170,7 +1170,7 @@ impl RtspConnection { } impl Session { - /// Returns the available streams as described the server. + /// Returns the available streams as described by the server. pub fn streams(&self) -> &[Stream] { &self.0.presentation.streams } @@ -2268,6 +2268,11 @@ impl Demuxed { pub fn tool(&self) -> Option<&Tool> { self.session.tool() } + + /// Returns the available streams as described by the server. + pub fn streams(&self) -> &[Stream] { + &self.session.streams() + } } impl futures::Stream for Demuxed { diff --git a/src/codec/mod.rs b/src/codec/mod.rs index 0617e8f..ea97417 100644 --- a/src/codec/mod.rs +++ b/src/codec/mod.rs @@ -43,7 +43,7 @@ pub enum CodecItem { /// calls to [`crate::client::Stream::parameters`] will return the new value. /// /// Currently audio and message streams' parameters never change mid-stream. -#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq, Eq, Hash)] pub enum Parameters { Video(VideoParameters), Audio(AudioParameters), @@ -59,7 +59,7 @@ pub enum Parameters { /// Video streams' parameters may change mid-stream; if so, the frame which /// changed them will have `VideoFrame::new_parameters` set, and subsequent /// calls to [`crate::client::Stream::parameters`] will return the new value. -#[derive(Clone)] +#[derive(Clone, PartialEq, Eq, Hash)] pub struct VideoParameters { pixel_dimensions: (u32, u32), rfc6381_codec: String, @@ -131,7 +131,7 @@ impl std::fmt::Debug for VideoParameters { } /// Parameters which describe an audio stream. -#[derive(Clone)] +#[derive(Clone, PartialEq, Eq, Hash)] pub struct AudioParameters { rfc6381_codec: Option, frame_length: Option, @@ -226,7 +226,7 @@ impl Buf for AudioFrame { } /// Parameters which describe a message stream, for `application` media types. -#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq, Eq, Hash)] pub struct MessageParameters(onvif::CompressionType); /// A single message, for `application` media types. diff --git a/src/codec/onvif.rs b/src/codec/onvif.rs index 5401d6b..dfef3d1 100644 --- a/src/codec/onvif.rs +++ b/src/codec/onvif.rs @@ -12,7 +12,7 @@ use bytes::{Buf, BufMut, BytesMut}; use super::CodecItem; -#[derive(Copy, Clone, Debug)] +#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)] pub enum CompressionType { Uncompressed, GzipCompressed,