diff --git a/examples/demo.rs b/examples/demo.rs index aba7c95..5d890b1 100644 --- a/examples/demo.rs +++ b/examples/demo.rs @@ -1,4 +1,5 @@ -#![feature(impl_trait_in_assoc_type)] +#![feature(return_position_impl_trait_in_trait)] +#![feature(async_fn_in_trait)] use std::sync::Arc; @@ -9,48 +10,60 @@ pub struct Msg(pub i32); impl Message for Msg {} pub struct Processor { - state: i32, + _state: i32, } impl Handler for Processor { type Result = (); - type IntoMessage = impl IntoMessage; - type HandleFut<'a> = impl futures::Future> + 'a; - type FinalizeFut<'a> = impl futures::Future> + 'a; - - fn handle(&mut self, msg: Msg, _stream_id: u32, _task_id: u32) -> Self::HandleFut<'_> { - async move { Ok(()) } + async fn handle( + &mut self, + _msg: Msg, + _stream_id: u32, + _task_id: u32, + ) -> Result, Error> { + Ok(()) } - fn finalize<'a>(self) -> Self::FinalizeFut<'a> { - async move { Ok(()) } + async fn finalize(self) -> Result<(), Error> { + Ok(()) + } + + async fn handle_error( + &mut self, + _err: Error, + _stream_id: u32, + _task_id: u32, + ) -> Result, Error> { + Ok(None) } } struct ProcSpawner; impl Builder for ProcSpawner { type Context = Processor; - type BuildFut<'a> = impl futures::Future> + 'a; - fn build(&self, stream_id: u32, _task_id: u32) -> Self::BuildFut<'_> { - async move { - Ok(Processor { - state: stream_id as _, - }) - } + async fn build(&self, stream_id: u32, _task_id: u32) -> Result { + Ok(Processor { + _state: stream_id as _, + }) } } impl Processor { - pub async fn spawn(sid: u32) -> Result<(usize, Self), Error> { - Ok((4, Self { state: 0 })) + pub async fn spawn(_sid: u32) -> Result<(usize, Self), Error> { + Ok((4, Self { _state: 0 })) } - pub async fn handler_msg(self: Arc, sid: u32, tid: u32, msg: Msg) -> Result<(), Error> { + pub async fn handler_msg( + self: Arc, + _sid: u32, + _tid: u32, + _msg: Msg, + ) -> Result<(), Error> { Ok(()) } - pub async fn finalize_msg_handler(self: Arc, sid: u32) -> Result<(), Error> { + pub async fn finalize_msg_handler(self: Arc, _sid: u32) -> Result<(), Error> { Ok(()) } } diff --git a/src/builder.rs b/src/builder.rs index ec01fe8..0f96e87 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -27,15 +27,16 @@ impl Default for Config { pub trait Builder: Send + Sync + 'static { type Context: 'static; - type BuildFut<'a>: Future> + Send + 'a - where - Self: 'a; fn config(&self, _stream_id: u32) -> Config { Default::default() } - fn build(&self, stream_id: u32, _task_id: u32) -> Self::BuildFut<'_>; + fn build( + &self, + stream_id: u32, + _task_id: u32, + ) -> impl Future> + Send + '_; } pub struct DefaultBuilder { @@ -60,10 +61,9 @@ impl DefaultBuilder { impl Builder for DefaultBuilder { type Context = H; - type BuildFut<'a> = impl Future> + Send + 'a; - fn build(&self, _stream_id: u32, _task_id: u32) -> Self::BuildFut<'_> { - async move { Ok(::default()) } + async fn build(&self, _stream_id: u32, _task_id: u32) -> Result { + Ok(::default()) } fn config(&self, _stream_id: u32) -> Config { @@ -133,18 +133,15 @@ where C: Sync + Send + Fn(u32, u32) -> F + 'static, { type Context = Arc; - type BuildFut<'a> = impl Future> + Send + 'a; - fn build(&self, stream_id: u32, task_id: u32) -> Self::BuildFut<'_> { - async move { - if self.stream_handlers.contains_key(&stream_id) { - return Ok(self.stream_handlers.get(&stream_id).unwrap().clone()); - } - - let val = Arc::new((self.callback)(stream_id, task_id).await?); - self.stream_handlers.insert(stream_id, val.clone()); - Ok(val.clone()) + async fn build(&self, stream_id: u32, task_id: u32) -> Result { + if self.stream_handlers.contains_key(&stream_id) { + return Ok(self.stream_handlers.get(&stream_id).unwrap().clone()); } + + let val = Arc::new((self.callback)(stream_id, task_id).await?); + self.stream_handlers.insert(stream_id, val.clone()); + Ok(val.clone()) } fn config(&self, _stream_id: u32) -> Config { diff --git a/src/error.rs b/src/error.rs index f68c1f9..8fcbb0b 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,9 +1,27 @@ -#[derive(Debug)] +use kanal::ReceiveError; + +#[derive(Debug, PartialEq, Eq)] pub enum Error { HandlerIsNotRegistered, Aborted, SendError(String), ReceiveError(kanal::ReceiveError), + ReorderingMissedMessage(u64), +} + +impl Clone for Error { + fn clone(&self) -> Self { + match self { + Error::SendError(err) => Error::SendError(err.clone()), + Error::ReceiveError(err) => match err { + ReceiveError::Closed => Error::ReceiveError(ReceiveError::Closed), + ReceiveError::SendClosed => Error::ReceiveError(ReceiveError::SendClosed), + }, + Error::HandlerIsNotRegistered => Error::HandlerIsNotRegistered, + Error::Aborted => Error::Aborted, + Error::ReorderingMissedMessage(idx) => Error::ReorderingMissedMessage(*idx), + } + } } impl From for Error { diff --git a/src/handler.rs b/src/handler.rs index 64aeb6e..aba8320 100644 --- a/src/handler.rs +++ b/src/handler.rs @@ -1,9 +1,4 @@ -use std::{ - any::{Any, TypeId}, - marker::PhantomData, - pin::Pin, - sync::Arc, -}; +use std::{any::Any, marker::PhantomData, pin::Pin, sync::Arc}; use futures::Future; use tokio::sync::Notify; @@ -18,17 +13,22 @@ use crate::{ pub trait Handler: Send + Sync + 'static { type Result: Message; - type IntoMessage: IntoMessage; - type HandleFut<'a>: Future> + Send + 'a - where - Self: 'a; - type FinalizeFut<'a>: Future> + Send + 'a - where - Self: 'a; + fn handle( + &mut self, + msg: M, + stream_id: u32, + task_id: u32, + ) -> impl Future, Error>> + Send + '_; - fn handle(&mut self, msg: M, stream_id: u32, task_id: u32) -> Self::HandleFut<'_>; - fn finalize<'a>(self) -> Self::FinalizeFut<'a>; + fn handle_error( + &mut self, + err: Error, + stream_id: u32, + task_id: u32, + ) -> impl Future, Error>> + Send + '_; + + fn finalize(self) -> impl Future> + Send; } pub(crate) struct HandlerSpawner { @@ -66,28 +66,32 @@ where while let Some(msg) = rx.recv().await { task_counter.inc_running(); - let res_msg = match ctx.handle(msg.inner, stream_id, task_id).await { - Ok(res) => res.into_message(), - Err(err) => { - println!("TASK HANDLE ERROR: {:?}", err); - continue; - } + let resp = match msg.inner { + Ok(m) => ctx + .handle(m, stream_id, task_id) + .await + .map(IntoMessage::into_message), + + Err(err) => ctx + .handle_error(err, stream_id, task_id) + .await + .map(IntoMessage::into_message), }; - if let Some(inner) = res_msg { - if inner.type_id() != TypeId::of::<()>() { - if let Err(err) = bus - .send(Msg { - inner, - index: msg.index, - stream_id, - }) - .await - { - println!("BUS SEND ERROR: {:?}", err); - continue; - } - } + let Some(inner) = resp.transpose() else { + let _ = bus.send_skip::(stream_id, msg.index).await; + continue; + }; + + if let Err(err) = bus + .send(Msg { + inner, + index: msg.index, + stream_id, + }) + .await + { + println!("BUS SEND ERROR: {:?}", err); } task_counter.dec_running(rx.is_empty()); diff --git a/src/lib.rs b/src/lib.rs index 2ae1902..de4fb2b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,5 @@ -#![feature(impl_trait_in_assoc_type)] +#![feature(return_position_impl_trait_in_trait)] +#![feature(async_fn_in_trait)] mod builder; mod chan; @@ -111,11 +112,12 @@ impl BusInner { async fn send_inner( self: &Arc, - msg: Msg, + msg: Result, + index: u64, + stream_id: u32, config: Config, ) -> Result<(), Error> { let type_id = TypeId::of::(); - let stream_id = msg.stream_id; let task_id = self.get_task_id::(stream_id, &config); if !self.senders.contains_key(&(stream_id, task_id, type_id)) { @@ -169,7 +171,11 @@ impl BusInner { .upcast() .downcast_ref::>() .unwrap() - .send(msg) + .send(Msg { + inner: msg, + index, + stream_id, + }) .await .unwrap(); @@ -178,6 +184,10 @@ impl BusInner { pub async fn send(self: &Arc, msg: Msg) -> Result<(), Error> { let type_id = TypeId::of::(); + if type_id == TypeId::of::<()>() { + return Ok(()); + } + let stream_id = msg.stream_id; let config = if let Some(spawner) = self @@ -196,36 +206,48 @@ impl BusInner { let mut queue = self .reordering .entry((stream_id, type_id)) - .or_insert_with(|| Box::new(ReorderQueue::::new(config.task_count as _))); + .or_insert_with(|| { + Box::new(ReorderQueue::>>::new( + config.task_count as _, + )) + }); - let queue = queue.downcast_mut::>().unwrap(); + let queue = queue + .downcast_mut::>>>() + .unwrap(); - if let Some(index) = queue.push(msg) { - log::warn!( - "!!! Reordering queue overflow: dropping message {} with index {}", - std::any::type_name::(), - index - ); - // self.send_error(); + if let Some(index) = queue.push(msg.index, Some(msg.inner)) { + self.send_inner( + Err::(Error::ReorderingMissedMessage(index)), + index, + stream_id, + config, + ) + .await?; } while let Some(msg) = queue.pop() { - self.send_inner(msg, config).await?; + if let (index, Some(Some(msg))) = msg { + self.send_inner(msg, index, stream_id, config).await?; + } } Ok(()) } else { - self.send_inner(msg, config).await + self.send_inner(msg.inner, msg.index, stream_id, config) + .await } } - pub async fn send_error( + pub async fn send_skip( self: &Arc, stream_id: u32, index: u64, - err: Error, ) -> Result<(), Error> { let type_id = TypeId::of::(); + if type_id == TypeId::of::<()>() { + return Ok(()); + } let config = if let Some(spawner) = self .spawners @@ -243,15 +265,31 @@ impl BusInner { let mut queue = self .reordering .entry((stream_id, type_id)) - .or_insert_with(|| Box::new(ReorderQueue::::new(config.task_count as _))); + .or_insert_with(|| { + Box::new(ReorderQueue::>>::new( + config.task_count as _, + )) + }); - let queue = queue.downcast_mut::>().unwrap(); + let queue = queue + .downcast_mut::>>>() + .unwrap(); - while let Some(msg) = queue.pop() { - self.send_inner(msg, config).await?; + if let Some(index) = queue.push(index, None) { + self.send_inner( + Err::(Error::ReorderingMissedMessage(index)), + index, + stream_id, + config, + ) + .await?; } - Ok(()) + while let Some(msg) = queue.pop() { + if let (index, Some(Some(msg))) = msg { + self.send_inner(msg, index, stream_id, config).await?; + } + } } Ok(()) @@ -341,7 +379,7 @@ impl Bus { self.inner .send(Msg { - inner, + inner: Ok(inner), index, stream_id: DEFAUL_STREAM_ID, }) @@ -363,7 +401,7 @@ impl Bus { self.inner .send(Msg { - inner, + inner: Ok(inner), index, stream_id, }) @@ -397,7 +435,7 @@ mod tests { use std::{sync::Arc, time::Duration}; use async_stream::stream; - use futures::{Future, Stream}; + use futures::Stream; use rand::RngCore; use crate::{ @@ -411,12 +449,13 @@ mod tests { struct TestProducer; impl Producer for TestProducer { type Item = u64; - type IntoMessage = impl IntoMessage; - type Stream<'a> = impl Stream> + Send + 'a; - type FinalizeFut<'a> = impl Future> + Send + 'a; - - fn stream(&mut self, _msg: u32, _stream_id: u32, _task_id: u32) -> Self::Stream<'_> { + fn stream( + &mut self, + _msg: u32, + _stream_id: u32, + _task_id: u32, + ) -> impl Stream, Error>> + Send + '_ { stream! { for i in 0u64..10 { yield Ok(i) @@ -424,11 +463,18 @@ mod tests { } } - fn finalize<'a>(self) -> Self::FinalizeFut<'a> { - async move { - println!("producer finalized"); - Ok(()) - } + async fn handle_error( + &mut self, + _err: Error, + _stream_id: u32, + _task_id: u32, + ) -> Result, Error> { + Ok(None) + } + + async fn finalize(self) -> Result<(), Error> { + println!("producer finalized"); + Ok(()) } } @@ -441,55 +487,69 @@ mod tests { impl Handler for Arc { type Result = (); - type IntoMessage = impl IntoMessage; - type HandleFut<'a> = impl Future> + Send + 'a; - type FinalizeFut<'a> = impl Future> + Send + 'a; - fn handle(&mut self, msg: u64, stream_id: u32, task_id: u32) -> Self::HandleFut<'_> { - async move { - tokio::time::sleep(Duration::from_millis(1000)).await; - println!( - "[{}] shared consumer handle {}u64 ({}:{})", - self.0, msg, stream_id, task_id - ); - Ok(()) - } + async fn handle( + &mut self, + msg: u64, + stream_id: u32, + task_id: u32, + ) -> Result, Error> { + tokio::time::sleep(Duration::from_millis(1000)).await; + println!( + "[{}] shared consumer handle {}u64 ({}:{})", + self.0, msg, stream_id, task_id + ); + Ok(()) + } + async fn handle_error( + &mut self, + _err: Error, + _stream_id: u32, + _task_id: u32, + ) -> Result, Error> { + Ok(None) } - fn finalize<'a>(self) -> Self::FinalizeFut<'a> { - async move { - println!("[{}] shared consumer finalized", self.0); - Ok(()) - } + async fn finalize(self) -> Result<(), Error> { + println!("[{}] shared consumer finalized", self.0); + Ok(()) } } impl Handler for TestConsumer { type Result = (); - type IntoMessage = impl IntoMessage; - type HandleFut<'a> = impl Future> + Send + 'a; - type FinalizeFut<'a> = impl Future> + Send + 'a; - fn handle(&mut self, msg: u64, stream_id: u32, task_id: u32) -> Self::HandleFut<'_> { - async move { - tokio::time::sleep(Duration::from_millis(1000)).await; - println!( - "[{}] consumer handle {}u64 ({}:{})", - self.0, msg, stream_id, task_id - ); - Ok(()) - } + async fn handle( + &mut self, + msg: u64, + stream_id: u32, + task_id: u32, + ) -> Result, Error> { + tokio::time::sleep(Duration::from_millis(1000)).await; + println!( + "[{}] consumer handle {}u64 ({}:{})", + self.0, msg, stream_id, task_id + ); + Ok(()) } - fn finalize<'a>(self) -> Self::FinalizeFut<'a> { - async move { - println!("[{}] consumer finalized", self.0); - Ok(()) - } + async fn handle_error( + &mut self, + _err: Error, + _stream_id: u32, + _task_id: u32, + ) -> Result, Error> { + Ok(None) + } + + async fn finalize(self) -> Result<(), Error> { + println!("[{}] consumer finalized", self.0); + Ok(()) } } - // #[tokio::test] + #[tokio::test] + #[ignore = ""] async fn test_streams() { let bus = Bus::default(); @@ -507,7 +567,8 @@ mod tests { bus.wait().await; } - // #[tokio::test] + #[tokio::test] + #[ignore = ""] async fn test_tasks_shared() { let bus = Bus::default(); diff --git a/src/message.rs b/src/message.rs index 92940f7..c6ccf43 100644 --- a/src/message.rs +++ b/src/message.rs @@ -1,12 +1,24 @@ use core::fmt; +use crate::Error; + #[derive(Debug, Clone, PartialEq, Eq)] pub(crate) struct Msg { - pub(crate) inner: M, + pub(crate) inner: Result, pub(crate) index: u64, pub(crate) stream_id: u32, } +// impl Msg { +// pub(crate) fn new(m: M, index: u64, stream_id: u32) -> Self { +// Self { +// inner: Ok(m), +// index, +// stream_id, +// } +// } +// } + pub trait Message: fmt::Debug + Clone + Send + Sync + 'static {} impl Message for () {} diff --git a/src/producer.rs b/src/producer.rs index 0eb1d2f..cbf89c9 100644 --- a/src/producer.rs +++ b/src/producer.rs @@ -17,17 +17,22 @@ use crate::{ pub trait Producer: Send + Sync + 'static { type Item: Message; - type IntoMessage: IntoMessage; - type Stream<'a>: Stream> + Send + 'a - where - Self: 'a; - type FinalizeFut<'a>: Future> + Send + 'a - where - Self: 'a; + fn stream( + &mut self, + msg: M, + stream_id: u32, + task_id: u32, + ) -> impl Stream, Error>> + Send + '_; - fn stream(&mut self, msg: M, stream_id: u32, task_id: u32) -> Self::Stream<'_>; - fn finalize<'a>(self) -> Self::FinalizeFut<'a>; + fn handle_error( + &mut self, + err: Error, + stream_id: u32, + task_id: u32, + ) -> impl Future, Error>> + Send + '_; + + fn finalize(self) -> impl Future> + Send; } pub(crate) struct ProducerSpawner { @@ -64,18 +69,18 @@ where while let Some(recv_msg) = rx.recv().await { task_counter.inc_running(); - let mut stream = pin!(ctx - .stream(recv_msg.inner, stream_id, task_id) - .take_until(abort.notified())); + match recv_msg.inner { + Ok(msg) => { + let mut stream = pin!(ctx + .stream(msg, stream_id, task_id) + .take_until(abort.notified())); - let mut index = 0; + let mut index = 0; - loop { - index += 1; - - match stream.next().await { - Some(Ok(msg)) => { - if let Some(inner) = msg.into_message() { + while let Some(res) = stream.next().await { + if let Some(inner) = res.map(IntoMessage::into_message).transpose() + { + index += 1; if let Err(err) = bus .send(Msg { inner, @@ -89,12 +94,32 @@ where } } } - Some(Err(err)) => { - println!("PRODUCER ERROR: {:?}", err); + } + Err(err) => { + let Some(inner) = ctx + .handle_error(err, stream_id, task_id) + .await + .map(IntoMessage::into_message) + .transpose() + else { + if let Err(err) = + bus.send_skip::(stream_id, recv_msg.index).await + { + println!("BUS SEND ERROR: {:?}", err); + } continue; - } + }; - None => break, + if let Err(err) = bus + .send(Msg { + inner, + index: recv_msg.index, + stream_id, + }) + .await + { + println!("BUS SEND ERROR: {:?}", err); + } } } diff --git a/src/reorder_queue.rs b/src/reorder_queue.rs index 579da3e..e8ff3fd 100644 --- a/src/reorder_queue.rs +++ b/src/reorder_queue.rs @@ -1,36 +1,37 @@ use std::{cmp::Ordering, collections::BinaryHeap}; -use crate::{message::Msg, Message}; +struct Entry { + inner: Option, + index: u64, +} -struct Entry(Msg); - -impl PartialOrd for Entry { +impl PartialOrd for Entry { fn partial_cmp(&self, other: &Self) -> Option { - Some(other.0.index.cmp(&self.0.index)) + Some(other.index.cmp(&self.index)) } } -impl Ord for Entry { +impl Ord for Entry { fn cmp(&self, other: &Self) -> Ordering { - other.0.index.cmp(&self.0.index) + other.index.cmp(&self.index) } } -impl PartialEq for Entry { +impl PartialEq for Entry { fn eq(&self, other: &Self) -> bool { - other.0.index.eq(&self.0.index) + other.index.eq(&self.index) } } -impl Eq for Entry {} +impl Eq for Entry {} -pub(crate) struct ReorderQueue { +pub(crate) struct ReorderQueue { cap: usize, recent_index: Option, heap: BinaryHeap>, } -impl ReorderQueue { +impl ReorderQueue { pub fn new(cap: usize) -> Self { Self { cap, @@ -39,8 +40,11 @@ impl ReorderQueue { } } - pub fn push(&mut self, msg: Msg) -> Option { - self.heap.push(Entry(msg)); + pub fn push(&mut self, index: u64, msg: M) -> Option { + self.heap.push(Entry { + inner: Some(msg), + index, + }); if self.heap.len() == self.cap { self.recent_index = self.recent_index.map(|x| x + 1); @@ -50,18 +54,18 @@ impl ReorderQueue { } } - pub fn pop(&mut self) -> Option> { + pub fn pop(&mut self) -> Option<(u64, Option)> { match self.recent_index { None => { let e = self.heap.pop()?; - self.recent_index = Some(e.0.index); - Some(e.0) + self.recent_index = Some(e.index); + Some((e.index, e.inner)) } Some(ri) => { let e = self.heap.peek()?; - if e.0.index == ri + 1 { - self.recent_index = Some(e.0.index); - Some(self.heap.pop()?.0) + if e.index == ri + 1 { + self.recent_index = Some(e.index); + Some((e.index, self.heap.pop()?.inner)) } else { None } @@ -72,195 +76,58 @@ impl ReorderQueue { #[cfg(test)] mod tests { + use crate::Message; + use super::ReorderQueue; + impl Message for i32 {} + #[test] fn test_reordering() { let mut queue = ReorderQueue::new(8); - assert_eq!( - queue.push(crate::message::Msg { - inner: (), - index: 0, - stream_id: 0, - }), - None - ); - - assert_eq!( - queue.pop(), - Some(crate::message::Msg { - inner: (), - index: 0, - stream_id: 0, - }) - ); + assert_eq!(queue.push(0, 0), None); + assert_eq!(queue.pop(), Some((0, Some(0)))); assert_eq!(queue.pop(), None); - assert_eq!( - queue.push(crate::message::Msg { - inner: (), - index: 3, - stream_id: 0, - }), - None - ); + assert_eq!(queue.push(3, 3), None); assert_eq!(queue.pop(), None); - assert_eq!( - queue.push(crate::message::Msg { - inner: (), - index: 2, - stream_id: 0, - }), - None - ); + assert_eq!(queue.push(2, 2), None); assert_eq!(queue.pop(), None); - assert_eq!( - queue.push(crate::message::Msg { - inner: (), - index: 4, - stream_id: 0, - }), - None - ); + assert_eq!(queue.push(4, 4), None); assert_eq!(queue.pop(), None); - assert_eq!( - queue.push(crate::message::Msg { - inner: (), - index: 1, - stream_id: 0, - }), - None - ); - assert_eq!( - queue.pop(), - Some(crate::message::Msg { - inner: (), - index: 1, - stream_id: 0, - }) - ); - assert_eq!( - queue.pop(), - Some(crate::message::Msg { - inner: (), - index: 2, - stream_id: 0, - }) - ); - assert_eq!( - queue.pop(), - Some(crate::message::Msg { - inner: (), - index: 3, - stream_id: 0, - }) - ); - assert_eq!( - queue.pop(), - Some(crate::message::Msg { - inner: (), - index: 4, - stream_id: 0, - }) - ); + assert_eq!(queue.push(1, 1), None); + assert_eq!(queue.pop(), Some((1, Some(1)))); + assert_eq!(queue.pop(), Some((2, Some(2)))); + assert_eq!(queue.pop(), Some((3, Some(3)))); + assert_eq!(queue.pop(), Some((4, Some(4)))); assert_eq!(queue.pop(), None); } #[test] fn test_overflow() { let mut queue = ReorderQueue::new(4); - assert_eq!( - queue.push(crate::message::Msg { - inner: (), - index: 0, - stream_id: 0, - }), - None - ); - assert_eq!( - queue.pop(), - Some(crate::message::Msg { - inner: (), - index: 0, - stream_id: 0, - }) - ); + assert_eq!(queue.push(0, 0), None); + assert_eq!(queue.pop(), Some((0, Some(0)))); assert_eq!(queue.pop(), None); - assert_eq!( - queue.push(crate::message::Msg { - inner: (), - index: 4, - stream_id: 0, - }), - None - ); + assert_eq!(queue.push(4, 4), None); assert_eq!(queue.pop(), None); - assert_eq!( - queue.push(crate::message::Msg { - inner: (), - index: 2, - stream_id: 0, - }), - None - ); + assert_eq!(queue.push(2, 2), None); assert_eq!(queue.pop(), None); - assert_eq!( - queue.push(crate::message::Msg { - inner: (), - index: 3, - stream_id: 0, - }), - None - ); + assert_eq!(queue.push(3, 3), None); assert_eq!(queue.pop(), None); - assert_eq!( - queue.push(crate::message::Msg { - inner: (), - index: 5, - stream_id: 0, - }), - Some(1) - ); - assert_eq!( - queue.pop(), - Some(crate::message::Msg { - inner: (), - index: 2, - stream_id: 0, - }) - ); - assert_eq!( - queue.pop(), - Some(crate::message::Msg { - inner: (), - index: 3, - stream_id: 0, - }) - ); - assert_eq!( - queue.pop(), - Some(crate::message::Msg { - inner: (), - index: 4, - stream_id: 0, - }) - ); - assert_eq!( - queue.pop(), - Some(crate::message::Msg { - inner: (), - index: 5, - stream_id: 0, - }) - ); + assert_eq!(queue.push(5, 5), Some(1)); + assert_eq!(queue.pop(), Some((2, Some(2)))); + assert_eq!(queue.pop(), Some((3, Some(3)))); + assert_eq!(queue.pop(), Some((4, Some(4)))); + assert_eq!(queue.pop(), Some((5, Some(5)))); assert_eq!(queue.pop(), None); } }