Add H265 Codec

This commit is contained in:
Andrey Tkachenko 2022-11-14 18:49:42 +04:00
parent 11e4a7335f
commit 6df7684384
18 changed files with 1526 additions and 32 deletions

109
Cargo.lock generated
View File

@ -116,6 +116,12 @@ version = "1.0.58"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bb07d2053ccdbe10e2af2995a2f116c1330396493dc1269f6a91d0ae82e19704" checksum = "bb07d2053ccdbe10e2af2995a2f116c1330396493dc1269f6a91d0ae82e19704"
[[package]]
name = "arc-swap"
version = "1.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "983cd8b9d4b02a6dc6ffa557262eb5858a27a0038ffffe21a0f133eaa819a164"
[[package]] [[package]]
name = "arrayvec" name = "arrayvec"
version = "0.5.2" version = "0.5.2"
@ -364,9 +370,11 @@ dependencies = [
"futures", "futures",
"itertools", "itertools",
"log", "log",
"mpeg2ts",
"mylog", "mylog",
"retina", "retina",
"structopt", "structopt",
"thiserror",
"tokio", "tokio",
"url", "url",
] ]
@ -1159,7 +1167,7 @@ dependencies = [
"tokio", "tokio",
"waitgroup", "waitgroup",
"webrtc-srtp", "webrtc-srtp",
"webrtc-util", "webrtc-util 0.5.4",
] ]
[[package]] [[package]]
@ -1311,6 +1319,16 @@ dependencies = [
"four-cc", "four-cc",
] ]
[[package]]
name = "mpeg2ts"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a38c92bbb0d15f62ed915c392f2d675e3c0889c98be7ad418ae660bfcb7f3b2"
dependencies = [
"byteorder",
"trackable 0.2.24",
]
[[package]] [[package]]
name = "mpeg4-audio-const" name = "mpeg4-audio-const"
version = "0.2.0" version = "0.2.0"
@ -1786,6 +1804,7 @@ checksum = "a3f87b73ce11b1619a3c6332f45341e0047173771e8b8b73f87bfeefb7b56244"
name = "retina" name = "retina"
version = "0.4.3" version = "0.4.3"
dependencies = [ dependencies = [
"arc-swap",
"base64", "base64",
"bitstream-io", "bitstream-io",
"bytes", "bytes",
@ -1800,6 +1819,7 @@ dependencies = [
"pin-project", "pin-project",
"pretty-hex", "pretty-hex",
"rand", "rand",
"rtp",
"rtsp-types", "rtsp-types",
"sdp-types", "sdp-types",
"smallvec", "smallvec",
@ -1855,20 +1875,20 @@ checksum = "80d9625e47edb43aca711ec826ad12154d364ada9e60f4e6f8d40471b3e1e156"
dependencies = [ dependencies = [
"bytes", "bytes",
"thiserror", "thiserror",
"webrtc-util", "webrtc-util 0.5.4",
] ]
[[package]] [[package]]
name = "rtp" name = "rtp"
version = "0.6.5" version = "0.6.7"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5110c12c9f7d1e76eba80076cce4ccb82ee085bd10a62472468de0663240f8b5" checksum = "68ecd5b57967801ff3616239508be0ecc50c6a10a9ca0716475002bf9dc44210"
dependencies = [ dependencies = [
"async-trait", "async-trait",
"bytes", "bytes",
"rand", "rand",
"thiserror", "thiserror",
"webrtc-util", "webrtc-util 0.6.0",
] ]
[[package]] [[package]]
@ -2237,7 +2257,7 @@ dependencies = [
"thiserror", "thiserror",
"tokio", "tokio",
"url", "url",
"webrtc-util", "webrtc-util 0.5.4",
] ]
[[package]] [[package]]
@ -2295,18 +2315,18 @@ dependencies = [
[[package]] [[package]]
name = "thiserror" name = "thiserror"
version = "1.0.31" version = "1.0.37"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bd829fe32373d27f76265620b5309d0340cb8550f523c1dda251d6298069069a" checksum = "10deb33631e3c9018b9baf9dcbbc4f737320d2b576bac10f6aefa048fa407e3e"
dependencies = [ dependencies = [
"thiserror-impl", "thiserror-impl",
] ]
[[package]] [[package]]
name = "thiserror-impl" name = "thiserror-impl"
version = "1.0.31" version = "1.0.37"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0396bc89e626244658bef819e22d0cc459e795a5ebe878e6ec336d1674a8d79a" checksum = "982d17546b47146b28f7c22e3d08465f6b8903d0ea13c1660d9d84a6e7adcdbb"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
@ -2414,6 +2434,35 @@ dependencies = [
"once_cell", "once_cell",
] ]
[[package]]
name = "trackable"
version = "0.2.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b98abb9e7300b9ac902cc04920945a874c1973e08c310627cc4458c04b70dd32"
dependencies = [
"trackable 1.2.0",
"trackable_derive",
]
[[package]]
name = "trackable"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "017e2a1a93718e4e8386d037cfb8add78f1d690467f4350fb582f55af1203167"
dependencies = [
"trackable_derive",
]
[[package]]
name = "trackable_derive"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ebeb235c5847e2f82cfe0f07eb971d1e5f6804b18dac2ae16349cc604380f82f"
dependencies = [
"quote",
"syn",
]
[[package]] [[package]]
name = "turn" name = "turn"
version = "0.5.4" version = "0.5.4"
@ -2429,7 +2478,7 @@ dependencies = [
"stun", "stun",
"thiserror", "thiserror",
"tokio", "tokio",
"webrtc-util", "webrtc-util 0.5.4",
] ]
[[package]] [[package]]
@ -2680,7 +2729,7 @@ dependencies = [
"webrtc-media", "webrtc-media",
"webrtc-sctp", "webrtc-sctp",
"webrtc-srtp", "webrtc-srtp",
"webrtc-util", "webrtc-util 0.5.4",
] ]
[[package]] [[package]]
@ -2695,7 +2744,7 @@ dependencies = [
"thiserror", "thiserror",
"tokio", "tokio",
"webrtc-sctp", "webrtc-sctp",
"webrtc-util", "webrtc-util 0.5.4",
] ]
[[package]] [[package]]
@ -2731,7 +2780,7 @@ dependencies = [
"thiserror", "thiserror",
"tokio", "tokio",
"webpki", "webpki",
"webrtc-util", "webrtc-util 0.5.4",
"x25519-dalek", "x25519-dalek",
"x509-parser 0.9.2", "x509-parser 0.9.2",
] ]
@ -2754,7 +2803,7 @@ dependencies = [
"uuid", "uuid",
"waitgroup", "waitgroup",
"webrtc-mdns", "webrtc-mdns",
"webrtc-util", "webrtc-util 0.5.4",
] ]
[[package]] [[package]]
@ -2767,7 +2816,7 @@ dependencies = [
"socket2", "socket2",
"thiserror", "thiserror",
"tokio", "tokio",
"webrtc-util", "webrtc-util 0.5.4",
] ]
[[package]] [[package]]
@ -2783,7 +2832,7 @@ dependencies = [
"rand", "rand",
"rtp", "rtp",
"thiserror", "thiserror",
"webrtc-util", "webrtc-util 0.5.4",
] ]
[[package]] [[package]]
@ -2818,7 +2867,7 @@ dependencies = [
"rand", "rand",
"thiserror", "thiserror",
"tokio", "tokio",
"webrtc-util", "webrtc-util 0.5.4",
] ]
[[package]] [[package]]
@ -2842,7 +2891,7 @@ dependencies = [
"subtle", "subtle",
"thiserror", "thiserror",
"tokio", "tokio",
"webrtc-util", "webrtc-util 0.5.4",
] ]
[[package]] [[package]]
@ -2867,6 +2916,28 @@ dependencies = [
"winapi", "winapi",
] ]
[[package]]
name = "webrtc-util"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4921b6a976b5570004e9c1b29ae109a81a73e2370e80627efa315f9ad0105f43"
dependencies = [
"async-trait",
"bitflags",
"bytes",
"cc",
"ipnet",
"lazy_static",
"libc",
"log",
"nix 0.24.1",
"parking_lot",
"rand",
"thiserror",
"tokio",
"winapi",
]
[[package]] [[package]]
name = "winapi" name = "winapi"
version = "0.3.9" version = "0.3.9"

View File

@ -16,6 +16,7 @@ include = ["src/**/*", "benches", "Cargo.toml"]
rust-version = "1.59" rust-version = "1.59"
[dependencies] [dependencies]
arc-swap = "1.5.1"
base64 = "0.13.0" base64 = "0.13.0"
bitstream-io = "1.1" bitstream-io = "1.1"
bytes = "1.0.1" bytes = "1.0.1"
@ -28,6 +29,7 @@ once_cell = "1.7.2"
pin-project = "1.0.7" pin-project = "1.0.7"
pretty-hex = "0.3.0" pretty-hex = "0.3.0"
rand = "0.8.3" rand = "0.8.3"
rtp = "0.6.7"
rtsp-types = "0.0.3" rtsp-types = "0.0.3"
sdp-types = "0.1.4" sdp-types = "0.1.4"
smallvec = { version = "1.6.1", features = ["union"] } smallvec = { version = "1.6.1", features = ["union"] }

View File

@ -16,3 +16,5 @@ anyhow = "1.0.41"
itertools = "0.10.1" itertools = "0.10.1"
mylog = { git = "https://github.com/scottlamb/mylog" } mylog = { git = "https://github.com/scottlamb/mylog" }
structopt = "0.3.21" structopt = "0.3.21"
mpeg2ts = "0.1.1"
thiserror = "1.0.37"

View File

@ -6,6 +6,7 @@
mod info; mod info;
mod mp4; mod mp4;
mod onvif; mod onvif;
mod ts;
use anyhow::Error; use anyhow::Error;
use log::{error, info}; use log::{error, info};
@ -35,6 +36,8 @@ enum Cmd {
Mp4(mp4::Opts), Mp4(mp4::Opts),
/// Follows ONVIF metadata stream; use Ctrl+C to stop. /// Follows ONVIF metadata stream; use Ctrl+C to stop.
Onvif(onvif::Opts), Onvif(onvif::Opts),
/// Transport Stream
Ts(ts::Opts),
} }
fn init_logging() -> mylog::Handle { fn init_logging() -> mylog::Handle {
@ -85,5 +88,6 @@ async fn main_inner() -> Result<(), Error> {
Cmd::Info(opts) => info::run(opts).await, Cmd::Info(opts) => info::run(opts).await,
Cmd::Mp4(opts) => mp4::run(opts).await, Cmd::Mp4(opts) => mp4::run(opts).await,
Cmd::Onvif(opts) => onvif::run(opts).await, Cmd::Onvif(opts) => onvif::run(opts).await,
Cmd::Ts(opts) => ts::run(opts).await,
} }
} }

