diff --git a/Cargo.toml b/Cargo.toml index 35ff9d9..4c6a7fe 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,10 +1,15 @@ [package] name = "messagebus" -version = "0.1.0" +version = "0.15.0" +authors = ["Andrey Tkachenko "] +repository = "https://github.com/andreytkachenko/messagebus.git" +keywords = ["futures", "async", "tokio", "message", "bus"] +categories = ["network-programming", "asynchronous"] +description = "MessageBus allows intercommunicate with messages between modules" +exclude = [".gitignore", ".cargo/config", ".github/**", ".drone.yml"] +license = "MIT OR Apache-2.0" edition = "2021" -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - [dependencies] anyhow = "1.0.75" boxcar = "0.2.3" diff --git a/src/builder.rs b/src/builder.rs index 7383d60..3afbc55 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -11,6 +11,7 @@ pub struct Config { pub ordered: bool, pub task_count: u32, pub lazy_task_creation: bool, + pub stream_per_message: bool, } impl Default for Config { @@ -21,6 +22,7 @@ impl Default for Config { ordered: false, task_count: 1, lazy_task_creation: true, + stream_per_message: false, } } } @@ -65,6 +67,7 @@ where ordered: false, task_count: 1, lazy_task_creation: true, + stream_per_message: false, }, callback, _m: PhantomData, @@ -82,6 +85,17 @@ where } } + pub fn stream_per_message(self) -> Self { + let mut config = self.config; + config.stream_per_message = true; + + Self { + config, + callback: self.callback, + _m: PhantomData, + } + } + pub fn tasks(self, tasks: u32) -> Self { let mut config = self.config; config.task_count = tasks; @@ -139,6 +153,7 @@ where ordered: false, task_count, lazy_task_creation: true, + stream_per_message: false, }, stream_handlers: Default::default(), callback, @@ -146,6 +161,18 @@ where } } + pub fn stream_per_message(self) -> Self { + let mut config = self.config; + config.stream_per_message = true; + + Self { + config, + callback: self.callback, + _m: PhantomData, + stream_handlers: Default::default(), + } + } + pub fn ordered(self) -> Self { let mut config = self.config; config.ordered = true; diff --git a/src/handler.rs b/src/handler.rs index 2f283e9..1cfeae0 100644 --- a/src/handler.rs +++ b/src/handler.rs @@ -28,7 +28,7 @@ pub struct Context { pub load: (u32, u32), } -pub trait Handler: Send + Sync + 'static { +pub trait Handler: Send + 'static { type Result: Message + Unpin; type Error: ErrorMessage + Unpin; @@ -209,6 +209,12 @@ async fn send_result<'a, M: Message, E: ErrorMessage>( _ => { while let Some(item) = iter.as_mut().next().await { let index = index_counter.fetch_add(1, Ordering::Relaxed); + let stream_id = if config.stream_per_message { + bus.next_stream_id() + } else { + stream_id + }; + bus.send::( Some(item.map_err(Into::into)), index, diff --git a/src/lib.rs b/src/lib.rs index 4557aca..fd33e0e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -16,7 +16,7 @@ use std::{ collections::HashMap, marker::PhantomData, sync::{ - atomic::{AtomicBool, AtomicU64, Ordering}, + atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering}, Arc, }, }; @@ -35,7 +35,7 @@ pub use async_iter::*; pub use builder::{Builder, DefaultBuilder, SharedBuilder}; pub use error::{Error, VoidError}; pub use handler::{Context, Handler}; -pub use message::{ErrorMessage, IntoMessages, Message}; +pub use message::{async_iter, ErrorMessage, IntoMessages, Message}; pub const DEFAUL_STREAM_ID: u32 = u32::MAX; pub const DEFAUL_TASK_ID: u32 = 0; @@ -46,6 +46,7 @@ struct BusInner { spawners: RwLock>>, reordering: DashMap<(u32, TypeId), Arc>, counters: DashMap<(u32, TypeId), Arc>, + stream_id_seq: AtomicU32, abort_notify: Arc, task_counter: Arc, spawn_counter: Arc, @@ -55,6 +56,11 @@ struct BusInner { } impl BusInner { + #[inline] + pub fn next_stream_id(&self) -> u32 { + self.stream_id_seq.fetch_add(1, Ordering::Relaxed) + } + fn get_task_id(&self, stream_id: u32, config: &Config) -> u32 { if !config.queue_per_task || config.task_count == 1 { return DEFAUL_TASK_ID; diff --git a/src/message.rs b/src/message.rs index 91bef14..f3668e1 100644 --- a/src/message.rs +++ b/src/message.rs @@ -11,6 +11,11 @@ pub trait IntoMessages { fn into_messages(self) -> impl AsyncIterator>; } +pub struct AsyncIter(I); +pub fn async_iter(i: I) -> AsyncIter { + AsyncIter(i) +} + impl Message for () {} impl ErrorMessage for anyhow::Error {} @@ -38,6 +43,14 @@ where } } +impl>> IntoMessages + for AsyncIter +{ + fn into_messages(self) -> impl AsyncIterator> { + self.0 + } +} + impl IntoMessages<(), E> for () { fn into_messages(self) -> impl AsyncIterator> { crate::empty()