take transport options at setup time

This commit is contained in:
Scott Lamb 2022-04-26 14:47:23 -07:00
parent 8744bf52d3
commit 1646142322
3 changed files with 81 additions and 47 deletions

View File

@ -5,9 +5,13 @@
`#[doc(hidden)]`. `#[doc(hidden)]`.
* BREAKING: `retina::client::Session<Described>::setup` takes a new * BREAKING: `retina::client::Session<Described>::setup` takes a new
`SetupOptions` argument for future expansion. `SetupOptions` argument for future expansion.
* BREAKING: `retina::StreamContext` has been split out of `retina::PacketContext`. * BREAKING: the transport to use is configured per-stream as part of
Both must be printed to provide the same information as before. This change `retina::SetupOptions` (rather than the prior `retina::SessionOptions`) and
reduces how much data needs to be copied with each packet. takes per-transport options for future expansion.
* BREAKING: `retina::StreamContext` has been split out of
`retina::PacketContext`. Both must be printed to provide the same
information as before. This change reduces how much data needs to be copied
with each packet.
## `v0.3.9` (2022-04-12) ## `v0.3.9` (2022-04-12)

View File

@ -726,7 +726,7 @@ async fn write_mp4<'a>(
} }
pub async fn run(opts: Opts) -> Result<(), Error> { pub async fn run(opts: Opts) -> Result<(), Error> {
if matches!(opts.transport, Transport::Udp) && !opts.allow_loss { if matches!(opts.transport, Transport::Udp(_)) && !opts.allow_loss {
warn!("Using --transport=udp without strongly recommended --allow-loss!"); warn!("Using --transport=udp without strongly recommended --allow-loss!");
} }
@ -739,7 +739,6 @@ pub async fn run(opts: Opts) -> Result<(), Error> {
.creds(creds) .creds(creds)
.session_group(session_group.clone()) .session_group(session_group.clone())
.user_agent("Retina mp4 example".to_owned()) .user_agent("Retina mp4 example".to_owned())
.transport(opts.transport)
.teardown(opts.teardown), .teardown(opts.teardown),
) )
.await?; .await?;
@ -766,7 +765,9 @@ pub async fn run(opts: Opts) -> Result<(), Error> {
None None
}; };
if let Some(i) = video_stream_i { if let Some(i) = video_stream_i {
session.setup(i, SetupOptions::default()).await?; session
.setup(i, SetupOptions::default().transport(opts.transport.clone()))
.await?;
} }
let audio_stream = if !opts.no_audio { let audio_stream = if !opts.no_audio {
let s = session let s = session
@ -795,7 +796,9 @@ pub async fn run(opts: Opts) -> Result<(), Error> {
None None
}; };
if let Some((i, _)) = audio_stream { if let Some((i, _)) = audio_stream {
session.setup(i, SetupOptions::default()).await?; session
.setup(i, SetupOptions::default().transport(opts.transport.clone()))
.await?;
} }
if video_stream_i.is_none() && audio_stream.is_none() { if video_stream_i.is_none() && audio_stream.is_none() {
bail!("Exiting because no video or audio stream was selected; see info log messages above"); bail!("Exiting because no video or audio stream was selected; see info log messages above");

View File

@ -55,7 +55,7 @@ struct StaleSession {
teardown_rx: Option<tokio::sync::watch::Receiver<Option<Result<(), Error>>>>, teardown_rx: Option<tokio::sync::watch::Receiver<Option<Result<(), Error>>>>,
maybe_playing: bool, maybe_playing: bool,
is_tcp: bool, has_tcp: bool,
/// Upper bound of advertised expiration time. /// Upper bound of advertised expiration time.
expires: tokio::time::Instant, expires: tokio::time::Instant,
@ -398,12 +398,11 @@ impl std::str::FromStr for InitialTimestampPolicy {
/// Options which must be known right as a session is created. /// Options which must be known right as a session is created.
/// ///
/// Decisions which can be deferred are in [PlayOptions] instead. /// Decisions which can be deferred are in [`SetupOptions`] or [`PlayOptions`] instead.
#[derive(Default)] #[derive(Default)]
pub struct SessionOptions { pub struct SessionOptions {
creds: Option<Credentials>, creds: Option<Credentials>,
user_agent: Option<Box<str>>, user_agent: Option<Box<str>>,
transport: Transport,
session_group: Option<Arc<SessionGroup>>, session_group: Option<Arc<SessionGroup>>,
teardown: TeardownPolicy, teardown: TeardownPolicy,
unassigned_channel_data: UnassignedChannelDataPolicy, unassigned_channel_data: UnassignedChannelDataPolicy,
@ -480,10 +479,11 @@ impl std::str::FromStr for UnassignedChannelDataPolicy {
/// The RTP packet transport to request. /// The RTP packet transport to request.
/// ///
/// Defaults to `Transport::Tcp`. /// Defaults to `Transport::Tcp`.
#[derive(Copy, Clone, Debug)] #[derive(Clone, Debug)]
#[non_exhaustive]
pub enum Transport { pub enum Transport {
/// Sends RTP packets over the RTSP TCP connection via interleaved data. /// Sends RTP packets over the RTSP TCP connection via interleaved data.
Tcp, Tcp(TcpTransportOptions),
/// Sends RTP packets over UDP (experimental). /// Sends RTP packets over UDP (experimental).
/// ///
@ -492,20 +492,20 @@ pub enum Transport {
/// * There's no support for sending RTCP RRs (receiver reports), so /// * There's no support for sending RTCP RRs (receiver reports), so
/// servers won't have the correct information to measure packet loss /// servers won't have the correct information to measure packet loss
/// and pace packets appropriately. /// and pace packets appropriately.
Udp, Udp(UdpTransportOptions),
} }
impl Default for Transport { impl Default for Transport {
fn default() -> Self { fn default() -> Self {
Transport::Tcp Transport::Tcp(TcpTransportOptions::default())
} }
} }
impl std::fmt::Display for Transport { impl std::fmt::Display for Transport {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.pad(match self { f.pad(match self {
Transport::Tcp => "tcp", Transport::Tcp(_) => "tcp",
Transport::Udp => "udp", Transport::Udp(_) => "udp",
}) })
} }
} }
@ -515,8 +515,8 @@ impl std::str::FromStr for Transport {
fn from_str(s: &str) -> Result<Self, Self::Err> { fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(match s { Ok(match s {
"tcp" => Transport::Tcp, "tcp" => Transport::Tcp(TcpTransportOptions::default()),
"udp" => Transport::Udp, "udp" => Transport::Udp(UdpTransportOptions::default()),
_ => bail!(ErrorInt::InvalidArgument(format!( _ => bail!(ErrorInt::InvalidArgument(format!(
"bad Transport {}; \ "bad Transport {}; \
expected tcp or udp", expected tcp or udp",
@ -526,8 +526,19 @@ impl std::str::FromStr for Transport {
} }
} }
/// Per-stream TCP transport options (placeholder for future expansion).
#[derive(Clone, Default, Debug)]
#[non_exhaustive]
pub struct TcpTransportOptions;
/// Per-stream UDP transport options (placeholder for future expansion).
#[derive(Clone, Default, Debug)]
#[non_exhaustive]
pub struct UdpTransportOptions;
impl SessionOptions { impl SessionOptions {
/// Uses the given credentials when/if the server requests digest authentication. /// Uses the given credentials when/if the server requests digest authentication.
#[inline]
pub fn creds(mut self, creds: Option<Credentials>) -> Self { pub fn creds(mut self, creds: Option<Credentials>) -> Self {
self.creds = creds; self.creds = creds;
self self
@ -543,12 +554,6 @@ impl SessionOptions {
self self
} }
/// Sets the underlying transport to use.
pub fn transport(mut self, transport: Transport) -> Self {
self.transport = transport;
self
}
pub fn session_group(mut self, session_group: Arc<SessionGroup>) -> Self { pub fn session_group(mut self, session_group: Arc<SessionGroup>) -> Self {
self.session_group = Some(session_group); self.session_group = Some(session_group);
self self
@ -570,8 +575,18 @@ impl SessionOptions {
/// There's nothing here yet. Likely in the future this will allow configuring /// There's nothing here yet. Likely in the future this will allow configuring
/// the output format for H.264 (Annex B or not). /// the output format for H.264 (Annex B or not).
#[derive(Default)] #[derive(Default)]
#[non_exhaustive] pub struct SetupOptions {
pub struct SetupOptions {} transport: Transport,
}
impl SetupOptions {
/// Sets the underlying transport to use.
#[inline]
pub fn transport(mut self, transport: Transport) -> Self {
self.transport = transport;
self
}
}
/// Options which must be decided at `PLAY` time. /// Options which must be decided at `PLAY` time.
/// ///
@ -952,9 +967,8 @@ struct SessionInner {
keepalive_timer: Option<Pin<Box<tokio::time::Sleep>>>, keepalive_timer: Option<Pin<Box<tokio::time::Sleep>>>,
/// Set if the server may be in state Playing: we have sent a `PLAY` /// Bitmask of [`SessionFlag`]s.
/// request, regardless of if the response has been received. flags: u8,
maybe_playing: bool,
/// The index within `presentation.streams` to start the next poll at. /// The index within `presentation.streams` to start the next poll at.
/// Round-robining between them rather than always starting at 0 should /// Round-robining between them rather than always starting at 0 should
@ -962,6 +976,20 @@ struct SessionInner {
udp_next_poll_i: usize, udp_next_poll_i: usize,
} }
#[derive(Copy, Clone)]
#[repr(u8)]
enum SessionFlag {
/// Set if the server may be in state Playing: we have sent a `PLAY`
/// request, regardless of if the response has been received.
MaybePlaying = 0x1,
/// Set if one or more streams are configured to use TCP.
TcpStreams = 0x2,
/// Set if one or more streams are configured to use UDP.
UdpStreams = 0x4,
}
impl RtspConnection { impl RtspConnection {
async fn connect(url: &Url) -> Result<Self, Error> { async fn connect(url: &Url) -> Result<Self, Error> {
let host = let host =
@ -1293,7 +1321,7 @@ impl Session<Described> {
describe_status, describe_status,
keepalive_state: KeepaliveState::Idle, keepalive_state: KeepaliveState::Idle,
keepalive_timer: None, keepalive_timer: None,
maybe_playing: false, flags: 0,
udp_next_poll_i: 0, udp_next_poll_i: 0,
}), }),
Described { sdp }, Described { sdp },
@ -1319,27 +1347,24 @@ impl Session<Described> {
/// ///
/// Panics if `stream_i >= self.streams().len()`. /// Panics if `stream_i >= self.streams().len()`.
pub async fn setup(&mut self, stream_i: usize, options: SetupOptions) -> Result<(), Error> { pub async fn setup(&mut self, stream_i: usize, options: SetupOptions) -> Result<(), Error> {
let _ = options; // not yet used.
let inner = &mut self.0.as_mut().project(); let inner = &mut self.0.as_mut().project();
let presentation = &mut inner.presentation;
let options = &inner.options;
let conn = inner let conn = inner
.conn .conn
.as_mut() .as_mut()
.ok_or_else(|| wrap!(ErrorInt::FailedPrecondition("no connection".into())))?; .ok_or_else(|| wrap!(ErrorInt::FailedPrecondition("no connection".into())))?;
let stream = &mut presentation.streams[stream_i]; let stream = &mut inner.presentation.streams[stream_i];
if !matches!(stream.state, StreamState::Uninit) { if !matches!(stream.state, StreamState::Uninit) {
bail!(ErrorInt::FailedPrecondition("stream already set up".into())); bail!(ErrorInt::FailedPrecondition("stream already set up".into()));
} }
let url = stream let url = stream
.control .control
.as_ref() .as_ref()
.unwrap_or(&presentation.control) .unwrap_or(&inner.presentation.control)
.clone(); .clone();
let mut req = let mut req =
rtsp_types::Request::builder(Method::Setup, rtsp_types::Version::V1_0).request_uri(url); rtsp_types::Request::builder(Method::Setup, rtsp_types::Version::V1_0).request_uri(url);
let udp = match options.transport { let udp = match options.transport {
Transport::Tcp => { Transport::Tcp(_) => {
let proposed_channel_id = conn.channels.next_unassigned().ok_or_else(|| { let proposed_channel_id = conn.channels.next_unassigned().ok_or_else(|| {
wrap!(ErrorInt::FailedPrecondition( wrap!(ErrorInt::FailedPrecondition(
"no unassigned channels".into() "no unassigned channels".into()
@ -1353,9 +1378,10 @@ impl Session<Described> {
proposed_channel_id + 1 proposed_channel_id + 1
), ),
); );
*inner.flags |= SessionFlag::TcpStreams as u8;
None None
} }
Transport::Udp => { Transport::Udp(_) => {
// Bind an ephemeral UDP port on the same local address used to connect // Bind an ephemeral UDP port on the same local address used to connect
// to the RTSP server. // to the RTSP server.
let local_ip = conn.inner.ctx().local_addr.ip(); let local_ip = conn.inner.ctx().local_addr.ip();
@ -1369,6 +1395,7 @@ impl Session<Described> {
pair.rtp_port + 1, pair.rtp_port + 1,
), ),
); );
*inner.flags |= SessionFlag::UdpStreams as u8;
Some(UdpStreamTransport { Some(UdpStreamTransport {
ctx: UdpStreamContext { ctx: UdpStreamContext {
local_ip, local_ip,
@ -1388,7 +1415,7 @@ impl Session<Described> {
.send( .send(
ResponseMode::Normal, ResponseMode::Normal,
inner.options, inner.options,
presentation.tool.as_ref(), inner.presentation.tool.as_ref(),
inner.requested_auth, inner.requested_auth,
&mut req.build(Bytes::new()), &mut req.build(Bytes::new()),
) )
@ -1515,7 +1542,7 @@ impl Session<Described> {
)) ))
})?; })?;
if let Some(ref t) = inner.presentation.tool { if let Some(ref t) = inner.presentation.tool {
if matches!(inner.options.transport, Transport::Tcp) { if (*inner.flags & (SessionFlag::TcpStreams as u8)) != 0 {
warn!( warn!(
"Connecting via TCP to known-broken RTSP server {:?}. \ "Connecting via TCP to known-broken RTSP server {:?}. \
See <https://github.com/scottlamb/retina/issues/17>. \ See <https://github.com/scottlamb/retina/issues/17>. \
@ -1526,7 +1553,7 @@ impl Session<Described> {
} }
trace!("PLAY with channel mappings: {:#?}", &conn.channels); trace!("PLAY with channel mappings: {:#?}", &conn.channels);
*inner.maybe_playing = true; *inner.flags |= SessionFlag::MaybePlaying as u8;
let (msg_ctx, cseq, response) = conn let (msg_ctx, cseq, response) = conn
.send( .send(
ResponseMode::Play, ResponseMode::Play,
@ -1675,7 +1702,7 @@ fn note_stale_live555_data(tool: Option<&Tool>, options: &SessionOptions) {
{ {
let mut lock = group.sessions.lock().unwrap(); let mut lock = group.sessions.lock().unwrap();
for s in &lock.sessions { for s in &lock.sessions {
if s.is_tcp { if s.has_tcp {
// This session plausibly explains the packet. // This session plausibly explains the packet.
// (We could go so far as to examine the data packet's SSRC to // (We could go so far as to examine the data packet's SSRC to
// see if it matches one associated with this session. But // see if it matches one associated with this session. But
@ -1704,7 +1731,7 @@ fn note_stale_live555_data(tool: Option<&Tool>, options: &SessionOptions) {
seqnum, seqnum,
expires, expires,
teardown_rx: None, teardown_rx: None,
is_tcp: true, has_tcp: true,
maybe_playing: true, maybe_playing: true,
}); });
} }
@ -2119,9 +2146,9 @@ impl PinnedDrop for SessionInner {
fn drop(self: Pin<&mut Self>) { fn drop(self: Pin<&mut Self>) {
let this = self.project(); let this = self.project();
let is_tcp = matches!(this.options.transport, Transport::Tcp); let has_tcp = (*this.flags & (SessionFlag::TcpStreams as u8)) != 0;
let just_try_once = match this.options.teardown { let just_try_once = match this.options.teardown {
TeardownPolicy::Auto if is_tcp => { TeardownPolicy::Auto if has_tcp => {
// If the server is known to have the live555 bug, try really hard to send a // If the server is known to have the live555 bug, try really hard to send a
// TEARDOWN before considering the session cleaned up. Otherwise, try once on // TEARDOWN before considering the session cleaned up. Otherwise, try once on
// the existing connection, primarily in case the server has // the existing connection, primarily in case the server has
@ -2156,8 +2183,8 @@ impl PinnedDrop for SessionInner {
seqnum, seqnum,
expires, expires,
teardown_rx: Some(teardown_rx), teardown_rx: Some(teardown_rx),
is_tcp, has_tcp,
maybe_playing: *this.maybe_playing, maybe_playing: *this.flags & (SessionFlag::MaybePlaying as u8) != 0,
}); });
log::debug!( log::debug!(
"{:?}/{} tracking TEARDOWN of session id {}", "{:?}/{} tracking TEARDOWN of session id {}",
@ -2222,7 +2249,7 @@ impl futures::Stream for Session<Playing> {
} }
// Next try receiving data on the UDP sockets, if any. // Next try receiving data on the UDP sockets, if any.
if matches!(self.0.options.transport, Transport::Udp) { if self.0.flags & (SessionFlag::UdpStreams as u8) != 0 {
if let Poll::Ready(result) = self.as_mut().poll_udp(cx) { if let Poll::Ready(result) = self.as_mut().poll_udp(cx) {
return Poll::Ready(result); return Poll::Ready(result);
} }