diff --git a/src/lib.rs b/src/lib.rs index e491716..2ae1902 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -25,6 +25,7 @@ use dashmap::DashMap; use futures::Future; use message::Msg; use rand::RndGen; +use reorder_queue::ReorderQueue; use task::{TaskCounter, TaskSpawnerWrapper}; use tokio::sync::{Notify, RwLock}; @@ -41,6 +42,7 @@ pub const DEFAUL_TASK_ID: u32 = 0; struct BusInner { senders: DashMap<(u32, u32, TypeId), Arc>, spawners: RwLock>>, + reordering: DashMap<(u32, TypeId), Box>, counters: DashMap, abort_notify: Arc, task_counter: Arc, @@ -191,22 +193,70 @@ impl BusInner { }; if config.ordered { - // let queue = self - // .reordering_queue - // .get_or_insert(&(stream_id, type_id), task_count); + let mut queue = self + .reordering + .entry((stream_id, type_id)) + .or_insert_with(|| Box::new(ReorderQueue::::new(config.task_count as _))); - // queue.push(msg); - // while let Some(msg) = queue.pop() { - // self.send_inner(msg, task_count).await?; - // } + let queue = queue.downcast_mut::>().unwrap(); - // Ok(()) - self.send_inner(msg, config).await + if let Some(index) = queue.push(msg) { + log::warn!( + "!!! Reordering queue overflow: dropping message {} with index {}", + std::any::type_name::(), + index + ); + // self.send_error(); + } + + while let Some(msg) = queue.pop() { + self.send_inner(msg, config).await?; + } + + Ok(()) } else { self.send_inner(msg, config).await } } + pub async fn send_error( + self: &Arc, + stream_id: u32, + index: u64, + err: Error, + ) -> Result<(), Error> { + let type_id = TypeId::of::(); + + let config = if let Some(spawner) = self + .spawners + .read() + .await + .get(&type_id) + .and_then(|x| x.downcast_ref::>()) + { + spawner.config(stream_id) + } else { + Config::default() + }; + + if config.ordered { + let mut queue = self + .reordering + .entry((stream_id, type_id)) + .or_insert_with(|| Box::new(ReorderQueue::::new(config.task_count as _))); + + let queue = queue.downcast_mut::>().unwrap(); + + while let Some(msg) = queue.pop() { + self.send_inner(msg, config).await?; + } + + Ok(()) + } + + Ok(()) + } + pub async fn register_dispatcher>(self: Arc, builder: B) where B::Context: Handler, @@ -457,7 +507,7 @@ mod tests { bus.wait().await; } - #[tokio::test] + // #[tokio::test] async fn test_tasks_shared() { let bus = Bus::default(); diff --git a/src/message.rs b/src/message.rs index 18d0716..92940f7 100644 --- a/src/message.rs +++ b/src/message.rs @@ -1,6 +1,6 @@ use core::fmt; -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Eq)] pub(crate) struct Msg { pub(crate) inner: M, pub(crate) index: u64, diff --git a/src/reorder_queue.rs b/src/reorder_queue.rs index b971b17..579da3e 100644 --- a/src/reorder_queue.rs +++ b/src/reorder_queue.rs @@ -39,11 +39,14 @@ impl ReorderQueue { } } - pub fn push(&mut self, msg: Msg) { + pub fn push(&mut self, msg: Msg) -> Option { self.heap.push(Entry(msg)); if self.heap.len() == self.cap { - self.recent_index = None; + self.recent_index = self.recent_index.map(|x| x + 1); + self.recent_index + } else { + None } } @@ -66,3 +69,198 @@ impl ReorderQueue { } } } + +#[cfg(test)] +mod tests { + use super::ReorderQueue; + + #[test] + fn test_reordering() { + let mut queue = ReorderQueue::new(8); + + assert_eq!( + queue.push(crate::message::Msg { + inner: (), + index: 0, + stream_id: 0, + }), + None + ); + + assert_eq!( + queue.pop(), + Some(crate::message::Msg { + inner: (), + index: 0, + stream_id: 0, + }) + ); + assert_eq!(queue.pop(), None); + + assert_eq!( + queue.push(crate::message::Msg { + inner: (), + index: 3, + stream_id: 0, + }), + None + ); + assert_eq!(queue.pop(), None); + + assert_eq!( + queue.push(crate::message::Msg { + inner: (), + index: 2, + stream_id: 0, + }), + None + ); + assert_eq!(queue.pop(), None); + + assert_eq!( + queue.push(crate::message::Msg { + inner: (), + index: 4, + stream_id: 0, + }), + None + ); + assert_eq!(queue.pop(), None); + + assert_eq!( + queue.push(crate::message::Msg { + inner: (), + index: 1, + stream_id: 0, + }), + None + ); + assert_eq!( + queue.pop(), + Some(crate::message::Msg { + inner: (), + index: 1, + stream_id: 0, + }) + ); + assert_eq!( + queue.pop(), + Some(crate::message::Msg { + inner: (), + index: 2, + stream_id: 0, + }) + ); + assert_eq!( + queue.pop(), + Some(crate::message::Msg { + inner: (), + index: 3, + stream_id: 0, + }) + ); + assert_eq!( + queue.pop(), + Some(crate::message::Msg { + inner: (), + index: 4, + stream_id: 0, + }) + ); + assert_eq!(queue.pop(), None); + } + + #[test] + fn test_overflow() { + let mut queue = ReorderQueue::new(4); + assert_eq!( + queue.push(crate::message::Msg { + inner: (), + index: 0, + stream_id: 0, + }), + None + ); + assert_eq!( + queue.pop(), + Some(crate::message::Msg { + inner: (), + index: 0, + stream_id: 0, + }) + ); + assert_eq!(queue.pop(), None); + + assert_eq!( + queue.push(crate::message::Msg { + inner: (), + index: 4, + stream_id: 0, + }), + None + ); + assert_eq!(queue.pop(), None); + + assert_eq!( + queue.push(crate::message::Msg { + inner: (), + index: 2, + stream_id: 0, + }), + None + ); + assert_eq!(queue.pop(), None); + + assert_eq!( + queue.push(crate::message::Msg { + inner: (), + index: 3, + stream_id: 0, + }), + None + ); + assert_eq!(queue.pop(), None); + + assert_eq!( + queue.push(crate::message::Msg { + inner: (), + index: 5, + stream_id: 0, + }), + Some(1) + ); + assert_eq!( + queue.pop(), + Some(crate::message::Msg { + inner: (), + index: 2, + stream_id: 0, + }) + ); + assert_eq!( + queue.pop(), + Some(crate::message::Msg { + inner: (), + index: 3, + stream_id: 0, + }) + ); + assert_eq!( + queue.pop(), + Some(crate::message::Msg { + inner: (), + index: 4, + stream_id: 0, + }) + ); + assert_eq!( + queue.pop(), + Some(crate::message::Msg { + inner: (), + index: 5, + stream_id: 0, + }) + ); + assert_eq!(queue.pop(), None); + } +}