From 9e9366f1158bda30b2239e917d368e7b88244ca0 Mon Sep 17 00:00:00 2001 From: Scott Lamb Date: Fri, 27 Aug 2021 16:30:55 -0700 Subject: [PATCH] experimental UDP support (fixes #30) * support setting transport to UDP * breaking change: use new PacketContext rather than RtspMessageContext in places where an RTP/RTCP packet is expected, as it can now be over UDP instead of via RTSP interleaved data * send teardown, which is important with UDP sessions. (It also will help somewhat with Reolink TCP.) Along the way, make Session Unpin so that teardown can consume it without tripping over tokio::pin!(). * refactor to shrink the amount of data used by Session and how much gets memmoved around on the stack, instead of further growing it. Because of missing RTCP RR and reorder buffer support, this is only appropriate for using on a LAN. That's enough for me right now. --- CHANGELOG.md | 4 +- Cargo.lock | 3 +- Cargo.toml | 3 +- README.md | 3 +- benches/depacketize.rs | 11 +- examples/client/metadata.rs | 4 +- examples/client/mp4.rs | 22 +- fuzz/Cargo.lock | 3 +- fuzz/fuzz_targets/depacketize_h264.rs | 5 +- src/client/mod.rs | 861 +++++++++++++++++++------- src/client/parse.rs | 47 +- src/client/rtp.rs | 75 +-- src/codec/aac.rs | 7 +- src/codec/h264.rs | 58 +- src/codec/mod.rs | 12 +- src/codec/onvif.rs | 2 +- src/error.rs | 23 +- src/lib.rs | 133 +++- src/tokio.rs | 24 +- 19 files changed, 933 insertions(+), 367 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7538791..cebc29b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,7 @@ -## unreleased +## `v0.3.0` (unreleased) +* [#30](https://github.com/scottlamb/retina/issues/30): experimental UDP + support. * [#22](https://github.com/scottlamb/retina/issues/22): fix handling of 44.1 kHz AAC audio. diff --git a/Cargo.lock b/Cargo.lock index e1a4eba..8c15b07 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -989,7 +989,7 @@ checksum = "f497285884f3fcff424ffc933e56d7cbca511def0c9831a7f9b5f6153e3cc89b" [[package]] name = "retina" -version = "0.2.0" +version = "0.3.0" dependencies = [ "anyhow", "base64", @@ -1005,6 +1005,7 @@ dependencies = [ "once_cell", "pin-project", "pretty-hex", + "rand", "rtp-rs", "rtsp-types", "sdp", diff --git a/Cargo.toml b/Cargo.toml index a455006..1e8ccd1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "retina" -version = "0.2.0" +version = "0.3.0" authors = ["Scott Lamb "] license = "MIT/Apache-2.0" edition = "2018" @@ -22,6 +22,7 @@ log = "0.4.8" once_cell = "1.7.2" pin-project = "1.0.7" pretty-hex = "0.2.1" +rand = "0.8.3" rtp-rs = "0.6.0" rtsp-types = "0.0.2" sdp = "0.1.4" diff --git a/README.md b/README.md index 5f9172b..2d6e8c7 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,8 @@ Progress: * [x] client support * [x] digest authentication. * [x] RTP over TCP via RTSP interleaved channels. - * [ ] RTP over UDP. + * [x] RTP over UDP (experimental). + * * [ ] re-order buffer. (Out-of-order packets are dropped now.) * [x] RTSP/1.0. * [ ] RTSP/2.0. * [ ] SRTP. diff --git a/benches/depacketize.rs b/benches/depacketize.rs index 6750edc..fe621e5 100644 --- a/benches/depacketize.rs +++ b/benches/depacketize.rs @@ -4,7 +4,7 @@ use std::num::NonZeroU16; use criterion::{criterion_group, criterion_main, Criterion}; -use retina::client::{rtp::StrictSequenceChecker, Timeline}; +use retina::client::{rtp::InorderParser, Timeline}; use retina::codec::{CodecItem, Depacketizer}; use std::convert::TryFrom; use std::io::Write; @@ -24,15 +24,15 @@ fn h264_aac ()>(mut f: F) { Timeline::new(Some(0), 90_000, None).unwrap(), ]; let mut rtps = [ - StrictSequenceChecker::new(None, Some(1)), - StrictSequenceChecker::new(None, Some(1)), + InorderParser::new(None, Some(1)), + InorderParser::new(None, Some(1)), ]; let mut depacketizers = [ Depacketizer::new("audio", "mpeg4-generic", 12_000, NonZeroU16::new(2), Some("profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3;config=1490")).unwrap(), Depacketizer::new("video", "h264", 90_000, None, Some("packetization-mode=1;profile-level-id=42C01E;sprop-parameter-sets=Z0LAHtkDxWhAAAADAEAAAAwDxYuS,aMuMsg==")).unwrap(), ]; let conn_ctx = retina::ConnectionContext::dummy(); - let msg_ctx = retina::RtspMessageContext::dummy(); + let pkt_ctx = retina::PacketContext::dummy(); while !remaining.is_empty() { assert!(remaining.len() > 4); assert_eq!(remaining[0], b'$'); @@ -50,9 +50,8 @@ fn h264_aac ()>(mut f: F) { let pkt = match rtps[stream_id].rtp( &retina::client::SessionOptions::default(), &conn_ctx, - &msg_ctx, + &pkt_ctx, &mut timelines[stream_id], - channel_id, stream_id, data, ) { diff --git a/examples/client/metadata.rs b/examples/client/metadata.rs index 1512392..42eb205 100644 --- a/examples/client/metadata.rs +++ b/examples/client/metadata.rs @@ -29,12 +29,11 @@ pub async fn run(opts: Opts) -> Result<(), Error> { .position(|s| matches!(s.parameters(), Some(retina::codec::Parameters::Message(..)))) .ok_or_else(|| anyhow!("couldn't find onvif stream"))?; session.setup(onvif_stream_i).await?; - let session = session + let mut session = session .play(retina::client::PlayOptions::default().ignore_zero_seq(true)) .await? .demuxed()?; - tokio::pin!(session); tokio::pin!(stop); loop { tokio::select! { @@ -51,5 +50,6 @@ pub async fn run(opts: Opts) -> Result<(), Error> { }, } } + session.teardown().await?; Ok(()) } diff --git a/examples/client/mp4.rs b/examples/client/mp4.rs index 4fa36cc..86b2790 100644 --- a/examples/client/mp4.rs +++ b/examples/client/mp4.rs @@ -20,8 +20,11 @@ use anyhow::{anyhow, bail, Error}; use bytes::{Buf, BufMut, BytesMut}; use futures::StreamExt; -use log::info; -use retina::codec::{AudioParameters, CodecItem, VideoParameters}; +use log::{info, warn}; +use retina::{ + client::Transport, + codec::{AudioParameters, CodecItem, VideoParameters}, +}; use std::convert::TryFrom; use std::io::SeekFrom; @@ -60,6 +63,12 @@ pub struct Opts { #[structopt(long, name = "secs")] duration: Option, + /// The transport to use: `tcp` or `udp` (experimental). + /// + /// Note: `--allow-loss` is strongly recommended with `udp`. + #[structopt(default_value, long)] + transport: retina::client::Transport, + /// Path to `.mp4` file to write. #[structopt(parse(try_from_str))] out: PathBuf, @@ -564,6 +573,10 @@ impl Mp4Writer { } pub async fn run(opts: Opts) -> Result<(), Error> { + if matches!(opts.transport, Transport::Udp) && !opts.allow_loss { + warn!("Using --transport=udp without strongly recommended --allow-loss!"); + } + let creds = super::creds(opts.src.username, opts.src.password); let stop_signal = tokio::signal::ctrl_c(); let mut session = retina::client::Session::describe( @@ -571,6 +584,7 @@ pub async fn run(opts: Opts) -> Result<(), Error> { retina::client::SessionOptions::default() .creds(creds) .user_agent("Retina mp4 example".to_owned()) + .transport(opts.transport) .ignore_spurious_data(opts.ignore_spurious_data), ) .await?; @@ -604,7 +618,7 @@ pub async fn run(opts: Opts) -> Result<(), Error> { if let Some((i, _)) = audio_stream { session.setup(i).await?; } - let session = session + let mut session = session .play( retina::client::PlayOptions::default() .initial_timestamp(opts.initial_timestamp) @@ -629,7 +643,6 @@ pub async fn run(opts: Opts) -> Result<(), Error> { } None => futures::future::Either::Right(futures::future::pending()), }; - tokio::pin!(session); tokio::pin!(stop_signal); tokio::pin!(sleep); loop { @@ -654,6 +667,7 @@ pub async fn run(opts: Opts) -> Result<(), Error> { }, } } + session.teardown().await?; mp4.finish().await?; Ok(()) } diff --git a/fuzz/Cargo.lock b/fuzz/Cargo.lock index fb50abe..09fe588 100644 --- a/fuzz/Cargo.lock +++ b/fuzz/Cargo.lock @@ -544,7 +544,7 @@ dependencies = [ [[package]] name = "retina" -version = "0.2.0" +version = "0.3.0" dependencies = [ "base64", "bitreader", @@ -557,6 +557,7 @@ dependencies = [ "once_cell", "pin-project", "pretty-hex", + "rand", "rtp-rs", "rtsp-types", "sdp", diff --git a/fuzz/fuzz_targets/depacketize_h264.rs b/fuzz/fuzz_targets/depacketize_h264.rs index 6b5fcee..e593aba 100644 --- a/fuzz/fuzz_targets/depacketize_h264.rs +++ b/fuzz/fuzz_targets/depacketize_h264.rs @@ -13,7 +13,7 @@ fuzz_target!(|data: &[u8]| { let mut timestamp = retina::Timestamp::new(0, NonZeroU32::new(90_000).unwrap(), 0).unwrap(); let mut sequence_number: u16 = 0; let conn_ctx = retina::ConnectionContext::dummy(); - let msg_ctx = retina::RtspMessageContext::dummy(); + let pkt_ctx = retina::PacketContext::dummy(); while data.has_remaining() { let hdr = data.get_u8(); let ts_change = (hdr & 0b001) != 0; @@ -30,8 +30,7 @@ fuzz_target!(|data: &[u8]| { timestamp = timestamp.try_add(1).unwrap(); } let pkt = retina::client::rtp::Packet { - ctx: msg_ctx, - channel_id: 0, + ctx: pkt_ctx, stream_id: 0, timestamp, ssrc: 0, diff --git a/src/client/mod.rs b/src/client/mod.rs index 2cb1b9d..2bbb2e8 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -1,8 +1,11 @@ // Copyright (C) 2021 Scott Lamb // SPDX-License-Identifier: MIT OR Apache-2.0 +use std::mem::MaybeUninit; +use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::num::NonZeroU32; use std::task::Poll; +use std::time::Instant; use std::{borrow::Cow, fmt::Debug, num::NonZeroU16, pin::Pin}; use self::channel_mapping::*; @@ -11,8 +14,8 @@ use bytes::Bytes; use futures::{ready, Future, SinkExt, StreamExt}; use log::{debug, trace, warn}; use pin_project::pin_project; -use sdp::session_description::SessionDescription; -use tokio::pin; +use rtsp_types::Method; +use tokio::net::UdpSocket; use url::Url; use crate::codec::CodecItem; @@ -91,8 +94,54 @@ impl std::str::FromStr for InitialTimestampPolicy { #[derive(Default)] pub struct SessionOptions { creds: Option, - user_agent: String, + user_agent: Option>, ignore_spurious_data: bool, + transport: Transport, +} + +#[derive(Copy, Clone, Debug)] +pub enum Transport { + Tcp, + + /// UDP (experimental). + /// + /// This support is currently only suitable for a LAN for a couple reasons: + /// * There's no reorder buffer, so out-of-order packets are all dropped. + /// * There's no support for sending RTCP RRs (receiver reports), so + /// servers won't have the correct information to measure packet loss + /// and pace packets appropriately. + Udp, +} + +impl Default for Transport { + fn default() -> Self { + Transport::Tcp + } +} + +impl std::fmt::Display for Transport { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.pad(match self { + Transport::Tcp => "tcp", + Transport::Udp => "udp", + }) + } +} + +impl std::str::FromStr for Transport { + type Err = Error; + + fn from_str(s: &str) -> Result { + Ok(match s { + "tcp" => Transport::Tcp, + "udp" => Transport::Udp, + _ => bail!(ErrorInt::InvalidArgument(format!( + "bad Transport {}; \ + expected tcp or udp", + s + ))), + }) + } } impl SessionOptions { @@ -132,7 +181,17 @@ impl SessionOptions { /// Sends the given user agent string with each request. pub fn user_agent(mut self, user_agent: String) -> Self { - self.user_agent = user_agent; + self.user_agent = if user_agent.is_empty() { + None + } else { + Some(user_agent.into_boxed_str()) + }; + self + } + + /// Sets the underlying transport to use. + pub fn transport(mut self, transport: Transport) -> Self { + self.transport = transport; self } } @@ -186,7 +245,6 @@ pub struct Presentation { base_url: Url, pub control: Url, pub accept_dynamic_rate: bool, - sdp: SessionDescription, } /// Information about a stream offered within a presentation. @@ -230,9 +288,23 @@ pub struct Stream { /// says the server is allowed to omit it when there is only a single stream. pub control: Option, + /// The sockets for `Transport::Udp`. + sockets: Option, + state: StreamState, } +#[derive(Debug)] +struct UdpSockets { + local_ip: IpAddr, + local_rtp_port: u16, + remote_ip: IpAddr, + remote_rtp_port: u16, + rtp_socket: UdpSocket, + remote_rtcp_port: u16, + rtcp_socket: UdpSocket, +} + impl Stream { /// Returns the parameters for this stream. /// @@ -248,13 +320,13 @@ enum StreamState { /// Uninitialized; no `SETUP` has yet been sent. Uninit, - /// `SETUP` reply has been received. + /// `SETUP` response has been received. Init(StreamStateInit), - /// `PLAY` reply has been received. + /// `PLAY` response has been received. Playing { timeline: Timeline, - rtp_handler: rtp::StrictSequenceChecker, + rtp_handler: rtp::InorderParser, }, } @@ -294,22 +366,8 @@ pub struct Credentials { pub trait State {} /// Initial state after a `DESCRIBE`; use via `Session`. -/// One or more `SETUP`s may have also been issued, in which case a `session_id` -/// will be assigned. #[doc(hidden)] -pub struct Described { - presentation: Presentation, - session_id: Option, - - // Keep some information about the DESCRIBE response. If a depacketizer - // couldn't be constructed correctly for one or more streams, this will be - // used to create a RtspResponseError on `State::demuxed()`. - // We defer such errors from DESCRIBE time until then because they only - // matter if the stream is setup and the caller wants depacketization. - describe_ctx: RtspMessageContext, - describe_cseq: u32, - describe_status: rtsp_types::StatusCode, -} +pub struct Described(()); impl State for Described {} enum KeepaliveState { @@ -320,18 +378,7 @@ enum KeepaliveState { /// State after a `PLAY`; use via `Session`. #[doc(hidden)] -#[pin_project(project = PlayingProj)] -pub struct Playing { - presentation: Presentation, - session_id: String, - keepalive_state: KeepaliveState, - describe_ctx: RtspMessageContext, - describe_cseq: u32, - describe_status: rtsp_types::StatusCode, - - #[pin] - keepalive_timer: tokio::time::Sleep, -} +pub struct Playing(()); impl State for Playing {} /// The raw connection, without tracking session state. @@ -343,16 +390,52 @@ struct RtspConnection { next_cseq: u32, } +/// Mode to use in `RtspConnection::send` when looking for a response. +enum ResponseMode { + /// Anything but the response to this request is an error. + Normal, + + /// Silently discard data messages and responses to the given keepalive + /// while awaiting the response to this request. + Teardown { keepalive_cseq: Option }, +} + /// An RTSP session, or a connection that may be used in a proscriptive way. /// See discussion at [State]. +pub struct Session(Pin>, S); + #[pin_project] -pub struct Session { +struct SessionInner { + // TODO: allow this to be closed and reopened during a UDP session? conn: RtspConnection, + options: SessionOptions, requested_auth: Option, + presentation: Presentation, - #[pin] - state: S, + /// This will be set iff one or more `SETUP` calls have been issued. + /// This is sometimes true in state `Described` and always true in state + /// `Playing`. + session_id: Option>, + + // Keep some information about the DESCRIBE response. If a depacketizer + // couldn't be constructed correctly for one or more streams, this will be + // used to create a RtspResponseError on `State::demuxed()`. + // We defer such errors from DESCRIBE time until then because they only + // matter if the stream is setup and the caller wants depacketization. + describe_ctx: RtspMessageContext, + describe_cseq: u32, + describe_status: rtsp_types::StatusCode, + + /// The state of the keepalive request; only used in state `Playing`. + keepalive_state: KeepaliveState, + + keepalive_timer: Option>>, + + /// The index within `presentation.streams` to start the next poll at. + /// Round-robining between them rather than always starting at 0 should + /// prevent one stream from starving the others. + udp_next_poll_i: usize, } impl RtspConnection { @@ -391,6 +474,7 @@ impl RtspConnection { /// Takes care of authorization and `CSeq`. Returns `Error` if not successful. async fn send( &mut self, + mode: ResponseMode, options: &SessionOptions, requested_auth: &mut Option, req: &mut rtsp_types::Request, @@ -404,23 +488,42 @@ impl RtspConnection { let method: &str = req.method().into(); let (resp, msg_ctx) = loop { let msg = self.inner.next().await.unwrap_or_else(|| { - bail!(ErrorInt::ReadError { + bail!(ErrorInt::RtspReadError { conn_ctx: *self.inner.ctx(), msg_ctx: self.inner.eof_ctx(), source: std::io::Error::new( std::io::ErrorKind::UnexpectedEof, - format!("EOF while expecting reply to {} CSeq {}", method, cseq), + format!("EOF while expecting response to {} CSeq {}", method, cseq), ), }) })?; match msg.msg { - rtsp_types::Message::Response(r) if parse::get_cseq(&r) == Some(cseq) => { - break (r, msg.ctx) + rtsp_types::Message::Response(r) => { + if let Some(response_cseq) = parse::get_cseq(&r) { + if response_cseq == cseq { + break (r, msg.ctx); + } + if let ResponseMode::Teardown { + keepalive_cseq: Some(k), + } = mode + { + if response_cseq == k { + debug!("ignoring keepalive {} response during TEARDOWN", k); + continue; + } + } + } + } + rtsp_types::Message::Data(_) + if matches!(mode, ResponseMode::Teardown { .. }) => + { + debug!("ignoring RTSP data during TEARDOWN"); + continue; } rtsp_types::Message::Data(d) if options.ignore_spurious_data => { debug!( "ignoring interleaved data message on channel {} while waiting \ - for reply to {} CSeq {}", + for response to {} CSeq {}", d.channel_id(), method, cseq @@ -431,7 +534,7 @@ impl RtspConnection { conn_ctx: *self.inner.ctx(), msg_ctx: msg.ctx, description: format!( - "Expected reply to {} CSeq {}, got {:?}", + "Expected response to {} CSeq {}, got {:?}", method, cseq, o, ), }), @@ -550,8 +653,8 @@ impl RtspConnection { req.insert_header(rtsp_types::headers::AUTHORIZATION, authorization); } req.insert_header(rtsp_types::headers::CSEQ, cseq.to_string()); - if !options.user_agent.is_empty() { - req.insert_header(rtsp_types::headers::USER_AGENT, options.user_agent.clone()); + if let Some(ref u) = options.user_agent { + req.insert_header(rtsp_types::headers::USER_AGENT, u.to_string()); } Ok(cseq) } @@ -575,13 +678,19 @@ impl Session { options: SessionOptions, url: Url, ) -> Result { - let mut req = - rtsp_types::Request::builder(rtsp_types::Method::Describe, rtsp_types::Version::V1_0) - .header(rtsp_types::headers::ACCEPT, "application/sdp") - .request_uri(url.clone()) - .build(Bytes::new()); + let mut req = rtsp_types::Request::builder(Method::Describe, rtsp_types::Version::V1_0) + .header(rtsp_types::headers::ACCEPT, "application/sdp") + .request_uri(url.clone()) + .build(Bytes::new()); let mut requested_auth = None; - let (msg_ctx, cseq, response) = conn.send(&options, &mut requested_auth, &mut req).await?; + let (msg_ctx, cseq, response) = conn + .send( + ResponseMode::Normal, + &options, + &mut requested_auth, + &mut req, + ) + .await?; let presentation = parse::parse_describe(url, &response).map_err(|description| { wrap!(ErrorInt::RtspResponseError { conn_ctx: *conn.inner.ctx(), @@ -592,22 +701,26 @@ impl Session { description, }) })?; - Ok(Session { - conn, - options, - requested_auth, - state: Described { + Ok(Session( + Box::pin(SessionInner { + conn, + options, + requested_auth, presentation, session_id: None, describe_ctx: msg_ctx, describe_cseq: cseq, describe_status: response.status(), - }, - }) + keepalive_state: KeepaliveState::Idle, + keepalive_timer: None, + udp_next_poll_i: 0, + }), + Described(()), + )) } pub fn streams(&self) -> &[Stream] { - &self.state.presentation.streams + &self.0.presentation.streams } /// Sends a `SETUP` request for a stream. @@ -620,45 +733,76 @@ impl Session { /// /// Panics if `stream_i >= self.streams().len()`. pub async fn setup(&mut self, stream_i: usize) -> Result<(), Error> { - let stream = &mut self.state.presentation.streams[stream_i]; + let inner = &mut self.0.as_mut().project(); + let presentation = &mut inner.presentation; + let options = &inner.options; + let conn = &mut inner.conn; + let stream = &mut presentation.streams[stream_i]; if !matches!(stream.state, StreamState::Uninit) { bail!(ErrorInt::FailedPrecondition("stream already set up".into())); } - let proposed_channel_id = self.conn.channels.next_unassigned().ok_or_else(|| { - wrap!(ErrorInt::FailedPrecondition( - "no unassigned channels".into() - )) - })?; let url = stream .control .as_ref() - .unwrap_or(&self.state.presentation.control) + .unwrap_or(&presentation.control) .clone(); - let mut req = - rtsp_types::Request::builder(rtsp_types::Method::Setup, rtsp_types::Version::V1_0) - .request_uri(url) - .header( + let mut req = rtsp_types::Request::builder(Method::Setup, rtsp_types::Version::V1_0) + .request_uri(url) + .header(crate::X_DYNAMIC_RATE.clone(), "1".to_owned()); + match options.transport { + Transport::Tcp => { + let proposed_channel_id = conn.channels.next_unassigned().ok_or_else(|| { + wrap!(ErrorInt::FailedPrecondition( + "no unassigned channels".into() + )) + })?; + req = req.header( rtsp_types::headers::TRANSPORT, format!( "RTP/AVP/TCP;unicast;interleaved={}-{}", proposed_channel_id, proposed_channel_id + 1 ), - ) - .header(crate::X_DYNAMIC_RATE.clone(), "1".to_owned()); - if let Some(ref s) = self.state.session_id { - req = req.header(rtsp_types::headers::SESSION, s.clone()); + ); + } + Transport::Udp => { + // Bind an ephemeral UDP port on the same local address used to connect + // to the RTSP server. + let ip_addr = conn.inner.ctx().local_addr.ip(); + let pair = crate::tokio::UdpPair::for_ip(ip_addr) + .map_err(|e| wrap!(ErrorInt::Internal(e.into())))?; + stream.sockets = Some(UdpSockets { + local_ip: ip_addr, + local_rtp_port: pair.rtp_port, + remote_ip: IpAddr::V4(Ipv4Addr::UNSPECIFIED), + remote_rtp_port: 0, + rtp_socket: pair.rtp_socket, + remote_rtcp_port: 0, + rtcp_socket: pair.rtcp_socket, + }); + req = req.header( + rtsp_types::headers::TRANSPORT, + format!( + "RTP/AVP/UDP;client_port={}-{}", + pair.rtp_port, + pair.rtp_port + 1, + ), + ); + } } - let (msg_ctx, cseq, response) = self - .conn + if let Some(ref s) = inner.session_id { + req = req.header(rtsp_types::headers::SESSION, s.to_string()); + } + let (msg_ctx, cseq, response) = conn .send( - &self.options, - &mut self.requested_auth, + ResponseMode::Normal, + &inner.options, + &mut inner.requested_auth, &mut req.build(Bytes::new()), ) .await?; debug!("SETUP response: {:#?}", &response); - let conn_ctx = self.conn.inner.ctx(); + let conn_ctx = conn.inner.ctx(); let status = response.status(); let response = parse::parse_setup(&response).map_err(|description| { wrap!(ErrorInt::RtspResponseError { @@ -670,10 +814,10 @@ impl Session { description, }) })?; - match self.state.session_id.as_ref() { - Some(old) if old != response.session_id => { + match inner.session_id.as_ref() { + Some(old) if old.as_ref() != response.session_id => { bail!(ErrorInt::RtspResponseError { - conn_ctx: *self.conn.inner.ctx(), + conn_ctx: *inner.conn.inner.ctx(), msg_ctx, method: rtsp_types::Method::Setup, cseq, @@ -685,22 +829,75 @@ impl Session { }); } Some(_) => {} - None => self.state.session_id = Some(response.session_id.to_owned()), + None => *inner.session_id = Some(response.session_id.into()), }; - let conn_ctx = self.conn.inner.ctx(); - self.conn - .channels - .assign(response.channel_id, stream_i) - .map_err(|description| { - wrap!(ErrorInt::RtspResponseError { - conn_ctx: *conn_ctx, - msg_ctx, - method: rtsp_types::Method::Setup, - cseq, - status, - description, - }) - })?; + let conn_ctx = conn.inner.ctx(); + match options.transport { + Transport::Tcp => { + let channel_id = match response.channel_id { + Some(id) => id, + None => bail!(ErrorInt::RtspResponseError { + conn_ctx: *inner.conn.inner.ctx(), + msg_ctx, + method: rtsp_types::Method::Setup, + cseq, + status, + description: "Transport header is missing interleaved parameter".to_owned(), + }), + }; + conn.channels + .assign(channel_id, stream_i) + .map_err(|description| { + wrap!(ErrorInt::RtspResponseError { + conn_ctx: *conn_ctx, + msg_ctx, + method: rtsp_types::Method::Setup, + cseq, + status, + description, + }) + })?; + } + Transport::Udp => { + // TODO: RFC 2326 section 12.39 says "If the source address for + // the stream is different than can be derived from the RTSP + // endpoint address (the server in playback or the client in + // recording), the source MAY be specified." Not MUST, + // unfortunately. But let's see if we can get away with this + // for now. + let source = match response.source { + Some(s) => s, + None => conn.inner.ctx().peer_addr.ip(), + }; + let server_port = response.server_port.ok_or_else(|| { + wrap!(ErrorInt::RtspResponseError { + conn_ctx: *conn_ctx, + msg_ctx, + method: rtsp_types::Method::Setup, + cseq, + status, + description: "Transport header is missing server_port parameter".to_owned(), + }) + })?; + let udp_sockets = stream.sockets.as_mut().unwrap(); + udp_sockets.remote_ip = source; + udp_sockets.remote_rtp_port = server_port.0; + udp_sockets.remote_rtcp_port = server_port.1; + udp_sockets + .rtp_socket + .connect(SocketAddr::new(source, udp_sockets.remote_rtp_port)) + .await + .map_err(|e| wrap!(ErrorInt::ConnectError(e)))?; + udp_sockets + .rtcp_socket + .connect(SocketAddr::new(source, udp_sockets.remote_rtcp_port)) + .await + .map_err(|e| wrap!(ErrorInt::ConnectError(e)))?; + punch_firewall_hole(&udp_sockets.rtp_socket, &udp_sockets.rtcp_socket) + .await + .map_err(|e| wrap!(ErrorInt::ConnectError(e)))?; + } + } stream.state = StreamState::Init(StreamStateInit { ssrc: response.ssrc, initial_seq: None, @@ -714,30 +911,29 @@ impl Session { /// The presentation must support aggregate control, as defined in [RFC 2326 /// section 1.3](https://tools.ietf.org/html/rfc2326#section-1.3). pub async fn play(mut self, policy: PlayOptions) -> Result, Error> { - let session_id = self.state.session_id.take().ok_or_else(|| { + let inner = self.0.as_mut().project(); + let session_id = inner.session_id.as_deref().ok_or_else(|| { wrap!(ErrorInt::FailedPrecondition( "must SETUP before PLAY".into() )) })?; - trace!("PLAY with channel mappings: {:#?}", &self.conn.channels); - let (msg_ctx, cseq, response) = self + trace!("PLAY with channel mappings: {:#?}", &inner.conn.channels); + let (msg_ctx, cseq, response) = inner .conn .send( - &self.options, - &mut self.requested_auth, - &mut rtsp_types::Request::builder( - rtsp_types::Method::Play, - rtsp_types::Version::V1_0, - ) - .request_uri(self.state.presentation.control.clone()) - .header(rtsp_types::headers::SESSION, session_id.clone()) - .header(rtsp_types::headers::RANGE, "npt=0.000-".to_owned()) - .build(Bytes::new()), + ResponseMode::Normal, + &inner.options, + inner.requested_auth, + &mut rtsp_types::Request::builder(Method::Play, rtsp_types::Version::V1_0) + .request_uri(inner.presentation.control.clone()) + .header(rtsp_types::headers::SESSION, session_id.clone()) + .header(rtsp_types::headers::RANGE, "npt=0.000-".to_owned()) + .build(Bytes::new()), ) .await?; - parse::parse_play(&response, &mut self.state.presentation).map_err(|description| { + parse::parse_play(&response, inner.presentation).map_err(|description| { wrap!(ErrorInt::RtspResponseError { - conn_ctx: *self.conn.inner.ctx(), + conn_ctx: *inner.conn.inner.ctx(), msg_ctx, method: rtsp_types::Method::Play, cseq, @@ -747,29 +943,23 @@ impl Session { })?; // Count how many streams have been setup (not how many are in the presentation). - let setup_streams = self - .state + let setup_streams = inner .presentation .streams .iter() .filter(|s| matches!(s.state, StreamState::Init(_))) .count(); - let all_have_time = self - .state - .presentation - .streams - .iter() - .all(|s| match s.state { - StreamState::Init(StreamStateInit { - initial_rtptime, .. - }) => initial_rtptime.is_some(), - _ => true, - }); + let all_have_time = inner.presentation.streams.iter().all(|s| match s.state { + StreamState::Init(StreamStateInit { + initial_rtptime, .. + }) => initial_rtptime.is_some(), + _ => true, + }); // Move all streams that have been set up from Init to Playing state. Check that required // parameters are present while doing so. - for (i, s) in self.state.presentation.streams.iter_mut().enumerate() { + for (i, s) in inner.presentation.streams.iter_mut().enumerate() { match s.state { StreamState::Init(StreamStateInit { initial_rtptime, @@ -783,7 +973,7 @@ impl Session { { if initial_rtptime.is_none() { bail!(ErrorInt::RtspResponseError { - conn_ctx: *self.conn.inner.ctx(), + conn_ctx: *inner.conn.inner.ctx(), msg_ctx, method: rtsp_types::Method::Play, cseq, @@ -812,7 +1002,7 @@ impl Session { } o => o, }; - let conn_ctx = self.conn.inner.ctx(); + let conn_ctx = inner.conn.inner.ctx(); s.state = StreamState::Playing { timeline: Timeline::new( initial_rtptime, @@ -829,30 +1019,52 @@ impl Session { description, }) })?, - rtp_handler: rtp::StrictSequenceChecker::new(ssrc, initial_seq), + rtp_handler: rtp::InorderParser::new(ssrc, initial_seq), }; } StreamState::Uninit => {} StreamState::Playing { .. } => unreachable!(), }; } - Ok(Session { - conn: self.conn, - options: self.options, - requested_auth: self.requested_auth, - state: Playing { - presentation: self.state.presentation, - session_id, - keepalive_state: KeepaliveState::Idle, - keepalive_timer: tokio::time::sleep(KEEPALIVE_DURATION), - describe_ctx: self.state.describe_ctx, - describe_cseq: self.state.describe_cseq, - describe_status: self.state.describe_status, - }, - }) + *inner.keepalive_timer = Some(Box::pin(tokio::time::sleep(KEEPALIVE_DURATION))); + Ok(Session(self.0, Playing(()))) } } +/// Sends dummy RTP and RTCP packets to punch a hole in connection-tracking +/// firewalls. +/// +/// This is useful when the client is on the protected side of the firewall and +/// the server isn't. The server should discard these dummy packets, but they +/// prompt the firewall to add the appropriate connection tracking state for +/// server->client packets to make it through. +/// +/// Note this is insufficient for NAT traversal; the NAT firewall must be +/// RTSP-aware to rewrite the Transport header's client_ports. +async fn punch_firewall_hole( + rtp_socket: &UdpSocket, + rtcp_socket: &UdpSocket, +) -> Result<(), std::io::Error> { + #[rustfmt::skip] + const DUMMY_RTP: [u8; 12] = [ + 2 << 6, // version=2 + p=0 + x=0 + cc=0 + 0, // m=0 + pt=0 + 0, 0, // sequence number=0 + 0, 0, 0, 0, // timestamp=0 0, + 0, 0, 0, 0, // ssrc=0 + ]; + #[rustfmt::skip] + const DUMMY_RTCP: [u8; 8] = [ + 2 << 6, // version=2 + p=0 + rc=0 + 200, // pt=200 (reception report) + 0, 1, // length=1 (in 4-byte words minus 1) + 0, 0, 0, 0, // ssrc=0 (bogus but we don't know the ssrc reliably yet) + ]; + rtp_socket.send(&DUMMY_RTP[..]).await?; + rtcp_socket.send(&DUMMY_RTCP[..]).await?; + Ok(()) +} + #[derive(Debug)] pub enum PacketItem { RtpPacket(rtp::Packet), @@ -864,15 +1076,16 @@ impl Session { /// /// Fails if a stream that has been setup can't be depacketized. pub fn demuxed(mut self) -> Result { - for s in &mut self.state.presentation.streams { + let inner = self.0.as_mut().project(); + for s in &mut inner.presentation.streams { if matches!(s.state, StreamState::Playing { .. }) { if let Err(ref description) = s.depacketizer { bail!(ErrorInt::RtspResponseError { - conn_ctx: *self.conn.inner.ctx(), - msg_ctx: self.state.describe_ctx, + conn_ctx: *inner.conn.inner.ctx(), + msg_ctx: *inner.describe_ctx, method: rtsp_types::Method::Describe, - cseq: self.state.describe_cseq, - status: self.state.describe_status, + cseq: *inner.describe_cseq, + status: *inner.describe_status, description: description.clone(), }); } @@ -884,18 +1097,61 @@ impl Session { }) } + /// Tears down the session. + /// + /// This attempts to send a `TEARDOWN` request to end the session: + /// * When using UDP, this signals the server to end the data stream + /// promptly rather than wait for timeout. + /// * Even when using TCP, some old versions of `live555` servers will + /// incorrectly keep trying to send packets to this connection, burning + /// CPU until timeout, and potentially sending packets to a freshly + /// opened connection that happens to claim the same file descriptor + /// number. + /// + /// Discards any RTSP interleaved data messages received on the socket + /// while waiting for `TEARDOWN` response. + /// + /// Currently closes the socket(s) immediately after `TEARDOWN` response, + /// even on failure. This behavior may change in a future release. + pub async fn teardown(mut self) -> Result<(), Error> { + let inner = self.0.as_mut().project(); + let mut req = rtsp_types::Request::builder(Method::Teardown, rtsp_types::Version::V1_0) + .request_uri(inner.presentation.base_url.clone()) + .header( + rtsp_types::headers::SESSION, + inner.session_id.as_deref().unwrap().to_string(), + ) + .build(Bytes::new()); + let keepalive_cseq = match inner.keepalive_state { + KeepaliveState::Idle => None, + KeepaliveState::Flushing(cseq) => Some(*cseq), + KeepaliveState::Waiting(cseq) => Some(*cseq), + }; + inner + .conn + .send( + ResponseMode::Teardown { keepalive_cseq }, + inner.options, + inner.requested_auth, + &mut req, + ) + .await?; + Ok(()) + } + pub fn streams(&self) -> &[Stream] { - &self.state.presentation.streams + &self.0.presentation.streams } fn handle_keepalive_timer( - self: Pin<&mut Self>, + mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> Result<(), Error> { + let inner = self.0.as_mut().project(); // Expect the previous keepalive request to have finished. - match self.state.keepalive_state { + match inner.keepalive_state { KeepaliveState::Flushing(cseq) => bail!(ErrorInt::WriteError { - conn_ctx: *self.conn.inner.ctx(), + conn_ctx: *inner.conn.inner.ctx(), source: std::io::Error::new( std::io::ErrorKind::TimedOut, format!( @@ -904,9 +1160,9 @@ impl Session { ), ), }), - KeepaliveState::Waiting(cseq) => bail!(ErrorInt::ReadError { - conn_ctx: *self.conn.inner.ctx(), - msg_ctx: self.conn.inner.eof_ctx(), + KeepaliveState::Waiting(cseq) => bail!(ErrorInt::RtspReadError { + conn_ctx: *inner.conn.inner.ctx(), + msg_ctx: inner.conn.inner.eof_ctx(), source: std::io::Error::new( std::io::ErrorKind::TimedOut, format!( @@ -920,9 +1176,7 @@ impl Session { // Currently the only outbound data should be keepalives, and the previous one // has already been flushed, so there's no reason the Sink shouldn't be ready. - let mut this = self.project(); - let state = this.state.project(); - if matches!(this.conn.inner.poll_ready_unpin(cx), Poll::Pending) { + if matches!(inner.conn.inner.poll_ready_unpin(cx), Poll::Pending) { bail!(ErrorInt::Internal( "Unexpectedly not ready to send keepalive".into() )); @@ -931,64 +1185,70 @@ impl Session { // Send a new one and reset the timer. // Use a SET_PARAMETER with no body for keepalives, as recommended in the // ONVIF Streaming Specification version version 21.06 section 5.2.2.2. - let mut req = rtsp_types::Request::builder( - rtsp_types::Method::SetParameter, - rtsp_types::Version::V1_0, - ) - .request_uri(state.presentation.base_url.clone()) - .header(rtsp_types::headers::SESSION, state.session_id.clone()) - .build(Bytes::new()); - let cseq = this + let session_id = inner.session_id.as_deref().unwrap(); + let mut req = rtsp_types::Request::builder(Method::SetParameter, rtsp_types::Version::V1_0) + .request_uri(inner.presentation.base_url.clone()) + .header(rtsp_types::headers::SESSION, session_id.to_string()) + .build(Bytes::new()); + let cseq = inner + .conn + .fill_req(inner.options, inner.requested_auth, &mut req)?; + inner .conn - .fill_req(&this.options, &mut this.requested_auth, &mut req)?; - this.conn .inner .start_send_unpin(rtsp_types::Message::Request(req)) .expect("encoding is infallible"); - *state.keepalive_state = match this.conn.inner.poll_flush_unpin(cx) { + *inner.keepalive_state = match inner.conn.inner.poll_flush_unpin(cx) { Poll::Ready(Ok(())) => KeepaliveState::Waiting(cseq), Poll::Ready(Err(e)) => bail!(e), Poll::Pending => KeepaliveState::Flushing(cseq), }; - state + inner .keepalive_timer + .as_mut() + .expect("keepalive timer set in state Playing") + .as_mut() .reset(tokio::time::Instant::now() + KEEPALIVE_DURATION); Ok(()) } fn handle_response( - self: Pin<&mut Self>, + mut self: Pin<&mut Self>, msg_ctx: &crate::RtspMessageContext, response: rtsp_types::Response, ) -> Result<(), Error> { - let this = self.project(); - let state = this.state.project(); - if matches!(*state.keepalive_state, - KeepaliveState::Waiting(cseq) if parse::get_cseq(&response) == Some(cseq)) + let inner = self.0.as_mut().project(); + if matches!(inner.keepalive_state, + KeepaliveState::Waiting(cseq) if parse::get_cseq(&response) == Some(*cseq)) { // We don't care if the keepalive response succeeds or fails. Just mark complete. - *state.keepalive_state = KeepaliveState::Idle; + *inner.keepalive_state = KeepaliveState::Idle; return Ok(()); } // The only response we expect in this state is to our keepalive request. bail!(ErrorInt::RtspFramingError { - conn_ctx: *this.conn.inner.ctx(), + conn_ctx: *inner.conn.inner.ctx(), msg_ctx: *msg_ctx, description: format!("Unexpected RTSP response {:#?}", response), }) } fn handle_data( - self: Pin<&mut Self>, + mut self: Pin<&mut Self>, msg_ctx: &RtspMessageContext, data: rtsp_types::Data, ) -> Result, Error> { + let inner = self.0.as_mut().project(); let channel_id = data.channel_id(); - let m = match self.conn.channels.lookup(channel_id) { + let pkt_ctx = crate::PacketContext(crate::PacketContextInner::Tcp { + msg_ctx: *msg_ctx, + channel_id, + }); + let m = match inner.conn.channels.lookup(channel_id) { Some(m) => m, - None if self.options.ignore_spurious_data => { + None if inner.options.ignore_spurious_data => { log::debug!( "Ignoring interleaved data on unassigned channel id {}", channel_id @@ -996,14 +1256,12 @@ impl Session { return Ok(None); } None => bail!(ErrorInt::RtspUnassignedChannelError { - conn_ctx: *self.conn.inner.ctx(), + conn_ctx: *inner.conn.inner.ctx(), msg_ctx: *msg_ctx, channel_id, }), }; - let this = self.project(); - let state = this.state.project(); - let stream = &mut state.presentation.streams[m.stream_i]; + let stream = &mut inner.presentation.streams[m.stream_i]; let (mut timeline, rtp_handler) = match &mut stream.state { StreamState::Playing { timeline, @@ -1016,32 +1274,152 @@ impl Session { }; match m.channel_type { ChannelType::Rtp => Ok(rtp_handler.rtp( - &this.options, - this.conn.inner.ctx(), - msg_ctx, + &inner.options, + inner.conn.inner.ctx(), + &pkt_ctx, &mut timeline, - channel_id, m.stream_i, data.into_body(), )?), ChannelType::Rtcp => match rtp_handler.rtcp( - &this.options, - msg_ctx, + &inner.options, + &pkt_ctx, &mut timeline, m.stream_i, data.into_body(), ) { Ok(p) => Ok(p), - Err(description) => Err(wrap!(ErrorInt::RtspDataMessageError { - conn_ctx: *this.conn.inner.ctx(), - msg_ctx: *msg_ctx, - channel_id, + Err(description) => Err(wrap!(ErrorInt::PacketError { + conn_ctx: *inner.conn.inner.ctx(), + pkt_ctx: pkt_ctx, stream_id: m.stream_i, description, })), }, } } + + /// Polls a single UDP stream, `inner.presentation.streams[i]`. + /// Assumes `buf` is cleared and large enough for any UDP packet. + fn poll_udp_stream( + &mut self, + cx: &mut std::task::Context, + buf: &mut tokio::io::ReadBuf, + i: usize, + ) -> Poll>> { + debug_assert!(buf.filled().is_empty()); + let inner = self.0.as_mut().project(); + let s = &mut inner.presentation.streams[i]; + if let Some(sockets) = &mut s.sockets { + let (mut timeline, rtp_handler) = match &mut s.state { + StreamState::Playing { + timeline, + rtp_handler, + } => (timeline, rtp_handler), + _ => unreachable!("Session's {}->{:?} not in Playing state", i, s), + }; + // Prioritize RTCP over RTP within a stream. + if let Poll::Ready(r) = sockets.rtcp_socket.poll_recv(cx, buf) { + let pkt_ctx = crate::PacketContext(crate::PacketContextInner::Udp { + local_addr: SocketAddr::new(sockets.local_ip, sockets.local_rtp_port + 1), + peer_addr: SocketAddr::new(sockets.remote_ip, sockets.remote_rtcp_port), + received_wall: crate::WallTime::now(), + received: Instant::now(), + }); + match r { + Ok(()) => { + let msg = Bytes::copy_from_slice(buf.filled()); + match rtp_handler.rtcp(&inner.options, &pkt_ctx, &mut timeline, i, msg) { + Ok(Some(p)) => return Poll::Ready(Some(Ok(p))), + Ok(None) => buf.clear(), + Err(description) => { + return Poll::Ready(Some(Err(wrap!(ErrorInt::PacketError { + conn_ctx: *inner.conn.inner.ctx(), + pkt_ctx, + stream_id: i, + description, + })))) + } + } + } + Err(source) => { + return Poll::Ready(Some(Err(wrap!(ErrorInt::UdpRecvError { + conn_ctx: *inner.conn.inner.ctx(), + pkt_ctx, + source, + })))) + } + } + } + if let Poll::Ready(r) = sockets.rtp_socket.poll_recv(cx, buf) { + let pkt_ctx = crate::PacketContext(crate::PacketContextInner::Udp { + local_addr: SocketAddr::new(sockets.local_ip, sockets.local_rtp_port), + peer_addr: SocketAddr::new(sockets.remote_ip, sockets.remote_rtp_port), + received_wall: crate::WallTime::now(), + received: Instant::now(), + }); + match r { + Ok(()) => { + let msg = Bytes::copy_from_slice(buf.filled()); + match rtp_handler.rtp( + &inner.options, + inner.conn.inner.ctx(), + &pkt_ctx, + &mut timeline, + i, + msg, + ) { + Ok(Some(p)) => return Poll::Ready(Some(Ok(p))), + Ok(None) => buf.clear(), + Err(e) => return Poll::Ready(Some(Err(e))), + } + } + Err(source) => { + return Poll::Ready(Some(Err(wrap!(ErrorInt::UdpRecvError { + conn_ctx: *inner.conn.inner.ctx(), + pkt_ctx, + source, + })))) + } + } + } + } + Poll::Pending + } + + /// Polls all UDP streams, round-robining between them to avoid starvation. + fn poll_udp(&mut self, cx: &mut std::task::Context) -> Poll>> { + // For now, create a buffer on the stack large enough for any UDP packet, then + // copy into a fresh allocation if it's actually used. + // TODO: a ring buffer would be better: see + // . + + // SAFETY: this exactly matches an example in the documentation: + // . + let mut buf: [MaybeUninit; 65_536] = unsafe { MaybeUninit::uninit().assume_init() }; + let mut buf = tokio::io::ReadBuf::uninit(&mut buf); + + // Assume 0 <= inner.udp_next_poll_i < inner.presentation.streams.len(). + // play() would have failed if there were no (setup) streams. + let starting_i = *self.0.as_mut().project().udp_next_poll_i; + loop { + let inner = self.0.as_mut().project(); + let i = *inner.udp_next_poll_i; + *inner.udp_next_poll_i += 1; + if *inner.udp_next_poll_i == inner.presentation.streams.len() { + *inner.udp_next_poll_i = 0; + } + + if let Poll::Ready(r) = self.poll_udp_stream(cx, &mut buf, i) { + return Poll::Ready(r); + } + + if *self.0.as_mut().project().udp_next_poll_i == starting_i { + break; + } + } + Poll::Pending + } } impl futures::Stream for Session { @@ -1052,9 +1430,10 @@ impl futures::Stream for Session { cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { loop { - // First try receiving data. Let this starve keepalive handling; if we can't keep up, - // the server should probably drop us. - match Pin::new(&mut self.as_mut().project().conn.inner).poll_next(cx) { + // First try receiving data on the RTSP connection. Let this starve + // sending keepalives; if we can't keep up, the server should + // probably drop us. + match Pin::new(&mut self.0.conn.inner).poll_next(cx) { Poll::Ready(Some(Ok(msg))) => match msg.msg { rtsp_types::Message::Data(data) => { match self.as_mut().handle_data(&msg.ctx, data) { @@ -1079,19 +1458,26 @@ impl futures::Stream for Session { std::task::Poll::Pending => {} } + // Next try receiving data on the UDP sockets, if any. + if matches!(self.0.options.transport, Transport::Udp) { + if let Poll::Ready(result) = self.as_mut().poll_udp(cx) { + return Poll::Ready(result); + } + } + // Then check if it's time for a new keepalive. - let this = self.as_mut().project(); - let state = this.state.project(); - if matches!(state.keepalive_timer.poll(cx), Poll::Ready(())) { + if matches!( + self.0.keepalive_timer.as_mut().unwrap().as_mut().poll(cx), + Poll::Ready(()) + ) { + log::debug!("time for a keepalive"); self.as_mut().handle_keepalive_timer(cx)?; } // Then finish flushing the current keepalive if necessary. - let this = self.as_mut().project(); - let state = this.state.project(); - if let KeepaliveState::Flushing(cseq) = state.keepalive_state { - match this.conn.inner.poll_flush_unpin(cx) { - Poll::Ready(Ok(())) => *state.keepalive_state = KeepaliveState::Waiting(*cseq), + if let KeepaliveState::Flushing(cseq) = self.0.keepalive_state { + match self.0.conn.inner.poll_flush_unpin(cx) { + Poll::Ready(Ok(())) => self.0.keepalive_state = KeepaliveState::Waiting(cseq), Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(Error(Box::new(e))))), Poll::Pending => {} } @@ -1110,24 +1496,28 @@ enum DemuxedState { } /// Wrapper returned by [`Session::demuxed`] which demuxes/depacketizes into frames. -#[pin_project] pub struct Demuxed { state: DemuxedState, - #[pin] session: Session, } +impl Demuxed { + /// Tears down the session; see [`Session::teardown`]. + pub async fn teardown(self) -> Result<(), Error> { + self.session.teardown().await + } +} + impl futures::Stream for Demuxed { type Item = Result; fn poll_next( - self: Pin<&mut Self>, + mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> Poll> { - let mut this = self.project(); loop { - let (stream_id, pkt) = match this.state { - DemuxedState::Waiting => match ready!(this.session.as_mut().poll_next(cx)) { + let (stream_id, pkt) = match self.state { + DemuxedState::Waiting => match ready!(Pin::new(&mut self.session).poll_next(cx)) { Some(Ok(PacketItem::RtpPacket(p))) => (p.stream_id, Some(p)), Some(Ok(PacketItem::SenderReport(p))) => { return Poll::Ready(Some(Ok(CodecItem::SenderReport(p)))) @@ -1135,27 +1525,24 @@ impl futures::Stream for Demuxed { Some(Err(e)) => return Poll::Ready(Some(Err(e))), None => return Poll::Ready(None), }, - DemuxedState::Pulling(stream_id) => (*stream_id, None), + DemuxedState::Pulling(stream_id) => (stream_id, None), DemuxedState::Fused => return Poll::Ready(None), }; - let session = this.session.as_mut().project(); - let playing = session.state.project(); - let depacketizer = match &mut playing.presentation.streams[stream_id].depacketizer { + let inner = self.session.0.as_mut().project(); + let depacketizer = match &mut inner.presentation.streams[stream_id].depacketizer { Ok(d) => d, Err(_) => unreachable!("depacketizer was Ok"), }; - let conn_ctx = session.conn.inner.ctx(); + let conn_ctx = inner.conn.inner.ctx(); if let Some(p) = pkt { - let msg_ctx = p.ctx; - let channel_id = p.channel_id; + let pkt_ctx = p.ctx; let stream_id = p.stream_id; let ssrc = p.ssrc; let sequence_number = p.sequence_number; depacketizer.push(p).map_err(|description| { wrap!(ErrorInt::RtpPacketError { conn_ctx: *conn_ctx, - msg_ctx, - channel_id, + pkt_ctx, stream_id, ssrc, sequence_number, @@ -1165,15 +1552,15 @@ impl futures::Stream for Demuxed { } match depacketizer.pull(conn_ctx) { Ok(Some(item)) => { - *this.state = DemuxedState::Pulling(stream_id); + self.state = DemuxedState::Pulling(stream_id); return Poll::Ready(Some(Ok(item))); } Ok(None) => { - *this.state = DemuxedState::Waiting; + self.state = DemuxedState::Waiting; continue; } Err(e) => { - *this.state = DemuxedState::Fused; + self.state = DemuxedState::Fused; return Poll::Ready(Some(Err(e))); } } @@ -1209,8 +1596,8 @@ mod tests { (client, server) } - /// Receives a request and sends a reply, filling in the matching `CSeq`. - async fn req_reply( + /// Receives a request and sends a response, filling in the matching `CSeq`. + async fn req_response( server: &mut crate::tokio::Connection, expected_method: rtsp_types::Method, mut response: rtsp_types::Response, @@ -1239,7 +1626,7 @@ mod tests { // DESCRIBE. let (session, _) = tokio::join!( Session::describe_with_conn(conn, SessionOptions::default(), url), - req_reply( + req_response( &mut server, rtsp_types::Method::Describe, response(include_bytes!("testdata/reolink_describe.txt")) @@ -1253,7 +1640,7 @@ mod tests { async { session.setup(0).await.unwrap(); }, - req_reply( + req_response( &mut server, rtsp_types::Method::Setup, response(include_bytes!("testdata/reolink_setup.txt")) @@ -1263,7 +1650,7 @@ mod tests { // PLAY. let (session, _) = tokio::join!( session.play(PlayOptions::default()), - req_reply( + req_response( &mut server, rtsp_types::Method::Play, response(include_bytes!("testdata/reolink_play.txt")) @@ -1323,7 +1710,7 @@ mod tests { let options = SessionOptions::default().ignore_spurious_data(true); let (session, _) = tokio::join!(Session::describe_with_conn(conn, options, url), async { server.send(bogus_pkt.clone()).await.unwrap(); - req_reply( + req_response( &mut server, rtsp_types::Method::Describe, response(include_bytes!("testdata/reolink_describe.txt")), @@ -1338,7 +1725,7 @@ mod tests { async { session.setup(0).await.unwrap(); }, - req_reply( + req_response( &mut server, rtsp_types::Method::Setup, response(include_bytes!("testdata/reolink_setup.txt")) @@ -1348,7 +1735,7 @@ mod tests { // PLAY. let (session, _) = tokio::join!( session.play(PlayOptions::default()), - req_reply( + req_response( &mut server, rtsp_types::Method::Play, response(include_bytes!("testdata/reolink_play.txt")) @@ -1397,15 +1784,17 @@ mod tests { #[test] fn print_sizes() { for (name, size) in &[ + ("PacketItem", std::mem::size_of::()), + ("Presentation", std::mem::size_of::()), ("RtspConnection", std::mem::size_of::()), ( - "Session", - std::mem::size_of::>(), + "Session", + std::mem::size_of::>(), // is the same size. ), - ("Session", std::mem::size_of::>()), + ("SessionInner", std::mem::size_of::()), + ("SessionOptions", std::mem::size_of::()), ("Demuxed", std::mem::size_of::()), ("Stream", std::mem::size_of::()), - ("PacketItem", std::mem::size_of::()), ("rtp::Packet", std::mem::size_of::()), ( "rtp::SenderReport", diff --git a/src/client/parse.rs b/src/client/parse.rs index ffb59e1..2ad1a9e 100644 --- a/src/client/parse.rs +++ b/src/client/parse.rs @@ -5,7 +5,7 @@ use bytes::{Buf, Bytes}; use log::debug; use pretty_hex::PrettyHex; use sdp::media_description::MediaDescription; -use std::{convert::TryFrom, num::NonZeroU16}; +use std::{convert::TryFrom, net::IpAddr, num::NonZeroU16}; use url::Url; use super::{Presentation, Stream}; @@ -365,6 +365,7 @@ fn parse_media(base_url: &Url, media_description: &MediaDescription) -> Result { pub(crate) session_id: &'a str, pub(crate) ssrc: Option, - pub(crate) channel_id: u8, + pub(crate) channel_id: Option, + pub(crate) source: Option, + pub(crate) server_port: Option<(u16, u16)>, } /// Parses a `SETUP` response. @@ -473,6 +475,8 @@ pub(crate) fn parse_setup(response: &rtsp_types::Response) -> Result) -> Result) -> std::fmt::Result { f.debug_struct("Packet") .field("ctx", &self.ctx) - .field("channel_id", &self.channel_id) .field("stream_id", &self.stream_id) .field("timestamp", &self.timestamp) .field("ssrc", &self.ssrc) @@ -53,7 +51,7 @@ impl std::fmt::Debug for Packet { #[derive(Debug)] pub struct SenderReport { pub stream_id: usize, - pub ctx: crate::RtspMessageContext, + pub ctx: crate::PacketContext, pub timestamp: crate::Timestamp, pub ntp_timestamp: crate::NtpTimestamp, } @@ -61,13 +59,16 @@ pub struct SenderReport { /// RTP/RTCP demarshaller which ensures packets have the correct SSRC and /// monotonically increasing SEQ. Unstable; exposed for benchmark. /// -/// This reports packet loss (via [Packet::loss]) but doesn't prohibit it, except for losses +/// When using UDP, skips and logs out-of-order packets. When using TCP, +/// fails on them. +/// +/// This reports packet loss (via [Packet::loss]) but doesn't prohibit it /// of more than `i16::MAX` which would be indistinguishable from non-monotonic sequence numbers. /// Servers sometimes drop packets internally even when sending data via TCP. /// /// At least [one camera](https://github.com/scottlamb/moonfire-nvr/wiki/Cameras:-Reolink#reolink-rlc-410-hardware-version-ipc_3816m) /// sometimes sends data from old RTSP sessions over new ones. This seems like a -/// serious bug, and currently `StrictSequenceChecker` will error in this case, +/// serious bug, and currently `InorderRtpParser` will error in this case, /// although it'd be possible to discard the incorrect SSRC instead. /// /// [RFC 3550 section 8.2](https://tools.ietf.org/html/rfc3550#section-8.2) says that SSRC @@ -75,12 +76,12 @@ pub struct SenderReport { /// not sure it will ever come up with IP cameras. #[doc(hidden)] #[derive(Debug)] -pub struct StrictSequenceChecker { +pub struct InorderParser { ssrc: Option, next_seq: Option, } -impl StrictSequenceChecker { +impl InorderParser { pub fn new(ssrc: Option, next_seq: Option) -> Self { Self { ssrc, next_seq } } @@ -89,9 +90,8 @@ impl StrictSequenceChecker { &mut self, session_options: &super::SessionOptions, conn_ctx: &crate::ConnectionContext, - msg_ctx: &crate::RtspMessageContext, + pkt_ctx: &crate::PacketContext, timeline: &mut super::Timeline, - channel_id: u8, stream_id: usize, mut data: Bytes, ) -> Result, Error> { @@ -109,10 +109,9 @@ impl StrictSequenceChecker { } let reader = rtp_rs::RtpReader::new(&data[..]).map_err(|e| { - wrap!(ErrorInt::RtspDataMessageError { + wrap!(ErrorInt::PacketError { conn_ctx: *conn_ctx, - msg_ctx: *msg_ctx, - channel_id, + pkt_ctx: *pkt_ctx, stream_id, description: format!( "corrupt RTP header while expecting seq={:04x?}: {:?}\n{:#?}", @@ -139,8 +138,7 @@ impl StrictSequenceChecker { } else { bail!(ErrorInt::RtpPacketError { conn_ctx: *conn_ctx, - msg_ctx: *msg_ctx, - channel_id, + pkt_ctx: *pkt_ctx, stream_id, ssrc, sequence_number, @@ -152,25 +150,32 @@ impl StrictSequenceChecker { } } if loss > 0x80_00 { - bail!(ErrorInt::RtpPacketError { - conn_ctx: *conn_ctx, - msg_ctx: *msg_ctx, - channel_id, - stream_id, - ssrc, - sequence_number, - description: format!( - "Out-of-order packet or large loss; expecting ssrc={:08x?} seq={:04x?}", - self.ssrc, self.next_seq - ), - }); + if matches!(session_options.transport, super::Transport::Tcp) { + bail!(ErrorInt::RtpPacketError { + conn_ctx: *conn_ctx, + pkt_ctx: *pkt_ctx, + stream_id, + ssrc, + sequence_number, + description: format!( + "Out-of-order packet or large loss; expecting ssrc={:08x?} seq={:04x?}", + self.ssrc, self.next_seq + ), + }); + } else { + log::info!( + "Skipping out-of-order seq={:04x} when expecting ssrc={:08x?} seq={:04x?}", + sequence_number, + self.ssrc, + self.next_seq + ); + } } let timestamp = match timeline.advance_to(reader.timestamp()) { Ok(ts) => ts, Err(description) => bail!(ErrorInt::RtpPacketError { conn_ctx: *conn_ctx, - msg_ctx: *msg_ctx, - channel_id, + pkt_ctx: *pkt_ctx, stream_id, ssrc, sequence_number, @@ -182,8 +187,7 @@ impl StrictSequenceChecker { let payload_range = crate::as_range(&data, reader.payload()).ok_or_else(|| { wrap!(ErrorInt::RtpPacketError { conn_ctx: *conn_ctx, - msg_ctx: *msg_ctx, - channel_id, + pkt_ctx: *pkt_ctx, stream_id, ssrc, sequence_number, @@ -194,8 +198,7 @@ impl StrictSequenceChecker { data.advance(payload_range.start); self.next_seq = Some(sequence_number.wrapping_add(1)); Ok(Some(PacketItem::RtpPacket(Packet { - ctx: *msg_ctx, - channel_id, + ctx: *pkt_ctx, stream_id, timestamp, ssrc, @@ -209,7 +212,7 @@ impl StrictSequenceChecker { pub fn rtcp( &mut self, session_options: &super::SessionOptions, - msg_ctx: &crate::RtspMessageContext, + pkt_ctx: &crate::PacketContext, timeline: &mut super::Timeline, stream_id: usize, data: Bytes, @@ -254,12 +257,12 @@ impl StrictSequenceChecker { sr = Some(SenderReport { stream_id, - ctx: *msg_ctx, + ctx: *pkt_ctx, timestamp, ntp_timestamp: pkt.ntp_timestamp(), }); } - crate::rtcp::Packet::Unknown(pkt) => debug!("rtcp: {:?}", pkt.payload_type()), + crate::rtcp::Packet::Unknown(pkt) => debug!("rtcp: pt {:?}", pkt.payload_type()), } i += 1; } diff --git a/src/codec/aac.rs b/src/codec/aac.rs index 14d8280..0d5ca24 100644 --- a/src/codec/aac.rs +++ b/src/codec/aac.rs @@ -427,7 +427,7 @@ pub(crate) struct Depacketizer { #[derive(Debug)] struct Aggregate { - ctx: crate::RtspMessageContext, + ctx: crate::PacketContext, /// RTP packets lost before the next frame in this aggregate. Includes old /// loss that caused a previous fragment to be too short. @@ -440,7 +440,6 @@ struct Aggregate { /// to be too short. loss_since_mark: bool, - channel_id: u8, stream_id: usize, ssrc: u32, sequence_number: u16, @@ -606,7 +605,6 @@ impl Depacketizer { ctx: pkt.ctx, loss: *prev_loss + pkt.loss, loss_since_mark: pkt.loss > 0, - channel_id: pkt.channel_id, stream_id: pkt.stream_id, ssrc: pkt.ssrc, sequence_number: pkt.sequence_number, @@ -726,8 +724,7 @@ impl Depacketizer { fn error(conn_ctx: ConnectionContext, agg: Aggregate, description: String) -> Error { Error(Box::new(ErrorInt::RtpPacketError { conn_ctx, - msg_ctx: agg.ctx, - channel_id: agg.channel_id, + pkt_ctx: agg.ctx, stream_id: agg.stream_id, ssrc: agg.ssrc, sequence_number: agg.sequence_number, diff --git a/src/codec/h264.rs b/src/codec/h264.rs index fc2379a..5ec3e24 100644 --- a/src/codec/h264.rs +++ b/src/codec/h264.rs @@ -56,8 +56,8 @@ struct Nal { /// An access unit that is currently being accumulated during `PreMark` state. #[derive(Debug)] struct AccessUnit { - start_ctx: crate::RtspMessageContext, - end_ctx: crate::RtspMessageContext, + start_ctx: crate::PacketContext, + end_ctx: crate::PacketContext, timestamp: crate::Timestamp, stream_id: usize, @@ -846,8 +846,7 @@ impl Packetizer { }; // TODO: ctx, channel_id, and ssrc are placeholders. return Ok(Some(Packet { - ctx: crate::RtspMessageContext::dummy(), - channel_id: 0, + ctx: crate::PacketContext::dummy(), stream_id: self.stream_id, timestamp, ssrc: 0, @@ -870,8 +869,7 @@ impl Packetizer { mark = false; } Ok(Some(Packet { - ctx: crate::RtspMessageContext::dummy(), - channel_id: 0, + ctx: crate::PacketContext::dummy(), stream_id: self.stream_id, timestamp, ssrc: 0, @@ -923,8 +921,7 @@ impl Packetizer { } // TODO: placeholders. Ok(Some(Packet { - ctx: crate::RtspMessageContext::dummy(), - channel_id: 0, + ctx: crate::PacketContext::dummy(), stream_id: self.stream_id, timestamp, ssrc: 0, @@ -1030,8 +1027,7 @@ mod tests { }; d.push(Packet { // plain SEI packet. - ctx: crate::RtspMessageContext::dummy(), - channel_id: 0, + ctx: crate::PacketContext::dummy(), stream_id: 0, timestamp, ssrc: 0, @@ -1044,8 +1040,7 @@ mod tests { assert!(d.pull().is_none()); d.push(Packet { // STAP-A packet. - ctx: crate::RtspMessageContext::dummy(), - channel_id: 0, + ctx: crate::PacketContext::dummy(), stream_id: 0, timestamp, ssrc: 0, @@ -1058,8 +1053,7 @@ mod tests { assert!(d.pull().is_none()); d.push(Packet { // FU-A packet, start. - ctx: crate::RtspMessageContext::dummy(), - channel_id: 0, + ctx: crate::PacketContext::dummy(), stream_id: 0, timestamp, ssrc: 0, @@ -1072,8 +1066,7 @@ mod tests { assert!(d.pull().is_none()); d.push(Packet { // FU-A packet, middle. - ctx: crate::RtspMessageContext::dummy(), - channel_id: 0, + ctx: crate::PacketContext::dummy(), stream_id: 0, timestamp, ssrc: 0, @@ -1086,8 +1079,7 @@ mod tests { assert!(d.pull().is_none()); d.push(Packet { // FU-A packet, end. - ctx: crate::RtspMessageContext::dummy(), - channel_id: 0, + ctx: crate::PacketContext::dummy(), stream_id: 0, timestamp, ssrc: 0, @@ -1128,8 +1120,7 @@ mod tests { }; d.push(Packet { // SPS with (incorrect) mark - ctx: crate::RtspMessageContext::dummy(), - channel_id: 0, + ctx: crate::PacketContext::dummy(), stream_id: 0, timestamp: ts1, ssrc: 0, @@ -1142,8 +1133,7 @@ mod tests { assert!(d.pull().is_none()); d.push(Packet { // PPS - ctx: crate::RtspMessageContext::dummy(), - channel_id: 0, + ctx: crate::PacketContext::dummy(), stream_id: 0, timestamp: ts1, ssrc: 0, @@ -1160,8 +1150,7 @@ mod tests { // RFC 6184 section 5.1 says that "the timestamp must match that of // the primary coded picture of the access unit and that the marker // bit can only be set on the final packet of the access unit."" - ctx: crate::RtspMessageContext::dummy(), - channel_id: 0, + ctx: crate::PacketContext::dummy(), stream_id: 0, timestamp: ts2, ssrc: 0, @@ -1203,8 +1192,7 @@ mod tests { d.push(Packet { // Slice layer without partitioning non-IDR, representing the // last frame of the previous GOP. - ctx: crate::RtspMessageContext::dummy(), - channel_id: 0, + ctx: crate::PacketContext::dummy(), stream_id: 0, timestamp: ts1, ssrc: 0, @@ -1222,8 +1210,7 @@ mod tests { assert_eq!(frame.timestamp, ts1); d.push(Packet { // SPS with (incorrect) timestamp matching last frame. - ctx: crate::RtspMessageContext::dummy(), - channel_id: 0, + ctx: crate::PacketContext::dummy(), stream_id: 0, timestamp: ts1, ssrc: 0, @@ -1236,8 +1223,7 @@ mod tests { assert!(d.pull().is_none()); d.push(Packet { // PPS, again with timestamp matching last frame. - ctx: crate::RtspMessageContext::dummy(), - channel_id: 0, + ctx: crate::PacketContext::dummy(), stream_id: 0, timestamp: ts1, ssrc: 0, @@ -1250,8 +1236,7 @@ mod tests { assert!(d.pull().is_none()); d.push(Packet { // Slice layer without partitioning IDR. Now correct timestamp. - ctx: crate::RtspMessageContext::dummy(), - channel_id: 0, + ctx: crate::PacketContext::dummy(), stream_id: 0, timestamp: ts2, ssrc: 0, @@ -1289,8 +1274,7 @@ mod tests { start: 0, }; d.push(Packet { // new SPS. - ctx: crate::RtspMessageContext::dummy(), - channel_id: 0, + ctx: crate::PacketContext::dummy(), stream_id: 0, timestamp, ssrc: 0, @@ -1302,8 +1286,7 @@ mod tests { assert!(d.pull().is_none()); d.push(Packet { // same PPS again. - ctx: crate::RtspMessageContext::dummy(), - channel_id: 0, + ctx: crate::PacketContext::dummy(), stream_id: 0, timestamp, ssrc: 0, @@ -1316,8 +1299,7 @@ mod tests { assert!(d.pull().is_none()); d.push(Packet { // dummy slice NAL to end the AU. - ctx: crate::RtspMessageContext::dummy(), - channel_id: 0, + ctx: crate::PacketContext::dummy(), stream_id: 0, timestamp, ssrc: 0, diff --git a/src/codec/mod.rs b/src/codec/mod.rs index 1abfb1e..c3a18e4 100644 --- a/src/codec/mod.rs +++ b/src/codec/mod.rs @@ -157,7 +157,7 @@ impl AudioParameters { /// An audio frame, which consists of one or more samples. pub struct AudioFrame { - pub ctx: crate::RtspMessageContext, + pub ctx: crate::PacketContext, pub stream_id: usize, pub timestamp: crate::Timestamp, pub frame_length: NonZeroU32, @@ -202,7 +202,7 @@ impl Buf for AudioFrame { pub struct MessageParameters(onvif::CompressionType); pub struct MessageFrame { - pub ctx: crate::RtspMessageContext, + pub ctx: crate::PacketContext, pub timestamp: crate::Timestamp, pub stream_id: usize, @@ -242,8 +242,8 @@ pub struct VideoFrame { // A pair of contexts: for the start and for the end. // Having both can be useful to measure the total time elapsed while receiving the frame. - start_ctx: crate::RtspMessageContext, - end_ctx: crate::RtspMessageContext, + start_ctx: crate::PacketContext, + end_ctx: crate::PacketContext, /// This picture's timestamp in the time base associated with the stream. pub timestamp: crate::Timestamp, @@ -265,12 +265,12 @@ pub struct VideoFrame { impl VideoFrame { #[inline] - pub fn start_ctx(&self) -> crate::RtspMessageContext { + pub fn start_ctx(&self) -> crate::PacketContext { self.start_ctx } #[inline] - pub fn end_ctx(&self) -> crate::RtspMessageContext { + pub fn end_ctx(&self) -> crate::PacketContext { self.end_ctx } diff --git a/src/codec/onvif.rs b/src/codec/onvif.rs index 6531546..5401d6b 100644 --- a/src/codec/onvif.rs +++ b/src/codec/onvif.rs @@ -36,7 +36,7 @@ enum State { #[derive(Debug)] struct InProgress { - ctx: crate::RtspMessageContext, + ctx: crate::PacketContext, timestamp: crate::Timestamp, data: BytesMut, loss: u16, diff --git a/src/error.rs b/src/error.rs index fd2efee..66a986d 100644 --- a/src/error.rs +++ b/src/error.rs @@ -3,7 +3,7 @@ use std::fmt::Display; -use crate::{ConnectionContext, RtspMessageContext}; +use crate::{ConnectionContext, PacketContext, RtspMessageContext}; use thiserror::Error; /// An opaque `std::error::Error + Send + Sync + 'static` implementation. @@ -65,23 +65,21 @@ pub(crate) enum ErrorInt { channel_id: u8, }, - #[error("[{conn_ctx}, {msg_ctx} channel {channel_id} stream {stream_id}] RTSP data message: {description}")] - RtspDataMessageError { + #[error("[{conn_ctx}, {pkt_ctx} stream {stream_id}]: {description}")] + PacketError { conn_ctx: ConnectionContext, - msg_ctx: RtspMessageContext, - channel_id: u8, + pkt_ctx: PacketContext, stream_id: usize, description: String, }, #[error( - "[{conn_ctx}, {msg_ctx}, channel={channel_id}, stream={stream_id}, ssrc={ssrc:08x}, \ + "[{conn_ctx}, {pkt_ctx}, stream={stream_id}, ssrc={ssrc:08x}, \ seq={sequence_number:08x}] {description}" )] RtpPacketError { conn_ctx: ConnectionContext, - msg_ctx: RtspMessageContext, - channel_id: u8, + pkt_ctx: crate::PacketContext, stream_id: usize, ssrc: u32, sequence_number: u16, @@ -92,12 +90,19 @@ pub(crate) enum ErrorInt { ConnectError(#[source] std::io::Error), #[error("[{conn_ctx}, {msg_ctx}] Error reading from RTSP peer: {source}")] - ReadError { + RtspReadError { conn_ctx: ConnectionContext, msg_ctx: RtspMessageContext, source: std::io::Error, }, + #[error("[{conn_ctx}, {pkt_ctx}] Error receiving UDP packet: {source}")] + UdpRecvError { + conn_ctx: ConnectionContext, + pkt_ctx: PacketContext, + source: std::io::Error, + }, + #[error("[{conn_ctx}] Error writing to RTSP peer: {source}")] WriteError { conn_ctx: ConnectionContext, diff --git a/src/lib.rs b/src/lib.rs index c865ca5..bee03ed 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,11 +2,15 @@ // SPDX-License-Identifier: MIT OR Apache-2.0 use bytes::Bytes; +use log::trace; //use failure::{bail, format_err, Error}; use once_cell::sync::Lazy; +use rand::Rng; use rtsp_types::Message; use std::fmt::{Debug, Display}; +use std::net::{IpAddr, SocketAddr, UdpSocket}; use std::num::NonZeroU32; +use std::ops::Range; mod error; mod rtcp; @@ -218,8 +222,7 @@ pub struct ConnectionContext { impl ConnectionContext { #[doc(hidden)] pub fn dummy() -> Self { - use std::net::{IpAddr, Ipv4Addr, SocketAddr}; - let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0); + let addr = SocketAddr::new(IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED), 0); Self { local_addr: addr, peer_addr: addr, @@ -283,6 +286,58 @@ impl Display for RtspMessageContext { } } +/// Context for an RTP or RTCP packet, received either via RTSP interleaved data or UDP. +/// +/// Should be paired with an [`RtspConnectionContext`] of the RTSP connection that started +/// the session. In the interleaved data case, it's assumed the packet was received over +/// that same connection. +#[derive(Copy, Clone, Debug)] +pub struct PacketContext(PacketContextInner); + +impl PacketContext { + #[doc(hidden)] + pub fn dummy() -> PacketContext { + Self(PacketContextInner::Dummy) + } +} + +#[derive(Copy, Clone, Debug)] +enum PacketContextInner { + Tcp { + msg_ctx: RtspMessageContext, + channel_id: u8, + }, + Udp { + local_addr: SocketAddr, + peer_addr: SocketAddr, + received_wall: WallTime, + received: std::time::Instant, + }, + Dummy, +} + +impl Display for PacketContext { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self.0 { + PacketContextInner::Udp { + local_addr, + peer_addr, + received_wall, + .. + } => { + write!(f, "{}->{}@{}", peer_addr, local_addr, received_wall) + } + PacketContextInner::Tcp { + msg_ctx, + channel_id, + } => { + write!(f, "{} ch={}", msg_ctx, channel_id) + } + PacketContextInner::Dummy => write!(f, "dummy"), + } + } +} + /// Returns the range within `buf` that represents `subset`. /// If `subset` is empty, returns None; otherwise panics if `subset` is not within `buf`. pub(crate) fn as_range(buf: &[u8], subset: &[u8]) -> Option> { @@ -303,3 +358,77 @@ pub(crate) fn as_range(buf: &[u8], subset: &[u8]) -> Option Result { + const MAX_TRIES: usize = 10; + const ALLOWED_RTP_RANGE: Range = 5000..65000; // stolen from ffmpeg's defaults. + let mut rng = rand::thread_rng(); + for i in 0..MAX_TRIES { + let rtp_port = rng.gen_range(ALLOWED_RTP_RANGE) & !0b1; + debug_assert!(ALLOWED_RTP_RANGE.contains(&rtp_port)); + let rtp_addr = SocketAddr::new(ip_addr, rtp_port); + let rtp_socket = match UdpSocket::bind(rtp_addr) { + Ok(s) => s, + Err(e) if e.kind() == std::io::ErrorKind::AddrInUse => { + trace!( + "Try {}/{}: unable to bind RTP addr {:?}", + i, + MAX_TRIES, + rtp_addr + ); + continue; + } + Err(e) => return Err(e), + }; + let rtcp_addr = SocketAddr::new(ip_addr, rtp_port + 1); + let rtcp_socket = match UdpSocket::bind(rtcp_addr) { + Ok(s) => s, + Err(e) if e.kind() == std::io::ErrorKind::AddrInUse => { + trace!( + "Try {}/{}: unable to bind RTCP addr {:?}", + i, + MAX_TRIES, + rtcp_addr + ); + continue; + } + Err(e) => return Err(e), + }; + return Ok(Self { + rtp_port, + rtp_socket, + rtcp_socket, + }); + } + Err(std::io::Error::new( + std::io::ErrorKind::AddrInUse, + format!( + "Unable to find even/odd pair in {}:{}..{} after {} tries", + ip_addr, ALLOWED_RTP_RANGE.start, ALLOWED_RTP_RANGE.end, MAX_TRIES + ), + )) + } +} + +#[cfg(test)] +mod test { + use std::net::Ipv4Addr; + + use super::*; + + #[test] + fn local_udp_pair() { + // Just test that it succeeds. + UdpPair::for_ip(IpAddr::V4(Ipv4Addr::LOCALHOST)).unwrap(); + } +} diff --git a/src/tokio.rs b/src/tokio.rs index 786e1e9..cdd840b 100644 --- a/src/tokio.rs +++ b/src/tokio.rs @@ -11,7 +11,7 @@ use pretty_hex::PrettyHex; use rtsp_types::{Data, Message}; use std::convert::TryFrom; use std::time::Instant; -use tokio::net::TcpStream; +use tokio::net::{TcpStream, UdpSocket}; use tokio_util::codec::Framed; use url::Host; @@ -84,7 +84,7 @@ impl Stream for Connection { ) -> std::task::Poll> { self.0.poll_next_unpin(cx).map_err(|e| { wrap!(match e { - CodecError::IoError(error) => ErrorInt::ReadError { + CodecError::IoError(error) => ErrorInt::RtspReadError { conn_ctx: *self.ctx(), msg_ctx: self.eof_ctx(), source: error, @@ -296,3 +296,23 @@ impl tokio_util::codec::Encoder> for Codec { Ok(()) } } + +/// tokio-specific version of [`crate::UdpPair`]. +pub(crate) struct UdpPair { + pub(crate) rtp_port: u16, + pub(crate) rtp_socket: UdpSocket, + pub(crate) rtcp_socket: UdpSocket, +} + +impl UdpPair { + pub(crate) fn for_ip(ip_addr: std::net::IpAddr) -> Result { + let inner = crate::UdpPair::for_ip(ip_addr)?; + inner.rtp_socket.set_nonblocking(true)?; + inner.rtcp_socket.set_nonblocking(true)?; + Ok(Self { + rtp_port: inner.rtp_port, + rtp_socket: UdpSocket::from_std(inner.rtp_socket)?, + rtcp_socket: UdpSocket::from_std(inner.rtcp_socket)?, + }) + } +}