MessageBus: Initial

This commit is contained in:
Andrey Tkachenko 2020-12-17 17:35:11 +04:00
commit 7240aa2b74
28 changed files with 2799 additions and 0 deletions

2
.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
/target
Cargo.lock

22
Cargo.toml Normal file
View File

@ -0,0 +1,22 @@
[package]
name = "messagebus"
version = "0.4.2"
authors = ["Andrey Tkachenko <andrey@aidev.ru>"]
repository = "https://github.com/andreytkachenko/messagebus.git"
keywords = ["futures", "async", "tokio", "message", "bus"]
categories = ["network-programming", "asynchronous"]
description = "MessageBus allows you intercommunicate with messages between modules"
license = "MIT OR Apache-2.0"
exclude = [".gitignore", ".cargo/config", ".github/**", "codecov.yml"]
edition = "2018"
[dependencies]
tokio = { version = "0.2", features = ["parking_lot", "rt-threaded", "sync", "stream", "blocking"] }
parking_lot = "0.11.1"
async-trait = "0.1.42"
futures = "0.3.8"
anyhow = "1.0.34"
crossbeam = "0.8.0"
[dev-dependencies]
tokio = { version = "0.2", features = ["parking_lot", "rt-threaded", "sync", "stream", "macros"] }

121
README.md Normal file
View File

@ -0,0 +1,121 @@
<div align="center">
<h1>Message Bus</h1>
<p>
<strong>Async Message Bus for Rust</strong>
</p>
<p>
</div>
Inspired by Actix
### Basics
1. Can deliver messages between actors using receivers (usually a queue implementations)
2. Messages distincts and delivers by TypeId
3. Messages delivers in a broadcast fashion to many receivers (Cloned)
4. There are different kind of receivers implemented:
- BufferUnordered Receiver (in sync and async version depending by handler)
- Synchronized (also sync and async) if receiving part needs syncronization
- SynchronizeBuffered (also sync and async)
here are the implmented handlers definitions:
```rust
// Handler is Sync and we can spawn many of concurrent tasks
pub trait Handler<M: Message>: Send + Sync {
fn handle(&self, msg: M, bus: &Bus) -> anyhow::Result<()>;
fn sync(&self, _bus: &Bus) -> anyhow::Result<()> {Ok(())}
}
#[async_trait]
pub trait AsyncHandler<M: Message>: Send + Sync {
async fn handle(&self, msg: M, bus: &Bus) -> anyhow::Result<()>;
async fn sync(&self, _bus: &Bus) -> anyhow::Result<()> {Ok(())}
}
// Handler is not Sync and we cannot spawn many of concurrent tasks same time (uses synchronization primitives such as Mutex or RwLock)
pub trait SynchronizedHandler<M: Message>: Send {
fn handle(&mut self, msg: M, bus: &Bus) -> anyhow::Result<()>;
fn sync(&mut self, _bus: &Bus) -> anyhow::Result<()> {Ok(())}
}
#[async_trait]
pub trait AsyncSynchronizedHandler<M: Message>: Send {
async fn handle(&mut self, msg: M, bus: &Bus) -> anyhow::Result<()>;
async fn sync(&mut self, _bus: &Bus) -> anyhow::Result<()> {Ok(())}
}
// Handler is not Sync and handler will process items in batched mode
pub trait BatchSynchronizedHandler<M: Message>: Send {
fn handle(&mut self, msg: Vec<M>, bus: &Bus) -> anyhow::Result<()>;
fn sync(&mut self, _bus: &Bus) -> anyhow::Result<()> {Ok(())}
}
#[async_trait]
pub trait AsyncBatchSynchronizedHandler<M: Message>: Send {
async fn handle(&mut self, msg: Vec<M>, bus: &Bus) -> anyhow::Result<()>;
async fn sync(&mut self, _bus: &Bus) -> anyhow::Result<()> {Ok(())}
}
```
4. Handler Kinds:
1. No Synchronization needed (Handler is `Send` + `Sync`)
* Not batched operations **(implemented)**
- sync (spawn_blocking)
- async (spawn)
* Batched
- sync (spawn_blocking)
- async (spawn)
2. Synchronization needed (Handler is `Sync` + `!Send`)
* Not batched operations **(implemented)**
- sync (spawn_blocking)
- async (spawn)
* Batched **(implemented)**
- sync (spawn_blocking)
- async (spawn)
3. Synchronization needed and thread dedicated (Handler is `!Sync` + `!Send`)
* Not batched operations
- sync (spawn_blocking)
- async (spawn)
* Batched
- sync (spawn_blocking)
- async (spawn)
5. Example:
```rust
use messagebus::{Bus, AsyncHandler, Result as MbusResult, receivers};
use async_trait::async_trait;
struct TmpReceiver;
#[async_trait]
impl AsyncHandler<i32> for TmpReceiver {
async fn handle(&self, msg: i32, bus: &Bus) -> MbusResult {
println!("---> i32 {}", msg);
bus.send(2i64).await?;
Ok(())
}
}
#[async_trait]
impl AsyncHandler<i64> for TmpReceiver {
async fn handle(&self, msg: i64, _bus: &Bus) -> MbusResult {
println!("---> i64 {}", msg);
Ok(())
}
}
#[tokio::main]
async fn main() {
let (b, poller) = Bus::build()
.register(TmpReceiver)
.subscribe::<i32, receivers::BufferUnorderedAsync<_>>(Default::default())
.subscribe::<i64, receivers::BufferUnorderedAsync<_>>(Default::default())
.done()
.build();
b.send(1i32).await.unwrap();
poller.await
}
```

79
examples/demo.rs Normal file
View File

@ -0,0 +1,79 @@
use messagebus::{receivers, Bus, Handler, Result as MbusResult};
struct TmpReceiver;
struct TmpReceiver2;
impl Handler<f32> for TmpReceiver {
fn handle(&self, msg: f32, bus: &Bus) -> MbusResult {
bus.try_send(1u16).unwrap();
println!("---> f32 {}", msg);
Ok(())
}
}
impl Handler<u16> for TmpReceiver {
fn handle(&self, msg: u16, bus: &Bus) -> MbusResult {
bus.try_send(1u32).unwrap();
println!("---> u16 {}", msg);
Ok(())
}
}
impl Handler<u32> for TmpReceiver {
fn handle(&self, msg: u32, bus: &Bus) -> MbusResult {
bus.try_send(2i32).unwrap();
println!("---> u32 {}", msg);
Ok(())
}
}
impl Handler<i32> for TmpReceiver {
fn handle(&self, msg: i32, bus: &Bus) -> MbusResult {
bus.try_send(3i16).unwrap();
println!("---> i32 {}", msg);
Ok(())
}
}
impl Handler<i16> for TmpReceiver {
fn handle(&self, msg: i16, _bus: &Bus) -> MbusResult {
println!("---> i16 {}", msg);
Ok(())
}
}
impl Handler<i32> for TmpReceiver2 {
fn handle(&self, msg: i32, bus: &Bus) -> MbusResult {
bus.try_send(3i16).unwrap();
println!("---> 2 i32 {}", msg);
Ok(())
}
}
impl Handler<i16> for TmpReceiver2 {
fn handle(&self, msg: i16, _bus: &Bus) -> MbusResult {
println!("---> 2 i16 {}", msg);
Ok(())
}
}
#[tokio::main]
async fn main() {
let (b, poller) = Bus::build()
.register(TmpReceiver)
.subscribe::<f32, receivers::BufferUnorderedSync<_>>(Default::default())
.subscribe::<u16, receivers::BufferUnorderedSync<_>>(Default::default())
.subscribe::<u32, receivers::BufferUnorderedSync<_>>(Default::default())
.subscribe::<i32, receivers::BufferUnorderedSync<_>>(Default::default())
.subscribe::<i16, receivers::BufferUnorderedSync<_>>(Default::default())
.done()
.register(TmpReceiver2)
.subscribe::<i32, receivers::BufferUnorderedSync<_>>(Default::default())
.subscribe::<i16, receivers::BufferUnorderedSync<_>>(Default::default())
.done()
.build();
b.send(32f32).await.unwrap();
poller.await
}

93
examples/demo_async.rs Normal file
View File

@ -0,0 +1,93 @@
use async_trait::async_trait;
use messagebus::{receivers, AsyncHandler, Bus, Handler, Result as MbusResult};
struct TmpReceiver;
struct TmpReceiver2;
#[async_trait]
impl AsyncHandler<f32> for TmpReceiver {
async fn handle(&self, msg: f32, bus: &Bus) -> MbusResult {
bus.send(1u16).await?;
println!("---> f32 {}", msg);
Ok(())
}
}
#[async_trait]
impl AsyncHandler<u16> for TmpReceiver {
async fn handle(&self, msg: u16, bus: &Bus) -> MbusResult {
bus.send(2u32).await?;
println!("---> u16 {}", msg);
Ok(())
}
}
#[async_trait]
impl AsyncHandler<u32> for TmpReceiver {
async fn handle(&self, msg: u32, bus: &Bus) -> MbusResult {
bus.send(3i32).await?;
println!("---> u32 {}", msg);
Ok(())
}
}
#[async_trait]
impl AsyncHandler<i32> for TmpReceiver {
async fn handle(&self, msg: i32, bus: &Bus) -> MbusResult {
bus.send(4i16).await?;
println!("---> i32 {}", msg);
Ok(())
}
}
#[async_trait]
impl AsyncHandler<i16> for TmpReceiver {
async fn handle(&self, msg: i16, _bus: &Bus) -> MbusResult {
println!("---> i16 {}", msg);
Ok(())
}
}
#[async_trait]
impl AsyncHandler<i32> for TmpReceiver2 {
async fn handle(&self, msg: i32, bus: &Bus) -> MbusResult {
bus.send(5i16).await?;
println!("---> 2 i32 {}", msg);
Ok(())
}
}
impl Handler<i16> for TmpReceiver2 {
fn handle(&self, msg: i16, _bus: &Bus) -> MbusResult {
println!("---> 2 i16 {}", msg);
Ok(())
}
}
#[tokio::main]
async fn main() {
let (b, poller) = Bus::build()
.register(TmpReceiver)
.subscribe::<f32, receivers::BufferUnorderedAsync<_>>(Default::default())
.subscribe::<u16, receivers::BufferUnorderedAsync<_>>(Default::default())
.subscribe::<u32, receivers::BufferUnorderedAsync<_>>(Default::default())
.subscribe::<i32, receivers::BufferUnorderedAsync<_>>(Default::default())
.subscribe::<i16, receivers::BufferUnorderedAsync<_>>(Default::default())
.done()
.register(TmpReceiver2)
.subscribe::<i32, receivers::BufferUnorderedAsync<_>>(Default::default())
.subscribe::<i16, receivers::BufferUnorderedSync<_>>(Default::default())
.done()
.build();
b.send(0f32).await.unwrap();
poller.await
}

