From 4dd6a7a2af96548c9448ad4338380758c25c88db Mon Sep 17 00:00:00 2001 From: Andrey Tkachenko Date: Thu, 23 Nov 2023 10:17:58 +0400 Subject: [PATCH] handler context --- examples/demo.rs | 8 +++----- src/chan.rs | 5 +++++ src/handler.rs | 31 ++++++++++++++++++++++-------- src/lib.rs | 49 +++++++++++++++++++----------------------------- 4 files changed, 50 insertions(+), 43 deletions(-) diff --git a/examples/demo.rs b/examples/demo.rs index 17c44a4..2f565c5 100644 --- a/examples/demo.rs +++ b/examples/demo.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use anyhow::Error; -use messagebus::{Builder, Bus, Handler, IntoMessages, Message}; +use messagebus::{Builder, Bus, Context, Handler, IntoMessages, Message}; #[derive(Debug, Clone)] pub struct Msg(pub i32); @@ -21,8 +21,7 @@ impl Handler for Processor { async fn handle( &mut self, _msg: Msg, - _stream_id: u32, - _task_id: u32, + _ctx: Context, _bus: Bus, ) -> Result, Self::Error> { Ok(()) @@ -31,8 +30,7 @@ impl Handler for Processor { async fn handle_error( &mut self, _err: messagebus::Error, - _stream_id: u32, - _task_id: u32, + _ctx: Context, _bus: messagebus::Bus, ) -> Result + Send + '_, Self::Error> { Ok(None) diff --git a/src/chan.rs b/src/chan.rs index 657f828..3db5096 100644 --- a/src/chan.rs +++ b/src/chan.rs @@ -129,6 +129,11 @@ impl Receiver { } } + #[inline] + pub fn load(&self) -> (u32, u32) { + (self.inner.len() as _, self.inner.capacity() as _) + } + #[inline] pub fn is_empty(&self) -> bool { self.inner.is_empty() diff --git a/src/handler.rs b/src/handler.rs index 976b117..2f283e9 100644 --- a/src/handler.rs +++ b/src/handler.rs @@ -19,6 +19,15 @@ use crate::{ AsyncIterator, Bus, BusInner, Error, ErrorMessage, Message, }; +#[derive(Debug, Clone)] +#[non_exhaustive] +pub struct Context { + pub stream_id: u32, + pub task_id: u32, + pub index: u64, + pub load: (u32, u32), +} + pub trait Handler: Send + Sync + 'static { type Result: Message + Unpin; type Error: ErrorMessage + Unpin; @@ -26,8 +35,7 @@ pub trait Handler: Send + Sync + 'static { fn handle( &mut self, msg: M, - stream_id: u32, - task_id: u32, + ctx: Context, bus: crate::Bus, ) -> impl Future< Output = Result + Send + '_, Self::Error>, @@ -37,8 +45,7 @@ pub trait Handler: Send + Sync + 'static { fn handle_error( &mut self, _err: Error, - _stream_id: u32, - _task_id: u32, + _ctx: Context, _bus: crate::Bus, ) -> impl Future< Output = Result + Send + '_, Self::Error>, @@ -81,13 +88,19 @@ where let bus = Bus { inner: bus.clone() }; let config = self.builder.config(stream_id); - let mut ctx = self.builder.build(stream_id, task_id).await?; + let mut handler = self.builder.build(stream_id, task_id).await?; let _handle = tokio::spawn(async move { let _test = spawn_counter.clone().lease_unit(|| true); while let Some(msg) = rx.recv().await { let _test = task_counter.clone().lease_unit(|| rx.is_empty()); + let ctx = Context { + stream_id, + task_id, + index: msg.index, + load: rx.load(), + }; let res = match msg.inner { Some(Ok(m)) => { @@ -98,7 +111,8 @@ where stream_id, &config, Some( - ctx.handle(m, stream_id, task_id, bus.clone()) + handler + .handle(m, ctx, bus.clone()) .await .map(IntoMessages::into_messages), ), @@ -113,7 +127,8 @@ where stream_id, &config, Some( - ctx.handle_error(err, stream_id, task_id, bus.clone()) + handler + .handle_error(err, ctx, bus.clone()) .await .map(IntoMessages::into_messages), ), @@ -153,7 +168,7 @@ where std::any::type_name::() ); - if let Err(err) = ctx.finalize(bus.clone()).await { + if let Err(err) = handler.finalize(bus.clone()).await { println!("TASK FINALIZE ERROR: {:?}", err); } }); diff --git a/src/lib.rs b/src/lib.rs index da7dac9..4557aca 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -34,7 +34,7 @@ use tokio::sync::{Notify, RwLock}; pub use async_iter::*; pub use builder::{Builder, DefaultBuilder, SharedBuilder}; pub use error::{Error, VoidError}; -pub use handler::Handler; +pub use handler::{Context, Handler}; pub use message::{ErrorMessage, IntoMessages, Message}; pub const DEFAUL_STREAM_ID: u32 = u32::MAX; @@ -454,18 +454,16 @@ where async fn handle( &mut self, _msg: M, - _stream_id: u32, - _task_id: u32, + ctx: Context, _bus: Bus, ) -> Result, Self::Error> { - (self.cb)(_stream_id, _task_id, _msg).map(|x| [x]) + (self.cb)(ctx.stream_id, ctx.task_id, _msg).map(|x| [x]) } async fn handle_error( &mut self, _err: Error, - _stream_id: u32, - _task_id: u32, + _ctx: Context, _bus: Bus, ) -> Result, Self::Error> { Ok(None) @@ -484,7 +482,8 @@ mod tests { use rand::RngCore; use crate::{ - stream, Bus, DefaultBuilder, Error, Handler, IntoMessages, Message, SharedBuilder, + handler::Context, stream, Bus, DefaultBuilder, Error, Handler, IntoMessages, Message, + SharedBuilder, }; impl Message for u64 {} @@ -501,8 +500,7 @@ mod tests { async fn handle( &mut self, _msg: u32, - _stream_id: u32, - _task_id: u32, + _ctx: Context, _bus: Bus, ) -> Result, Self::Error> { Ok(stream(stream! { @@ -515,8 +513,7 @@ mod tests { async fn handle_error( &mut self, _err: Error, - _stream_id: u32, - _task_id: u32, + _ctx: Context, _bus: Bus, ) -> Result, Self::Error> { Ok(None) @@ -542,14 +539,13 @@ mod tests { async fn handle( &mut self, msg: u64, - stream_id: u32, - task_id: u32, + ctx: Context, _bus: Bus, ) -> Result, Self::Error> { tokio::time::sleep(Duration::from_millis(1000)).await; println!( "[{}] shared consumer handle {}u64 ({}:{})", - self.0, msg, stream_id, task_id + self.0, msg, ctx.stream_id, ctx.task_id ); Ok(()) @@ -558,8 +554,7 @@ mod tests { async fn handle_error( &mut self, _err: Error, - _stream_id: u32, - _task_id: u32, + _ctx: Context, _bus: Bus, ) -> Result, Self::Error> { Ok(None) @@ -578,14 +573,13 @@ mod tests { async fn handle( &mut self, msg: u64, - stream_id: u32, - task_id: u32, + ctx: Context, _bus: Bus, ) -> Result, Self::Error> { tokio::time::sleep(Duration::from_millis(100)).await; println!( "[{}] consumer handle {}u64 ({}:{})", - self.0, msg, stream_id, task_id + self.0, msg, ctx.stream_id, ctx.task_id ); Ok(()) } @@ -593,8 +587,7 @@ mod tests { async fn handle_error( &mut self, _err: Error, - _stream_id: u32, - _task_id: u32, + _ctx: Context, _bus: Bus, ) -> Result, Self::Error> { Ok(None) @@ -615,11 +608,10 @@ mod tests { async fn handle( &mut self, msg: i16, - _stream_id: u32, - task_id: u32, + ctx: Context, _bus: Bus, ) -> Result, Self::Error> { - if task_id % 2 == 0 { + if ctx.task_id % 2 == 0 { tokio::time::sleep(Duration::from_millis(13)).await; } else { tokio::time::sleep(Duration::from_millis(22)).await; @@ -632,8 +624,7 @@ mod tests { async fn handle_error( &mut self, _err: Error, - _stream_id: u32, - _task_id: u32, + _ctx: Context, _bus: Bus, ) -> Result, Self::Error> { Ok(None) @@ -656,8 +647,7 @@ mod tests { async fn handle( &mut self, msg: u16, - _stream_id: u32, - _task_id: u32, + _ctx: Context, _bus: Bus, ) -> Result, Self::Error> { println!("{}", msg); @@ -675,8 +665,7 @@ mod tests { async fn handle_error( &mut self, err: Error, - _stream_id: u32, - _task_id: u32, + _ctx: Context, _bus: Bus, ) -> Result, Self::Error> { println!("{:?}", err);