View File

@ -575,13 +575,16 @@ impl<W: AsyncWrite + AsyncSeek + Send + Unpin> Mp4Writer<W> {
frame.loss(), frame.loss(),
self.allow_loss, self.allow_loss,
)?; )?;
self.mdat_pos = self self.mdat_pos = self
.mdat_pos .mdat_pos
.checked_add(size) .checked_add(size)
.ok_or_else(|| anyhow!("mdat_pos overflow"))?; .ok_or_else(|| anyhow!("mdat_pos overflow"))?;
if frame.is_random_access_point() { if frame.is_random_access_point() {
self.video_sync_sample_nums.push(self.video_trak.samples); self.video_sync_sample_nums.push(self.video_trak.samples);
} }
self.inner.write_all(frame.data()).await?; self.inner.write_all(frame.data()).await?;
Ok(()) Ok(())
} }

915
examples/client/src/ts.rs Normal file
View File

@ -0,0 +1,915 @@
// Copyright (C) 2021 Scott Lamb <slamb@slamb.org>
// SPDX-License-Identifier: MIT OR Apache-2.0
//! Proof-of-concept MPEG-2 Transport Stream (`.ts`) writer.
//!
//! This buffers every generated TS packet in memory for the whole session and
//! writes the PAT/PMT tables followed by all buffered packets only when the
//! session ends, so memory use grows with recording length. Video frames are
//! converted from length-prefixed AVC to Annex B; AAC audio frames are wrapped
//! in ADTS headers.
//!
//! For a more high-quality recorder, see [Moonfire NVR](https://github.com/scottlamb/moonfire-nvr).
//!
//! See ISO/IEC 13818-1 (MPEG-2 Systems) for the transport stream format and
//! ISO/IEC 13818-7 for the ADTS header layout:
//! https://github.com/scottlamb/moonfire-nvr/wiki/Standards-and-specifications
use anyhow::{anyhow, bail, Error, Context};
use log::{info, warn, debug};
use bytes::{Buf, BufMut};
use futures::{Future, StreamExt};
use mpeg2ts::{ts::{ContinuityCounter, TsPacket, TsHeader, Pid, TsPayload, TsPacketWriter, WriteTsPacket, AdaptationField, payload}, time::{Timestamp, ClockReference}, pes::PesHeader, es::StreamId};
use retina::{
client::{SetupOptions, Transport},
codec::{AudioParameters, CodecItem, VideoParameters, ParametersRef},
};
use std::{num::NonZeroU32, io::Cursor, path::PathBuf, pin::Pin, sync::Arc};
/// Errors produced while assembling or writing the transport stream.
#[derive(thiserror::Error, Debug)]
pub enum TsError {
    /// Underlying I/O failure while creating the output file.
    #[error("Failed to create TS file")]
    FileCreationFailed(#[from] std::io::Error),
    /// `mpeg2ts` failed to serialize a packet to the sink.
    #[error("Failed to write TS file")]
    WriteError,
    /// The requested PID was rejected by `Pid::new` (not a valid TS PID).
    #[error("Packet ID {0} is not valid")]
    InvalidPacketId(u16),
    /// The 90 kHz tick value was rejected by `Timestamp::new`.
    #[error("Invalid timestamp {0}")]
    InvalidTimestamp(u64),
    /// A payload chunk exceeded what a single 188-byte TS packet can hold.
    #[error("Packet payload exceeded packet limit")]
    PayloadTooBig,
}
/// Command-line options for the `ts` subcommand.
///
/// Fix: the help text for `out` previously said "`.mp4` file" (copied from the
/// mp4 subcommand) and "normally seem" was a typo for "normally seen"; doc
/// comments here are user-visible `--help` output.
#[derive(structopt::StructOpt)]
pub struct Opts {
    #[structopt(flatten)]
    src: super::Source,

    /// Policy for handling the `rtptime` parameter normally seen in the `RTP-Info` header.
    /// One of `default`, `require`, `ignore`, `permissive`.
    #[structopt(default_value, long)]
    initial_timestamp: retina::client::InitialTimestampPolicy,

    /// Don't attempt to include video streams.
    #[structopt(long)]
    no_video: bool,

    /// Don't attempt to include audio streams.
    #[structopt(long)]
    no_audio: bool,

    /// Allow lost packets mid-stream without aborting.
    #[structopt(long)]
    allow_loss: bool,

    /// When to issue a `TEARDOWN` request: `auto`, `always`, or `never`.
    #[structopt(default_value, long)]
    teardown: retina::client::TeardownPolicy,

    /// Duration after which to exit automatically, in seconds.
    #[structopt(long, name = "secs")]
    duration: Option<u64>,

    /// The transport to use: `tcp` or `udp` (experimental).
    ///
    /// Note: `--allow-loss` is strongly recommended with `udp`.
    #[structopt(default_value, long)]
    transport: retina::client::Transport,

    /// Path to `.ts` file to write.
    #[structopt(parse(try_from_str))]
    out: PathBuf,
}
// PID on which the Program Map Table is carried (advertised in the PAT).
const PMT_PID: u16 = 256;
// Elementary-stream PIDs for the single video and audio tracks.
const VIDEO_ES_PID: u16 = 257;
const AUDIO_ES_PID: u16 = 258;
// PES stream ids: 224 = 0xE0 (first video stream id) and 192 = 0xC0 (first
// audio stream id) per the MPEG-2 PES stream-id conventions.
const PES_VIDEO_STREAM_ID: u8 = 224;
const PES_AUDIO_STREAM_ID: u8 = 192;
/// Buffers an entire MPEG-2 Transport Stream in memory and flushes it to the
/// sink `W` only when `finish` is called.
pub struct Mpeg2TsWriter<W: std::io::Write + std::io::Seek + Send + Unpin> {
    // Per-PID 4-bit continuity counters, carried across pushed frames.
    video_continuity_counter: ContinuityCounter,
    audio_continuity_counter: ContinuityCounter,
    // Every TS packet produced so far; written to `inner` only by `finish`.
    packets: Vec<TsPacket>,
    // Each distinct set of video parameters seen on the stream so far.
    video_params: Vec<VideoParameters>,
    // Audio parameters captured at session setup; needed to build ADTS headers.
    audio_params: Option<Box<AudioParameters>>,
    // 1-based index into `video_params` of the current parameter set.
    cur_video_params_sample_description_index: Option<u32>,
    // NOTE(review): neither field is ever assigned anywhere in this file, so
    // both remain `None` and `push_audio`'s PCR logic always takes the
    // `None` branch (always emits a PCR on its periodic packets) — confirm
    // whether these were meant to be updated in push_video/push_audio.
    video_last_pcr: Option<Timestamp>,
    audio_last_pcr: Option<Timestamp>,
    inner: W,
}
impl<W: std::io::Write + std::io::Seek + Send + Unpin> Mpeg2TsWriter<W> {
    /// Creates a writer over `inner`.
    ///
    /// `audio_params`, if present, are kept so AAC frames can later be wrapped
    /// in ADTS headers. Nothing is written to `inner` until [`Self::finish`].
    /// NOTE(review): nothing here awaits — `async` appears unnecessary but is
    /// kept for interface compatibility.
    pub async fn new(
        audio_params: Option<Box<AudioParameters>>,
        inner: W,
    ) -> Result<Self, Error> {
        Ok(Mpeg2TsWriter {
            inner,
            video_params: Vec::new(),
            audio_params,
            video_continuity_counter: Default::default(),
            audio_continuity_counter: Default::default(),
            packets: Default::default(),
            cur_video_params_sample_description_index: None,
            video_last_pcr: None,
            audio_last_pcr: None,
        })
    }

    /// Consumes the writer: emits the PAT and PMT, then every buffered media
    /// packet, in order. This is the only method that writes to `inner`.
    pub async fn finish(mut self) -> Result<(), Error> {
        let packets: Vec<_> = self.packets.drain(..).collect();
        let mut writer = TsPacketWriter::new(self.inner);
        writer
            .write_ts_packet(&default_pat_packet())
            .map_err(|_| TsError::WriteError)?;
        writer
            .write_ts_packet(&default_pmt_packet())
            .map_err(|_| TsError::WriteError)?;
        for packet in &packets {
            writer
                .write_ts_packet(packet)
                .map_err(|_| TsError::WriteError)?;
        }
        println!("FINISH");
        Ok(())
    }

    /// Splits one video access unit into TS packets on `VIDEO_ES_PID`.
    ///
    /// The first packet carries the PES header (with PTS) and, for keyframes,
    /// an adaptation field with the PCR and random-access indicator set. The
    /// remaining bytes go out as raw continuation packets, bumping the
    /// continuity counter for each.
    fn push_video(&mut self, timestamp: Timestamp, keyframe: bool, video: Vec<u8>) -> Result<(), TsError> {
        let mut header = default_ts_header(VIDEO_ES_PID)?;
        header.continuity_counter = self.video_continuity_counter;
        let mut buf = Cursor::new(video);
        let packet = {
            // First chunk is capped at 153 bytes — presumably the payload room
            // left in a 188-byte packet after the TS header, adaptation field,
            // and PES header; TODO confirm the exact accounting.
            let data = {
                let pes_data = if buf.remaining() < 153 {
                    buf.chunk()
                } else {
                    &buf.chunk()[..153]
                };
                make_raw_payload(pes_data)?
            };
            buf.advance(data.len());
            let adaptation_field = if keyframe {
                Some(AdaptationField {
                    discontinuity_indicator: false,
                    random_access_indicator: true,
                    es_priority_indicator: false,
                    pcr: Some(ClockReference::from(timestamp)),
                    opcr: None,
                    splice_countdown: None,
                    transport_private_data: Vec::new(),
                    extension: None,
                })
            } else {
                None
            };
            TsPacket {
                header: header.clone(),
                adaptation_field,
                payload: Some(TsPayload::Pes(payload::Pes {
                    header: PesHeader {
                        stream_id: StreamId::new(PES_VIDEO_STREAM_ID),
                        priority: false,
                        data_alignment_indicator: false,
                        copyright: false,
                        original_or_copy: false,
                        pts: Some(timestamp),
                        dts: None,
                        escr: None,
                    },
                    // 0 is the "unbounded" PES packet length marker.
                    pes_packet_len: 0,
                    data
                })),
            }
        };
        self.packets.push(packet);
        header.continuity_counter.increment();
        // Continuation packets: raw payload only, no PES header.
        while buf.has_remaining() {
            let raw_payload = {
                let pes_data = if buf.remaining() < payload::Bytes::MAX_SIZE {
                    buf.chunk()
                } else {
                    &buf.chunk()[..payload::Bytes::MAX_SIZE]
                };
                make_raw_payload(&pes_data)?
            };
            buf.advance(raw_payload.len());
            let packet = TsPacket {
                header: header.clone(),
                adaptation_field: None,
                payload: Some(TsPayload::Raw(raw_payload))
            };
            self.packets.push(packet);
            header.continuity_counter.increment();
        }
        // Persist the counter so the next frame continues the sequence.
        self.video_continuity_counter = header.continuity_counter;
        Ok(())
    }

