clarify tokio runtime expectations

This commit is contained in:
Scott Lamb 2022-04-12 13:22:44 -07:00
parent 31515887de
commit 3153355807
5 changed files with 25 additions and 56 deletions

View File

@ -5,6 +5,7 @@
[scottlamb/moonfire-nvr#213](https://github.com/scottlamb/moonfire-nvr/issues/213#issue-1190760423). [scottlamb/moonfire-nvr#213](https://github.com/scottlamb/moonfire-nvr/issues/213#issue-1190760423).
* camera interop: allow ignoring RTSP interleaved data messages on unassigned * camera interop: allow ignoring RTSP interleaved data messages on unassigned
channels, also described at [scottlamb-moonfire-nvr#213](https://github.com/scottlamb/moonfire-nvr/issues/213#issuecomment-1089411093). channels, also described at [scottlamb-moonfire-nvr#213](https://github.com/scottlamb/moonfire-nvr/issues/213#issuecomment-1089411093).
* clarify `Session`'s expectations for tokio runtimes.
## `v0.3.8` (2022-03-08) ## `v0.3.8` (2022-03-08)

View File

@ -53,7 +53,6 @@ fn h264_aac<F: FnMut(CodecItem) -> ()>(mut f: F) {
&conn_ctx, &conn_ctx,
&pkt_ctx, &pkt_ctx,
&mut timelines[stream_id], &mut timelines[stream_id],
None,
stream_id, stream_id,
data, data,
) { ) {

6
fuzz/Cargo.lock generated
View File

@ -530,7 +530,7 @@ checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132"
[[package]] [[package]]
name = "retina" name = "retina"
version = "0.3.7" version = "0.3.8"
dependencies = [ dependencies = [
"base64", "base64",
"bitreader", "bitreader",
@ -595,9 +595,9 @@ dependencies = [
[[package]] [[package]]
name = "sdp-types" name = "sdp-types"
version = "0.1.3" version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ae499f6886cff026ebd8355c8f67a1881cd15f23ce89de4aab13588cf52142dd" checksum = "1a627289e3b09f15ef88a28b90d6eca3b7eac332b6ffb34c1af290aa956d4ab9"
dependencies = [ dependencies = [
"bstr", "bstr",
"fallible-iterator", "fallible-iterator",

View File

@ -197,6 +197,11 @@ impl SessionGroup {
/// a `TEARDOWN` without knowing the session id. If desired, the caller can /// a `TEARDOWN` without knowing the session id. If desired, the caller can
/// learn of the existence of the sessions through /// learn of the existence of the sessions through
/// [`SessionGroup::stale_sessions`] and sleep until they expire. /// [`SessionGroup::stale_sessions`] and sleep until they expire.
///
/// ## Panics
///
/// If the `TEARDOWN` was initiated from a tokio runtime which has since
/// shut down.
pub async fn await_teardown(&self) -> Result<(), Error> { pub async fn await_teardown(&self) -> Result<(), Error> {
let mut watches: Vec<_>; let mut watches: Vec<_>;
{ {
@ -214,9 +219,10 @@ impl SessionGroup {
if r.is_none() { if r.is_none() {
// An attempt hasn't finished yet. Wait for it. // An attempt hasn't finished yet. Wait for it.
w.changed() w.changed().await.expect(
.await "teardown Sender shouldn't be dropped; \
.expect("teardown Sender shouldn't be dropped"); ensure the Session's tokio runtime is still alive",
);
r = (*w.borrow()).clone(); r = (*w.borrow()).clone();
} }
@ -892,6 +898,12 @@ enum ResponseMode {
/// `[SessionOptions::teardown`] parameter. /// `[SessionOptions::teardown`] parameter.
/// 6. Possibly wait for `TEARDOWN` to complete; see /// 6. Possibly wait for `TEARDOWN` to complete; see
/// [`SessionOptions::session_group`] and [`SessionGroup::await_teardown`]. /// [`SessionOptions::session_group`] and [`SessionGroup::await_teardown`].
///
/// ## tokio runtime
///
/// All `Session` operations are currently expected to be performed from
/// "within" a tokio runtime with both time and I/O resource drivers enabled.
/// Operations may panic or fail otherwise.
pub struct Session<S: State>(Pin<Box<SessionInner>>, S); pub struct Session<S: State>(Pin<Box<SessionInner>>, S);
#[pin_project(PinnedDrop)] #[pin_project(PinnedDrop)]
@ -901,10 +913,6 @@ struct SessionInner {
/// exception is during drop. /// exception is during drop.
conn: Option<RtspConnection>, conn: Option<RtspConnection>,
/// A handle to the tokio runtime which created this session. It will
/// be used to asynchronously send a `TEARDOWN` on drop.
runtime_handle: Option<tokio::runtime::Handle>,
options: SessionOptions, options: SessionOptions,
requested_auth: Option<http_auth::PasswordClient>, requested_auth: Option<http_auth::PasswordClient>,
presentation: Presentation, presentation: Presentation,
@ -1152,7 +1160,7 @@ impl RtspConnection {
}; };
if live555 { if live555 {
note_stale_live555_data(tokio::runtime::Handle::try_current().ok(), tool, options); note_stale_live555_data(tool, options);
} }
let channel_id = data.channel_id(); let channel_id = data.channel_id();
@ -1261,7 +1269,6 @@ impl Session<Described> {
Box::pin(SessionInner { Box::pin(SessionInner {
conn: Some(conn), conn: Some(conn),
options, options,
runtime_handle: tokio::runtime::Handle::try_current().ok(),
requested_auth, requested_auth,
presentation, presentation,
session: None, session: None,
@ -1620,11 +1627,7 @@ impl Session<Described> {
/// to a since-closed RTSP connection, as described in case 2 of "Stale sessions" /// to a since-closed RTSP connection, as described in case 2 of "Stale sessions"
/// at [`SessionGroup`]. If there's no known session which explains this, /// at [`SessionGroup`]. If there's no known session which explains this,
/// adds an unknown session with live555's default timeout. /// adds an unknown session with live555's default timeout.
fn note_stale_live555_data( fn note_stale_live555_data(tool: Option<&Tool>, options: &SessionOptions) {
handle: Option<tokio::runtime::Handle>,
tool: Option<&Tool>,
options: &SessionOptions,
) {
let known_to_have_live555_tcp_bug = tool.map(Tool::has_live555_tcp_bug).unwrap_or(false); let known_to_have_live555_tcp_bug = tool.map(Tool::has_live555_tcp_bug).unwrap_or(false);
if !known_to_have_live555_tcp_bug { if !known_to_have_live555_tcp_bug {
log::warn!( log::warn!(
@ -1690,15 +1693,8 @@ fn note_stale_live555_data(
// set a deadline within await_stale_sessions() calls, which might be // set a deadline within await_stale_sessions() calls, which might be
// a bit more efficient. But this is simpler given that we already are // a bit more efficient. But this is simpler given that we already are
// spawning tasks for stale sessions created from Session's Drop impl. // spawning tasks for stale sessions created from Session's Drop impl.
let handle = match handle {
Some(h) => h,
None => {
log::warn!("Unable to launch task to clean up stale live555 file descriptor session");
return;
}
};
let group = group.clone(); let group = group.clone();
handle.spawn(async move { tokio::spawn(async move {
tokio::time::sleep_until(expires).await; tokio::time::sleep_until(expires).await;
if !group.try_remove_seqnum(seqnum) { if !group.try_remove_seqnum(seqnum) {
log::warn!( log::warn!(
@ -1957,7 +1953,6 @@ impl Session<Playing> {
conn.inner.ctx(), conn.inner.ctx(),
&pkt_ctx, &pkt_ctx,
timeline, timeline,
inner.runtime_handle.as_ref(),
m.stream_i, m.stream_i,
data.into_body(), data.into_body(),
)?), )?),
@ -1967,7 +1962,6 @@ impl Session<Playing> {
inner.presentation.tool.as_ref(), inner.presentation.tool.as_ref(),
&pkt_ctx, &pkt_ctx,
timeline, timeline,
inner.runtime_handle.as_ref(),
m.stream_i, m.stream_i,
data.into_body(), data.into_body(),
) { ) {
@ -2026,7 +2020,6 @@ impl Session<Playing> {
inner.presentation.tool.as_ref(), inner.presentation.tool.as_ref(),
&pkt_ctx, &pkt_ctx,
timeline, timeline,
inner.runtime_handle.as_ref(),
i, i,
msg, msg,
) { ) {
@ -2066,7 +2059,6 @@ impl Session<Playing> {
conn_ctx, conn_ctx,
&pkt_ctx, &pkt_ctx,
timeline, timeline,
inner.runtime_handle.as_ref(),
i, i,
msg, msg,
) { ) {
@ -2179,19 +2171,7 @@ impl PinnedDrop for SessionInner {
None None
}; };
let handle = match this.runtime_handle.take() { tokio::spawn(teardown::background_teardown(
Some(h) => h,
None => {
const MSG: &str = "Unable to start async TEARDOWN because describe wasn't called \
from a tokio runtime";
log::warn!("{}", MSG);
let _ =
teardown_tx.send(Some(Err(wrap!(ErrorInt::FailedPrecondition(MSG.into())))));
return;
}
};
handle.spawn(teardown::background_teardown(
seqnum, seqnum,
this.presentation.base_url.clone(), this.presentation.base_url.clone(),
this.presentation.tool.take(), this.presentation.tool.take(),

View File

@ -95,7 +95,6 @@ impl InorderParser {
conn_ctx: &ConnectionContext, conn_ctx: &ConnectionContext,
pkt_ctx: &PacketContext, pkt_ctx: &PacketContext,
timeline: &mut Timeline, timeline: &mut Timeline,
runtime_handle: Option<&tokio::runtime::Handle>,
stream_id: usize, stream_id: usize,
mut data: Bytes, mut data: Bytes,
) -> Result<Option<PacketItem>, Error> { ) -> Result<Option<PacketItem>, Error> {
@ -129,7 +128,7 @@ impl InorderParser {
let loss = sequence_number.wrapping_sub(self.next_seq.unwrap_or(sequence_number)); let loss = sequence_number.wrapping_sub(self.next_seq.unwrap_or(sequence_number));
if matches!(self.ssrc, Some(s) if s != ssrc) { if matches!(self.ssrc, Some(s) if s != ssrc) {
if matches!(session_options.transport, super::Transport::Tcp) { if matches!(session_options.transport, super::Transport::Tcp) {
super::note_stale_live555_data(runtime_handle.cloned(), tool, session_options); super::note_stale_live555_data(tool, session_options);
} }
bail!(ErrorInt::RtpPacketError { bail!(ErrorInt::RtpPacketError {
conn_ctx: *conn_ctx, conn_ctx: *conn_ctx,
@ -211,7 +210,6 @@ impl InorderParser {
tool: Option<&super::Tool>, tool: Option<&super::Tool>,
pkt_ctx: &PacketContext, pkt_ctx: &PacketContext,
timeline: &mut Timeline, timeline: &mut Timeline,
runtime_handle: Option<&tokio::runtime::Handle>,
stream_id: usize, stream_id: usize,
data: Bytes, data: Bytes,
) -> Result<Option<PacketItem>, String> { ) -> Result<Option<PacketItem>, String> {
@ -237,11 +235,7 @@ impl InorderParser {
let ssrc = pkt.ssrc(); let ssrc = pkt.ssrc();
if matches!(self.ssrc, Some(s) if s != ssrc) { if matches!(self.ssrc, Some(s) if s != ssrc) {
if matches!(session_options.transport, super::Transport::Tcp) { if matches!(session_options.transport, super::Transport::Tcp) {
super::note_stale_live555_data( super::note_stale_live555_data(tool, session_options);
runtime_handle.cloned(),
tool,
session_options,
);
} }
return Err(format!( return Err(format!(
"Expected ssrc={:08x?}, got RTCP SR ssrc={:08x}", "Expected ssrc={:08x?}, got RTCP SR ssrc={:08x}",
@ -285,7 +279,6 @@ mod tests {
&ConnectionContext::dummy(), &ConnectionContext::dummy(),
&PacketContext::dummy(), &PacketContext::dummy(),
&mut timeline, &mut timeline,
None,
0, 0,
rtp_rs::RtpPacketBuilder::new() rtp_rs::RtpPacketBuilder::new()
.payload_type(105) .payload_type(105)
@ -309,7 +302,6 @@ mod tests {
&ConnectionContext::dummy(), &ConnectionContext::dummy(),
&PacketContext::dummy(), &PacketContext::dummy(),
&mut timeline, &mut timeline,
None,
0, 0,
rtp_rs::RtpPacketBuilder::new() rtp_rs::RtpPacketBuilder::new()
.payload_type(50) .payload_type(50)
@ -339,7 +331,6 @@ mod tests {
&ConnectionContext::dummy(), &ConnectionContext::dummy(),
&PacketContext::dummy(), &PacketContext::dummy(),
&mut timeline, &mut timeline,
None,
0, 0,
rtp_rs::RtpPacketBuilder::new() rtp_rs::RtpPacketBuilder::new()
.payload_type(96) .payload_type(96)
@ -364,7 +355,6 @@ mod tests {
&ConnectionContext::dummy(), &ConnectionContext::dummy(),
&PacketContext::dummy(), &PacketContext::dummy(),
&mut timeline, &mut timeline,
None,
0, 0,
rtp_rs::RtpPacketBuilder::new() rtp_rs::RtpPacketBuilder::new()
.payload_type(96) .payload_type(96)
@ -387,7 +377,6 @@ mod tests {
&ConnectionContext::dummy(), &ConnectionContext::dummy(),
&PacketContext::dummy(), &PacketContext::dummy(),
&mut timeline, &mut timeline,
None,
0, 0,
rtp_rs::RtpPacketBuilder::new() rtp_rs::RtpPacketBuilder::new()
.payload_type(96) .payload_type(96)