From dacbca8c28b23c987196e2d607c294edb139b2cb Mon Sep 17 00:00:00 2001 From: Scott Lamb Date: Mon, 28 Jun 2021 10:28:48 -0700 Subject: [PATCH] speed up interleaved data parsing This is a 20% improvement on the freshly-added benchmark. --- CHANGELOG.md | 4 ++++ src/lib.rs | 53 ++++++++++++++++++++++++++++++++++------------------ 2 files changed, 39 insertions(+), 18 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5b6f3e8..f0a3229 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +## unreleased + +* Performance improvements. + ## v0.0.2 (2021-06-25) * Video frames are now provided as a single, contiguous `Bytes`, and diff --git a/src/lib.rs b/src/lib.rs index 0bb064a..e7d5054 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,7 +5,7 @@ use bytes::{Buf, BufMut, Bytes, BytesMut}; use failure::{bail, format_err, Error}; use once_cell::sync::Lazy; use pretty_hex::PrettyHex; -use rtsp_types::Message; +use rtsp_types::{Data, Message}; use std::convert::TryFrom; use std::fmt::{Debug, Display}; use std::num::NonZeroU32; @@ -254,11 +254,26 @@ pub(crate) fn as_range(buf: &[u8], subset: &[u8]) -> Option Result)>, Error> { + if !src.is_empty() && src[0] == b'$' { + // Fast path for interleaved data, avoiding MessageRef -> Message<&[u8]> -> + // Message conversion. This speeds things up quite a bit in practice, + // avoiding a bunch of memmove calls. + if src.len() < 4 { + return Ok(None); + } + let channel_id = src[1]; + let len = 4 + usize::from(u16::from_be_bytes([src[2], src[3]])); + if src.len() < len { + src.reserve(len - src.len()); + return Ok(None); + } + let mut msg = src.split_to(len); + msg.advance(4); + return Ok(Some((len, Message::Data(Data::new(channel_id, msg.freeze()))))) + } - fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { let (msg, len): (Message<&[u8]>, _) = match rtsp_types::Message::parse(src) { Ok((m, l)) => (m, l), Err(rtsp_types::ParseError::Error) => { @@ -307,19 +322,21 @@ impl tokio_util::codec::Decoder for Codec { Message::Response(msg.replace_body(Bytes::new())) } } - Message::Data(msg) => { - let body_range = as_range(src, msg.as_slice()); - let msg = msg.replace_body(rtsp_types::Empty); - if let Some(r) = body_range { - let mut raw_msg = src.split_to(len); - raw_msg.advance(r.start); - raw_msg.truncate(r.len()); - Message::Data(msg.replace_body(raw_msg.freeze())) - } else { - src.advance(len); - Message::Data(msg.replace_body(Bytes::new())) - } - } + Message::Data(_) => unreachable!(), + }; + Ok(Some((len, msg))) + } +} + +impl tokio_util::codec::Decoder for Codec { + type Item = ReceivedMessage; + type Error = failure::Error; + + fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { + let (len, msg) = match self.parse_msg(src) { + Err(e) => return Err(e), + Ok(None) => return Ok(None), + Ok(Some((len, msg))) => (len, msg), }; self.ctx.msg_received_wall = time::get_time(); self.ctx.msg_received = std::time::Instant::now();