    /// Handles one demuxed video frame: tracks parameter-set changes,
    /// converts the frame to Annex B, and packetizes it.
    async fn video(
        &mut self,
        stream: &retina::client::Stream,
        frame: retina::codec::VideoFrame,
    ) -> Result<(), Error> {
        println!(
            "{:?}({}): {}-byte video frame{}",
            &frame.timestamp(),
            &frame.timestamp(),
            frame.data().remaining(),
            if frame.is_random_access_point() { "*" } else { "" }
        );
        let sample_description_index = if let (Some(i), false) = (
            self.cur_video_params_sample_description_index,
            frame.has_new_parameters(),
        ) {
            // Use the most recent sample description index for most frames, without having to
            // scan through self.video_sample_index.
            i
        } else {
            match stream.parameters() {
                Some(ParametersRef::Video(params)) => {
                    log::info!("new video params: {:?}", params);
                    // Reuse the index of a previously-seen parameter set, or
                    // append a new one. Indices are 1-based.
                    let pos = self.video_params.iter().position(|p| p == params);
                    if let Some(pos) = pos {
                        u32::try_from(pos + 1)?
                    } else {
                        self.video_params.push(params.clone());
                        u32::try_from(self.video_params.len())?
                    }
                }
                None => {
                    debug!("Discarding video frame received before parameters");
                    return Ok(());
                }
                // A video stream's parameters are expected to be video params.
                _ => unreachable!(),
            }
        };
        self.cur_video_params_sample_description_index = Some(sample_description_index);
        // 1.4s offset — presumably a fixed mux/playout delay; TODO confirm.
        let timestamp = make_timestamp(frame.timestamp().elapsed_secs() + 1.4)?;
        let keyframe = frame.is_random_access_point();
        let data = convert_h264(frame)?;
        self.push_video(timestamp, keyframe, data)?;
        Ok(())
    }

    /// Splits one ADTS audio frame into TS packets on `AUDIO_ES_PID`.
    ///
    /// Every fourth packet (by continuity counter) gets an adaptation field
    /// carrying a PCR, unless the timestamp matches the last video PCR.
    /// NOTE(review): `video_last_pcr` is never set in this file, so the PCR
    /// is in practice always emitted here.
    fn push_audio(&mut self, timestamp: Timestamp, audio: Vec<u8>) -> Result<(), TsError> {
        let mut buf = Cursor::new(audio);
        // First chunk capped at 153 bytes, same packet-budget reasoning as
        // push_video.
        let data = {
            let pes_data = if buf.remaining() < 153 {
                buf.chunk()
            } else {
                &buf.chunk()[..153]
            };
            make_raw_payload(&pes_data)?
        };
        buf.advance(data.len());
        let mut header = default_ts_header(AUDIO_ES_PID)?;
        header.continuity_counter = self.audio_continuity_counter;
        let adaptation_field = if self.audio_continuity_counter.as_u8() % 4 == 0 {
            let pcr = if let Some(last_ts) = self.video_last_pcr {
                if timestamp != last_ts {
                    Some(ClockReference::from(timestamp))
                } else {
                    None
                }
            } else {
                Some(ClockReference::from(timestamp))
            };
            pcr.map(|pcr| AdaptationField {
                discontinuity_indicator: false,
                random_access_indicator: true,
                es_priority_indicator: false,
                pcr: Some(pcr),
                opcr: None,
                splice_countdown: None,
                transport_private_data: Vec::new(),
                extension: None,
            })
        } else {
            None
        };
        let packet = TsPacket {
            header: header.clone(),
            adaptation_field,
            payload: Some(TsPayload::Pes(payload::Pes {
                header: PesHeader {
                    stream_id: StreamId::new(PES_AUDIO_STREAM_ID),
                    priority: false,
                    data_alignment_indicator: false,
                    copyright: false,
                    original_or_copy: false,
                    pts: Some(timestamp),
                    dts: None,
                    escr: None,
                },
                // 0 is the "unbounded" PES packet length marker.
                pes_packet_len: 0,
                data
            })),
        };
        self.packets.push(packet);
        header.continuity_counter.increment();
        // Continuation packets: raw payload only, no PES header.
        while buf.has_remaining() {
            let raw_payload = {
                let pes_data = if buf.remaining() < payload::Bytes::MAX_SIZE {
                    buf.chunk()
                } else {
                    &buf.chunk()[..payload::Bytes::MAX_SIZE]
                };
                make_raw_payload(&pes_data)?
            };
            buf.advance(raw_payload.len());
            let packet = TsPacket {
                header: header.clone(),
                adaptation_field: None,
                payload: Some(TsPayload::Raw(raw_payload))
            };
            self.packets.push(packet);
            header.continuity_counter.increment();
        }
        self.audio_continuity_counter = header.continuity_counter;
        Ok(())
    }

