split PacketContext and StreamContext
As listed in #47. This improves throughput in the client/h264 benchmark by about 20%, I think because we copy fewer bytes around. CodecItem went from 256 bytes to 176.
This commit is contained in:
parent
1bbaf29dc9
commit
8744bf52d3
@ -5,6 +5,9 @@
|
|||||||
`#[doc(hidden)]`.
|
`#[doc(hidden)]`.
|
||||||
* BREAKING: `retina::client::Session<Described>::setup` takes a new
|
* BREAKING: `retina::client::Session<Described>::setup` takes a new
|
||||||
`SetupOptions` argument for future expansion.
|
`SetupOptions` argument for future expansion.
|
||||||
|
* BREAKING: `retina::StreamContext` has been split out of `retina::PacketContext`.
|
||||||
|
Both must be printed to provide the same information as before. This change
|
||||||
|
reduces how much data needs to be copied with each packet.
|
||||||
|
|
||||||
## `v0.3.9` (2022-04-12)
|
## `v0.3.9` (2022-04-12)
|
||||||
|
|
||||||
|
@ -32,6 +32,7 @@ fn h264_aac<F: FnMut(CodecItem) -> ()>(mut f: F) {
|
|||||||
Depacketizer::new("video", "h264", 90_000, None, Some("packetization-mode=1;profile-level-id=42C01E;sprop-parameter-sets=Z0LAHtkDxWhAAAADAEAAAAwDxYuS,aMuMsg==")).unwrap(),
|
Depacketizer::new("video", "h264", 90_000, None, Some("packetization-mode=1;profile-level-id=42C01E;sprop-parameter-sets=Z0LAHtkDxWhAAAADAEAAAAwDxYuS,aMuMsg==")).unwrap(),
|
||||||
];
|
];
|
||||||
let conn_ctx = retina::ConnectionContext::dummy();
|
let conn_ctx = retina::ConnectionContext::dummy();
|
||||||
|
let stream_ctx = retina::StreamContextRef::dummy();
|
||||||
let pkt_ctx = retina::PacketContext::dummy();
|
let pkt_ctx = retina::PacketContext::dummy();
|
||||||
while !remaining.is_empty() {
|
while !remaining.is_empty() {
|
||||||
assert!(remaining.len() > 4);
|
assert!(remaining.len() > 4);
|
||||||
@ -49,6 +50,7 @@ fn h264_aac<F: FnMut(CodecItem) -> ()>(mut f: F) {
|
|||||||
};
|
};
|
||||||
let pkt = match rtps[stream_id].rtp(
|
let pkt = match rtps[stream_id].rtp(
|
||||||
&retina::client::SessionOptions::default(),
|
&retina::client::SessionOptions::default(),
|
||||||
|
stream_ctx,
|
||||||
None,
|
None,
|
||||||
&conn_ctx,
|
&conn_ctx,
|
||||||
&pkt_ctx,
|
&pkt_ctx,
|
||||||
@ -60,7 +62,10 @@ fn h264_aac<F: FnMut(CodecItem) -> ()>(mut f: F) {
|
|||||||
_ => unreachable!(),
|
_ => unreachable!(),
|
||||||
};
|
};
|
||||||
depacketizers[stream_id].push(pkt).unwrap();
|
depacketizers[stream_id].push(pkt).unwrap();
|
||||||
while let Some(pkt) = depacketizers[stream_id].pull(&conn_ctx).unwrap() {
|
while let Some(pkt) = depacketizers[stream_id]
|
||||||
|
.pull(&conn_ctx, stream_ctx)
|
||||||
|
.unwrap()
|
||||||
|
{
|
||||||
f(pkt);
|
f(pkt);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -13,6 +13,7 @@ fuzz_target!(|data: &[u8]| {
|
|||||||
let mut timestamp = retina::Timestamp::new(0, NonZeroU32::new(90_000).unwrap(), 0).unwrap();
|
let mut timestamp = retina::Timestamp::new(0, NonZeroU32::new(90_000).unwrap(), 0).unwrap();
|
||||||
let mut sequence_number: u16 = 0;
|
let mut sequence_number: u16 = 0;
|
||||||
let conn_ctx = retina::ConnectionContext::dummy();
|
let conn_ctx = retina::ConnectionContext::dummy();
|
||||||
|
let stream_ctx = retina::StreamContextRef::dummy();
|
||||||
let pkt_ctx = retina::PacketContext::dummy();
|
let pkt_ctx = retina::PacketContext::dummy();
|
||||||
while data.has_remaining() {
|
while data.has_remaining() {
|
||||||
let hdr = data.get_u8();
|
let hdr = data.get_u8();
|
||||||
@ -43,7 +44,7 @@ fuzz_target!(|data: &[u8]| {
|
|||||||
if depacketizer.push(pkt).is_err() {
|
if depacketizer.push(pkt).is_err() {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
while let Some(item) = depacketizer.pull(&conn_ctx).transpose() {
|
while let Some(item) = depacketizer.pull(&conn_ctx, stream_ctx).transpose() {
|
||||||
if item.is_err() {
|
if item.is_err() {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -16,6 +16,7 @@ fuzz_target!(|data: &[u8]| {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
let conn_ctx = retina::ConnectionContext::dummy();
|
let conn_ctx = retina::ConnectionContext::dummy();
|
||||||
|
let stream_ctx = retina::StreamContextRef::dummy();
|
||||||
let max_payload_size = u16::from_be_bytes([data[0], data[1]]);
|
let max_payload_size = u16::from_be_bytes([data[0], data[1]]);
|
||||||
let mut p = match retina::codec::h264::Packetizer::new(max_payload_size, 0, 0) {
|
let mut p = match retina::codec::h264::Packetizer::new(max_payload_size, 0, 0) {
|
||||||
Ok(p) => p,
|
Ok(p) => p,
|
||||||
@ -42,7 +43,7 @@ fuzz_target!(|data: &[u8]| {
|
|||||||
if d.push(pkt).is_err() {
|
if d.push(pkt).is_err() {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
match d.pull(&conn_ctx) {
|
match d.pull(&conn_ctx, stream_ctx) {
|
||||||
Err(_) => return,
|
Err(_) => return,
|
||||||
Ok(Some(retina::codec::CodecItem::VideoFrame(f))) => {
|
Ok(Some(retina::codec::CodecItem::VideoFrame(f))) => {
|
||||||
assert!(mark);
|
assert!(mark);
|
||||||
@ -68,6 +69,6 @@ fuzz_target!(|data: &[u8]| {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
assert_eq!(&data[2..], &frame.data()[..]);
|
assert_eq!(&data[2..], &frame.data()[..]);
|
||||||
assert!(matches!(d.pull(&conn_ctx), Ok(None)));
|
assert!(matches!(d.pull(&conn_ctx, stream_ctx), Ok(None)));
|
||||||
assert!(matches!(p.pull(), Ok(None)));
|
assert!(matches!(p.pull(), Ok(None)));
|
||||||
});
|
});
|
||||||
|
@ -24,7 +24,10 @@ use url::Url;
|
|||||||
|
|
||||||
use crate::client::parse::SessionHeader;
|
use crate::client::parse::SessionHeader;
|
||||||
use crate::codec::CodecItem;
|
use crate::codec::CodecItem;
|
||||||
use crate::{Error, ErrorInt, RtspMessageContext};
|
use crate::{
|
||||||
|
Error, ErrorInt, RtspMessageContext, StreamContextRef, StreamContextRefInner, TcpStreamContext,
|
||||||
|
UdpStreamContext,
|
||||||
|
};
|
||||||
|
|
||||||
mod channel_mapping;
|
mod channel_mapping;
|
||||||
mod parse;
|
mod parse;
|
||||||
@ -703,9 +706,6 @@ pub struct Stream {
|
|||||||
/// says the server is allowed to omit it when there is only a single stream.
|
/// says the server is allowed to omit it when there is only a single stream.
|
||||||
pub control: Option<Url>,
|
pub control: Option<Url>,
|
||||||
|
|
||||||
/// The sockets for `Transport::Udp`.
|
|
||||||
sockets: Option<UdpSockets>,
|
|
||||||
|
|
||||||
state: StreamState,
|
state: StreamState,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -720,36 +720,38 @@ impl std::fmt::Debug for Stream {
|
|||||||
.field("channels", &self.channels)
|
.field("channels", &self.channels)
|
||||||
.field("framerate", &self.framerate)
|
.field("framerate", &self.framerate)
|
||||||
.field("depacketizer", &self.depacketizer)
|
.field("depacketizer", &self.depacketizer)
|
||||||
.field("UDP", &self.sockets)
|
|
||||||
.field("state", &self.state)
|
.field("state", &self.state)
|
||||||
.finish()
|
.finish()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
struct UdpSockets {
|
enum StreamTransport {
|
||||||
local_ip: IpAddr,
|
Udp(UdpStreamTransport), // the ...Transport wrapper struct keeps ...Context and the sockets.
|
||||||
local_rtp_port: u16,
|
Tcp(TcpStreamContext), // no sockets; ...Context suffices.
|
||||||
remote_ip: IpAddr,
|
|
||||||
remote_rtp_port: u16,
|
|
||||||
rtp_socket: UdpSocket,
|
|
||||||
remote_rtcp_port: u16,
|
|
||||||
rtcp_socket: UdpSocket,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl std::fmt::Debug for UdpSockets {
|
impl std::fmt::Debug for StreamTransport {
|
||||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
f.debug_struct("UDP")
|
std::fmt::Debug::fmt(&self.ctx(), f)
|
||||||
.field("local_ip", &self.local_ip)
|
|
||||||
.field("local_rtp_port", &self.local_rtp_port)
|
|
||||||
.field("remote_ip", &self.remote_ip)
|
|
||||||
.field("remote_rtp_port", &self.remote_rtp_port)
|
|
||||||
.field("rtp_socket", &self.rtp_socket)
|
|
||||||
.field("remote_rtcp_port", &self.remote_rtcp_port)
|
|
||||||
.field("rtcp_socket", &self.rtcp_socket)
|
|
||||||
.finish()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl StreamTransport {
|
||||||
|
fn ctx(&self) -> StreamContextRef {
|
||||||
|
StreamContextRef(match self {
|
||||||
|
StreamTransport::Tcp(tcp) => StreamContextRefInner::Tcp(*tcp),
|
||||||
|
StreamTransport::Udp(udp) => StreamContextRefInner::Udp(&udp.ctx),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
struct UdpStreamTransport {
|
||||||
|
ctx: UdpStreamContext,
|
||||||
|
rtp_socket: UdpSocket,
|
||||||
|
rtcp_socket: UdpSocket,
|
||||||
|
}
|
||||||
|
|
||||||
impl Stream {
|
impl Stream {
|
||||||
/// Returns codec-specified parameters for this stream, if available.
|
/// Returns codec-specified parameters for this stream, if available.
|
||||||
///
|
///
|
||||||
@ -785,6 +787,15 @@ impl Stream {
|
|||||||
pub fn parameters(&self) -> Option<crate::codec::Parameters> {
|
pub fn parameters(&self) -> Option<crate::codec::Parameters> {
|
||||||
self.depacketizer.as_ref().ok().and_then(|d| d.parameters())
|
self.depacketizer.as_ref().ok().and_then(|d| d.parameters())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Returns a context for this stream, if it has been set up.
|
||||||
|
pub fn ctx(&self) -> Option<StreamContextRef> {
|
||||||
|
match &self.state {
|
||||||
|
StreamState::Uninit => None,
|
||||||
|
StreamState::Init(init) => Some(init.transport.ctx()),
|
||||||
|
StreamState::Playing { transport, .. } => Some(transport.ctx()),
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
@ -799,10 +810,11 @@ enum StreamState {
|
|||||||
Playing {
|
Playing {
|
||||||
timeline: Timeline,
|
timeline: Timeline,
|
||||||
rtp_handler: rtp::InorderParser,
|
rtp_handler: rtp::InorderParser,
|
||||||
|
transport: StreamTransport,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Copy, Clone, Debug, Default)]
|
#[derive(Debug)]
|
||||||
struct StreamStateInit {
|
struct StreamStateInit {
|
||||||
/// The RTP synchronization source (SSRC), as defined in
|
/// The RTP synchronization source (SSRC), as defined in
|
||||||
/// [RFC 3550](https://tools.ietf.org/html/rfc3550). This is normally
|
/// [RFC 3550](https://tools.ietf.org/html/rfc3550). This is normally
|
||||||
@ -819,6 +831,8 @@ struct StreamStateInit {
|
|||||||
/// `RTP-Info` header. This field is only used during the `play()` call
|
/// `RTP-Info` header. This field is only used during the `play()` call
|
||||||
/// itself; by the time it returns, the stream will be in state `Playing`.
|
/// itself; by the time it returns, the stream will be in state `Playing`.
|
||||||
initial_rtptime: Option<u32>,
|
initial_rtptime: Option<u32>,
|
||||||
|
|
||||||
|
transport: StreamTransport,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Username and password authentication credentials.
|
/// Username and password authentication credentials.
|
||||||
@ -1324,7 +1338,7 @@ impl Session<Described> {
|
|||||||
.clone();
|
.clone();
|
||||||
let mut req =
|
let mut req =
|
||||||
rtsp_types::Request::builder(Method::Setup, rtsp_types::Version::V1_0).request_uri(url);
|
rtsp_types::Request::builder(Method::Setup, rtsp_types::Version::V1_0).request_uri(url);
|
||||||
match options.transport {
|
let udp = match options.transport {
|
||||||
Transport::Tcp => {
|
Transport::Tcp => {
|
||||||
let proposed_channel_id = conn.channels.next_unassigned().ok_or_else(|| {
|
let proposed_channel_id = conn.channels.next_unassigned().ok_or_else(|| {
|
||||||
wrap!(ErrorInt::FailedPrecondition(
|
wrap!(ErrorInt::FailedPrecondition(
|
||||||
@ -1339,22 +1353,14 @@ impl Session<Described> {
|
|||||||
proposed_channel_id + 1
|
proposed_channel_id + 1
|
||||||
),
|
),
|
||||||
);
|
);
|
||||||
|
None
|
||||||
}
|
}
|
||||||
Transport::Udp => {
|
Transport::Udp => {
|
||||||
// Bind an ephemeral UDP port on the same local address used to connect
|
// Bind an ephemeral UDP port on the same local address used to connect
|
||||||
// to the RTSP server.
|
// to the RTSP server.
|
||||||
let ip_addr = conn.inner.ctx().local_addr.ip();
|
let local_ip = conn.inner.ctx().local_addr.ip();
|
||||||
let pair = crate::tokio::UdpPair::for_ip(ip_addr)
|
let pair = crate::tokio::UdpPair::for_ip(local_ip)
|
||||||
.map_err(|e| wrap!(ErrorInt::Internal(e.into())))?;
|
.map_err(|e| wrap!(ErrorInt::Internal(e.into())))?;
|
||||||
stream.sockets = Some(UdpSockets {
|
|
||||||
local_ip: ip_addr,
|
|
||||||
local_rtp_port: pair.rtp_port,
|
|
||||||
remote_ip: IpAddr::V4(Ipv4Addr::UNSPECIFIED),
|
|
||||||
remote_rtp_port: 0,
|
|
||||||
rtp_socket: pair.rtp_socket,
|
|
||||||
remote_rtcp_port: 0,
|
|
||||||
rtcp_socket: pair.rtcp_socket,
|
|
||||||
});
|
|
||||||
req = req.header(
|
req = req.header(
|
||||||
rtsp_types::headers::TRANSPORT,
|
rtsp_types::headers::TRANSPORT,
|
||||||
format!(
|
format!(
|
||||||
@ -1363,8 +1369,18 @@ impl Session<Described> {
|
|||||||
pair.rtp_port + 1,
|
pair.rtp_port + 1,
|
||||||
),
|
),
|
||||||
);
|
);
|
||||||
|
Some(UdpStreamTransport {
|
||||||
|
ctx: UdpStreamContext {
|
||||||
|
local_ip,
|
||||||
|
peer_ip: IpAddr::V4(Ipv4Addr::UNSPECIFIED),
|
||||||
|
local_rtp_port: pair.rtp_port,
|
||||||
|
peer_rtp_port: 0,
|
||||||
|
},
|
||||||
|
rtp_socket: pair.rtp_socket,
|
||||||
|
rtcp_socket: pair.rtcp_socket,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
}
|
};
|
||||||
if let Some(ref s) = inner.session {
|
if let Some(ref s) = inner.session {
|
||||||
req = req.header(rtsp_types::headers::SESSION, s.id.to_string());
|
req = req.header(rtsp_types::headers::SESSION, s.id.to_string());
|
||||||
}
|
}
|
||||||
@ -1408,8 +1424,8 @@ impl Session<Described> {
|
|||||||
None => *inner.session = Some(response.session),
|
None => *inner.session = Some(response.session),
|
||||||
};
|
};
|
||||||
let conn_ctx = conn.inner.ctx();
|
let conn_ctx = conn.inner.ctx();
|
||||||
match options.transport {
|
let transport = match udp {
|
||||||
Transport::Tcp => {
|
None => {
|
||||||
let channel_id = match response.channel_id {
|
let channel_id = match response.channel_id {
|
||||||
Some(id) => id,
|
Some(id) => id,
|
||||||
None => bail!(ErrorInt::RtspResponseError {
|
None => bail!(ErrorInt::RtspResponseError {
|
||||||
@ -1433,8 +1449,11 @@ impl Session<Described> {
|
|||||||
description,
|
description,
|
||||||
})
|
})
|
||||||
})?;
|
})?;
|
||||||
|
StreamTransport::Tcp(TcpStreamContext {
|
||||||
|
rtp_channel_id: channel_id,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
Transport::Udp => {
|
Some(mut udp) => {
|
||||||
// TODO: RFC 2326 section 12.39 says "If the source address for
|
// TODO: RFC 2326 section 12.39 says "If the source address for
|
||||||
// the stream is different than can be derived from the RTSP
|
// the stream is different than can be derived from the RTSP
|
||||||
// endpoint address (the server in playback or the client in
|
// endpoint address (the server in playback or the client in
|
||||||
@ -1455,29 +1474,27 @@ impl Session<Described> {
|
|||||||
description: "Transport header is missing server_port parameter".to_owned(),
|
description: "Transport header is missing server_port parameter".to_owned(),
|
||||||
})
|
})
|
||||||
})?;
|
})?;
|
||||||
let udp_sockets = stream.sockets.as_mut().unwrap();
|
udp.ctx.peer_ip = source;
|
||||||
udp_sockets.remote_ip = source;
|
udp.ctx.peer_rtp_port = server_port;
|
||||||
udp_sockets.remote_rtp_port = server_port.0;
|
udp.rtp_socket
|
||||||
udp_sockets.remote_rtcp_port = server_port.1;
|
.connect(SocketAddr::new(source, server_port))
|
||||||
udp_sockets
|
|
||||||
.rtp_socket
|
|
||||||
.connect(SocketAddr::new(source, udp_sockets.remote_rtp_port))
|
|
||||||
.await
|
.await
|
||||||
.map_err(|e| wrap!(ErrorInt::ConnectError(e)))?;
|
.map_err(|e| wrap!(ErrorInt::ConnectError(e)))?;
|
||||||
udp_sockets
|
udp.rtcp_socket
|
||||||
.rtcp_socket
|
.connect(SocketAddr::new(source, server_port + 1))
|
||||||
.connect(SocketAddr::new(source, udp_sockets.remote_rtcp_port))
|
|
||||||
.await
|
.await
|
||||||
.map_err(|e| wrap!(ErrorInt::ConnectError(e)))?;
|
.map_err(|e| wrap!(ErrorInt::ConnectError(e)))?;
|
||||||
punch_firewall_hole(&udp_sockets.rtp_socket, &udp_sockets.rtcp_socket)
|
punch_firewall_hole(&udp.rtp_socket, &udp.rtcp_socket)
|
||||||
.await
|
.await
|
||||||
.map_err(|e| wrap!(ErrorInt::ConnectError(e)))?;
|
.map_err(|e| wrap!(ErrorInt::ConnectError(e)))?;
|
||||||
|
StreamTransport::Udp(udp)
|
||||||
}
|
}
|
||||||
}
|
};
|
||||||
stream.state = StreamState::Init(StreamStateInit {
|
stream.state = StreamState::Init(StreamStateInit {
|
||||||
ssrc: response.ssrc,
|
ssrc: response.ssrc,
|
||||||
initial_seq: None,
|
initial_seq: None,
|
||||||
initial_rtptime: None,
|
initial_rtptime: None,
|
||||||
|
transport,
|
||||||
});
|
});
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@ -1552,11 +1569,12 @@ impl Session<Described> {
|
|||||||
// Move all streams that have been set up from Init to Playing state. Check that required
|
// Move all streams that have been set up from Init to Playing state. Check that required
|
||||||
// parameters are present while doing so.
|
// parameters are present while doing so.
|
||||||
for (i, s) in inner.presentation.streams.iter_mut().enumerate() {
|
for (i, s) in inner.presentation.streams.iter_mut().enumerate() {
|
||||||
match s.state {
|
match std::mem::replace(&mut s.state, StreamState::Uninit) {
|
||||||
StreamState::Init(StreamStateInit {
|
StreamState::Init(StreamStateInit {
|
||||||
initial_rtptime,
|
initial_rtptime,
|
||||||
initial_seq,
|
initial_seq,
|
||||||
ssrc,
|
ssrc,
|
||||||
|
transport,
|
||||||
..
|
..
|
||||||
}) => {
|
}) => {
|
||||||
let initial_rtptime = match policy.initial_timestamp {
|
let initial_rtptime = match policy.initial_timestamp {
|
||||||
@ -1612,6 +1630,7 @@ impl Session<Described> {
|
|||||||
})
|
})
|
||||||
})?,
|
})?,
|
||||||
rtp_handler: rtp::InorderParser::new(ssrc, initial_seq),
|
rtp_handler: rtp::InorderParser::new(ssrc, initial_seq),
|
||||||
|
transport,
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
StreamState::Uninit => {}
|
StreamState::Uninit => {}
|
||||||
@ -1893,10 +1912,7 @@ impl Session<Playing> {
|
|||||||
.as_mut()
|
.as_mut()
|
||||||
.ok_or_else(|| wrap!(ErrorInt::FailedPrecondition("no connection".into())))?;
|
.ok_or_else(|| wrap!(ErrorInt::FailedPrecondition("no connection".into())))?;
|
||||||
let channel_id = data.channel_id();
|
let channel_id = data.channel_id();
|
||||||
let pkt_ctx = crate::PacketContext(crate::PacketContextInner::Tcp {
|
let pkt_ctx = crate::PacketContext(crate::PacketContextInner::Tcp { msg_ctx: *msg_ctx });
|
||||||
msg_ctx: *msg_ctx,
|
|
||||||
channel_id,
|
|
||||||
});
|
|
||||||
let m = match conn.channels.lookup(channel_id) {
|
let m = match conn.channels.lookup(channel_id) {
|
||||||
Some(m) => m,
|
Some(m) => m,
|
||||||
None => {
|
None => {
|
||||||
@ -1910,11 +1926,12 @@ impl Session<Playing> {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
let stream = &mut inner.presentation.streams[m.stream_i];
|
let stream = &mut inner.presentation.streams[m.stream_i];
|
||||||
let (timeline, rtp_handler) = match &mut stream.state {
|
let (timeline, rtp_handler, stream_ctx) = match &mut stream.state {
|
||||||
StreamState::Playing {
|
StreamState::Playing {
|
||||||
timeline,
|
timeline,
|
||||||
rtp_handler,
|
rtp_handler,
|
||||||
} => (timeline, rtp_handler),
|
transport,
|
||||||
|
} => (timeline, rtp_handler, transport.ctx()),
|
||||||
_ => unreachable!(
|
_ => unreachable!(
|
||||||
"Session<Playing>'s {}->{:?} not in Playing state",
|
"Session<Playing>'s {}->{:?} not in Playing state",
|
||||||
channel_id, m
|
channel_id, m
|
||||||
@ -1923,6 +1940,7 @@ impl Session<Playing> {
|
|||||||
match m.channel_type {
|
match m.channel_type {
|
||||||
ChannelType::Rtp => Ok(rtp_handler.rtp(
|
ChannelType::Rtp => Ok(rtp_handler.rtp(
|
||||||
inner.options,
|
inner.options,
|
||||||
|
stream_ctx,
|
||||||
inner.presentation.tool.as_ref(),
|
inner.presentation.tool.as_ref(),
|
||||||
conn.inner.ctx(),
|
conn.inner.ctx(),
|
||||||
&pkt_ctx,
|
&pkt_ctx,
|
||||||
@ -1933,6 +1951,7 @@ impl Session<Playing> {
|
|||||||
ChannelType::Rtcp => {
|
ChannelType::Rtcp => {
|
||||||
match rtp_handler.rtcp(
|
match rtp_handler.rtcp(
|
||||||
inner.options,
|
inner.options,
|
||||||
|
stream_ctx,
|
||||||
inner.presentation.tool.as_ref(),
|
inner.presentation.tool.as_ref(),
|
||||||
&pkt_ctx,
|
&pkt_ctx,
|
||||||
timeline,
|
timeline,
|
||||||
@ -1942,6 +1961,7 @@ impl Session<Playing> {
|
|||||||
Ok(p) => Ok(p),
|
Ok(p) => Ok(p),
|
||||||
Err(description) => Err(wrap!(ErrorInt::PacketError {
|
Err(description) => Err(wrap!(ErrorInt::PacketError {
|
||||||
conn_ctx: *conn.inner.ctx(),
|
conn_ctx: *conn.inner.ctx(),
|
||||||
|
stream_ctx: stream.ctx().unwrap().to_owned(),
|
||||||
pkt_ctx,
|
pkt_ctx,
|
||||||
stream_id: m.stream_i,
|
stream_id: m.stream_i,
|
||||||
description,
|
description,
|
||||||
@ -1964,91 +1984,96 @@ impl Session<Playing> {
|
|||||||
) -> Poll<Option<Result<PacketItem, Error>>> {
|
) -> Poll<Option<Result<PacketItem, Error>>> {
|
||||||
debug_assert!(buf.filled().is_empty());
|
debug_assert!(buf.filled().is_empty());
|
||||||
let inner = self.0.as_mut().project();
|
let inner = self.0.as_mut().project();
|
||||||
|
let s = &mut inner.presentation.streams[i];
|
||||||
|
let (timeline, rtp_handler, udp) = match &mut s.state {
|
||||||
|
StreamState::Playing {
|
||||||
|
timeline,
|
||||||
|
rtp_handler,
|
||||||
|
transport: StreamTransport::Udp(udp),
|
||||||
|
} => (timeline, rtp_handler, udp),
|
||||||
|
_ => return Poll::Pending,
|
||||||
|
};
|
||||||
let conn_ctx = inner
|
let conn_ctx = inner
|
||||||
.conn
|
.conn
|
||||||
.as_ref()
|
.as_ref()
|
||||||
.ok_or_else(|| wrap!(ErrorInt::FailedPrecondition("no connection".into())))?
|
.ok_or_else(|| wrap!(ErrorInt::FailedPrecondition("no connection".into())))?
|
||||||
.inner
|
.inner
|
||||||
.ctx();
|
.ctx();
|
||||||
let s = &mut inner.presentation.streams[i];
|
let stream_ctx = StreamContextRef(StreamContextRefInner::Udp(&udp.ctx));
|
||||||
if let Some(sockets) = &mut s.sockets {
|
|
||||||
let (timeline, rtp_handler) = match &mut s.state {
|
// Prioritize RTCP over RTP within a stream.
|
||||||
StreamState::Playing {
|
while let Poll::Ready(r) = udp.rtcp_socket.poll_recv(cx, buf) {
|
||||||
timeline,
|
let when = crate::WallTime::now();
|
||||||
rtp_handler,
|
match r {
|
||||||
} => (timeline, rtp_handler),
|
Ok(()) => {
|
||||||
_ => unreachable!("Session<Playing>'s {}->{:?} not in Playing state", i, s),
|
let pkt_ctx = crate::PacketContext(crate::PacketContextInner::Udp {
|
||||||
};
|
received_wall: when,
|
||||||
// Prioritize RTCP over RTP within a stream.
|
});
|
||||||
while let Poll::Ready(r) = sockets.rtcp_socket.poll_recv(cx, buf) {
|
let msg = Bytes::copy_from_slice(buf.filled());
|
||||||
let pkt_ctx = crate::PacketContext(crate::PacketContextInner::Udp {
|
match rtp_handler.rtcp(
|
||||||
local_addr: SocketAddr::new(sockets.local_ip, sockets.local_rtp_port + 1),
|
inner.options,
|
||||||
peer_addr: SocketAddr::new(sockets.remote_ip, sockets.remote_rtcp_port),
|
stream_ctx,
|
||||||
received_wall: crate::WallTime::now(),
|
inner.presentation.tool.as_ref(),
|
||||||
});
|
&pkt_ctx,
|
||||||
match r {
|
timeline,
|
||||||
Ok(()) => {
|
i,
|
||||||
let msg = Bytes::copy_from_slice(buf.filled());
|
msg,
|
||||||
match rtp_handler.rtcp(
|
) {
|
||||||
inner.options,
|
Ok(Some(p)) => return Poll::Ready(Some(Ok(p))),
|
||||||
inner.presentation.tool.as_ref(),
|
Ok(None) => buf.clear(),
|
||||||
&pkt_ctx,
|
Err(description) => {
|
||||||
timeline,
|
return Poll::Ready(Some(Err(wrap!(ErrorInt::PacketError {
|
||||||
i,
|
conn_ctx: *conn_ctx,
|
||||||
msg,
|
stream_ctx: stream_ctx.to_owned(),
|
||||||
) {
|
pkt_ctx,
|
||||||
Ok(Some(p)) => return Poll::Ready(Some(Ok(p))),
|
stream_id: i,
|
||||||
Ok(None) => buf.clear(),
|
description,
|
||||||
Err(description) => {
|
}))))
|
||||||
return Poll::Ready(Some(Err(wrap!(ErrorInt::PacketError {
|
|
||||||
conn_ctx: *conn_ctx,
|
|
||||||
pkt_ctx,
|
|
||||||
stream_id: i,
|
|
||||||
description,
|
|
||||||
}))))
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(source) => {
|
}
|
||||||
return Poll::Ready(Some(Err(wrap!(ErrorInt::UdpRecvError {
|
Err(source) => {
|
||||||
conn_ctx: *conn_ctx,
|
return Poll::Ready(Some(Err(wrap!(ErrorInt::UdpRecvError {
|
||||||
pkt_ctx,
|
conn_ctx: *conn_ctx,
|
||||||
source,
|
stream_ctx: stream_ctx.to_owned(),
|
||||||
}))))
|
when,
|
||||||
}
|
source,
|
||||||
|
}))))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
while let Poll::Ready(r) = sockets.rtp_socket.poll_recv(cx, buf) {
|
}
|
||||||
let pkt_ctx = crate::PacketContext(crate::PacketContextInner::Udp {
|
while let Poll::Ready(r) = udp.rtp_socket.poll_recv(cx, buf) {
|
||||||
local_addr: SocketAddr::new(sockets.local_ip, sockets.local_rtp_port),
|
let when = crate::WallTime::now();
|
||||||
peer_addr: SocketAddr::new(sockets.remote_ip, sockets.remote_rtp_port),
|
let stream_ctx = StreamContextRef(StreamContextRefInner::Udp(&udp.ctx));
|
||||||
received_wall: crate::WallTime::now(),
|
match r {
|
||||||
});
|
Ok(()) => {
|
||||||
match r {
|
let msg = Bytes::copy_from_slice(buf.filled());
|
||||||
Ok(()) => {
|
let pkt_ctx = crate::PacketContext(crate::PacketContextInner::Udp {
|
||||||
let msg = Bytes::copy_from_slice(buf.filled());
|
received_wall: when,
|
||||||
match rtp_handler.rtp(
|
});
|
||||||
inner.options,
|
match rtp_handler.rtp(
|
||||||
inner.presentation.tool.as_ref(),
|
inner.options,
|
||||||
conn_ctx,
|
stream_ctx,
|
||||||
&pkt_ctx,
|
inner.presentation.tool.as_ref(),
|
||||||
timeline,
|
conn_ctx,
|
||||||
i,
|
&pkt_ctx,
|
||||||
msg,
|
timeline,
|
||||||
) {
|
i,
|
||||||
Ok(Some(p)) => return Poll::Ready(Some(Ok(p))),
|
msg,
|
||||||
Ok(None) => buf.clear(),
|
) {
|
||||||
Err(e) => return Poll::Ready(Some(Err(e))),
|
Ok(Some(p)) => return Poll::Ready(Some(Ok(p))),
|
||||||
}
|
Ok(None) => buf.clear(),
|
||||||
}
|
Err(e) => return Poll::Ready(Some(Err(e))),
|
||||||
Err(source) => {
|
|
||||||
return Poll::Ready(Some(Err(wrap!(ErrorInt::UdpRecvError {
|
|
||||||
conn_ctx: *conn_ctx,
|
|
||||||
pkt_ctx,
|
|
||||||
source,
|
|
||||||
}))))
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Err(source) => {
|
||||||
|
return Poll::Ready(Some(Err(wrap!(ErrorInt::UdpRecvError {
|
||||||
|
conn_ctx: *conn_ctx,
|
||||||
|
stream_ctx: stream_ctx.to_owned(),
|
||||||
|
when,
|
||||||
|
source,
|
||||||
|
}))))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Poll::Pending
|
Poll::Pending
|
||||||
@ -2273,7 +2298,12 @@ impl futures::Stream for Demuxed {
|
|||||||
DemuxedState::Fused => return Poll::Ready(None),
|
DemuxedState::Fused => return Poll::Ready(None),
|
||||||
};
|
};
|
||||||
let inner = self.session.0.as_mut().project();
|
let inner = self.session.0.as_mut().project();
|
||||||
let depacketizer = match &mut inner.presentation.streams[stream_id].depacketizer {
|
let stream = &mut inner.presentation.streams[stream_id];
|
||||||
|
let stream_ctx = match stream.state {
|
||||||
|
StreamState::Playing { ref transport, .. } => transport.ctx(),
|
||||||
|
_ => unreachable!(),
|
||||||
|
};
|
||||||
|
let depacketizer = match &mut stream.depacketizer {
|
||||||
Ok(d) => d,
|
Ok(d) => d,
|
||||||
Err(_) => unreachable!("depacketizer was Ok"),
|
Err(_) => unreachable!("depacketizer was Ok"),
|
||||||
};
|
};
|
||||||
@ -2291,6 +2321,7 @@ impl futures::Stream for Demuxed {
|
|||||||
depacketizer.push(p).map_err(|description| {
|
depacketizer.push(p).map_err(|description| {
|
||||||
wrap!(ErrorInt::RtpPacketError {
|
wrap!(ErrorInt::RtpPacketError {
|
||||||
conn_ctx: *conn_ctx,
|
conn_ctx: *conn_ctx,
|
||||||
|
stream_ctx: stream_ctx.to_owned(),
|
||||||
pkt_ctx,
|
pkt_ctx,
|
||||||
stream_id,
|
stream_id,
|
||||||
ssrc,
|
ssrc,
|
||||||
@ -2299,7 +2330,7 @@ impl futures::Stream for Demuxed {
|
|||||||
})
|
})
|
||||||
})?;
|
})?;
|
||||||
}
|
}
|
||||||
match depacketizer.pull(conn_ctx) {
|
match depacketizer.pull(conn_ctx, stream_ctx) {
|
||||||
Ok(Some(item)) => {
|
Ok(Some(item)) => {
|
||||||
self.state = DemuxedState::Pulling(stream_id);
|
self.state = DemuxedState::Pulling(stream_id);
|
||||||
return Poll::Ready(Some(Ok(item)));
|
return Poll::Ready(Some(Ok(item)));
|
||||||
|
@ -375,7 +375,6 @@ fn parse_media(base_url: &Url, media_description: &Media) -> Result<Stream, Stri
|
|||||||
rtp_payload_type,
|
rtp_payload_type,
|
||||||
depacketizer,
|
depacketizer,
|
||||||
control,
|
control,
|
||||||
sockets: None,
|
|
||||||
channels,
|
channels,
|
||||||
framerate,
|
framerate,
|
||||||
state: super::StreamState::Uninit,
|
state: super::StreamState::Uninit,
|
||||||
@ -486,7 +485,25 @@ pub(crate) struct SetupResponse {
|
|||||||
pub(crate) ssrc: Option<u32>,
|
pub(crate) ssrc: Option<u32>,
|
||||||
pub(crate) channel_id: Option<u8>,
|
pub(crate) channel_id: Option<u8>,
|
||||||
pub(crate) source: Option<IpAddr>,
|
pub(crate) source: Option<IpAddr>,
|
||||||
pub(crate) server_port: Option<(u16, u16)>,
|
pub(crate) server_port: Option<u16>,
|
||||||
|
}
|
||||||
|
|
||||||
|
fn parse_server_port(server_port: &str) -> Result<u16, ()> {
|
||||||
|
if let Some((a, b)) = server_port.split_once('-') {
|
||||||
|
let a = u16::from_str_radix(a, 10).map_err(|_| ())?;
|
||||||
|
let b = u16::from_str_radix(b, 10).map_err(|_| ())?;
|
||||||
|
if a.checked_add(1) != Some(b) {
|
||||||
|
// It's unclear what a non-consecutive range means.
|
||||||
|
return Err(());
|
||||||
|
}
|
||||||
|
return Ok(a);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Note returning a range is allowed by RFC 2326's grammar, but I'm not sure
|
||||||
|
// what it means. RTSP 2.0 allows "RTCP-mux" for using a single port for
|
||||||
|
// both RTP and RTCP, but it's only by client request, and RTSP 1.0 doesn't
|
||||||
|
// reference this.
|
||||||
|
Err(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Parses a `SETUP` response.
|
/// Parses a `SETUP` response.
|
||||||
@ -546,19 +563,12 @@ pub(crate) fn parse_setup(response: &rtsp_types::Response<Bytes>) -> Result<Setu
|
|||||||
.map_err(|_| format!("Transport header has unparseable source {:?}", s))?,
|
.map_err(|_| format!("Transport header has unparseable source {:?}", s))?,
|
||||||
);
|
);
|
||||||
} else if let Some(s) = part.strip_prefix("server_port=") {
|
} else if let Some(s) = part.strip_prefix("server_port=") {
|
||||||
let mut ports = s.splitn(2, '-');
|
server_port = Some(parse_server_port(s).map_err(|()| {
|
||||||
let n = ports.next().expect("splitn returns at least one part");
|
format!(
|
||||||
let n = u16::from_str_radix(n, 10)
|
"Transport header {:?} has bad server_port",
|
||||||
.map_err(|_| format!("bad port in Transport: {}", transport.as_str()))?;
|
transport.as_str()
|
||||||
if let Some(m) = ports.next() {
|
)
|
||||||
let m = u16::from_str_radix(m, 10).map_err(|_| format!("bad second port {}", m))?;
|
})?);
|
||||||
server_port = Some((n, m))
|
|
||||||
} else {
|
|
||||||
// TODO: this is allowed by RFC 2326's grammar, but I'm not sure
|
|
||||||
// what it means. Does it use the same port for both RTP and
|
|
||||||
// RTCP, or is it implied the second is one more than the first?
|
|
||||||
return Err("Transport header specifies a single server_port".to_owned());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok(SetupResponse {
|
Ok(SetupResponse {
|
||||||
@ -660,6 +670,8 @@ mod tests {
|
|||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use url::Url;
|
use url::Url;
|
||||||
|
|
||||||
|
use crate::client::StreamTransport;
|
||||||
|
use crate::TcpStreamContext;
|
||||||
use crate::{client::StreamStateInit, codec::Parameters};
|
use crate::{client::StreamStateInit, codec::Parameters};
|
||||||
|
|
||||||
use super::super::StreamState;
|
use super::super::StreamState;
|
||||||
@ -674,6 +686,15 @@ mod tests {
|
|||||||
super::parse_describe(url, &response(raw_response))
|
super::parse_describe(url, &response(raw_response))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn dummy_stream_state_init(ssrc: Option<u32>) -> StreamState {
|
||||||
|
StreamState::Init(StreamStateInit {
|
||||||
|
ssrc,
|
||||||
|
initial_seq: None,
|
||||||
|
initial_rtptime: None,
|
||||||
|
transport: StreamTransport::Tcp(TcpStreamContext { rtp_channel_id: 0 }),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn anvpiz_sdp() {
|
fn anvpiz_sdp() {
|
||||||
let url = Url::parse("rtsp://127.0.0.1/").unwrap();
|
let url = Url::parse("rtsp://127.0.0.1/").unwrap();
|
||||||
@ -771,11 +792,7 @@ mod tests {
|
|||||||
);
|
);
|
||||||
assert_eq!(setup_response.channel_id, Some(0));
|
assert_eq!(setup_response.channel_id, Some(0));
|
||||||
assert_eq!(setup_response.ssrc, Some(0x30a98ee7));
|
assert_eq!(setup_response.ssrc, Some(0x30a98ee7));
|
||||||
p.streams[0].state = StreamState::Init(StreamStateInit {
|
p.streams[0].state = dummy_stream_state_init(Some(0x30a98ee7));
|
||||||
ssrc: setup_response.ssrc,
|
|
||||||
initial_seq: None,
|
|
||||||
initial_rtptime: None,
|
|
||||||
});
|
|
||||||
|
|
||||||
// PLAY.
|
// PLAY.
|
||||||
super::parse_play(&response(include_bytes!("testdata/dahua_play.txt")), &mut p).unwrap();
|
super::parse_play(&response(include_bytes!("testdata/dahua_play.txt")), &mut p).unwrap();
|
||||||
@ -873,11 +890,7 @@ mod tests {
|
|||||||
);
|
);
|
||||||
assert_eq!(setup_response.channel_id, Some(0));
|
assert_eq!(setup_response.channel_id, Some(0));
|
||||||
assert_eq!(setup_response.ssrc, Some(0x4cacc3d1));
|
assert_eq!(setup_response.ssrc, Some(0x4cacc3d1));
|
||||||
p.streams[0].state = StreamState::Init(StreamStateInit {
|
p.streams[0].state = dummy_stream_state_init(Some(0x4cacc3d1));
|
||||||
ssrc: setup_response.ssrc,
|
|
||||||
initial_seq: None,
|
|
||||||
initial_rtptime: None,
|
|
||||||
});
|
|
||||||
|
|
||||||
// PLAY.
|
// PLAY.
|
||||||
super::parse_play(
|
super::parse_play(
|
||||||
@ -885,7 +898,7 @@ mod tests {
|
|||||||
&mut p,
|
&mut p,
|
||||||
)
|
)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
match p.streams[0].state {
|
match &p.streams[0].state {
|
||||||
StreamState::Init(state) => {
|
StreamState::Init(state) => {
|
||||||
assert_eq!(state.initial_seq, Some(24104));
|
assert_eq!(state.initial_seq, Some(24104));
|
||||||
assert_eq!(state.initial_rtptime, Some(1270711678));
|
assert_eq!(state.initial_rtptime, Some(1270711678));
|
||||||
@ -957,8 +970,8 @@ mod tests {
|
|||||||
);
|
);
|
||||||
assert_eq!(setup_response.channel_id, Some(0));
|
assert_eq!(setup_response.channel_id, Some(0));
|
||||||
assert_eq!(setup_response.ssrc, None);
|
assert_eq!(setup_response.ssrc, None);
|
||||||
p.streams[0].state = StreamState::Init(StreamStateInit::default());
|
p.streams[0].state = dummy_stream_state_init(None);
|
||||||
p.streams[1].state = StreamState::Init(StreamStateInit::default());
|
p.streams[1].state = dummy_stream_state_init(None);
|
||||||
|
|
||||||
// PLAY.
|
// PLAY.
|
||||||
super::parse_play(
|
super::parse_play(
|
||||||
@ -966,14 +979,14 @@ mod tests {
|
|||||||
&mut p,
|
&mut p,
|
||||||
)
|
)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
match p.streams[0].state {
|
match &p.streams[0].state {
|
||||||
StreamState::Init(state) => {
|
StreamState::Init(state) => {
|
||||||
assert_eq!(state.initial_seq, Some(16852));
|
assert_eq!(state.initial_seq, Some(16852));
|
||||||
assert_eq!(state.initial_rtptime, Some(1070938629));
|
assert_eq!(state.initial_rtptime, Some(1070938629));
|
||||||
}
|
}
|
||||||
_ => panic!(),
|
_ => panic!(),
|
||||||
};
|
};
|
||||||
match p.streams[1].state {
|
match &p.streams[1].state {
|
||||||
StreamState::Init(state) => {
|
StreamState::Init(state) => {
|
||||||
assert_eq!(state.initial_rtptime, Some(3075976528));
|
assert_eq!(state.initial_rtptime, Some(3075976528));
|
||||||
assert_eq!(state.ssrc, Some(0x9fc9fff8));
|
assert_eq!(state.ssrc, Some(0x9fc9fff8));
|
||||||
@ -1040,12 +1053,12 @@ mod tests {
|
|||||||
);
|
);
|
||||||
assert_eq!(setup_response.channel_id, Some(0));
|
assert_eq!(setup_response.channel_id, Some(0));
|
||||||
assert_eq!(setup_response.ssrc, None);
|
assert_eq!(setup_response.ssrc, None);
|
||||||
p.streams[0].state = StreamState::Init(StreamStateInit::default());
|
p.streams[0].state = dummy_stream_state_init(None);
|
||||||
p.streams[1].state = StreamState::Init(StreamStateInit::default());
|
p.streams[1].state = dummy_stream_state_init(None);
|
||||||
|
|
||||||
// PLAY.
|
// PLAY.
|
||||||
super::parse_play(&response(include_bytes!("testdata/bunny_play.txt")), &mut p).unwrap();
|
super::parse_play(&response(include_bytes!("testdata/bunny_play.txt")), &mut p).unwrap();
|
||||||
match p.streams[1].state {
|
match &p.streams[1].state {
|
||||||
StreamState::Init(state) => {
|
StreamState::Init(state) => {
|
||||||
assert_eq!(state.initial_rtptime, Some(0));
|
assert_eq!(state.initial_rtptime, Some(0));
|
||||||
assert_eq!(state.initial_seq, Some(1));
|
assert_eq!(state.initial_seq, Some(1));
|
||||||
@ -1207,11 +1220,7 @@ mod tests {
|
|||||||
);
|
);
|
||||||
assert_eq!(setup_response.channel_id, Some(0));
|
assert_eq!(setup_response.channel_id, Some(0));
|
||||||
assert_eq!(setup_response.ssrc, None);
|
assert_eq!(setup_response.ssrc, None);
|
||||||
p.streams[0].state = StreamState::Init(StreamStateInit {
|
p.streams[0].state = dummy_stream_state_init(None);
|
||||||
ssrc: None,
|
|
||||||
initial_seq: None,
|
|
||||||
initial_rtptime: None,
|
|
||||||
});
|
|
||||||
|
|
||||||
let setup_response = response(include_bytes!("testdata/gw_main_setup_audio.txt"));
|
let setup_response = response(include_bytes!("testdata/gw_main_setup_audio.txt"));
|
||||||
let setup_response = super::parse_setup(&setup_response).unwrap();
|
let setup_response = super::parse_setup(&setup_response).unwrap();
|
||||||
@ -1224,11 +1233,7 @@ mod tests {
|
|||||||
);
|
);
|
||||||
assert_eq!(setup_response.channel_id, Some(2));
|
assert_eq!(setup_response.channel_id, Some(2));
|
||||||
assert_eq!(setup_response.ssrc, None);
|
assert_eq!(setup_response.ssrc, None);
|
||||||
p.streams[1].state = StreamState::Init(StreamStateInit {
|
p.streams[1].state = dummy_stream_state_init(None);
|
||||||
ssrc: None,
|
|
||||||
initial_seq: None,
|
|
||||||
initial_rtptime: None,
|
|
||||||
});
|
|
||||||
|
|
||||||
// PLAY.
|
// PLAY.
|
||||||
super::parse_play(
|
super::parse_play(
|
||||||
@ -1297,11 +1302,7 @@ mod tests {
|
|||||||
);
|
);
|
||||||
assert_eq!(setup_response.channel_id, Some(0));
|
assert_eq!(setup_response.channel_id, Some(0));
|
||||||
assert_eq!(setup_response.ssrc, None);
|
assert_eq!(setup_response.ssrc, None);
|
||||||
p.streams[0].state = StreamState::Init(StreamStateInit {
|
p.streams[0].state = dummy_stream_state_init(None);
|
||||||
ssrc: None,
|
|
||||||
initial_seq: None,
|
|
||||||
initial_rtptime: None,
|
|
||||||
});
|
|
||||||
|
|
||||||
// PLAY.
|
// PLAY.
|
||||||
super::parse_play(
|
super::parse_play(
|
||||||
|
@ -7,7 +7,9 @@ use bytes::{Buf, Bytes};
|
|||||||
use log::{debug, trace};
|
use log::{debug, trace};
|
||||||
|
|
||||||
use crate::client::PacketItem;
|
use crate::client::PacketItem;
|
||||||
use crate::{ConnectionContext, Error, ErrorInt, PacketContext};
|
use crate::{
|
||||||
|
ConnectionContext, Error, ErrorInt, PacketContext, StreamContextRef, StreamContextRefInner,
|
||||||
|
};
|
||||||
|
|
||||||
use super::{SessionOptions, Timeline};
|
use super::{SessionOptions, Timeline};
|
||||||
|
|
||||||
@ -102,6 +104,7 @@ impl InorderParser {
|
|||||||
pub fn rtp(
|
pub fn rtp(
|
||||||
&mut self,
|
&mut self,
|
||||||
session_options: &SessionOptions,
|
session_options: &SessionOptions,
|
||||||
|
stream_ctx: StreamContextRef,
|
||||||
tool: Option<&super::Tool>,
|
tool: Option<&super::Tool>,
|
||||||
conn_ctx: &ConnectionContext,
|
conn_ctx: &ConnectionContext,
|
||||||
pkt_ctx: &PacketContext,
|
pkt_ctx: &PacketContext,
|
||||||
@ -112,6 +115,7 @@ impl InorderParser {
|
|||||||
let reader = rtp_rs::RtpReader::new(&data[..]).map_err(|e| {
|
let reader = rtp_rs::RtpReader::new(&data[..]).map_err(|e| {
|
||||||
wrap!(ErrorInt::PacketError {
|
wrap!(ErrorInt::PacketError {
|
||||||
conn_ctx: *conn_ctx,
|
conn_ctx: *conn_ctx,
|
||||||
|
stream_ctx: stream_ctx.to_owned(),
|
||||||
pkt_ctx: *pkt_ctx,
|
pkt_ctx: *pkt_ctx,
|
||||||
stream_id,
|
stream_id,
|
||||||
description: format!(
|
description: format!(
|
||||||
@ -138,12 +142,13 @@ impl InorderParser {
|
|||||||
let ssrc = reader.ssrc();
|
let ssrc = reader.ssrc();
|
||||||
let loss = sequence_number.wrapping_sub(self.next_seq.unwrap_or(sequence_number));
|
let loss = sequence_number.wrapping_sub(self.next_seq.unwrap_or(sequence_number));
|
||||||
if matches!(self.ssrc, Some(s) if s != ssrc) {
|
if matches!(self.ssrc, Some(s) if s != ssrc) {
|
||||||
if matches!(session_options.transport, super::Transport::Tcp) {
|
if matches!(stream_ctx.0, StreamContextRefInner::Udp(_)) {
|
||||||
super::note_stale_live555_data(tool, session_options);
|
super::note_stale_live555_data(tool, session_options);
|
||||||
}
|
}
|
||||||
bail!(ErrorInt::RtpPacketError {
|
bail!(ErrorInt::RtpPacketError {
|
||||||
conn_ctx: *conn_ctx,
|
conn_ctx: *conn_ctx,
|
||||||
pkt_ctx: *pkt_ctx,
|
pkt_ctx: *pkt_ctx,
|
||||||
|
stream_ctx: stream_ctx.to_owned(),
|
||||||
stream_id,
|
stream_id,
|
||||||
ssrc,
|
ssrc,
|
||||||
sequence_number,
|
sequence_number,
|
||||||
@ -155,10 +160,11 @@ impl InorderParser {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
if loss > 0x80_00 {
|
if loss > 0x80_00 {
|
||||||
if matches!(session_options.transport, super::Transport::Tcp) {
|
if matches!(stream_ctx.0, StreamContextRefInner::Tcp { .. }) {
|
||||||
bail!(ErrorInt::RtpPacketError {
|
bail!(ErrorInt::RtpPacketError {
|
||||||
conn_ctx: *conn_ctx,
|
conn_ctx: *conn_ctx,
|
||||||
pkt_ctx: *pkt_ctx,
|
pkt_ctx: *pkt_ctx,
|
||||||
|
stream_ctx: stream_ctx.to_owned(),
|
||||||
stream_id,
|
stream_id,
|
||||||
ssrc,
|
ssrc,
|
||||||
sequence_number,
|
sequence_number,
|
||||||
@ -182,6 +188,7 @@ impl InorderParser {
|
|||||||
Err(description) => bail!(ErrorInt::RtpPacketError {
|
Err(description) => bail!(ErrorInt::RtpPacketError {
|
||||||
conn_ctx: *conn_ctx,
|
conn_ctx: *conn_ctx,
|
||||||
pkt_ctx: *pkt_ctx,
|
pkt_ctx: *pkt_ctx,
|
||||||
|
stream_ctx: stream_ctx.to_owned(),
|
||||||
stream_id,
|
stream_id,
|
||||||
ssrc,
|
ssrc,
|
||||||
sequence_number,
|
sequence_number,
|
||||||
@ -193,6 +200,7 @@ impl InorderParser {
|
|||||||
let payload_range = crate::as_range(&data, reader.payload()).ok_or_else(|| {
|
let payload_range = crate::as_range(&data, reader.payload()).ok_or_else(|| {
|
||||||
wrap!(ErrorInt::RtpPacketError {
|
wrap!(ErrorInt::RtpPacketError {
|
||||||
conn_ctx: *conn_ctx,
|
conn_ctx: *conn_ctx,
|
||||||
|
stream_ctx: stream_ctx.to_owned(),
|
||||||
pkt_ctx: *pkt_ctx,
|
pkt_ctx: *pkt_ctx,
|
||||||
stream_id,
|
stream_id,
|
||||||
ssrc,
|
ssrc,
|
||||||
@ -220,6 +228,7 @@ impl InorderParser {
|
|||||||
pub fn rtcp(
|
pub fn rtcp(
|
||||||
&mut self,
|
&mut self,
|
||||||
session_options: &SessionOptions,
|
session_options: &SessionOptions,
|
||||||
|
stream_ctx: StreamContextRef,
|
||||||
tool: Option<&super::Tool>,
|
tool: Option<&super::Tool>,
|
||||||
pkt_ctx: &PacketContext,
|
pkt_ctx: &PacketContext,
|
||||||
timeline: &mut Timeline,
|
timeline: &mut Timeline,
|
||||||
@ -247,7 +256,7 @@ impl InorderParser {
|
|||||||
|
|
||||||
let ssrc = pkt.ssrc();
|
let ssrc = pkt.ssrc();
|
||||||
if matches!(self.ssrc, Some(s) if s != ssrc) {
|
if matches!(self.ssrc, Some(s) if s != ssrc) {
|
||||||
if matches!(session_options.transport, super::Transport::Tcp) {
|
if matches!(stream_ctx.0, StreamContextRefInner::Tcp { .. }) {
|
||||||
super::note_stale_live555_data(tool, session_options);
|
super::note_stale_live555_data(tool, session_options);
|
||||||
}
|
}
|
||||||
return Err(format!(
|
return Err(format!(
|
||||||
@ -274,6 +283,10 @@ impl InorderParser {
|
|||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
|
use std::net::{IpAddr, Ipv4Addr};
|
||||||
|
|
||||||
|
use crate::client::UdpStreamContext;
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|
||||||
/// Checks dropping and logging Geovision's extra payload type 50 packets.
|
/// Checks dropping and logging Geovision's extra payload type 50 packets.
|
||||||
@ -284,10 +297,12 @@ mod tests {
|
|||||||
fn geovision_pt50_packet() {
|
fn geovision_pt50_packet() {
|
||||||
let mut timeline = Timeline::new(None, 90_000, None).unwrap();
|
let mut timeline = Timeline::new(None, 90_000, None).unwrap();
|
||||||
let mut parser = InorderParser::new(Some(0xd25614e), None);
|
let mut parser = InorderParser::new(Some(0xd25614e), None);
|
||||||
|
let stream_ctx = StreamContextRef::dummy();
|
||||||
|
|
||||||
// Normal packet.
|
// Normal packet.
|
||||||
match parser.rtp(
|
match parser.rtp(
|
||||||
&SessionOptions::default(),
|
&SessionOptions::default(),
|
||||||
|
stream_ctx,
|
||||||
None,
|
None,
|
||||||
&ConnectionContext::dummy(),
|
&ConnectionContext::dummy(),
|
||||||
&PacketContext::dummy(),
|
&PacketContext::dummy(),
|
||||||
@ -311,6 +326,7 @@ mod tests {
|
|||||||
// Mystery pt=50 packet with same sequence number.
|
// Mystery pt=50 packet with same sequence number.
|
||||||
match parser.rtp(
|
match parser.rtp(
|
||||||
&SessionOptions::default(),
|
&SessionOptions::default(),
|
||||||
|
stream_ctx,
|
||||||
None,
|
None,
|
||||||
&ConnectionContext::dummy(),
|
&ConnectionContext::dummy(),
|
||||||
&PacketContext::dummy(),
|
&PacketContext::dummy(),
|
||||||
@ -336,10 +352,17 @@ mod tests {
|
|||||||
fn out_of_order() {
|
fn out_of_order() {
|
||||||
let mut timeline = Timeline::new(None, 90_000, None).unwrap();
|
let mut timeline = Timeline::new(None, 90_000, None).unwrap();
|
||||||
let mut parser = InorderParser::new(Some(0xd25614e), None);
|
let mut parser = InorderParser::new(Some(0xd25614e), None);
|
||||||
|
let udp = UdpStreamContext {
|
||||||
let session_options = SessionOptions::default().transport(crate::client::Transport::Udp);
|
local_ip: IpAddr::V4(Ipv4Addr::UNSPECIFIED),
|
||||||
|
peer_ip: IpAddr::V4(Ipv4Addr::UNSPECIFIED),
|
||||||
|
local_rtp_port: 0,
|
||||||
|
peer_rtp_port: 0,
|
||||||
|
};
|
||||||
|
let stream_ctx = StreamContextRef(StreamContextRefInner::Udp(&udp));
|
||||||
|
let session_options = SessionOptions::default();
|
||||||
match parser.rtp(
|
match parser.rtp(
|
||||||
&session_options,
|
&session_options,
|
||||||
|
stream_ctx,
|
||||||
None,
|
None,
|
||||||
&ConnectionContext::dummy(),
|
&ConnectionContext::dummy(),
|
||||||
&PacketContext::dummy(),
|
&PacketContext::dummy(),
|
||||||
@ -364,6 +387,7 @@ mod tests {
|
|||||||
|
|
||||||
match parser.rtp(
|
match parser.rtp(
|
||||||
&session_options,
|
&session_options,
|
||||||
|
stream_ctx,
|
||||||
None,
|
None,
|
||||||
&ConnectionContext::dummy(),
|
&ConnectionContext::dummy(),
|
||||||
&PacketContext::dummy(),
|
&PacketContext::dummy(),
|
||||||
@ -386,6 +410,7 @@ mod tests {
|
|||||||
|
|
||||||
match parser.rtp(
|
match parser.rtp(
|
||||||
&session_options,
|
&session_options,
|
||||||
|
stream_ctx,
|
||||||
None,
|
None,
|
||||||
&ConnectionContext::dummy(),
|
&ConnectionContext::dummy(),
|
||||||
&PacketContext::dummy(),
|
&PacketContext::dummy(),
|
||||||
|
135
src/codec/aac.rs
135
src/codec/aac.rs
@ -21,7 +21,7 @@ use std::{
|
|||||||
num::{NonZeroU16, NonZeroU32},
|
num::{NonZeroU16, NonZeroU32},
|
||||||
};
|
};
|
||||||
|
|
||||||
use crate::{client::rtp::Packet, error::ErrorInt, ConnectionContext, Error};
|
use crate::{client::rtp::Packet, error::ErrorInt, ConnectionContext, Error, StreamContextRef};
|
||||||
|
|
||||||
use super::CodecItem;
|
use super::CodecItem;
|
||||||
|
|
||||||
@ -670,6 +670,7 @@ impl Depacketizer {
|
|||||||
pub(super) fn pull(
|
pub(super) fn pull(
|
||||||
&mut self,
|
&mut self,
|
||||||
conn_ctx: &ConnectionContext,
|
conn_ctx: &ConnectionContext,
|
||||||
|
stream_ctx: StreamContextRef,
|
||||||
) -> Result<Option<super::CodecItem>, Error> {
|
) -> Result<Option<super::CodecItem>, Error> {
|
||||||
match std::mem::take(&mut self.state) {
|
match std::mem::take(&mut self.state) {
|
||||||
s @ DepacketizerState::Idle { .. } | s @ DepacketizerState::Fragmented(..) => {
|
s @ DepacketizerState::Idle { .. } | s @ DepacketizerState::Fragmented(..) => {
|
||||||
@ -692,6 +693,7 @@ impl Depacketizer {
|
|||||||
// says "receivers MUST support de-interleaving".
|
// says "receivers MUST support de-interleaving".
|
||||||
return Err(error(
|
return Err(error(
|
||||||
*conn_ctx,
|
*conn_ctx,
|
||||||
|
stream_ctx,
|
||||||
agg,
|
agg,
|
||||||
"interleaving not yet supported".to_owned(),
|
"interleaving not yet supported".to_owned(),
|
||||||
));
|
));
|
||||||
@ -701,6 +703,7 @@ impl Depacketizer {
|
|||||||
if agg.frame_count != 1 {
|
if agg.frame_count != 1 {
|
||||||
return Err(error(
|
return Err(error(
|
||||||
*conn_ctx,
|
*conn_ctx,
|
||||||
|
stream_ctx,
|
||||||
agg,
|
agg,
|
||||||
"fragmented AUs must not share packets".to_owned(),
|
"fragmented AUs must not share packets".to_owned(),
|
||||||
));
|
));
|
||||||
@ -719,6 +722,7 @@ impl Depacketizer {
|
|||||||
}
|
}
|
||||||
return Err(error(
|
return Err(error(
|
||||||
*conn_ctx,
|
*conn_ctx,
|
||||||
|
stream_ctx,
|
||||||
agg,
|
agg,
|
||||||
"mark can't be set on beginning of fragment".to_owned(),
|
"mark can't be set on beginning of fragment".to_owned(),
|
||||||
));
|
));
|
||||||
@ -737,6 +741,7 @@ impl Depacketizer {
|
|||||||
if !agg.mark {
|
if !agg.mark {
|
||||||
return Err(error(
|
return Err(error(
|
||||||
*conn_ctx,
|
*conn_ctx,
|
||||||
|
stream_ctx,
|
||||||
agg,
|
agg,
|
||||||
"mark must be set on non-fragmented au".to_owned(),
|
"mark must be set on non-fragmented au".to_owned(),
|
||||||
));
|
));
|
||||||
@ -756,6 +761,7 @@ impl Depacketizer {
|
|||||||
None => {
|
None => {
|
||||||
return Err(error(
|
return Err(error(
|
||||||
*conn_ctx,
|
*conn_ctx,
|
||||||
|
stream_ctx,
|
||||||
agg,
|
agg,
|
||||||
format!(
|
format!(
|
||||||
"aggregate timestamp {} + {} overflows",
|
"aggregate timestamp {} + {} overflows",
|
||||||
@ -778,9 +784,15 @@ impl Depacketizer {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn error(conn_ctx: ConnectionContext, agg: Aggregate, description: String) -> Error {
|
fn error(
|
||||||
|
conn_ctx: ConnectionContext,
|
||||||
|
stream_ctx: StreamContextRef,
|
||||||
|
agg: Aggregate,
|
||||||
|
description: String,
|
||||||
|
) -> Error {
|
||||||
Error(std::sync::Arc::new(ErrorInt::RtpPacketError {
|
Error(std::sync::Arc::new(ErrorInt::RtpPacketError {
|
||||||
conn_ctx,
|
conn_ctx,
|
||||||
|
stream_ctx: stream_ctx.to_owned(),
|
||||||
pkt_ctx: agg.ctx,
|
pkt_ctx: agg.ctx,
|
||||||
stream_id: agg.stream_id,
|
stream_id: agg.stream_id,
|
||||||
ssrc: agg.ssrc,
|
ssrc: agg.ssrc,
|
||||||
@ -840,13 +852,19 @@ mod tests {
|
|||||||
]),
|
]),
|
||||||
})
|
})
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let a = match d.pull(&ConnectionContext::dummy()).unwrap() {
|
let a = match d
|
||||||
|
.pull(&ConnectionContext::dummy(), StreamContextRef::dummy())
|
||||||
|
.unwrap()
|
||||||
|
{
|
||||||
Some(CodecItem::AudioFrame(a)) => a,
|
Some(CodecItem::AudioFrame(a)) => a,
|
||||||
_ => unreachable!(),
|
_ => unreachable!(),
|
||||||
};
|
};
|
||||||
assert_eq!(a.timestamp, timestamp);
|
assert_eq!(a.timestamp, timestamp);
|
||||||
assert_eq!(&a.data[..], b"asdf");
|
assert_eq!(&a.data[..], b"asdf");
|
||||||
assert!(d.pull(&ConnectionContext::dummy()).unwrap().is_none());
|
assert!(d
|
||||||
|
.pull(&ConnectionContext::dummy(), StreamContextRef::dummy())
|
||||||
|
.unwrap()
|
||||||
|
.is_none());
|
||||||
|
|
||||||
// Aggregate of 3 frames.
|
// Aggregate of 3 frames.
|
||||||
d.push(Packet {
|
d.push(Packet {
|
||||||
@ -868,25 +886,37 @@ mod tests {
|
|||||||
]),
|
]),
|
||||||
})
|
})
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let a = match d.pull(&ConnectionContext::dummy()).unwrap() {
|
let a = match d
|
||||||
|
.pull(&ConnectionContext::dummy(), StreamContextRef::dummy())
|
||||||
|
.unwrap()
|
||||||
|
{
|
||||||
Some(CodecItem::AudioFrame(a)) => a,
|
Some(CodecItem::AudioFrame(a)) => a,
|
||||||
_ => unreachable!(),
|
_ => unreachable!(),
|
||||||
};
|
};
|
||||||
assert_eq!(a.timestamp, timestamp);
|
assert_eq!(a.timestamp, timestamp);
|
||||||
assert_eq!(&a.data[..], b"foo");
|
assert_eq!(&a.data[..], b"foo");
|
||||||
let a = match d.pull(&ConnectionContext::dummy()).unwrap() {
|
let a = match d
|
||||||
|
.pull(&ConnectionContext::dummy(), StreamContextRef::dummy())
|
||||||
|
.unwrap()
|
||||||
|
{
|
||||||
Some(CodecItem::AudioFrame(a)) => a,
|
Some(CodecItem::AudioFrame(a)) => a,
|
||||||
_ => unreachable!(),
|
_ => unreachable!(),
|
||||||
};
|
};
|
||||||
assert_eq!(a.timestamp, timestamp.try_add(1_024).unwrap());
|
assert_eq!(a.timestamp, timestamp.try_add(1_024).unwrap());
|
||||||
assert_eq!(&a.data[..], b"bar");
|
assert_eq!(&a.data[..], b"bar");
|
||||||
let a = match d.pull(&ConnectionContext::dummy()).unwrap() {
|
let a = match d
|
||||||
|
.pull(&ConnectionContext::dummy(), StreamContextRef::dummy())
|
||||||
|
.unwrap()
|
||||||
|
{
|
||||||
Some(CodecItem::AudioFrame(a)) => a,
|
Some(CodecItem::AudioFrame(a)) => a,
|
||||||
_ => unreachable!(),
|
_ => unreachable!(),
|
||||||
};
|
};
|
||||||
assert_eq!(a.timestamp, timestamp.try_add(2_048).unwrap());
|
assert_eq!(a.timestamp, timestamp.try_add(2_048).unwrap());
|
||||||
assert_eq!(&a.data[..], b"baz");
|
assert_eq!(&a.data[..], b"baz");
|
||||||
assert!(d.pull(&ConnectionContext::dummy()).unwrap().is_none());
|
assert!(d
|
||||||
|
.pull(&ConnectionContext::dummy(), StreamContextRef::dummy())
|
||||||
|
.unwrap()
|
||||||
|
.is_none());
|
||||||
|
|
||||||
// Fragment across 3 packets.
|
// Fragment across 3 packets.
|
||||||
d.push(Packet {
|
d.push(Packet {
|
||||||
@ -907,7 +937,10 @@ mod tests {
|
|||||||
]),
|
]),
|
||||||
})
|
})
|
||||||
.unwrap();
|
.unwrap();
|
||||||
assert!(d.pull(&ConnectionContext::dummy()).unwrap().is_none());
|
assert!(d
|
||||||
|
.pull(&ConnectionContext::dummy(), StreamContextRef::dummy())
|
||||||
|
.unwrap()
|
||||||
|
.is_none());
|
||||||
d.push(Packet {
|
d.push(Packet {
|
||||||
// fragment 2/3.
|
// fragment 2/3.
|
||||||
ctx: crate::PacketContext::dummy(),
|
ctx: crate::PacketContext::dummy(),
|
||||||
@ -926,7 +959,10 @@ mod tests {
|
|||||||
]),
|
]),
|
||||||
})
|
})
|
||||||
.unwrap();
|
.unwrap();
|
||||||
assert!(d.pull(&ConnectionContext::dummy()).unwrap().is_none());
|
assert!(d
|
||||||
|
.pull(&ConnectionContext::dummy(), StreamContextRef::dummy())
|
||||||
|
.unwrap()
|
||||||
|
.is_none());
|
||||||
d.push(Packet {
|
d.push(Packet {
|
||||||
// fragment 3/3.
|
// fragment 3/3.
|
||||||
ctx: crate::PacketContext::dummy(),
|
ctx: crate::PacketContext::dummy(),
|
||||||
@ -945,13 +981,19 @@ mod tests {
|
|||||||
]),
|
]),
|
||||||
})
|
})
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let a = match d.pull(&ConnectionContext::dummy()).unwrap() {
|
let a = match d
|
||||||
|
.pull(&ConnectionContext::dummy(), StreamContextRef::dummy())
|
||||||
|
.unwrap()
|
||||||
|
{
|
||||||
Some(CodecItem::AudioFrame(a)) => a,
|
Some(CodecItem::AudioFrame(a)) => a,
|
||||||
_ => unreachable!(),
|
_ => unreachable!(),
|
||||||
};
|
};
|
||||||
assert_eq!(a.timestamp, timestamp);
|
assert_eq!(a.timestamp, timestamp);
|
||||||
assert_eq!(&a.data[..], b"foobarbaz");
|
assert_eq!(&a.data[..], b"foobarbaz");
|
||||||
assert!(d.pull(&ConnectionContext::dummy()).unwrap().is_none());
|
assert!(d
|
||||||
|
.pull(&ConnectionContext::dummy(), StreamContextRef::dummy())
|
||||||
|
.unwrap()
|
||||||
|
.is_none());
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Tests that depacketization skips/reports a frame in which its first packet was lost.
|
/// Tests that depacketization skips/reports a frame in which its first packet was lost.
|
||||||
@ -986,7 +1028,10 @@ mod tests {
|
|||||||
]),
|
]),
|
||||||
})
|
})
|
||||||
.unwrap();
|
.unwrap();
|
||||||
assert!(d.pull(&ConnectionContext::dummy()).unwrap().is_none());
|
assert!(d
|
||||||
|
.pull(&ConnectionContext::dummy(), StreamContextRef::dummy())
|
||||||
|
.unwrap()
|
||||||
|
.is_none());
|
||||||
d.push(Packet {
|
d.push(Packet {
|
||||||
ctx: crate::PacketContext::dummy(),
|
ctx: crate::PacketContext::dummy(),
|
||||||
stream_id: 0,
|
stream_id: 0,
|
||||||
@ -1004,7 +1049,10 @@ mod tests {
|
|||||||
]),
|
]),
|
||||||
})
|
})
|
||||||
.unwrap();
|
.unwrap();
|
||||||
assert!(d.pull(&ConnectionContext::dummy()).unwrap().is_none());
|
assert!(d
|
||||||
|
.pull(&ConnectionContext::dummy(), StreamContextRef::dummy())
|
||||||
|
.unwrap()
|
||||||
|
.is_none());
|
||||||
|
|
||||||
// Following frame reports the loss.
|
// Following frame reports the loss.
|
||||||
d.push(Packet {
|
d.push(Packet {
|
||||||
@ -1025,13 +1073,19 @@ mod tests {
|
|||||||
]),
|
]),
|
||||||
})
|
})
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let a = match d.pull(&ConnectionContext::dummy()).unwrap() {
|
let a = match d
|
||||||
|
.pull(&ConnectionContext::dummy(), StreamContextRef::dummy())
|
||||||
|
.unwrap()
|
||||||
|
{
|
||||||
Some(CodecItem::AudioFrame(a)) => a,
|
Some(CodecItem::AudioFrame(a)) => a,
|
||||||
_ => unreachable!(),
|
_ => unreachable!(),
|
||||||
};
|
};
|
||||||
assert_eq!(a.loss, 1);
|
assert_eq!(a.loss, 1);
|
||||||
assert_eq!(&a.data[..], b"asdf");
|
assert_eq!(&a.data[..], b"asdf");
|
||||||
assert!(d.pull(&ConnectionContext::dummy()).unwrap().is_none());
|
assert!(d
|
||||||
|
.pull(&ConnectionContext::dummy(), StreamContextRef::dummy())
|
||||||
|
.unwrap()
|
||||||
|
.is_none());
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Tests that depacketization skips/reports a frame in which an interior frame is lost.
|
/// Tests that depacketization skips/reports a frame in which an interior frame is lost.
|
||||||
@ -1066,7 +1120,10 @@ mod tests {
|
|||||||
]),
|
]),
|
||||||
})
|
})
|
||||||
.unwrap();
|
.unwrap();
|
||||||
assert!(d.pull(&ConnectionContext::dummy()).unwrap().is_none());
|
assert!(d
|
||||||
|
.pull(&ConnectionContext::dummy(), StreamContextRef::dummy())
|
||||||
|
.unwrap()
|
||||||
|
.is_none());
|
||||||
// Fragment 2/3 is lost
|
// Fragment 2/3 is lost
|
||||||
d.push(Packet {
|
d.push(Packet {
|
||||||
// 3/3 reports the loss
|
// 3/3 reports the loss
|
||||||
@ -1086,7 +1143,10 @@ mod tests {
|
|||||||
]),
|
]),
|
||||||
})
|
})
|
||||||
.unwrap();
|
.unwrap();
|
||||||
assert!(d.pull(&ConnectionContext::dummy()).unwrap().is_none());
|
assert!(d
|
||||||
|
.pull(&ConnectionContext::dummy(), StreamContextRef::dummy())
|
||||||
|
.unwrap()
|
||||||
|
.is_none());
|
||||||
|
|
||||||
// Following frame reports the loss.
|
// Following frame reports the loss.
|
||||||
d.push(Packet {
|
d.push(Packet {
|
||||||
@ -1107,13 +1167,19 @@ mod tests {
|
|||||||
]),
|
]),
|
||||||
})
|
})
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let a = match d.pull(&ConnectionContext::dummy()).unwrap() {
|
let a = match d
|
||||||
|
.pull(&ConnectionContext::dummy(), StreamContextRef::dummy())
|
||||||
|
.unwrap()
|
||||||
|
{
|
||||||
Some(CodecItem::AudioFrame(a)) => a,
|
Some(CodecItem::AudioFrame(a)) => a,
|
||||||
_ => unreachable!(),
|
_ => unreachable!(),
|
||||||
};
|
};
|
||||||
assert_eq!(a.loss, 1);
|
assert_eq!(a.loss, 1);
|
||||||
assert_eq!(&a.data[..], b"asdf");
|
assert_eq!(&a.data[..], b"asdf");
|
||||||
assert!(d.pull(&ConnectionContext::dummy()).unwrap().is_none());
|
assert!(d
|
||||||
|
.pull(&ConnectionContext::dummy(), StreamContextRef::dummy())
|
||||||
|
.unwrap()
|
||||||
|
.is_none());
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Tests that depacketization skips/reports a frame in which the interior frame is lost.
|
/// Tests that depacketization skips/reports a frame in which the interior frame is lost.
|
||||||
@ -1148,7 +1214,10 @@ mod tests {
|
|||||||
]),
|
]),
|
||||||
})
|
})
|
||||||
.unwrap();
|
.unwrap();
|
||||||
assert!(d.pull(&ConnectionContext::dummy()).unwrap().is_none());
|
assert!(d
|
||||||
|
.pull(&ConnectionContext::dummy(), StreamContextRef::dummy())
|
||||||
|
.unwrap()
|
||||||
|
.is_none());
|
||||||
// Fragment 2/3 is lost
|
// Fragment 2/3 is lost
|
||||||
d.push(Packet {
|
d.push(Packet {
|
||||||
// 3/3 reports the loss
|
// 3/3 reports the loss
|
||||||
@ -1168,7 +1237,10 @@ mod tests {
|
|||||||
]),
|
]),
|
||||||
})
|
})
|
||||||
.unwrap();
|
.unwrap();
|
||||||
assert!(d.pull(&ConnectionContext::dummy()).unwrap().is_none());
|
assert!(d
|
||||||
|
.pull(&ConnectionContext::dummy(), StreamContextRef::dummy())
|
||||||
|
.unwrap()
|
||||||
|
.is_none());
|
||||||
|
|
||||||
// Following frame reports the loss.
|
// Following frame reports the loss.
|
||||||
d.push(Packet {
|
d.push(Packet {
|
||||||
@ -1189,13 +1261,19 @@ mod tests {
|
|||||||
]),
|
]),
|
||||||
})
|
})
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let a = match d.pull(&ConnectionContext::dummy()).unwrap() {
|
let a = match d
|
||||||
|
.pull(&ConnectionContext::dummy(), StreamContextRef::dummy())
|
||||||
|
.unwrap()
|
||||||
|
{
|
||||||
Some(CodecItem::AudioFrame(a)) => a,
|
Some(CodecItem::AudioFrame(a)) => a,
|
||||||
_ => unreachable!(),
|
_ => unreachable!(),
|
||||||
};
|
};
|
||||||
assert_eq!(a.loss, 1);
|
assert_eq!(a.loss, 1);
|
||||||
assert_eq!(&a.data[..], b"asdf");
|
assert_eq!(&a.data[..], b"asdf");
|
||||||
assert!(d.pull(&ConnectionContext::dummy()).unwrap().is_none());
|
assert!(d
|
||||||
|
.pull(&ConnectionContext::dummy(), StreamContextRef::dummy())
|
||||||
|
.unwrap()
|
||||||
|
.is_none());
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Tests the distinction between `loss` and `loss_since_last_mark`.
|
/// Tests the distinction between `loss` and `loss_since_last_mark`.
|
||||||
@ -1233,7 +1311,10 @@ mod tests {
|
|||||||
]),
|
]),
|
||||||
})
|
})
|
||||||
.unwrap();
|
.unwrap();
|
||||||
assert!(d.pull(&ConnectionContext::dummy()).unwrap().is_none());
|
assert!(d
|
||||||
|
.pull(&ConnectionContext::dummy(), StreamContextRef::dummy())
|
||||||
|
.unwrap()
|
||||||
|
.is_none());
|
||||||
|
|
||||||
// Incomplete fragment with no reported loss.
|
// Incomplete fragment with no reported loss.
|
||||||
d.push(Packet {
|
d.push(Packet {
|
||||||
@ -1254,7 +1335,9 @@ mod tests {
|
|||||||
]),
|
]),
|
||||||
})
|
})
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let e = d.pull(&ConnectionContext::dummy()).unwrap_err();
|
let e = d
|
||||||
|
.pull(&ConnectionContext::dummy(), StreamContextRef::dummy())
|
||||||
|
.unwrap_err();
|
||||||
let e_str = e.to_string();
|
let e_str = e.to_string();
|
||||||
assert!(
|
assert!(
|
||||||
e_str.contains("mark can't be set on beginning of fragment"),
|
e_str.contains("mark can't be set on beginning of fragment"),
|
||||||
|
@ -12,6 +12,7 @@ use std::num::{NonZeroU16, NonZeroU32};
|
|||||||
use crate::client::rtp;
|
use crate::client::rtp;
|
||||||
use crate::ConnectionContext;
|
use crate::ConnectionContext;
|
||||||
use crate::Error;
|
use crate::Error;
|
||||||
|
use crate::StreamContextRef;
|
||||||
use bytes::{Buf, Bytes};
|
use bytes::{Buf, Bytes};
|
||||||
|
|
||||||
pub(crate) mod aac;
|
pub(crate) mod aac;
|
||||||
@ -470,9 +471,13 @@ impl Depacketizer {
|
|||||||
///
|
///
|
||||||
/// Some packetization formats support aggregating multiple frames into one packet, so a single
|
/// Some packetization formats support aggregating multiple frames into one packet, so a single
|
||||||
/// `push` call may cause `pull` to return `Ok(Some(...))` more than once.
|
/// `push` call may cause `pull` to return `Ok(Some(...))` more than once.
|
||||||
pub fn pull(&mut self, conn_ctx: &ConnectionContext) -> Result<Option<CodecItem>, Error> {
|
pub fn pull(
|
||||||
|
&mut self,
|
||||||
|
conn_ctx: &ConnectionContext,
|
||||||
|
stream_ctx: StreamContextRef,
|
||||||
|
) -> Result<Option<CodecItem>, Error> {
|
||||||
match &mut self.0 {
|
match &mut self.0 {
|
||||||
DepacketizerInner::Aac(d) => d.pull(conn_ctx),
|
DepacketizerInner::Aac(d) => d.pull(conn_ctx, stream_ctx),
|
||||||
DepacketizerInner::G723(d) => Ok(d.pull()),
|
DepacketizerInner::G723(d) => Ok(d.pull()),
|
||||||
DepacketizerInner::H264(d) => Ok(d.pull()),
|
DepacketizerInner::H264(d) => Ok(d.pull()),
|
||||||
DepacketizerInner::Onvif(d) => Ok(d.pull()),
|
DepacketizerInner::Onvif(d) => Ok(d.pull()),
|
||||||
|
33
src/error.rs
33
src/error.rs
@ -3,7 +3,7 @@
|
|||||||
|
|
||||||
use std::{fmt::Display, sync::Arc};
|
use std::{fmt::Display, sync::Arc};
|
||||||
|
|
||||||
use crate::{ConnectionContext, PacketContext, RtspMessageContext};
|
use crate::{ConnectionContext, PacketContext, RtspMessageContext, StreamContext, WallTime};
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use thiserror::Error;
|
use thiserror::Error;
|
||||||
|
|
||||||
@ -40,15 +40,15 @@ pub(crate) enum ErrorInt {
|
|||||||
InvalidArgument(String),
|
InvalidArgument(String),
|
||||||
|
|
||||||
/// Unparseable or unexpected RTSP message.
|
/// Unparseable or unexpected RTSP message.
|
||||||
#[error("[{conn_ctx}, {msg_ctx}] RTSP framing error: {description}")]
|
#[error("RTSP framing error: {description}\n\nconn: {conn_ctx}\nmsg: {msg_ctx}")]
|
||||||
RtspFramingError {
|
RtspFramingError {
|
||||||
conn_ctx: ConnectionContext,
|
conn_ctx: ConnectionContext,
|
||||||
msg_ctx: RtspMessageContext,
|
msg_ctx: RtspMessageContext,
|
||||||
description: String,
|
description: String,
|
||||||
},
|
},
|
||||||
|
|
||||||
#[error("[{conn_ctx}, {msg_ctx}] {status} response to {} CSeq={cseq}: \
|
#[error("{status} response to {} CSeq={cseq}: {description}\n\n\
|
||||||
{description}", Into::<&str>::into(.method))]
|
conn: {conn_ctx}\nmsg: {msg_ctx}", Into::<&str>::into(.method))]
|
||||||
RtspResponseError {
|
RtspResponseError {
|
||||||
conn_ctx: ConnectionContext,
|
conn_ctx: ConnectionContext,
|
||||||
msg_ctx: RtspMessageContext,
|
msg_ctx: RtspMessageContext,
|
||||||
@ -59,8 +59,8 @@ pub(crate) enum ErrorInt {
|
|||||||
},
|
},
|
||||||
|
|
||||||
#[error(
|
#[error(
|
||||||
"[{conn_ctx}, {msg_ctx}] Received interleaved data on unassigned channel {channel_id}: \n\
|
"Received interleaved data on unassigned channel {channel_id}: \n\
|
||||||
{:?}",
|
{:?}\n\nconn: {conn_ctx}\nmsg: {msg_ctx}",
|
||||||
crate::hex::LimitedHex::new(data, 64)
|
crate::hex::LimitedHex::new(data, 64)
|
||||||
)]
|
)]
|
||||||
RtspUnassignedChannelError {
|
RtspUnassignedChannelError {
|
||||||
@ -70,20 +70,23 @@ pub(crate) enum ErrorInt {
|
|||||||
data: Bytes,
|
data: Bytes,
|
||||||
},
|
},
|
||||||
|
|
||||||
#[error("[{conn_ctx}, {pkt_ctx} stream {stream_id}]: {description}")]
|
#[error("{description}\n\nconn: {conn_ctx}\nstream: {stream_ctx}\npkt: {pkt_ctx}")]
|
||||||
PacketError {
|
PacketError {
|
||||||
conn_ctx: ConnectionContext,
|
conn_ctx: ConnectionContext,
|
||||||
|
stream_ctx: StreamContext,
|
||||||
pkt_ctx: PacketContext,
|
pkt_ctx: PacketContext,
|
||||||
stream_id: usize,
|
stream_id: usize,
|
||||||
description: String,
|
description: String,
|
||||||
},
|
},
|
||||||
|
|
||||||
#[error(
|
#[error(
|
||||||
"[{conn_ctx}, {pkt_ctx}, stream={stream_id}, ssrc={ssrc:08x}, \
|
"{description}\n\n\
|
||||||
seq={sequence_number:08x}] {description}"
|
conn: {conn_ctx}\nstream: {stream_ctx}\n\
|
||||||
|
ssrc: {ssrc:08x}\nseq: {sequence_number:08x}\npkt: {pkt_ctx}"
|
||||||
)]
|
)]
|
||||||
RtpPacketError {
|
RtpPacketError {
|
||||||
conn_ctx: ConnectionContext,
|
conn_ctx: ConnectionContext,
|
||||||
|
stream_ctx: StreamContext,
|
||||||
pkt_ctx: crate::PacketContext,
|
pkt_ctx: crate::PacketContext,
|
||||||
stream_id: usize,
|
stream_id: usize,
|
||||||
ssrc: u32,
|
ssrc: u32,
|
||||||
@ -94,21 +97,25 @@ pub(crate) enum ErrorInt {
|
|||||||
#[error("Unable to connect to RTSP server: {0}")]
|
#[error("Unable to connect to RTSP server: {0}")]
|
||||||
ConnectError(#[source] std::io::Error),
|
ConnectError(#[source] std::io::Error),
|
||||||
|
|
||||||
#[error("[{conn_ctx}, {msg_ctx}] Error reading from RTSP peer: {source}")]
|
#[error("Error reading from RTSP peer: {source}\n\nconn: {conn_ctx}\nmsg: {msg_ctx}")]
|
||||||
RtspReadError {
|
RtspReadError {
|
||||||
conn_ctx: ConnectionContext,
|
conn_ctx: ConnectionContext,
|
||||||
msg_ctx: RtspMessageContext,
|
msg_ctx: RtspMessageContext,
|
||||||
source: std::io::Error,
|
source: std::io::Error,
|
||||||
},
|
},
|
||||||
|
|
||||||
#[error("[{conn_ctx}, {pkt_ctx}] Error receiving UDP packet: {source}")]
|
#[error(
|
||||||
|
"Error receiving UDP packet: {source}\n\n\
|
||||||
|
conn: {conn_ctx}\nstream: {stream_ctx}\nat: {when}"
|
||||||
|
)]
|
||||||
UdpRecvError {
|
UdpRecvError {
|
||||||
conn_ctx: ConnectionContext,
|
conn_ctx: ConnectionContext,
|
||||||
pkt_ctx: PacketContext,
|
stream_ctx: StreamContext,
|
||||||
|
when: WallTime,
|
||||||
source: std::io::Error,
|
source: std::io::Error,
|
||||||
},
|
},
|
||||||
|
|
||||||
#[error("[{conn_ctx}] Error writing to RTSP peer: {source}")]
|
#[error("Error writing to RTSP peer: {source}\n\nconn: {conn_ctx}")]
|
||||||
WriteError {
|
WriteError {
|
||||||
conn_ctx: ConnectionContext,
|
conn_ctx: ConnectionContext,
|
||||||
source: std::io::Error,
|
source: std::io::Error,
|
||||||
|
128
src/lib.rs
128
src/lib.rs
@ -330,6 +330,107 @@ impl Display for RtspMessageContext {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Context for an active stream (RTP+RTCP session), either TCP or UDP. Owned version.
|
||||||
|
#[derive(Copy, Clone, Debug)]
|
||||||
|
pub struct StreamContext(StreamContextInner);
|
||||||
|
|
||||||
|
impl StreamContext {
|
||||||
|
#[doc(hidden)]
|
||||||
|
pub fn dummy() -> Self {
|
||||||
|
StreamContext(StreamContextInner::Dummy)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Display for StreamContext {
|
||||||
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
|
match &self.0 {
|
||||||
|
StreamContextInner::Tcp(tcp) => {
|
||||||
|
write!(
|
||||||
|
f,
|
||||||
|
"TCP, interleaved channel ids {}-{}",
|
||||||
|
tcp.rtp_channel_id,
|
||||||
|
tcp.rtp_channel_id + 1
|
||||||
|
)
|
||||||
|
}
|
||||||
|
StreamContextInner::Udp(udp) => Display::fmt(udp, f),
|
||||||
|
StreamContextInner::Dummy => write!(f, "dummy"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Copy, Clone, Debug)]
|
||||||
|
enum StreamContextInner {
|
||||||
|
Tcp(TcpStreamContext),
|
||||||
|
Udp(UdpStreamContext),
|
||||||
|
Dummy,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Context for an active stream (RTP+RTCP session), either TCP or UDP. Reference version.
|
||||||
|
#[derive(Copy, Clone, Debug)]
|
||||||
|
pub struct StreamContextRef<'a>(StreamContextRefInner<'a>);
|
||||||
|
|
||||||
|
impl StreamContextRef<'_> {
|
||||||
|
#[doc(hidden)]
|
||||||
|
pub fn dummy() -> Self {
|
||||||
|
StreamContextRef(StreamContextRefInner::Dummy)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Copy, Clone, Debug)]
|
||||||
|
enum StreamContextRefInner<'a> {
|
||||||
|
Tcp(TcpStreamContext), // this one is by value because TcpStreamContext is small.
|
||||||
|
Udp(&'a UdpStreamContext),
|
||||||
|
Dummy,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl StreamContextRef<'_> {
|
||||||
|
fn to_owned(self) -> StreamContext {
|
||||||
|
StreamContext(match self.0 {
|
||||||
|
StreamContextRefInner::Tcp(tcp) => StreamContextInner::Tcp(tcp),
|
||||||
|
StreamContextRefInner::Udp(udp) => StreamContextInner::Udp(*udp),
|
||||||
|
StreamContextRefInner::Dummy => StreamContextInner::Dummy,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Context for a UDP stream (aka UDP-based RTP transport). Unstable/internal. Exposed for benchmarks.
|
||||||
|
///
|
||||||
|
/// This stores only the RTP addresses; the RTCP addresses are assumed to use
|
||||||
|
/// the same IP and one port higher.
|
||||||
|
#[doc(hidden)]
|
||||||
|
#[derive(Copy, Clone, Debug)]
|
||||||
|
pub struct UdpStreamContext {
|
||||||
|
local_ip: IpAddr,
|
||||||
|
peer_ip: IpAddr,
|
||||||
|
local_rtp_port: u16,
|
||||||
|
peer_rtp_port: u16,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Context for a TCP stream. Unstable/internal. Exposed for benchmarks.
|
||||||
|
///
|
||||||
|
/// This stores only the RTP channel id; the RTCP channel id is assumed to be one higher.
|
||||||
|
#[doc(hidden)]
|
||||||
|
#[derive(Copy, Clone, Debug)]
|
||||||
|
pub struct TcpStreamContext {
|
||||||
|
rtp_channel_id: u8,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Display for UdpStreamContext {
|
||||||
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
|
// TODO: this assumes we are the client. Revisit when adding server support.
|
||||||
|
write!(
|
||||||
|
f,
|
||||||
|
"{}:{}-{}(me) -> {}:{}-{}",
|
||||||
|
self.local_ip,
|
||||||
|
self.local_rtp_port,
|
||||||
|
self.local_rtp_port + 1,
|
||||||
|
self.peer_ip,
|
||||||
|
self.peer_rtp_port,
|
||||||
|
self.peer_rtp_port + 1
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Context for an RTP or RTCP packet, received either via RTSP interleaved data or UDP.
|
/// Context for an RTP or RTCP packet, received either via RTSP interleaved data or UDP.
|
||||||
///
|
///
|
||||||
/// Should be paired with an [`ConnectionContext`] of the RTSP connection that started
|
/// Should be paired with an [`ConnectionContext`] of the RTSP connection that started
|
||||||
@ -347,35 +448,16 @@ impl PacketContext {
|
|||||||
|
|
||||||
#[derive(Copy, Clone, Debug)]
|
#[derive(Copy, Clone, Debug)]
|
||||||
enum PacketContextInner {
|
enum PacketContextInner {
|
||||||
Tcp {
|
Tcp { msg_ctx: RtspMessageContext },
|
||||||
msg_ctx: RtspMessageContext,
|
Udp { received_wall: WallTime },
|
||||||
channel_id: u8,
|
|
||||||
},
|
|
||||||
Udp {
|
|
||||||
local_addr: SocketAddr,
|
|
||||||
peer_addr: SocketAddr,
|
|
||||||
received_wall: WallTime,
|
|
||||||
},
|
|
||||||
Dummy,
|
Dummy,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Display for PacketContext {
|
impl Display for PacketContext {
|
||||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
match self.0 {
|
match self.0 {
|
||||||
PacketContextInner::Udp {
|
PacketContextInner::Udp { received_wall } => std::fmt::Display::fmt(&received_wall, f),
|
||||||
local_addr,
|
PacketContextInner::Tcp { msg_ctx } => std::fmt::Display::fmt(&msg_ctx, f),
|
||||||
peer_addr,
|
|
||||||
received_wall,
|
|
||||||
..
|
|
||||||
} => {
|
|
||||||
write!(f, "{}->{}@{}", peer_addr, local_addr, received_wall)
|
|
||||||
}
|
|
||||||
PacketContextInner::Tcp {
|
|
||||||
msg_ctx,
|
|
||||||
channel_id,
|
|
||||||
} => {
|
|
||||||
write!(f, "{} ch={}", msg_ctx, channel_id)
|
|
||||||
}
|
|
||||||
PacketContextInner::Dummy => write!(f, "dummy"),
|
PacketContextInner::Dummy => write!(f, "dummy"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user