View File

@ -0,0 +1,63 @@
use messagebus::{receivers, Bus, Handler, Result as MbusResult};
struct TmpReceiver;
impl Handler<f32> for TmpReceiver {
fn handle(&self, msg: f32, _bus: &Bus) -> MbusResult {
println!("---> f32 {}", msg);
std::thread::sleep(std::time::Duration::from_secs(1));
println!("done");
Ok(())
}
}
#[tokio::main]
async fn main() {
let (b, poller) = Bus::build()
.register(TmpReceiver)
.subscribe::<f32, receivers::BufferUnorderedSync<_>>(receivers::BufferUnorderedConfig {
buffer_size: 1,
max_parallel: 1,
})
.done()
.build();
println!("sending 1");
b.send(32f32).await.unwrap();
println!("sending 2");
b.send(32f32).await.unwrap();
println!("sending 3");
b.send(32f32).await.unwrap();
println!("sending 4");
b.send(32f32).await.unwrap();
println!("sending 5");
b.send(32f32).await.unwrap();
println!("sending 6");
b.send(32f32).await.unwrap();
println!("sending 7");
b.send(32f32).await.unwrap();
println!("sending 8");
b.send(32f32).await.unwrap();
println!("sending 9");
b.send(32f32).await.unwrap();
println!("sending 10");
b.send(32f32).await.unwrap();
println!("sending 11");
b.send(32f32).await.unwrap();
println!("finish");
poller.await;
}

46
examples/demo_slow.rs Normal file
View File

@ -0,0 +1,46 @@
use messagebus::{receivers, Bus, Handler, Result as MbusResult};
struct TmpReceiver;
impl Handler<f32> for TmpReceiver {
fn handle(&self, msg: f32, _bus: &Bus) -> MbusResult {
println!("---> f32 {}", msg);
std::thread::sleep(std::time::Duration::from_secs(5));
println!("done");
Ok(())
}
}
impl Handler<u16> for TmpReceiver {
fn handle(&self, msg: u16, _bus: &Bus) -> MbusResult {
println!("---> u16 {}", msg);
Ok(())
}
}
impl Handler<u32> for TmpReceiver {
fn handle(&self, msg: u32, _bus: &Bus) -> MbusResult {
println!("---> u32 {}", msg);
Ok(())
}
}
#[tokio::main]
async fn main() {
let (b, poller) = Bus::build()
.register(TmpReceiver)
.subscribe::<f32, receivers::BufferUnorderedSync<_>>(Default::default())
.subscribe::<u16, receivers::BufferUnorderedSync<_>>(Default::default())
.subscribe::<u32, receivers::BufferUnorderedSync<_>>(Default::default())
.done()
.build();
b.send(32f32).await.unwrap();
b.send(11u16).await.unwrap();
b.send(32u32).await.unwrap();
poller.await
}

57
examples/non_sync.rs Normal file
View File

@ -0,0 +1,57 @@
use messagebus::{receivers, Bus, Result as MbusResult, SynchronizedHandler};
use receivers::SynchronizedConfig;
struct TmpReceiver;
impl SynchronizedHandler<f32> for TmpReceiver {
fn handle(&mut self, msg: f32, _bus: &Bus) -> MbusResult {
println!("---> f32 {}", msg);
std::thread::sleep(std::time::Duration::from_secs(1));
println!("done");
Ok(())
}
}
impl SynchronizedHandler<i16> for TmpReceiver {
fn handle(&mut self, msg: i16, _bus: &Bus) -> MbusResult {
println!("---> i16 {}", msg);
std::thread::sleep(std::time::Duration::from_secs(1));
println!("done");
Ok(())
}
}
#[tokio::main]
async fn main() {
let (b, poller) = Bus::build()
.register_unsync(TmpReceiver)
.subscribe::<f32, receivers::SynchronizedSync<_>>(SynchronizedConfig { buffer_size: 1 })
.subscribe::<i16, receivers::SynchronizedSync<_>>(Default::default())
.done()
.build();
b.send(12.0f32).await.unwrap();
b.send(1i16).await.unwrap();
b.send(12.0f32).await.unwrap();
b.send(1i16).await.unwrap();
b.send(12.0f32).await.unwrap();
b.send(1i16).await.unwrap();
b.send(12.0f32).await.unwrap();
b.send(1i16).await.unwrap();
b.send(12.0f32).await.unwrap();
b.send(1i16).await.unwrap();
b.send(12.0f32).await.unwrap();
b.send(1i16).await.unwrap();
b.send(12.0f32).await.unwrap();
b.send(1i16).await.unwrap();
b.send(12.0f32).await.unwrap();
b.send(1i16).await.unwrap();
println!("finish");
poller.await;
}

160
src/builder.rs Normal file
View File

@ -0,0 +1,160 @@
use std::{any::TypeId, collections::HashMap, marker::PhantomData, pin::Pin, sync::Arc};
use futures::{Future, FutureExt};
use receiver::ReceiverTrait;
use tokio::sync::Mutex;
use crate::{
receiver::{self, Receiver},
Bus, BusInner, Message, Untyped,
};
pub trait ReceiverSubscriber<T: 'static> {
fn subscribe(
self,
) -> (
Arc<dyn ReceiverTrait>,
Box<
dyn FnOnce(Untyped) -> Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>,
>,
);
}
pub trait ReceiverSubscriberBuilder<M, T: 'static> {
type Entry: ReceiverSubscriber<T>;
type Config: Default;
fn build(cfg: Self::Config) -> Self::Entry;
}
pub struct SyncEntry;
pub struct UnsyncEntry;
#[must_use]
pub struct RegisterEntry<K, T> {
item: Untyped,
builder: BusBuilder,
receivers: HashMap<
TypeId,
Vec<(
Receiver,
Box<
dyn FnOnce(
Untyped,
)
-> Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>,
>,
)>,
>,
_m: PhantomData<(K, T)>,
}
impl<K, T: 'static> RegisterEntry<K, T> {
pub fn done(self) -> BusBuilder {
let mut builder = self.builder;
for (tid, v) in self.receivers {
for (r, poller) in v {
let poller = poller(self.item.clone());
builder.add_recevier((tid, r), poller);
}
}
builder
}
}
impl<T: Send + 'static> RegisterEntry<UnsyncEntry, T> {
pub fn subscribe<M, R>(mut self, cfg: R::Config) -> Self
where
T: Send + 'static,
M: Message + 'static,
R: ReceiverSubscriberBuilder<M, T> + 'static,
{
let (inner, poller) = R::build(cfg).subscribe();
let receiver = Receiver::new(inner);
self.receivers
.entry(TypeId::of::<M>())
.or_insert_with(Vec::new)
.push((receiver, poller));
self
}
}
impl<T: Send + Sync + 'static> RegisterEntry<SyncEntry, T> {
pub fn subscribe<M, R>(mut self, cfg: R::Config) -> Self
where
T: Send + 'static,
M: Message + 'static,
R: ReceiverSubscriberBuilder<M, T> + 'static,
{
let (inner, poller) = R::build(cfg).subscribe();
let receiver = Receiver::new(inner);
self.receivers
.entry(TypeId::of::<M>())
.or_insert_with(Vec::new)
.push((receiver, poller));
self
}
}
pub struct BusBuilder {
receivers: Vec<(TypeId, Receiver)>,
pollings: Vec<Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>>,
}
impl BusBuilder {
pub fn new() -> Self {
Self {
receivers: Vec::new(),
pollings: Vec::new(),
}
}
pub fn register<T: Send + Sync + 'static>(self, item: T) -> RegisterEntry<SyncEntry, T> {
RegisterEntry {
item: Arc::new(item) as Untyped,
builder: self,
receivers: HashMap::new(),
_m: Default::default(),
}
}
pub fn register_unsync<T: Send + 'static>(self, item: T) -> RegisterEntry<UnsyncEntry, T> {
RegisterEntry {
item: Arc::new(Mutex::new(item)) as Untyped,
builder: self,
receivers: HashMap::new(),
_m: Default::default(),
}
}
pub fn add_recevier(
&mut self,
val: (TypeId, Receiver),
poller: Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>,
) {
self.receivers.push(val);
self.pollings.push(poller);
}
pub fn build(self) -> (Bus, impl Future<Output = ()>) {
let bus = Bus {
inner: Arc::new(BusInner::new(self.receivers)),
};
let mut futs = Vec::with_capacity(self.pollings.len());
for poller in self.pollings {
futs.push(tokio::task::spawn(poller(bus.clone())));
}
let poller = futures::future::join_all(futs).map(|_| ()).map(|_| ());
(bus, poller)
}
}

