Add modules

This commit is contained in:
Andrey Tkachenko 2021-06-28 19:37:53 +04:00
parent f51433557e
commit 655fdf8462
4 changed files with 94 additions and 36 deletions

View File

@ -1,6 +1,6 @@
[package]
name = "messagebus"
version = "0.6.3"
version = "0.6.4"
authors = ["Andrey Tkachenko <andrey@aidev.ru>"]
repository = "https://github.com/andreytkachenko/messagebus.git"
keywords = ["futures", "async", "tokio", "message", "bus"]

View File

@ -1,4 +1,4 @@
use messagebus::{receivers, Bus, Message, error, Handler};
use messagebus::{Bus, Handler, Message, Module, error, receivers};
use thiserror::Error;
#[derive(Debug, Error)]
@ -50,14 +50,19 @@ impl Handler<u32> for TmpReceiver {
}
}
#[tokio::main]
async fn main() {
let (b, poller) = Bus::build()
.register(TmpReceiver)
fn module() -> Module {
Module::new()
.register(TmpReceiver)
.subscribe::<f32, receivers::BufferUnorderedSync<_, _, _>, _, _>(8, Default::default())
.subscribe::<u16, receivers::BufferUnorderedSync<_, _, _>, _, _>(8, Default::default())
.subscribe::<u32, receivers::BufferUnorderedSync<_, _, _>, _, _>(8, Default::default())
.done()
.done()
}
#[tokio::main]
async fn main() {
let (b, poller) = Bus::build()
.add_module(module())
.build();
b.send(32f32).await.unwrap();

View File

@ -31,9 +31,10 @@ pub struct SyncEntry;
pub struct UnsyncEntry;
#[must_use]
pub struct RegisterEntry<K, T> {
pub struct RegisterEntry<K, T, F, B> {
item: Untyped,
builder: BusBuilder,
payload: B,
builder: F,
receivers: HashMap<
TypeId,
Vec<(
@ -50,23 +51,23 @@ pub struct RegisterEntry<K, T> {
_m: PhantomData<(K, T)>,
}
impl<K, T: 'static> RegisterEntry<K, T> {
pub fn done(self) -> BusBuilder {
let mut builder = self.builder;
impl<K, T: 'static, F, B> RegisterEntry<K, T, F, B>
where F: FnMut(&mut B, (TypeId, Receiver), Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>, Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>),
{
pub fn done(mut self) -> B {
for (tid, v) in self.receivers {
for (r, poller, poller2) in v {
let poller = poller(self.item.clone());
builder.add_recevier((tid, r), poller, poller2);
(self.builder)(&mut self.payload, (tid, r), poller, poller2);
}
}
builder
self.payload
}
}
impl<T> RegisterEntry<UnsyncEntry, T> {
impl<T, F, B> RegisterEntry<UnsyncEntry, T, F, B> {
pub fn subscribe<M, S, R, E>(mut self, queue: u64, cfg: S::Config) -> Self
where
T: Send + 'static,
@ -88,7 +89,7 @@ impl<T> RegisterEntry<UnsyncEntry, T> {
}
}
impl<T> RegisterEntry<SyncEntry, T> {
impl<T, F, B> RegisterEntry<SyncEntry, T, F, B> {
pub fn subscribe<M, S, R, E>(mut self, queue: u64, cfg: S::Config) -> Self
where
T: Send + Sync + 'static,
@ -110,12 +111,13 @@ impl<T> RegisterEntry<SyncEntry, T> {
}
}
pub struct BusBuilder {
pub struct Module {
receivers: Vec<(TypeId, Receiver)>,
pollings: Vec<Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>>,
}
impl BusBuilder {
impl Module {
pub fn new() -> Self {
Self {
receivers: Vec::new(),
@ -123,42 +125,92 @@ impl BusBuilder {
}
}
pub fn register<T: Send + Sync + 'static>(self, item: T) -> RegisterEntry<SyncEntry, T> {
pub fn register<T: Send + Sync + 'static>(self, item: T) -> RegisterEntry<SyncEntry, T, impl FnMut(&mut Self, (TypeId, Receiver), Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>, Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>), Self> {
RegisterEntry {
item: Arc::new(item) as Untyped,
builder: self,
payload: self,
builder: |p: &mut Self, val, poller, poller2| {
p.receivers.push(val);
p.pollings.push(poller);
p.pollings.push(poller2);
},
receivers: HashMap::new(),
_m: Default::default(),
}
}
pub fn register_unsync<T: Send + 'static>(self, item: T) -> RegisterEntry<UnsyncEntry, T> {
pub fn register_unsync<T: Send + 'static>(self, item: T) -> RegisterEntry<UnsyncEntry, T, impl FnMut(&mut Self, (TypeId, Receiver), Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>, Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>), Self> {
RegisterEntry {
item: Arc::new(Mutex::new(item)) as Untyped,
builder: self,
payload: self,
builder: |p: &mut Self, val, poller, poller2| {
p.receivers.push(val);
p.pollings.push(poller);
p.pollings.push(poller2);
},
receivers: HashMap::new(),
_m: Default::default(),
}
}
pub fn add_recevier(
&mut self,
val: (TypeId, Receiver),
poller: Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>,
poller2: Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>,
) {
self.receivers.push(val);
self.pollings.push(poller);
self.pollings.push(poller2);
fn extend(&mut self, other: Module) {
self.receivers.extend(other.receivers.into_iter());
self.pollings.extend(other.pollings.into_iter());
}
}
pub struct BusBuilder {
inner: Module,
}
impl BusBuilder {
pub(crate) fn new() -> Self {
Self {
inner: Module::new(),
}
}
pub fn register<T: Send + Sync + 'static>(self, item: T) -> RegisterEntry<SyncEntry, T, impl FnMut(&mut Self, (TypeId, Receiver), Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>, Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>), Self> {
RegisterEntry {
item: Arc::new(item) as Untyped,
payload: self,
builder: |p: &mut Self, val, poller, poller2| {
p.inner.receivers.push(val);
p.inner.pollings.push(poller);
p.inner.pollings.push(poller2);
},
receivers: HashMap::new(),
_m: Default::default(),
}
}
pub fn register_unsync<T: Send + 'static>(self, item: T) -> RegisterEntry<UnsyncEntry, T, impl FnMut(&mut Self, (TypeId, Receiver), Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>, Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>), Self> {
RegisterEntry {
item: Arc::new(Mutex::new(item)) as Untyped,
payload: self,
builder: |p: &mut Self, val, poller, poller2| {
p.inner.receivers.push(val);
p.inner.pollings.push(poller);
p.inner.pollings.push(poller2);
},
receivers: HashMap::new(),
_m: Default::default(),
}
}
pub fn add_module(mut self, module: Module) -> Self {
self.inner.extend(module);
self
}
pub fn build(self) -> (Bus, impl Future<Output = ()>) {
let bus = Bus {
inner: Arc::new(BusInner::new(self.receivers)),
inner: Arc::new(BusInner::new(self.inner.receivers)),
};
let mut futs = Vec::with_capacity(self.pollings.len() * 2);
for poller in self.pollings {
let mut futs = Vec::with_capacity(self.inner.pollings.len() * 2);
for poller in self.inner.pollings {
futs.push(tokio::task::spawn(poller(bus.clone())));
}

View File

@ -10,8 +10,9 @@ mod trait_object;
extern crate log;
use crate::receiver::Permit;
pub use builder::BusBuilder;
use builder::BusBuilder;
use core::any::{Any, TypeId};
pub use builder::Module;
pub use envelop::Message;
pub use handler::*;
use receiver::Receiver;