speed up interleaved data parsing

This is a 20% improvement on the freshly-added benchmark.
This commit is contained in:
Scott Lamb 2021-06-28 10:28:48 -07:00
parent 4a8b8f2f77
commit dacbca8c28
2 changed files with 39 additions and 18 deletions

View File

@ -1,3 +1,7 @@
## unreleased
* Performance improvements.
## v0.0.2 (2021-06-25)
* Video frames are now provided as a single, contiguous `Bytes`, and

View File

@ -5,7 +5,7 @@ use bytes::{Buf, BufMut, Bytes, BytesMut};
use failure::{bail, format_err, Error};
use once_cell::sync::Lazy;
use pretty_hex::PrettyHex;
use rtsp_types::Message;
use rtsp_types::{Data, Message};
use std::convert::TryFrom;
use std::fmt::{Debug, Display};
use std::num::NonZeroU32;
@ -254,11 +254,26 @@ pub(crate) fn as_range(buf: &[u8], subset: &[u8]) -> Option<std::ops::Range<usiz
Some(off..end)
}
impl tokio_util::codec::Decoder for Codec {
type Item = ReceivedMessage;
type Error = failure::Error;
impl Codec {
fn parse_msg(&self, src: &mut BytesMut) -> Result<Option<(usize, Message<Bytes>)>, Error> {
if !src.is_empty() && src[0] == b'$' {
// Fast path for interleaved data, avoiding MessageRef -> Message<&[u8]> ->
// Message<Bytes> conversion. This speeds things up quite a bit in practice,
// avoiding a bunch of memmove calls.
if src.len() < 4 {
return Ok(None);
}
let channel_id = src[1];
let len = 4 + usize::from(u16::from_be_bytes([src[2], src[3]]));
if src.len() < len {
src.reserve(len - src.len());
return Ok(None);
}
let mut msg = src.split_to(len);
msg.advance(4);
return Ok(Some((len, Message::Data(Data::new(channel_id, msg.freeze())))))
}
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
let (msg, len): (Message<&[u8]>, _) = match rtsp_types::Message::parse(src) {
Ok((m, l)) => (m, l),
Err(rtsp_types::ParseError::Error) => {
@ -307,19 +322,21 @@ impl tokio_util::codec::Decoder for Codec {
Message::Response(msg.replace_body(Bytes::new()))
}
}
Message::Data(msg) => {
let body_range = as_range(src, msg.as_slice());
let msg = msg.replace_body(rtsp_types::Empty);
if let Some(r) = body_range {
let mut raw_msg = src.split_to(len);
raw_msg.advance(r.start);
raw_msg.truncate(r.len());
Message::Data(msg.replace_body(raw_msg.freeze()))
} else {
src.advance(len);
Message::Data(msg.replace_body(Bytes::new()))
}
}
Message::Data(_) => unreachable!(),
};
Ok(Some((len, msg)))
}
}
impl tokio_util::codec::Decoder for Codec {
type Item = ReceivedMessage;
type Error = failure::Error;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
let (len, msg) = match self.parse_msg(src) {
Err(e) => return Err(e),
Ok(None) => return Ok(None),
Ok(Some((len, msg))) => (len, msg),
};
self.ctx.msg_received_wall = time::get_time();
self.ctx.msg_received = std::time::Instant::now();