102
src/envelop.rs Normal file
View File

@ -0,0 +1,102 @@
use core::any::{self, Any};
use core::fmt;
// use erased_serde::{Deserializer, Serialize};
pub trait Message: Any + fmt::Debug/*Serialize + for<'a> Deserializer<'a> + */ + Unpin + Clone + Send + Sync + 'static {}
impl<T: Any + fmt::Debug + Unpin + Clone + Send + Sync> Message for T {}
trait SafeMessage: Any + fmt::Debug/*+ Serialize + for<'a> Deserializer<'a>*/ + Unpin + Send + Sync + 'static {
fn type_name(&self) -> &'static str;
fn clone_boxed(&self) -> Box<dyn SafeMessage>;
}
impl<T: Message> SafeMessage for T {
fn type_name(&self) -> &'static str {
any::type_name::<T>()
}
fn clone_boxed(&self) -> Box<dyn SafeMessage> {
Box::new(self.clone())
}
}
// pub struct BoxedEnvelop {
// inner: Box<dyn SafeMessage>,
// }
// impl BoxedEnvelop {
// pub fn from_message<M: Message>(m: M) -> Self {
// Self {
// inner: Box::new(m)
// }
// }
// pub fn as_ref(&self) -> Envelop<'_> {
// Envelop { inner: &*self.inner }
// }
// pub fn downcast<T: 'static>(self) -> Option<Box<T>> {
// if (*self.inner).type_id() == TypeId::of::<T>() {
// unsafe {
// let raw: *mut dyn SafeMessage = Box::into_raw(self.inner);
// Some(Box::from_raw(raw as *mut T))
// }
// } else {
// None
// }
// }
// }
#[derive(Copy, Clone)]
pub struct Envelop<'inner> {
inner: &'inner dyn SafeMessage,
}
impl<'inner> fmt::Debug for Envelop<'inner> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Envelop(")?;
self.inner.fmt(f)?;
write!(f, ")")?;
Ok(())
}
}
impl<'inner> Envelop<'inner> {
// pub fn new<T: Message>(inner: &'inner T) -> Self {
// Self { inner }
// }
// #[inline]
// pub fn downcast_to<T: 'static>(&self) -> Option<&T> {
// if self.inner.type_id() == TypeId::of::<T>() {
// unsafe { Some(&*(self.inner as *const dyn SafeMessage as *const T)) }
// } else {
// None
// }
// }
// #[inline]
// pub fn type_id(&self) -> TypeId {
// self.inner.type_id()
// }
// #[inline]
// pub fn type_name(&self) -> &'static str {
// self.inner.type_name()
// }
// #[inline]
// pub fn clone_boxed(&self) -> BoxedEnvelop {
// BoxedEnvelop {
// inner: self.inner.clone_boxed(),
// }
// }
}
// impl<'inner> serde::Serialize for Envelop<'inner> {
// fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
// erased_serde::serialize(self.inner, serializer)
// }
// }

92
src/handler.rs Normal file
View File

@ -0,0 +1,92 @@
use crate::{Bus, Message};
use async_trait::async_trait;
pub trait Handler<M: Message>: Send + Sync {
fn handle(&self, msg: M, bus: &Bus) -> anyhow::Result<()>;
fn sync(&self, _bus: &Bus) -> anyhow::Result<()> {
Ok(())
}
}
#[async_trait]
pub trait AsyncHandler<M: Message>: Send + Sync {
async fn handle(&self, msg: M, bus: &Bus) -> anyhow::Result<()>;
async fn sync(&self, _bus: &Bus) -> anyhow::Result<()> {
Ok(())
}
}
pub trait SynchronizedHandler<M: Message>: Send {
fn handle(&mut self, msg: M, bus: &Bus) -> anyhow::Result<()>;
fn sync(&mut self, _bus: &Bus) -> anyhow::Result<()> {
Ok(())
}
}
#[async_trait]
pub trait AsyncSynchronizedHandler<M: Message>: Send {
async fn handle(&mut self, msg: M, bus: &Bus) -> anyhow::Result<()>;
async fn sync(&mut self, _bus: &Bus) -> anyhow::Result<()> {
Ok(())
}
}
pub trait BatchHandler<M: Message>: Send + Sync {
fn handle(&self, msg: Vec<M>, bus: &Bus) -> anyhow::Result<()>;
fn sync(&self, _bus: &Bus) -> anyhow::Result<()> {
Ok(())
}
}
#[async_trait]
pub trait AsyncBatchHandler<M: Message>: Send + Sync {
async fn handle(&self, msg: Vec<M>, bus: &Bus) -> anyhow::Result<()>;
async fn sync(&self, _bus: &Bus) -> anyhow::Result<()> {
Ok(())
}
}
pub trait BatchSynchronizedHandler<M: Message>: Send {
fn handle(&mut self, msg: Vec<M>, bus: &Bus) -> anyhow::Result<()>;
fn sync(&mut self, _bus: &Bus) -> anyhow::Result<()> {
Ok(())
}
}
#[async_trait]
pub trait AsyncBatchSynchronizedHandler<M: Message>: Send {
async fn handle(&mut self, msg: Vec<M>, bus: &Bus) -> anyhow::Result<()>;
async fn sync(&mut self, _bus: &Bus) -> anyhow::Result<()> {
Ok(())
}
}
pub trait LocalHandler<M: Message> {
fn handle(&mut self, msg: Vec<M>, bus: &Bus) -> anyhow::Result<()>;
fn sync(&mut self, _bus: &Bus) -> anyhow::Result<()> {
Ok(())
}
}
#[async_trait]
pub trait LocalAsyncHandler<M: Message> {
async fn handle(&mut self, msg: Vec<M>, bus: &Bus) -> anyhow::Result<()>;
async fn sync(&mut self, _bus: &Bus) -> anyhow::Result<()> {
Ok(())
}
}
pub trait LocalBatchHandler<M: Message> {
fn handle(&mut self, msg: Vec<M>, bus: &Bus) -> anyhow::Result<()>;
fn sync(&mut self, _bus: &Bus) -> anyhow::Result<()> {
Ok(())
}
}
#[async_trait]
pub trait LocalAsyncBatchHandler<M: Message> {
async fn handle(&mut self, msg: Vec<M>, bus: &Bus) -> anyhow::Result<()>;
async fn sync(&mut self, _bus: &Bus) -> anyhow::Result<()> {
Ok(())
}
}

126
src/lib.rs Normal file
View File

@ -0,0 +1,126 @@
mod builder;
mod envelop;
mod handler;
pub mod msgs;
mod receiver;
pub mod receivers;
mod trait_object;
mod utils;
use builder::BusBuilder;
pub use envelop::Message;
pub use handler::*;
pub use receiver::SendError;
use receiver::{Receiver, ReceiverStats};
use utils::binary_search_range_by_key;
use core::any::{Any, TypeId};
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
pub type Untyped = Arc<dyn Any + Send + Sync>;
pub type Result = anyhow::Result<()>;
pub struct BusInner {
receivers: Vec<(TypeId, Receiver)>,
closed: AtomicBool,
}
impl BusInner {
pub(crate) fn new(mut receivers: Vec<(TypeId, Receiver)>) -> Self {
receivers.sort_unstable_by_key(|(k, _)| *k);
Self {
receivers,
closed: AtomicBool::new(false),
}
}
pub fn close(&self) {
self.closed.store(true, Ordering::SeqCst);
for (_, r) in &self.receivers {
r.close();
}
}
pub async fn sync(&self) {
for (_, r) in &self.receivers {
r.sync().await;
}
}
pub fn stats(&self) -> impl Iterator<Item = ReceiverStats> + '_ {
self.receivers.iter().map(|(_, r)| r.stats())
}
pub fn try_send<M: Message>(&self, msg: M) -> core::result::Result<(), SendError<M>> {
if self.closed.load(Ordering::SeqCst) {
println!("Bus closed. Skipping send!");
return Ok(());
}
let tid = TypeId::of::<M>();
let range = binary_search_range_by_key(&self.receivers, &tid, |(k, _)| *k);
for i in (range.start + 1)..range.end {
self.receivers[i].1.try_broadcast(msg.clone())?;
}
if let Some((_, r)) = self.receivers.get(range.start) {
r.try_broadcast(msg.clone())?;
} else {
println!("Unhandled message {:?}", core::any::type_name::<M>());
}
Ok(())
}
#[inline]
pub fn send_blocking<M: Message>(&self, msg: M) -> core::result::Result<(), SendError<M>> {
futures::executor::block_on(self.send(msg))
}
pub async fn send<M: Message>(&self, msg: M) -> core::result::Result<(), SendError<M>> {
if self.closed.load(Ordering::SeqCst) {
return Err(SendError::Closed(msg));
}
let tid = TypeId::of::<M>();
let range = binary_search_range_by_key(&self.receivers, &tid, |(k, _)| *k);
for i in (range.start + 1)..range.end {
self.receivers[i].1.broadcast(msg.clone()).await?;
}
if let Some((_, r)) = self.receivers.get(range.start) {
r.broadcast(msg.clone()).await?;
} else {
println!("Unhandled message {:?}", core::any::type_name::<M>());
}
Ok(())
}
}
#[derive(Clone)]
pub struct Bus {
inner: Arc<BusInner>,
}
impl core::ops::Deref for Bus {
type Target = BusInner;
fn deref(&self) -> &Self::Target {
self.inner.as_ref()
}
}
impl Bus {
#[inline]
pub fn build() -> BusBuilder {
BusBuilder::new()
}
}

10
src/msgs.rs Normal file
View File

