directly implement Stream on Session<Playing>

The new implementation has several benefits:

*   it's more correct, in a way that probably doesn't matter now but
    will if we ever send ONVIF backchannel audio. See the removed
    comment about deadlock possibility. Similarly, I realized that if
    packets became readable halfway through flushing a keepalive, the
    keepalive future would be improperly dropped as described at
    the beginning of this blog post:
    https://carllerche.com/2021/06/17/six-ways-to-make-async-rust-easier/

*   we'll be able to make other method calls on the Session while using
    the stream, eg sending audio with ONVIF backchannel

*   it's 8% faster according to the client benchmark.

*   it doesn't pull in the async-stream dependency anymore.

*   we can name the type of the stream, avoiding a Box in some cases.
This commit is contained in:
Scott Lamb 2021-06-28 13:36:02 -07:00
parent dacbca8c28
commit 48f2c2eb15
4 changed files with 178 additions and 128 deletions

View File

@ -1,10 +1,12 @@
## unreleased
* BREAKING CHANGE: `Session<Playing>` now directly implements `Stream` instead of
through `pkts()`.
* Performance improvements.
## v0.0.2 (2021-06-25)
* Video frames are now provided as a single, contiguous `Bytes`, and
* BREAKING CHANGE: Video frames are now provided as a single, contiguous `Bytes`, and
H.264 depacketization is more efficient ([#4](https://github.com/scottlamb/retina/issues/4)).
## v0.0.1 (2021-06-09)

View File

@ -81,7 +81,7 @@ fn make_test_data(max_payload_size: u16) -> Bytes {
let mut pkt_buf = vec![0; 65536];
for _ in 0..30 {
for &f in &frame_sizes {
&dummy_frame[0..4].copy_from_slice(&f.to_be_bytes()[..]);
dummy_frame[0..4].copy_from_slice(&f.to_be_bytes()[..]);
let frame = Bytes::copy_from_slice(&dummy_frame[..(usize::try_from(f).unwrap() + 4)]);
p.push(timestamp, frame).unwrap();
while let Some(pkt) = p.pull().unwrap() {

View File

@ -2,14 +2,14 @@
// SPDX-License-Identifier: MIT OR Apache-2.0
use std::num::NonZeroU32;
use std::task::Poll;
use std::{borrow::Cow, fmt::Debug, num::NonZeroU16, pin::Pin};
use self::channel_mapping::*;
pub use self::timeline::Timeline;
use async_stream::try_stream;
use bytes::Bytes;
use failure::{bail, format_err, Error};
use futures::{SinkExt, StreamExt};
use futures::{ready, Future, SinkExt, StreamExt};
use log::{debug, trace, warn};
use pin_project::pin_project;
use sdp::session_description::SessionDescription;
@ -252,13 +252,19 @@ pub struct Described {
}
impl State for Described {}
enum KeepaliveState {
Idle,
Flushing(u32),
Waiting(u32),
}
/// State after a `PLAY`.
#[pin_project(project = PlayingProj)]
pub struct Playing {
presentation: Presentation,
session_id: String,
channels: ChannelMappings,
pending_keepalive_cseq: Option<u32>,
keepalive_state: KeepaliveState,
#[pin]
keepalive_timer: tokio::time::Sleep,
@ -335,7 +341,10 @@ impl RtspConnection {
req: &mut rtsp_types::Request<Bytes>,
) -> Result<rtsp_types::Response<Bytes>, Error> {
loop {
let cseq = self.send_nowait(req).await?;
let cseq = self.fill_req(req)?;
self.stream
.send(rtsp_types::Message::Request(req.clone()))
.await?;
let msg = self
.stream
.next()
@ -390,8 +399,8 @@ impl RtspConnection {
}
}
/// Sends a request without waiting for a response, returning the `CSeq`.
async fn send_nowait(&mut self, req: &mut rtsp_types::Request<Bytes>) -> Result<u32, Error> {
/// Fills out `req` with authorization and `CSeq` headers.
fn fill_req(&mut self, req: &mut rtsp_types::Request<Bytes>) -> Result<u32, Error> {
let cseq = self.next_cseq;
self.next_cseq += 1;
match (self.requested_auth.as_mut(), self.creds.as_ref()) {
@ -413,9 +422,6 @@ impl RtspConnection {
}
req.insert_header(rtsp_types::headers::CSEQ, cseq.to_string());
req.insert_header(rtsp_types::headers::USER_AGENT, self.user_agent.clone());
self.stream
.send(rtsp_types::Message::Request(req.clone()))
.await?;
Ok(cseq)
}
}
@ -604,8 +610,8 @@ impl Session<Described> {
presentation: self.state.presentation,
session_id,
channels: self.state.channels,
keepalive_state: KeepaliveState::Idle,
keepalive_timer: tokio::time::sleep(KEEPALIVE_DURATION),
pending_keepalive_cseq: None,
},
})
}
@ -617,18 +623,6 @@ pub enum PacketItem {
}
impl Session<Playing> {
/// Returns a stream of packets.
pub fn pkts(self) -> impl futures::Stream<Item = Result<PacketItem, Error>> {
try_stream! {
let self_ = self;
tokio::pin!(self_);
while let Some(pkt) = self_.as_mut().next().await {
let pkt = pkt?;
yield pkt;
}
}
}
pub fn demuxed(
mut self,
) -> Result<impl futures::Stream<Item = Result<CodecItem, Error>>, Error> {
@ -639,117 +633,56 @@ impl Session<Playing> {
}
}
}
Ok(try_stream! {
let self_ = self;
tokio::pin!(self_);
while let Some(pkt) = self_.as_mut().next().await {
let pkt = pkt?;
match pkt {
PacketItem::RtpPacket(p) => {
let self_ = self_.as_mut().project();
let state = self_.state.project();
let depacketizer = match &mut state.presentation.streams[p.stream_id].depacketizer {
Ok(d) => d,
Err(_) => unreachable!("depacketizer was Ok"),
};
depacketizer.push(p)?;
while let Some(demuxed) = depacketizer.pull()? {
yield demuxed;
}
},
PacketItem::SenderReport(p) => yield CodecItem::SenderReport(p),
};
}
Ok(Demuxed {
state: DemuxedState::Waiting,
session: self,
})
}
/// Returns the next packet, an error, or `None` on end of stream.
/// Also manages keepalives; this will send them as necessary to keep the
/// stream open, and fail when sending a following keepalive if the
/// previous one was never acknowledged.
///
/// TODO: this should also pass along RTCP packets. There can be multiple
/// RTCP packets per data message, so that will require keeping more state.
async fn next(self: Pin<&mut Self>) -> Option<Result<PacketItem, Error>> {
let this = self.project();
let mut state = this.state.project();
loop {
tokio::select! {
// Prefer receiving data to sending keepalives. If we can't keep
// up with the server's data stream, it probably should drop us.
biased;
msg = this.conn.stream.next() => {
let msg = match msg {
Some(Ok(m)) => m,
Some(Err(e)) => return Some(Err(e)),
None => return None,
};
match msg.msg {
rtsp_types::Message::Data(data) => {
match Session::handle_data(&mut state, msg.ctx, data) {
Err(e) => return Some(Err(e)),
Ok(Some(pkt)) => return Some(Ok(pkt)),
Ok(None) => continue,
};
},
rtsp_types::Message::Response(response) => {
if let Err(e) = Session::handle_response(&mut state, response) {
return Some(Err(e));
}
},
rtsp_types::Message::Request(request) => {
warn!("Received RTSP request in Playing state. Responding unimplemented.\n{:#?}",
request);
},
}
},
() = &mut state.keepalive_timer => {
// TODO: deadlock possibility. Once we decide to send a
// keepalive, we don't try receiving anything until the
// keepalive is fully sent. The server might similarly be
// stubbornly trying to send before receiving. If all the
// socket buffers are full, deadlock can result.
//
// This is really unlikely right now when all we send are
// keepalives, which are probably much smaller than our send
// buffer. But if we start supporting ONVIF backchannel, it
// will become more of a concern.
if let Err(e) = Session::handle_keepalive_timer(this.conn, &mut state).await {
return Some(Err(e));
}
},
}
}
}
async fn handle_keepalive_timer(
fn handle_keepalive_timer(
conn: &mut RtspConnection,
state: &mut PlayingProj<'_>,
cx: &mut std::task::Context<'_>,
) -> Result<(), Error> {
// Check on the previous keepalive request.
if let Some(cseq) = state.pending_keepalive_cseq {
bail!(
// Expect the previous keepalive request to have finished.
match state.keepalive_state {
KeepaliveState::Flushing(cseq) => bail!(
"Unable to write keepalive {} within {:?}",
cseq,
KEEPALIVE_DURATION,
),
KeepaliveState::Waiting(cseq) => bail!(
"Server failed to respond to keepalive {} within {:?}",
cseq,
KEEPALIVE_DURATION
);
KEEPALIVE_DURATION,
),
KeepaliveState::Idle => {}
}
// Currently the only outbound data should be keepalives, and the previous one
// has already been flushed, so there's no reason the Sink shouldn't be ready.
if matches!(conn.stream.poll_ready_unpin(cx), Poll::Pending) {
bail!("Unexpectedly not ready to send keepalive");
}
// Send a new one and reset the timer.
*state.pending_keepalive_cseq = Some(
conn.send_nowait(
&mut rtsp_types::Request::builder(
let mut req = rtsp_types::Request::builder(
rtsp_types::Method::GetParameter,
rtsp_types::Version::V1_0,
)
.request_uri(state.presentation.base_url.clone())
.header(rtsp_types::headers::SESSION, state.session_id.clone())
.build(Bytes::new()),
)
.await?,
);
.build(Bytes::new());
let cseq = conn.fill_req(&mut req)?;
conn.stream
.start_send_unpin(rtsp_types::Message::Request(req))
.expect("encoding is infallible");
*state.keepalive_state = match conn.stream.poll_flush_unpin(cx) {
Poll::Ready(Ok(())) => KeepaliveState::Waiting(cseq),
Poll::Ready(Err(e)) => return Err(e),
Poll::Pending => KeepaliveState::Flushing(cseq),
};
state
.keepalive_timer
.as_mut()
@ -761,11 +694,11 @@ impl Session<Playing> {
state: &mut PlayingProj<'_>,
response: rtsp_types::Response<Bytes>,
) -> Result<(), Error> {
if matches!(*state.pending_keepalive_cseq,
Some(cseq) if parse::get_cseq(&response) == Some(cseq))
if matches!(*state.keepalive_state,
KeepaliveState::Waiting(cseq) if parse::get_cseq(&response) == Some(cseq))
{
// We don't care if the keepalive response succeeds or fails. Just mark complete.
*state.pending_keepalive_cseq = None;
*state.keepalive_state = KeepaliveState::Idle;
return Ok(());
}
@ -808,3 +741,115 @@ impl Session<Playing> {
&self.state.presentation.streams
}
}
impl futures::Stream for Session<Playing> {
type Item = Result<PacketItem, Error>;
fn poll_next(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let this = self.project();
let mut state = this.state.project();
loop {
// First try receiving data. Let this starve keepalive handling; if we can't keep up,
// the server should probably drop us.
match Pin::new(&mut this.conn.stream).poll_next(cx) {
Poll::Ready(Some(Ok(msg))) => match msg.msg {
rtsp_types::Message::Data(data) => {
match Session::handle_data(&mut state, msg.ctx, data) {
Err(e) => return Poll::Ready(Some(Err(e))),
Ok(Some(pkt)) => return Poll::Ready(Some(Ok(pkt))),
Ok(None) => continue,
};
}
rtsp_types::Message::Response(response) => {
if let Err(e) = Session::handle_response(&mut state, response) {
return Poll::Ready(Some(Err(e)));
}
}
rtsp_types::Message::Request(request) => {
warn!("Received RTSP request in Playing state. Responding unimplemented.\n{:#?}",
request);
}
},
Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))),
Poll::Ready(None) => return Poll::Ready(None),
std::task::Poll::Pending => {}
}
// Then check if it's time for a new keepalive.
if matches!(state.keepalive_timer.as_mut().poll(cx), Poll::Ready(())) {
Session::handle_keepalive_timer(this.conn, &mut state, cx)?;
}
// Then finish flushing the current keepalive if necessary.
if let KeepaliveState::Flushing(cseq) = state.keepalive_state {
match this.conn.stream.poll_flush_unpin(cx) {
Poll::Ready(Ok(())) => *state.keepalive_state = KeepaliveState::Waiting(*cseq),
Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e))),
Poll::Pending => {}
}
}
// Nothing to do. The poll calls above have already registered cx as necessary.
return Poll::Pending;
}
}
}
enum DemuxedState {
Waiting,
Pulling(usize),
}
#[pin_project]
struct Demuxed {
state: DemuxedState,
#[pin]
session: Session<Playing>,
}
impl futures::Stream for Demuxed {
type Item = Result<CodecItem, Error>;
fn poll_next(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
let mut this = self.project();
loop {
let (stream_id, pkt) = match this.state {
DemuxedState::Waiting => match ready!(this.session.as_mut().poll_next(cx)) {
Some(Ok(PacketItem::RtpPacket(p))) => (p.stream_id, Some(p)),
Some(Ok(PacketItem::SenderReport(p))) => {
return Poll::Ready(Some(Ok(CodecItem::SenderReport(p))))
}
Some(Err(e)) => return Poll::Ready(Some(Err(e))),
None => return Poll::Ready(None),
},
DemuxedState::Pulling(stream_id) => (*stream_id, None),
};
let session = this.session.as_mut().project();
let playing = session.state.project();
let depacketizer = match &mut playing.presentation.streams[stream_id].depacketizer {
Ok(d) => d,
Err(_) => unreachable!("depacketizer was Ok"),
};
if let Some(p) = pkt {
depacketizer.push(p)?;
}
match depacketizer.pull() {
Ok(Some(item)) => {
*this.state = DemuxedState::Pulling(stream_id);
return Poll::Ready(Some(Ok(item)));
}
Ok(None) => {
*this.state = DemuxedState::Waiting;
continue;
}
Err(e) => return Poll::Ready(Some(Err(e))),
}
}
}
}

View File

@ -271,7 +271,10 @@ impl Codec {
}
let mut msg = src.split_to(len);
msg.advance(4);
return Ok(Some((len, Message::Data(Data::new(channel_id, msg.freeze())))))
return Ok(Some((
len,
Message::Data(Data::new(channel_id, msg.freeze())),
)));
}
let (msg, len): (Message<&[u8]>, _) = match rtsp_types::Message::parse(src) {