diff --git a/Cargo.toml b/Cargo.toml index 71247a6..435ebbd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,7 +39,7 @@ criterion = { version = "0.3.4", features = ["async_tokio"] } itertools = "0.10.1" mylog = { git = "https://github.com/scottlamb/mylog" } structopt = "0.3.21" -tokio = { version = "1.5.0", features = ["fs", "io-util", "macros", "parking_lot", "rt-multi-thread", "signal"] } +tokio = { version = "1.5.0", features = ["fs", "io-util", "macros", "parking_lot", "rt-multi-thread", "signal", "test-util"] } [profile.bench] debug = true diff --git a/src/client/mod.rs b/src/client/mod.rs index f2f65f5..5775c90 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -1947,6 +1947,8 @@ impl futures::Stream for Demuxed { #[cfg(test)] mod tests { + use std::str::FromStr; + use super::*; use crate::testutil::response; @@ -1973,6 +1975,19 @@ mod tests { (client, server) } + fn init_logging() { + let h = mylog::Builder::new() + .set_format( + ::std::env::var("MOONFIRE_FORMAT") + .map_err(|_| ()) + .and_then(|s| mylog::Format::from_str(&s)) + .unwrap_or(mylog::Format::Google), + ) + .set_spec(::std::env::var("MOONFIRE_LOG").as_deref().unwrap_or("info")) + .build(); + let _ = h.install(); + } + /// Receives a request and sends a response, filling in the matching `CSeq`. async fn req_response( server: &mut crate::tokio::Connection, @@ -1994,15 +2009,21 @@ mod tests { .unwrap(); } - /// Tests the happy path of session initialization. - #[tokio::test] + /// Tests the happy path from initialization to teardown (first attempt succeeds). + #[tokio::test(start_paused = true)] async fn simple() { + init_logging(); let (conn, mut server) = connect_to_mock().await; let url = Url::parse("rtsp://192.168.5.206:554/h264Preview_01_main").unwrap(); + let group = Arc::new(SessionGroup::default()); // DESCRIBE. let (session, _) = tokio::join!( - Session::describe_with_conn(conn, SessionOptions::default(), url), + Session::describe_with_conn( + conn, + SessionOptions::default().session_group(group.clone()), + url + ), req_response( &mut server, rtsp_types::Method::Describe, @@ -2033,47 +2054,165 @@ mod tests { response(include_bytes!("testdata/reolink_play.txt")) ), ); - let session = session.unwrap(); - tokio::pin!(session); + let drop_time; + { + let session = session.unwrap(); + tokio::pin!(session); - // Packet. + // Packet. + tokio::join!( + async { + match session.next().await { + Some(Ok(PacketItem::RtpPacket(p))) => { + assert_eq!(p.ssrc, 0xdcc4a0d8); + assert_eq!(p.sequence_number, 0x41d4); + assert_eq!(&p.payload[..], b"hello world"); + } + o => panic!("unexpected item: {:#?}", o), + } + }, + async { + let pkt = b"\x80\x60\x41\xd4\x00\x00\x00\x00\xdc\xc4\xa0\xd8hello world"; + server + .send(rtsp_types::Message::Data(rtsp_types::Data::new( + 0, + Bytes::from_static(pkt), + ))) + .await + .unwrap(); + }, + ); + drop_time = tokio::time::Instant::now(); + }; + + // Drop (initiated by exiting the scope above). + // This server advertises an ancient version of live555, so Retina + // sends a TEARDOWN even with TCP. + let stale_sessions = group.stale_sessions(); + assert_eq!(stale_sessions.num_sessions, 1); + tokio::join!( + group.await_stale_sessions(&stale_sessions), + req_response( + &mut server, + rtsp_types::Method::Teardown, + response(include_bytes!("testdata/reolink_teardown.txt")) + ), + ); + assert_eq!(group.stale_sessions().num_sessions, 0); + + // elapsed is not zero because tokio advances the time unnecessarily, grr. + // https://github.com/tokio-rs/tokio/issues/3108 + let elapsed = tokio::time::Instant::now() - drop_time; + assert!( + elapsed < std::time::Duration::from_secs(60), + "elapsed={:?}", + elapsed + ); + } + + /// As above, but TEARDOWN fails until session expiration. + #[tokio::test(start_paused = true)] + async fn session_expiration() { + init_logging(); + let (conn, mut server) = connect_to_mock().await; + let url = Url::parse("rtsp://192.168.5.206:554/h264Preview_01_main").unwrap(); + let group = Arc::new(SessionGroup::default()); + + // DESCRIBE. + let (session, _) = tokio::join!( + Session::describe_with_conn( + conn, + SessionOptions::default().session_group(group.clone()), + url + ), + req_response( + &mut server, + rtsp_types::Method::Describe, + response(include_bytes!("testdata/reolink_describe.txt")) + ), + ); + let mut session = session.unwrap(); + assert_eq!(session.streams().len(), 2); + + // SETUP. tokio::join!( async { - match session.next().await { - Some(Ok(PacketItem::RtpPacket(p))) => { - assert_eq!(p.ssrc, 0xdcc4a0d8); - assert_eq!(p.sequence_number, 0x41d4); - assert_eq!(&p.payload[..], b"hello world"); - } - o => panic!("unexpected item: {:#?}", o), - } - }, - async { - let pkt = b"\x80\x60\x41\xd4\x00\x00\x00\x00\xdc\xc4\xa0\xd8hello world"; - server - .send(rtsp_types::Message::Data(rtsp_types::Data::new( - 0, - Bytes::from_static(pkt), - ))) - .await - .unwrap(); + session.setup(0).await.unwrap(); }, + req_response( + &mut server, + rtsp_types::Method::Setup, + response(include_bytes!("testdata/reolink_setup.txt")) + ), ); - // End of stream. - tokio::join!( - async { - assert!(session.next().await.is_none()); - }, - async { - server.close().await.unwrap(); - }, + // PLAY. + let (session, _) = tokio::join!( + session.play(PlayOptions::default()), + req_response( + &mut server, + rtsp_types::Method::Play, + response(include_bytes!("testdata/reolink_play.txt")) + ), + ); + let drop_time; + { + let session = session.unwrap(); + tokio::pin!(session); + + // Packet. + tokio::join!( + async { + match session.next().await { + Some(Ok(PacketItem::RtpPacket(p))) => { + assert_eq!(p.ssrc, 0xdcc4a0d8); + assert_eq!(p.sequence_number, 0x41d4); + assert_eq!(&p.payload[..], b"hello world"); + } + o => panic!("unexpected item: {:#?}", o), + } + }, + async { + let pkt = b"\x80\x60\x41\xd4\x00\x00\x00\x00\xdc\xc4\xa0\xd8hello world"; + server + .send(rtsp_types::Message::Data(rtsp_types::Data::new( + 0, + Bytes::from_static(pkt), + ))) + .await + .unwrap(); + }, + ); + drop_time = tokio::time::Instant::now(); + } + + // Drop (initiated by exiting the scope above). + // This server advertises an ancient version of live555, so Retina + // sends a TEARDOWN even with TCP. + server.close().await.unwrap(); + let stale_sessions = group.stale_sessions(); + assert_eq!(stale_sessions.num_sessions, 1); + + // Even if repeated attempts fail, the stale session will go await on timeout. + // The "60" here is the RFC-defined default timeout when none is specified + // in the SETUP response. + group.await_stale_sessions(&stale_sessions).await; + assert_eq!(group.stale_sessions().num_sessions, 0); + + // elapsed is not zero because tokio advances the time unnecessarily, grr. + // https://github.com/tokio-rs/tokio/issues/3108 + let elapsed = tokio::time::Instant::now() - drop_time; + assert!( + elapsed >= std::time::Duration::from_secs(60), + "elapsed={:?}", + elapsed ); } /// Tests ignoring bogus RTP and RTCP messages while waiting for PLAY response. #[tokio::test] async fn ignore_early_rtp_rtcp() { + init_logging(); let (conn, mut server) = connect_to_mock().await; let url = Url::parse("rtsp://192.168.5.206:554/h264Preview_01_main").unwrap(); let bogus_rtp = rtsp_types::Message::Data(rtsp_types::Data::new( @@ -2129,6 +2268,7 @@ mod tests { // See with: cargo test -- --nocapture client::tests::print_sizes #[test] fn print_sizes() { + init_logging(); for (name, size) in &[ ("PacketItem", std::mem::size_of::()), ("Presentation", std::mem::size_of::()), @@ -2153,6 +2293,7 @@ mod tests { #[test] fn check_live555_tcp_bug() { + init_logging(); assert!(!has_live555_tcp_bug("not live555")); assert!(!has_live555_tcp_bug("LIVE555 Streaming Media v")); assert!(has_live555_tcp_bug("LIVE555 Streaming Media v2013.04.08"));