@ -0,0 +1,10 @@
use std::sync::Arc;
#[derive(Clone, Debug)]
pub struct Error(pub Arc<anyhow::Error>);
impl<T: Into<anyhow::Error>> From<T> for Error {
fn from(e: T) -> Self {
Self(Arc::new(e.into()))
}
}

212
src/receiver.rs Normal file
View File

@ -0,0 +1,212 @@
use crate::{trait_object::TraitObject, Bus, Message};
use core::{
any::TypeId,
fmt,
future::Future,
marker::PhantomData,
mem,
pin::Pin,
task::{Context, Poll},
};
use futures::future::poll_fn;
use std::{borrow::Cow, sync::Arc};
pub struct AnyReceiver<'a> {
dyn_typed_receiver_trait_object: TraitObject,
type_id: TypeId,
_m: PhantomData<&'a usize>,
}
impl<'a> AnyReceiver<'a> {
pub fn new<M: Message, R: TypedReceiver<M> + 'static>(rcvr: &'a R) -> Self {
let trcvr = rcvr as &(dyn TypedReceiver<M>);
Self {
dyn_typed_receiver_trait_object: unsafe { mem::transmute(trcvr) },
type_id: TypeId::of::<dyn TypedReceiver<M>>(),
_m: Default::default(),
}
}
pub fn dyn_typed_receiver<M: Message>(&'a self) -> &'a dyn TypedReceiver<M> {
assert_eq!(self.type_id, TypeId::of::<dyn TypedReceiver<M>>());
unsafe { mem::transmute(self.dyn_typed_receiver_trait_object) }
}
}
pub enum SendError<M> {
Full(M),
Closed(M),
}
impl<M: fmt::Debug> fmt::Debug for SendError<M> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
SendError::Full(m) => write!(f, "SendError::Full({:?})", m)?,
SendError::Closed(m) => write!(f, "SendError::Closed({:?})", m)?,
}
Ok(())
}
}
impl<M: fmt::Debug> fmt::Display for SendError<M> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
SendError::Full(m) => write!(f, "SendError::Full({:?})", m)?,
SendError::Closed(m) => write!(f, "SendError::Closed({:?})", m)?,
}
Ok(())
}
}
impl<M: fmt::Debug> std::error::Error for SendError<M> {}
#[derive(Debug, Clone)]
pub struct ReceiverStats {
pub name: Cow<'static, str>,
pub fields: Vec<(Cow<'static, str>, u64)>,
}
impl fmt::Display for ReceiverStats {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "-- {}: {{ ", self.name)?;
for (idx, (k, v)) in self.fields.iter().enumerate() {
if idx != 0 {
write!(f, ", ")?;
}
write!(f, "{}: {}", k, v)?;
}
write!(f, " }}")?;
Ok(())
}
}
pub trait TypedReceiver<M: Message>: Sync {
fn poll_ready(&self, ctx: &mut Context<'_>) -> Poll<()>;
fn try_send(&self, msg: M) -> Result<(), SendError<M>>;
}
pub trait ReceiverTrait: Send + Sync {
fn typed(&self) -> AnyReceiver<'_>;
fn type_id(&self) -> TypeId;
fn close(&self);
fn stats(&self) -> ReceiverStats;
fn sync(&self);
fn poll_synchronized(&self, ctx: &mut Context<'_>) -> Poll<()>;
}
pub trait ReceiverPollerBuilder {
fn build(bus: Bus) -> Box<dyn Future<Output = ()>>;
}
pub struct Receiver {
inner: Arc<dyn ReceiverTrait>,
}
impl fmt::Debug for Receiver {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Receiver({:?})", self.inner.type_id())?;
Ok(())
}
}
impl core::cmp::PartialEq for Receiver {
fn eq(&self, other: &Receiver) -> bool {
self.inner.type_id() == other.inner.type_id()
}
}
impl core::cmp::Eq for Receiver {}
pub struct ReceiverPoller<'a, M: Message> {
inner: &'a dyn ReceiverTrait,
msg: Option<M>,
}
impl<'a, M: Message> Unpin for ReceiverPoller<'a, M> {}
impl<'a, M: Message> Future for ReceiverPoller<'a, M> {
type Output = Result<(), SendError<M>>;
fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
let mut msg = if let Some(msg) = this.msg.take() {
msg
} else {
return Poll::Ready(Ok(()));
};
let any_receiver = this.inner.typed();
let receiver = any_receiver.dyn_typed_receiver::<M>();
loop {
match receiver.poll_ready(ctx) {
Poll::Ready(_) => (),
Poll::Pending => {
this.msg = Some(msg);
return Poll::Pending;
}
}
msg = match receiver.try_send(msg) {
Ok(_) => break Poll::Ready(Ok(())),
Err(SendError::Full(m)) => m,
Err(err) => break Poll::Ready(Err(err)),
}
}
}
}
impl Receiver {
#[inline]
pub(crate) fn new(inner: Arc<dyn ReceiverTrait>) -> Self {
Self { inner }
}
#[inline]
pub fn type_id(&self) -> TypeId {
self.inner.type_id()
}
#[inline]
pub fn broadcast<M: Message>(
&self,
msg: M,
) -> impl Future<Output = Result<(), SendError<M>>> + '_ {
ReceiverPoller {
inner: self.inner.as_ref(),
msg: Some(msg),
}
}
#[inline]
pub fn try_broadcast<M: Message>(&self, msg: M) -> Result<(), SendError<M>> {
let any_receiver = self.inner.typed();
let receiver = any_receiver.dyn_typed_receiver::<M>();
receiver.try_send(msg)
}
#[inline]
pub fn close(&self) {
self.inner.close();
}
#[inline]
pub fn stats(&self) -> ReceiverStats {
self.inner.stats()
}
#[inline]
pub fn sync(&self) -> impl Future<Output = ()> + '_ {
self.inner.sync();
poll_fn(move |ctx| self.inner.poll_synchronized(ctx))
}
}

View File

@ -0,0 +1,203 @@
use std::{
any::TypeId,
marker::PhantomData,
pin::Pin,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
task::{Context, Poll},
};
use crate::{receiver::ReceiverStats, receivers::mpsc};
use futures::{Future, StreamExt};
use super::{BufferUnorderedConfig, BufferUnorderedStats};
use crate::{
builder::{ReceiverSubscriber, ReceiverSubscriberBuilder},
msgs,
receiver::{AnyReceiver, ReceiverTrait, SendError, TypedReceiver},
AsyncHandler, Bus, Message, Untyped,
};
pub struct BufferUnorderedAsyncSubscriber<T, M>
where
T: AsyncHandler<M> + 'static,
M: Message,
{
cfg: BufferUnorderedConfig,
_m: PhantomData<(T, M)>,
}
impl<T, M> ReceiverSubscriber<T> for BufferUnorderedAsyncSubscriber<T, M>
where
T: AsyncHandler<M> + 'static,
M: Message,
{
fn subscribe(
self,
) -> (
Arc<dyn ReceiverTrait>,
Box<
dyn FnOnce(Untyped) -> Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>,
>,
) {
let cfg = self.cfg;
let stats = Arc::new(BufferUnorderedStats {
buffer: AtomicU64::new(0),
buffer_total: AtomicU64::new(cfg.buffer_size as _),
parallel: AtomicU64::new(0),
parallel_total: AtomicU64::new(cfg.max_parallel as _),
});
let (tx, rx) = mpsc::channel(cfg.buffer_size);
let arc = Arc::new(BufferUnorderedAsync::<M> {
tx,
stats: stats.clone(),
});
let poller = Box::new(move |ut| {
Box::new(move |bus| {
Box::pin(buffer_unordered_poller::<T, M>(rx, bus, ut, stats, cfg))
as Pin<Box<dyn Future<Output = ()> + Send>>
}) as Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>
});
(arc, poller)
}
}
async fn buffer_unordered_poller<T, M>(
rx: mpsc::Receiver<M>,
bus: Bus,
ut: Untyped,
stats: Arc<BufferUnorderedStats>,
cfg: BufferUnorderedConfig,
) where
T: AsyncHandler<M> + 'static,
M: Message,
{
let ut = ut.downcast::<T>().unwrap();
let mut x = rx
.map(|msg| {
stats.buffer.fetch_sub(1, Ordering::Relaxed);
stats.parallel.fetch_add(1, Ordering::Relaxed);
let bus = bus.clone();
let ut = ut.clone();
tokio::task::spawn(async move { ut.handle(msg, &bus).await })
})
.buffer_unordered(cfg.max_parallel);
while let Some(err) = x.next().await {
stats.parallel.fetch_sub(1, Ordering::Relaxed);
match err {
Ok(Err(err)) => {
let _ = bus.send(msgs::Error(Arc::new(err))).await;
}
_ => (),
}
}
let ut = ut.clone();
let bus_clone = bus.clone();
let res = tokio::task::spawn(async move { ut.sync(&bus_clone).await }).await;
match res {
Ok(Err(err)) => {
let _ = bus.send(msgs::Error(Arc::new(err))).await;
}
_ => (),
}
println!(
"[EXIT] BufferUnorderedAsync<{}>",
std::any::type_name::<M>()
);
}
pub struct BufferUnorderedAsync<M: Message> {
tx: mpsc::Sender<M>,
stats: Arc<BufferUnorderedStats>,
}
impl<T, M> ReceiverSubscriberBuilder<M, T> for BufferUnorderedAsync<M>
where
T: AsyncHandler<M> + 'static,
M: Message,
{
type Entry = BufferUnorderedAsyncSubscriber<T, M>;
type Config = BufferUnorderedConfig;
fn build(cfg: Self::Config) -> Self::Entry {
BufferUnorderedAsyncSubscriber {
cfg,
_m: Default::default(),
}
}
}
impl<M: Message> TypedReceiver<M> for BufferUnorderedAsync<M> {
fn poll_ready(&self, ctx: &mut Context<'_>) -> Poll<()> {
match self.tx.poll_ready(ctx) {
Poll::Ready(_) => Poll::Ready(()),
Poll::Pending => Poll::Pending,
}
}
fn try_send(&self, m: M) -> Result<(), SendError<M>> {
match self.tx.try_send(m) {
Ok(_) => {
self.stats.buffer.fetch_add(1, Ordering::Relaxed);
Ok(())
}
Err(err) => Err(err),
}
}
}
impl<M: Message> ReceiverTrait for BufferUnorderedAsync<M> {
fn typed(&self) -> AnyReceiver<'_> {
AnyReceiver::new(self)
}
fn type_id(&self) -> TypeId {
TypeId::of::<BufferUnorderedAsync<M>>()
}
fn close(&self) {
self.tx.close();
}
fn stats(&self) -> ReceiverStats {
ReceiverStats {
name: std::any::type_name::<M>().into(),
fields: vec![
("buffer".into(), self.stats.buffer.load(Ordering::SeqCst)),
(
"buffer_total".into(),
self.stats.buffer_total.load(Ordering::SeqCst),
),
(
"parallel".into(),
self.stats.parallel.load(Ordering::SeqCst),
),
(
"parallel_total".into(),
self.stats.parallel_total.load(Ordering::SeqCst),
),
],
}
}
fn sync(&self) {
self.tx.flush();
}
fn poll_synchronized(&self, _ctx: &mut Context<'_>) -> Poll<()> {
Poll::Ready(())
}
}

