diff --git a/CHANGELOG.md b/CHANGELOG.md index 87ea7fa..62869f2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,8 +1,12 @@ ## unreleased * Send keepalives at every half-session-timeout rather than a fixed 30-second - interval. This should improve compatibility with servers that have session - timeouts under 30 seconds. + interval. This allows persistent connections to servers that have timeouts + shorter than 30 seconds. +* Use `OPTIONS` for initial keepalive, and only switch to `SET_PARAMETER` if + the server advertises its support. This allows persistent connections to + `rtsp-simple-server` v0.19.3, which does not support the latter method and + drops the connection on receiving unsupported methods. ## `v0.4.0` (2022-05-17) diff --git a/src/client/mod.rs b/src/client/mod.rs index a04b8ce..bddd87d 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -903,8 +903,26 @@ impl State for Described {} enum KeepaliveState { Idle, - Flushing(u32), - Waiting(u32), + Flushing { cseq: u32, method: KeepaliveMethod }, + Waiting { cseq: u32, method: KeepaliveMethod }, +} + +#[repr(u8)] +#[derive(Copy, Clone, Debug)] +enum KeepaliveMethod { + Options, + SetParameter, + GetParameter, +} + +impl From for rtsp_types::Method { + fn from(method: KeepaliveMethod) -> Self { + match method { + KeepaliveMethod::Options => rtsp_types::Method::Options, + KeepaliveMethod::SetParameter => rtsp_types::Method::SetParameter, + KeepaliveMethod::GetParameter => rtsp_types::Method::GetParameter, + } + } } /// State after a `PLAY`; use via `Session`. @@ -1015,6 +1033,14 @@ enum SessionFlag { /// Set if one or more streams are configured to use UDP. UdpStreams = 0x4, + + /// Set if an `OPTIONS` request has completed and advertised supported for + /// `SET_PARAMETER`. + SetParameterSupported = 0x8, + + /// Set if an `OPTIONS` request has completed and advertised supported for + /// `GET_PARAMETER`. + GetParameterSupported = 0x10, } impl RtspConnection { @@ -1888,7 +1914,7 @@ impl Session { .ok_or_else(|| wrap!(ErrorInt::FailedPrecondition("no connection".into())))?; // Expect the previous keepalive request to have finished. match inner.keepalive_state { - KeepaliveState::Flushing(cseq) => bail!(ErrorInt::WriteError { + KeepaliveState::Flushing { cseq, .. } => bail!(ErrorInt::WriteError { conn_ctx: *conn.inner.ctx(), source: std::io::Error::new( std::io::ErrorKind::TimedOut, @@ -1898,7 +1924,7 @@ impl Session { ), ), }), - KeepaliveState::Waiting(cseq) => bail!(ErrorInt::RtspReadError { + KeepaliveState::Waiting { cseq, .. } => bail!(ErrorInt::RtspReadError { conn_ctx: *conn.inner.ctx(), msg_ctx: conn.inner.eof_ctx(), source: std::io::Error::new( @@ -1923,28 +1949,35 @@ impl Session { // Send a new keepalive and reset the timer. // // RTSP/1.0 (the version Retina implements) doesn't describe how to send - // a keepalive. For now, we're using a SET_PARAMETER with no body for - // keepalives, as recommended by the ONVIF Streaming Specification - // version 21.06 section 5.2.2.2 - // and - // RTSP/2.0. - // - // Note this doesn't work with unpatched rtsp-simple-server as of 2021-08-07. - // See discussion at . - // We could work with this version by trying OPTIONS first (and only switching - // to SET_PARAMETER/GET_PARAMETER if OPTIONS advertises the respective method). - let mut req = rtsp_types::Request::builder(Method::SetParameter, rtsp_types::Version::V1_0) + // a keepalive. The ONVIF Streaming Specification (in version 21.06 section + // 5.2.2.2 + // ) and + // RTSP/2.0 recommend using `SET_PARAMETER`. However, this method is optional, + // and some servers (e.g. rtsp-simple-server as of 2021-08-07) behave badly + // on receiving unsupported methods. See discussion at + // . Initially + // send `OPTIONS`, then follow recommendations to use (bodyless) + // `SET_PARAMETER` or `GET_PARAMETER` if available. + let method = if *inner.flags & (SessionFlag::SetParameterSupported as u8) != 0 { + KeepaliveMethod::SetParameter + } else if *inner.flags & (SessionFlag::GetParameterSupported as u8) != 0 { + KeepaliveMethod::GetParameter + } else { + KeepaliveMethod::Options + }; + let mut req = rtsp_types::Request::builder(method.into(), rtsp_types::Version::V1_0) .request_uri(inner.presentation.base_url.clone()) .header(rtsp_types::headers::SESSION, session.id.to_string()) .build(Bytes::new()); let cseq = conn.fill_req(inner.options, inner.requested_auth, &mut req)?; + trace!("sending {:?} keepalive", method); conn.inner .start_send_unpin(rtsp_types::Message::Request(req)) .expect("encoding is infallible"); *inner.keepalive_state = match conn.inner.poll_flush_unpin(cx) { - Poll::Ready(Ok(())) => KeepaliveState::Waiting(cseq), + Poll::Ready(Ok(())) => KeepaliveState::Waiting { cseq, method }, Poll::Ready(Err(e)) => bail!(e), - Poll::Pending => KeepaliveState::Flushing(cseq), + Poll::Pending => KeepaliveState::Flushing { cseq, method }, }; inner @@ -1962,20 +1995,38 @@ impl Session { response: rtsp_types::Response, ) -> Result<(), Error> { let inner = self.0.as_mut().project(); - if matches!(inner.keepalive_state, - KeepaliveState::Waiting(cseq) if parse::get_cseq(&response) == Some(*cseq)) - { - // We don't care if the keepalive response succeeds or fails, but we should - // log it, to help debugging if on failure the server doesn't extend the - // timeout or gets angry and closes the connection. (rtsp-simple-server - // does the latter as of 2022-08-07, though I'm told this will be fixed.) - if !response.status().is_success() { - warn!("keepalive failed with {:?}", response.status()); - } else { - trace!("keepalive succeeded with {:?}", response.status()); + match inner.keepalive_state { + KeepaliveState::Waiting { cseq, method } + if parse::get_cseq(&response) == Some(*cseq) => + { + // We don't care if the keepalive response succeeds or fails, but we should + // log it, to help debugging if on failure the server doesn't extend the + // timeout or gets angry and closes the connection. (rtsp-simple-server + // does the latter as of 2022-08-07, though I'm told this will be fixed.) + if !response.status().is_success() { + warn!("keepalive failed with {:?}", response.status()); + } else { + trace!("keepalive succeeded with {:?}", response.status()); + if matches!(method, KeepaliveMethod::Options) { + match parse::parse_options(&response) { + Ok(r) => { + if r.set_parameter_supported { + *inner.flags |= SessionFlag::SetParameterSupported as u8; + } + if r.get_parameter_supported { + *inner.flags |= SessionFlag::GetParameterSupported as u8; + } + } + Err(e) => { + warn!("Unable to parse OPTIONS response: {}", e); + } + } + } + } + *inner.keepalive_state = KeepaliveState::Idle; + return Ok(()); } - *inner.keepalive_state = KeepaliveState::Idle; - return Ok(()); + _ => {} } // The only response we expect in this state is to our keepalive request. @@ -2324,14 +2375,15 @@ impl futures::Stream for Session { self.0.keepalive_timer.as_mut().unwrap().as_mut().poll(cx), Poll::Ready(()) ) { - log::debug!("time for a keepalive"); self.as_mut().handle_keepalive_timer(cx)?; } // Then finish flushing the current keepalive if necessary. - if let KeepaliveState::Flushing(cseq) = self.0.keepalive_state { + if let KeepaliveState::Flushing { cseq, method } = self.0.keepalive_state { match self.0.conn.as_mut().unwrap().inner.poll_flush_unpin(cx) { - Poll::Ready(Ok(())) => self.0.keepalive_state = KeepaliveState::Waiting(cseq), + Poll::Ready(Ok(())) => { + self.0.keepalive_state = KeepaliveState::Waiting { cseq, method } + } Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(Error(Arc::new(e))))), Poll::Pending => {} } diff --git a/src/client/parse.rs b/src/client/parse.rs index 7070d3e..45a22c3 100644 --- a/src/client/parse.rs +++ b/src/client/parse.rs @@ -670,6 +670,35 @@ pub(crate) fn parse_play( Ok(()) } +#[derive(Default)] +pub(crate) struct OptionsResponse { + pub(crate) set_parameter_supported: bool, + pub(crate) get_parameter_supported: bool, +} + +/// Parses an `OPTIONS` response. +pub(crate) fn parse_options( + response: &rtsp_types::Response, +) -> Result { + let mut interpreted = OptionsResponse::default(); + + // RTSP/1.0 OPTIONS method: https://tools.ietf.org/html/rfc2326#section-10.1 + // HTTP/1.1 OPTIONS method: https://www.rfc-editor.org/rfc/rfc2616.html#section-9.2 + // RTSP/1.0 Public header: https://www.rfc-editor.org/rfc/rfc2326.html#section-12.28 + // HTTP/1.1 Public header: https://www.rfc-editor.org/rfc/rfc2068#section-14.35 + if let Some(public) = response.header(&rtsp_types::headers::PUBLIC) { + for method in public.as_str().split(',') { + let method = method.trim(); + match method { + "SET_PARAMETER" => interpreted.set_parameter_supported = true, + "GET_PARAMETER" => interpreted.get_parameter_supported = true, + _ => {} + } + } + } + Ok(interpreted) +} + #[cfg(test)] mod tests { use std::num::NonZeroU16; @@ -813,6 +842,11 @@ mod tests { _ => panic!(), }; // The other streams don't get filled in because they're in state Uninit. + + // OPTIONS. + let opts = + super::parse_options(&response(include_bytes!("testdata/dahua_options.txt"))).unwrap(); + assert!(opts.set_parameter_supported); } #[test] diff --git a/src/client/testdata/dahua_options.txt b/src/client/testdata/dahua_options.txt new file mode 100644 index 0000000..3d35329 --- /dev/null +++ b/src/client/testdata/dahua_options.txt @@ -0,0 +1,6 @@ +RTSP/1.0 200 OK +CSeq: 5 +Session: 136527041864 +Server: Rtsp Server/3.0 +Public: OPTIONS, DESCRIBE, ANNOUNCE, SETUP, PLAY, RECORD, PAUSE, TEARDOWN, SET_PARAMETER, GET_PARAMETER +