test teardown & normal session expiration

This commit is contained in:
Scott Lamb 2021-12-29 21:24:17 -08:00
parent 9a519c8e25
commit 3187be6c41
2 changed files with 174 additions and 33 deletions

View File

@ -39,7 +39,7 @@ criterion = { version = "0.3.4", features = ["async_tokio"] }
itertools = "0.10.1"
mylog = { git = "https://github.com/scottlamb/mylog" }
structopt = "0.3.21"
tokio = { version = "1.5.0", features = ["fs", "io-util", "macros", "parking_lot", "rt-multi-thread", "signal"] }
tokio = { version = "1.5.0", features = ["fs", "io-util", "macros", "parking_lot", "rt-multi-thread", "signal", "test-util"] }
[profile.bench]
debug = true

View File

@ -1947,6 +1947,8 @@ impl futures::Stream for Demuxed {
#[cfg(test)]
mod tests {
use std::str::FromStr;
use super::*;
use crate::testutil::response;
@ -1973,6 +1975,19 @@ mod tests {
(client, server)
}
fn init_logging() {
let h = mylog::Builder::new()
.set_format(
::std::env::var("MOONFIRE_FORMAT")
.map_err(|_| ())
.and_then(|s| mylog::Format::from_str(&s))
.unwrap_or(mylog::Format::Google),
)
.set_spec(::std::env::var("MOONFIRE_LOG").as_deref().unwrap_or("info"))
.build();
let _ = h.install();
}
/// Receives a request and sends a response, filling in the matching `CSeq`.
async fn req_response(
server: &mut crate::tokio::Connection,
@ -1994,15 +2009,21 @@ mod tests {
.unwrap();
}
/// Tests the happy path of session initialization.
#[tokio::test]
/// Tests the happy path from initialization to teardown (first attempt succeeds).
#[tokio::test(start_paused = true)]
async fn simple() {
init_logging();
let (conn, mut server) = connect_to_mock().await;
let url = Url::parse("rtsp://192.168.5.206:554/h264Preview_01_main").unwrap();
let group = Arc::new(SessionGroup::default());
// DESCRIBE.
let (session, _) = tokio::join!(
Session::describe_with_conn(conn, SessionOptions::default(), url),
Session::describe_with_conn(
conn,
SessionOptions::default().session_group(group.clone()),
url
),
req_response(
&mut server,
rtsp_types::Method::Describe,
@ -2033,47 +2054,165 @@ mod tests {
response(include_bytes!("testdata/reolink_play.txt"))
),
);
let session = session.unwrap();
tokio::pin!(session);
let drop_time;
{
let session = session.unwrap();
tokio::pin!(session);
// Packet.
// Packet.
tokio::join!(
async {
match session.next().await {
Some(Ok(PacketItem::RtpPacket(p))) => {
assert_eq!(p.ssrc, 0xdcc4a0d8);
assert_eq!(p.sequence_number, 0x41d4);
assert_eq!(&p.payload[..], b"hello world");
}
o => panic!("unexpected item: {:#?}", o),
}
},
async {
let pkt = b"\x80\x60\x41\xd4\x00\x00\x00\x00\xdc\xc4\xa0\xd8hello world";
server
.send(rtsp_types::Message::Data(rtsp_types::Data::new(
0,
Bytes::from_static(pkt),
)))
.await
.unwrap();
},
);
drop_time = tokio::time::Instant::now();
};
// Drop (initiated by exiting the scope above).
// This server advertises an ancient version of live555, so Retina
// sends a TEARDOWN even with TCP.
let stale_sessions = group.stale_sessions();
assert_eq!(stale_sessions.num_sessions, 1);
tokio::join!(
group.await_stale_sessions(&stale_sessions),
req_response(
&mut server,
rtsp_types::Method::Teardown,
response(include_bytes!("testdata/reolink_teardown.txt"))
),
);
assert_eq!(group.stale_sessions().num_sessions, 0);
// elapsed is not zero because tokio advances the time unnecessarily, grr.
// https://github.com/tokio-rs/tokio/issues/3108
let elapsed = tokio::time::Instant::now() - drop_time;
assert!(
elapsed < std::time::Duration::from_secs(60),
"elapsed={:?}",
elapsed
);
}
/// As above, but TEARDOWN fails until session expiration.
#[tokio::test(start_paused = true)]
async fn session_expiration() {
init_logging();
let (conn, mut server) = connect_to_mock().await;
let url = Url::parse("rtsp://192.168.5.206:554/h264Preview_01_main").unwrap();
let group = Arc::new(SessionGroup::default());
// DESCRIBE.
let (session, _) = tokio::join!(
Session::describe_with_conn(
conn,
SessionOptions::default().session_group(group.clone()),
url
),
req_response(
&mut server,
rtsp_types::Method::Describe,
response(include_bytes!("testdata/reolink_describe.txt"))
),
);
let mut session = session.unwrap();
assert_eq!(session.streams().len(), 2);
// SETUP.
tokio::join!(
async {
match session.next().await {
Some(Ok(PacketItem::RtpPacket(p))) => {
assert_eq!(p.ssrc, 0xdcc4a0d8);
assert_eq!(p.sequence_number, 0x41d4);
assert_eq!(&p.payload[..], b"hello world");
}
o => panic!("unexpected item: {:#?}", o),
}
},
async {
let pkt = b"\x80\x60\x41\xd4\x00\x00\x00\x00\xdc\xc4\xa0\xd8hello world";
server
.send(rtsp_types::Message::Data(rtsp_types::Data::new(
0,
Bytes::from_static(pkt),
)))
.await
.unwrap();
session.setup(0).await.unwrap();
},
req_response(
&mut server,
rtsp_types::Method::Setup,
response(include_bytes!("testdata/reolink_setup.txt"))
),
);
// End of stream.
tokio::join!(
async {
assert!(session.next().await.is_none());
},
async {
server.close().await.unwrap();
},
// PLAY.
let (session, _) = tokio::join!(
session.play(PlayOptions::default()),
req_response(
&mut server,
rtsp_types::Method::Play,
response(include_bytes!("testdata/reolink_play.txt"))
),
);
let drop_time;
{
let session = session.unwrap();
tokio::pin!(session);
// Packet.
tokio::join!(
async {
match session.next().await {
Some(Ok(PacketItem::RtpPacket(p))) => {
assert_eq!(p.ssrc, 0xdcc4a0d8);
assert_eq!(p.sequence_number, 0x41d4);
assert_eq!(&p.payload[..], b"hello world");
}
o => panic!("unexpected item: {:#?}", o),
}
},
async {
let pkt = b"\x80\x60\x41\xd4\x00\x00\x00\x00\xdc\xc4\xa0\xd8hello world";
server
.send(rtsp_types::Message::Data(rtsp_types::Data::new(
0,
Bytes::from_static(pkt),
)))
.await
.unwrap();
},
);
drop_time = tokio::time::Instant::now();
}
// Drop (initiated by exiting the scope above).
// This server advertises an ancient version of live555, so Retina
// sends a TEARDOWN even with TCP.
server.close().await.unwrap();
let stale_sessions = group.stale_sessions();
assert_eq!(stale_sessions.num_sessions, 1);
// Even if repeated attempts fail, the stale session will go await on timeout.
// The "60" here is the RFC-defined default timeout when none is specified
// in the SETUP response.
group.await_stale_sessions(&stale_sessions).await;
assert_eq!(group.stale_sessions().num_sessions, 0);
// elapsed is not zero because tokio advances the time unnecessarily, grr.
// https://github.com/tokio-rs/tokio/issues/3108
let elapsed = tokio::time::Instant::now() - drop_time;
assert!(
elapsed >= std::time::Duration::from_secs(60),
"elapsed={:?}",
elapsed
);
}
/// Tests ignoring bogus RTP and RTCP messages while waiting for PLAY response.
#[tokio::test]
async fn ignore_early_rtp_rtcp() {
init_logging();
let (conn, mut server) = connect_to_mock().await;
let url = Url::parse("rtsp://192.168.5.206:554/h264Preview_01_main").unwrap();
let bogus_rtp = rtsp_types::Message::Data(rtsp_types::Data::new(
@ -2129,6 +2268,7 @@ mod tests {
// See with: cargo test -- --nocapture client::tests::print_sizes
#[test]
fn print_sizes() {
init_logging();
for (name, size) in &[
("PacketItem", std::mem::size_of::<PacketItem>()),
("Presentation", std::mem::size_of::<Presentation>()),
@ -2153,6 +2293,7 @@ mod tests {
#[test]
fn check_live555_tcp_bug() {
init_logging();
assert!(!has_live555_tcp_bug("not live555"));
assert!(!has_live555_tcp_bug("LIVE555 Streaming Media v"));
assert!(has_live555_tcp_bug("LIVE555 Streaming Media v2013.04.08"));