    /// Handles one demuxed audio frame: wraps it in an ADTS header (using the
    /// stored audio parameters) and packetizes it. Frames are silently dropped
    /// when no audio parameters were captured at setup.
    async fn audio(&mut self, frame: retina::codec::AudioFrame) -> Result<(), Error> {
        println!(
            "{:?}({}): {}-byte audio frame",
            &frame.timestamp(),
            &frame.timestamp(),
            frame.data().remaining(),
        );
        // Same fixed 1.4s offset as the video path.
        let timestamp = make_timestamp(frame.timestamp().elapsed_secs() + 1.4)?;
        if let Some(ap) = &self.audio_params {
            let asc = AudioSpecificConfiguration::try_from(ap.extra_data())?;
            let data = convert_aac(frame, asc)?;
            self.push_audio(timestamp, data)?;
        }
        Ok(())
    }
}
/// Copies packets from `session` to `ts_writer` without handling any cleanup on error.
///
/// Runs until EOF, an error, the stop signal, or (when `--duration` was given)
/// the deadline — whichever comes first.
async fn copy<'a>(
    opts: &'a Opts,
    session: &'a mut retina::client::Demuxed,
    stop_signal: Pin<Box<dyn Future<Output = Result<(), std::io::Error>>>>,
    ts_writer: &'a mut Mpeg2TsWriter<std::fs::File>,
) -> Result<(), Error> {
    // With no --duration, the sleep arm pends forever and never fires.
    let sleep = match opts.duration {
        Some(secs) => {
            futures::future::Either::Left(tokio::time::sleep(std::time::Duration::from_secs(secs)))
        }
        None => futures::future::Either::Right(futures::future::pending()),
    };
    tokio::pin!(stop_signal);
    tokio::pin!(sleep);
    loop {
        tokio::select! {
            pkt = session.next() => {
                // A `None` from the demuxer means the RTSP session ended.
                match pkt.ok_or_else(|| anyhow!("EOF"))?? {
                    CodecItem::VideoFrame(f) => {
                        let stream = &session.streams()[f.stream_id()];
                        let start_ctx = *f.start_ctx();
                        ts_writer.video(stream, f).await.with_context(
                            || format!("Error processing video frame starting with {}", start_ctx))?;
                    },
                    CodecItem::AudioFrame(f) => {
                        let ctx = *f.ctx();
                        ts_writer.audio(f).await.with_context(
                            || format!("Error processing audio frame, {}", ctx))?;
                    },
                    CodecItem::Rtcp(rtcp) => {
                        println!("RTCP {:?}", rtcp);
                        if let (Some(t), Some(Ok(Some(sr)))) = (rtcp.rtp_timestamp(), rtcp.pkts().next().map(retina::rtcp::PacketRef::as_sender_report)) {
                            println!(" {}: SR ts={}", t, sr.ntp_timestamp());
                        }
                    },
                    CodecItem::MessageFrame(msg) => {
                        println!("MSG {:?}", msg);
                    },
                    // Other codec items (if any) are ignored.
                    _ => continue,
                };
            },
            _ = &mut stop_signal => {
                info!("Stopping due to signal");
                break;
            },
            _ = &mut sleep => {
                info!("Stopping after {} seconds", opts.duration.unwrap());
                break;
            },
        }
    }
    Ok(())
}
/// Writes the `.ts`, including trying to finish or clean up the file.
async fn write_mpeg2ts(
opts: &Opts,
session: retina::client::Session<retina::client::Described>,
audio_params: Option<Box<AudioParameters>>,
stop_signal: Pin<Box<dyn Future<Output = Result<(), std::io::Error>>>>,
) -> Result<(), Error> {
let mut session = session
.play(
retina::client::PlayOptions::default()
.initial_timestamp(opts.initial_timestamp)
.enforce_timestamps_with_max_jump_secs(NonZeroU32::new(10).unwrap()),
)
.await?
.demuxed()?;
// Append into a filename suffixed with ".partial", then try to either rename it into
// place if it's complete or delete it otherwise.
const PARTIAL_SUFFIX: &str = ".partial";
let mut tmp_filename = opts.out.as_os_str().to_owned();
tmp_filename.push(PARTIAL_SUFFIX); // OsString::push doesn't put in a '/', unlike PathBuf::.
let tmp_filename: PathBuf = tmp_filename.into();
let out = std::fs::File::create(&tmp_filename)?;
let mut ts_writer = Mpeg2TsWriter::new(audio_params, out).await?;
let result = copy(opts, &mut session, stop_signal, &mut ts_writer).await;
if let Err(e) = result {
// Log errors about finishing, returning the original error.
if let Err(e) = ts_writer.finish().await {
log::error!(".mp4 finish failed: {}", e);
if let Err(e) = tokio::fs::remove_file(&tmp_filename).await {
log::error!("and removing .mp4 failed too: {}", e);
}
} else if let Err(e) = tokio::fs::rename(&tmp_filename, &opts.out).await {
log::error!("unable to move completed .mp4 into place: {}", e);
}
Err(e)
} else {
// Directly return errors about finishing.
if let Err(e) = ts_writer.finish().await {
log::error!(".mp4 finish failed: {}", e);
if let Err(e) = tokio::fs::remove_file(&tmp_filename).await {
log::error!("and removing .mp4 failed too: {}", e);
}
Err(e)
} else {
tokio::fs::rename(&tmp_filename, &opts.out).await?;
Ok(())
}
}
}
/// Entry point for the `ts` subcommand: selects an h264 video stream and a
/// supported audio stream, sets them up, records to the output file, and
/// waits for any pending `TEARDOWN` before returning.
///
/// Fix: the audio-rejection log message said ".mp4 file" (copied from the mp4
/// example); it now says ".ts file".
pub async fn run(opts: Opts) -> Result<(), Error> {
    if matches!(opts.transport, Transport::Udp(_)) && !opts.allow_loss {
        warn!("Using --transport=udp without strongly recommended --allow-loss!");
    }
    let creds = super::creds(opts.src.username.clone(), opts.src.password.clone());
    let stop_signal = Box::pin(tokio::signal::ctrl_c());
    let session_group = Arc::new(retina::client::SessionGroup::default());
    let mut session = retina::client::Session::describe(
        opts.src.url.clone(),
        retina::client::SessionOptions::default()
            .creds(creds)
            .session_group(session_group.clone())
            .user_agent("Retina TS example".to_owned())
            .teardown(opts.teardown),
    )
    .await?;
    // Pick the first h264 video stream, if any.
    let video_stream_i = if !opts.no_video {
        let s = session.streams().iter().position(|s| {
            if s.media() == "video" {
                if s.encoding_name() == "h264" {
                    log::info!("Using h264 video stream");
                    return true;
                }
                log::info!(
                    "Ignoring {} video stream because it's unsupported",
                    s.encoding_name(),
                );
            }
            false
        });
        if s.is_none() {
            log::info!("No suitable video stream found");
        }
        s
    } else {
        log::info!("Ignoring video streams (if any) because of --no-video");
        None
    };
    if let Some(i) = video_stream_i {
        session
            .setup(i, SetupOptions::default().transport(opts.transport.clone()))
            .await?;
    }
    // Pick the first audio stream whose parameters retina can describe.
    let audio_stream = if !opts.no_audio {
        let s = session
            .streams()
            .iter()
            .enumerate()
            .find_map(|(i, s)| match s.parameters() {
                // Only consider audio streams that have a usable sample
                // description (needed to build ADTS headers).
                Some(retina::codec::ParametersRef::Audio(a)) if a.sample_entry().is_some() => {
                    log::info!("Using {} audio stream (rfc 6381 codec {})", s.encoding_name(), a.rfc6381_codec().unwrap());
                    log::info!("{:?}", a);
                    Some((i, Box::new(a.clone())))
                }
                _ if s.media() == "audio" => {
                    log::info!("Ignoring {} audio stream because it can't be placed into a .ts file without transcoding", s.encoding_name());
                    None
                }
                _ => None,
            });
        if s.is_none() {
            log::info!("No suitable audio stream found");
        }
        s
    } else {
        log::info!("Ignoring audio streams (if any) because of --no-audio");
        None
    };
    if let Some((i, _)) = audio_stream {
        session
            .setup(i, SetupOptions::default().transport(opts.transport.clone()))
            .await?;
    }
    if video_stream_i.is_none() && audio_stream.is_none() {
        bail!("Exiting because no video or audio stream was selected; see info log messages above");
    }
    let result = write_mpeg2ts(&opts, session, audio_stream.map(|(_i, p)| p), stop_signal).await;
    // Session has now been dropped, on success or failure. A TEARDOWN should
    // be pending if necessary. session_group.await_teardown() will wait for it.
    if let Err(e) = session_group.await_teardown().await {
        log::error!("TEARDOWN failed: {}", e);
    }
    result
}
/// Wraps a PES data slice into a raw TS payload chunk.
///
/// Fails with `TsError::PayloadTooBig` when the slice exceeds the maximum
/// payload size a single TS packet can carry.
fn make_raw_payload(pes_data: &[u8]) -> Result<mpeg2ts::ts::payload::Bytes, TsError> {
    match mpeg2ts::ts::payload::Bytes::new(pes_data) {
        Ok(bytes) => Ok(bytes),
        Err(_) => Err(TsError::PayloadTooBig),
    }
}
/// Converts a timestamp in seconds to a 90 kHz MPEG-TS `Timestamp`.
///
/// Fails with `TsError::InvalidTimestamp` when the tick count is out of the
/// range `Timestamp::new` accepts.
fn make_timestamp(ts: f64) -> Result<Timestamp, TsError> {
    let ticks = (ts * 90_000.0).round() as u64;
    match Timestamp::new(ticks) {
        Ok(t) => Ok(t),
        Err(_) => Err(TsError::InvalidTimestamp(ticks)),
    }
}
/// Builds a TS header for `pid` with defaults: no error/priority flags, not
/// scrambled, and a zeroed continuity counter (callers overwrite the counter).
///
/// Fails with `TsError::InvalidPacketId` when `pid` is not a valid TS PID.
fn default_ts_header(pid: u16) -> Result<TsHeader, TsError> {
    use mpeg2ts::ts::TransportScramblingControl;
    Ok(TsHeader {
        transport_error_indicator: false,
        transport_priority: false,
        pid: Pid::new(pid).map_err(|_| TsError::InvalidPacketId(pid))?,
        transport_scrambling_control: TransportScramblingControl::NotScrambled,
        continuity_counter: ContinuityCounter::new(),
    })
}
/// Builds the Program Association Table packet: a single program (#1) whose
/// PMT is carried on `PMT_PID`. The PAT itself always travels on PID 0.
fn default_pat_packet() -> TsPacket {
    use mpeg2ts::ts::{VersionNumber, payload::Pat, ProgramAssociation};
    TsPacket {
        // PID 0 is always valid, so unwrap can't fail here.
        header: default_ts_header(0).unwrap(),
        adaptation_field: None,
        payload: Some(
            TsPayload::Pat(Pat {
                transport_stream_id: 1,
                version_number: VersionNumber::default(),
                table: vec![
                    ProgramAssociation {
                        program_num: 1,
                        program_map_pid: Pid::new(PMT_PID).unwrap(),
                    }
                ]
            })),
    }
}
/// Builds the Program Map Table packet declaring one H.264 video elementary
/// stream and one ADTS AAC audio elementary stream, with the program clock
/// reference carried on the video PID.
fn default_pmt_packet() -> TsPacket {
    use mpeg2ts::{
        ts::{VersionNumber, payload::Pmt, EsInfo},
        es::StreamType,
    };
    TsPacket {
        // PMT_PID is a known-valid constant, so unwrap can't fail here.
        header: default_ts_header(PMT_PID).unwrap(),
        adaptation_field: None,
        payload: Some(
            TsPayload::Pmt(Pmt {
                program_num: 1,
                pcr_pid: Some(Pid::new(VIDEO_ES_PID).unwrap()),
                version_number: VersionNumber::default(),
                table: vec![
                    EsInfo {
                        stream_type: StreamType::H264,
                        elementary_pid: Pid::new(VIDEO_ES_PID).unwrap(),
                        descriptors: vec![],
                    },
                    EsInfo {
                        stream_type: StreamType::AdtsAac,
                        elementary_pid: Pid::new(AUDIO_ES_PID).unwrap(),
                        descriptors: vec![],
                    }
                ]
            })),
    }
}
/// Errors raised while parsing AAC configuration or building ADTS headers.
#[derive(Debug, thiserror::Error)]
pub enum AacError {
    #[error("AAC coder not initialized")]
    NotInitialized,
    /// Input buffer was too short; the payload names what was being parsed.
    #[error("Not enough data: {0}")]
    NotEnoughData(&'static str),
    #[error("Unsupported audio object type")]
    UnsupportedAudioFormat,
    #[error("Reserved or unsupported frequency index {0}")]
    UnsupportedFrequencyIndex(u8),
    #[error("Reserved or unsupported channel configuration {0}")]
    UnsupportedChannelConfiguration(u8),
    #[error("Got forbidden sampling frequency index {0}")]
    ForbiddenSamplingFrequencyIndex(u8),
}
/// 4-bit sampling-frequency index from an MPEG-4 AudioSpecificConfig.
#[derive(Debug, Clone, Copy)]
pub struct SamplingFrequencyIndex(u8);

impl From<SamplingFrequencyIndex> for u8 {
    fn from(val: SamplingFrequencyIndex) -> Self {
        let SamplingFrequencyIndex(raw) = val;
        raw
    }
}

impl TryFrom<u8> for SamplingFrequencyIndex {
    type Error = AacError;

    /// Accepts indices 0–12 and the escape value 15; 13 and 14 are rejected.
    fn try_from(val: u8) -> Result<Self, AacError> {
        if matches!(val, 0..=12 | 15) {
            Ok(Self(val))
        } else {
            Err(AacError::UnsupportedFrequencyIndex(val))
        }
    }
}
/// 4-bit channel-configuration field from an MPEG-4 AudioSpecificConfig.
#[derive(Debug, Clone, Copy)]
pub struct ChannelConfiguration(u8);

impl From<ChannelConfiguration> for u8 {
    fn from(val: ChannelConfiguration) -> Self {
        let ChannelConfiguration(raw) = val;
        raw
    }
}

impl TryFrom<u8> for ChannelConfiguration {
    type Error = AacError;

    /// Accepts configurations 0–7; anything higher is rejected.
    fn try_from(val: u8) -> Result<Self, AacError> {
        if val <= 7 {
            Ok(Self(val))
        } else {
            Err(AacError::UnsupportedChannelConfiguration(val))
        }
    }
}
// See [MPEG-4 Audio Object Types][audio_object_types]
//
// [audio_object_types]: https://en.wikipedia.org/wiki/MPEG-4_Part_3#MPEG-4_Audio_Object_Types
#[allow(clippy::enum_variant_names)]
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum AudioObjectType {
    // Discriminants match the wire encoding and are read via `as u8`.
    AacMain = 1,
    AacLowComplexity = 2,
    AacScalableSampleRate = 3,
    AacLongTermPrediction = 4,
}

impl TryFrom<u8> for AudioObjectType {
    type Error = AacError;