View File

@ -0,0 +1,30 @@
mod r#async;
mod sync;
use std::sync::atomic::AtomicU64;
pub use r#async::{BufferUnorderedAsync, BufferUnorderedAsyncSubscriber};
pub use sync::{BufferUnorderedSync, BufferUnorderedSyncSubscriber};
#[derive(Debug)]
pub struct BufferUnorderedStats {
pub buffer: AtomicU64,
pub buffer_total: AtomicU64,
pub parallel: AtomicU64,
pub parallel_total: AtomicU64,
}
#[derive(Copy, Clone, Debug)]
pub struct BufferUnorderedConfig {
pub buffer_size: usize,
pub max_parallel: usize,
}
impl Default for BufferUnorderedConfig {
fn default() -> Self {
Self {
buffer_size: 8,
max_parallel: 8,
}
}
}

View File

@ -0,0 +1,203 @@
use crate::{receiver::ReceiverStats, receivers::mpsc};
use futures::{Future, StreamExt};
use std::{
any::TypeId,
marker::PhantomData,
pin::Pin,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
task::{Context, Poll},
};
use super::{BufferUnorderedConfig, BufferUnorderedStats};
use crate::{
builder::{ReceiverSubscriber, ReceiverSubscriberBuilder},
msgs,
receiver::{AnyReceiver, ReceiverTrait, SendError, TypedReceiver},
Bus, Handler, Message, Untyped,
};
pub struct BufferUnorderedSyncSubscriber<T, M>
where
T: Handler<M> + 'static,
M: Message,
{
cfg: BufferUnorderedConfig,
_m: PhantomData<(M, T)>,
}
impl<T, M> ReceiverSubscriber<T> for BufferUnorderedSyncSubscriber<T, M>
where
T: Handler<M> + 'static,
M: Message,
{
fn subscribe(
self,
) -> (
Arc<dyn ReceiverTrait>,
Box<
dyn FnOnce(Untyped) -> Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>,
>,
) {
let cfg = self.cfg;
let (tx, rx) = mpsc::channel(cfg.buffer_size);
let stats = Arc::new(BufferUnorderedStats {
buffer: AtomicU64::new(0),
buffer_total: AtomicU64::new(cfg.buffer_size as _),
parallel: AtomicU64::new(0),
parallel_total: AtomicU64::new(cfg.max_parallel as _),
});
let arc = Arc::new(BufferUnorderedSync::<M> {
tx,
stats: stats.clone(),
});
let poller = Box::new(move |ut| {
Box::new(move |bus| {
Box::pin(buffer_unordered_poller::<T, M>(rx, bus, ut, stats, cfg))
as Pin<Box<dyn Future<Output = ()> + Send>>
}) as Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>
});
(arc, poller)
}
}
async fn buffer_unordered_poller<T, M>(
rx: mpsc::Receiver<M>,
bus: Bus,
ut: Untyped,
stats: Arc<BufferUnorderedStats>,
cfg: BufferUnorderedConfig,
) where
T: Handler<M> + 'static,
M: Message,
{
let ut = ut.downcast::<T>().unwrap();
let mut x = rx
.map(|msg| {
stats.buffer.fetch_sub(1, Ordering::Relaxed);
stats.parallel.fetch_add(1, Ordering::Relaxed);
let bus = bus.clone();
let ut = ut.clone();
tokio::task::spawn_blocking(move || ut.handle(msg, &bus))
})
.buffer_unordered(cfg.max_parallel);
while let Some(err) = x.next().await {
stats.parallel.fetch_sub(1, Ordering::Relaxed);
match err {
Ok(Err(err)) => {
let _ = bus.send(msgs::Error(Arc::new(err))).await;
}
_ => (),
}
}
let ut = ut.clone();
let bus_clone = bus.clone();
let res = tokio::task::spawn_blocking(move || ut.sync(&bus_clone)).await;
match res {
Ok(Err(err)) => {
let _ = bus.send(msgs::Error(Arc::new(err))).await;
}
_ => (),
}
println!(
"[EXIT] BufferUnorderedAsync<{}>",
std::any::type_name::<M>()
);
}
pub struct BufferUnorderedSync<M: Message> {
tx: mpsc::Sender<M>,
stats: Arc<BufferUnorderedStats>,
}
impl<T, M> ReceiverSubscriberBuilder<M, T> for BufferUnorderedSync<M>
where
T: Handler<M> + 'static,
M: Message,
{
type Entry = BufferUnorderedSyncSubscriber<T, M>;
type Config = BufferUnorderedConfig;
fn build(cfg: Self::Config) -> Self::Entry {
BufferUnorderedSyncSubscriber {
cfg,
_m: Default::default(),
}
}
}
impl<M: Message> TypedReceiver<M> for BufferUnorderedSync<M> {
fn poll_ready(&self, ctx: &mut Context<'_>) -> Poll<()> {
match self.tx.poll_ready(ctx) {
Poll::Ready(_) => Poll::Ready(()),
Poll::Pending => Poll::Pending,
}
}
fn try_send(&self, m: M) -> Result<(), SendError<M>> {
match self.tx.try_send(m) {
Ok(_) => {
self.stats.buffer.fetch_add(1, Ordering::Relaxed);
Ok(())
}
Err(err) => Err(err),
}
}
}
impl<M: Message> ReceiverTrait for BufferUnorderedSync<M> {
fn typed(&self) -> AnyReceiver<'_> {
AnyReceiver::new(self)
}
fn type_id(&self) -> TypeId {
TypeId::of::<BufferUnorderedSync<M>>()
}
fn stats(&self) -> ReceiverStats {
ReceiverStats {
name: std::any::type_name::<M>().into(),
fields: vec![
("buffer".into(), self.stats.buffer.load(Ordering::SeqCst)),
(
"buffer_total".into(),
self.stats.buffer_total.load(Ordering::SeqCst),
),
(
"parallel".into(),
self.stats.parallel.load(Ordering::SeqCst),
),
(
"parallel_total".into(),
self.stats.parallel_total.load(Ordering::SeqCst),
),
],
}
}
fn close(&self) {
self.tx.close();
}
fn sync(&self) {
self.tx.flush();
}
fn poll_synchronized(&self, _ctx: &mut Context<'_>) -> Poll<()> {
Poll::Ready(())
}
}

23
src/receivers/mod.rs Normal file
View File

@ -0,0 +1,23 @@
mod buffer_unordered;
mod mpsc_futures;
mod synchronize_batched;
mod synchronized;
mod mpsc {
pub use super::mpsc_futures::*;
}
pub use buffer_unordered::{
BufferUnorderedAsync, BufferUnorderedAsyncSubscriber, BufferUnorderedConfig,
BufferUnorderedSync, BufferUnorderedSyncSubscriber,
};
pub use synchronized::{
SynchronizedAsync, SynchronizedAsyncSubscriber, SynchronizedConfig, SynchronizedSync,
SynchronizedSyncSubscriber,
};
pub use synchronize_batched::{
SynchronizeBatchedAsync, SynchronizeBatchedAsyncSubscriber, SynchronizeBatchedConfig,
SynchronizeBatchedSync, SynchronizeBatchedSyncSubscriber,
};

142
src/receivers/mpsc.rs Normal file
View File

