mp4 example: mid-stream video param changes

https://github.com/scottlamb/moonfire-nvr/issues/217
This commit is contained in:
Scott Lamb 2022-04-11 21:47:46 -07:00
parent 7cbe39213d
commit c887998636
4 changed files with 103 additions and 77 deletions

View File

@ -109,7 +109,10 @@ async fn write_all_buf<W: AsyncWrite + Unpin, B: Buf>(
pub struct Mp4Writer<W: AsyncWrite + AsyncSeek + Send + Unpin> {
mdat_start: u32,
mdat_pos: u32,
video_params: Option<Box<VideoParameters>>,
video_params: Vec<Box<VideoParameters>>,
/// The most recently used 1-based index within `video_params`.
cur_video_params_sample_description_index: Option<u32>,
audio_params: Option<Box<AudioParameters>>,
allow_loss: bool,
@ -121,12 +124,19 @@ pub struct Mp4Writer<W: AsyncWrite + AsyncSeek + Send + Unpin> {
inner: W,
}
/// A chunk: a group of samples that have consecutive byte positions and same sample description.
struct Chunk {
first_sample_number: u32, // 1-based index
byte_pos: u32, // starting byte of first sample
sample_description_index: u32,
}
/// Tracks the parts of a `trak` atom which are common between video and audio samples.
#[derive(Default)]
struct TrakTracker {
samples: u32,
next_pos: Option<u32>,
chunks: Vec<(u32, u32)>, // (1-based sample_number, byte_pos)
chunks: Vec<Chunk>,
sizes: Vec<u32>,
/// The durations of samples in a run-length encoding form: (number of samples, duration).
@ -140,7 +150,8 @@ struct TrakTracker {
impl TrakTracker {
fn add_sample(
&mut self,
pos: u32,
sample_description_index: u32,
byte_pos: u32,
size: u32,
timestamp: retina::Timestamp,
loss: u16,
@ -150,11 +161,18 @@ impl TrakTracker {
bail!("Lost {} RTP packets mid-stream", loss);
}
self.samples += 1;
if self.next_pos != Some(pos) {
self.chunks.push((self.samples, pos));
if self.next_pos != Some(byte_pos)
|| self.chunks.last().map(|c| c.sample_description_index)
!= Some(sample_description_index)
{
self.chunks.push(Chunk {
first_sample_number: self.samples,
byte_pos,
sample_description_index,
});
}
self.sizes.push(size);
self.next_pos = Some(pos + size);
self.next_pos = Some(byte_pos + size);
if let Some(last_pts) = self.last_pts.replace(timestamp.timestamp()) {
let duration = timestamp.timestamp().checked_sub(last_pts).unwrap();
self.tot_duration += u64::try_from(duration).unwrap();
@ -197,11 +215,11 @@ impl TrakTracker {
let mut prev_sample_number = 1;
let mut chunk_number = 1;
if !self.chunks.is_empty() {
for &(sample_number, _pos) in &self.chunks[1..] {
for c in &self.chunks[1..] {
buf.put_u32(chunk_number);
buf.put_u32(sample_number - prev_sample_number);
buf.put_u32(1); // sample_description_index
prev_sample_number = sample_number;
buf.put_u32(c.first_sample_number - prev_sample_number);
buf.put_u32(c.sample_description_index);
prev_sample_number = c.first_sample_number;
chunk_number += 1;
}
buf.put_u32(chunk_number);
@ -220,8 +238,8 @@ impl TrakTracker {
write_box!(buf, b"stco", {
buf.put_u32(0); // version
buf.put_u32(u32::try_from(self.chunks.len())?); // entry_count
for &(_sample_number, pos) in &self.chunks {
buf.put_u32(pos);
for c in &self.chunks {
buf.put_u32(c.byte_pos);
}
});
Ok(())
@ -230,7 +248,6 @@ impl TrakTracker {
impl<W: AsyncWrite + AsyncSeek + Send + Unpin> Mp4Writer<W> {
pub async fn new(
video_params: Option<Box<VideoParameters>>,
audio_params: Option<Box<AudioParameters>>,
allow_loss: bool,
mut inner: W,
@ -248,7 +265,8 @@ impl<W: AsyncWrite + AsyncSeek + Send + Unpin> Mp4Writer<W> {
write_all_buf(&mut inner, &mut buf).await?;
Ok(Mp4Writer {
inner,
video_params,
video_params: Vec::new(),
cur_video_params_sample_description_index: None,
audio_params,
allow_loss,
video_trak: TrakTracker::default(),
@ -286,11 +304,11 @@ impl<W: AsyncWrite + AsyncSeek + Send + Unpin> Mp4Writer<W> {
}
buf.put_u32(2); // next_track_id
});
if let Some(p) = self.video_params.as_ref() {
self.write_video_trak(&mut buf, p)?;
if self.video_trak.samples > 0 {
self.write_video_trak(&mut buf)?;
}
if let Some(p) = self.audio_params.as_ref() {
self.write_audio_trak(&mut buf, p)?;
if self.audio_trak.samples > 0 {
self.write_audio_trak(&mut buf, self.audio_params.as_ref().unwrap())?;
}
});
write_all_buf(&mut self.inner, &mut buf.freeze()).await?;
@ -303,11 +321,7 @@ impl<W: AsyncWrite + AsyncSeek + Send + Unpin> Mp4Writer<W> {
Ok(())
}
fn write_video_trak(
&self,
buf: &mut BytesMut,
parameters: &VideoParameters,
) -> Result<(), Error> {
fn write_video_trak(&self, buf: &mut BytesMut) -> Result<(), Error> {
write_box!(buf, b"trak", {
write_box!(buf, b"tkhd", {
buf.put_u32((1 << 24) | 7); // version, flags
@ -324,11 +338,13 @@ impl<W: AsyncWrite + AsyncSeek + Send + Unpin> Mp4Writer<W> {
for v in &[0x00010000, 0, 0, 0, 0x00010000, 0, 0, 0, 0x40000000] {
buf.put_u32(*v); // matrix
}
let dims = self
.video_params
.as_ref()
.map(|p| p.pixel_dimensions())
.unwrap_or((0, 0));
let dims = self.video_params.iter().fold((0, 0), |prev_dims, p| {
let dims = p.pixel_dimensions();
(
std::cmp::max(prev_dims.0, dims.0),
std::cmp::max(prev_dims.1, dims.1),
)
});
let width = u32::from(u16::try_from(dims.0)?) << 16;
let height = u32::from(u16::try_from(dims.1)?) << 16;
buf.put_u32(width);
@ -371,8 +387,10 @@ impl<W: AsyncWrite + AsyncSeek + Send + Unpin> Mp4Writer<W> {
write_box!(buf, b"stbl", {
write_box!(buf, b"stsd", {
buf.put_u32(0); // version
buf.put_u32(1); // entry_count
self.write_video_sample_entry(buf, parameters)?;
buf.put_u32(u32::try_from(self.video_params.len())?); // entry_count
for p in &self.video_params {
self.write_video_sample_entry(buf, p)?;
}
});
self.video_trak.write_common_stbl_parts(buf)?;
write_box!(buf, b"stss", {
@ -522,28 +540,46 @@ impl<W: AsyncWrite + AsyncSeek + Send + Unpin> Mp4Writer<W> {
Ok(())
}
async fn video(&mut self, mut frame: retina::codec::VideoFrame) -> Result<(), Error> {
async fn video(
&mut self,
stream: &retina::client::Stream,
frame: retina::codec::VideoFrame,
) -> Result<(), Error> {
println!(
"{}: {}-byte video frame",
&frame.timestamp,
frame.data().remaining(),
);
if let Some(p) = frame.new_parameters.take() {
if self.video_trak.samples > 0 {
let old = self.video_params.as_ref().unwrap();
bail!(
"video parameters change unimplemented.\nold: {:#?}\nnew: {:#?}",
old,
p
);
let sample_description_index = if let (Some(i), None) = (
self.cur_video_params_sample_description_index,
frame.new_parameters.as_ref(),
) {
// Use the most recent sample description index for most frames, without having to
// scan through self.video_sample_index.
i
} else {
match stream.parameters() {
Some(Parameters::Video(params)) => {
log::info!("new video params: {:?}", params);
let pos = self.video_params.iter().position(|p| **p == params);
if let Some(pos) = pos {
u32::try_from(pos + 1)?
} else {
self.video_params.push(Box::new(params));
u32::try_from(self.video_params.len())?
}
self.video_params = Some(p);
} else if self.video_params.is_none() {
}
None => {
debug!("Discarding video frame received before parameters");
return Ok(());
}
_ => unreachable!(),
}
};
self.cur_video_params_sample_description_index = Some(sample_description_index);
let size = u32::try_from(frame.data().remaining())?;
self.video_trak.add_sample(
sample_description_index,
self.mdat_pos,
size,
frame.timestamp,
@ -571,6 +607,7 @@ impl<W: AsyncWrite + AsyncSeek + Send + Unpin> Mp4Writer<W> {
);
let size = u32::try_from(frame.data.remaining())?;
self.audio_trak.add_sample(
/* sample_description_index */ 1,
self.mdat_pos,
size,
frame.timestamp,
@ -606,8 +643,9 @@ async fn copy<'a>(
pkt = session.next() => {
match pkt.ok_or_else(|| anyhow!("EOF"))?? {
CodecItem::VideoFrame(f) => {
let stream = &session.streams()[f.stream_id];
let start_ctx = f.start_ctx();
mp4.video(f).await.with_context(
mp4.video(stream, f).await.with_context(
|| format!("Error processing video frame starting with {}", start_ctx))?;
},
CodecItem::AudioFrame(f) => {
@ -638,7 +676,6 @@ async fn copy<'a>(
async fn write_mp4<'a>(
opts: &'a Opts,
session: retina::client::Session<retina::client::Described>,
video_params: Option<Box<VideoParameters>>,
audio_params: Option<Box<AudioParameters>>,
stop_signal: Pin<Box<dyn Future<Output = Result<(), std::io::Error>>>>,
) -> Result<(), Error> {
@ -658,7 +695,7 @@ async fn write_mp4<'a>(
tmp_filename.push(PARTIAL_SUFFIX); // OsString::push doesn't put in a '/', unlike PathBuf::.
let tmp_filename: PathBuf = tmp_filename.into();
let out = tokio::fs::File::create(&tmp_filename).await?;
let mut mp4 = Mp4Writer::new(video_params, audio_params, opts.allow_loss, out).await?;
let mut mp4 = Mp4Writer::new(audio_params, opts.allow_loss, out).await?;
let result = copy(opts, &mut session, stop_signal, &mut mp4).await;
if let Err(e) = result {
// Log errors about finishing, returning the original error.
@ -706,36 +743,27 @@ pub async fn run(opts: Opts) -> Result<(), Error> {
.teardown(opts.teardown),
)
.await?;
let (video_stream_i, video_params) = if !opts.no_video {
let s = session.streams().iter().enumerate().find_map(|(i, s)| {
let video_stream_i = if !opts.no_video {
let s = session.streams().iter().position(|s| {
if s.media == "video" {
if s.encoding_name == "h264" {
log::info!("Using h264 video stream");
return Some((
i,
match s.parameters() {
Some(Parameters::Video(v)) => Some(Box::new(v)),
Some(_) => panic!("expected parameters to match stream type video"),
None => None,
},
));
return true;
}
log::info!(
"Ignoring {} video stream because it's unsupported",
&s.encoding_name
);
}
None
false
});
if let Some((i, p)) = s {
(Some(i), p)
} else {
if s.is_none() {
log::info!("No suitable video stream found");
(None, None)
}
s
} else {
log::info!("Ignoring video streams (if any) because of --no-video");
(None, None)
None
};
if let Some(i) = video_stream_i {
session.setup(i).await?;
@ -772,14 +800,7 @@ pub async fn run(opts: Opts) -> Result<(), Error> {
if video_stream_i.is_none() && audio_stream.is_none() {
bail!("Exiting because no video or audio stream was selected; see info log messages above");
}
let result = write_mp4(
&opts,
session,
video_params,
audio_stream.map(|(_i, p)| p),
stop_signal,
)
.await;
let result = write_mp4(&opts, session, audio_stream.map(|(_i, p)| p), stop_signal).await;
// Session has now been dropped, on success or failure. A TEARDOWN should
// be pending if necessary. session_group.await_teardown() will wait for it.

View File

@ -1170,7 +1170,7 @@ impl RtspConnection {
}
impl<S: State> Session<S> {
/// Returns the available streams as described the server.
/// Returns the available streams as described by the server.
pub fn streams(&self) -> &[Stream] {
&self.0.presentation.streams
}
@ -2268,6 +2268,11 @@ impl Demuxed {
pub fn tool(&self) -> Option<&Tool> {
self.session.tool()
}
/// Returns the available streams as described by the server.
pub fn streams(&self) -> &[Stream] {
&self.session.streams()
}
}
impl futures::Stream for Demuxed {

View File

@ -43,7 +43,7 @@ pub enum CodecItem {
/// calls to [`crate::client::Stream::parameters`] will return the new value.
///
/// Currently audio and message streams' parameters never change mid-stream.
#[derive(Clone, Debug)]
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum Parameters {
Video(VideoParameters),
Audio(AudioParameters),
@ -59,7 +59,7 @@ pub enum Parameters {
/// Video streams' parameters may change mid-stream; if so, the frame which
/// changed them will have `VideoFrame::new_parameters` set, and subsequent
/// calls to [`crate::client::Stream::parameters`] will return the new value.
#[derive(Clone)]
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct VideoParameters {
pixel_dimensions: (u32, u32),
rfc6381_codec: String,
@ -131,7 +131,7 @@ impl std::fmt::Debug for VideoParameters {
}
/// Parameters which describe an audio stream.
#[derive(Clone)]
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct AudioParameters {
rfc6381_codec: Option<String>,
frame_length: Option<NonZeroU32>,
@ -226,7 +226,7 @@ impl Buf for AudioFrame {
}
/// Parameters which describe a message stream, for `application` media types.
#[derive(Clone, Debug)]
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct MessageParameters(onvif::CompressionType);
/// A single message, for `application` media types.

View File

@ -12,7 +12,7 @@ use bytes::{Buf, BufMut, BytesMut};
use super::CodecItem;
#[derive(Copy, Clone, Debug)]
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub enum CompressionType {
Uncompressed,
GzipCompressed,