use keepalive compatible with simple-rtsp-server

https://github.com/scottlamb/moonfire-nvr/issues/234
This commit is contained in:
Scott Lamb 2022-08-07 22:02:26 -07:00
parent da33dddbd7
commit 0d6784485b
4 changed files with 131 additions and 35 deletions

View File

@ -1,8 +1,12 @@
## unreleased ## unreleased
* Send keepalives at every half-session-timeout rather than a fixed 30-second * Send keepalives at every half-session-timeout rather than a fixed 30-second
interval. This should improve compatibility with servers that have session interval. This allows persistent connections to servers that have timeouts
timeouts under 30 seconds. shorter than 30 seconds.
* Use `OPTIONS` for initial keepalive, and only switch to `SET_PARAMETER` if
the server advertises its support. This allows persistent connections to
`rtsp-simple-server` v0.19.3, which does not support the latter method and
drops the connection on receiving unsupported methods.
## `v0.4.0` (2022-05-17) ## `v0.4.0` (2022-05-17)

View File

@ -903,8 +903,26 @@ impl State for Described {}
enum KeepaliveState { enum KeepaliveState {
Idle, Idle,
Flushing(u32), Flushing { cseq: u32, method: KeepaliveMethod },
Waiting(u32), Waiting { cseq: u32, method: KeepaliveMethod },
}
#[repr(u8)]
#[derive(Copy, Clone, Debug)]
enum KeepaliveMethod {
Options,
SetParameter,
GetParameter,
}
impl From<KeepaliveMethod> for rtsp_types::Method {
fn from(method: KeepaliveMethod) -> Self {
match method {
KeepaliveMethod::Options => rtsp_types::Method::Options,
KeepaliveMethod::SetParameter => rtsp_types::Method::SetParameter,
KeepaliveMethod::GetParameter => rtsp_types::Method::GetParameter,
}
}
} }
/// State after a `PLAY`; use via `Session<Playing>`. /// State after a `PLAY`; use via `Session<Playing>`.
@ -1015,6 +1033,14 @@ enum SessionFlag {
/// Set if one or more streams are configured to use UDP. /// Set if one or more streams are configured to use UDP.
UdpStreams = 0x4, UdpStreams = 0x4,
/// Set if an `OPTIONS` request has completed and advertised supported for
/// `SET_PARAMETER`.
SetParameterSupported = 0x8,
/// Set if an `OPTIONS` request has completed and advertised supported for
/// `GET_PARAMETER`.
GetParameterSupported = 0x10,
} }
impl RtspConnection { impl RtspConnection {
@ -1888,7 +1914,7 @@ impl Session<Playing> {
.ok_or_else(|| wrap!(ErrorInt::FailedPrecondition("no connection".into())))?; .ok_or_else(|| wrap!(ErrorInt::FailedPrecondition("no connection".into())))?;
// Expect the previous keepalive request to have finished. // Expect the previous keepalive request to have finished.
match inner.keepalive_state { match inner.keepalive_state {
KeepaliveState::Flushing(cseq) => bail!(ErrorInt::WriteError { KeepaliveState::Flushing { cseq, .. } => bail!(ErrorInt::WriteError {
conn_ctx: *conn.inner.ctx(), conn_ctx: *conn.inner.ctx(),
source: std::io::Error::new( source: std::io::Error::new(
std::io::ErrorKind::TimedOut, std::io::ErrorKind::TimedOut,
@ -1898,7 +1924,7 @@ impl Session<Playing> {
), ),
), ),
}), }),
KeepaliveState::Waiting(cseq) => bail!(ErrorInt::RtspReadError { KeepaliveState::Waiting { cseq, .. } => bail!(ErrorInt::RtspReadError {
conn_ctx: *conn.inner.ctx(), conn_ctx: *conn.inner.ctx(),
msg_ctx: conn.inner.eof_ctx(), msg_ctx: conn.inner.eof_ctx(),
source: std::io::Error::new( source: std::io::Error::new(
@ -1923,28 +1949,35 @@ impl Session<Playing> {
// Send a new keepalive and reset the timer. // Send a new keepalive and reset the timer.
// //
// RTSP/1.0 (the version Retina implements) doesn't describe how to send // RTSP/1.0 (the version Retina implements) doesn't describe how to send
// a keepalive. For now, we're using a SET_PARAMETER with no body for // a keepalive. The ONVIF Streaming Specification (in version 21.06 section
// keepalives, as recommended by the ONVIF Streaming Specification // 5.2.2.2
// version 21.06 section 5.2.2.2 // <https://www.onvif.org/specs/stream/ONVIF-Streaming-Spec.pdf>) and
// <https://www.onvif.org/specs/stream/ONVIF-Streaming-Spec.pdf> and // RTSP/2.0 recommend using `SET_PARAMETER`. However, this method is optional,
// RTSP/2.0. // and some servers (e.g. rtsp-simple-server as of 2021-08-07) behave badly
// // on receiving unsupported methods. See discussion at
// Note this doesn't work with unpatched rtsp-simple-server as of 2021-08-07. // <https://github.com/aler9/rtsp-simple-server/issues/1066>. Initially
// See discussion at <https://github.com/aler9/rtsp-simple-server/issues/1066>. // send `OPTIONS`, then follow recommendations to use (bodyless)
// We could work with this version by trying OPTIONS first (and only switching // `SET_PARAMETER` or `GET_PARAMETER` if available.
// to SET_PARAMETER/GET_PARAMETER if OPTIONS advertises the respective method). let method = if *inner.flags & (SessionFlag::SetParameterSupported as u8) != 0 {
let mut req = rtsp_types::Request::builder(Method::SetParameter, rtsp_types::Version::V1_0) KeepaliveMethod::SetParameter
} else if *inner.flags & (SessionFlag::GetParameterSupported as u8) != 0 {
KeepaliveMethod::GetParameter
} else {
KeepaliveMethod::Options
};
let mut req = rtsp_types::Request::builder(method.into(), rtsp_types::Version::V1_0)
.request_uri(inner.presentation.base_url.clone()) .request_uri(inner.presentation.base_url.clone())
.header(rtsp_types::headers::SESSION, session.id.to_string()) .header(rtsp_types::headers::SESSION, session.id.to_string())
.build(Bytes::new()); .build(Bytes::new());
let cseq = conn.fill_req(inner.options, inner.requested_auth, &mut req)?; let cseq = conn.fill_req(inner.options, inner.requested_auth, &mut req)?;
trace!("sending {:?} keepalive", method);
conn.inner conn.inner
.start_send_unpin(rtsp_types::Message::Request(req)) .start_send_unpin(rtsp_types::Message::Request(req))
.expect("encoding is infallible"); .expect("encoding is infallible");
*inner.keepalive_state = match conn.inner.poll_flush_unpin(cx) { *inner.keepalive_state = match conn.inner.poll_flush_unpin(cx) {
Poll::Ready(Ok(())) => KeepaliveState::Waiting(cseq), Poll::Ready(Ok(())) => KeepaliveState::Waiting { cseq, method },
Poll::Ready(Err(e)) => bail!(e), Poll::Ready(Err(e)) => bail!(e),
Poll::Pending => KeepaliveState::Flushing(cseq), Poll::Pending => KeepaliveState::Flushing { cseq, method },
}; };
inner inner
@ -1962,8 +1995,9 @@ impl Session<Playing> {
response: rtsp_types::Response<Bytes>, response: rtsp_types::Response<Bytes>,
) -> Result<(), Error> { ) -> Result<(), Error> {
let inner = self.0.as_mut().project(); let inner = self.0.as_mut().project();
if matches!(inner.keepalive_state, match inner.keepalive_state {
KeepaliveState::Waiting(cseq) if parse::get_cseq(&response) == Some(*cseq)) KeepaliveState::Waiting { cseq, method }
if parse::get_cseq(&response) == Some(*cseq) =>
{ {
// We don't care if the keepalive response succeeds or fails, but we should // We don't care if the keepalive response succeeds or fails, but we should
// log it, to help debugging if on failure the server doesn't extend the // log it, to help debugging if on failure the server doesn't extend the
@ -1973,10 +2007,27 @@ impl Session<Playing> {
warn!("keepalive failed with {:?}", response.status()); warn!("keepalive failed with {:?}", response.status());
} else { } else {
trace!("keepalive succeeded with {:?}", response.status()); trace!("keepalive succeeded with {:?}", response.status());
if matches!(method, KeepaliveMethod::Options) {
match parse::parse_options(&response) {
Ok(r) => {
if r.set_parameter_supported {
*inner.flags |= SessionFlag::SetParameterSupported as u8;
}
if r.get_parameter_supported {
*inner.flags |= SessionFlag::GetParameterSupported as u8;
}
}
Err(e) => {
warn!("Unable to parse OPTIONS response: {}", e);
}
}
}
} }
*inner.keepalive_state = KeepaliveState::Idle; *inner.keepalive_state = KeepaliveState::Idle;
return Ok(()); return Ok(());
} }
_ => {}
}
// The only response we expect in this state is to our keepalive request. // The only response we expect in this state is to our keepalive request.
bail!(ErrorInt::RtspFramingError { bail!(ErrorInt::RtspFramingError {
@ -2324,14 +2375,15 @@ impl futures::Stream for Session<Playing> {
self.0.keepalive_timer.as_mut().unwrap().as_mut().poll(cx), self.0.keepalive_timer.as_mut().unwrap().as_mut().poll(cx),
Poll::Ready(()) Poll::Ready(())
) { ) {
log::debug!("time for a keepalive");
self.as_mut().handle_keepalive_timer(cx)?; self.as_mut().handle_keepalive_timer(cx)?;
} }
// Then finish flushing the current keepalive if necessary. // Then finish flushing the current keepalive if necessary.
if let KeepaliveState::Flushing(cseq) = self.0.keepalive_state { if let KeepaliveState::Flushing { cseq, method } = self.0.keepalive_state {
match self.0.conn.as_mut().unwrap().inner.poll_flush_unpin(cx) { match self.0.conn.as_mut().unwrap().inner.poll_flush_unpin(cx) {
Poll::Ready(Ok(())) => self.0.keepalive_state = KeepaliveState::Waiting(cseq), Poll::Ready(Ok(())) => {
self.0.keepalive_state = KeepaliveState::Waiting { cseq, method }
}
Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(Error(Arc::new(e))))), Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(Error(Arc::new(e))))),
Poll::Pending => {} Poll::Pending => {}
} }

View File

@ -670,6 +670,35 @@ pub(crate) fn parse_play(
Ok(()) Ok(())
} }
#[derive(Default)]
pub(crate) struct OptionsResponse {
pub(crate) set_parameter_supported: bool,
pub(crate) get_parameter_supported: bool,
}
/// Parses an `OPTIONS` response.
pub(crate) fn parse_options(
response: &rtsp_types::Response<Bytes>,
) -> Result<OptionsResponse, String> {
let mut interpreted = OptionsResponse::default();
// RTSP/1.0 OPTIONS method: https://tools.ietf.org/html/rfc2326#section-10.1
// HTTP/1.1 OPTIONS method: https://www.rfc-editor.org/rfc/rfc2616.html#section-9.2
// RTSP/1.0 Public header: https://www.rfc-editor.org/rfc/rfc2326.html#section-12.28
// HTTP/1.1 Public header: https://www.rfc-editor.org/rfc/rfc2068#section-14.35
if let Some(public) = response.header(&rtsp_types::headers::PUBLIC) {
for method in public.as_str().split(',') {
let method = method.trim();
match method {
"SET_PARAMETER" => interpreted.set_parameter_supported = true,
"GET_PARAMETER" => interpreted.get_parameter_supported = true,
_ => {}
}
}
}
Ok(interpreted)
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::num::NonZeroU16; use std::num::NonZeroU16;
@ -813,6 +842,11 @@ mod tests {
_ => panic!(), _ => panic!(),
}; };
// The other streams don't get filled in because they're in state Uninit. // The other streams don't get filled in because they're in state Uninit.
// OPTIONS.
let opts =
super::parse_options(&response(include_bytes!("testdata/dahua_options.txt"))).unwrap();
assert!(opts.set_parameter_supported);
} }
#[test] #[test]

6
src/client/testdata/dahua_options.txt vendored Normal file
View File

@ -0,0 +1,6 @@
RTSP/1.0 200 OK
CSeq: 5
Session: 136527041864
Server: Rtsp Server/3.0
Public: OPTIONS, DESCRIBE, ANNOUNCE, SETUP, PLAY, RECORD, PAUSE, TEARDOWN, SET_PARAMETER, GET_PARAMETER