From 655fdf846266adea6e02064283a1ddc91d246da7 Mon Sep 17 00:00:00 2001 From: Andrey Tkachenko Date: Mon, 28 Jun 2021 19:37:53 +0400 Subject: [PATCH] Add modules --- Cargo.toml | 2 +- examples/demo_slow.rs | 17 ++++--- src/builder.rs | 108 +++++++++++++++++++++++++++++++----------- src/lib.rs | 3 +- 4 files changed, 94 insertions(+), 36 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 89bc5bd..8a03207 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "messagebus" -version = "0.6.3" +version = "0.6.4" authors = ["Andrey Tkachenko "] repository = "https://github.com/andreytkachenko/messagebus.git" keywords = ["futures", "async", "tokio", "message", "bus"] diff --git a/examples/demo_slow.rs b/examples/demo_slow.rs index 9e66a6d..1f01ca7 100644 --- a/examples/demo_slow.rs +++ b/examples/demo_slow.rs @@ -1,4 +1,4 @@ -use messagebus::{receivers, Bus, Message, error, Handler}; +use messagebus::{Bus, Handler, Message, Module, error, receivers}; use thiserror::Error; #[derive(Debug, Error)] @@ -50,14 +50,19 @@ impl Handler for TmpReceiver { } } -#[tokio::main] -async fn main() { - let (b, poller) = Bus::build() - .register(TmpReceiver) +fn module() -> Module { + Module::new() + .register(TmpReceiver) .subscribe::, _, _>(8, Default::default()) .subscribe::, _, _>(8, Default::default()) .subscribe::, _, _>(8, Default::default()) - .done() + .done() +} + +#[tokio::main] +async fn main() { + let (b, poller) = Bus::build() + .add_module(module()) .build(); b.send(32f32).await.unwrap(); diff --git a/src/builder.rs b/src/builder.rs index 2ef2841..0c3c70c 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -31,9 +31,10 @@ pub struct SyncEntry; pub struct UnsyncEntry; #[must_use] -pub struct RegisterEntry { +pub struct RegisterEntry { item: Untyped, - builder: BusBuilder, + payload: B, + builder: F, receivers: HashMap< TypeId, Vec<( @@ -50,23 +51,23 @@ pub struct RegisterEntry { _m: PhantomData<(K, T)>, } -impl RegisterEntry { - pub fn done(self) -> BusBuilder { - let mut builder = self.builder; - +impl RegisterEntry + where F: FnMut(&mut B, (TypeId, Receiver), Box Pin + Send>>>, Box Pin + Send>>>), +{ + pub fn done(mut self) -> B { for (tid, v) in self.receivers { for (r, poller, poller2) in v { let poller = poller(self.item.clone()); - builder.add_recevier((tid, r), poller, poller2); + (self.builder)(&mut self.payload, (tid, r), poller, poller2); } } - builder + self.payload } } -impl RegisterEntry { +impl RegisterEntry { pub fn subscribe(mut self, queue: u64, cfg: S::Config) -> Self where T: Send + 'static, @@ -88,7 +89,7 @@ impl RegisterEntry { } } -impl RegisterEntry { +impl RegisterEntry { pub fn subscribe(mut self, queue: u64, cfg: S::Config) -> Self where T: Send + Sync + 'static, @@ -110,12 +111,13 @@ impl RegisterEntry { } } -pub struct BusBuilder { + +pub struct Module { receivers: Vec<(TypeId, Receiver)>, pollings: Vec Pin + Send>>>>, } -impl BusBuilder { +impl Module { pub fn new() -> Self { Self { receivers: Vec::new(), @@ -123,42 +125,92 @@ impl BusBuilder { } } - pub fn register(self, item: T) -> RegisterEntry { + pub fn register(self, item: T) -> RegisterEntry Pin + Send>>>, Box Pin + Send>>>), Self> { RegisterEntry { item: Arc::new(item) as Untyped, - builder: self, + payload: self, + builder: |p: &mut Self, val, poller, poller2| { + p.receivers.push(val); + p.pollings.push(poller); + p.pollings.push(poller2); + }, receivers: HashMap::new(), _m: Default::default(), } } - pub fn register_unsync(self, item: T) -> RegisterEntry { + pub fn register_unsync(self, item: T) -> RegisterEntry Pin + Send>>>, Box Pin + Send>>>), Self> { RegisterEntry { item: Arc::new(Mutex::new(item)) as Untyped, - builder: self, + payload: self, + builder: |p: &mut Self, val, poller, poller2| { + p.receivers.push(val); + p.pollings.push(poller); + p.pollings.push(poller2); + }, receivers: HashMap::new(), _m: Default::default(), } } - pub fn add_recevier( - &mut self, - val: (TypeId, Receiver), - poller: Box Pin + Send>>>, - poller2: Box Pin + Send>>>, - ) { - self.receivers.push(val); - self.pollings.push(poller); - self.pollings.push(poller2); + fn extend(&mut self, other: Module) { + self.receivers.extend(other.receivers.into_iter()); + self.pollings.extend(other.pollings.into_iter()); + } +} + +pub struct BusBuilder { + inner: Module, +} + +impl BusBuilder { + pub(crate) fn new() -> Self { + Self { + inner: Module::new(), + } + } + + pub fn register(self, item: T) -> RegisterEntry Pin + Send>>>, Box Pin + Send>>>), Self> { + RegisterEntry { + item: Arc::new(item) as Untyped, + payload: self, + builder: |p: &mut Self, val, poller, poller2| { + p.inner.receivers.push(val); + p.inner.pollings.push(poller); + p.inner.pollings.push(poller2); + }, + receivers: HashMap::new(), + _m: Default::default(), + } + } + + pub fn register_unsync(self, item: T) -> RegisterEntry Pin + Send>>>, Box Pin + Send>>>), Self> { + RegisterEntry { + item: Arc::new(Mutex::new(item)) as Untyped, + payload: self, + builder: |p: &mut Self, val, poller, poller2| { + p.inner.receivers.push(val); + p.inner.pollings.push(poller); + p.inner.pollings.push(poller2); + }, + receivers: HashMap::new(), + _m: Default::default(), + } + } + + pub fn add_module(mut self, module: Module) -> Self { + self.inner.extend(module); + + self } pub fn build(self) -> (Bus, impl Future) { let bus = Bus { - inner: Arc::new(BusInner::new(self.receivers)), + inner: Arc::new(BusInner::new(self.inner.receivers)), }; - let mut futs = Vec::with_capacity(self.pollings.len() * 2); - for poller in self.pollings { + let mut futs = Vec::with_capacity(self.inner.pollings.len() * 2); + for poller in self.inner.pollings { futs.push(tokio::task::spawn(poller(bus.clone()))); } diff --git a/src/lib.rs b/src/lib.rs index c969a4a..93d46b5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,8 +10,9 @@ mod trait_object; extern crate log; use crate::receiver::Permit; -pub use builder::BusBuilder; +use builder::BusBuilder; use core::any::{Any, TypeId}; +pub use builder::Module; pub use envelop::Message; pub use handler::*; use receiver::Receiver;