handler context

This commit is contained in:
Andrey Tkachenko 2023-11-23 10:17:58 +04:00
parent d57dd8fed9
commit 4dd6a7a2af
4 changed files with 50 additions and 43 deletions

View File

@ -4,7 +4,7 @@
use std::sync::Arc;
use anyhow::Error;
use messagebus::{Builder, Bus, Handler, IntoMessages, Message};
use messagebus::{Builder, Bus, Context, Handler, IntoMessages, Message};
#[derive(Debug, Clone)]
pub struct Msg(pub i32);
@ -21,8 +21,7 @@ impl Handler<Msg> for Processor {
async fn handle(
&mut self,
_msg: Msg,
_stream_id: u32,
_task_id: u32,
_ctx: Context,
_bus: Bus,
) -> Result<impl IntoMessages<Self::Result, Self::Error>, Self::Error> {
Ok(())
@ -31,8 +30,7 @@ impl Handler<Msg> for Processor {
async fn handle_error(
&mut self,
_err: messagebus::Error,
_stream_id: u32,
_task_id: u32,
_ctx: Context,
_bus: messagebus::Bus,
) -> Result<impl IntoMessages<Self::Result, Self::Error> + Send + '_, Self::Error> {
Ok(None)

View File

@ -129,6 +129,11 @@ impl<T> Receiver<T> {
}
}
#[inline]
pub fn load(&self) -> (u32, u32) {
(self.inner.len() as _, self.inner.capacity() as _)
}
#[inline]
pub fn is_empty(&self) -> bool {
self.inner.is_empty()

View File

@ -19,6 +19,15 @@ use crate::{
AsyncIterator, Bus, BusInner, Error, ErrorMessage, Message,
};
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct Context {
pub stream_id: u32,
pub task_id: u32,
pub index: u64,
pub load: (u32, u32),
}
pub trait Handler<M: Message>: Send + Sync + 'static {
type Result: Message + Unpin;
type Error: ErrorMessage + Unpin;
@ -26,8 +35,7 @@ pub trait Handler<M: Message>: Send + Sync + 'static {
fn handle(
&mut self,
msg: M,
stream_id: u32,
task_id: u32,
ctx: Context,
bus: crate::Bus,
) -> impl Future<
Output = Result<impl IntoMessages<Self::Result, Self::Error> + Send + '_, Self::Error>,
@ -37,8 +45,7 @@ pub trait Handler<M: Message>: Send + Sync + 'static {
fn handle_error(
&mut self,
_err: Error,
_stream_id: u32,
_task_id: u32,
_ctx: Context,
_bus: crate::Bus,
) -> impl Future<
Output = Result<impl IntoMessages<Self::Result, Self::Error> + Send + '_, Self::Error>,
@ -81,13 +88,19 @@ where
let bus = Bus { inner: bus.clone() };
let config = self.builder.config(stream_id);
let mut ctx = self.builder.build(stream_id, task_id).await?;
let mut handler = self.builder.build(stream_id, task_id).await?;
let _handle = tokio::spawn(async move {
let _test = spawn_counter.clone().lease_unit(|| true);
while let Some(msg) = rx.recv().await {
let _test = task_counter.clone().lease_unit(|| rx.is_empty());
let ctx = Context {
stream_id,
task_id,
index: msg.index,
load: rx.load(),
};
let res = match msg.inner {
Some(Ok(m)) => {
@ -98,7 +111,8 @@ where
stream_id,
&config,
Some(
ctx.handle(m, stream_id, task_id, bus.clone())
handler
.handle(m, ctx, bus.clone())
.await
.map(IntoMessages::into_messages),
),
@ -113,7 +127,8 @@ where
stream_id,
&config,
Some(
ctx.handle_error(err, stream_id, task_id, bus.clone())
handler
.handle_error(err, ctx, bus.clone())
.await
.map(IntoMessages::into_messages),
),
@ -153,7 +168,7 @@ where
std::any::type_name::<B>()
);
if let Err(err) = ctx.finalize(bus.clone()).await {
if let Err(err) = handler.finalize(bus.clone()).await {
println!("TASK FINALIZE ERROR: {:?}", err);
}
});

View File

@ -34,7 +34,7 @@ use tokio::sync::{Notify, RwLock};
pub use async_iter::*;
pub use builder::{Builder, DefaultBuilder, SharedBuilder};
pub use error::{Error, VoidError};
pub use handler::Handler;
pub use handler::{Context, Handler};
pub use message::{ErrorMessage, IntoMessages, Message};
pub const DEFAUL_STREAM_ID: u32 = u32::MAX;
@ -454,18 +454,16 @@ where
async fn handle(
&mut self,
_msg: M,
_stream_id: u32,
_task_id: u32,
ctx: Context,
_bus: Bus,
) -> Result<impl IntoMessages<Self::Result, Self::Error>, Self::Error> {
(self.cb)(_stream_id, _task_id, _msg).map(|x| [x])
(self.cb)(ctx.stream_id, ctx.task_id, _msg).map(|x| [x])
}
async fn handle_error(
&mut self,
_err: Error,
_stream_id: u32,
_task_id: u32,
_ctx: Context,
_bus: Bus,
) -> Result<impl IntoMessages<Self::Result, Self::Error>, Self::Error> {
Ok(None)
@ -484,7 +482,8 @@ mod tests {
use rand::RngCore;
use crate::{
stream, Bus, DefaultBuilder, Error, Handler, IntoMessages, Message, SharedBuilder,
handler::Context, stream, Bus, DefaultBuilder, Error, Handler, IntoMessages, Message,
SharedBuilder,
};
impl Message for u64 {}
@ -501,8 +500,7 @@ mod tests {
async fn handle(
&mut self,
_msg: u32,
_stream_id: u32,
_task_id: u32,
_ctx: Context,
_bus: Bus,
) -> Result<impl IntoMessages<Self::Result, Self::Error>, Self::Error> {
Ok(stream(stream! {
@ -515,8 +513,7 @@ mod tests {
async fn handle_error(
&mut self,
_err: Error,
_stream_id: u32,
_task_id: u32,
_ctx: Context,
_bus: Bus,
) -> Result<impl IntoMessages<Self::Result, Self::Error>, Self::Error> {
Ok(None)
@ -542,14 +539,13 @@ mod tests {
async fn handle(
&mut self,
msg: u64,
stream_id: u32,
task_id: u32,
ctx: Context,
_bus: Bus,
) -> Result<impl IntoMessages<Self::Result, Self::Error>, Self::Error> {
tokio::time::sleep(Duration::from_millis(1000)).await;
println!(
"[{}] shared consumer handle {}u64 ({}:{})",
self.0, msg, stream_id, task_id
self.0, msg, ctx.stream_id, ctx.task_id
);
Ok(())
@ -558,8 +554,7 @@ mod tests {
async fn handle_error(
&mut self,
_err: Error,
_stream_id: u32,
_task_id: u32,
_ctx: Context,
_bus: Bus,
) -> Result<impl IntoMessages<Self::Result, Self::Error>, Self::Error> {
Ok(None)
@ -578,14 +573,13 @@ mod tests {
async fn handle(
&mut self,
msg: u64,
stream_id: u32,
task_id: u32,
ctx: Context,
_bus: Bus,
) -> Result<impl IntoMessages<Self::Result, Self::Error>, Self::Error> {
tokio::time::sleep(Duration::from_millis(100)).await;
println!(
"[{}] consumer handle {}u64 ({}:{})",
self.0, msg, stream_id, task_id
self.0, msg, ctx.stream_id, ctx.task_id
);
Ok(())
}
@ -593,8 +587,7 @@ mod tests {
async fn handle_error(
&mut self,
_err: Error,
_stream_id: u32,
_task_id: u32,
_ctx: Context,
_bus: Bus,
) -> Result<impl IntoMessages<Self::Result, Self::Error>, Self::Error> {
Ok(None)
@ -615,11 +608,10 @@ mod tests {
async fn handle(
&mut self,
msg: i16,
_stream_id: u32,
task_id: u32,
ctx: Context,
_bus: Bus,
) -> Result<impl IntoMessages<Self::Result, Self::Error>, Self::Error> {
if task_id % 2 == 0 {
if ctx.task_id % 2 == 0 {
tokio::time::sleep(Duration::from_millis(13)).await;
} else {
tokio::time::sleep(Duration::from_millis(22)).await;
@ -632,8 +624,7 @@ mod tests {
async fn handle_error(
&mut self,
_err: Error,
_stream_id: u32,
_task_id: u32,
_ctx: Context,
_bus: Bus,
) -> Result<impl IntoMessages<Self::Result, Self::Error>, Self::Error> {
Ok(None)
@ -656,8 +647,7 @@ mod tests {
async fn handle(
&mut self,
msg: u16,
_stream_id: u32,
_task_id: u32,
_ctx: Context,
_bus: Bus,
) -> Result<impl IntoMessages<Self::Result, Self::Error>, Self::Error> {
println!("{}", msg);
@ -675,8 +665,7 @@ mod tests {
async fn handle_error(
&mut self,
err: Error,
_stream_id: u32,
_task_id: u32,
_ctx: Context,
_bus: Bus,
) -> Result<impl IntoMessages<Self::Result, Self::Error>, Self::Error> {
println!("{:?}", err);