From 3153355807ab1944b3fdd51b04ef2d8e644a3be6 Mon Sep 17 00:00:00 2001 From: Scott Lamb Date: Tue, 12 Apr 2022 13:22:44 -0700 Subject: [PATCH] clarify tokio runtime expectations --- CHANGELOG.md | 1 + benches/depacketize.rs | 1 - fuzz/Cargo.lock | 6 ++--- src/client/mod.rs | 58 ++++++++++++++---------------------------- src/client/rtp.rs | 15 ++--------- 5 files changed, 25 insertions(+), 56 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3ebab05..1373807 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ [scottlamb/moonfire-nvr#213](https://github.com/scottlamb/moonfire-nvr/issues/213#issue-1190760423). * camera interop: allow ignoring RTSP interleaved data messages on unassigned channels, also described at [scottlamb-moonfire-nvr#213](https://github.com/scottlamb/moonfire-nvr/issues/213#issuecomment-1089411093). +* clarify `Session`'s expectations for tokio runtimes. ## `v0.3.8` (2022-03-08) diff --git a/benches/depacketize.rs b/benches/depacketize.rs index d87decb..7de1a59 100644 --- a/benches/depacketize.rs +++ b/benches/depacketize.rs @@ -53,7 +53,6 @@ fn h264_aac ()>(mut f: F) { &conn_ctx, &pkt_ctx, &mut timelines[stream_id], - None, stream_id, data, ) { diff --git a/fuzz/Cargo.lock b/fuzz/Cargo.lock index e46a26d..3ce3fe5 100644 --- a/fuzz/Cargo.lock +++ b/fuzz/Cargo.lock @@ -530,7 +530,7 @@ checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" [[package]] name = "retina" -version = "0.3.7" +version = "0.3.8" dependencies = [ "base64", "bitreader", @@ -595,9 +595,9 @@ dependencies = [ [[package]] name = "sdp-types" -version = "0.1.3" +version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ae499f6886cff026ebd8355c8f67a1881cd15f23ce89de4aab13588cf52142dd" +checksum = "1a627289e3b09f15ef88a28b90d6eca3b7eac332b6ffb34c1af290aa956d4ab9" dependencies = [ "bstr", "fallible-iterator", diff --git a/src/client/mod.rs b/src/client/mod.rs index c720de7..6985860 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -197,6 +197,11 @@ impl SessionGroup { /// a `TEARDOWN` without knowing the session id. If desired, the caller can /// learn of the existence of the sessions through /// [`SessionGroup::stale_sessions`] and sleep until they expire. + /// + /// ## Panics + /// + /// If the `TEARDOWN` was initiated from a tokio runtime which has since + /// shut down. pub async fn await_teardown(&self) -> Result<(), Error> { let mut watches: Vec<_>; { @@ -214,9 +219,10 @@ impl SessionGroup { if r.is_none() { // An attempt hasn't finished yet. Wait for it. - w.changed() - .await - .expect("teardown Sender shouldn't be dropped"); + w.changed().await.expect( + "teardown Sender shouldn't be dropped; \ + ensure the Session's tokio runtime is still alive", + ); r = (*w.borrow()).clone(); } @@ -892,6 +898,12 @@ enum ResponseMode { /// `[SessionOptions::teardown`] parameter. /// 6. Possibly wait for `TEARDOWN` to complete; see /// [`SessionOptions::session_group`] and [`SessionGroup::await_teardown`]. +/// +/// ## tokio runtime +/// +/// All `Session` operations are currently expected to be performed from +/// "within" a tokio runtime with both time and I/O resource drivers enabled. +/// Operations may panic or fail otherwise. pub struct Session(Pin>, S); #[pin_project(PinnedDrop)] @@ -901,10 +913,6 @@ struct SessionInner { /// exception is during drop. conn: Option, - /// A handle to the tokio runtime which created this session. It will - /// be used to asynchronously send a `TEARDOWN` on drop. - runtime_handle: Option, - options: SessionOptions, requested_auth: Option, presentation: Presentation, @@ -1152,7 +1160,7 @@ impl RtspConnection { }; if live555 { - note_stale_live555_data(tokio::runtime::Handle::try_current().ok(), tool, options); + note_stale_live555_data(tool, options); } let channel_id = data.channel_id(); @@ -1261,7 +1269,6 @@ impl Session { Box::pin(SessionInner { conn: Some(conn), options, - runtime_handle: tokio::runtime::Handle::try_current().ok(), requested_auth, presentation, session: None, @@ -1620,11 +1627,7 @@ impl Session { /// to a since-closed RTSP connection, as described in case 2 of "Stale sessions" /// at [`SessionGroup`]. If there's no known session which explains this, /// adds an unknown session with live555's default timeout. -fn note_stale_live555_data( - handle: Option, - tool: Option<&Tool>, - options: &SessionOptions, -) { +fn note_stale_live555_data(tool: Option<&Tool>, options: &SessionOptions) { let known_to_have_live555_tcp_bug = tool.map(Tool::has_live555_tcp_bug).unwrap_or(false); if !known_to_have_live555_tcp_bug { log::warn!( @@ -1690,15 +1693,8 @@ fn note_stale_live555_data( // set a deadline within await_stale_sessions() calls, which might be // a bit more efficient. But this is simpler given that we already are // spawning tasks for stale sessions created from Session's Drop impl. - let handle = match handle { - Some(h) => h, - None => { - log::warn!("Unable to launch task to clean up stale live555 file descriptor session"); - return; - } - }; let group = group.clone(); - handle.spawn(async move { + tokio::spawn(async move { tokio::time::sleep_until(expires).await; if !group.try_remove_seqnum(seqnum) { log::warn!( @@ -1957,7 +1953,6 @@ impl Session { conn.inner.ctx(), &pkt_ctx, timeline, - inner.runtime_handle.as_ref(), m.stream_i, data.into_body(), )?), @@ -1967,7 +1962,6 @@ impl Session { inner.presentation.tool.as_ref(), &pkt_ctx, timeline, - inner.runtime_handle.as_ref(), m.stream_i, data.into_body(), ) { @@ -2026,7 +2020,6 @@ impl Session { inner.presentation.tool.as_ref(), &pkt_ctx, timeline, - inner.runtime_handle.as_ref(), i, msg, ) { @@ -2066,7 +2059,6 @@ impl Session { conn_ctx, &pkt_ctx, timeline, - inner.runtime_handle.as_ref(), i, msg, ) { @@ -2179,19 +2171,7 @@ impl PinnedDrop for SessionInner { None }; - let handle = match this.runtime_handle.take() { - Some(h) => h, - None => { - const MSG: &str = "Unable to start async TEARDOWN because describe wasn't called \ - from a tokio runtime"; - log::warn!("{}", MSG); - let _ = - teardown_tx.send(Some(Err(wrap!(ErrorInt::FailedPrecondition(MSG.into()))))); - return; - } - }; - - handle.spawn(teardown::background_teardown( + tokio::spawn(teardown::background_teardown( seqnum, this.presentation.base_url.clone(), this.presentation.tool.take(), diff --git a/src/client/rtp.rs b/src/client/rtp.rs index 55450c9..ce163a6 100644 --- a/src/client/rtp.rs +++ b/src/client/rtp.rs @@ -95,7 +95,6 @@ impl InorderParser { conn_ctx: &ConnectionContext, pkt_ctx: &PacketContext, timeline: &mut Timeline, - runtime_handle: Option<&tokio::runtime::Handle>, stream_id: usize, mut data: Bytes, ) -> Result, Error> { @@ -129,7 +128,7 @@ impl InorderParser { let loss = sequence_number.wrapping_sub(self.next_seq.unwrap_or(sequence_number)); if matches!(self.ssrc, Some(s) if s != ssrc) { if matches!(session_options.transport, super::Transport::Tcp) { - super::note_stale_live555_data(runtime_handle.cloned(), tool, session_options); + super::note_stale_live555_data(tool, session_options); } bail!(ErrorInt::RtpPacketError { conn_ctx: *conn_ctx, @@ -211,7 +210,6 @@ impl InorderParser { tool: Option<&super::Tool>, pkt_ctx: &PacketContext, timeline: &mut Timeline, - runtime_handle: Option<&tokio::runtime::Handle>, stream_id: usize, data: Bytes, ) -> Result, String> { @@ -237,11 +235,7 @@ impl InorderParser { let ssrc = pkt.ssrc(); if matches!(self.ssrc, Some(s) if s != ssrc) { if matches!(session_options.transport, super::Transport::Tcp) { - super::note_stale_live555_data( - runtime_handle.cloned(), - tool, - session_options, - ); + super::note_stale_live555_data(tool, session_options); } return Err(format!( "Expected ssrc={:08x?}, got RTCP SR ssrc={:08x}", @@ -285,7 +279,6 @@ mod tests { &ConnectionContext::dummy(), &PacketContext::dummy(), &mut timeline, - None, 0, rtp_rs::RtpPacketBuilder::new() .payload_type(105) @@ -309,7 +302,6 @@ mod tests { &ConnectionContext::dummy(), &PacketContext::dummy(), &mut timeline, - None, 0, rtp_rs::RtpPacketBuilder::new() .payload_type(50) @@ -339,7 +331,6 @@ mod tests { &ConnectionContext::dummy(), &PacketContext::dummy(), &mut timeline, - None, 0, rtp_rs::RtpPacketBuilder::new() .payload_type(96) @@ -364,7 +355,6 @@ mod tests { &ConnectionContext::dummy(), &PacketContext::dummy(), &mut timeline, - None, 0, rtp_rs::RtpPacketBuilder::new() .payload_type(96) @@ -387,7 +377,6 @@ mod tests { &ConnectionContext::dummy(), &PacketContext::dummy(), &mut timeline, - None, 0, rtp_rs::RtpPacketBuilder::new() .payload_type(96)