@ -0,0 +1,142 @@
use futures::{Stream, StreamExt};
use core::pin::Pin;
use crossbeam::queue::ArrayQueue;
use crossbeam::atomic::AtomicCell;
use core::task::{Waker, Context, Poll};
use std::sync::{Arc, atomic::*};
use crate::receiver::SendError;
struct ChannelInner<T> {
queue: ArrayQueue<T>,
send_waker: AtomicCell<Option<Box<Waker>>>,
recv_waker: AtomicCell<Option<Box<Waker>>>,
closed: AtomicBool,
}
pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
let inner = Arc::new(ChannelInner {
queue: ArrayQueue::new(buffer),
send_waker: AtomicCell::new(None),
recv_waker: AtomicCell::new(None),
closed: AtomicBool::new(false),
});
(
Sender {
inner: inner.clone(),
},
Receiver {
inner,
}
)
}
#[derive(Clone)]
pub struct Sender<T> {
inner: Arc<ChannelInner<T>>
}
impl <T> Sender<T> {
pub fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<()> {
if self.inner.closed.load(Ordering::SeqCst) {
return Poll::Ready(());
}
if self.inner.queue.is_full() {
self.inner.send_waker.store(Some(Box::new(cx.waker().clone())));
}
let mut counter = 4;
loop {
if self.inner.queue.is_full() {
if counter > 0 {
counter -= 1;
continue;
} else {
break Poll::Pending;
}
} else {
break Poll::Ready(());
}
}
}
pub fn try_send(&self, mut item: T) -> Result<(), SendError<T>> {
if self.inner.closed.load(Ordering::SeqCst) {
return Err(SendError::Closed(item));
}
let mut counter = 0;
loop {
match self.inner.queue.push(item) {
Ok(_) => {
if let Some(waker) = self.inner.recv_waker.take() {
waker.wake();
}
break Ok(());
}
Err(inner) => {
if counter >= 4 {
break Err(SendError::Full(inner));
} else {
item = inner;
counter += 1;
}
}
}
}
}
pub fn close(&self) {
self.inner.closed.store(true, Ordering::SeqCst);
if let Some(waker) = self.inner.recv_waker.take() {
waker.wake();
}
}
}
pub struct Receiver<T> {
inner: Arc<ChannelInner<T>>
}
impl <T> Stream for Receiver<T> {
type Item = T;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
let mut counter = 0;
loop {
match this.inner.queue.pop() {
Some(inner) => {
if let Some(waker) = this.inner.send_waker.take() {
waker.wake();
}
break Poll::Ready(Some(inner));
},
None => {
if this.inner.closed.load(Ordering::SeqCst) {
break Poll::Ready(None);
} else {
if counter == 0 {
this.inner.recv_waker.store(Some(Box::new(cx.waker().clone())));
}
if counter >= 8 {
break Poll::Pending;
} else {
counter += 1;
}
}
}
}
}
}
}

View File

@ -0,0 +1,99 @@
use crate::receiver::SendError;
use core::pin::Pin;
use core::task::{Context, Poll};
use crossbeam::queue::SegQueue;
use futures::{channel::mpsc, Stream};
use std::{
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
task::Waker,
};
pub struct State {
buffer: usize,
counter: AtomicUsize,
send_wakers: SegQueue<Waker>,
}
pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
let state = Arc::new(State {
buffer,
counter: AtomicUsize::new(0),
send_wakers: SegQueue::new(),
});
let (tx, rx) = mpsc::unbounded();
(
Sender {
inner: tx,
state: state.clone(),
},
Receiver { inner: rx, state },
)
}
pub struct Sender<T> {
inner: mpsc::UnboundedSender<T>,
state: Arc<State>,
}
impl<T> Sender<T> {
pub fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<()> {
if self.state.counter.load(Ordering::SeqCst) >= self.state.buffer {
self.state.send_wakers.push(cx.waker().clone());
return Poll::Pending;
}
Poll::Ready(())
}
pub fn try_send(&self, item: T) -> Result<(), SendError<T>> {
if self.state.counter.load(Ordering::Relaxed) >= self.state.buffer {
return Err(SendError::Full(item));
}
self.state.counter.fetch_add(1, Ordering::SeqCst);
match self.inner.unbounded_send(item) {
Ok(_) => Ok(()),
Err(err) if err.is_full() => Err(SendError::Full(err.into_inner())),
Err(err) => Err(SendError::Closed(err.into_inner())),
}
}
#[inline]
pub fn flush(&self) {}
#[inline]
pub fn close(&self) {
self.inner.close_channel();
}
}
pub struct Receiver<T> {
inner: mpsc::UnboundedReceiver<T>,
state: Arc<State>,
}
impl<T> Stream for Receiver<T> {
type Item = T;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
match Pin::new(&mut this.inner).poll_next(cx) {
Poll::Ready(inner) => {
let val = this.state.counter.fetch_sub(1, Ordering::SeqCst);
if val <= this.state.buffer {
if let Some(waker) = this.state.send_wakers.pop() {
waker.wake();
}
}
Poll::Ready(inner)
}
Poll::Pending => Poll::Pending,
}
}
}

View File

@ -0,0 +1,220 @@
use std::{
any::TypeId,
marker::PhantomData,
pin::Pin,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
task::{Context, Poll},
};
use crate::{receiver::ReceiverStats, receivers::mpsc};
use futures::{Future, StreamExt};
use tokio::sync::Mutex;
use super::{SynchronizeBatchedConfig, SynchronizeBatchedStats};
use crate::{
builder::{ReceiverSubscriber, ReceiverSubscriberBuilder},
msgs,
receiver::{AnyReceiver, ReceiverTrait, SendError, TypedReceiver},
AsyncBatchSynchronizedHandler, Bus, Message, Untyped,
};
pub struct SynchronizeBatchedAsyncSubscriber<T, M>
where
T: AsyncBatchSynchronizedHandler<M> + 'static,
M: Message,
{
cfg: SynchronizeBatchedConfig,
_m: PhantomData<(T, M)>,
}
impl<T, M> ReceiverSubscriber<T> for SynchronizeBatchedAsyncSubscriber<T, M>
where
T: AsyncBatchSynchronizedHandler<M> + 'static,
M: Message,
{
fn subscribe(
self,
) -> (
Arc<dyn ReceiverTrait>,
Box<
dyn FnOnce(Untyped) -> Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>,
>,
) {
let cfg = self.cfg;
let (tx, rx) = mpsc::channel(cfg.buffer_size);
let stats = Arc::new(SynchronizeBatchedStats {
buffer: AtomicU64::new(0),
buffer_total: AtomicU64::new(cfg.buffer_size as _),
batch: AtomicU64::new(0),
batch_size: AtomicU64::new(cfg.batch_size as _),
});
let arc = Arc::new(SynchronizeBatchedAsync::<M> {
tx,
stats: stats.clone(),
});
let poller = Box::new(move |ut| {
Box::new(move |bus| {
Box::pin(buffer_unordered_poller::<T, M>(rx, bus, ut, stats, cfg))
as Pin<Box<dyn Future<Output = ()> + Send>>
}) as Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>
});
(arc, poller)
}
}
async fn buffer_unordered_poller<T, M>(
mut rx: mpsc::Receiver<M>,
bus: Bus,
ut: Untyped,
stats: Arc<SynchronizeBatchedStats>,
cfg: SynchronizeBatchedConfig,
) where
T: AsyncBatchSynchronizedHandler<M> + 'static,
M: Message,
{
let mut batch = Vec::with_capacity(cfg.batch_size);
let ut = ut.downcast::<Mutex<T>>().unwrap();
while let Some(msg) = rx.next().await {
stats.buffer.fetch_sub(1, Ordering::Relaxed);
batch.push(msg);
stats.batch.fetch_add(1, Ordering::Relaxed);
if batch.len() >= cfg.batch_size {
let bus_clone = bus.clone();
let ut = ut.clone();
let msgs = batch.drain(..).collect::<Vec<_>>();
stats.batch.store(0, Ordering::Relaxed);
let res =
tokio::task::spawn(async move { ut.lock().await.handle(msgs, &bus_clone).await })
.await;
match res {
Ok(Err(err)) => {
let _ = bus.send(msgs::Error(Arc::new(err))).await;
}
_ => (),
}
}
}
if !batch.is_empty() {
let ut = ut.clone();
let bus_clone = bus.clone();
stats.batch.store(0, Ordering::Relaxed);
let res =
tokio::task::spawn(async move { ut.lock().await.handle(batch, &bus_clone).await })
.await;
match res {
Ok(Err(err)) => {
let _ = bus.send(msgs::Error(Arc::new(err))).await;
}
_ => (),
}
}
let ut = ut.clone();
let bus_clone = bus.clone();
let res = tokio::task::spawn(async move { ut.lock().await.sync(&bus_clone).await }).await;
match res {
Ok(Err(err)) => {
let _ = bus.send(msgs::Error(Arc::new(err))).await;
}
_ => (),
}
println!(
"[EXIT] SynchronizeBatchedAsync<{}>",
std::any::type_name::<M>()
);
}
pub struct SynchronizeBatchedAsync<M: Message> {
tx: mpsc::Sender<M>,
stats: Arc<SynchronizeBatchedStats>,
}
impl<T, M> ReceiverSubscriberBuilder<M, T> for SynchronizeBatchedAsync<M>
where
T: AsyncBatchSynchronizedHandler<M> + 'static,
M: Message,
{
type Entry = SynchronizeBatchedAsyncSubscriber<T, M>;
type Config = SynchronizeBatchedConfig;
fn build(cfg: Self::Config) -> Self::Entry {
SynchronizeBatchedAsyncSubscriber {
cfg,
_m: Default::default(),
}
}
}
impl<M: Message> TypedReceiver<M> for SynchronizeBatchedAsync<M> {
fn poll_ready(&self, ctx: &mut Context<'_>) -> Poll<()> {
match self.tx.poll_ready(ctx) {
Poll::Ready(_) => Poll::Ready(()),
Poll::Pending => Poll::Pending,
}
}
fn try_send(&self, m: M) -> Result<(), SendError<M>> {
match self.tx.try_send(m) {
Ok(_) => {
self.stats.buffer.fetch_add(1, Ordering::Relaxed);
Ok(())
}
Err(err) => Err(err),
}
}
}
impl<M: Message> ReceiverTrait for SynchronizeBatchedAsync<M> {
fn typed(&self) -> AnyReceiver<'_> {
AnyReceiver::new(self)
}
fn type_id(&self) -> TypeId {
TypeId::of::<Self>()
}
fn close(&self) {
self.tx.close();
}
fn stats(&self) -> ReceiverStats {
ReceiverStats {
name: std::any::type_name::<M>().into(),
fields: vec![
("buffer".into(), self.stats.buffer.load(Ordering::SeqCst)),
(
"buffer_total".into(),
self.stats.buffer_total.load(Ordering::SeqCst),
),
("batch".into(), self.stats.batch.load(Ordering::SeqCst)),
(
"batch_size".into(),
self.stats.batch_size.load(Ordering::SeqCst),
),
],
}
}
fn sync(&self) {
self.tx.flush();
}
fn poll_synchronized(&self, _ctx: &mut Context<'_>) -> Poll<()> {
Poll::Ready(())
}
}

