new, more reliable TEARDOWN mechanism

Scott Lamb 2021-09-09 17:34:49 -07:00
parent abf10eac16
commit 62f6949873
6 changed files with 501 additions and 113 deletions

View File

@ -6,6 +6,7 @@
* improve compatibility with cameras that send non-compliant SDP, including
Anpviz ([#26](https://github.com/scottlamb/retina/issues/26)) and
Geovision ([#33](https://github.com/scottlamb/retina/issues/33)).
* new mechanism to more reliably send `TEARDOWN` requests.
## `v0.3.0` (2021-08-31)

View File

@ -26,10 +26,10 @@ use retina::{
codec::{AudioParameters, CodecItem, VideoParameters},
};
use std::io::SeekFrom;
use std::num::NonZeroU32;
use std::path::PathBuf;
use std::{convert::TryFrom, pin::Pin};
use std::{io::SeekFrom, sync::Arc};
use tokio::{
fs::File,
io::{AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt},
@ -621,11 +621,20 @@ async fn copy<'a>(
/// Writes the `.mp4`, including trying to finish or clean up the file.
async fn write_mp4<'a>(
opts: &'a Opts,
session: &'a mut retina::client::Demuxed,
session: retina::client::Session<retina::client::Described>,
video_stream: Option<(usize, VideoParameters)>,
audio_stream: Option<(usize, AudioParameters)>,
stop_signal: Pin<Box<dyn Future<Output = Result<(), std::io::Error>>>>,
) -> Result<(), Error> {
let mut session = session
.play(
retina::client::PlayOptions::default()
.initial_timestamp(opts.initial_timestamp)
.enforce_timestamps_with_max_jump_secs(NonZeroU32::new(10).unwrap()),
)
.await?
.demuxed()?;
// Append into a filename suffixed with ".partial", then try to either rename it into
// place if it's complete or delete it otherwise.
const PARTIAL_SUFFIX: &str = ".partial";
@ -641,7 +650,7 @@ async fn write_mp4<'a>(
)
.await?;
let result = copy(opts, session, stop_signal, &mut mp4).await;
let result = copy(opts, &mut session, stop_signal, &mut mp4).await;
if let Err(e) = result {
// Log errors about finishing, returning the original error.
if let Err(e) = mp4.finish().await {
@ -677,10 +686,12 @@ pub async fn run(opts: Opts) -> Result<(), Error> {
let creds = super::creds(opts.src.username.clone(), opts.src.password.clone());
let stop_signal = Box::pin(tokio::signal::ctrl_c());
let session_group = Arc::new(retina::client::SessionGroup::default());
let mut session = retina::client::Session::describe(
opts.src.url.clone(),
retina::client::SessionOptions::default()
.creds(creds)
.session_group(session_group.clone())
.user_agent("Retina mp4 example".to_owned())
.transport(opts.transport),
)
@ -715,20 +726,11 @@ pub async fn run(opts: Opts) -> Result<(), Error> {
if let Some((i, _)) = audio_stream {
session.setup(i).await?;
}
let mut session = session
.play(
retina::client::PlayOptions::default()
.initial_timestamp(opts.initial_timestamp)
.enforce_timestamps_with_max_jump_secs(NonZeroU32::new(10).unwrap()),
)
.await?
.demuxed()?;
let result = write_mp4(&opts, session, video_stream, audio_stream, stop_signal).await;
// TODO: should also send a TEARDOWN if the PLAY response won't parse or if
// demuxed() fails. The former isn't even possible with the current API.
let result = write_mp4(&opts, &mut session, video_stream, audio_stream, stop_signal).await;
if let Err(e) = session.teardown().await {
// Session has now been dropped, on success or failure. A TEARDOWN should
// be pending if necessary. session_group.teardown() will wait for it.
if let Err(e) = session_group.teardown().await {
log::error!("TEARDOWN failed: {}", e);
}
result

View File

@ -4,6 +4,7 @@
use std::mem::MaybeUninit;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::num::NonZeroU32;
use std::sync::{Arc, Mutex};
use std::task::Poll;
use std::time::Instant;
use std::{borrow::Cow, fmt::Debug, num::NonZeroU16, pin::Pin};
@ -18,6 +19,7 @@ use rtsp_types::Method;
use tokio::net::UdpSocket;
use url::Url;
use crate::client::parse::SessionHeader;
use crate::codec::CodecItem;
use crate::{Error, ErrorInt, RtspMessageContext};
@ -29,6 +31,169 @@ mod timeline;
/// Duration between keepalive RTSP requests during [Playing] state.
pub const KEEPALIVE_DURATION: std::time::Duration = std::time::Duration::from_secs(30);
/// A stale RTP session.
struct StaleSession {
/// If this stale session was created from a dropped [`Session`], a
/// channel which receives the result of a single `TEARDOWN` attempt.
teardown_receiver: Option<tokio::sync::watch::Receiver<Option<Result<(), String>>>>,
id: Option<Box<str>>,
is_tcp: bool,
/// Upper bound of advertised expiration time.
expires: tokio::time::Instant,
}
/// A grouping of sessions, currently used only to track stale sessions.
///
/// This is an experimental API which may change in an upcoming Retina version.
///
/// Stale sessions are ones that may still be attempting data transmission.
/// They are tracked in three cases:
///
/// 1. UDP sessions for which we've sent a `PLAY` request and dropped the
/// `Session` without receiving a `TEARDOWN` response. These may still be
/// consuming bandwidth.
/// 2. TCP sessions in a similar situation, if the server advertises a specific
/// buggy version of live555 (see [`has_live555_tcp_bug`]). These sessions
/// at least consume CPU on the server, and they will cause problems if
/// another connection claims the same file descriptor.
/// 3. TCP sessions we learn about via unexpected RTSP interleaved data
/// packets. These are assumed to be caused by the same bug as #2 but might
/// have been started by a process unknown to us.
///
/// Currently in cases #1 and #2, a single `TEARDOWN` will be attempted in the
/// background after the `Session` is dropped. [`SessionGroup::teardown`] can
/// be used to wait for it to conclude.
///
/// Stale sessions are forgotten either on teardown or expiration. In general,
/// the tracked expiration time is worst-case. The exception is if the server
/// hasn't responded to a keepalive request. In that case there's theoretically
/// no bound on when the server could see the request and extend the session.
/// Retina ignores this possibility.
///
/// A SessionGroup can be of any granularity, but a typical use is to ensure
/// there are no stale sessions before starting a fresh session. Groups should
/// be sized to match that idea. If connecting to a live555 server affected by
/// the stale TCP session bug, it might be wise to have one group per server, so
/// that all such sessions can be drained before initiating new connections.
/// Otherwise it might be useful to have one group per describe URL (potentially
/// several per server) and have at most one active session per URL.
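///
/// A minimal usage sketch (illustrative only; the URL is a placeholder, and
/// error handling is elided — see the `mp4` example for a fuller version):
///
/// ```no_run
/// # async fn example() -> Result<(), retina::Error> {
/// use std::sync::Arc;
/// let group = Arc::new(retina::client::SessionGroup::default());
/// let session = retina::client::Session::describe(
///     url::Url::parse("rtsp://camera.example/stream").unwrap(),
///     retina::client::SessionOptions::default().session_group(group.clone()),
/// )
/// .await?;
/// // ... SETUP/PLAY and use the session, then drop it ...
/// drop(session);
/// // Wait for the background TEARDOWN (if any) before starting a new session.
/// group.teardown().await?;
/// # Ok(())
/// # }
/// ```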
#[derive(Default)]
pub struct SessionGroup(Mutex<SessionGroupInner>);
#[derive(Default)]
struct SessionGroupInner {
sessions: Vec<StaleSession>,
}
/// The overall status of stale sessions belonging to a group.
pub struct StaleSessionStatus {
/// The maximum expiration time of any stale session in this group.
pub max_expires: Option<tokio::time::Instant>,
/// The total number of stale sessions.
pub num_sessions: usize,
}
impl SessionGroup {
/// Returns the status of stale sessions in this group.
///
/// The caller might use this in a loop to sleep until there are no expired
/// sessions.
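///
/// A sketch of such a loop (illustrative only):
///
/// ```no_run
/// # async fn example(group: &retina::client::SessionGroup) {
/// loop {
///     let status = group.stale_sessions();
///     match status.max_expires {
///         // Sleep until the worst-case expiration, then re-check.
///         Some(expires) => tokio::time::sleep_until(expires).await,
///         None => break, // no stale sessions remain
///     }
/// }
/// # }
/// ```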
pub fn stale_sessions(&self) -> StaleSessionStatus {
let mut l = self.0.lock().unwrap();
l.prune(tokio::time::Instant::now());
StaleSessionStatus {
max_expires: l.sessions.iter().map(|s| s.expires).max(),
num_sessions: l.sessions.len(),
}
}
/// Waits for a `TEARDOWN` to be attempted on all stale sessions that exist
/// as of when this method is called, returning an error if any fail.
///
/// This has no timeout other than the sessions' expiration times. The
/// caller can wrap the call in `tokio::time::timeout` for an earlier deadline.
///
/// Currently on `Session::drop`, a `TEARDOWN` is started in the background.
/// This method waits for that to conclude. It doesn't attempt any new
/// `TEARDOWN` requests, even if called repeatedly. This may change.
///
/// Ignores the case #3 sessions, as it's not possible to tear them down. If
/// desired, the caller can learn of those through
/// [`SessionGroup::stale_sessions`] and sleep until they expire.
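///
/// A sketch of bounding the wait (illustrative only; the 5-second limit is
/// arbitrary):
///
/// ```no_run
/// # async fn example(group: &retina::client::SessionGroup) {
/// use std::time::Duration;
/// match tokio::time::timeout(Duration::from_secs(5), group.teardown()).await {
///     Ok(Ok(())) => {} // all pending TEARDOWN attempts concluded successfully
///     Ok(Err(e)) => log::error!("TEARDOWN failed: {}", e),
///     Err(_) => log::warn!("TEARDOWN still pending after 5s"),
/// }
/// # }
/// ```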
pub async fn teardown(&self) -> Result<(), Error> {
let mut watches: Vec<_>;
{
let mut l = self.0.lock().unwrap();
l.prune(tokio::time::Instant::now());
watches = l
.sessions
.iter()
.filter_map(|s| s.teardown_receiver.clone())
.collect();
}
let mut overall_result = Ok(());
for w in &mut watches {
let mut r = (*w.borrow_and_update()).clone();
if r.is_none() {
// An attempt hasn't finished yet. Wait for it.
w.changed()
.await
.expect("teardown Sender shouldn't be dropped");
r = (*w.borrow()).clone();
}
// Now an attempt has finished, success or failure.
let r = r.expect("teardown result should be populated after change");
overall_result = overall_result.and(r);
}
overall_result.map_err(|description| wrap!(ErrorInt::Teardown(description)))
}
/// Notes an unexpected RTSP interleaved data message.
///
/// This is assumed to be due to a live555 RTP/AVP/TCP session that belonged
/// to a since-closed RTSP connection. If there's no known session which
/// explains this, adds an unknown session with live555's default timeout,
/// 65 seconds.
fn note_stale_live555_data(&self) {
let mut lock = self.0.lock().unwrap();
let now = tokio::time::Instant::now();
lock.prune(now);
for s in &lock.sessions {
if s.is_tcp {
// This session plausibly explains the packet.
// (We could go so far as to examine the data packet's SSRC to
// see if it matches one associated with this session. But
// retrying once per expiration is probably good enough.)
return;
}
}
lock.sessions.push(StaleSession {
// The caller *might* have a better guess than 65 seconds via a
// SETUP response, but it's also possible for
// note_stale_live555_data to be called prior to SETUP.
expires: tokio::time::Instant::now() + std::time::Duration::from_secs(65),
teardown_receiver: None,
is_tcp: true,
id: None,
});
}
}
impl SessionGroupInner {
/// Removes expired sessions.
fn prune(&mut self, now: tokio::time::Instant) {
self.sessions.retain(|session| session.expires > now);
}
}
/// Policy for handling the `rtptime` parameter normally seen in the `RTP-Info` header.
/// This parameter is used to map each stream's RTP timestamp to NPT ("normal play time"),
/// allowing multiple streams to be played in sync.
@ -96,6 +261,7 @@ pub struct SessionOptions {
creds: Option<Credentials>,
user_agent: Option<Box<str>>,
transport: Transport,
session_group: Option<Arc<SessionGroup>>,
}
#[derive(Copy, Clone, Debug)]
@ -165,6 +331,11 @@ impl SessionOptions {
self.transport = transport;
self
}
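/// Associates this session with a [`SessionGroup`], which tracks sessions
/// that may still be live after drop and allows awaiting their `TEARDOWN`
/// attempts via [`SessionGroup::teardown`].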
pub fn session_group(mut self, session_group: Arc<SessionGroup>) -> Self {
self.session_group = Some(session_group);
self
}
}
/// Options which must be decided at `PLAY` time.
@ -376,19 +547,25 @@ enum ResponseMode {
/// attempt to interpret them before having `RTP-Info`.
Play,
/// Silently discard data messages and responses to the given keepalive
/// while awaiting the response to this request.
Teardown { keepalive_cseq: Option<u32> },
/// Discard data messages and unrelated responses while awaiting the
/// response to this request.
Teardown,
}
/// An RTSP session, or a connection that may be used in a prescribed way.
/// See discussion at [State].
pub struct Session<S: State>(Pin<Box<SessionInner>>, S);
#[pin_project]
#[pin_project(PinnedDrop)]
struct SessionInner {
// TODO: allow this to be closed and reopened during a UDP session?
conn: RtspConnection,
/// The connection. Currently there's expected to always be an RTSP
/// connection, even while playing an RTP/AVP/UDP session. The only
/// exception is during drop.
conn: Option<RtspConnection>,
/// A handle to the tokio runtime which created this session. It will
/// be used to asynchronously send a `TEARDOWN` on drop.
runtime_handle: Option<tokio::runtime::Handle>,
options: SessionOptions,
requested_auth: Option<digest_auth::WwwAuthenticateHeader>,
@ -397,7 +574,7 @@ struct SessionInner {
/// This will be set iff one or more `SETUP` calls have been issued.
/// This is sometimes true in state `Described` and always true in state
/// `Playing`.
session_id: Option<Box<str>>,
session: Option<parse::SessionHeader>,
// Keep some information about the DESCRIBE response. If a depacketizer
// couldn't be constructed correctly for one or more streams, this will be
@ -413,6 +590,9 @@ struct SessionInner {
keepalive_timer: Option<Pin<Box<tokio::time::Sleep>>>,
maybe_playing: bool,
has_live555_tcp_bug: bool,
/// The index within `presentation.streams` to start the next poll at.
/// Round-robining between them rather than always starting at 0 should
/// prevent one stream from starving the others.
@ -485,14 +665,9 @@ impl RtspConnection {
if response_cseq == cseq {
break (r, msg_ctx);
}
if let ResponseMode::Teardown {
keepalive_cseq: Some(k),
} = mode
{
if response_cseq == k {
debug!("ignoring keepalive {} response during TEARDOWN", k);
continue;
}
if matches!(mode, ResponseMode::Teardown) {
debug!("ignoring unrelated response during TEARDOWN");
continue;
}
format!("{} response with CSeq {}", r.reason_phrase(), response_cseq)
} else {
@ -517,6 +692,11 @@ impl RtspConnection {
continue;
}
}
if let Some(session_group) = options.session_group.as_ref() {
session_group.note_stale_live555_data();
}
format!(
"{}-byte interleaved data message on channel {}",
d.len(),
@ -662,6 +842,8 @@ impl Session<Described> {
/// depacketized correctly. If those streams are set up via
/// `Session<Described>::setup`, the errors in question will ultimately be
/// returned from `Session<Playing>::demuxed`.
///
/// Expects to be called from a tokio runtime.
pub async fn describe(url: Url, options: SessionOptions) -> Result<Self, Error> {
let conn = RtspConnection::connect(&url).await?;
Self::describe_with_conn(conn, options, url).await
@ -695,18 +877,26 @@ impl Session<Described> {
description,
})
})?;
let has_live555_tcp_bug = presentation
.tool
.as_deref()
.map(has_live555_tcp_bug)
.unwrap_or(false);
Ok(Session(
Box::pin(SessionInner {
conn,
conn: Some(conn),
options,
runtime_handle: tokio::runtime::Handle::try_current().ok(),
requested_auth,
presentation,
session_id: None,
session: None,
describe_ctx: msg_ctx,
describe_cseq: cseq,
describe_status: response.status(),
keepalive_state: KeepaliveState::Idle,
keepalive_timer: None,
maybe_playing: false,
has_live555_tcp_bug,
udp_next_poll_i: 0,
}),
Described(()),
@ -730,7 +920,7 @@ impl Session<Described> {
let inner = &mut self.0.as_mut().project();
let presentation = &mut inner.presentation;
let options = &inner.options;
let conn = &mut inner.conn;
let conn = inner.conn.as_mut().unwrap();
let stream = &mut presentation.streams[stream_i];
if !matches!(stream.state, StreamState::Uninit) {
bail!(ErrorInt::FailedPrecondition("stream already set up".into()));
@ -784,8 +974,8 @@ impl Session<Described> {
);
}
}
if let Some(ref s) = inner.session_id {
req = req.header(rtsp_types::headers::SESSION, s.to_string());
if let Some(ref s) = inner.session {
req = req.header(rtsp_types::headers::SESSION, s.id.to_string());
}
let (msg_ctx, cseq, response) = conn
.send(
@ -808,22 +998,22 @@ impl Session<Described> {
description,
})
})?;
match inner.session_id.as_ref() {
Some(old) if old.as_ref() != response.session_id => {
match inner.session.as_ref() {
Some(SessionHeader { id, .. }) if id.as_ref() != &*response.session.id => {
bail!(ErrorInt::RtspResponseError {
conn_ctx: *inner.conn.inner.ctx(),
conn_ctx: *conn.inner.ctx(),
msg_ctx,
method: rtsp_types::Method::Setup,
cseq,
status,
description: format!(
"session id changed from {:?} to {:?}",
old, response.session_id,
id, response.session.id,
),
});
}
Some(_) => {}
None => *inner.session_id = Some(response.session_id.into()),
None => *inner.session = Some(response.session),
};
let conn_ctx = conn.inner.ctx();
match options.transport {
@ -831,7 +1021,7 @@ impl Session<Described> {
let channel_id = match response.channel_id {
Some(id) => id,
None => bail!(ErrorInt::RtspResponseError {
conn_ctx: *inner.conn.inner.ctx(),
conn_ctx: *conn.inner.ctx(),
msg_ctx,
method: rtsp_types::Method::Setup,
cseq,
@ -904,15 +1094,23 @@ impl Session<Described> {
///
/// The presentation must support aggregate control, as defined in [RFC 2326
/// section 1.3](https://tools.ietf.org/html/rfc2326#section-1.3).
///
/// When a `Session<Playing>` is dropped (including when `play` returns an
/// error), a `TEARDOWN` request may be necessary. This will happen in the
/// background. The caller may need to wait for the session to be
/// successfully destroyed, for example before exiting the program or before
/// starting another session. See [`SessionOptions::session_group`] and
/// [`SessionGroup::teardown`].
pub async fn play(mut self, policy: PlayOptions) -> Result<Session<Playing>, Error> {
let inner = self.0.as_mut().project();
let session_id = inner.session_id.as_deref().ok_or_else(|| {
let conn = inner.conn.as_mut().unwrap();
let session = inner.session.as_ref().ok_or_else(|| {
wrap!(ErrorInt::FailedPrecondition(
"must SETUP before PLAY".into()
))
})?;
if let Some(tool) = inner.presentation.tool.as_deref() {
if matches!(inner.options.transport, Transport::Tcp) && has_live555_tcp_bug(tool) {
if matches!(inner.options.transport, Transport::Tcp) && *inner.has_live555_tcp_bug {
warn!(
"Connecting via TCP to known-broken RTSP server {:?}. \
See <https://github.com/scottlamb/retina/issues/17>. \
@ -922,23 +1120,23 @@ impl Session<Described> {
}
}
trace!("PLAY with channel mappings: {:#?}", &inner.conn.channels);
let (msg_ctx, cseq, response) = inner
.conn
trace!("PLAY with channel mappings: {:#?}", &conn.channels);
*inner.maybe_playing = true;
let (msg_ctx, cseq, response) = conn
.send(
ResponseMode::Play,
&inner.options,
inner.requested_auth,
&mut rtsp_types::Request::builder(Method::Play, rtsp_types::Version::V1_0)
.request_uri(inner.presentation.control.clone())
.header(rtsp_types::headers::SESSION, session_id.clone())
.header(rtsp_types::headers::SESSION, &*session.id)
.header(rtsp_types::headers::RANGE, "npt=0.000-".to_owned())
.build(Bytes::new()),
)
.await?;
parse::parse_play(&response, inner.presentation).map_err(|description| {
wrap!(ErrorInt::RtspResponseError {
conn_ctx: *inner.conn.inner.ctx(),
conn_ctx: *conn.inner.ctx(),
msg_ctx,
method: rtsp_types::Method::Play,
cseq,
@ -978,7 +1176,7 @@ impl Session<Described> {
{
if initial_rtptime.is_none() {
bail!(ErrorInt::RtspResponseError {
conn_ctx: *inner.conn.inner.ctx(),
conn_ctx: *conn.inner.ctx(),
msg_ctx,
method: rtsp_types::Method::Play,
cseq,
@ -1007,7 +1205,7 @@ impl Session<Described> {
}
o => o,
};
let conn_ctx = inner.conn.inner.ctx();
let conn_ctx = conn.inner.ctx();
s.state = StreamState::Playing {
timeline: Timeline::new(
initial_rtptime,
@ -1097,7 +1295,7 @@ impl Session<Playing> {
if matches!(s.state, StreamState::Playing { .. }) {
if let Err(ref description) = s.depacketizer {
bail!(ErrorInt::RtspResponseError {
conn_ctx: *inner.conn.inner.ctx(),
conn_ctx: *inner.conn.as_ref().unwrap().inner.ctx(),
msg_ctx: *inner.describe_ctx,
method: rtsp_types::Method::Describe,
cseq: *inner.describe_cseq,
@ -1113,45 +1311,37 @@ impl Session<Playing> {
})
}
/// Tears down the session.
/// Sends a `TEARDOWN`, ending the session.
///
/// This attempts to send a `TEARDOWN` request to end the session:
/// * When using UDP, this signals the server to end the data stream
/// promptly rather than wait for timeout.
/// * Even when using TCP, some old versions of `live555` servers will
/// incorrectly keep trying to send packets to this connection, burning
/// CPU until timeout, and potentially sending packets to a freshly
/// opened connection that happens to claim the same file descriptor
/// number.
///
/// Discards any RTSP interleaved data messages received on the socket
/// while waiting for the `TEARDOWN` response.
///
/// Currently closes the socket(s) immediately after the `TEARDOWN` response,
/// even on failure. This behavior may change in a future release.
/// Note that relying on this method to tear down the session misses some cases
/// in which a `TEARDOWN` is necessary:
/// * `Session<Described>::play` may fail parsing the response. It
/// consumes the session, so calling this method is not possible.
/// * `Session<Playing>::demuxed` similarly consumes the session.
/// See [`SessionOptions::session_group`] and [`SessionGroup::teardown`]
/// for a more robust mechanism.
pub async fn teardown(mut self) -> Result<(), Error> {
let inner = self.0.as_mut().project();
let mut req = rtsp_types::Request::builder(Method::Teardown, rtsp_types::Version::V1_0)
.request_uri(inner.presentation.base_url.clone())
.header(
rtsp_types::headers::SESSION,
inner.session_id.as_deref().unwrap().to_string(),
inner.session.as_ref().unwrap().id.to_string(),
)
.build(Bytes::new());
let keepalive_cseq = match inner.keepalive_state {
KeepaliveState::Idle => None,
KeepaliveState::Flushing(cseq) => Some(*cseq),
KeepaliveState::Waiting(cseq) => Some(*cseq),
};
inner
.conn
.as_mut()
.unwrap()
.send(
ResponseMode::Teardown { keepalive_cseq },
ResponseMode::Teardown,
inner.options,
inner.requested_auth,
&mut req,
)
.await?;
*inner.session = None;
*inner.maybe_playing = false;
Ok(())
}
@ -1164,10 +1354,11 @@ impl Session<Playing> {
cx: &mut std::task::Context<'_>,
) -> Result<(), Error> {
let inner = self.0.as_mut().project();
let conn = inner.conn.as_mut().unwrap();
// Expect the previous keepalive request to have finished.
match inner.keepalive_state {
KeepaliveState::Flushing(cseq) => bail!(ErrorInt::WriteError {
conn_ctx: *inner.conn.inner.ctx(),
conn_ctx: *conn.inner.ctx(),
source: std::io::Error::new(
std::io::ErrorKind::TimedOut,
format!(
@ -1177,8 +1368,8 @@ impl Session<Playing> {
),
}),
KeepaliveState::Waiting(cseq) => bail!(ErrorInt::RtspReadError {
conn_ctx: *inner.conn.inner.ctx(),
msg_ctx: inner.conn.inner.eof_ctx(),
conn_ctx: *conn.inner.ctx(),
msg_ctx: conn.inner.eof_ctx(),
source: std::io::Error::new(
std::io::ErrorKind::TimedOut,
format!(
@ -1192,7 +1383,7 @@ impl Session<Playing> {
// Currently the only outbound data should be keepalives, and the previous one
// has already been flushed, so there's no reason the Sink shouldn't be ready.
if matches!(inner.conn.inner.poll_ready_unpin(cx), Poll::Pending) {
if matches!(conn.inner.poll_ready_unpin(cx), Poll::Pending) {
bail!(ErrorInt::Internal(
"Unexpectedly not ready to send keepalive".into()
));
@ -1201,20 +1392,16 @@ impl Session<Playing> {
// Send a new one and reset the timer.
// Use a SET_PARAMETER with no body for keepalives, as recommended in the
// ONVIF Streaming Specification version 21.06 section 5.2.2.2.
let session_id = inner.session_id.as_deref().unwrap();
let session_id = &*inner.session.as_ref().unwrap().id;
let mut req = rtsp_types::Request::builder(Method::SetParameter, rtsp_types::Version::V1_0)
.request_uri(inner.presentation.base_url.clone())
.header(rtsp_types::headers::SESSION, session_id.to_string())
.build(Bytes::new());
let cseq = inner
.conn
.fill_req(inner.options, inner.requested_auth, &mut req)?;
inner
.conn
.inner
let cseq = conn.fill_req(inner.options, inner.requested_auth, &mut req)?;
conn.inner
.start_send_unpin(rtsp_types::Message::Request(req))
.expect("encoding is infallible");
*inner.keepalive_state = match inner.conn.inner.poll_flush_unpin(cx) {
*inner.keepalive_state = match conn.inner.poll_flush_unpin(cx) {
Poll::Ready(Ok(())) => KeepaliveState::Waiting(cseq),
Poll::Ready(Err(e)) => bail!(e),
Poll::Pending => KeepaliveState::Flushing(cseq),
@ -1245,7 +1432,7 @@ impl Session<Playing> {
// The only response we expect in this state is to our keepalive request.
bail!(ErrorInt::RtspFramingError {
conn_ctx: *inner.conn.inner.ctx(),
conn_ctx: *inner.conn.as_ref().unwrap().inner.ctx(),
msg_ctx: *msg_ctx,
description: format!("Unexpected RTSP response {:#?}", response),
})
@ -1257,15 +1444,16 @@ impl Session<Playing> {
data: rtsp_types::Data<Bytes>,
) -> Result<Option<PacketItem>, Error> {
let inner = self.0.as_mut().project();
let conn = inner.conn.as_ref().unwrap();
let channel_id = data.channel_id();
let pkt_ctx = crate::PacketContext(crate::PacketContextInner::Tcp {
msg_ctx: *msg_ctx,
channel_id,
});
let m = match inner.conn.channels.lookup(channel_id) {
let m = match conn.channels.lookup(channel_id) {
Some(m) => m,
None => bail!(ErrorInt::RtspUnassignedChannelError {
conn_ctx: *inner.conn.inner.ctx(),
conn_ctx: *conn.inner.ctx(),
msg_ctx: *msg_ctx,
channel_id,
}),
@ -1284,17 +1472,23 @@ impl Session<Playing> {
match m.channel_type {
ChannelType::Rtp => Ok(rtp_handler.rtp(
&inner.options,
inner.conn.inner.ctx(),
conn.inner.ctx(),
&pkt_ctx,
&mut timeline,
m.stream_i,
data.into_body(),
)?),
ChannelType::Rtcp => {
match rtp_handler.rtcp(&pkt_ctx, &mut timeline, m.stream_i, data.into_body()) {
match rtp_handler.rtcp(
&inner.options,
&pkt_ctx,
&mut timeline,
m.stream_i,
data.into_body(),
) {
Ok(p) => Ok(p),
Err(description) => Err(wrap!(ErrorInt::PacketError {
conn_ctx: *inner.conn.inner.ctx(),
conn_ctx: *conn.inner.ctx(),
pkt_ctx: pkt_ctx,
stream_id: m.stream_i,
description,
@ -1317,6 +1511,7 @@ impl Session<Playing> {
) -> Poll<Option<Result<PacketItem, Error>>> {
debug_assert!(buf.filled().is_empty());
let inner = self.0.as_mut().project();
let conn_ctx = *inner.conn.as_ref().unwrap().inner.ctx();
let s = &mut inner.presentation.streams[i];
if let Some(sockets) = &mut s.sockets {
let (mut timeline, rtp_handler) = match &mut s.state {
@ -1337,12 +1532,12 @@ impl Session<Playing> {
match r {
Ok(()) => {
let msg = Bytes::copy_from_slice(buf.filled());
match rtp_handler.rtcp(&pkt_ctx, &mut timeline, i, msg) {
match rtp_handler.rtcp(&inner.options, &pkt_ctx, &mut timeline, i, msg) {
Ok(Some(p)) => return Poll::Ready(Some(Ok(p))),
Ok(None) => buf.clear(),
Err(description) => {
return Poll::Ready(Some(Err(wrap!(ErrorInt::PacketError {
conn_ctx: *inner.conn.inner.ctx(),
conn_ctx,
pkt_ctx,
stream_id: i,
description,
@ -1352,7 +1547,7 @@ impl Session<Playing> {
}
Err(source) => {
return Poll::Ready(Some(Err(wrap!(ErrorInt::UdpRecvError {
conn_ctx: *inner.conn.inner.ctx(),
conn_ctx,
pkt_ctx,
source,
}))))
@ -1371,7 +1566,7 @@ impl Session<Playing> {
let msg = Bytes::copy_from_slice(buf.filled());
match rtp_handler.rtp(
&inner.options,
inner.conn.inner.ctx(),
&conn_ctx,
&pkt_ctx,
&mut timeline,
i,
@ -1384,7 +1579,7 @@ impl Session<Playing> {
}
Err(source) => {
return Poll::Ready(Some(Err(wrap!(ErrorInt::UdpRecvError {
conn_ctx: *inner.conn.inner.ctx(),
conn_ctx,
pkt_ctx,
source,
}))))
@ -1430,6 +1625,111 @@ impl Session<Playing> {
}
}
#[pin_project::pinned_drop]
impl PinnedDrop for SessionInner {
fn drop(self: Pin<&mut Self>) {
let this = self.project();
let is_tcp = matches!(this.options.transport, Transport::Tcp);
if !*this.maybe_playing || (is_tcp && !*this.has_live555_tcp_bug) {
// No TEARDOWN is necessary.
return;
}
let session = match this.session.take() {
Some(s) => s,
None => {
log::warn!("Session::drop: maybe_playing set without a session id");
return;
}
};
// For now, assume the whole timeout is left.
let expires = tokio::time::Instant::now()
+ std::time::Duration::from_secs(session.timeout_sec.into());
// Track the session, if there is a group.
let (teardown_sender, teardown_receiver) = tokio::sync::watch::channel(None);
if let Some(session_group) = this.options.session_group.as_ref() {
let mut lock = session_group.0.lock().unwrap();
lock.sessions.push(StaleSession {
expires: expires.clone(),
teardown_receiver: Some(teardown_receiver),
is_tcp,
id: Some(session.id.clone()),
});
}
let handle = match this.runtime_handle.take() {
Some(h) => h,
None => {
const MSG: &str = "Unable to start async TEARDOWN because describe wasn't called \
from a tokio runtime";
log::warn!("{}", MSG);
let _ = teardown_sender.send(Some(Err(MSG.into())));
return;
}
};
handle.spawn(background_teardown(
this.presentation.base_url.clone(),
session.id,
std::mem::take(this.options),
this.requested_auth.take(),
this.conn.take().unwrap(),
teardown_sender,
expires,
));
}
}
async fn background_teardown(
base_url: Url,
session_id: Box<str>,
options: SessionOptions,
mut requested_auth: Option<digest_auth::WwwAuthenticateHeader>,
mut conn: RtspConnection,
sender: tokio::sync::watch::Sender<Option<Result<(), String>>>,
expires: tokio::time::Instant,
) {
let mut req = rtsp_types::Request::builder(Method::Teardown, rtsp_types::Version::V1_0)
.request_uri(base_url)
.header(rtsp_types::headers::SESSION, session_id.to_string())
.build(Bytes::new());
let r = match tokio::time::timeout_at(
expires,
conn.send(
ResponseMode::Teardown,
&options,
&mut requested_auth,
&mut req,
),
)
.await
{
Ok(Ok(_)) => Ok(()),
Ok(Err(e)) => Err(e.to_string()),
Err(_) => Err("unable to complete TEARDOWN before session expiration".to_owned()),
};
log::debug!("Background TEARDOWN: {:?}", &r);
if let (Some(ref mut session_group), Ok(_)) = (options.session_group, &r) {
let mut l = session_group.0.lock().unwrap();
let i = l
.sessions
.iter()
.position(|s| matches!(&s.id, Some(id) if &**id == &*session_id));
match i {
Some(i) => {
l.sessions.swap_remove(i);
}
None => log::warn!("Unable to find session {:?} on TEARDOWN", &*session_id),
}
}
// In the most common case, the send will fail because all receivers have been dropped.
let _ = sender.send(Some(r));
}
impl futures::Stream for Session<Playing> {
type Item = Result<PacketItem, Error>;
@ -1441,7 +1741,7 @@ impl futures::Stream for Session<Playing> {
// First try receiving data on the RTSP connection. Let this starve
// sending keepalives; if we can't keep up, the server should
// probably drop us.
match Pin::new(&mut self.0.conn.inner).poll_next(cx) {
match Pin::new(&mut self.0.conn.as_mut().unwrap().inner).poll_next(cx) {
Poll::Ready(Some(Ok(msg))) => match msg.msg {
rtsp_types::Message::Data(data) => {
match self.as_mut().handle_data(&msg.ctx, data) {
@ -1484,7 +1784,7 @@ impl futures::Stream for Session<Playing> {
// Then finish flushing the current keepalive if necessary.
if let KeepaliveState::Flushing(cseq) = self.0.keepalive_state {
match self.0.conn.inner.poll_flush_unpin(cx) {
match self.0.conn.as_mut().unwrap().inner.poll_flush_unpin(cx) {
Poll::Ready(Ok(())) => self.0.keepalive_state = KeepaliveState::Waiting(cseq),
Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(Error(Box::new(e))))),
Poll::Pending => {}
@ -1541,7 +1841,7 @@ impl futures::Stream for Demuxed {
Ok(d) => d,
Err(_) => unreachable!("depacketizer was Ok"),
};
let conn_ctx = inner.conn.inner.ctx();
let conn_ctx = inner.conn.as_ref().unwrap().inner.ctx();
if let Some(p) = pkt {
let pkt_ctx = p.ctx;
let stream_id = p.stream_id;

View File

@ -438,8 +438,14 @@ pub(crate) fn parse_describe(
})
}
pub(crate) struct SetupResponse<'a> {
pub(crate) session_id: &'a str,
#[derive(Debug, PartialEq, Eq)]
pub(crate) struct SessionHeader {
pub(crate) id: Box<str>,
pub(crate) timeout_sec: u32,
}
pub(crate) struct SetupResponse {
pub(crate) session: SessionHeader,
pub(crate) ssrc: Option<u32>,
pub(crate) channel_id: Option<u8>,
pub(crate) source: Option<IpAddr>,
@ -451,12 +457,27 @@ pub(crate) struct SetupResponse<'a> {
/// Returns an assigned interleaved channel id (implying the next channel id
/// is also assigned) or errors.
pub(crate) fn parse_setup(response: &rtsp_types::Response<Bytes>) -> Result<SetupResponse, String> {
// https://datatracker.ietf.org/doc/html/rfc2326#section-12.37
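// Format: `session-id [ ";timeout=" delta-seconds ]`, e.g.
// `Session: 234881066;timeout=60` or just `Session: 234881066`.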
let session = response
.header(&rtsp_types::headers::SESSION)
.ok_or_else(|| "Missing Session header".to_string())?;
let session_id = match session.as_str().find(';') {
None => session.as_str(),
Some(i) => &session.as_str()[..i],
let session = match session.as_str().split_once(';') {
None => SessionHeader {
id: session.as_str().into(),
timeout_sec: 60, // default
},
Some((id, timeout_str)) => {
if let Some(v) = timeout_str.trim().strip_prefix("timeout=") {
let timeout_sec =
u32::from_str_radix(v, 10).map_err(|_| format!("Unparseable timeout {}", v))?;
SessionHeader {
id: id.into(),
timeout_sec,
}
} else {
return Err(format!("Unparseable Session header {:?}", session.as_str()));
}
}
};
let transport = response
.header(&rtsp_types::headers::TRANSPORT)
@ -504,7 +525,7 @@ pub(crate) fn parse_setup(response: &rtsp_types::Response<Bytes>) -> Result<Setu
}
}
Ok(SetupResponse {
session_id,
session,
ssrc,
channel_id,
source,
@ -606,6 +627,7 @@ mod tests {
use crate::{client::StreamStateInit, codec::Parameters};
use super::super::StreamState;
use super::SessionHeader;
use crate::testutil::response;
fn parse_describe(
@ -705,7 +727,13 @@ mod tests {
// SETUP.
let setup_response = response(include_bytes!("testdata/dahua_setup.txt"));
let setup_response = super::parse_setup(&setup_response).unwrap();
assert_eq!(setup_response.session_id, "634214675641");
assert_eq!(
setup_response.session,
SessionHeader {
id: "634214675641".into(),
timeout_sec: 60
}
);
assert_eq!(setup_response.channel_id, Some(0));
assert_eq!(setup_response.ssrc, Some(0x30a98ee7));
p.streams[0].state = StreamState::Init(StreamStateInit {
@ -802,7 +830,13 @@ mod tests {
// SETUP.
let setup_response = response(include_bytes!("testdata/hikvision_setup.txt"));
let setup_response = super::parse_setup(&setup_response).unwrap();
assert_eq!(setup_response.session_id, "708345999");
assert_eq!(
setup_response.session,
SessionHeader {
id: "708345999".into(),
timeout_sec: 60
}
);
assert_eq!(setup_response.channel_id, Some(0));
assert_eq!(setup_response.ssrc, Some(0x4cacc3d1));
p.streams[0].state = StreamState::Init(StreamStateInit {
@ -881,7 +915,13 @@ mod tests {
// SETUP.
let setup_response = response(include_bytes!("testdata/reolink_setup.txt"));
let setup_response = super::parse_setup(&setup_response).unwrap();
assert_eq!(setup_response.session_id, "F8F8E425");
assert_eq!(
setup_response.session,
SessionHeader {
id: "F8F8E425".into(),
timeout_sec: 60
}
);
assert_eq!(setup_response.channel_id, Some(0));
assert_eq!(setup_response.ssrc, None);
p.streams[0].state = StreamState::Init(StreamStateInit::default());
@ -959,7 +999,13 @@ mod tests {
// SETUP.
let setup_response = response(include_bytes!("testdata/bunny_setup.txt"));
let setup_response = super::parse_setup(&setup_response).unwrap();
assert_eq!(setup_response.session_id, "1642021126");
assert_eq!(
setup_response.session,
SessionHeader {
id: "1642021126".into(),
timeout_sec: 60
}
);
assert_eq!(setup_response.channel_id, Some(0));
assert_eq!(setup_response.ssrc, None);
p.streams[0].state = StreamState::Init(StreamStateInit::default());
@ -1124,7 +1170,13 @@ mod tests {
// SETUP.
let setup_response = response(include_bytes!("testdata/gw_main_setup_video.txt"));
let setup_response = super::parse_setup(&setup_response).unwrap();
assert_eq!(setup_response.session_id, "9a90de54");
assert_eq!(
setup_response.session,
SessionHeader {
id: "9a90de54".into(),
timeout_sec: 60
}
);
assert_eq!(setup_response.channel_id, Some(0));
assert_eq!(setup_response.ssrc, None);
p.streams[0].state = StreamState::Init(StreamStateInit {
@ -1135,7 +1187,13 @@ mod tests {
let setup_response = response(include_bytes!("testdata/gw_main_setup_audio.txt"));
let setup_response = super::parse_setup(&setup_response).unwrap();
assert_eq!(setup_response.session_id, "9a90de54");
assert_eq!(
setup_response.session,
SessionHeader {
id: "9a90de54".into(),
timeout_sec: 60
}
);
assert_eq!(setup_response.channel_id, Some(2));
assert_eq!(setup_response.ssrc, None);
p.streams[1].state = StreamState::Init(StreamStateInit {
@ -1204,7 +1262,13 @@ mod tests {
// SETUP.
let setup_response = response(include_bytes!("testdata/gw_sub_setup.txt"));
let setup_response = super::parse_setup(&setup_response).unwrap();
assert_eq!(setup_response.session_id, "9b0d0e54");
assert_eq!(
setup_response.session,
SessionHeader {
id: "9b0d0e54".into(),
timeout_sec: 60
}
);
assert_eq!(setup_response.channel_id, Some(0));
assert_eq!(setup_response.ssrc, None);
p.streams[0].state = StreamState::Init(StreamStateInit {

View File

@ -125,7 +125,11 @@ impl InorderParser {
let sequence_number = u16::from_be_bytes([data[2], data[3]]); // I don't like rtsp_rs::Seq.
let ssrc = reader.ssrc();
let loss = sequence_number.wrapping_sub(self.next_seq.unwrap_or(sequence_number));
let is_tcp = matches!(session_options.transport, super::Transport::Tcp);
if matches!(self.ssrc, Some(s) if s != ssrc) {
if let (Some(session_group), true) = (session_options.session_group.as_ref(), is_tcp) {
session_group.note_stale_live555_data();
}
bail!(ErrorInt::RtpPacketError {
conn_ctx: *conn_ctx,
pkt_ctx: *pkt_ctx,
@ -201,6 +205,7 @@ impl InorderParser {
pub fn rtcp(
&mut self,
session_options: &SessionOptions,
pkt_ctx: &PacketContext,
timeline: &mut Timeline,
stream_id: usize,
@ -227,6 +232,13 @@ impl InorderParser {
let ssrc = pkt.ssrc();
if matches!(self.ssrc, Some(s) if s != ssrc) {
use super::Transport;
if let (Some(session_group), Transport::Tcp) = (
session_options.session_group.as_ref(),
session_options.transport,
) {
session_group.note_stale_live555_data();
}
return Err(format!(
"Expected ssrc={:08x?}, got RTCP SR ssrc={:08x}",
self.ssrc, ssrc

View File

@ -114,4 +114,13 @@ pub(crate) enum ErrorInt {
#[error("Internal error: {0}")]
Internal(#[source] Box<dyn std::error::Error + Send + Sync>),
/// Silly kind used by `SessionGroup::teardown`.
///
/// TODO: Currently the teardown process needs to clone its `Result`, and
/// `ErrorInt` isn't cloneable due to `Internal` above. We should come up
/// with some more satisfactory solution. Maybe `RtspConnection::send`
/// should return a more restricted error type which is cloneable.
#[error("{0}")]
Teardown(String),
}