API for waiting out stale sessions
This commit is contained in:
parent
49612617a0
commit
86b5f449e9
@ -17,6 +17,7 @@ use log::{debug, trace, warn};
|
||||
use pin_project::pin_project;
|
||||
use rtsp_types::Method;
|
||||
use tokio::net::UdpSocket;
|
||||
use tokio::sync::Notify;
|
||||
use url::Url;
|
||||
|
||||
use crate::client::parse::SessionHeader;
|
||||
@ -86,7 +87,10 @@ struct StaleSession {
|
||||
/// Otherwise it might be useful to have one group per describe URL (potentially
|
||||
/// several per server) and have at most one active session per URL.
|
||||
#[derive(Default)]
|
||||
pub struct SessionGroup(Mutex<SessionGroupInner>);
|
||||
pub struct SessionGroup {
|
||||
sessions: Mutex<SessionGroupInner>,
|
||||
notify: Notify,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct SessionGroupInner {
|
||||
@ -102,23 +106,28 @@ pub struct StaleSessionStatus {
|
||||
|
||||
/// The total number of sessions.
|
||||
pub num_sessions: usize,
|
||||
|
||||
/// The `SessionGroupInner::next_seqnum` value as of when this was created.
|
||||
next_seqnum: u64,
|
||||
}
|
||||
|
||||
impl SessionGroup {
|
||||
/// Returns the status of stale sessions in this group.
|
||||
///
|
||||
/// Currently this only returns information about sessions which may be in
|
||||
/// state `Playing`. That is, Retina has sent a `PLAY` request, regardless
|
||||
/// of whether it has received a response.
|
||||
/// state `Playing`. That is, ones for which Retina has either sent a
|
||||
/// `PLAY` request (regardless of whether it received a response) or
|
||||
/// discovered as described in [`SessionGroup`].
|
||||
///
|
||||
/// The caller might use this in a loop to sleep until there are no such
|
||||
/// sessions.
|
||||
/// The caller might use this in a loop with `await_stale_sessions` to sleep
|
||||
/// until there are no such sessions, logging updates
|
||||
pub fn stale_sessions(&self) -> StaleSessionStatus {
|
||||
let l = self.0.lock().unwrap();
|
||||
let l = self.sessions.lock().unwrap();
|
||||
let playing = l.sessions.iter().filter(|s| s.maybe_playing);
|
||||
StaleSessionStatus {
|
||||
max_expires: playing.clone().map(|s| s.expires).max(),
|
||||
num_sessions: playing.count(),
|
||||
next_seqnum: l.next_seqnum,
|
||||
}
|
||||
}
|
||||
|
||||
@ -139,7 +148,7 @@ impl SessionGroup {
|
||||
pub async fn await_teardown(&self) -> Result<(), Error> {
|
||||
let mut watches: Vec<_>;
|
||||
{
|
||||
let l = self.0.lock().unwrap();
|
||||
let l = self.sessions.lock().unwrap();
|
||||
watches = l
|
||||
.sessions
|
||||
.iter()
|
||||
@ -166,6 +175,23 @@ impl SessionGroup {
|
||||
overall_result
|
||||
}
|
||||
|
||||
/// Waits for all of the sessions described by `status` to expire or be torn down.
|
||||
pub async fn await_stale_sessions(&self, status: &StaleSessionStatus) {
|
||||
loop {
|
||||
let notified = self.notify.notified();
|
||||
let l = self.sessions.lock().unwrap();
|
||||
let all_gone = l
|
||||
.sessions
|
||||
.iter()
|
||||
.all(|s| !s.maybe_playing || s.seqnum >= status.next_seqnum);
|
||||
drop(l);
|
||||
if all_gone {
|
||||
return;
|
||||
}
|
||||
notified.await;
|
||||
}
|
||||
}
|
||||
|
||||
/// Notes an unexpected RTSP interleaved data message.
|
||||
///
|
||||
/// This is assumed to be due to a live555 RTP/AVP/TCP session that belonged
|
||||
@ -173,7 +199,7 @@ impl SessionGroup {
|
||||
/// explains this, adds an unknown session with live555's default timeout,
|
||||
/// 65 seconds.
|
||||
fn note_stale_live555_data(&self) {
|
||||
let mut lock = self.0.lock().unwrap();
|
||||
let mut lock = self.sessions.lock().unwrap();
|
||||
for s in &lock.sessions {
|
||||
if s.is_tcp {
|
||||
// This session plausibly explains the packet.
|
||||
@ -1725,7 +1751,7 @@ impl PinnedDrop for SessionInner {
|
||||
// Track the session, if there is a group.
|
||||
let (teardown_tx, teardown_rx) = tokio::sync::watch::channel(None);
|
||||
let seqnum = if let Some(session_group) = this.options.session_group.as_ref() {
|
||||
let mut lock = session_group.0.lock().unwrap();
|
||||
let mut lock = session_group.sessions.lock().unwrap();
|
||||
let seqnum = lock.next_seqnum;
|
||||
lock.next_seqnum += 1;
|
||||
lock.sessions.push(StaleSession {
|
||||
|
@ -43,11 +43,13 @@ pub(super) async fn background_teardown(
|
||||
}
|
||||
if let Some(ref session_group) = options.session_group {
|
||||
let seqnum = seqnum.expect("seqnum specified when session_group exists");
|
||||
let mut l = session_group.0.lock().unwrap();
|
||||
let mut l = session_group.sessions.lock().unwrap();
|
||||
let i = l.sessions.iter().position(|s| s.seqnum == seqnum);
|
||||
match i {
|
||||
Some(i) => {
|
||||
l.sessions.swap_remove(i);
|
||||
drop(l);
|
||||
session_group.notify.notify_waiters();
|
||||
}
|
||||
None => log::warn!("Unable to find session {:?} on TEARDOWN", &*session_id),
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user