View File

@ -0,0 +1,31 @@
mod r#async;
mod sync;
use std::sync::atomic::AtomicU64;
pub use sync::{SynchronizeBatchedSync, SynchronizeBatchedSyncSubscriber};
pub use r#async::{SynchronizeBatchedAsync, SynchronizeBatchedAsyncSubscriber};
#[derive(Debug)]
pub struct SynchronizeBatchedStats {
pub buffer: AtomicU64,
pub buffer_total: AtomicU64,
pub batch: AtomicU64,
pub batch_size: AtomicU64,
}
#[derive(Copy, Clone, Debug)]
pub struct SynchronizeBatchedConfig {
pub buffer_size: usize,
pub batch_size: usize,
}
impl Default for SynchronizeBatchedConfig {
fn default() -> Self {
Self {
buffer_size: 4,
batch_size: 16,
}
}
}

View File

@ -0,0 +1,223 @@
use super::{SynchronizeBatchedConfig, SynchronizeBatchedStats};
use crate::{
builder::{ReceiverSubscriber, ReceiverSubscriberBuilder},
msgs,
receiver::{AnyReceiver, ReceiverTrait, SendError, TypedReceiver},
BatchSynchronizedHandler, Bus, Message, Untyped,
};
use crate::{receiver::ReceiverStats, receivers::mpsc};
use futures::{Future, StreamExt};
use std::{
any::TypeId,
marker::PhantomData,
pin::Pin,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
task::{Context, Poll},
};
use tokio::sync::Mutex;
pub struct SynchronizeBatchedSyncSubscriber<T, M>
where
T: BatchSynchronizedHandler<M> + 'static,
M: Message,
{
cfg: SynchronizeBatchedConfig,
_m: PhantomData<(M, T)>,
}
impl<T, M> ReceiverSubscriber<T> for SynchronizeBatchedSyncSubscriber<T, M>
where
T: BatchSynchronizedHandler<M> + 'static,
M: Message,
{
fn subscribe(
self,
) -> (
Arc<dyn ReceiverTrait>,
Box<
dyn FnOnce(Untyped) -> Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>,
>,
) {
let cfg = self.cfg;
let (tx, rx) = mpsc::channel(cfg.buffer_size);
let stats = Arc::new(SynchronizeBatchedStats {
buffer: AtomicU64::new(0),
buffer_total: AtomicU64::new(cfg.buffer_size as _),
batch: AtomicU64::new(0),
batch_size: AtomicU64::new(cfg.batch_size as _),
});
let arc = Arc::new(SynchronizeBatchedSync::<M> {
tx,
stats: stats.clone(),
});
let poller = Box::new(move |ut| {
Box::new(move |bus| {
Box::pin(buffer_unordered_poller::<T, M>(rx, bus, ut, stats, cfg))
as Pin<Box<dyn Future<Output = ()> + Send>>
}) as Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>
});
(arc, poller)
}
}
async fn buffer_unordered_poller<T, M>(
mut rx: mpsc::Receiver<M>,
bus: Bus,
ut: Untyped,
stats: Arc<SynchronizeBatchedStats>,
cfg: SynchronizeBatchedConfig,
) where
T: BatchSynchronizedHandler<M> + 'static,
M: Message,
{
let mut batch = Vec::with_capacity(cfg.batch_size);
let ut = ut.downcast::<Mutex<T>>().unwrap();
while let Some(msg) = rx.next().await {
stats.buffer.fetch_sub(1, Ordering::Relaxed);
batch.push(msg);
stats.batch.fetch_add(1, Ordering::Relaxed);
if batch.len() >= cfg.batch_size {
let ut = ut.clone();
let bus_clone = bus.clone();
let msgs = batch.drain(..).collect::<Vec<_>>();
stats.batch.store(0, Ordering::Relaxed);
let res = tokio::task::spawn_blocking(move || {
let mut uut = futures::executor::block_on(ut.lock());
uut.handle(msgs, &bus_clone)
})
.await;
match res {
Ok(Err(err)) => {
let _ = bus.send(msgs::Error(Arc::new(err))).await;
}
_ => (),
}
}
}
if !batch.is_empty() {
let ut = ut.clone();
let bus_clone = bus.clone();
stats.batch.store(0, Ordering::Relaxed);
let res = tokio::task::spawn_blocking(move || {
futures::executor::block_on(ut.lock()).handle(batch, &bus_clone)
})
.await;
match res {
Ok(Err(err)) => {
let _ = bus.send(msgs::Error(Arc::new(err))).await;
}
_ => (),
}
}
let ut = ut.clone();
let bus_clone = bus.clone();
let res = tokio::task::spawn_blocking(move || {
futures::executor::block_on(ut.lock()).sync(&bus_clone)
})
.await;
match res {
Ok(Err(err)) => {
let _ = bus.send(msgs::Error(Arc::new(err))).await;
}
_ => (),
}
println!(
"[EXIT] SynchronizeBatchedSync<{}>",
std::any::type_name::<M>()
);
}
pub struct SynchronizeBatchedSync<M: Message> {
tx: mpsc::Sender<M>,
stats: Arc<SynchronizeBatchedStats>,
}
impl<T, M> ReceiverSubscriberBuilder<M, T> for SynchronizeBatchedSync<M>
where
T: BatchSynchronizedHandler<M> + 'static,
M: Message,
{
type Entry = SynchronizeBatchedSyncSubscriber<T, M>;
type Config = SynchronizeBatchedConfig;
fn build(cfg: Self::Config) -> Self::Entry {
SynchronizeBatchedSyncSubscriber {
cfg,
_m: Default::default(),
}
}
}
impl<M: Message> TypedReceiver<M> for SynchronizeBatchedSync<M> {
fn poll_ready(&self, ctx: &mut Context<'_>) -> Poll<()> {
match self.tx.poll_ready(ctx) {
Poll::Ready(_) => Poll::Ready(()),
Poll::Pending => Poll::Pending,
}
}
fn try_send(&self, m: M) -> Result<(), SendError<M>> {
match self.tx.try_send(m) {
Ok(_) => {
self.stats.buffer.fetch_add(1, Ordering::Relaxed);
Ok(())
}
Err(err) => Err(err),
}
}
}
impl<M: Message> ReceiverTrait for SynchronizeBatchedSync<M> {
fn typed(&self) -> AnyReceiver<'_> {
AnyReceiver::new(self)
}
fn type_id(&self) -> TypeId {
TypeId::of::<Self>()
}
fn stats(&self) -> ReceiverStats {
ReceiverStats {
name: std::any::type_name::<M>().into(),
fields: vec![
("buffer".into(), self.stats.buffer.load(Ordering::SeqCst)),
(
"buffer_total".into(),
self.stats.buffer_total.load(Ordering::SeqCst),
),
("batch".into(), self.stats.batch.load(Ordering::SeqCst)),
(
"batch_size".into(),
self.stats.batch_size.load(Ordering::SeqCst),
),
],
}
}
fn close(&self) {
self.tx.close();
}
fn sync(&self) {
self.tx.flush();
}
fn poll_synchronized(&self, _ctx: &mut Context<'_>) -> Poll<()> {
Poll::Ready(())
}
}

View File

