add a more end-to-end benchmark
This commit is contained in:
parent
6503398d3f
commit
4a8b8f2f77
6
Cargo.lock
generated
6
Cargo.lock
generated
@ -224,6 +224,7 @@ dependencies = [
|
|||||||
"clap",
|
"clap",
|
||||||
"criterion-plot",
|
"criterion-plot",
|
||||||
"csv",
|
"csv",
|
||||||
|
"futures",
|
||||||
"itertools 0.10.1",
|
"itertools 0.10.1",
|
||||||
"lazy_static",
|
"lazy_static",
|
||||||
"num-traits",
|
"num-traits",
|
||||||
@ -236,6 +237,7 @@ dependencies = [
|
|||||||
"serde_derive",
|
"serde_derive",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
"tinytemplate",
|
"tinytemplate",
|
||||||
|
"tokio",
|
||||||
"walkdir",
|
"walkdir",
|
||||||
]
|
]
|
||||||
|
|
||||||
@ -1135,9 +1137,9 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "rtp-rs"
|
name = "rtp-rs"
|
||||||
version = "0.5.0"
|
version = "0.6.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "e1110d695193d446e901de09921ffbf2d86ae351bbfde9c5b53863ce177e17f5"
|
checksum = "d4ed274a5b3d36c4434cff6a4de1b42f43e64ae326b1cfa72d13d9037a314355"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "rtsp-types"
|
name = "rtsp-types"
|
||||||
|
@ -25,7 +25,7 @@ once_cell = "1.7.2"
|
|||||||
pin-project = "1.0.7"
|
pin-project = "1.0.7"
|
||||||
pretty-hex = "0.2.1"
|
pretty-hex = "0.2.1"
|
||||||
rtcp = "0.2.1"
|
rtcp = "0.2.1"
|
||||||
rtp-rs = "0.5.0"
|
rtp-rs = "0.6.0"
|
||||||
rtsp-types = "0.0.2"
|
rtsp-types = "0.0.2"
|
||||||
sdp = "0.1.4"
|
sdp = "0.1.4"
|
||||||
smallvec = { version = "1.6.1", features = ["union"] }
|
smallvec = { version = "1.6.1", features = ["union"] }
|
||||||
@ -35,7 +35,7 @@ tokio-util = { version = "0.6.6", features = ["codec"] }
|
|||||||
url = "2.2.1"
|
url = "2.2.1"
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
criterion = "0.3.4"
|
criterion = { version = "0.3.4", features = ["async_tokio"] }
|
||||||
mylog = { git = "https://github.com/scottlamb/mylog" }
|
mylog = { git = "https://github.com/scottlamb/mylog" }
|
||||||
structopt = "0.3.21"
|
structopt = "0.3.21"
|
||||||
tokio = { version = "1.5.0", features = ["fs", "io-util", "macros", "parking_lot", "rt-multi-thread", "signal"] }
|
tokio = { version = "1.5.0", features = ["fs", "io-util", "macros", "parking_lot", "rt-multi-thread", "signal"] }
|
||||||
@ -43,6 +43,10 @@ tokio = { version = "1.5.0", features = ["fs", "io-util", "macros", "parking_lot
|
|||||||
[profile.bench]
|
[profile.bench]
|
||||||
debug = true
|
debug = true
|
||||||
|
|
||||||
|
[[bench]]
|
||||||
|
name = "client"
|
||||||
|
harness = false
|
||||||
|
|
||||||
[[bench]]
|
[[bench]]
|
||||||
name = "depacketize"
|
name = "depacketize"
|
||||||
harness = false
|
harness = false
|
||||||
|
139
benches/client.rs
Normal file
139
benches/client.rs
Normal file
@ -0,0 +1,139 @@
|
|||||||
|
// Copyright (C) 2021 Scott Lamb <slamb@slamb.org>
|
||||||
|
// SPDX-License-Identifier: MIT OR Apache-2.0
|
||||||
|
|
||||||
|
//! Tests client performance against a mock server.
|
||||||
|
|
||||||
|
use std::{io::ErrorKind, net::SocketAddr, num::NonZeroU32};
|
||||||
|
|
||||||
|
use bytes::Bytes;
|
||||||
|
use criterion::{criterion_group, criterion_main, Criterion, Throughput};
|
||||||
|
use futures::StreamExt;
|
||||||
|
use retina::{client::PlayPolicy, codec::CodecItem};
|
||||||
|
use std::convert::TryFrom;
|
||||||
|
use tokio::io::AsyncWriteExt;
|
||||||
|
use url::Url;
|
||||||
|
|
||||||
|
/// Mock server. Serves `data` on every connection.
|
||||||
|
async fn serve(data: Bytes) -> SocketAddr {
|
||||||
|
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||||
|
let addr = listener.local_addr().unwrap();
|
||||||
|
tokio::task::spawn(async move {
|
||||||
|
loop {
|
||||||
|
let conn = match listener.accept().await {
|
||||||
|
Err(_) => return,
|
||||||
|
Ok((conn, _addr)) => conn,
|
||||||
|
};
|
||||||
|
tokio::task::spawn(serve_connection(conn, data.clone()));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
addr
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Mock server connection handler.
|
||||||
|
///
|
||||||
|
/// Tries to send the canned data. Doesn't even try to read any data from the
|
||||||
|
/// client until the end. This assumes the total data written from client fits
|
||||||
|
/// within the socket buffer; it might cause deadlock otherwise.
|
||||||
|
async fn serve_connection(mut stream: tokio::net::TcpStream, data: Bytes) {
|
||||||
|
// Send.
|
||||||
|
if stream.write_all(&data[..]).await.is_err() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait until the client has read it all and closes the connection.
|
||||||
|
// This reads and discards whatever the client sent earlier.
|
||||||
|
if stream.shutdown().await.is_err() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
loop {
|
||||||
|
if stream.readable().await.is_err() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
let mut buf = [0u8; 1024];
|
||||||
|
match stream.try_read(&mut buf) {
|
||||||
|
Err(e) if e.kind() == ErrorKind::WouldBlock => {}
|
||||||
|
Err(_) | Ok(0) => return,
|
||||||
|
Ok(_) => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn make_test_data(max_payload_size: u16) -> Bytes {
|
||||||
|
let mut data = Vec::new();
|
||||||
|
data.extend_from_slice(include_bytes!(
|
||||||
|
"../src/client/testdata/hikvision_describe.txt"
|
||||||
|
));
|
||||||
|
data.extend_from_slice(include_bytes!("../src/client/testdata/hikvision_setup.txt"));
|
||||||
|
data.extend_from_slice(include_bytes!("../src/client/testdata/hikvision_play.txt"));
|
||||||
|
|
||||||
|
// These are the actual sizes of the slice NALs in one (two-second, 60-frame) Group of Pictures
|
||||||
|
// from a Dahua IP camera. This is about 3.5 Mbps.
|
||||||
|
let frame_sizes: [u32; 60] = [
|
||||||
|
667339, 7960, 2296, 3153, 3687, 3246, 3621, 2300, 2603, 2956, 2888, 3187, 3439, 3299, 3252,
|
||||||
|
3372, 3052, 2988, 2921, 2859, 2806, 3400, 2811, 3143, 2972, 4097, 2906, 3307, 3628, 3750,
|
||||||
|
3575, 3144, 3431, 3317, 3154, 3387, 3630, 3232, 3574, 3254, 4198, 4235, 4898, 4890, 4854,
|
||||||
|
4854, 4863, 4652, 4227, 4101, 3867, 3870, 3716, 3074, 3253, 3267, 3192, 3995, 3904, 3781,
|
||||||
|
];
|
||||||
|
let mut dummy_frame = vec![0; 1048576];
|
||||||
|
dummy_frame[4] = h264_reader::nal::UnitType::SliceLayerWithoutPartitioningIdr.id();
|
||||||
|
let mut p = retina::codec::h264::Packetizer::new(max_payload_size, 0, 24104).unwrap();
|
||||||
|
let mut timestamp = retina::Timestamp::new(0, NonZeroU32::new(90_000).unwrap(), 0).unwrap();
|
||||||
|
let mut pkt_buf = vec![0; 65536];
|
||||||
|
for _ in 0..30 {
|
||||||
|
for &f in &frame_sizes {
|
||||||
|
&dummy_frame[0..4].copy_from_slice(&f.to_be_bytes()[..]);
|
||||||
|
let frame = Bytes::copy_from_slice(&dummy_frame[..(usize::try_from(f).unwrap() + 4)]);
|
||||||
|
p.push(timestamp, frame).unwrap();
|
||||||
|
while let Some(pkt) = p.pull().unwrap() {
|
||||||
|
let pkt_len = rtp_rs::RtpPacketBuilder::new()
|
||||||
|
.payload_type(96)
|
||||||
|
.marked(pkt.mark)
|
||||||
|
.sequence(rtp_rs::Seq::from(pkt.sequence_number))
|
||||||
|
.ssrc(0x4cacc3d1)
|
||||||
|
.timestamp(pkt.timestamp.timestamp() as u32)
|
||||||
|
.payload(&pkt.payload)
|
||||||
|
.build_into(&mut pkt_buf)
|
||||||
|
.unwrap();
|
||||||
|
data.push(b'$'); // interleaved data
|
||||||
|
data.push(0); // channel 0
|
||||||
|
data.extend_from_slice(&u16::try_from(pkt_len).unwrap().to_be_bytes()[..]);
|
||||||
|
data.extend_from_slice(&pkt_buf[..pkt_len]);
|
||||||
|
}
|
||||||
|
timestamp = timestamp.try_add(3000).unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Bytes::from(data)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn read_to_eof(addr: SocketAddr) {
|
||||||
|
let url = Url::parse(&format!("rtsp://{}/", &addr)).unwrap();
|
||||||
|
let mut session = retina::client::Session::describe(url, None).await.unwrap();
|
||||||
|
session.setup(0).await.unwrap();
|
||||||
|
let session = session
|
||||||
|
.play(PlayPolicy::default())
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.demuxed()
|
||||||
|
.unwrap();
|
||||||
|
tokio::pin!(session);
|
||||||
|
let mut i = 0;
|
||||||
|
while let Some(item) = session.next().await {
|
||||||
|
match item {
|
||||||
|
Ok(CodecItem::VideoFrame(_)) => i += 1,
|
||||||
|
o => panic!("bad item: {:#?}", o),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assert_eq!(i, 30 * 60);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn criterion_benchmark(c: &mut Criterion) {
|
||||||
|
let mut g = c.benchmark_group("client");
|
||||||
|
let rt = tokio::runtime::Runtime::new().unwrap();
|
||||||
|
let data = make_test_data(1440);
|
||||||
|
g.throughput(Throughput::Bytes(u64::try_from(data.len()).unwrap()));
|
||||||
|
let addr = rt.block_on(serve(data));
|
||||||
|
g.bench_function("h264", |b| b.to_async(&rt).iter(|| read_to_eof(addr)));
|
||||||
|
}
|
||||||
|
|
||||||
|
criterion_group!(benches, criterion_benchmark);
|
||||||
|
criterion_main!(benches);
|
2
src/client/testdata/hikvision_describe.txt
vendored
2
src/client/testdata/hikvision_describe.txt
vendored
@ -1,5 +1,5 @@
|
|||||||
RTSP/1.0 200 OK
|
RTSP/1.0 200 OK
|
||||||
CSeq: 2
|
CSeq: 1
|
||||||
Content-Type: application/sdp
|
Content-Type: application/sdp
|
||||||
Content-Base: rtsp://192.168.5.106:554/Streaming/Channels/101/
|
Content-Base: rtsp://192.168.5.106:554/Streaming/Channels/101/
|
||||||
Content-Length: 902
|
Content-Length: 902
|
||||||
|
2
src/client/testdata/hikvision_play.txt
vendored
2
src/client/testdata/hikvision_play.txt
vendored
@ -1,5 +1,5 @@
|
|||||||
RTSP/1.0 200 OK
|
RTSP/1.0 200 OK
|
||||||
CSeq: 4
|
CSeq: 3
|
||||||
Session: 708345999
|
Session: 708345999
|
||||||
RTP-Info: url=rtsp://192.168.5.106:554/Streaming/Channels/101/trackID=1?transportmode=unicast&profile=Profile_1;seq=24104;rtptime=1270711678
|
RTP-Info: url=rtsp://192.168.5.106:554/Streaming/Channels/101/trackID=1?transportmode=unicast&profile=Profile_1;seq=24104;rtptime=1270711678
|
||||||
Date: Wed, May 05 2021 21:51:17 GMT
|
Date: Wed, May 05 2021 21:51:17 GMT
|
||||||
|
2
src/client/testdata/hikvision_setup.txt
vendored
2
src/client/testdata/hikvision_setup.txt
vendored
@ -1,5 +1,5 @@
|
|||||||
RTSP/1.0 200 OK
|
RTSP/1.0 200 OK
|
||||||
CSeq: 3
|
CSeq: 2
|
||||||
Session: 708345999;timeout=60
|
Session: 708345999;timeout=60
|
||||||
Transport: RTP/AVP/TCP;unicast;interleaved=0-1;ssrc=4cacc3d1;mode="play"
|
Transport: RTP/AVP/TCP;unicast;interleaved=0-1;ssrc=4cacc3d1;mode="play"
|
||||||
Date: Wed, May 05 2021 21:51:17 GMT
|
Date: Wed, May 05 2021 21:51:17 GMT
|
||||||
|
@ -23,6 +23,7 @@ pub mod h264;
|
|||||||
pub(crate) mod onvif;
|
pub(crate) mod onvif;
|
||||||
pub(crate) mod simple_audio;
|
pub(crate) mod simple_audio;
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
pub enum CodecItem {
|
pub enum CodecItem {
|
||||||
VideoFrame(VideoFrame),
|
VideoFrame(VideoFrame),
|
||||||
AudioFrame(AudioFrame),
|
AudioFrame(AudioFrame),
|
||||||
|
10
src/lib.rs
10
src/lib.rs
@ -4,6 +4,7 @@
|
|||||||
use bytes::{Buf, BufMut, Bytes, BytesMut};
|
use bytes::{Buf, BufMut, Bytes, BytesMut};
|
||||||
use failure::{bail, format_err, Error};
|
use failure::{bail, format_err, Error};
|
||||||
use once_cell::sync::Lazy;
|
use once_cell::sync::Lazy;
|
||||||
|
use pretty_hex::PrettyHex;
|
||||||
use rtsp_types::Message;
|
use rtsp_types::Message;
|
||||||
use std::convert::TryFrom;
|
use std::convert::TryFrom;
|
||||||
use std::fmt::{Debug, Display};
|
use std::fmt::{Debug, Display};
|
||||||
@ -260,7 +261,14 @@ impl tokio_util::codec::Decoder for Codec {
|
|||||||
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
|
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
|
||||||
let (msg, len): (Message<&[u8]>, _) = match rtsp_types::Message::parse(src) {
|
let (msg, len): (Message<&[u8]>, _) = match rtsp_types::Message::parse(src) {
|
||||||
Ok((m, l)) => (m, l),
|
Ok((m, l)) => (m, l),
|
||||||
Err(rtsp_types::ParseError::Error) => bail!("RTSP parse error: {:#?}", &self.ctx),
|
Err(rtsp_types::ParseError::Error) => {
|
||||||
|
let snippet = &src[0..std::cmp::min(128, src.len())];
|
||||||
|
bail!(
|
||||||
|
"RTSP parse error at {:#?}: next bytes are {:#?}",
|
||||||
|
&self.ctx,
|
||||||
|
snippet.hex_dump()
|
||||||
|
)
|
||||||
|
}
|
||||||
Err(rtsp_types::ParseError::Incomplete) => return Ok(None),
|
Err(rtsp_types::ParseError::Incomplete) => return Ok(None),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user