basic client tests with mock server

There's still a lot of untested functionality, but this is a start.
This commit is contained in:
Scott Lamb 2021-08-20 16:40:15 -07:00
parent 76b3a61eb7
commit 915e6b7404
5 changed files with 237 additions and 12 deletions

View File

@ -571,7 +571,11 @@ impl Session<Described> {
/// `Session<Described>::setup`, the erorrs in question will be ultimately
/// returned from `Stream<Playing>::demuxed`.
pub async fn describe(url: Url, options: SessionOptions) -> Result<Self, Error> {
let mut conn = RtspConnection::connect(&url, options).await?;
let conn = RtspConnection::connect(&url, options).await?;
Self::describe_with_conn(conn, url).await
}
async fn describe_with_conn(mut conn: RtspConnection, url: Url) -> Result<Self, Error> {
let mut req =
rtsp_types::Request::builder(rtsp_types::Method::Describe, rtsp_types::Version::V1_0)
.header(rtsp_types::headers::ACCEPT, "application/sdp")
@ -838,6 +842,7 @@ impl Session<Described> {
}
}
#[derive(Debug)]
pub enum PacketItem {
RtpPacket(rtp::Packet),
SenderReport(rtp::SenderReport),
@ -1164,8 +1169,217 @@ impl futures::Stream for Demuxed {
#[cfg(test)]
mod tests {
use super::*;
use crate::testutil::response;
// TODO: tests of Reolink ignore_spurious_data scenarios.
/// Cross-platform, tokio equivalent of `socketpair(2)`.
async fn socketpair() -> (tokio::net::TcpStream, tokio::net::TcpStream) {
// Another process on the machine could connect to the server and mess
// this up, but that's unlikely enough to ignore in test code.
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let client = tokio::net::TcpStream::connect(addr);
let server = listener.accept();
(client.await.unwrap(), server.await.unwrap().0)
}
async fn connect_to_mock(
options: SessionOptions,
) -> (RtspConnection, crate::tokio::Connection) {
let (client, server) = socketpair().await;
let client = crate::tokio::Connection::from_stream(client).unwrap();
let server = crate::tokio::Connection::from_stream(server).unwrap();
let client = RtspConnection {
inner: client,
options,
requested_auth: None,
next_cseq: 1,
};
(client, server)
}
/// Receives a request and sends a reply, filling in the matching `CSeq`.
async fn req_reply(
server: &mut crate::tokio::Connection,
expected_method: rtsp_types::Method,
mut response: rtsp_types::Response<Bytes>,
) {
let msg = server.next().await.unwrap().unwrap();
let cseq = match msg.msg {
rtsp_types::Message::Request(ref r) => {
assert_eq!(r.method(), expected_method);
r.header(&rtsp_types::headers::CSEQ).unwrap()
}
_ => panic!(),
};
response.insert_header(rtsp_types::headers::CSEQ, cseq.as_str());
server
.send(rtsp_types::Message::Response(response))
.await
.unwrap();
}
/// Test the happy path of session initialization.
#[tokio::test]
async fn simple() {
let (conn, mut server) = connect_to_mock(SessionOptions::default()).await;
let url = Url::parse("rtsp://192.168.5.206:554/h264Preview_01_main").unwrap();
// DESCRIBE.
let (session, _) = tokio::join!(
Session::describe_with_conn(conn, url),
req_reply(
&mut server,
rtsp_types::Method::Describe,
response(include_bytes!("testdata/reolink_describe.txt"))
),
);
let mut session = session.unwrap();
assert_eq!(session.streams().len(), 2);
// SETUP.
tokio::join!(
async {
session.setup(0).await.unwrap();
},
req_reply(
&mut server,
rtsp_types::Method::Setup,
response(include_bytes!("testdata/reolink_setup.txt"))
),
);
// PLAY.
let (session, _) = tokio::join!(
session.play(PlayOptions::default()),
req_reply(
&mut server,
rtsp_types::Method::Play,
response(include_bytes!("testdata/reolink_play.txt"))
),
);
let session = session.unwrap();
tokio::pin!(session);
// Packet.
tokio::join!(
async {
match session.next().await {
Some(Ok(PacketItem::RtpPacket(p))) => {
assert_eq!(p.ssrc, 0xdcc4a0d8);
assert_eq!(p.sequence_number, 0x41d4);
assert_eq!(&p.payload[..], b"hello world");
}
o => panic!("unexpected item: {:#?}", o),
}
},
async {
let pkt = b"\x80\x60\x41\xd4\x00\x00\x00\x00\xdc\xc4\xa0\xd8hello world";
server
.send(rtsp_types::Message::Data(rtsp_types::Data::new(
0,
Bytes::from_static(pkt),
)))
.await
.unwrap();
},
);
// End of stream.
tokio::join!(
async {
assert!(session.next().await.is_none());
},
async {
server.close().await.unwrap();
},
);
}
/// Tests the `ignore_spurious_data` feature:
/// * ignore a data packet while waiting for a RTSP response early on.
/// * ignore a data packet with the wrong ssrc after play.
#[tokio::test]
async fn ignore_spurious_data() {
let (conn, mut server) =
connect_to_mock(SessionOptions::default().ignore_spurious_data(true)).await;
let url = Url::parse("rtsp://192.168.5.206:554/h264Preview_01_main").unwrap();
let bogus_pkt = rtsp_types::Message::Data(rtsp_types::Data::new(
0,
Bytes::from_static(b"\x80\x60\xaa\xaa\x00\x00\x00\x00\xbb\xbb\xbb\xbbbogus pkt"),
));
// DESCRIBE.
let (session, _) = tokio::join!(Session::describe_with_conn(conn, url), async {
server.send(bogus_pkt.clone()).await.unwrap();
req_reply(
&mut server,
rtsp_types::Method::Describe,
response(include_bytes!("testdata/reolink_describe.txt")),
)
.await;
},);
let mut session = session.unwrap();
assert_eq!(session.streams().len(), 2);
// SETUP.
tokio::join!(
async {
session.setup(0).await.unwrap();
},
req_reply(
&mut server,
rtsp_types::Method::Setup,
response(include_bytes!("testdata/reolink_setup.txt"))
),
);
// PLAY.
let (session, _) = tokio::join!(
session.play(PlayOptions::default()),
req_reply(
&mut server,
rtsp_types::Method::Play,
response(include_bytes!("testdata/reolink_play.txt"))
),
);
let session = session.unwrap();
tokio::pin!(session);
// Packet.
tokio::join!(
async {
match session.next().await {
Some(Ok(PacketItem::RtpPacket(p))) => {
assert_eq!(p.ssrc, 0xdcc4a0d8);
assert_eq!(p.sequence_number, 0x41d4);
assert_eq!(&p.payload[..], b"hello world");
}
o => panic!("unexpected item: {:#?}", o),
}
},
async {
server.send(bogus_pkt).await.unwrap();
let pkt = b"\x80\x60\x41\xd4\x00\x00\x00\x00\xdc\xc4\xa0\xd8hello world";
server
.send(rtsp_types::Message::Data(rtsp_types::Data::new(
0,
Bytes::from_static(pkt),
)))
.await
.unwrap();
},
);
// End of stream.
tokio::join!(
async {
assert!(session.next().await.is_none());
},
async {
server.close().await.unwrap();
},
);
}
// See with: cargo test -- --nocapture client::tests::print_sizes
#[test]

View File

@ -587,21 +587,12 @@ pub(crate) fn parse_play(
mod tests {
use std::num::NonZeroU16;
use bytes::Bytes;
use url::Url;
use crate::{client::StreamStateInit, codec::Parameters};
use super::super::StreamState;
fn response(raw: &'static [u8]) -> rtsp_types::Response<Bytes> {
let (msg, len) = rtsp_types::Message::parse(raw).unwrap();
assert_eq!(len, raw.len());
match msg {
rtsp_types::Message::Response(r) => r.map_body(|b| Bytes::from_static(b)),
_ => panic!("unexpected message type"),
}
}
use crate::testutil::response;
fn parse_describe(
raw_url: &str,

View File

@ -11,6 +11,9 @@ use std::num::NonZeroU32;
mod error;
mod rtcp;
#[cfg(test)]
mod testutil;
pub use error::Error;
/// Wraps the supplied `ErrorInt` and returns it as an `Err`.

13
src/testutil.rs Normal file
View File

@ -0,0 +1,13 @@
// Copyright (C) 2021 Scott Lamb <slamb@slamb.org>
// SPDX-License-Identifier: MIT OR Apache-2.0
use bytes::Bytes;
pub(crate) fn response(raw: &'static [u8]) -> rtsp_types::Response<Bytes> {
let (msg, len) = rtsp_types::Message::parse(raw).unwrap();
assert_eq!(len, raw.len());
match msg {
rtsp_types::Message::Response(r) => r.map_body(|b| Bytes::from_static(b)),
_ => panic!("unexpected message type"),
}
}

View File

@ -29,6 +29,10 @@ impl Connection {
Host::Ipv4(h) => TcpStream::connect((h, port)).await,
Host::Ipv6(h) => TcpStream::connect((h, port)).await,
}?;
Self::from_stream(stream)
}
pub(crate) fn from_stream(stream: TcpStream) -> Result<Self, std::io::Error> {
let established_wall = WallTime::now();
let established = Instant::now();
let local_addr = stream.local_addr()?;