@ -0,0 +1,186 @@
use std::{
any::TypeId,
marker::PhantomData,
pin::Pin,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
task::{Context, Poll},
};
use crate::{receiver::ReceiverStats, receivers::mpsc};
use futures::{Future, StreamExt};
use tokio::sync::Mutex;
use super::{SynchronizedConfig, SynchronizedStats};
use crate::{
builder::{ReceiverSubscriber, ReceiverSubscriberBuilder},
msgs,
receiver::{AnyReceiver, ReceiverTrait, SendError, TypedReceiver},
AsyncSynchronizedHandler, Bus, Message, Untyped,
};
pub struct SynchronizedAsyncSubscriber<T, M>
where
T: AsyncSynchronizedHandler<M> + 'static,
M: Message,
{
cfg: SynchronizedConfig,
_m: PhantomData<(T, M)>,
}
impl<T, M> ReceiverSubscriber<T> for SynchronizedAsyncSubscriber<T, M>
where
T: AsyncSynchronizedHandler<M> + 'static,
M: Message,
{
fn subscribe(
self,
) -> (
Arc<dyn ReceiverTrait>,
Box<
dyn FnOnce(Untyped) -> Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>,
>,
) {
let cfg = self.cfg;
let (tx, rx) = mpsc::channel(cfg.buffer_size);
let stats = Arc::new(SynchronizedStats {
buffer: AtomicU64::new(0),
buffer_total: AtomicU64::new(cfg.buffer_size as _),
});
let arc = Arc::new(SynchronizedAsync::<M> {
tx,
stats: stats.clone(),
});
let poller = Box::new(move |ut| {
Box::new(move |bus| {
Box::pin(buffer_unordered_poller::<T, M>(rx, bus, ut, stats, cfg))
as Pin<Box<dyn Future<Output = ()> + Send>>
}) as Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>
});
(arc, poller)
}
}
async fn buffer_unordered_poller<T, M>(
rx: mpsc::Receiver<M>,
bus: Bus,
ut: Untyped,
stats: Arc<SynchronizedStats>,
_cfg: SynchronizedConfig,
) where
T: AsyncSynchronizedHandler<M> + 'static,
M: Message,
{
let ut = ut.downcast::<Mutex<T>>().unwrap();
let mut x = rx.then(|msg| {
let bus = bus.clone();
let ut = ut.clone();
tokio::task::spawn(async move { ut.lock().await.handle(msg, &bus).await })
});
while let Some(err) = x.next().await {
stats.buffer.fetch_sub(1, Ordering::Relaxed);
match err {
Ok(Err(err)) => {
let _ = bus.send(msgs::Error(Arc::new(err))).await;
}
_ => (),
}
}
let ut = ut.clone();
let bus_clone = bus.clone();
let res = tokio::task::spawn(async move { ut.lock().await.sync(&bus_clone).await }).await;
match res {
Ok(Err(err)) => {
let _ = bus.send(msgs::Error(Arc::new(err))).await;
}
_ => (),
}
println!("[EXIT] SynchronizedAsync<{}>", std::any::type_name::<M>());
}
pub struct SynchronizedAsync<M: Message> {
tx: mpsc::Sender<M>,
stats: Arc<SynchronizedStats>,
}
impl<T, M> ReceiverSubscriberBuilder<M, T> for SynchronizedAsync<M>
where
T: AsyncSynchronizedHandler<M> + 'static,
M: Message,
{
type Entry = SynchronizedAsyncSubscriber<T, M>;
type Config = SynchronizedConfig;
fn build(cfg: Self::Config) -> Self::Entry {
SynchronizedAsyncSubscriber {
cfg,
_m: Default::default(),
}
}
}
impl<M: Message> TypedReceiver<M> for SynchronizedAsync<M> {
fn poll_ready(&self, ctx: &mut Context<'_>) -> Poll<()> {
match self.tx.poll_ready(ctx) {
Poll::Ready(_) => Poll::Ready(()),
Poll::Pending => Poll::Pending,
}
}
fn try_send(&self, m: M) -> Result<(), SendError<M>> {
match self.tx.try_send(m) {
Ok(_) => {
self.stats.buffer.fetch_add(1, Ordering::Relaxed);
Ok(())
}
Err(err) => Err(err),
}
}
}
impl<M: Message> ReceiverTrait for SynchronizedAsync<M> {
fn typed(&self) -> AnyReceiver<'_> {
AnyReceiver::new(self)
}
fn type_id(&self) -> TypeId {
TypeId::of::<SynchronizedAsync<M>>()
}
fn stats(&self) -> ReceiverStats {
ReceiverStats {
name: std::any::type_name::<M>().into(),
fields: vec![
("buffer".into(), self.stats.buffer.load(Ordering::SeqCst)),
(
"buffer_total".into(),
self.stats.buffer_total.load(Ordering::SeqCst),
),
],
}
}
fn close(&self) {
self.tx.close();
}
fn sync(&self) {
self.tx.flush();
}
fn poll_synchronized(&self, _ctx: &mut Context<'_>) -> Poll<()> {
Poll::Ready(())
}
}

View File

@ -0,0 +1,25 @@
mod r#async;
mod sync;
use std::sync::atomic::AtomicU64;
pub use sync::{SynchronizedSync, SynchronizedSyncSubscriber};
pub use r#async::{SynchronizedAsync, SynchronizedAsyncSubscriber};
#[derive(Debug)]
pub struct SynchronizedStats {
pub buffer: AtomicU64,
pub buffer_total: AtomicU64,
}
#[derive(Copy, Clone, Debug)]
pub struct SynchronizedConfig {
pub buffer_size: usize,
}
impl Default for SynchronizedConfig {
fn default() -> Self {
Self { buffer_size: 1 }
}
}

View File

@ -0,0 +1,190 @@
use super::{SynchronizedConfig, SynchronizedStats};
use crate::{receiver::ReceiverStats, receivers::mpsc};
use futures::{executor::block_on, Future, StreamExt};
use std::{
any::TypeId,
marker::PhantomData,
pin::Pin,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
task::{Context, Poll},
};
use tokio::sync::Mutex;
use crate::{
builder::{ReceiverSubscriber, ReceiverSubscriberBuilder},
msgs,
receiver::{AnyReceiver, ReceiverTrait, SendError, TypedReceiver},
Bus, Message, SynchronizedHandler, Untyped,
};
pub struct SynchronizedSyncSubscriber<T, M>
where
T: SynchronizedHandler<M> + 'static,
M: Message,
{
cfg: SynchronizedConfig,
_m: PhantomData<(M, T)>,
}
impl<T, M> ReceiverSubscriber<T> for SynchronizedSyncSubscriber<T, M>
where
T: SynchronizedHandler<M> + 'static,
M: Message,
{
fn subscribe(
self,
) -> (
Arc<dyn ReceiverTrait>,
Box<
dyn FnOnce(Untyped) -> Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>,
>,
) {
let cfg = self.cfg;
let (tx, rx) = mpsc::channel(cfg.buffer_size);
let stats = Arc::new(SynchronizedStats {
buffer: AtomicU64::new(0),
buffer_total: AtomicU64::new(cfg.buffer_size as _),
});
let arc = Arc::new(SynchronizedSync::<M> {
tx,
stats: stats.clone(),
});
let poller = Box::new(move |ut| {
Box::new(move |bus| {
Box::pin(buffer_unordered_poller::<T, M>(rx, bus, ut, stats, cfg))
as Pin<Box<dyn Future<Output = ()> + Send>>
}) as Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>
});
(arc, poller)
}
}
async fn buffer_unordered_poller<T, M>(
rx: mpsc::Receiver<M>,
bus: Bus,
ut: Untyped,
stats: Arc<SynchronizedStats>,
_cfg: SynchronizedConfig,
) where
T: SynchronizedHandler<M> + 'static,
M: Message,
{
let ut = ut.downcast::<Mutex<T>>().unwrap();
let mut x = rx.then(|msg| {
let ut = ut.clone();
let bus = bus.clone();
tokio::task::spawn_blocking(move || block_on(ut.lock()).handle(msg, &bus))
});
while let Some(err) = x.next().await {
stats.buffer.fetch_sub(1, Ordering::Relaxed);
match err {
Ok(Err(err)) => {
let _ = bus.send(msgs::Error(Arc::new(err))).await;
}
_ => (),
}
}
let ut = ut.clone();
let bus_clone = bus.clone();
let res = tokio::task::spawn_blocking(move || {
futures::executor::block_on(ut.lock()).sync(&bus_clone)
})
.await;
match res {
Ok(Err(err)) => {
let _ = bus.send(msgs::Error(Arc::new(err))).await;
}
_ => (),
}
println!(
"[EXIT] BufferUnorderedAsync<{}>",
std::any::type_name::<M>()
);
}
pub struct SynchronizedSync<M: Message> {
tx: mpsc::Sender<M>,
stats: Arc<SynchronizedStats>,
}
impl<T, M> ReceiverSubscriberBuilder<M, T> for SynchronizedSync<M>
where
T: SynchronizedHandler<M> + 'static,
M: Message,
{
type Entry = SynchronizedSyncSubscriber<T, M>;
type Config = SynchronizedConfig;
fn build(cfg: Self::Config) -> Self::Entry {
SynchronizedSyncSubscriber {
cfg,
_m: Default::default(),
}
}
}
impl<M: Message> TypedReceiver<M> for SynchronizedSync<M> {
fn poll_ready(&self, ctx: &mut Context<'_>) -> Poll<()> {
match self.tx.poll_ready(ctx) {
Poll::Ready(_) => Poll::Ready(()),
Poll::Pending => Poll::Pending,
}
}
fn try_send(&self, m: M) -> Result<(), SendError<M>> {
match self.tx.try_send(m) {
Ok(_) => {
self.stats.buffer.fetch_add(1, Ordering::Relaxed);
Ok(())
}
Err(err) => Err(err),
}
}
}
impl<M: Message> ReceiverTrait for SynchronizedSync<M> {
fn typed(&self) -> AnyReceiver<'_> {
AnyReceiver::new(self)
}
fn type_id(&self) -> TypeId {
TypeId::of::<SynchronizedSync<M>>()
}
fn stats(&self) -> ReceiverStats {
ReceiverStats {
name: std::any::type_name::<M>().into(),
fields: vec![
("buffer".into(), self.stats.buffer.load(Ordering::SeqCst)),
(
"buffer_total".into(),
self.stats.buffer_total.load(Ordering::SeqCst),
),
],
}
}
fn close(&self) {
self.tx.close();
}
fn sync(&self) {
self.tx.flush();
}
fn poll_synchronized(&self, _ctx: &mut Context<'_>) -> Poll<()> {
Poll::Ready(())
}
}

6
src/trait_object.rs Normal file
View File

@ -0,0 +1,6 @@
#[derive(Debug, Copy, Clone)]
#[repr(C)]
pub struct TraitObject {
pub data: *mut (),
pub vtable: *mut (),
}

33
src/utils.rs Normal file
View File

@ -0,0 +1,33 @@
use core::cmp::{Ord, Ordering};
use core::ops::Range;
pub fn binary_search_range_by_key<'a, T, B, F>(data: &'a [T], item: &B, mut f: F) -> Range<usize>
where
F: FnMut(&'a T) -> B,
B: Ord,
{
if let Ok(index) = data.binary_search_by_key(item, &mut f) {
let mut begin = index;
let mut end = index + 1;
for i in (0..index).rev() {
if f(unsafe { data.get_unchecked(i) }).cmp(item) != Ordering::Equal {
break;
}
begin = i;
}
for i in end..data.len() {
end = i;
if f(unsafe { data.get_unchecked(i) }).cmp(item) != Ordering::Equal {
break;
}
}
begin..end
} else {
data.len()..data.len()
}
}