retina/src/rtp.rs

346 lines
11 KiB
Rust
Raw Normal View History

// Copyright (C) 2021 Scott Lamb <slamb@slamb.org>
// SPDX-License-Identifier: MIT OR Apache-2.0
//! Handles RTP data as described in
//! [RFC 3550 section 5.1](https://datatracker.ietf.org/doc/html/rfc3550#section-5.1).
use std::convert::TryFrom;
use std::ops::Range;
use bytes::{Buf, Bytes};
use crate::{PacketContext, Timestamp};
/// The minimum length of an RTP header (no CSRCs or extensions).
const MIN_HEADER_LEN: u16 = 12;
/// Raw packet without state-specific interpretation or metadata.
///
/// This design is inspired by [`rtp-rs`](https://crates.io/crates/rtp-rs) in
/// that it primarily validates a raw buffer then provides accessors for it.
/// Some differences though:
///
/// * allows keeping around the payload range (determined during
/// construction/validation) as a `Range<u16>` , rather than reconstructing
/// on later accesses.
/// * currently owns the `Bytes`, although this design may change when/if we
/// [redo the buffering
/// model](https://github.com/scottlamb/retina/issues/6).
/// * directly exposes the sequence number as a `u16`, rather than having an
/// extra type that I find awkward to work with.
pub(crate) struct RawPacket(
/// Full packet data, including headers.
///
/// ```text
/// 0 1 2 3
/// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
/// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/// |V=2|P|X| CC |M| PT | sequence number |
/// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/// | timestamp |
/// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/// | synchronization source (SSRC) identifier |
/// +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
/// | contributing source (CSRC) identifiers |
/// | .... |
/// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/// ```
pub Bytes,
);
impl RawPacket {
/// Validates an RTP packet, returning a wrapper and the payload range.
///
/// The payload range is not part of the `RawPacket` to avoid extra padding
/// bytes within the containing `ReceivedPacket`.
pub fn new(data: Bytes) -> Result<(Self, Range<u16>), RawPacketError> {
// RTP doesn't have a defined maximum size but it's implied by the transport:
// * UDP packets (even with fragmentation) are at most 65,536 (minus IP/UDP headers).
// * interleaved RTSP data messages have at most 65,536 bytes of data.
let len = match u16::try_from(data.len()) {
Ok(l) => l,
Err(_) => {
return Err(RawPacketError {
reason: "too long",
data,
})
}
};
if len < MIN_HEADER_LEN {
return Err(RawPacketError {
reason: "too short",
data,
});
}
if (data[0] & 0b1100_0000) != 2 << 6 {
return Err(RawPacketError {
reason: "must be version 2",
data,
});
}
let has_padding = (data[0] & 0b0010_0000) != 0;
let has_extension = (data[0] & 0b0001_0000) != 0;
let csrc_count = data[0] & 0b0000_1111;
let csrc_end = MIN_HEADER_LEN + (4 * u16::from(csrc_count));
let payload_start = if has_extension {
if data.len() < usize::from(csrc_end + 4) {
return Err(RawPacketError {
reason: "extension is after end of packet",
data,
});
}
let extension_len = u16::from_be_bytes([
data[usize::from(csrc_end) + 1],
data[usize::from(csrc_end) + 2],
]);
match csrc_end.checked_add(extension_len) {
Some(s) => s,
None => {
return Err(RawPacketError {
reason: "extension extends beyond maximum packet size",
data,
})
}
}
} else {
csrc_end
};
if len < payload_start {
return Err(RawPacketError {
reason: "payload start is after end of packet",
data,
});
}
let payload_end = if has_padding {
if len == payload_start {
return Err(RawPacketError {
reason: "missing padding",
data,
});
}
let padding_len = u16::from(data[data.len() - 1]);
if padding_len == 0 {
return Err(RawPacketError {
reason: "invalid padding length 0",
data,
});
}
let payload_end = match len.checked_sub(padding_len) {
Some(e) => e,
None => {
return Err(RawPacketError {
reason: "padding larger than packet",
data,
})
}
};
if payload_end < payload_start {
return Err(RawPacketError {
reason: "bad padding",
data,
});
}
payload_end
} else {
len
};
Ok((Self(data), payload_start..payload_end))
}
#[inline]
pub fn mark(&self) -> bool {
(self.0[1] & 0b1000_0000) != 0
}
#[inline]
pub fn sequence_number(&self) -> u16 {
assert!(self.0.len() >= usize::from(MIN_HEADER_LEN));
u16::from_be_bytes([self.0[2], self.0[3]])
}
#[inline]
pub fn ssrc(&self) -> u32 {
assert!(self.0.len() >= usize::from(MIN_HEADER_LEN));
u32::from_be_bytes([self.0[8], self.0[9], self.0[10], self.0[11]])
}
#[inline]
pub fn payload_type(&self) -> u8 {
self.0[1] & 0b0111_1111
}
#[inline]
pub fn timestamp(&self) -> u32 {
assert!(self.0.len() >= usize::from(MIN_HEADER_LEN));
u32::from_be_bytes([self.0[4], self.0[5], self.0[6], self.0[7]])
}
}
#[derive(Debug)]
#[doc(hidden)]
pub struct RawPacketError {
pub reason: &'static str,
pub data: Bytes,
}
pub(crate) struct RawPacketBuilder {
pub sequence_number: u16,
pub timestamp: u32,
pub payload_type: u8,
pub ssrc: u32,
pub mark: bool,
}
impl RawPacketBuilder {
pub(crate) fn build<P: IntoIterator<Item = u8>>(
self,
payload: P,
) -> Result<(RawPacket, Range<u16>), &'static str> {
if self.payload_type >= 0x80 {
return Err("payload type too large");
}
let data: Bytes = [
2 << 6, // version=2, no padding, no extensions, no CSRCs.
if self.mark { 0b1000_0000 } else { 0 } | self.payload_type,
]
.into_iter()
.chain(self.sequence_number.to_be_bytes())
.chain(self.timestamp.to_be_bytes())
.chain(self.ssrc.to_be_bytes())
.chain(payload)
.collect();
let len = u16::try_from(data.len()).map_err(|_| "payload too long")?;
Ok((RawPacket(data), MIN_HEADER_LEN..len))
}
}
/// A received RTP packet.
///
/// This holds more information than the packet itself: also a
/// [`PacketContext`], the stream, and extended timestamp.
pub struct ReceivedPacket {
// Currently this is constructed from crate::client::rtp, so everything here
// is pub(crate).
pub(crate) ctx: PacketContext,
pub(crate) stream_id: usize,
pub(crate) timestamp: crate::Timestamp,
pub(crate) raw: RawPacket,
pub(crate) payload_range: Range<u16>,
// TODO: consider dropping this field in favor of a PacketItem::Loss.
// https://github.com/scottlamb/retina/issues/47
pub(crate) loss: u16,
}
impl std::fmt::Debug for ReceivedPacket {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ReceivedPacket")
.field("ctx", &self.ctx)
.field("stream_id", &self.stream_id)
.field("timestamp", &self.timestamp)
.field("ssrc", &self.raw.ssrc())
.field("sequence_number", &self.raw.sequence_number())
.field("mark", &self.raw.mark())
2022-04-29 09:31:31 +04:00
.field("payload", &crate::hex::LimitedHex::new(self.payload(), 64))
.finish()
}
}
impl ReceivedPacket {
#[inline]
pub fn timestamp(&self) -> crate::Timestamp {
self.timestamp
}
#[inline]
pub fn mark(&self) -> bool {
self.raw.mark()
}
#[inline]
pub fn ctx(&self) -> &PacketContext {
&self.ctx
}
#[inline]
pub fn stream_id(&self) -> usize {
self.stream_id
}
#[inline]
pub fn ssrc(&self) -> u32 {
self.raw.ssrc()
}
#[inline]
pub fn sequence_number(&self) -> u16 {
self.raw.sequence_number()
}
/// Returns the raw bytes, including the RTP headers.
#[inline]
pub fn raw(&self) -> &[u8] {
&self.raw.0[..]
}
/// Returns only the payload bytes.
#[inline]
pub fn payload(&self) -> &[u8] {
&self.raw.0[usize::from(self.payload_range.start)..usize::from(self.payload_range.end)]
}
#[inline]
pub fn loss(&self) -> u16 {
self.loss
}
/// Consumes the `ReceivedPacket` and returns the `Payload` as a [`Bytes`].
///
/// This is currently is very efficient (no copying or reference-counting),
/// although that is not an API guarantee.
#[inline]
pub fn into_payload_bytes(self) -> Bytes {
let mut data = self.raw.0;
data.truncate(usize::from(self.payload_range.end));
data.advance(usize::from(self.payload_range.start));
data
}
}
/// Testing API; exposed for fuzz tests.
#[doc(hidden)]
pub struct ReceivedPacketBuilder {
pub ctx: PacketContext,
pub stream_id: usize,
pub sequence_number: u16,
pub timestamp: Timestamp,
pub payload_type: u8,
pub ssrc: u32,
pub mark: bool,
pub loss: u16,
}
impl ReceivedPacketBuilder {
pub fn build<P: IntoIterator<Item = u8>>(
self,
payload: P,
) -> Result<ReceivedPacket, &'static str> {
let (raw, payload_range) = RawPacketBuilder {
sequence_number: self.sequence_number,
timestamp: self.timestamp.timestamp as u32,
payload_type: self.payload_type,
ssrc: self.ssrc,
mark: self.mark,
}
.build(payload)?;
Ok(ReceivedPacket {
ctx: self.ctx,
stream_id: self.stream_id,
timestamp: self.timestamp,
raw,
payload_range,
loss: self.loss,
})
}
}