build fix: add missing files

This commit is contained in:
Scott Lamb 2021-07-08 11:37:03 -07:00
parent e9a5e4e34a
commit 29c80c6f0e
2 changed files with 400 additions and 0 deletions

109
src/error.rs Normal file
View File

@ -0,0 +1,109 @@
use std::fmt::Display;
use crate::{ConnectionContext, RtspMessageContext};
use thiserror::Error;
/// An opaque `std::error::Error + Send + Sync + 'static` implementation.
///
/// Currently the focus is on providing detailed human-readable error messages.
/// In most cases they have enough information to find the offending packet
/// in Wireshark.
///
/// If you wish to inspect Retina errors programmatically, or if you need
/// errors formatted in a different way, please file an issue on the `retina`
/// repository.
pub struct Error(pub(crate) Box<ErrorInt>);
impl Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)
}
}
impl std::fmt::Debug for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
std::fmt::Debug::fmt(&self.0, f)
}
}
impl std::error::Error for Error {}
#[derive(Debug, Error)]
pub(crate) enum ErrorInt {
/// The method's caller provided an invalid argument.
#[error("Invalid argument: {0}")]
InvalidArgument(String),
/// Unparseable or unexpected RTSP message.
#[error("[{conn_ctx}, {msg_ctx}] RTSP framing error: {description}")]
RtspFramingError {
conn_ctx: ConnectionContext,
msg_ctx: RtspMessageContext,
description: String,
},
#[error("[{conn_ctx}, {msg_ctx}] {status} response to {} CSeq={cseq}: \
{description}", Into::<&str>::into(.method))]
RtspResponseError {
conn_ctx: ConnectionContext,
msg_ctx: RtspMessageContext,
method: rtsp_types::Method,
cseq: u32,
status: rtsp_types::StatusCode,
description: String,
},
#[error(
"[{conn_ctx}, {msg_ctx}] Received interleaved data on unassigned channel {channel_id}"
)]
RtspUnassignedChannelError {
conn_ctx: ConnectionContext,
msg_ctx: RtspMessageContext,
channel_id: u8,
},
#[error("[{conn_ctx}, {msg_ctx} channel {channel_id} stream {stream_id}] RTSP data message: {description}")]
RtspDataMessageError {
conn_ctx: ConnectionContext,
msg_ctx: RtspMessageContext,
channel_id: u8,
stream_id: usize,
description: String,
},
#[error(
"[{conn_ctx}, {msg_ctx}, channel={channel_id}, stream={stream_id}, ssrc={ssrc:08x}] \
{description}"
)]
RtpPacketError {
conn_ctx: ConnectionContext,
msg_ctx: RtspMessageContext,
channel_id: u8,
stream_id: usize,
ssrc: u32,
sequence_number: u16,
description: String,
},
#[error("Unable to connect to RTSP server: {0}")]
ConnectError(#[source] std::io::Error),
#[error("[{conn_ctx}, {msg_ctx}] Error reading from RTSP peer: {source}")]
ReadError {
conn_ctx: ConnectionContext,
msg_ctx: RtspMessageContext,
source: std::io::Error,
},
#[error("[{conn_ctx}] Error writing to RTSP peer: {source}")]
WriteError {
conn_ctx: ConnectionContext,
source: std::io::Error,
},
#[error("Failed precondition: {0}")]
FailedPrecondition(String),
#[error("Internal error: {0}")]
Internal(#[source] Box<dyn std::error::Error + Send + Sync>),
}

291
src/tokio.rs Normal file
View File

@ -0,0 +1,291 @@
//! tokio-based [`Connection`].
//!
//! In theory there could be a similar async-std-based implementation.
use bytes::{Buf, BufMut, Bytes, BytesMut};
use futures::{Sink, SinkExt, Stream, StreamExt};
use pretty_hex::PrettyHex;
use rtsp_types::{Data, Message};
use std::convert::TryFrom;
use std::time::Instant;
use tokio::net::TcpStream;
use tokio_util::codec::Framed;
use url::Host;
use crate::{Error, ErrorInt, RtspMessageContext};
use super::{ConnectionContext, ReceivedMessage, WallTime};
/// A RTSP connection which implements `Stream`, `Sink`, and `Unpin`.
pub(crate) struct Connection(Framed<TcpStream, Codec>);
impl Connection {
pub(crate) async fn connect(host: Host<&str>, port: u16) -> Result<Self, std::io::Error> {
let stream = match host {
Host::Domain(h) => TcpStream::connect((h, port)).await,
Host::Ipv4(h) => TcpStream::connect((h, port)).await,
Host::Ipv6(h) => TcpStream::connect((h, port)).await,
}?;
let established_wall = WallTime::now();
let established = Instant::now();
let local_addr = stream.local_addr()?;
let peer_addr = stream.peer_addr()?;
Ok(Self(Framed::new(
stream,
Codec {
ctx: ConnectionContext {
local_addr,
peer_addr,
established_wall,
established,
},
read_pos: 0,
},
)))
}
pub(crate) fn ctx(&self) -> &ConnectionContext {
&self.0.codec().ctx
}
pub(crate) fn eof_ctx(&self) -> RtspMessageContext {
RtspMessageContext {
pos: self.0.codec().read_pos
+ u64::try_from(self.0.read_buffer().remaining()).expect("usize fits in u64"),
received_wall: WallTime::now(),
received: Instant::now(),
}
}
fn wrap_write_err(&self, e: CodecError) -> ErrorInt {
match e {
CodecError::IoError(source) => ErrorInt::WriteError {
conn_ctx: *self.ctx(),
source,
},
CodecError::ParseError { .. } => unreachable!(),
}
}
}
impl Stream for Connection {
type Item = Result<ReceivedMessage, Error>;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
self.0.poll_next_unpin(cx).map_err(|e| {
wrap!(match e {
CodecError::IoError(error) => ErrorInt::ReadError {
conn_ctx: *self.ctx(),
msg_ctx: self.eof_ctx(),
source: error,
},
CodecError::ParseError { description, pos } => ErrorInt::RtspFramingError {
conn_ctx: *self.ctx(),
msg_ctx: RtspMessageContext {
pos,
received_wall: WallTime::now(),
received: Instant::now(),
},
description,
},
})
})
}
}
impl Sink<Message<Bytes>> for Connection {
type Error = ErrorInt;
fn poll_ready(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), Self::Error>> {
self.0
.poll_ready_unpin(cx)
.map_err(|e| self.wrap_write_err(e))
}
fn start_send(
mut self: std::pin::Pin<&mut Self>,
item: Message<Bytes>,
) -> Result<(), Self::Error> {
self.0
.start_send_unpin(item)
.map_err(|e| self.wrap_write_err(e))
}
fn poll_flush(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), Self::Error>> {
self.0
.poll_flush_unpin(cx)
.map_err(|e| self.wrap_write_err(e))
}
fn poll_close(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), Self::Error>> {
self.0
.poll_close_unpin(cx)
.map_err(|e| self.wrap_write_err(e))
}
}
/// Encodes and decodes RTSP messages.
struct Codec {
ctx: ConnectionContext,
/// Number of bytes read and processed (drained from the input buffer).
read_pos: u64,
}
/// An intermediate error type that exists because [`Framed`] expects the
/// codec's error type to implement `From<std::io::Error>`, and [`Error`]
/// takes additional context.
enum CodecError {
IoError(std::io::Error),
ParseError { description: String, pos: u64 },
}
impl std::convert::From<std::io::Error> for CodecError {
fn from(e: std::io::Error) -> Self {
CodecError::IoError(e)
}
}
impl Codec {
fn parse_msg(&self, src: &mut BytesMut) -> Result<Option<(usize, Message<Bytes>)>, CodecError> {
if !src.is_empty() && src[0] == b'$' {
// Fast path for interleaved data, avoiding MessageRef -> Message<&[u8]> ->
// Message<Bytes> conversion. This speeds things up quite a bit in practice,
// avoiding a bunch of memmove calls.
if src.len() < 4 {
return Ok(None);
}
let channel_id = src[1];
let len = 4 + usize::from(u16::from_be_bytes([src[2], src[3]]));
if src.len() < len {
src.reserve(len - src.len());
return Ok(None);
}
let mut msg = src.split_to(len);
msg.advance(4);
return Ok(Some((
len,
Message::Data(Data::new(channel_id, msg.freeze())),
)));
}
let (msg, len): (Message<&[u8]>, _) = match Message::parse(src) {
Ok((m, l)) => (m, l),
Err(rtsp_types::ParseError::Error) => {
const MAX_DUMP_BYTES: usize = 128;
let conf = pretty_hex::HexConfig {
title: false,
..Default::default()
};
if src.len() > MAX_DUMP_BYTES {
return Err(CodecError::ParseError {
description: format!(
"Invalid RTSP message; next {} of {} buffered bytes are:\n{:#?}",
MAX_DUMP_BYTES,
src.len(),
(&src[0..MAX_DUMP_BYTES]).hex_conf(conf)
),
pos: self.read_pos,
});
}
return Err(CodecError::ParseError {
description: format!(
"Invalid RTSP message; next {} buffered bytes are:\n{:#?}",
MAX_DUMP_BYTES,
src.hex_conf(conf)
),
pos: self.read_pos,
});
}
Err(rtsp_types::ParseError::Incomplete) => return Ok(None),
};
// Map msg's body to a Bytes representation and advance `src`. Awkward:
// 1. lifetime concerns require mapping twice: first so the message
// doesn't depend on the BytesMut, which needs to be split/advanced;
// then to get the proper Bytes body in place post-split.
// 2. rtsp_types messages must be AsRef<[u8]>, so we can't use the
// range as an intermediate body.
// 3. within a match because the rtsp_types::Message enum itself
// doesn't have body/replace_body/map_body methods.
let msg = match msg {
Message::Request(msg) => {
let body_range = crate::as_range(src, msg.body());
let msg = msg.replace_body(rtsp_types::Empty);
if let Some(r) = body_range {
let mut raw_msg = src.split_to(len);
raw_msg.advance(r.start);
raw_msg.truncate(r.len());
Message::Request(msg.replace_body(raw_msg.freeze()))
} else {
src.advance(len);
Message::Request(msg.replace_body(Bytes::new()))
}
}
Message::Response(msg) => {
let body_range = crate::as_range(src, msg.body());
let msg = msg.replace_body(rtsp_types::Empty);
if let Some(r) = body_range {
let mut raw_msg = src.split_to(len);
raw_msg.advance(r.start);
raw_msg.truncate(r.len());
Message::Response(msg.replace_body(raw_msg.freeze()))
} else {
src.advance(len);
Message::Response(msg.replace_body(Bytes::new()))
}
}
Message::Data(_) => unreachable!(),
};
Ok(Some((len, msg)))
}
}
impl tokio_util::codec::Decoder for Codec {
type Item = ReceivedMessage;
type Error = CodecError;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
let (len, msg) = match self.parse_msg(src) {
Err(e) => return Err(e),
Ok(None) => return Ok(None),
Ok(Some((len, msg))) => (len, msg),
};
let msg = ReceivedMessage {
msg,
ctx: RtspMessageContext {
pos: self.read_pos,
received_wall: WallTime::now(),
received: Instant::now(),
},
};
self.read_pos += u64::try_from(len).expect("usize fits in u64");
Ok(Some(msg))
}
}
impl tokio_util::codec::Encoder<rtsp_types::Message<Bytes>> for Codec {
type Error = CodecError;
fn encode(
&mut self,
item: rtsp_types::Message<Bytes>,
mut dst: &mut BytesMut,
) -> Result<(), Self::Error> {
item.write(&mut (&mut dst).writer())
.expect("BufMut Writer is infallible");
Ok(())
}
}