    /// Maps raw object-type codes 1–4 to variants; all others are unsupported.
    fn try_from(value: u8) -> Result<Self, AacError> {
        match value {
            1 => Ok(Self::AacMain),
            2 => Ok(Self::AacLowComplexity),
            3 => Ok(Self::AacScalableSampleRate),
            4 => Ok(Self::AacLongTermPrediction),
            _ => Err(AacError::UnsupportedAudioFormat),
        }
    }
}
/// Decoded fields of an MPEG-4 AudioSpecificConfig.
#[derive(Debug, Clone)]
pub struct AudioSpecificConfiguration {
    pub object_type: AudioObjectType,
    pub sampling_frequency_index: SamplingFrequencyIndex,
    // Explicit sampling frequency; never populated by the two-byte
    // `TryFrom<&[u8]>` parser in this file.
    pub sampling_frequency: Option<u32>,
    pub channel_configuration: ChannelConfiguration,
    pub frame_length_flag: bool,
    pub depends_on_core_coder: bool,
    pub extension_flag: bool,
}
impl TryFrom<&[u8]> for AudioSpecificConfiguration {
    type Error = AacError;

    /// Parses the first two bytes of an MPEG-4 AudioSpecificConfig:
    /// 5-bit object type, 4-bit frequency index, 4-bit channel configuration,
    /// then three 1-bit flags. Bytes beyond the first two are ignored.
    fn try_from(val: &[u8]) -> Result<Self, Self::Error> {
        if val.len() < 2 {
            return Err(AacError::NotEnoughData("AAC audio specific config"));
        }
        let (first, second) = (val[0], val[1]);
        // Object type: top 5 bits of the first byte.
        let object_type = AudioObjectType::try_from((first & 0xF8) >> 3)?;
        // Frequency index straddles the byte boundary: low 3 bits of the
        // first byte plus the top bit of the second.
        let sampling_frequency_index =
            SamplingFrequencyIndex::try_from(((first & 0x07) << 1) | (second >> 7))?;
        let channel_configuration = ChannelConfiguration::try_from((second >> 3) & 0x0F)?;
        Ok(Self {
            object_type,
            sampling_frequency_index,
            sampling_frequency: None,
            channel_configuration,
            frame_length_flag: (second & 0x04) == 0x04,
            depends_on_core_coder: (second & 0x02) == 0x02,
            extension_flag: (second & 0x01) == 0x01,
        })
    }
}
/// Prepends a 7-byte ADTS header (protection/CRC absent) to a raw AAC frame so
/// it can be carried in the PES/TS stream.
///
/// Fix: the forbidden-frequency-index check previously compared the value
/// *after* `<< 2`, so it could never equal 0x0F and the check was dead; it now
/// tests the raw 4-bit index before shifting.
fn convert_aac(frame: retina::codec::AudioFrame, ap: AudioSpecificConfiguration) -> Result<Vec<u8>, Error> {
    const SYNCWORD: u16 = 0xFFF0;
    const PROTECTION_ABSENCE: u16 = 0x0001;
    let object_type = ap.object_type as u8;
    let raw_frequency_index = u8::from(ap.sampling_frequency_index);
    // Index 15 is the "escape" value (frequency written explicitly in the
    // ASC) and cannot be represented in an ADTS header.
    if raw_frequency_index == 0x0F {
        bail!("ForbiddenSamplingFrequencyIndex({})", raw_frequency_index);
    }
    // Pre-shift into position: sampling frequency index occupies bits 5..2
    // of the third header byte.
    let sampling_frequency_index = raw_frequency_index << 2;
    let channel_configuration: u8 = ap.channel_configuration.into();
    let payload = frame.data().to_vec();
    // Profile (2 bits), sampling frequency index (4 bits),
    // private (1 bit = 0) and channel configuration (first of 3 bits).
    let profile = (object_type - 1) << 6;
    // Channel configuration cont. (2 bits), originality (1 bit = 0),
    // home (1 bit = 0), copyrighted id (1 bit = 0),
    // copyright id start (1 bit = 0) and frame length (first 2 of 13 bits).
    let channel_configuration1 = (channel_configuration & 0x07) >> 2;
    let channel_configuration2 = (channel_configuration & 0x03) << 6;
    // Frame length covers the 7-byte header plus the payload (no CRC bytes).
    let frame_length = (payload.len() + 7) as u16;
    let frame_length1 = ((frame_length & 0x1FFF) >> 11) as u8;
    // Frame length cont. (11 bits) and buffer fullness (first 5 bits).
    let frame_length2 = ((frame_length & 0x7FF) << 5) as u16;
    // Header is 7 bytes; the old capacity of 56 counted bits, not bytes.
    let mut out = Vec::with_capacity(7 + payload.len());
    // Syncword (12 bits), MPEG version (1 bit = 0),
    // layer (2 bits = 0) and protection absence (1 bit = 1)
    out.put_u16(SYNCWORD | PROTECTION_ABSENCE);
    out.put_u8(profile | sampling_frequency_index | channel_configuration1);
    out.put_u8(channel_configuration2 | frame_length1);
    // All-ones buffer fullness signals a variable-bitrate stream.
    out.put_u16(frame_length2 | 0b0000_0000_0001_1111);
    // Buffer fullness cont. (6 bits) and number of AAC frames minus one (2 bits = 0)
    out.put_u8(0b1111_1100);
    out.extend(payload);
    Ok(out)
}
/// Converts a frame from length-prefixed AVC representation to the Annex B
/// (start-code-delimited) representation required by the TS elementary stream,
/// rewriting the buffer in place.
///
/// Fix: the loop condition was `i < data.len() - 3`, which underflows (panics
/// in debug builds, wraps in release) on frames shorter than 3 bytes; the
/// equivalent `i + 4 <= data.len()` is safe for all lengths and such short
/// frames now fail cleanly with "partial NAL length".
///
/// TODO:
/// * For each IDR frame, copy the SPS and PPS from the stream's
///   parameters, rather than depend on it being present in the frame
///   already. In-band parameters aren't guaranteed. This is awkward
///   with h264_reader v0.5's h264_reader::avcc::AvcDecoderRecord because it
///   strips off the NAL header byte from each parameter. The next major
///   version shouldn't do this.
/// * Copy only the slice data. In particular, don't copy SEI, which confuses
///   Safari: <https://github.com/scottlamb/retina/issues/60#issuecomment-1178369955>
fn convert_h264(frame: retina::codec::VideoFrame) -> Result<Vec<u8>, Error> {
    let mut data = frame.into_data();
    let mut i = 0;
    while i + 4 <= data.len() {
        // Replace each NAL's length with the Annex B start code b"\x00\x00\x00\x01".
        let len = u32::from_be_bytes([data[i], data[i + 1], data[i + 2], data[i + 3]]) as usize;
        data[i] = 0;
        data[i + 1] = 0;
        data[i + 2] = 0;
        data[i + 3] = 1;
        i += 4 + len;
        if i > data.len() {
            bail!("partial NAL body");
        }
    }
    if i < data.len() {
        bail!("partial NAL length");
    }
    Ok(data)
}

View File

@ -104,6 +104,7 @@ async fn run() -> Result<(), Error> {
}; };
let stop_signal = tokio::signal::ctrl_c(); let stop_signal = tokio::signal::ctrl_c();
tokio::pin!(stop_signal); tokio::pin!(stop_signal);
let upstream_session_group = Arc::new(retina::client::SessionGroup::default()); let upstream_session_group = Arc::new(retina::client::SessionGroup::default());
let mut upstream_session = retina::client::Session::describe( let mut upstream_session = retina::client::Session::describe(
opts.url.clone(), opts.url.clone(),
@ -177,6 +178,7 @@ async fn run() -> Result<(), Error> {
format!("{}-video", i), format!("{}-video", i),
"retina-webrtc-proxy".to_owned(), "retina-webrtc-proxy".to_owned(),
)); ));
let sender = downstream_conn let sender = downstream_conn
.add_track(Arc::clone(&track) as Arc<dyn TrackLocal + Send + Sync>) .add_track(Arc::clone(&track) as Arc<dyn TrackLocal + Send + Sync>)
.await?; .await?;
@ -192,6 +194,7 @@ async fn run() -> Result<(), Error> {
if tracks.len() <= i { if tracks.len() <= i {
tracks.resize(i + 1, None); tracks.resize(i + 1, None);
} }
tracks[i] = Some(track); tracks[i] = Some(track);
} }
@ -220,7 +223,9 @@ async fn run() -> Result<(), Error> {
Box::pin(async {}) Box::pin(async {})
})) }))
.await; .await;
tokio::pin!(ice_conn_state_rx); tokio::pin!(ice_conn_state_rx);
let (peer_conn_state_tx, peer_conn_state_rx) = tokio::sync::mpsc::unbounded_channel(); let (peer_conn_state_tx, peer_conn_state_rx) = tokio::sync::mpsc::unbounded_channel();
downstream_conn downstream_conn
.on_peer_connection_state_change(Box::new(move |state: RTCPeerConnectionState| { .on_peer_connection_state_change(Box::new(move |state: RTCPeerConnectionState| {
@ -228,20 +233,25 @@ async fn run() -> Result<(), Error> {
Box::pin(async {}) Box::pin(async {})
})) }))
.await; .await;
tokio::pin!(peer_conn_state_rx); tokio::pin!(peer_conn_state_rx);
println!("Navigate to https://jsfiddle.net/9s10amwL/ in your browser."); println!("Navigate to https://jsfiddle.net/9s10amwL/ in your browser.");
println!("Paste from the 'Browser base64 Session description' box to here:"); println!("Paste from the 'Browser base64 Session description' box to here:");
let offer = read_offer()?; let offer = read_offer()?;
println!(); println!();
downstream_conn.set_remote_description(offer).await?; downstream_conn.set_remote_description(offer).await?;
let answer = downstream_conn.create_answer(None).await?; let answer = downstream_conn.create_answer(None).await?;
downstream_conn.set_local_description(answer).await?; downstream_conn.set_local_description(answer).await?;
downstream_conn downstream_conn
.gathering_complete_promise() .gathering_complete_promise()
.await .await
.recv() .recv()
.await; .await;
if let Some(local_desc) = downstream_conn.local_description().await { if let Some(local_desc) = downstream_conn.local_description().await {
println!("Paste from here to the 'Golang base64 Session Description' box:"); println!("Paste from here to the 'Golang base64 Session Description' box:");
println!("{}", base64::encode(serde_json::to_string(&local_desc)?)); println!("{}", base64::encode(serde_json::to_string(&local_desc)?));

49
player/app.js Normal file
View File

@ -0,0 +1,49 @@
/* eslint-env browser */
// Minimal WebRTC player: negotiates a peer connection with the proxy by
// hand-copying base64 session descriptions (see index.html).
let pc = new RTCPeerConnection({
  iceServers: [
    {
      // Public Google STUN server for ICE candidate discovery.
      urls: 'stun:stun.l.google.com:19302'
    }
  ]
})
// Appends a message to the on-page log <div id="div">.
let log = msg => {
  document.getElementById('div').innerHTML += msg + '<br>'
}
// For each incoming track, create a matching <audio>/<video> element and
// attach the remote stream to it.
pc.ontrack = function (event) {
  var el = document.createElement(event.track.kind)
  el.srcObject = event.streams[0]
  el.autoplay = true
  el.controls = true
  document.getElementById('remoteVideos').appendChild(el)
}
pc.oniceconnectionstatechange = e => log(pc.iceConnectionState)
// A null candidate marks the end of ICE gathering: publish the complete
// local description for the user to copy.
pc.onicecandidate = event => {
  if (event.candidate === null) {
    document.getElementById('localSessionDescription').value = btoa(JSON.stringify(pc.localDescription))
  }
}
// Offer to receive 1 audio, and 1 video track
pc.addTransceiver('video', {'direction': 'sendrecv'})
pc.addTransceiver('audio', {'direction': 'sendrecv'})
pc.createOffer().then(d => pc.setLocalDescription(d)).catch(log)
// Applies the remote (answer) session description pasted by the user.
window.startSession = () => {
  let sd = document.getElementById('remoteSessionDescription').value
  if (sd === '') {
    return alert('Session Description must not be empty')
  }
  try {
    // setRemoteDescription returns a Promise; a bare try/catch only sees
    // synchronous errors, so attach .catch for async rejections too.
    pc.setRemoteDescription(new RTCSessionDescription(JSON.parse(atob(sd)))).catch(alert)
  } catch (e) {
    // Synchronous failures: invalid base64 (atob) or malformed JSON.
    alert(e)
  }
}

26
player/index.html Normal file
View File

@ -0,0 +1,26 @@
<html>
<head>
<!-- rel="stylesheet" is required; without it the browser fetches nothing. -->
<link rel="stylesheet" href="./style.css" />
<!-- <script> must not be self-closing in HTML — a self-closing tag makes the
     parser treat the rest of the document as script content. `defer` runs the
     script after the DOM is parsed so its element lookups are safe. -->
<script src="./app.js" defer></script>
</head>
<body>
Browser base64 Session Description<br />
<textarea id="localSessionDescription" readonly="true"></textarea> <br />
Golang base64 Session Description<br />
<textarea id="remoteSessionDescription"> </textarea> <br />
<button onclick="window.startSession()"> Start Session </button><br />
<br />
Video<br />
<div id="remoteVideos"></div> <br />
Logs<br />
<div id="div"></div>
</body>
</html>

4
player/style.css Normal file
View File

@ -0,0 +1,4 @@
/* Session-description copy/paste boxes on the player page. */
textarea {
    width: 500px;
    min-height: 75px;
}

View File

@ -2444,27 +2444,32 @@ impl futures::Stream for Demuxed {
DemuxedState::Pulling(stream_id) => (stream_id, None), DemuxedState::Pulling(stream_id) => (stream_id, None),
DemuxedState::Fused => return Poll::Ready(None), DemuxedState::Fused => return Poll::Ready(None),
}; };
let inner = self.session.0.as_mut().project(); let inner = self.session.0.as_mut().project();
let stream = &mut inner.presentation.streams[stream_id]; let stream = &mut inner.presentation.streams[stream_id];
let stream_ctx = match stream.state { let stream_ctx = match stream.state {
StreamState::Playing { ref ctx, .. } => ctx, StreamState::Playing { ref ctx, .. } => ctx,
_ => unreachable!(), _ => unreachable!(),
}; };
let depacketizer = match &mut stream.depacketizer { let depacketizer = match &mut stream.depacketizer {
Ok(d) => d, Ok(d) => d,
Err(_) => unreachable!("depacketizer was Ok"), Err(_) => unreachable!("depacketizer was Ok"),
}; };
let conn_ctx = inner let conn_ctx = inner
.conn .conn
.as_ref() .as_ref()
.ok_or_else(|| wrap!(ErrorInt::FailedPrecondition("no connection".into())))? .ok_or_else(|| wrap!(ErrorInt::FailedPrecondition("no connection".into())))?
.inner .inner
.ctx(); .ctx();
if let Some(p) = pkt { if let Some(p) = pkt {
let pkt_ctx = *p.ctx(); let pkt_ctx = *p.ctx();
let stream_id = p.stream_id(); let stream_id = p.stream_id();
let ssrc = p.ssrc(); let ssrc = p.ssrc();
let sequence_number = p.sequence_number(); let sequence_number = p.sequence_number();
depacketizer.push(p).map_err(|description| { depacketizer.push(p).map_err(|description| {
wrap!(ErrorInt::RtpPacketError { wrap!(ErrorInt::RtpPacketError {
conn_ctx: *conn_ctx, conn_ctx: *conn_ctx,

View File

@ -608,7 +608,7 @@ impl Depacketizer {
frag.buf.extend_from_slice(data); frag.buf.extend_from_slice(data);
self.state = DepacketizerState::Ready(super::AudioFrame { self.state = DepacketizerState::Ready(super::AudioFrame {
ctx: *pkt.ctx(), ctx: *pkt.ctx(),
loss: frag.loss, loss: frag.loss as _,
frame_length: NonZeroU32::from(self.config.frame_length), frame_length: NonZeroU32::from(self.config.frame_length),
stream_id: pkt.stream_id(), stream_id: pkt.stream_id(),
timestamp: pkt.timestamp(), timestamp: pkt.timestamp(),
@ -727,7 +727,7 @@ impl Depacketizer {
let agg_timestamp = agg.pkt.timestamp(); let agg_timestamp = agg.pkt.timestamp();
let frame = super::AudioFrame { let frame = super::AudioFrame {
ctx: *agg.pkt.ctx(), ctx: *agg.pkt.ctx(),
loss: agg.loss, loss: agg.loss as _,
stream_id: agg.pkt.stream_id(), stream_id: agg.pkt.stream_id(),
frame_length: NonZeroU32::from(self.config.frame_length), frame_length: NonZeroU32::from(self.config.frame_length),

View File

@ -62,7 +62,7 @@ impl Depacketizer {
} }
self.pending = Some(super::AudioFrame { self.pending = Some(super::AudioFrame {
ctx: *pkt.ctx(), ctx: *pkt.ctx(),
loss: pkt.loss(), loss: pkt.loss() as _,
stream_id: pkt.stream_id(), stream_id: pkt.stream_id(),
timestamp: pkt.timestamp(), timestamp: pkt.timestamp(),
frame_length: NonZeroU32::new(240).unwrap(), frame_length: NonZeroU32::new(240).unwrap(),

View File

@ -516,7 +516,7 @@ impl Depacketizer {
}; };
Ok(VideoFrame { Ok(VideoFrame {
has_new_parameters, has_new_parameters,
loss: au.loss, loss: au.loss as _,
start_ctx: au.start_ctx, start_ctx: au.start_ctx,
end_ctx: au.end_ctx, end_ctx: au.end_ctx,
timestamp: au.timestamp, timestamp: au.timestamp,

392
src/codec/h265.rs Normal file
View File

@ -0,0 +1,392 @@
// Copyright (C) 2021 Scott Lamb <slamb@slamb.org>
// SPDX-License-Identifier: MIT OR Apache-2.0
use bytes::{BufMut, Bytes, BytesMut};
use rtp::codecs::h265::{H265NALUHeader, H265Packet, H265Payload};
use rtp::packetizer::Depacketizer as _;
use std::collections::VecDeque;
use std::time::{Duration, Instant};
use super::{VideoFrame, VideoParameters};
use crate::rtp::ReceivedPacket;
// H.265 NAL unit type codes (ITU-T H.265, Table 7-1).
//
// Types 16-21 are intra random access point (IRAP) pictures — BLA, IDR and
// CRA — at which decoding may begin; `AccessUnit::finsh` uses these to mark
// frames as random access points.
const H265NALU_BLA_W_LP: u8 = 16;
const H265NALU_BLA_W_RADL: u8 = 17;
const H265NALU_BLA_N_LP: u8 = 18;
const H265NALU_IDR_W_RADL: u8 = 19;
const H265NALU_IDR_N_LP: u8 = 20;
const H265NALU_CRA_NUT: u8 = 21;
// Parameter-set NAL unit types (video/sequence/picture parameter sets).
const H265NALU_VPS: u8 = 32;
const H265NALU_SPS: u8 = 33;
const H265NALU_PPS: u8 = 34;
/// Where the depacketizer is in reassembling NAL units from RTP packets.
#[derive(Debug)]
#[allow(clippy::large_enum_variant)]
enum DepacketizerInputState {
    /// Not yet processing an access unit.
    Idle,
    /// Currently processing an access unit (accumulating fragmentation units).
    /// This will be flushed after a marked packet or when receiving a later timestamp.
    Process(AccessUnit),
}
/// H.265 parameters parsed from the SDP `fmtp` attribute (see `From<&str>`).
#[derive(Debug, Default)]
struct Params {
    /// Base64-decoded `sprop-vps` (video parameter set NAL), if present.
    vps: Option<Vec<u8>>,
    /// Base64-decoded `sprop-pps` (picture parameter set NAL), if present.
    pps: Option<Vec<u8>>,
    /// Base64-decoded `sprop-sps` (sequence parameter set NAL), if present.
    sps: Option<Vec<u8>>,
    /// Base64-decoded `sprop-sei`, if present. Currently parsed but unused.
    sei: Option<Vec<u8>>,
    /// `profile-id` attribute value, if present. Currently parsed but unused.
    profile_id: Option<i32>,
    /// Whether DONL fields are present in the RTP payload; set when
    /// `sprop-max-don-diff` or `sprop-depack-buf-nalus` is positive.
    using_donl_field: bool,
}
impl From<&str> for Params {
    /// Parses an SDP `fmtp` attribute value: `;`-separated `key=value` pairs.
    ///
    /// Unknown keys are ignored; malformed values are logged and skipped
    /// rather than failing the whole parse.
    fn from(input: &str) -> Self {
        let mut params = Params::default();
        for item in input.split(';') {
            // Split on the *first* '=' only. Base64 values may end with '='
            // padding, which the previous `split('=')` silently truncated,
            // corrupting (or failing) the decode of sprop-* attributes.
            let Some((key, value)) = item.split_once('=') else { continue };
            let key = key.trim();
            let value = value.trim();
            if key.eq_ignore_ascii_case("profile-id") {
                match value.parse::<i32>() {
                    Ok(val) => params.profile_id = Some(val),
                    Err(err) => log::error!("attr[profile-id] parse error: {}", err),
                }
            } else if key.eq_ignore_ascii_case("sprop-vps") {
                match base64::decode(value) {
                    Ok(val) => params.vps = Some(val),
                    Err(err) => log::error!("attr[sprop-vps] parse error: {}", err),
                }
            } else if key.eq_ignore_ascii_case("sprop-pps") {
                match base64::decode(value) {
                    Ok(val) => params.pps = Some(val),
                    Err(err) => log::error!("attr[sprop-pps] parse error: {}", err),
                }
            } else if key.eq_ignore_ascii_case("sprop-sps") {
                match base64::decode(value) {
                    Ok(val) => params.sps = Some(val),
                    Err(err) => log::error!("attr[sprop-sps] parse error: {}", err),
                }
            } else if key.eq_ignore_ascii_case("sprop-sei") {
                match base64::decode(value) {
                    Ok(val) => params.sei = Some(val),
                    Err(err) => log::error!("attr[sprop-sei] parse error: {}", err),
                }
            } else if key.eq_ignore_ascii_case("sprop-max-don-diff") {
                // A positive value implies DONL/DOND fields in the payload.
                match value.parse::<i32>() {
                    Ok(val) => params.using_donl_field = val > 0,
                    Err(err) => log::error!("attr[sprop-max-don-diff] parse error: {}", err),
                }
            } else if key.eq_ignore_ascii_case("sprop-depack-buf-nalus") {
                match value.parse::<i32>() {
                    Ok(val) => params.using_donl_field = val > 0,
                    Err(err) => log::error!("attr[sprop-depack-buf-nalus] parse error: {}", err),
                }
            }
        }
        params
    }
}
/// A [super::Depacketizer] implementation which finds access unit boundaries
/// and produces unfragmented NAL units as specified in [RFC
/// 7798](https://tools.ietf.org/html/rfc7798) (RTP payload format for HEVC;
/// the original comment cited H.264's RFC 6184 by mistake).
///
/// This doesn't inspect the contents of the NAL units, so it doesn't depend on
/// or verify compliance with H.265's rules for the order of NAL units and
/// their association to access units.
///
/// Currently expects that the stream starts at an access unit boundary unless
/// packet loss is indicated.
#[derive(Debug)]
pub(crate) struct Depacketizer {
    /// Idle, or mid-reassembly of a fragmented NAL unit.
    input_state: DepacketizerInputState,
    /// Packets lost since the last emitted frame; folded into the next frame.
    pkts_loss: u32,
    /// Complete video frames ready for pull.
    pending: VecDeque<VideoFrame>,
    /// Parameters parsed from the SDP `fmtp` attribute.
    parameters: Params,
    /// When the out-of-band parameter sets were last queued; they are re-sent
    /// every 5 seconds (see `push`).
    parameters_sent_time: Option<Instant>,
    /// Placeholder stream parameters; see the TODO in `new`.
    generic_params: Option<VideoParameters>,
}
impl Depacketizer {
pub(super) fn new(
clock_rate: u32,
format_specific_params: Option<&str>,
) -> Result<Self, String> {
if clock_rate != 90_000 {
return Err(format!(
"invalid H.265 clock rate {}; must always be 90000",
clock_rate
));
}
// TODO proper parse SPS, PPS for `generic_params`
let generic_params = Some(VideoParameters {
pixel_dimensions: (0, 0),
rfc6381_codec: "hevc".to_string(),
pixel_aspect_ratio: None,
frame_rate: None,
extra_data: Vec::new().into(),
});
log::debug!(">>> FSP: {:?}", format_specific_params);
Ok(Depacketizer {
pkts_loss: 0,
input_state: DepacketizerInputState::Idle,
pending: VecDeque::new(),
parameters: format_specific_params.map(Into::into).unwrap_or_default(),
parameters_sent_time: None,
generic_params,
})
}
fn send_params(
&mut self,
ctx: crate::PacketContext,
stream_id: usize,
timestamp: crate::Timestamp,
) {
if let Some(vps) = &self.parameters.vps {
let mut data = Vec::with_capacity(vps.len() + 4);
data.put_u32(vps.len() as u32);
data.extend_from_slice(&vps);
self.pending.push_back(VideoFrame {
start_ctx: ctx,
end_ctx: ctx,
has_new_parameters: false,
loss: 0,
timestamp,
stream_id,
is_random_access_point: false,
is_disposable: false,
data,
});
}
if let Some(sps) = &self.parameters.sps {
let mut data = Vec::with_capacity(sps.len() + 4);
data.put_u32(sps.len() as u32);
data.extend_from_slice(&sps);
self.pending.push_back(VideoFrame {
start_ctx: ctx,
end_ctx: ctx,
has_new_parameters: false,
loss: 0,
timestamp,
stream_id,
is_random_access_point: false,
is_disposable: false,
data,
});
}
if let Some(pps) = &self.parameters.pps {
let mut data = Vec::with_capacity(pps.len() + 4);
data.put_u32(pps.len() as u32);
data.extend_from_slice(&pps);
self.pending.push_back(VideoFrame {
start_ctx: ctx,
end_ctx: ctx,
has_new_parameters: false,
loss: 0,
timestamp,
stream_id,
is_random_access_point: false,
is_disposable: false,
data,
});
}
}
pub(super) fn parameters(&self) -> Option<super::ParametersRef> {
self.generic_params
.as_ref()
.map(super::ParametersRef::Video)
}
pub(super) fn pull(&mut self) -> Option<super::CodecItem> {
self.pending.pop_front().map(super::CodecItem::VideoFrame)
}
pub(super) fn push(&mut self, pkt: ReceivedPacket) -> Result<(), String> {
let ctx = *pkt.ctx();
let loss = pkt.loss;
let stream_id = pkt.stream_id();
let timestamp = pkt.timestamp();
if matches!(self.parameters_sent_time, None)
| matches!(self.parameters_sent_time, Some(inst) if inst.elapsed() > Duration::from_secs(5))
{
self.parameters_sent_time = Some(Instant::now());
self.send_params(ctx, stream_id, timestamp);
}
self.pkts_loss += loss as u32;
let payload = pkt.into_payload_bytes();
let mut pkt = H265Packet::default();
pkt.with_donl(self.parameters.using_donl_field);
pkt.depacketize(&payload)
.map_err(|err| format!("{}", err))?;
self.input_state = loop {
break match std::mem::replace(&mut self.input_state, DepacketizerInputState::Idle) {
DepacketizerInputState::Idle => match pkt.payload() {
H265Payload::H265SingleNALUnitPacket(pkt) => {
let mut au = AccessUnit::start(ctx, timestamp, stream_id);
au.push(pkt.payload(), Some(pkt.payload_header()));
au.finsh(&mut self.pending, ctx, self.pkts_loss);
self.pkts_loss = 0;
DepacketizerInputState::Idle
}
H265Payload::H265AggregationPacket(pkt) => {
let mut au = AccessUnit::start(ctx, timestamp, stream_id);
let first = pkt.first_unit().unwrap();
au.push(first.nal_unit(), None);
au.finsh(&mut self.pending, ctx, self.pkts_loss);
self.pkts_loss = 0;
let others = pkt.other_units();
for u in others {
let mut au = AccessUnit::start(ctx, timestamp, stream_id);
au.push(u.nal_unit(), None);
au.finsh(&mut self.pending, ctx, self.pkts_loss);
}
DepacketizerInputState::Idle
}
H265Payload::H265FragmentationUnitPacket(pkt) => {
let fu_header = pkt.fu_header();
if fu_header.s() {
let mut au = AccessUnit::start(ctx, timestamp, stream_id);
let header = pkt.payload_header().0;
let header = header & !(0b111111u16 << 9);
let header = header | ((fu_header.fu_type() & 0b111111) as u16) << 9;
au.push(pkt.payload(), Some(H265NALUHeader(header)));
DepacketizerInputState::Process(au)
} else {
self.pkts_loss += 1;
DepacketizerInputState::Idle
}
}
H265Payload::H265PACIPacket(_) => DepacketizerInputState::Idle,
},
DepacketizerInputState::Process(mut au) => match pkt.payload() {
H265Payload::H265FragmentationUnitPacket(pkt) => {
if loss > 0 {
self.pkts_loss += au.segments + 1;
DepacketizerInputState::Idle
} else {
let fu_header = pkt.fu_header();
let header = pkt.payload_header().0;
let header = header & !(0b111111u16 << 9);
let header = header | ((fu_header.fu_type() & 0b111111) as u16) << 9;
au.push(pkt.payload(), Some(H265NALUHeader(header)));
if fu_header.e() {
au.finsh(&mut self.pending, ctx, self.pkts_loss);
self.pkts_loss = 0;
DepacketizerInputState::Idle
} else {
DepacketizerInputState::Process(au)
}
}
}
_ => {
self.pkts_loss += au.segments;
self.input_state = DepacketizerInputState::Idle;
continue;
}
},
};
};
Ok(())
}
}
/// An access unit being accumulated while in the `Process` state (the
/// original comment referred to a `PreMark` state that does not exist here).
#[derive(Debug)]
struct AccessUnit {
    /// Context of the packet that started this unit.
    ctx: crate::PacketContext,
    timestamp: crate::Timestamp,
    /// Number of payload segments pushed so far (used for loss accounting).
    segments: u32,
    stream_id: usize,
    /// Reconstructed NAL header of the most recent segment; `None` for NALs
    /// taken from aggregation packets.
    header: Option<H265NALUHeader>,
    /// 4-byte length prefix + 2-byte NAL header placeholder + payload bytes.
    data: BytesMut,
}
impl AccessUnit {
    /// Begins a new access unit, reserving 4 bytes for the length prefix and
    /// 2 bytes for the NAL unit header (both filled in by `finsh`).
    fn start(ctx: crate::PacketContext, timestamp: crate::Timestamp, stream_id: usize) -> Self {
        let mut data = BytesMut::new();
        data.put_u32(0); // length placeholder
        data.put_u16(0); // NAL header placeholder
        AccessUnit {
            ctx,
            timestamp,
            stream_id,
            segments: 0,
            header: None,
            data,
        }
    }

    /// Appends one payload segment, remembering its (reconstructed) header.
    fn push(&mut self, data: Bytes, header: Option<H265NALUHeader>) {
        self.segments += 1;
        self.header = header;
        self.data.put(data);
    }

    /// Finalizes the unit into `dst` as a length-prefixed frame.
    /// (Name kept as `finsh` [sic] for compatibility with existing callers.)
    fn finsh(self, dst: &mut VecDeque<VideoFrame>, ctx: crate::PacketContext, loss: u32) {
        let mut data = self.data.to_vec();
        let length = data.len() as u32 - 4;
        (&mut data[0..4]).put_u32(length);
        if let Some(hdr) = &self.header {
            (&mut data[4..6]).put_u16(hdr.0);
        }
        // `header` is `None` for NALs from aggregation packets; treat those
        // as non-random-access instead of panicking — the previous code
        // called `self.header.unwrap()` here, which is guaranteed to panic
        // on the aggregation-packet path.
        // NOTE(review): when `header` is None the 2 placeholder bytes stay
        // zero; confirm whether aggregation `nal_unit()` payloads carry their
        // own header and whether the placeholder should be omitted for them.
        let is_random_access_point = self.header.as_ref().map_or(false, |hdr| {
            matches!(
                hdr.nalu_type(),
                H265NALU_BLA_W_LP
                    | H265NALU_BLA_W_RADL
                    | H265NALU_BLA_N_LP
                    | H265NALU_IDR_W_RADL
                    | H265NALU_IDR_N_LP
                    | H265NALU_CRA_NUT
            )
        });
        dst.push_back(VideoFrame {
            start_ctx: self.ctx,
            end_ctx: ctx,
            has_new_parameters: false,
            stream_id: self.stream_id,
            timestamp: self.timestamp,
            is_random_access_point,
            is_disposable: false,
            loss,
            data,
        })
    }
}
// TODO(review): add depacketizer tests (single NALU, FU reassembly across
// packets, aggregation packets, loss handling) — the module is empty.
#[cfg(test)]
mod tests {}

View File

@ -22,6 +22,9 @@ pub(crate) mod g723;
#[doc(hidden)] #[doc(hidden)]
pub mod h264; pub mod h264;
#[doc(hidden)]
pub mod h265;
pub(crate) mod onvif; pub(crate) mod onvif;
pub(crate) mod simple_audio; pub(crate) mod simple_audio;
@ -191,7 +194,7 @@ pub struct AudioFrame {
stream_id: usize, stream_id: usize,
timestamp: crate::Timestamp, timestamp: crate::Timestamp,
frame_length: NonZeroU32, frame_length: NonZeroU32,
loss: u16, loss: u32,
data: Bytes, data: Bytes,
} }
@ -222,7 +225,7 @@ impl AudioFrame {
/// Note that if loss occurs during a fragmented frame, more than this /// Note that if loss occurs during a fragmented frame, more than this
/// number of packets' worth of data may be skipped. /// number of packets' worth of data may be skipped.
#[inline] #[inline]
pub fn loss(&self) -> u16 { pub fn loss(&self) -> u32 {
self.loss self.loss
} }
@ -254,7 +257,7 @@ pub struct MessageFrame {
ctx: crate::PacketContext, ctx: crate::PacketContext,
timestamp: crate::Timestamp, timestamp: crate::Timestamp,
stream_id: usize, stream_id: usize,
loss: u16, loss: u32,
data: Bytes, data: Bytes,
} }
@ -292,7 +295,7 @@ impl MessageFrame {
/// Note that if loss occurs during a fragmented frame, more than this /// Note that if loss occurs during a fragmented frame, more than this
/// number of packets' worth of data may be skipped. /// number of packets' worth of data may be skipped.
#[inline] #[inline]
pub fn loss(&self) -> u16 { pub fn loss(&self) -> u32 {
self.loss self.loss
} }
@ -316,7 +319,7 @@ pub struct VideoFrame {
end_ctx: crate::PacketContext, end_ctx: crate::PacketContext,
has_new_parameters: bool, has_new_parameters: bool,
loss: u16, loss: u32,
timestamp: crate::Timestamp, timestamp: crate::Timestamp,
stream_id: usize, stream_id: usize,
is_random_access_point: bool, is_random_access_point: bool,
@ -344,7 +347,7 @@ impl VideoFrame {
/// Note that if loss occurs during a fragmented frame, more than this /// Note that if loss occurs during a fragmented frame, more than this
/// number of packets' worth of data may be skipped. /// number of packets' worth of data may be skipped.
#[inline] #[inline]
pub fn loss(&self) -> u16 { pub fn loss(&self) -> u32 {
self.loss self.loss
} }
@ -430,6 +433,7 @@ enum DepacketizerInner {
SimpleAudio(Box<simple_audio::Depacketizer>), SimpleAudio(Box<simple_audio::Depacketizer>),
G723(Box<g723::Depacketizer>), G723(Box<g723::Depacketizer>),
H264(Box<h264::Depacketizer>), H264(Box<h264::Depacketizer>),
H265(Box<h265::Depacketizer>),
Onvif(Box<onvif::Depacketizer>), Onvif(Box<onvif::Depacketizer>),
} }
@ -450,6 +454,10 @@ impl Depacketizer {
clock_rate, clock_rate,
format_specific_params, format_specific_params,
)?)), )?)),
("video", "h265") => DepacketizerInner::H265(Box::new(h265::Depacketizer::new(
clock_rate,
format_specific_params,
)?)),
("audio", "mpeg4-generic") => DepacketizerInner::Aac(Box::new(aac::Depacketizer::new( ("audio", "mpeg4-generic") => DepacketizerInner::Aac(Box::new(aac::Depacketizer::new(
clock_rate, clock_rate,
channels, channels,
@ -519,6 +527,7 @@ impl Depacketizer {
DepacketizerInner::Aac(d) => d.parameters(), DepacketizerInner::Aac(d) => d.parameters(),
DepacketizerInner::G723(d) => d.parameters(), DepacketizerInner::G723(d) => d.parameters(),
DepacketizerInner::H264(d) => d.parameters(), DepacketizerInner::H264(d) => d.parameters(),
DepacketizerInner::H265(d) => d.parameters(),
DepacketizerInner::Onvif(d) => d.parameters(), DepacketizerInner::Onvif(d) => d.parameters(),
DepacketizerInner::SimpleAudio(d) => d.parameters(), DepacketizerInner::SimpleAudio(d) => d.parameters(),
} }
@ -534,6 +543,7 @@ impl Depacketizer {
DepacketizerInner::Aac(d) => d.push(input), DepacketizerInner::Aac(d) => d.push(input),
DepacketizerInner::G723(d) => d.push(input), DepacketizerInner::G723(d) => d.push(input),
DepacketizerInner::H264(d) => d.push(input), DepacketizerInner::H264(d) => d.push(input),
DepacketizerInner::H265(d) => d.push(input),
DepacketizerInner::Onvif(d) => d.push(input), DepacketizerInner::Onvif(d) => d.push(input),
DepacketizerInner::SimpleAudio(d) => d.push(input), DepacketizerInner::SimpleAudio(d) => d.push(input),
} }
@ -552,6 +562,7 @@ impl Depacketizer {
DepacketizerInner::Aac(d) => d.pull(conn_ctx, stream_ctx), DepacketizerInner::Aac(d) => d.pull(conn_ctx, stream_ctx),
DepacketizerInner::G723(d) => Ok(d.pull()), DepacketizerInner::G723(d) => Ok(d.pull()),
DepacketizerInner::H264(d) => Ok(d.pull()), DepacketizerInner::H264(d) => Ok(d.pull()),
DepacketizerInner::H265(d) => Ok(d.pull()),
DepacketizerInner::Onvif(d) => Ok(d.pull()), DepacketizerInner::Onvif(d) => Ok(d.pull()),
DepacketizerInner::SimpleAudio(d) => Ok(d.pull()), DepacketizerInner::SimpleAudio(d) => Ok(d.pull()),
} }

View File

@ -83,7 +83,7 @@ impl Depacketizer {
// fast-path: avoid copy. // fast-path: avoid copy.
self.state = State::Ready(super::MessageFrame { self.state = State::Ready(super::MessageFrame {
stream_id: pkt.stream_id(), stream_id: pkt.stream_id(),
loss: pkt.loss(), loss: pkt.loss() as _,
ctx: *pkt.ctx(), ctx: *pkt.ctx(),
timestamp: pkt.timestamp(), timestamp: pkt.timestamp(),
data: pkt.into_payload_bytes(), data: pkt.into_payload_bytes(),
@ -107,7 +107,7 @@ impl Depacketizer {
ctx: in_progress.ctx, ctx: in_progress.ctx,
timestamp: in_progress.timestamp, timestamp: in_progress.timestamp,
data: in_progress.data.freeze(), data: in_progress.data.freeze(),
loss: in_progress.loss, loss: in_progress.loss as _,
}); });
} else { } else {
self.state = State::InProgress(in_progress); self.state = State::InProgress(in_progress);

View File

@ -57,7 +57,7 @@ impl Depacketizer {
) )
})?; })?;
self.pending = Some(super::AudioFrame { self.pending = Some(super::AudioFrame {
loss: pkt.loss(), loss: pkt.loss() as _,
ctx: *pkt.ctx(), ctx: *pkt.ctx(),
stream_id: pkt.stream_id(), stream_id: pkt.stream_id(),
timestamp: pkt.timestamp(), timestamp: pkt.timestamp(),