diff --git a/src/client/mod.rs b/src/client/mod.rs index 4dc59ae..2a06744 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -571,7 +571,11 @@ impl Session { /// `Session::setup`, the erorrs in question will be ultimately /// returned from `Stream::demuxed`. pub async fn describe(url: Url, options: SessionOptions) -> Result { - let mut conn = RtspConnection::connect(&url, options).await?; + let conn = RtspConnection::connect(&url, options).await?; + Self::describe_with_conn(conn, url).await + } + + async fn describe_with_conn(mut conn: RtspConnection, url: Url) -> Result { let mut req = rtsp_types::Request::builder(rtsp_types::Method::Describe, rtsp_types::Version::V1_0) .header(rtsp_types::headers::ACCEPT, "application/sdp") @@ -838,6 +842,7 @@ impl Session { } } +#[derive(Debug)] pub enum PacketItem { RtpPacket(rtp::Packet), SenderReport(rtp::SenderReport), @@ -1164,8 +1169,217 @@ impl futures::Stream for Demuxed { #[cfg(test)] mod tests { use super::*; + use crate::testutil::response; - // TODO: tests of Reolink ignore_spurious_data scenarios. + /// Cross-platform, tokio equivalent of `socketpair(2)`. + async fn socketpair() -> (tokio::net::TcpStream, tokio::net::TcpStream) { + // Another process on the machine could connect to the server and mess + // this up, but that's unlikely enough to ignore in test code. + let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + let client = tokio::net::TcpStream::connect(addr); + let server = listener.accept(); + (client.await.unwrap(), server.await.unwrap().0) + } + + async fn connect_to_mock( + options: SessionOptions, + ) -> (RtspConnection, crate::tokio::Connection) { + let (client, server) = socketpair().await; + let client = crate::tokio::Connection::from_stream(client).unwrap(); + let server = crate::tokio::Connection::from_stream(server).unwrap(); + let client = RtspConnection { + inner: client, + options, + requested_auth: None, + next_cseq: 1, + }; + (client, server) + } + + /// Receives a request and sends a reply, filling in the matching `CSeq`. + async fn req_reply( + server: &mut crate::tokio::Connection, + expected_method: rtsp_types::Method, + mut response: rtsp_types::Response, + ) { + let msg = server.next().await.unwrap().unwrap(); + let cseq = match msg.msg { + rtsp_types::Message::Request(ref r) => { + assert_eq!(r.method(), expected_method); + r.header(&rtsp_types::headers::CSEQ).unwrap() + } + _ => panic!(), + }; + response.insert_header(rtsp_types::headers::CSEQ, cseq.as_str()); + server + .send(rtsp_types::Message::Response(response)) + .await + .unwrap(); + } + + /// Test the happy path of session initialization. + #[tokio::test] + async fn simple() { + let (conn, mut server) = connect_to_mock(SessionOptions::default()).await; + let url = Url::parse("rtsp://192.168.5.206:554/h264Preview_01_main").unwrap(); + + // DESCRIBE. + let (session, _) = tokio::join!( + Session::describe_with_conn(conn, url), + req_reply( + &mut server, + rtsp_types::Method::Describe, + response(include_bytes!("testdata/reolink_describe.txt")) + ), + ); + let mut session = session.unwrap(); + assert_eq!(session.streams().len(), 2); + + // SETUP. + tokio::join!( + async { + session.setup(0).await.unwrap(); + }, + req_reply( + &mut server, + rtsp_types::Method::Setup, + response(include_bytes!("testdata/reolink_setup.txt")) + ), + ); + + // PLAY. + let (session, _) = tokio::join!( + session.play(PlayOptions::default()), + req_reply( + &mut server, + rtsp_types::Method::Play, + response(include_bytes!("testdata/reolink_play.txt")) + ), + ); + let session = session.unwrap(); + tokio::pin!(session); + + // Packet. + tokio::join!( + async { + match session.next().await { + Some(Ok(PacketItem::RtpPacket(p))) => { + assert_eq!(p.ssrc, 0xdcc4a0d8); + assert_eq!(p.sequence_number, 0x41d4); + assert_eq!(&p.payload[..], b"hello world"); + } + o => panic!("unexpected item: {:#?}", o), + } + }, + async { + let pkt = b"\x80\x60\x41\xd4\x00\x00\x00\x00\xdc\xc4\xa0\xd8hello world"; + server + .send(rtsp_types::Message::Data(rtsp_types::Data::new( + 0, + Bytes::from_static(pkt), + ))) + .await + .unwrap(); + }, + ); + + // End of stream. + tokio::join!( + async { + assert!(session.next().await.is_none()); + }, + async { + server.close().await.unwrap(); + }, + ); + } + + /// Tests the `ignore_spurious_data` feature: + /// * ignore a data packet while waiting for a RTSP response early on. + /// * ignore a data packet with the wrong ssrc after play. + #[tokio::test] + async fn ignore_spurious_data() { + let (conn, mut server) = + connect_to_mock(SessionOptions::default().ignore_spurious_data(true)).await; + let url = Url::parse("rtsp://192.168.5.206:554/h264Preview_01_main").unwrap(); + let bogus_pkt = rtsp_types::Message::Data(rtsp_types::Data::new( + 0, + Bytes::from_static(b"\x80\x60\xaa\xaa\x00\x00\x00\x00\xbb\xbb\xbb\xbbbogus pkt"), + )); + + // DESCRIBE. + let (session, _) = tokio::join!(Session::describe_with_conn(conn, url), async { + server.send(bogus_pkt.clone()).await.unwrap(); + req_reply( + &mut server, + rtsp_types::Method::Describe, + response(include_bytes!("testdata/reolink_describe.txt")), + ) + .await; + },); + let mut session = session.unwrap(); + assert_eq!(session.streams().len(), 2); + + // SETUP. + tokio::join!( + async { + session.setup(0).await.unwrap(); + }, + req_reply( + &mut server, + rtsp_types::Method::Setup, + response(include_bytes!("testdata/reolink_setup.txt")) + ), + ); + + // PLAY. + let (session, _) = tokio::join!( + session.play(PlayOptions::default()), + req_reply( + &mut server, + rtsp_types::Method::Play, + response(include_bytes!("testdata/reolink_play.txt")) + ), + ); + let session = session.unwrap(); + tokio::pin!(session); + + // Packet. + tokio::join!( + async { + match session.next().await { + Some(Ok(PacketItem::RtpPacket(p))) => { + assert_eq!(p.ssrc, 0xdcc4a0d8); + assert_eq!(p.sequence_number, 0x41d4); + assert_eq!(&p.payload[..], b"hello world"); + } + o => panic!("unexpected item: {:#?}", o), + } + }, + async { + server.send(bogus_pkt).await.unwrap(); + let pkt = b"\x80\x60\x41\xd4\x00\x00\x00\x00\xdc\xc4\xa0\xd8hello world"; + server + .send(rtsp_types::Message::Data(rtsp_types::Data::new( + 0, + Bytes::from_static(pkt), + ))) + .await + .unwrap(); + }, + ); + + // End of stream. + tokio::join!( + async { + assert!(session.next().await.is_none()); + }, + async { + server.close().await.unwrap(); + }, + ); + } // See with: cargo test -- --nocapture client::tests::print_sizes #[test] diff --git a/src/client/parse.rs b/src/client/parse.rs index aabc0db..b2b6868 100644 --- a/src/client/parse.rs +++ b/src/client/parse.rs @@ -587,21 +587,12 @@ pub(crate) fn parse_play( mod tests { use std::num::NonZeroU16; - use bytes::Bytes; use url::Url; use crate::{client::StreamStateInit, codec::Parameters}; use super::super::StreamState; - - fn response(raw: &'static [u8]) -> rtsp_types::Response { - let (msg, len) = rtsp_types::Message::parse(raw).unwrap(); - assert_eq!(len, raw.len()); - match msg { - rtsp_types::Message::Response(r) => r.map_body(|b| Bytes::from_static(b)), - _ => panic!("unexpected message type"), - } - } + use crate::testutil::response; fn parse_describe( raw_url: &str, diff --git a/src/lib.rs b/src/lib.rs index 5c1588b..c865ca5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,6 +11,9 @@ use std::num::NonZeroU32; mod error; mod rtcp; +#[cfg(test)] +mod testutil; + pub use error::Error; /// Wraps the supplied `ErrorInt` and returns it as an `Err`. diff --git a/src/testutil.rs b/src/testutil.rs new file mode 100644 index 0000000..d124703 --- /dev/null +++ b/src/testutil.rs @@ -0,0 +1,13 @@ +// Copyright (C) 2021 Scott Lamb +// SPDX-License-Identifier: MIT OR Apache-2.0 + +use bytes::Bytes; + +pub(crate) fn response(raw: &'static [u8]) -> rtsp_types::Response { + let (msg, len) = rtsp_types::Message::parse(raw).unwrap(); + assert_eq!(len, raw.len()); + match msg { + rtsp_types::Message::Response(r) => r.map_body(|b| Bytes::from_static(b)), + _ => panic!("unexpected message type"), + } +} diff --git a/src/tokio.rs b/src/tokio.rs index f6235f4..786e1e9 100644 --- a/src/tokio.rs +++ b/src/tokio.rs @@ -29,6 +29,10 @@ impl Connection { Host::Ipv4(h) => TcpStream::connect((h, port)).await, Host::Ipv6(h) => TcpStream::connect((h, port)).await, }?; + Self::from_stream(stream) + } + + pub(crate) fn from_stream(stream: TcpStream) -> Result { let established_wall = WallTime::now(); let established = Instant::now(); let local_addr = stream.local_addr()?;