Stream per message

This commit is contained in:
Andrey Tkachenko 2023-11-29 11:41:25 +04:00
parent 4dd6a7a2af
commit c240529fd2
5 changed files with 63 additions and 6 deletions

View File

@ -1,10 +1,15 @@
[package] [package]
name = "messagebus" name = "messagebus"
version = "0.1.0" version = "0.15.0"
authors = ["Andrey Tkachenko <andrey@aidev.ru>"]
repository = "https://github.com/andreytkachenko/messagebus.git"
keywords = ["futures", "async", "tokio", "message", "bus"]
categories = ["network-programming", "asynchronous"]
description = "MessageBus allows intercommunicate with messages between modules"
exclude = [".gitignore", ".cargo/config", ".github/**", ".drone.yml"]
license = "MIT OR Apache-2.0"
edition = "2021" edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
anyhow = "1.0.75" anyhow = "1.0.75"
boxcar = "0.2.3" boxcar = "0.2.3"

View File

@ -11,6 +11,7 @@ pub struct Config {
pub ordered: bool, pub ordered: bool,
pub task_count: u32, pub task_count: u32,
pub lazy_task_creation: bool, pub lazy_task_creation: bool,
pub stream_per_message: bool,
} }
impl Default for Config { impl Default for Config {
@ -21,6 +22,7 @@ impl Default for Config {
ordered: false, ordered: false,
task_count: 1, task_count: 1,
lazy_task_creation: true, lazy_task_creation: true,
stream_per_message: false,
} }
} }
} }
@ -65,6 +67,7 @@ where
ordered: false, ordered: false,
task_count: 1, task_count: 1,
lazy_task_creation: true, lazy_task_creation: true,
stream_per_message: false,
}, },
callback, callback,
_m: PhantomData, _m: PhantomData,
@ -82,6 +85,17 @@ where
} }
} }
pub fn stream_per_message(self) -> Self {
let mut config = self.config;
config.stream_per_message = true;
Self {
config,
callback: self.callback,
_m: PhantomData,
}
}
pub fn tasks(self, tasks: u32) -> Self { pub fn tasks(self, tasks: u32) -> Self {
let mut config = self.config; let mut config = self.config;
config.task_count = tasks; config.task_count = tasks;
@ -139,6 +153,7 @@ where
ordered: false, ordered: false,
task_count, task_count,
lazy_task_creation: true, lazy_task_creation: true,
stream_per_message: false,
}, },
stream_handlers: Default::default(), stream_handlers: Default::default(),
callback, callback,
@ -146,6 +161,18 @@ where
} }
} }
pub fn stream_per_message(self) -> Self {
let mut config = self.config;
config.stream_per_message = true;
Self {
config,
callback: self.callback,
_m: PhantomData,
stream_handlers: Default::default(),
}
}
pub fn ordered(self) -> Self { pub fn ordered(self) -> Self {
let mut config = self.config; let mut config = self.config;
config.ordered = true; config.ordered = true;

View File

@ -28,7 +28,7 @@ pub struct Context {
pub load: (u32, u32), pub load: (u32, u32),
} }
pub trait Handler<M: Message>: Send + Sync + 'static { pub trait Handler<M: Message>: Send + 'static {
type Result: Message + Unpin; type Result: Message + Unpin;
type Error: ErrorMessage + Unpin; type Error: ErrorMessage + Unpin;
@ -209,6 +209,12 @@ async fn send_result<'a, M: Message, E: ErrorMessage>(
_ => { _ => {
while let Some(item) = iter.as_mut().next().await { while let Some(item) = iter.as_mut().next().await {
let index = index_counter.fetch_add(1, Ordering::Relaxed); let index = index_counter.fetch_add(1, Ordering::Relaxed);
let stream_id = if config.stream_per_message {
bus.next_stream_id()
} else {
stream_id
};
bus.send::<M>( bus.send::<M>(
Some(item.map_err(Into::into)), Some(item.map_err(Into::into)),
index, index,

View File

@ -16,7 +16,7 @@ use std::{
collections::HashMap, collections::HashMap,
marker::PhantomData, marker::PhantomData,
sync::{ sync::{
atomic::{AtomicBool, AtomicU64, Ordering}, atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering},
Arc, Arc,
}, },
}; };
@ -35,7 +35,7 @@ pub use async_iter::*;
pub use builder::{Builder, DefaultBuilder, SharedBuilder}; pub use builder::{Builder, DefaultBuilder, SharedBuilder};
pub use error::{Error, VoidError}; pub use error::{Error, VoidError};
pub use handler::{Context, Handler}; pub use handler::{Context, Handler};
pub use message::{ErrorMessage, IntoMessages, Message}; pub use message::{async_iter, ErrorMessage, IntoMessages, Message};
pub const DEFAUL_STREAM_ID: u32 = u32::MAX; pub const DEFAUL_STREAM_ID: u32 = u32::MAX;
pub const DEFAUL_TASK_ID: u32 = 0; pub const DEFAUL_TASK_ID: u32 = 0;
@ -46,6 +46,7 @@ struct BusInner {
spawners: RwLock<HashMap<TypeId, Box<dyn Any + Send + Sync>>>, spawners: RwLock<HashMap<TypeId, Box<dyn Any + Send + Sync>>>,
reordering: DashMap<(u32, TypeId), Arc<dyn AbstractSender>>, reordering: DashMap<(u32, TypeId), Arc<dyn AbstractSender>>,
counters: DashMap<(u32, TypeId), Arc<AtomicU64>>, counters: DashMap<(u32, TypeId), Arc<AtomicU64>>,
stream_id_seq: AtomicU32,
abort_notify: Arc<Notify>, abort_notify: Arc<Notify>,
task_counter: Arc<TaskCounter>, task_counter: Arc<TaskCounter>,
spawn_counter: Arc<TaskCounter>, spawn_counter: Arc<TaskCounter>,
@ -55,6 +56,11 @@ struct BusInner {
} }
impl BusInner { impl BusInner {
#[inline]
pub fn next_stream_id(&self) -> u32 {
self.stream_id_seq.fetch_add(1, Ordering::Relaxed)
}
fn get_task_id<M: Message>(&self, stream_id: u32, config: &Config) -> u32 { fn get_task_id<M: Message>(&self, stream_id: u32, config: &Config) -> u32 {
if !config.queue_per_task || config.task_count == 1 { if !config.queue_per_task || config.task_count == 1 {
return DEFAUL_TASK_ID; return DEFAUL_TASK_ID;

View File

@ -11,6 +11,11 @@ pub trait IntoMessages<M: Message, E: ErrorMessage> {
fn into_messages(self) -> impl AsyncIterator<Item = Result<M, E>>; fn into_messages(self) -> impl AsyncIterator<Item = Result<M, E>>;
} }
pub struct AsyncIter<I: AsyncIterator>(I);
pub fn async_iter<I: AsyncIterator>(i: I) -> AsyncIter<I> {
AsyncIter(i)
}
impl Message for () {} impl Message for () {}
impl ErrorMessage for anyhow::Error {} impl ErrorMessage for anyhow::Error {}
@ -38,6 +43,14 @@ where
} }
} }
impl<M: Message, E: ErrorMessage, I: AsyncIterator<Item = Result<M, E>>> IntoMessages<M, E>
for AsyncIter<I>
{
fn into_messages(self) -> impl AsyncIterator<Item = Result<M, E>> {
self.0
}
}
impl<E: ErrorMessage> IntoMessages<(), E> for () { impl<E: ErrorMessage> IntoMessages<(), E> for () {
fn into_messages(self) -> impl AsyncIterator<Item = Result<(), E>> { fn into_messages(self) -> impl AsyncIterator<Item = Result<(), E>> {
crate::empty() crate::empty()