From 6df76843844216aee4ceb2513a085eeb4ab4bc14 Mon Sep 17 00:00:00 2001 From: Andrey Tkachenko Date: Mon, 14 Nov 2022 18:49:42 +0400 Subject: [PATCH] Add H265 Codec --- Cargo.lock | 109 +++- Cargo.toml | 2 + examples/client/Cargo.toml | 2 + examples/client/src/main.rs | 4 + examples/client/src/mp4.rs | 3 + examples/client/src/ts.rs | 915 ++++++++++++++++++++++++++++++ examples/webrtc-proxy/src/main.rs | 10 + player/app.js | 49 ++ player/index.html | 26 + player/style.css | 4 + src/client/mod.rs | 5 + src/codec/aac.rs | 4 +- src/codec/g723.rs | 2 +- src/codec/h264.rs | 2 +- src/codec/h265.rs | 392 +++++++++++++ src/codec/mod.rs | 23 +- src/codec/onvif.rs | 4 +- src/codec/simple_audio.rs | 2 +- 18 files changed, 1526 insertions(+), 32 deletions(-) create mode 100644 examples/client/src/ts.rs create mode 100644 player/app.js create mode 100644 player/index.html create mode 100644 player/style.css create mode 100644 src/codec/h265.rs diff --git a/Cargo.lock b/Cargo.lock index de6ec82..803ab87 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -116,6 +116,12 @@ version = "1.0.58" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bb07d2053ccdbe10e2af2995a2f116c1330396493dc1269f6a91d0ae82e19704" +[[package]] +name = "arc-swap" +version = "1.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "983cd8b9d4b02a6dc6ffa557262eb5858a27a0038ffffe21a0f133eaa819a164" + [[package]] name = "arrayvec" version = "0.5.2" @@ -364,9 +370,11 @@ dependencies = [ "futures", "itertools", "log", + "mpeg2ts", "mylog", "retina", "structopt", + "thiserror", "tokio", "url", ] @@ -1159,7 +1167,7 @@ dependencies = [ "tokio", "waitgroup", "webrtc-srtp", - "webrtc-util", + "webrtc-util 0.5.4", ] [[package]] @@ -1311,6 +1319,16 @@ dependencies = [ "four-cc", ] +[[package]] +name = "mpeg2ts" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a38c92bbb0d15f62ed915c392f2d675e3c0889c98be7ad418ae660bfcb7f3b2" 
+dependencies = [ + "byteorder", + "trackable 0.2.24", +] + [[package]] name = "mpeg4-audio-const" version = "0.2.0" @@ -1786,6 +1804,7 @@ checksum = "a3f87b73ce11b1619a3c6332f45341e0047173771e8b8b73f87bfeefb7b56244" name = "retina" version = "0.4.3" dependencies = [ + "arc-swap", "base64", "bitstream-io", "bytes", @@ -1800,6 +1819,7 @@ dependencies = [ "pin-project", "pretty-hex", "rand", + "rtp", "rtsp-types", "sdp-types", "smallvec", @@ -1855,20 +1875,20 @@ checksum = "80d9625e47edb43aca711ec826ad12154d364ada9e60f4e6f8d40471b3e1e156" dependencies = [ "bytes", "thiserror", - "webrtc-util", + "webrtc-util 0.5.4", ] [[package]] name = "rtp" -version = "0.6.5" +version = "0.6.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5110c12c9f7d1e76eba80076cce4ccb82ee085bd10a62472468de0663240f8b5" +checksum = "68ecd5b57967801ff3616239508be0ecc50c6a10a9ca0716475002bf9dc44210" dependencies = [ "async-trait", "bytes", "rand", "thiserror", - "webrtc-util", + "webrtc-util 0.6.0", ] [[package]] @@ -2237,7 +2257,7 @@ dependencies = [ "thiserror", "tokio", "url", - "webrtc-util", + "webrtc-util 0.5.4", ] [[package]] @@ -2295,18 +2315,18 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.31" +version = "1.0.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bd829fe32373d27f76265620b5309d0340cb8550f523c1dda251d6298069069a" +checksum = "10deb33631e3c9018b9baf9dcbbc4f737320d2b576bac10f6aefa048fa407e3e" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.31" +version = "1.0.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0396bc89e626244658bef819e22d0cc459e795a5ebe878e6ec336d1674a8d79a" +checksum = "982d17546b47146b28f7c22e3d08465f6b8903d0ea13c1660d9d84a6e7adcdbb" dependencies = [ "proc-macro2", "quote", @@ -2414,6 +2434,35 @@ dependencies = [ "once_cell", ] +[[package]] +name = "trackable" +version = "0.2.24" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "b98abb9e7300b9ac902cc04920945a874c1973e08c310627cc4458c04b70dd32" +dependencies = [ + "trackable 1.2.0", + "trackable_derive", +] + +[[package]] +name = "trackable" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "017e2a1a93718e4e8386d037cfb8add78f1d690467f4350fb582f55af1203167" +dependencies = [ + "trackable_derive", +] + +[[package]] +name = "trackable_derive" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebeb235c5847e2f82cfe0f07eb971d1e5f6804b18dac2ae16349cc604380f82f" +dependencies = [ + "quote", + "syn", +] + [[package]] name = "turn" version = "0.5.4" @@ -2429,7 +2478,7 @@ dependencies = [ "stun", "thiserror", "tokio", - "webrtc-util", + "webrtc-util 0.5.4", ] [[package]] @@ -2680,7 +2729,7 @@ dependencies = [ "webrtc-media", "webrtc-sctp", "webrtc-srtp", - "webrtc-util", + "webrtc-util 0.5.4", ] [[package]] @@ -2695,7 +2744,7 @@ dependencies = [ "thiserror", "tokio", "webrtc-sctp", - "webrtc-util", + "webrtc-util 0.5.4", ] [[package]] @@ -2731,7 +2780,7 @@ dependencies = [ "thiserror", "tokio", "webpki", - "webrtc-util", + "webrtc-util 0.5.4", "x25519-dalek", "x509-parser 0.9.2", ] @@ -2754,7 +2803,7 @@ dependencies = [ "uuid", "waitgroup", "webrtc-mdns", - "webrtc-util", + "webrtc-util 0.5.4", ] [[package]] @@ -2767,7 +2816,7 @@ dependencies = [ "socket2", "thiserror", "tokio", - "webrtc-util", + "webrtc-util 0.5.4", ] [[package]] @@ -2783,7 +2832,7 @@ dependencies = [ "rand", "rtp", "thiserror", - "webrtc-util", + "webrtc-util 0.5.4", ] [[package]] @@ -2818,7 +2867,7 @@ dependencies = [ "rand", "thiserror", "tokio", - "webrtc-util", + "webrtc-util 0.5.4", ] [[package]] @@ -2842,7 +2891,7 @@ dependencies = [ "subtle", "thiserror", "tokio", - "webrtc-util", + "webrtc-util 0.5.4", ] [[package]] @@ -2867,6 +2916,28 @@ dependencies = [ "winapi", ] +[[package]] +name = "webrtc-util" +version = 
"0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4921b6a976b5570004e9c1b29ae109a81a73e2370e80627efa315f9ad0105f43" +dependencies = [ + "async-trait", + "bitflags", + "bytes", + "cc", + "ipnet", + "lazy_static", + "libc", + "log", + "nix 0.24.1", + "parking_lot", + "rand", + "thiserror", + "tokio", + "winapi", +] + [[package]] name = "winapi" version = "0.3.9" diff --git a/Cargo.toml b/Cargo.toml index 098e715..b2ca976 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,6 +16,7 @@ include = ["src/**/*", "benches", "Cargo.toml"] rust-version = "1.59" [dependencies] +arc-swap = "1.5.1" base64 = "0.13.0" bitstream-io = "1.1" bytes = "1.0.1" @@ -28,6 +29,7 @@ once_cell = "1.7.2" pin-project = "1.0.7" pretty-hex = "0.3.0" rand = "0.8.3" +rtp = "0.6.7" rtsp-types = "0.0.3" sdp-types = "0.1.4" smallvec = { version = "1.6.1", features = ["union"] } diff --git a/examples/client/Cargo.toml b/examples/client/Cargo.toml index 48084fb..60a7c53 100644 --- a/examples/client/Cargo.toml +++ b/examples/client/Cargo.toml @@ -16,3 +16,5 @@ anyhow = "1.0.41" itertools = "0.10.1" mylog = { git = "https://github.com/scottlamb/mylog" } structopt = "0.3.21" +mpeg2ts = "0.1.1" +thiserror = "1.0.37" diff --git a/examples/client/src/main.rs b/examples/client/src/main.rs index 4c658f4..a070dbb 100644 --- a/examples/client/src/main.rs +++ b/examples/client/src/main.rs @@ -6,6 +6,7 @@ mod info; mod mp4; mod onvif; +mod ts; use anyhow::Error; use log::{error, info}; @@ -35,6 +36,8 @@ enum Cmd { Mp4(mp4::Opts), /// Follows ONVIF metadata stream; use Ctrl+C to stop. 
Onvif(onvif::Opts), + /// Transport Stream + Ts(ts::Opts), } fn init_logging() -> mylog::Handle { @@ -85,5 +88,6 @@ async fn main_inner() -> Result<(), Error> { Cmd::Info(opts) => info::run(opts).await, Cmd::Mp4(opts) => mp4::run(opts).await, Cmd::Onvif(opts) => onvif::run(opts).await, + Cmd::Ts(opts) => ts::run(opts).await, } } diff --git a/examples/client/src/mp4.rs b/examples/client/src/mp4.rs index 534ac79..9c052f4 100644 --- a/examples/client/src/mp4.rs +++ b/examples/client/src/mp4.rs @@ -575,13 +575,16 @@ impl Mp4Writer { frame.loss(), self.allow_loss, )?; + self.mdat_pos = self .mdat_pos .checked_add(size) .ok_or_else(|| anyhow!("mdat_pos overflow"))?; + if frame.is_random_access_point() { self.video_sync_sample_nums.push(self.video_trak.samples); } + self.inner.write_all(frame.data()).await?; Ok(()) } diff --git a/examples/client/src/ts.rs b/examples/client/src/ts.rs new file mode 100644 index 0000000..fe15619 --- /dev/null +++ b/examples/client/src/ts.rs @@ -0,0 +1,915 @@ +// Copyright (C) 2021 Scott Lamb +// SPDX-License-Identifier: MIT OR Apache-2.0 + +//! Proof-of-concept `.mp4` writer. +//! +//! This writes media data (`mdat`) to a stream, buffering parameters for a +//! `moov` atom at the end. This avoids the need to buffer the media data +//! (`mdat`) first or reserved a fixed size for the `moov`, but it will slow +//! playback, particularly when serving `.mp4` files remotely. +//! +//! For a more high-quality implementation, see [Moonfire NVR](https://github.com/scottlamb/moonfire-nvr). +//! It's better tested, places the `moov` atom at the start, can do HTTP range +//! serving for arbitrary time ranges, and supports standard and fragmented +//! `.mp4` files. +//! +//! See the BMFF spec, ISO/IEC 14496-12:2015: +//! https://github.com/scottlamb/moonfire-nvr/wiki/Standards-and-specifications +//! 
https://standards.iso.org/ittf/PubliclyAvailableStandards/c068960_ISO_IEC_14496-12_2015.zip + +use anyhow::{anyhow, bail, Error, Context}; +use log::{info, warn, debug}; +use bytes::{Buf, BufMut}; +use futures::{Future, StreamExt}; +use mpeg2ts::{ts::{ContinuityCounter, TsPacket, TsHeader, Pid, TsPayload, TsPacketWriter, WriteTsPacket, AdaptationField, payload}, time::{Timestamp, ClockReference}, pes::PesHeader, es::StreamId}; +use retina::{ + client::{SetupOptions, Transport}, + codec::{AudioParameters, CodecItem, VideoParameters, ParametersRef}, +}; + +use std::{num::NonZeroU32, io::Cursor, path::PathBuf, pin::Pin, sync::Arc}; + + +#[derive(thiserror::Error, Debug)] +pub enum TsError { + #[error("Failed to create TS file")] + FileCreationFailed(#[from] std::io::Error), + + #[error("Failed to write TS file")] + WriteError, + + #[error("Packet ID {0} is not valid")] + InvalidPacketId(u16), + + #[error("Invalid timestamp {0}")] + InvalidTimestamp(u64), + + #[error("Packet payload exceeded packet limit")] + PayloadTooBig, +} + +#[derive(structopt::StructOpt)] +pub struct Opts { + #[structopt(flatten)] + src: super::Source, + + /// Policy for handling the `rtptime` parameter normally seem in the `RTP-Info` header. + /// One of `default`, `require`, `ignore`, `permissive`. + #[structopt(default_value, long)] + initial_timestamp: retina::client::InitialTimestampPolicy, + + /// Don't attempt to include video streams. + #[structopt(long)] + no_video: bool, + + /// Don't attempt to include audio streams. + #[structopt(long)] + no_audio: bool, + + /// Allow lost packets mid-stream without aborting. + #[structopt(long)] + allow_loss: bool, + + /// When to issue a `TEARDOWN` request: `auto`, `always`, or `never`. + #[structopt(default_value, long)] + teardown: retina::client::TeardownPolicy, + + /// Duration after which to exit automatically, in seconds. + #[structopt(long, name = "secs")] + duration: Option, + + /// The transport to use: `tcp` or `udp` (experimental). 
+ /// + /// Note: `--allow-loss` is strongly recommended with `udp`. + #[structopt(default_value, long)] + transport: retina::client::Transport, + + /// Path to `.mp4` file to write. + #[structopt(parse(try_from_str))] + out: PathBuf, +} + +const PMT_PID: u16 = 256; +const VIDEO_ES_PID: u16 = 257; +const AUDIO_ES_PID: u16 = 258; +const PES_VIDEO_STREAM_ID: u8 = 224; +const PES_AUDIO_STREAM_ID: u8 = 192; + +pub struct Mpeg2TsWriter { + video_continuity_counter: ContinuityCounter, + audio_continuity_counter: ContinuityCounter, + + packets: Vec, + + video_params: Vec, + audio_params: Option>, + + cur_video_params_sample_description_index: Option, + + video_last_pcr: Option, + audio_last_pcr: Option, + + inner: W, +} + +impl Mpeg2TsWriter { + pub async fn new( + audio_params: Option>, + inner: W, + ) -> Result { + Ok(Mpeg2TsWriter { + inner, + video_params: Vec::new(), + audio_params, + video_continuity_counter: Default::default(), + audio_continuity_counter: Default::default(), + packets: Default::default(), + cur_video_params_sample_description_index: None, + video_last_pcr: None, + audio_last_pcr: None, + }) + } + + pub async fn finish(mut self) -> Result<(), Error> { + let packets: Vec<_> = self.packets.drain(..).collect(); + let mut writer = TsPacketWriter::new(self.inner); + + writer + .write_ts_packet(&default_pat_packet()) + .map_err(|_| TsError::WriteError)?; + + writer + .write_ts_packet(&default_pmt_packet()) + .map_err(|_| TsError::WriteError)?; + + for packet in &packets { + writer + .write_ts_packet(packet) + .map_err(|_| TsError::WriteError)?; + } + + println!("FINISH"); + + Ok(()) + } + + fn push_video(&mut self, timestamp: Timestamp, keyframe: bool, video: Vec) -> Result<(), TsError> { + let mut header = default_ts_header(VIDEO_ES_PID)?; + header.continuity_counter = self.video_continuity_counter; + + let mut buf = Cursor::new(video); + let packet = { + let data = { + let pes_data = if buf.remaining() < 153 { + buf.chunk() + } else { + 
&buf.chunk()[..153] + }; + + make_raw_payload(pes_data)? + }; + buf.advance(data.len()); + + let adaptation_field = if keyframe { + Some(AdaptationField { + discontinuity_indicator: false, + random_access_indicator: true, + es_priority_indicator: false, + pcr: Some(ClockReference::from(timestamp)), + opcr: None, + splice_countdown: None, + transport_private_data: Vec::new(), + extension: None, + }) + } else { + None + }; + + TsPacket { + header: header.clone(), + adaptation_field, + payload: Some(TsPayload::Pes(payload::Pes { + header: PesHeader { + stream_id: StreamId::new(PES_VIDEO_STREAM_ID), + priority: false, + data_alignment_indicator: false, + copyright: false, + original_or_copy: false, + pts: Some(timestamp), + dts: None, + escr: None, + }, + pes_packet_len: 0, + data + })), + } + }; + + self.packets.push(packet); + header.continuity_counter.increment(); + + while buf.has_remaining() { + let raw_payload = { + let pes_data = if buf.remaining() < payload::Bytes::MAX_SIZE { + buf.chunk() + } else { + &buf.chunk()[..payload::Bytes::MAX_SIZE] + }; + make_raw_payload(&pes_data)? 
+ }; + buf.advance(raw_payload.len()); + + let packet = TsPacket { + header: header.clone(), + adaptation_field: None, + payload: Some(TsPayload::Raw(raw_payload)) + }; + + self.packets.push(packet); + header.continuity_counter.increment(); + } + + self.video_continuity_counter = header.continuity_counter; + + Ok(()) + } + + async fn video( + &mut self, + stream: &retina::client::Stream, + frame: retina::codec::VideoFrame, + ) -> Result<(), Error> { + println!( + "{:?}({}): {}-byte video frame{}", + &frame.timestamp(), + &frame.timestamp(), + frame.data().remaining(), + if frame.is_random_access_point() { "*" } else { "" } + ); + + let sample_description_index = if let (Some(i), false) = ( + self.cur_video_params_sample_description_index, + frame.has_new_parameters(), + ) { + // Use the most recent sample description index for most frames, without having to + // scan through self.video_sample_index. + i + } else { + match stream.parameters() { + Some(ParametersRef::Video(params)) => { + log::info!("new video params: {:?}", params); + + let pos = self.video_params.iter().position(|p| p == params); + if let Some(pos) = pos { + u32::try_from(pos + 1)? + } else { + self.video_params.push(params.clone()); + u32::try_from(self.video_params.len())? + } + } + None => { + debug!("Discarding video frame received before parameters"); + return Ok(()); + } + _ => unreachable!(), + } + }; + self.cur_video_params_sample_description_index = Some(sample_description_index); + + let timestamp = make_timestamp(frame.timestamp().elapsed_secs() + 1.4)?; + let keyframe = frame.is_random_access_point(); + let data = convert_h264(frame)?; + + self.push_video(timestamp, keyframe, data)?; + + Ok(()) + } + + fn push_audio(&mut self, timestamp: Timestamp, audio: Vec) -> Result<(), TsError> { + let mut buf = Cursor::new(audio); + let data = { + let pes_data = if buf.remaining() < 153 { + buf.chunk() + } else { + &buf.chunk()[..153] + }; + make_raw_payload(&pes_data)? 
+ }; + buf.advance(data.len()); + + let mut header = default_ts_header(AUDIO_ES_PID)?; + header.continuity_counter = self.audio_continuity_counter; + + let adaptation_field = if self.audio_continuity_counter.as_u8() % 4 == 0 { + let pcr = if let Some(last_ts) = self.video_last_pcr { + if timestamp != last_ts { + Some(ClockReference::from(timestamp)) + } else { + None + } + } else { + Some(ClockReference::from(timestamp)) + }; + + pcr.map(|pcr| AdaptationField { + discontinuity_indicator: false, + random_access_indicator: true, + es_priority_indicator: false, + pcr: Some(pcr), + opcr: None, + splice_countdown: None, + transport_private_data: Vec::new(), + extension: None, + }) + } else { + None + }; + + let packet = TsPacket { + header: header.clone(), + adaptation_field, + payload: Some(TsPayload::Pes(payload::Pes { + header: PesHeader { + stream_id: StreamId::new(PES_AUDIO_STREAM_ID), + priority: false, + data_alignment_indicator: false, + copyright: false, + original_or_copy: false, + pts: Some(timestamp), + dts: None, + escr: None, + }, + pes_packet_len: 0, + data + })), + }; + + self.packets.push(packet); + header.continuity_counter.increment(); + + while buf.has_remaining() { + let raw_payload = { + let pes_data = if buf.remaining() < payload::Bytes::MAX_SIZE { + buf.chunk() + } else { + &buf.chunk()[..payload::Bytes::MAX_SIZE] + }; + + make_raw_payload(&pes_data)? 
+ }; + + buf.advance(raw_payload.len()); + + let packet = TsPacket { + header: header.clone(), + adaptation_field: None, + payload: Some(TsPayload::Raw(raw_payload)) + }; + + self.packets.push(packet); + header.continuity_counter.increment(); + } + + self.audio_continuity_counter = header.continuity_counter; + + Ok(()) + } + + async fn audio(&mut self, frame: retina::codec::AudioFrame) -> Result<(), Error> { + println!( + "{:?}({}): {}-byte audio frame", + &frame.timestamp(), + &frame.timestamp(), + frame.data().remaining(), + ); + + let timestamp = make_timestamp(frame.timestamp().elapsed_secs() + 1.4)?; + + if let Some(ap) = &self.audio_params { + let asc = AudioSpecificConfiguration::try_from(ap.extra_data())?; + let data = convert_aac(frame, asc)?; + self.push_audio(timestamp, data)?; + } + + Ok(()) + } +} + +/// Copies packets from `session` to `mp4` without handling any cleanup on error. +async fn copy<'a>( + opts: &'a Opts, + session: &'a mut retina::client::Demuxed, + stop_signal: Pin>>>, + ts_writer: &'a mut Mpeg2TsWriter, +) -> Result<(), Error> { + let sleep = match opts.duration { + Some(secs) => { + futures::future::Either::Left(tokio::time::sleep(std::time::Duration::from_secs(secs))) + } + None => futures::future::Either::Right(futures::future::pending()), + }; + + tokio::pin!(stop_signal); + tokio::pin!(sleep); + + loop { + tokio::select! { + pkt = session.next() => { + match pkt.ok_or_else(|| anyhow!("EOF"))?? 
{ + CodecItem::VideoFrame(f) => { + let stream = &session.streams()[f.stream_id()]; + let start_ctx = *f.start_ctx(); + + ts_writer.video(stream, f).await.with_context( + || format!("Error processing video frame starting with {}", start_ctx))?; + }, + + CodecItem::AudioFrame(f) => { + let ctx = *f.ctx(); + + ts_writer.audio(f).await.with_context( + || format!("Error processing audio frame, {}", ctx))?; + }, + + CodecItem::Rtcp(rtcp) => { + println!("RTCP {:?}", rtcp); + + if let (Some(t), Some(Ok(Some(sr)))) = (rtcp.rtp_timestamp(), rtcp.pkts().next().map(retina::rtcp::PacketRef::as_sender_report)) { + println!(" {}: SR ts={}", t, sr.ntp_timestamp()); + } + }, + + CodecItem::MessageFrame(msg) => { + println!("MSG {:?}", msg); + }, + + _ => continue, + }; + }, + + _ = &mut stop_signal => { + info!("Stopping due to signal"); + break; + }, + + _ = &mut sleep => { + info!("Stopping after {} seconds", opts.duration.unwrap()); + break; + }, + } + } + Ok(()) +} + +/// Writes the `.ts`, including trying to finish or clean up the file. +async fn write_mpeg2ts( + opts: &Opts, + session: retina::client::Session, + audio_params: Option>, + stop_signal: Pin>>>, +) -> Result<(), Error> { + let mut session = session + .play( + retina::client::PlayOptions::default() + .initial_timestamp(opts.initial_timestamp) + .enforce_timestamps_with_max_jump_secs(NonZeroU32::new(10).unwrap()), + ) + .await? + .demuxed()?; + + // Append into a filename suffixed with ".partial", then try to either rename it into + // place if it's complete or delete it otherwise. + const PARTIAL_SUFFIX: &str = ".partial"; + let mut tmp_filename = opts.out.as_os_str().to_owned(); + tmp_filename.push(PARTIAL_SUFFIX); // OsString::push doesn't put in a '/', unlike PathBuf::. 
+ let tmp_filename: PathBuf = tmp_filename.into(); + + let out = std::fs::File::create(&tmp_filename)?; + let mut ts_writer = Mpeg2TsWriter::new(audio_params, out).await?; + + let result = copy(opts, &mut session, stop_signal, &mut ts_writer).await; + + if let Err(e) = result { + // Log errors about finishing, returning the original error. + if let Err(e) = ts_writer.finish().await { + log::error!(".mp4 finish failed: {}", e); + if let Err(e) = tokio::fs::remove_file(&tmp_filename).await { + log::error!("and removing .mp4 failed too: {}", e); + } + } else if let Err(e) = tokio::fs::rename(&tmp_filename, &opts.out).await { + log::error!("unable to move completed .mp4 into place: {}", e); + } + Err(e) + } else { + // Directly return errors about finishing. + if let Err(e) = ts_writer.finish().await { + log::error!(".mp4 finish failed: {}", e); + if let Err(e) = tokio::fs::remove_file(&tmp_filename).await { + log::error!("and removing .mp4 failed too: {}", e); + } + Err(e) + } else { + tokio::fs::rename(&tmp_filename, &opts.out).await?; + Ok(()) + } + } +} + +pub async fn run(opts: Opts) -> Result<(), Error> { + if matches!(opts.transport, Transport::Udp(_)) && !opts.allow_loss { + warn!("Using --transport=udp without strongly recommended --allow-loss!"); + } + + let creds = super::creds(opts.src.username.clone(), opts.src.password.clone()); + let stop_signal = Box::pin(tokio::signal::ctrl_c()); + let session_group = Arc::new(retina::client::SessionGroup::default()); + let mut session = retina::client::Session::describe( + opts.src.url.clone(), + retina::client::SessionOptions::default() + .creds(creds) + .session_group(session_group.clone()) + .user_agent("Retina TS example".to_owned()) + .teardown(opts.teardown), + ) + .await?; + + let video_stream_i = if !opts.no_video { + let s = session.streams().iter().position(|s| { + if s.media() == "video" { + if s.encoding_name() == "h264" { + log::info!("Using h264 video stream"); + return true; + } + + log::info!( + 
"Ignoring {} video stream because it's unsupported", + s.encoding_name(), + ); + } + + false + }); + + if s.is_none() { + log::info!("No suitable video stream found"); + } + + s + } else { + log::info!("Ignoring video streams (if any) because of --no-video"); + None + }; + + if let Some(i) = video_stream_i { + session + .setup(i, SetupOptions::default().transport(opts.transport.clone())) + .await?; + } + + let audio_stream = if !opts.no_audio { + let s = session + .streams() + .iter() + .enumerate() + .find_map(|(i, s)| match s.parameters() { + // Only consider audio streams that can produce a .mp4 sample + // entry. + Some(retina::codec::ParametersRef::Audio(a)) if a.sample_entry().is_some() => { + log::info!("Using {} audio stream (rfc 6381 codec {})", s.encoding_name(), a.rfc6381_codec().unwrap()); + log::info!("{:?}", a); + + Some((i, Box::new(a.clone()))) + } + + _ if s.media() == "audio" => { + log::info!("Ignoring {} audio stream because it can't be placed into a .mp4 file without transcoding", s.encoding_name()); + None + } + _ => None, + }); + + if s.is_none() { + log::info!("No suitable audio stream found"); + } + + s + } else { + log::info!("Ignoring audio streams (if any) because of --no-audio"); + None + }; + + if let Some((i, _)) = audio_stream { + session + .setup(i, SetupOptions::default().transport(opts.transport.clone())) + .await?; + } + + if video_stream_i.is_none() && audio_stream.is_none() { + bail!("Exiting because no video or audio stream was selected; see info log messages above"); + } + + let result = write_mpeg2ts(&opts, session, audio_stream.map(|(_i, p)| p), stop_signal).await; + + // Session has now been dropped, on success or failure. A TEARDOWN should + // be pending if necessary. session_group.await_teardown() will wait for it. 
+ if let Err(e) = session_group.await_teardown().await { + log::error!("TEARDOWN failed: {}", e); + } + + result +} + +fn make_raw_payload(pes_data: &[u8]) -> Result { + mpeg2ts::ts::payload::Bytes::new(&pes_data) + .map_err(|_| TsError::PayloadTooBig) +} + +fn make_timestamp(ts: f64) -> Result { + let int_ts = (ts * 90_000.0).round() as u64; + + Timestamp::new(int_ts) + .map_err(|_| TsError::InvalidTimestamp(int_ts)) +} + +fn default_ts_header(pid: u16) -> Result { + use mpeg2ts::ts::TransportScramblingControl; + + Ok(TsHeader { + transport_error_indicator: false, + transport_priority: false, + pid: Pid::new(pid).map_err(|_| TsError::InvalidPacketId(pid))?, + transport_scrambling_control: TransportScramblingControl::NotScrambled, + continuity_counter: ContinuityCounter::new(), + }) +} + +fn default_pat_packet() -> TsPacket { + use mpeg2ts::ts::{VersionNumber, payload::Pat, ProgramAssociation}; + + TsPacket { + header: default_ts_header(0).unwrap(), + adaptation_field: None, + payload: Some( + TsPayload::Pat(Pat { + transport_stream_id: 1, + version_number: VersionNumber::default(), + table: vec![ + ProgramAssociation { + program_num: 1, + program_map_pid: Pid::new(PMT_PID).unwrap(), + } + ] + })), + } +} + +fn default_pmt_packet() -> TsPacket { + use mpeg2ts::{ + ts::{VersionNumber, payload::Pmt, EsInfo}, + es::StreamType, + }; + + TsPacket { + header: default_ts_header(PMT_PID).unwrap(), + adaptation_field: None, + payload: Some( + TsPayload::Pmt(Pmt { + program_num: 1, + pcr_pid: Some(Pid::new(VIDEO_ES_PID).unwrap()), + version_number: VersionNumber::default(), + table: vec![ + EsInfo { + stream_type: StreamType::H264, + elementary_pid: Pid::new(VIDEO_ES_PID).unwrap(), + descriptors: vec![], + }, + EsInfo { + stream_type: StreamType::AdtsAac, + elementary_pid: Pid::new(AUDIO_ES_PID).unwrap(), + descriptors: vec![], + } + ] + })), + } +} + +#[derive(Debug, thiserror::Error)] +pub enum AacError { + #[error("AAC coder not initialized")] + NotInitialized, + + 
#[error("Not enough data: {0}")] + NotEnoughData(&'static str), + + #[error("Unsupported audio object type")] + UnsupportedAudioFormat, + + #[error("Reserved or unsupported frequency index {0}")] + UnsupportedFrequencyIndex(u8), + + #[error("Reserved or unsupported channel configuration {0}")] + UnsupportedChannelConfiguration(u8), + + #[error("Got forbidden sampling frequency index {0}")] + ForbiddenSamplingFrequencyIndex(u8), +} + +#[derive(Debug, Clone, Copy)] +pub struct SamplingFrequencyIndex(u8); + +impl From for u8 { + fn from(val: SamplingFrequencyIndex) -> Self { + val.0 + } +} + +impl TryFrom for SamplingFrequencyIndex { + type Error = AacError; + + fn try_from(val: u8) -> Result { + match val { + 0..=12 | 15 => Ok(Self(val)), + _ => Err(AacError::UnsupportedFrequencyIndex(val)), + } + } +} + + +#[derive(Debug, Clone, Copy)] +pub struct ChannelConfiguration(u8); + +impl From for u8 { + fn from(val: ChannelConfiguration) -> Self { + val.0 + } +} + +impl TryFrom for ChannelConfiguration { + type Error = AacError; + + fn try_from(val: u8) -> Result { + match val { + 0..=7 => Ok(Self(val)), + _ => Err(AacError::UnsupportedChannelConfiguration(val)), + } + } +} + + +// See [MPEG-4 Audio Object Types][audio_object_types] +// +// [audio_object_types]: https://en.wikipedia.org/wiki/MPEG-4_Part_3#MPEG-4_Audio_Object_Types +#[allow(clippy::enum_variant_names)] +#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)] +pub enum AudioObjectType { + AacMain = 1, + AacLowComplexity = 2, + AacScalableSampleRate = 3, + AacLongTermPrediction = 4, +} + +impl TryFrom for AudioObjectType { + type Error = AacError; + + fn try_from(value: u8) -> Result { + Ok(match value { + 1 => Self::AacMain, + 2 => Self::AacLowComplexity, + 3 => Self::AacScalableSampleRate, + 4 => Self::AacLongTermPrediction, + _ => return Err(AacError::UnsupportedAudioFormat) + }) + } +} + + + +#[derive(Debug, Clone)] +pub struct AudioSpecificConfiguration { + pub object_type: AudioObjectType, + pub 
sampling_frequency_index: SamplingFrequencyIndex, + pub sampling_frequency: Option, + pub channel_configuration: ChannelConfiguration, + pub frame_length_flag: bool, + pub depends_on_core_coder: bool, + pub extension_flag: bool, +} +
+/// Parses the first two bytes of an AAC audio specific config.
+///
+/// Bit layout read here, most significant bit first: object type (5 bits),
+/// sampling frequency index (4 bits), channel configuration (4 bits), then the
+/// frame length, depends-on-core-coder and extension flags (1 bit each).
+///
+/// `sampling_frequency` is always left as `None`, and any bytes after the
+/// first two are ignored.
+///
+/// # Errors
+///
+/// Returns `AacError::NotEnoughData` when fewer than two bytes are supplied,
+/// or whatever error the field conversions produce for a value they reject.
+impl TryFrom<&[u8]> for AudioSpecificConfiguration {
+    type Error = AacError;
+
+    fn try_from(val: &[u8]) -> Result<Self, Self::Error> {
+        if val.len() < 2 {
+            return Err(AacError::NotEnoughData("AAC audio specific config"));
+        }
+
+        let mut buf = Cursor::new(val);
+
+        let header_a = buf.get_u8();
+        let header_b = buf.get_u8();
+
+        // Top five bits of the first byte.
+        let object_type = AudioObjectType::try_from((header_a & 0xF8) >> 3)?;
+
+        // Four bits straddling the byte boundary: the low three bits of
+        // `header_a` followed by the top bit of `header_b`.
+        let sf_idx = ((header_a & 0x07) << 1) | (header_b >> 7);
+        let sampling_frequency_index = SamplingFrequencyIndex::try_from(sf_idx)?;
+
+        let channel_configuration = ChannelConfiguration::try_from((header_b >> 3) & 0x0F)?;
+        let frame_length_flag = (header_b & 0x04) == 0x04;
+        let depends_on_core_coder = (header_b & 0x02) == 0x02;
+        let extension_flag = (header_b & 0x01) == 0x01;
+
+        Ok(Self {
+            object_type,
+            sampling_frequency_index,
+            sampling_frequency: None,
+            channel_configuration,
+            frame_length_flag,
+            depends_on_core_coder,
+            extension_flag,
+        })
+    }
+}
+
+/// Prepends a 7-byte ADTS header (no CRC) to a raw AAC frame.
+///
+/// # Errors
+///
+/// Fails when a value cannot be represented in the fixed-width ADTS fields:
+/// an object type outside 1..=4 (the profile field is 2 bits and stores the
+/// type minus one), sampling frequency index 15 or above, a channel
+/// configuration above 7 (3 bits), or a header-plus-payload length above
+/// 8191 bytes (13 bits).
+fn convert_aac(
+    frame: retina::codec::AudioFrame,
+    ap: AudioSpecificConfiguration,
+) -> Result<Vec<u8>, Error> {
+    const SYNCWORD: u16 = 0xFFF0;
+    const PROTECTION_ABSENCE: u16 = 0x0001;
+    // The header is 7 bytes long when protection is absent; a CRC would add 2 more.
+    const HEADER_LEN: usize = 7;
+    // The frame length field is 13 bits wide and counts header plus payload.
+    const MAX_FRAME_LEN: usize = 0x1FFF;
+
+    let object_type = ap.object_type as u8;
+    let sampling_frequency_index = u8::from(ap.sampling_frequency_index);
+    let channel_configuration: u8 = ap.channel_configuration.into();
+
+    // Checked before `object_type - 1` so that type 0 cannot underflow and
+    // types above 4 cannot silently lose bits when shifted into 2 bits.
+    if !(1..=4).contains(&object_type) {
+        bail!("AAC object type {} does not fit the ADTS profile field", object_type);
+    }
+
+    // Checked on the raw 4-bit index, before it is shifted into position.
+    if sampling_frequency_index >= 0x0F {
+        bail!("ForbiddenSamplingFrequencyIndex({})", sampling_frequency_index);
+    }
+
+    if channel_configuration > 0x07 {
+        bail!(
+            "AAC channel configuration {} does not fit the ADTS header",
+            channel_configuration
+        );
+    }
+
+    let payload = frame.data();
+    let frame_length = HEADER_LEN + payload.len();
+    if frame_length > MAX_FRAME_LEN {
+        bail!("AAC frame of {} bytes is too large for an ADTS header", payload.len());
+    }
+    // Lossless: bounded by the 13-bit check above.
+    let frame_length = frame_length as u16;
+
+    // Third byte: profile (2 bits), sampling frequency index (4 bits),
+    // private (1 bit = 0) and the top bit of the channel configuration.
+    let profile = (object_type - 1) << 6;
+    let sampling_frequency_bits = sampling_frequency_index << 2;
+    let channel_configuration1 = channel_configuration >> 2;
+
+    // Fourth byte: channel configuration cont. (2 bits), originality (1 bit = 0),
+    // home (1 bit = 0), copyrighted id (1 bit = 0), copyright id start (1 bit = 0)
+    // and the top 2 bits of the frame length.
+    let channel_configuration2 = (channel_configuration & 0x03) << 6;
+    let frame_length1 = (frame_length >> 11) as u8;
+
+    // Frame length cont. (11 bits); the low 5 bits carry buffer fullness.
+    let frame_length2 = (frame_length & 0x7FF) << 5;
+
+    let mut out = Vec::with_capacity(HEADER_LEN + payload.len());
+
+    // Syncword (12 bits), MPEG version (1 bit = 0),
+    // layer (2 bits = 0) and protection absence (1 bit = 1)
+    out.put_u16(SYNCWORD | PROTECTION_ABSENCE);
+    out.put_u8(profile | sampling_frequency_bits | channel_configuration1);
+    out.put_u8(channel_configuration2 | frame_length1);
+    out.put_u16(frame_length2 | 0b0000_0000_0001_1111);
+
+    // Buffer fullness cont. (6 bits) and number of AAC frames minus one (2 bits = 0)
+    out.put_u8(0b1111_1100);
+    out.extend_from_slice(&payload[..]);
+
+    Ok(out)
+}
+
+/// Converts from AVC representation to the Annex B representation expected by webrtc-rs.
+///
+/// Each NAL unit's 4-byte big-endian length prefix is overwritten in place
+/// with the start code `00 00 00 01`. An empty frame is returned unchanged.
+///
+/// # Errors
+///
+/// Fails with "partial NAL body" when a length prefix points past the end of
+/// the frame, and with "partial NAL length" when fewer than four bytes remain
+/// where a length prefix is expected.
+fn convert_h264(frame: retina::codec::VideoFrame) -> Result<Vec<u8>, Error> {
+    // TODO:
+    // * For each IDR frame, copy the SPS and PPS from the stream's
+    //   parameters, rather than depend on it being present in the frame
+    //   already. In-band parameters aren't guaranteed. This is awkward
+    //   with h264_reader v0.5's h264_reader::avcc::AvcDecoderRecord because it
+    //   strips off the NAL header byte from each parameter. The next major
+    //   version shouldn't do this.
+    // * Copy only the slice data. In particular, don't copy SEI, which confuses
+    //   Safari.
+
+    let mut data = frame.into_data();
+    let mut i = 0;
+    // Written as `i + 3 < len` rather than `i < len - 3` so that a frame
+    // shorter than three bytes (including an empty one) cannot underflow.
+    while i + 3 < data.len() {
+        // Replace each NAL's length with the Annex B start code b"\x00\x00\x00\x01".
+        let len = u32::from_be_bytes([data[i], data[i + 1], data[i + 2], data[i + 3]]) as usize;
+        data[i..i + 4].copy_from_slice(&[0, 0, 0, 1]);
+
+        // Compare against the bytes that remain instead of computing
+        // `i + 4 + len`, which could overflow on 32-bit targets.
+        let body_start = i + 4;
+        if len > data.len() - body_start {
+            bail!("partial NAL body");
+        }
+        i = body_start + len;
+    }
+
+    if i < data.len() {
+        bail!("partial NAL length");
+    }
+
+    Ok(data)
+}
diff --git a/examples/webrtc-proxy/src/main.rs b/examples/webrtc-proxy/src/main.rs index fc55edd..ebbadb7 100644 --- a/examples/webrtc-proxy/src/main.rs +++ b/examples/webrtc-proxy/src/main.rs @@ -104,6 +104,7 @@ async fn run() -> Result<(), Error> { }; let stop_signal = tokio::signal::ctrl_c(); tokio::pin!(stop_signal); + let upstream_session_group = Arc::new(retina::client::SessionGroup::default()); let mut upstream_session = retina::client::Session::describe( opts.url.clone(), @@ -177,6 +178,7 @@ async fn run() -> Result<(), Error> { format!("{}-video", i), "retina-webrtc-proxy".to_owned(), )); + let sender = downstream_conn .add_track(Arc::clone(&track) as Arc) .await?; @@ -192,6 +194,7 @@ async fn run() -> Result<(), Error> { if tracks.len() <= i { tracks.resize(i + 1, None); } + tracks[i] = Some(track); } @@ -220,7 +223,9 @@ async fn run() -> Result<(), Error> { Box::pin(async {}) })) .await; + tokio::pin!(ice_conn_state_rx); + let (peer_conn_state_tx, peer_conn_state_rx) = tokio::sync::mpsc::unbounded_channel(); downstream_conn .on_peer_connection_state_change(Box::new(move |state: RTCPeerConnectionState| { @@ -228,20 +233,25 @@ async fn run() -> Result<(), Error> { Box::pin(async {}) })) .await; + tokio::pin!(peer_conn_state_rx); println!("Navigate to https://jsfiddle.net/9s10amwL/ in your browser."); println!("Paste from the 'Browser base64 Session description' box to here:"); + let offer = read_offer()?; println!(); +
downstream_conn.set_remote_description(offer).await?; let answer = downstream_conn.create_answer(None).await?; + downstream_conn.set_local_description(answer).await?; downstream_conn .gathering_complete_promise() .await .recv() .await; + if let Some(local_desc) = downstream_conn.local_description().await { println!("Paste from here to the 'Golang base64 Session Description' box:"); println!("{}", base64::encode(serde_json::to_string(&local_desc)?)); diff --git a/player/app.js b/player/app.js new file mode 100644 index 0000000..3cf3adb --- /dev/null +++ b/player/app.js @@ -0,0 +1,57 @@
+/* eslint-env browser */
+
+// Peer connection used by the page, configured with a single STUN server.
+let pc = new RTCPeerConnection({
+  iceServers: [
+    {
+      urls: 'stun:stun.l.google.com:19302'
+    }
+  ]
+})
+
+// Appends a message, followed by a line break, to the element with id "div".
+let log = msg => {
+  document.getElementById('div').innerHTML += msg + '<br>'
+}
+
+// Creates an element named after the incoming track's kind and plays the stream in it.
+pc.ontrack = function (event) {
+  var el = document.createElement(event.track.kind)
+  el.srcObject = event.streams[0]
+  el.autoplay = true
+  el.controls = true
+
+  document.getElementById('remoteVideos').appendChild(el)
+}
+
+pc.oniceconnectionstatechange = e => log(pc.iceConnectionState)
+// A null candidate marks the end of ICE gathering; only then is the local
+// description written, base64-encoded, into the page for the user to copy.
+pc.onicecandidate = event => {
+  if (event.candidate === null) {
+    document.getElementById('localSessionDescription').value = btoa(JSON.stringify(pc.localDescription))
+  }
+}
+
+// Offer to receive 1 audio, and 1 video track
+pc.addTransceiver('video', {'direction': 'sendrecv'})
+pc.addTransceiver('audio', {'direction': 'sendrecv'})
+
+pc.createOffer().then(d => pc.setLocalDescription(d)).catch(log)
+
+// Applies the base64-encoded remote session description pasted into the page.
+window.startSession = () => {
+  let sd = document.getElementById('remoteSessionDescription').value
+
+  if (sd === '') {
+    return alert('Session Description must not be empty')
+  }
+
+  try {
+    // atob/JSON.parse throw synchronously and are handled by the catch below;
+    // setRemoteDescription rejects its promise instead, hence the .catch.
+    pc.setRemoteDescription(new RTCSessionDescription(JSON.parse(atob(sd)))).catch(e => alert(e))
+  } catch (e) {
+    alert(e)
+  }
+}
diff --git a/player/index.html b/player/index.html new file mode 100644 index 0000000..8bb2a49 --- /dev/null +++ b/player/index.html @@ -0,0 +1,26 @@ + + + + +