compute keepalive interval from session timeout
Alessandro Ros [asserts](https://github.com/aler9/rtsp-simple-server/issues/1066#issuecomment-1206405770) that some servers have timeouts shorter than 30 seconds; this should improve compatibility with them. Also add some debug logging around keepalives.
This commit is contained in:
parent
bb3baf0329
commit
da33dddbd7
@ -1,3 +1,9 @@
|
||||
## unreleased
|
||||
|
||||
* Send keepalives at every half-session-timeout rather than a fixed 30-second
|
||||
interval. This should improve compatibility with servers that have session
|
||||
timeouts under 30 seconds.
|
||||
|
||||
## `v0.4.0` (2022-05-17)
|
||||
|
||||
* BREAKING: remove deprecated `retina::client::Session<Playing>::teardown` and
|
||||
|
@ -39,9 +39,6 @@ pub mod rtp;
|
||||
mod teardown;
|
||||
mod timeline;
|
||||
|
||||
/// Duration between keepalive RTSP requests during [Playing] state.
|
||||
const KEEPALIVE_DURATION: std::time::Duration = std::time::Duration::from_secs(30);
|
||||
|
||||
/// Assumed expiration time for stale live555 TCP sessions (case #2 of "Stale
|
||||
/// sessions" in [`SessionGroup`]).
|
||||
///
|
||||
@ -400,6 +397,15 @@ impl std::str::FromStr for InitialTimestampPolicy {
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns an appropriate keepalive interval for `session`.
|
||||
///
|
||||
/// This generally uses half the session timeout. However, it's capped in case
|
||||
/// the server offers a generous timeout, messages are rare (e.g. ONVIF metadata
|
||||
/// streams), and there's a NAT box between with a shorter timeout.
|
||||
fn keepalive_interval(session: &SessionHeader) -> std::time::Duration {
|
||||
std::time::Duration::from_secs(std::cmp::min(u64::from(session.timeout_sec), 60)) / 2
|
||||
}
|
||||
|
||||
/// Options which must be known right as a session is created.
|
||||
///
|
||||
/// Decisions which can be deferred are in [`SetupOptions`] or [`PlayOptions`] instead.
|
||||
@ -1471,7 +1477,13 @@ impl Session<Described> {
|
||||
});
|
||||
}
|
||||
Some(_) => {}
|
||||
None => *inner.session = Some(response.session),
|
||||
None => {
|
||||
debug!(
|
||||
"established session {:?}, timeout={}s",
|
||||
response.session.id, response.session.timeout_sec
|
||||
);
|
||||
*inner.session = Some(response.session)
|
||||
}
|
||||
};
|
||||
let conn_ctx = conn.inner.ctx();
|
||||
let (stream_ctx, udp_sockets);
|
||||
@ -1695,7 +1707,7 @@ impl Session<Described> {
|
||||
StreamState::Playing { .. } => unreachable!(),
|
||||
};
|
||||
}
|
||||
*inner.keepalive_timer = Some(Box::pin(tokio::time::sleep(KEEPALIVE_DURATION)));
|
||||
*inner.keepalive_timer = Some(Box::pin(tokio::time::sleep(keepalive_interval(session))));
|
||||
Ok(Session(self.0, Playing(())))
|
||||
}
|
||||
}
|
||||
@ -1865,6 +1877,11 @@ impl Session<Playing> {
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> Result<(), Error> {
|
||||
let inner = self.0.as_mut().project();
|
||||
let session = inner
|
||||
.session
|
||||
.as_ref()
|
||||
.expect("keepalive_timer can't fire without a session");
|
||||
let keepalive_interval = keepalive_interval(session);
|
||||
let conn = inner
|
||||
.conn
|
||||
.as_mut()
|
||||
@ -1877,7 +1894,7 @@ impl Session<Playing> {
|
||||
std::io::ErrorKind::TimedOut,
|
||||
format!(
|
||||
"Unable to write keepalive {} within {:?}",
|
||||
cseq, KEEPALIVE_DURATION,
|
||||
cseq, keepalive_interval,
|
||||
),
|
||||
),
|
||||
}),
|
||||
@ -1888,7 +1905,7 @@ impl Session<Playing> {
|
||||
std::io::ErrorKind::TimedOut,
|
||||
format!(
|
||||
"Server failed to respond to keepalive {} within {:?}",
|
||||
cseq, KEEPALIVE_DURATION,
|
||||
cseq, keepalive_interval,
|
||||
),
|
||||
),
|
||||
}),
|
||||
@ -1903,13 +1920,22 @@ impl Session<Playing> {
|
||||
));
|
||||
}
|
||||
|
||||
// Send a new one and reset the timer.
|
||||
// Use a SET_PARAMETER with no body for keepalives, as recommended in the
|
||||
// ONVIF Streaming Specification version version 21.06 section 5.2.2.2.
|
||||
let session_id = &*inner.session.as_ref().unwrap().id;
|
||||
// Send a new keepalive and reset the timer.
|
||||
//
|
||||
// RTSP/1.0 (the version Retina implements) doesn't describe how to send
|
||||
// a keepalive. For now, we're using a SET_PARAMETER with no body for
|
||||
// keepalives, as recommended by the ONVIF Streaming Specification
|
||||
// version 21.06 section 5.2.2.2
|
||||
// <https://www.onvif.org/specs/stream/ONVIF-Streaming-Spec.pdf> and
|
||||
// RTSP/2.0.
|
||||
//
|
||||
// Note this doesn't work with unpatched rtsp-simple-server as of 2021-08-07.
|
||||
// See discussion at <https://github.com/aler9/rtsp-simple-server/issues/1066>.
|
||||
// We could work with this version by trying OPTIONS first (and only switching
|
||||
// to SET_PARAMETER/GET_PARAMETER if OPTIONS advertises the respective method).
|
||||
let mut req = rtsp_types::Request::builder(Method::SetParameter, rtsp_types::Version::V1_0)
|
||||
.request_uri(inner.presentation.base_url.clone())
|
||||
.header(rtsp_types::headers::SESSION, session_id.to_string())
|
||||
.header(rtsp_types::headers::SESSION, session.id.to_string())
|
||||
.build(Bytes::new());
|
||||
let cseq = conn.fill_req(inner.options, inner.requested_auth, &mut req)?;
|
||||
conn.inner
|
||||
@ -1926,7 +1952,7 @@ impl Session<Playing> {
|
||||
.as_mut()
|
||||
.expect("keepalive timer set in state Playing")
|
||||
.as_mut()
|
||||
.reset(tokio::time::Instant::now() + KEEPALIVE_DURATION);
|
||||
.reset(tokio::time::Instant::now() + keepalive_interval);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@ -1939,7 +1965,15 @@ impl Session<Playing> {
|
||||
if matches!(inner.keepalive_state,
|
||||
KeepaliveState::Waiting(cseq) if parse::get_cseq(&response) == Some(*cseq))
|
||||
{
|
||||
// We don't care if the keepalive response succeeds or fails. Just mark complete.
|
||||
// We don't care if the keepalive response succeeds or fails, but we should
|
||||
// log it, to help debugging if on failure the server doesn't extend the
|
||||
// timeout or gets angry and closes the connection. (rtsp-simple-server
|
||||
// does the latter as of 2022-08-07, though I'm told this will be fixed.)
|
||||
if !response.status().is_success() {
|
||||
warn!("keepalive failed with {:?}", response.status());
|
||||
} else {
|
||||
trace!("keepalive succeeded with {:?}", response.status());
|
||||
}
|
||||
*inner.keepalive_state = KeepaliveState::Idle;
|
||||
return Ok(());
|
||||
}
|
||||
@ -2674,9 +2708,9 @@ mod tests {
|
||||
let stale_sessions = group.stale_sessions();
|
||||
assert_eq!(stale_sessions.num_sessions, 1);
|
||||
|
||||
// Even if repeated attempts fail, the stale session will go await on timeout.
|
||||
// The "60" here is the RFC-defined default timeout when none is specified
|
||||
// in the SETUP response.
|
||||
// Even if repeated attempts fail, the stale session will go away on timeout.
|
||||
// The "60" in the assert below is the RFC-defined default timeout when
|
||||
// none is specified in the SETUP response.
|
||||
group.await_stale_sessions(&stale_sessions).await;
|
||||
assert_eq!(group.stale_sessions().num_sessions, 0);
|
||||
|
||||
|
@ -524,6 +524,14 @@ pub(crate) fn parse_setup(response: &rtsp_types::Response<Bytes>) -> Result<Setu
|
||||
if let Some(v) = timeout_str.trim().strip_prefix("timeout=") {
|
||||
let timeout_sec =
|
||||
u32::from_str_radix(v, 10).map_err(|_| format!("Unparseable timeout {}", v))?;
|
||||
|
||||
if timeout_sec == 0 {
|
||||
// This would make Retina send keepalives at an absurd rate; reject.
|
||||
return Err(format!(
|
||||
"Invalid timeout=0 in Session header {:?}",
|
||||
session.as_str()
|
||||
));
|
||||
}
|
||||
SessionHeader {
|
||||
id: id.into(),
|
||||
timeout_sec,
|
||||
|
Loading…
Reference in New Issue
Block a user