improve mp4 example error handling

* avoid half-written output (fixes #32)
* try to send TEARDOWN even on error while reading the stream.
  This isn't perfect as noted in the TODO. I think I'll want to
  change the API a bit, but not right now.
This commit is contained in:
Scott Lamb 2021-09-08 13:02:12 -07:00
parent 69f12bde8e
commit 7141b8601b

View File

@ -19,18 +19,21 @@
use anyhow::{anyhow, bail, Context, Error}; use anyhow::{anyhow, bail, Context, Error};
use bytes::{Buf, BufMut, BytesMut}; use bytes::{Buf, BufMut, BytesMut};
use futures::StreamExt; use futures::{Future, StreamExt};
use log::{info, warn}; use log::{info, warn};
use retina::{ use retina::{
client::Transport, client::Transport,
codec::{AudioParameters, CodecItem, VideoParameters}, codec::{AudioParameters, CodecItem, VideoParameters},
}; };
use std::convert::TryFrom;
use std::io::SeekFrom; use std::io::SeekFrom;
use std::num::NonZeroU32; use std::num::NonZeroU32;
use std::path::PathBuf; use std::path::PathBuf;
use tokio::io::{AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt}; use std::{convert::TryFrom, pin::Pin};
use tokio::{
fs::File,
io::{AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt},
};
#[derive(structopt::StructOpt)] #[derive(structopt::StructOpt)]
pub struct Opts { pub struct Opts {
@ -567,15 +570,115 @@ impl<W: AsyncWrite + AsyncSeek + Send + Unpin> Mp4Writer<W> {
} }
} }
/// Copies packets from `session` to `mp4` without handling any cleanup on error.
async fn copy<'a>(
opts: &'a Opts,
session: &'a mut retina::client::Demuxed,
stop_signal: Pin<Box<dyn Future<Output = Result<(), std::io::Error>>>>,
mp4: &'a mut Mp4Writer<File>,
) -> Result<(), Error> {
let sleep = match opts.duration {
Some(secs) => {
futures::future::Either::Left(tokio::time::sleep(std::time::Duration::from_secs(secs)))
}
None => futures::future::Either::Right(futures::future::pending()),
};
tokio::pin!(stop_signal);
tokio::pin!(sleep);
loop {
tokio::select! {
pkt = session.next() => {
match pkt.ok_or_else(|| anyhow!("EOF"))?? {
CodecItem::VideoFrame(f) => {
let start_ctx = f.start_ctx();
mp4.video(f).await.with_context(
|| format!("Error processing video frame starting with {}", start_ctx))?;
},
CodecItem::AudioFrame(f) => {
let ctx = f.ctx;
mp4.audio(f).await.with_context(
|| format!("Error processing audio frame, {}", ctx))?;
},
CodecItem::SenderReport(sr) => {
println!("{}: SR ts={}", sr.timestamp, sr.ntp_timestamp);
},
_ => continue,
};
},
_ = &mut stop_signal => {
info!("Stopping due to signal");
break;
},
_ = &mut sleep => {
info!("Stopping after {} seconds", opts.duration.unwrap());
break;
},
}
}
Ok(())
}
/// Writes the `.mp4`, including trying to finish or clean up the file.
async fn write_mp4<'a>(
opts: &'a Opts,
session: &'a mut retina::client::Demuxed,
video_stream: Option<(usize, VideoParameters)>,
audio_stream: Option<(usize, AudioParameters)>,
stop_signal: Pin<Box<dyn Future<Output = Result<(), std::io::Error>>>>,
) -> Result<(), Error> {
// Append into a filename suffixed with ".partial", then try to either rename it into
// place if it's complete or delete it otherwise.
const PARTIAL_SUFFIX: &str = ".partial";
let mut tmp_filename = opts.out.as_os_str().to_owned();
tmp_filename.push(PARTIAL_SUFFIX); // OsString::push doesn't put in a '/', unlike PathBuf::.
let tmp_filename: PathBuf = tmp_filename.into();
let out = tokio::fs::File::create(&tmp_filename).await?;
let mut mp4 = Mp4Writer::new(
video_stream.map(|(_, p)| Box::new(p)),
audio_stream.map(|(_, p)| Box::new(p)),
opts.allow_loss,
out,
)
.await?;
let result = copy(opts, session, stop_signal, &mut mp4).await;
if let Err(e) = result {
// Log errors about finishing, returning the original error.
if let Err(e) = mp4.finish().await {
log::error!(".mp4 finish failed: {}", e);
if let Err(e) = tokio::fs::remove_file(&tmp_filename).await {
log::error!("and removing .mp4 failed too: {}", e);
}
} else {
if let Err(e) = tokio::fs::rename(&tmp_filename, &opts.out).await {
log::error!("unable to move completed .mp4 into place: {}", e);
}
}
Err(e)
} else {
// Directly return errors about finishing.
if let Err(e) = mp4.finish().await {
log::error!(".mp4 finish failed: {}", e);
if let Err(e) = tokio::fs::remove_file(&tmp_filename).await {
log::error!("and removing .mp4 failed too: {}", e);
}
Err(e)
} else {
tokio::fs::rename(&tmp_filename, &opts.out).await?;
Ok(())
}
}
}
pub async fn run(opts: Opts) -> Result<(), Error> { pub async fn run(opts: Opts) -> Result<(), Error> {
if matches!(opts.transport, Transport::Udp) && !opts.allow_loss { if matches!(opts.transport, Transport::Udp) && !opts.allow_loss {
warn!("Using --transport=udp without strongly recommended --allow-loss!"); warn!("Using --transport=udp without strongly recommended --allow-loss!");
} }
let creds = super::creds(opts.src.username, opts.src.password); let creds = super::creds(opts.src.username.clone(), opts.src.password.clone());
let stop_signal = tokio::signal::ctrl_c(); let stop_signal = Box::pin(tokio::signal::ctrl_c());
let mut session = retina::client::Session::describe( let mut session = retina::client::Session::describe(
opts.src.url, opts.src.url.clone(),
retina::client::SessionOptions::default() retina::client::SessionOptions::default()
.creds(creds) .creds(creds)
.user_agent("Retina mp4 example".to_owned()) .user_agent("Retina mp4 example".to_owned())
@ -621,55 +724,12 @@ pub async fn run(opts: Opts) -> Result<(), Error> {
.await? .await?
.demuxed()?; .demuxed()?;
// Read RTP data. // TODO: should also send a TEARDOWN if the PLAY response won't parse or if
let out = tokio::fs::File::create(opts.out).await?; // demuxed() fails. The former isn't even possible with the current API.
let mut mp4 = Mp4Writer::new(
video_stream.map(|(_, p)| Box::new(p)),
audio_stream.map(|(_, p)| Box::new(p)),
opts.allow_loss,
out,
)
.await?;
let sleep = match opts.duration { let result = write_mp4(&opts, &mut session, video_stream, audio_stream, stop_signal).await;
Some(secs) => { if let Err(e) = session.teardown().await {
futures::future::Either::Left(tokio::time::sleep(std::time::Duration::from_secs(secs))) log::error!("TEARDOWN failed: {}", e);
} }
None => futures::future::Either::Right(futures::future::pending()), result
};
tokio::pin!(stop_signal);
tokio::pin!(sleep);
loop {
tokio::select! {
pkt = session.next() => {
match pkt.ok_or_else(|| anyhow!("EOF"))?? {
CodecItem::VideoFrame(f) => {
let start_ctx = f.start_ctx();
mp4.video(f).await.with_context(
|| format!("Error processing video frame starting with {}", start_ctx))?;
},
CodecItem::AudioFrame(f) => {
let ctx = f.ctx;
mp4.audio(f).await.with_context(
|| format!("Error processing audio frame, {}", ctx))?;
},
CodecItem::SenderReport(sr) => {
println!("{}: SR ts={}", sr.timestamp, sr.ntp_timestamp);
},
_ => continue,
};
},
_ = &mut stop_signal => {
info!("Stopping due to signal");
break;
},
_ = &mut sleep => {
info!("Stopping after {} seconds", opts.duration.unwrap());
break;
},
}
}
session.teardown().await?;
mp4.finish().await?;
Ok(())
} }