Local Handlers

This commit is contained in:
Andrey Tkachenko 2021-01-15 13:52:50 +04:00
parent d36b9b6cdc
commit 2d35840044
23 changed files with 1166 additions and 106 deletions

View File

@ -1,6 +1,6 @@
[package]
name = "messagebus"
version = "0.4.6"
version = "0.5.0"
authors = ["Andrey Tkachenko <andrey@aidev.ru>"]
repository = "https://github.com/andreytkachenko/messagebus.git"
keywords = ["futures", "async", "tokio", "message", "bus"]
@ -11,12 +11,12 @@ exclude = [".gitignore", ".cargo/config", ".github/**", "codecov.yml"]
edition = "2018"
[dependencies]
tokio = { version = "0.2", features = ["parking_lot", "rt-threaded", "sync", "stream", "blocking"] }
parking_lot = "0.11.1"
async-trait = "0.1.42"
futures = "0.3.8"
anyhow = "1.0.34"
tokio = { version = "1", features = ["parking_lot", "rt-multi-thread", "sync"] }
parking_lot = "0.11"
async-trait = "0.1"
futures = "0.3"
anyhow = "1.0"
crossbeam = "0.8.0"
[dev-dependencies]
tokio = { version = "0.2", features = ["parking_lot", "rt-threaded", "sync", "stream", "macros"] }
tokio = { version = "1", features = ["parking_lot", "rt-multi-thread", "sync", "macros"] }

41
examples/demo_local.rs Normal file
View File

@ -0,0 +1,41 @@
use std::rc::Rc;
use std::cell::Cell;
use async_trait::async_trait;
use messagebus::{receivers, Bus, LocalHandler, LocalAsyncHandler, Result as MbusResult};
struct TmpReceiver {
_inner: Rc<Cell<i32>>
}
#[async_trait(?Send)]
impl LocalAsyncHandler<f32> for TmpReceiver {
async fn handle(&mut self, msg: f32, bus: &Bus) -> MbusResult {
println!("---> f32 {}", msg);
bus.send(11u16).await.unwrap();
Ok(())
}
}
impl LocalHandler<u16> for TmpReceiver {
fn handle(&mut self, msg: u16, _bus: &Bus) -> MbusResult {
println!("---> u16 {}", msg);
Ok(())
}
}
#[tokio::main]
async fn main() {
let (b, poller) = Bus::build()
.register_local(||TmpReceiver {_inner: Rc::new(Cell::new(12))})
.subscribe::<f32, receivers::LocalAsync<_>>(Default::default())
.subscribe::<u16, receivers::LocalSync<_>>(Default::default())
.done()
.build();
b.send(32f32).await.unwrap();
poller.await
}

View File

@ -2,7 +2,6 @@ use std::{any::TypeId, collections::HashMap, marker::PhantomData, pin::Pin, sync
use futures::{Future, FutureExt};
use receiver::ReceiverTrait;
use tokio::sync::Mutex;
use crate::{
receiver::{self, Receiver},
@ -30,6 +29,8 @@ pub trait ReceiverSubscriberBuilder<M, T: 'static> {
pub struct SyncEntry;
pub struct UnsyncEntry;
pub struct LocalEntry;
#[must_use]
pub struct RegisterEntry<K, T> {
item: Untyped,
@ -65,6 +66,27 @@ impl<K, T: 'static> RegisterEntry<K, T> {
}
}
impl<T: 'static> RegisterEntry<LocalEntry, T> {
pub fn subscribe<M, R>(mut self, cfg: R::Config) -> Self
where
T: 'static,
M: Message + 'static,
R: ReceiverSubscriberBuilder<M, T> + 'static,
{
let (inner, poller) = R::build(cfg).subscribe();
let receiver = Receiver::new(inner);
self.receivers
.entry(TypeId::of::<M>())
.or_insert_with(Vec::new)
.push((receiver, poller));
self
}
}
impl<T: Send + 'static> RegisterEntry<UnsyncEntry, T> {
pub fn subscribe<M, R>(mut self, cfg: R::Config) -> Self
where
@ -118,7 +140,7 @@ impl BusBuilder {
pub fn register<T: Send + Sync + 'static>(self, item: T) -> RegisterEntry<SyncEntry, T> {
RegisterEntry {
item: Arc::new(item) as Untyped,
item: Untyped::new_rwlock(item),
builder: self,
receivers: HashMap::new(),
_m: Default::default(),
@ -127,7 +149,19 @@ impl BusBuilder {
pub fn register_unsync<T: Send + 'static>(self, item: T) -> RegisterEntry<UnsyncEntry, T> {
RegisterEntry {
item: Arc::new(Mutex::new(item)) as Untyped,
item: Untyped::new_mutex(item),
builder: self,
receivers: HashMap::new(),
_m: Default::default(),
}
}
pub fn register_local<T: 'static>(
self,
item: impl FnOnce() -> T + Send + 'static,
) -> RegisterEntry<LocalEntry, T> {
RegisterEntry {
item: Untyped::new_local(item),
builder: self,
receivers: HashMap::new(),
_m: Default::default(),

View File

@ -62,15 +62,15 @@ pub trait AsyncBatchSynchronizedHandler<M: Message>: Send {
}
pub trait LocalHandler<M: Message> {
fn handle(&mut self, msg: Vec<M>, bus: &Bus) -> anyhow::Result<()>;
fn handle(&mut self, msg: M, bus: &Bus) -> anyhow::Result<()>;
fn sync(&mut self, _bus: &Bus) -> anyhow::Result<()> {
Ok(())
}
}
#[async_trait]
#[async_trait(?Send)]
pub trait LocalAsyncHandler<M: Message> {
async fn handle(&mut self, msg: Vec<M>, bus: &Bus) -> anyhow::Result<()>;
async fn handle(&mut self, msg: M, bus: &Bus) -> anyhow::Result<()>;
async fn sync(&mut self, _bus: &Bus) -> anyhow::Result<()> {
Ok(())
}
@ -83,7 +83,7 @@ pub trait LocalBatchHandler<M: Message> {
}
}
#[async_trait]
#[async_trait(?Send)]
pub trait LocalAsyncBatchHandler<M: Message> {
async fn handle(&mut self, msg: Vec<M>, bus: &Bus) -> anyhow::Result<()>;
async fn sync(&mut self, _bus: &Bus) -> anyhow::Result<()> {

View File

@ -5,6 +5,7 @@ pub mod msgs;
mod receiver;
pub mod receivers;
mod trait_object;
mod untyped;
mod utils;
use builder::BusBuilder;
@ -14,13 +15,13 @@ pub use receiver::SendError;
use receiver::{Receiver, ReceiverStats};
use utils::binary_search_range_by_key;
use core::any::{Any, TypeId};
use core::any::TypeId;
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
pub type Untyped = Arc<dyn Any + Send + Sync>;
pub use untyped::Untyped;
pub type Result = anyhow::Result<()>;
pub struct BusInner {

View File

@ -77,7 +77,7 @@ async fn buffer_unordered_poller<T, M>(
T: AsyncHandler<M> + 'static,
M: Message,
{
let ut = ut.downcast::<T>().unwrap();
let ut = ut.downcast_sync::<T>().unwrap();
let mut x = rx
.map(|msg| {
@ -86,7 +86,9 @@ async fn buffer_unordered_poller<T, M>(
let bus = bus.clone();
let ut = ut.clone();
tokio::task::spawn(async move { ut.handle(msg, &bus).await })
tokio::task::spawn(
async move { ut.lock_read().await.get_ref().handle(msg, &bus).await },
)
})
.buffer_unordered(cfg.max_parallel);
@ -103,7 +105,9 @@ async fn buffer_unordered_poller<T, M>(
let ut = ut.clone();
let bus_clone = bus.clone();
let res = tokio::task::spawn(async move { ut.sync(&bus_clone).await }).await;
let res =
tokio::task::spawn(async move { ut.lock_read().await.get_ref().sync(&bus_clone).await })
.await;
match res {
Ok(Err(err)) => {

View File

@ -1,5 +1,5 @@
use crate::{receiver::ReceiverStats, receivers::mpsc};
use futures::{Future, StreamExt};
use futures::{executor::block_on, Future, StreamExt};
use std::{
any::TypeId,
marker::PhantomData,
@ -76,7 +76,7 @@ async fn buffer_unordered_poller<T, M>(
T: Handler<M> + 'static,
M: Message,
{
let ut = ut.downcast::<T>().unwrap();
let ut = ut.downcast_sync::<T>().unwrap();
let mut x = rx
.map(|msg| {
@ -86,7 +86,9 @@ async fn buffer_unordered_poller<T, M>(
let bus = bus.clone();
let ut = ut.clone();
tokio::task::spawn_blocking(move || ut.handle(msg, &bus))
tokio::task::spawn_blocking(move || {
block_on(ut.lock_read()).get_ref().handle(msg, &bus)
})
})
.buffer_unordered(cfg.max_parallel);
@ -103,7 +105,9 @@ async fn buffer_unordered_poller<T, M>(
let ut = ut.clone();
let bus_clone = bus.clone();
let res = tokio::task::spawn_blocking(move || ut.sync(&bus_clone)).await;
let res =
tokio::task::spawn_blocking(move || block_on(ut.lock_read()).get_ref().sync(&bus_clone))
.await;
match res {
Ok(Err(err)) => {
@ -112,10 +116,7 @@ async fn buffer_unordered_poller<T, M>(
_ => (),
}
println!(
"[EXIT] BufferUnorderedSync<{}>",
std::any::type_name::<M>()
);
println!("[EXIT] BufferUnorderedSync<{}>", std::any::type_name::<M>());
}
pub struct BufferUnorderedSync<M: Message> {

View File

@ -79,19 +79,16 @@ async fn buffer_unordered_poller<T, M>(
T: AsyncBatchHandler<M> + 'static,
M: Message,
{
let ut = ut.downcast::<T>().unwrap();
let rx = rx
.inspect(|_| {
let ut = ut.downcast_sync::<T>().unwrap();
let rx = rx.inspect(|_| {
stats.buffer.fetch_sub(1, Ordering::Relaxed);
stats.batch.fetch_add(1, Ordering::Relaxed);
});
let rx = if cfg.when_ready {
rx.ready_chunks(cfg.batch_size)
.left_stream()
rx.ready_chunks(cfg.batch_size).left_stream()
} else {
rx.chunks(cfg.batch_size)
.right_stream()
rx.chunks(cfg.batch_size).right_stream()
};
let mut rx = rx
@ -102,7 +99,13 @@ async fn buffer_unordered_poller<T, M>(
let bus_clone = bus.clone();
let ut = ut.clone();
tokio::task::spawn(async move { ut.handle(msgs, &bus_clone).await })
tokio::task::spawn(async move {
ut.lock_read()
.await
.get_ref()
.handle(msgs, &bus_clone)
.await
})
})
.buffer_unordered(cfg.max_parallel);
@ -119,7 +122,9 @@ async fn buffer_unordered_poller<T, M>(
let ut = ut.clone();
let bus_clone = bus.clone();
let res = tokio::task::spawn(async move { ut.sync(&bus_clone).await }).await;
let res =
tokio::task::spawn(async move { ut.lock_read().await.get_ref().sync(&bus_clone).await })
.await;
match res {
Ok(Err(err)) => {

View File

@ -1,5 +1,5 @@
use crate::{receiver::ReceiverStats, receivers::mpsc};
use futures::{Future, StreamExt};
use futures::{executor::block_on, Future, StreamExt};
use std::{
any::TypeId,
marker::PhantomData,
@ -16,7 +16,7 @@ use crate::{
builder::{ReceiverSubscriber, ReceiverSubscriberBuilder},
msgs,
receiver::{AnyReceiver, ReceiverTrait, SendError, TypedReceiver},
Bus, BatchHandler, Message, Untyped,
BatchHandler, Bus, Message, Untyped,
};
pub struct BufferUnorderedBatchedSyncSubscriber<T, M>
@ -78,19 +78,16 @@ async fn buffer_unordered_poller<T, M>(
T: BatchHandler<M> + 'static,
M: Message,
{
let ut = ut.downcast::<T>().unwrap();
let rx = rx
.inspect(|_| {
let ut = ut.downcast_sync::<T>().unwrap();
let rx = rx.inspect(|_| {
stats.buffer.fetch_sub(1, Ordering::Relaxed);
stats.batch.fetch_add(1, Ordering::Relaxed);
});
let rx = if cfg.when_ready {
rx.ready_chunks(cfg.batch_size)
.left_stream()
rx.ready_chunks(cfg.batch_size).left_stream()
} else {
rx.chunks(cfg.batch_size)
.right_stream()
rx.chunks(cfg.batch_size).right_stream()
};
let mut rx = rx
@ -101,7 +98,9 @@ async fn buffer_unordered_poller<T, M>(
let bus = bus.clone();
let ut = ut.clone();
tokio::task::spawn_blocking(move || ut.handle(msgs, &bus))
tokio::task::spawn_blocking(move || {
block_on(ut.lock_read()).get_ref().handle(msgs, &bus)
})
})
.buffer_unordered(cfg.max_parallel);
@ -118,7 +117,9 @@ async fn buffer_unordered_poller<T, M>(
let ut = ut.clone();
let bus_clone = bus.clone();
let res = tokio::task::spawn_blocking(move || ut.sync(&bus_clone)).await;
let res =
tokio::task::spawn_blocking(move || block_on(ut.lock_read()).get_ref().sync(&bus_clone))
.await;
match res {
Ok(Err(err)) => {

View File

@ -0,0 +1,180 @@
use std::{
any::TypeId,
marker::PhantomData,
pin::Pin,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
task::{Context, Poll},
};
use crate::{receiver::ReceiverStats, receivers::mpsc};
use futures::{pin_mut, Future, StreamExt};
use super::{LocalConfig, LocalStats};
use crate::{
builder::{ReceiverSubscriber, ReceiverSubscriberBuilder},
receiver::{AnyReceiver, ReceiverTrait, SendError, TypedReceiver},
Bus, LocalAsyncHandler, Message, Untyped,
};
pub struct LocalAsyncSubscriber<T, M>
where
T: LocalAsyncHandler<M> + 'static,
M: Message,
{
cfg: LocalConfig,
_m: PhantomData<(T, M)>,
}
impl<T, M> ReceiverSubscriber<T> for LocalAsyncSubscriber<T, M>
where
T: LocalAsyncHandler<M> + 'static,
M: Message,
{
fn subscribe(
self,
) -> (
Arc<dyn ReceiverTrait>,
Box<
dyn FnOnce(Untyped) -> Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>,
>,
) {
let cfg = self.cfg;
let (tx, rx) = mpsc::channel(cfg.buffer_size);
let stats = Arc::new(LocalStats {
buffer: AtomicU64::new(0),
buffer_total: AtomicU64::new(cfg.buffer_size as _),
});
let arc = Arc::new(LocalAsync::<M> {
tx,
stats: stats.clone(),
});
let poller = Box::new(move |ut| {
Box::new(move |bus| {
Box::pin(buffer_unordered_poller::<T, M>(rx, bus, ut, stats, cfg))
as Pin<Box<dyn Future<Output = ()> + Send>>
}) as Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>
});
(arc, poller)
}
}
async fn buffer_unordered_poller<T, M>(
rx: mpsc::Receiver<M>,
bus: Bus,
ut: Untyped,
stats: Arc<LocalStats>,
_cfg: LocalConfig,
) where
T: LocalAsyncHandler<M> + 'static,
M: Message,
{
let ut = ut.downcast_local::<T>().unwrap();
let bus1 = bus.clone();
let x = rx.then(|msg| {
let bus1 = bus1.clone();
ut.spawn_local(move |item| {
Box::pin(async move {
let _ = item.handle(msg, &bus1).await;
})
})
});
pin_mut!(x);
while let Some(_) = x.next().await {
stats.buffer.fetch_sub(1, Ordering::Relaxed);
}
let bus_clone = bus.clone();
ut.spawn_local(move |item| {
Box::pin(async move {
let _ = item.sync(&bus_clone).await;
})
})
.await;
println!("[EXIT] LocalAsync<{}>", std::any::type_name::<M>());
}
pub struct LocalAsync<M: Message> {
tx: mpsc::Sender<M>,
stats: Arc<LocalStats>,
}
impl<T, M> ReceiverSubscriberBuilder<M, T> for LocalAsync<M>
where
T: LocalAsyncHandler<M> + 'static,
M: Message,
{
type Entry = LocalAsyncSubscriber<T, M>;
type Config = LocalConfig;
fn build(cfg: Self::Config) -> Self::Entry {
LocalAsyncSubscriber {
cfg,
_m: Default::default(),
}
}
}
impl<M: Message> TypedReceiver<M> for LocalAsync<M> {
fn poll_ready(&self, ctx: &mut Context<'_>) -> Poll<()> {
match self.tx.poll_ready(ctx) {
Poll::Ready(_) => Poll::Ready(()),
Poll::Pending => Poll::Pending,
}
}
fn try_send(&self, m: M) -> Result<(), SendError<M>> {
match self.tx.try_send(m) {
Ok(_) => {
self.stats.buffer.fetch_add(1, Ordering::Relaxed);
Ok(())
}
Err(err) => Err(err),
}
}
}
impl<M: Message> ReceiverTrait for LocalAsync<M> {
fn typed(&self) -> AnyReceiver<'_> {
AnyReceiver::new(self)
}
fn type_id(&self) -> TypeId {
TypeId::of::<LocalAsync<M>>()
}
fn stats(&self) -> ReceiverStats {
ReceiverStats {
name: std::any::type_name::<M>().into(),
fields: vec![
("buffer".into(), self.stats.buffer.load(Ordering::SeqCst)),
(
"buffer_total".into(),
self.stats.buffer_total.load(Ordering::SeqCst),
),
],
}
}
fn close(&self) {
self.tx.close();
}
fn sync(&self) {
self.tx.flush();
}
fn poll_synchronized(&self, _ctx: &mut Context<'_>) -> Poll<()> {
Poll::Ready(())
}
}

View File

@ -0,0 +1,25 @@
mod r#async;
mod sync;
use std::sync::atomic::AtomicU64;
pub use sync::{LocalSync, LocalSyncSubscriber};
pub use r#async::{LocalAsync, LocalAsyncSubscriber};
#[derive(Debug)]
pub struct LocalStats {
pub buffer: AtomicU64,
pub buffer_total: AtomicU64,
}
#[derive(Copy, Clone, Debug)]
pub struct LocalConfig {
pub buffer_size: usize,
}
impl Default for LocalConfig {
fn default() -> Self {
Self { buffer_size: 1 }
}
}

177
src/receivers/local/sync.rs Normal file
View File

@ -0,0 +1,177 @@
use super::{LocalConfig, LocalStats};
use crate::{receiver::ReceiverStats, receivers::mpsc};
use futures::{pin_mut, Future, StreamExt};
use std::{
any::TypeId,
marker::PhantomData,
pin::Pin,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
task::{Context, Poll},
};
use crate::{
builder::{ReceiverSubscriber, ReceiverSubscriberBuilder},
receiver::{AnyReceiver, ReceiverTrait, SendError, TypedReceiver},
Bus, LocalHandler, Message, Untyped,
};
pub struct LocalSyncSubscriber<T, M>
where
T: LocalHandler<M> + 'static,
M: Message,
{
cfg: LocalConfig,
_m: PhantomData<(M, T)>,
}
impl<T, M> ReceiverSubscriber<T> for LocalSyncSubscriber<T, M>
where
T: LocalHandler<M> + 'static,
M: Message,
{
fn subscribe(
self,
) -> (
Arc<dyn ReceiverTrait>,
Box<
dyn FnOnce(Untyped) -> Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>,
>,
) {
let cfg = self.cfg;
let (tx, rx) = mpsc::channel(cfg.buffer_size);
let stats = Arc::new(LocalStats {
buffer: AtomicU64::new(0),
buffer_total: AtomicU64::new(cfg.buffer_size as _),
});
let arc = Arc::new(LocalSync::<M> {
tx,
stats: stats.clone(),
});
let poller = Box::new(move |ut| {
Box::new(move |bus| {
Box::pin(buffer_unordered_poller::<T, M>(rx, bus, ut, stats, cfg))
as Pin<Box<dyn Future<Output = ()> + Send>>
}) as Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>
});
(arc, poller)
}
}
async fn buffer_unordered_poller<T, M>(
rx: mpsc::Receiver<M>,
bus: Bus,
ut: Untyped,
stats: Arc<LocalStats>,
_cfg: LocalConfig,
) where
T: LocalHandler<M> + 'static,
M: Message,
{
let ut = ut.downcast_local::<T>().unwrap();
let bus1 = bus.clone();
let x = rx.then(|msg| {
let bus1 = bus1.clone();
ut.spawn_local(move |item| {
let _ = item.handle(msg, &bus1);
Box::pin(async move {})
})
});
pin_mut!(x);
while let Some(_) = x.next().await {
stats.buffer.fetch_sub(1, Ordering::Relaxed);
}
let bus_clone = bus.clone();
ut.spawn_local(move |item| {
let _ = item.sync(&bus_clone);
Box::pin(async move {})
})
.await;
println!("[EXIT] LocalAsync<{}>", std::any::type_name::<M>());
}
pub struct LocalSync<M: Message> {
tx: mpsc::Sender<M>,
stats: Arc<LocalStats>,
}
impl<T, M> ReceiverSubscriberBuilder<M, T> for LocalSync<M>
where
T: LocalHandler<M> + 'static,
M: Message,
{
type Entry = LocalSyncSubscriber<T, M>;
type Config = LocalConfig;
fn build(cfg: Self::Config) -> Self::Entry {
LocalSyncSubscriber {
cfg,
_m: Default::default(),
}
}
}
impl<M: Message> TypedReceiver<M> for LocalSync<M> {
fn poll_ready(&self, ctx: &mut Context<'_>) -> Poll<()> {
match self.tx.poll_ready(ctx) {
Poll::Ready(_) => Poll::Ready(()),
Poll::Pending => Poll::Pending,
}
}
fn try_send(&self, m: M) -> Result<(), SendError<M>> {
match self.tx.try_send(m) {
Ok(_) => {
self.stats.buffer.fetch_add(1, Ordering::Relaxed);
Ok(())
}
Err(err) => Err(err),
}
}
}
impl<M: Message> ReceiverTrait for LocalSync<M> {
fn typed(&self) -> AnyReceiver<'_> {
AnyReceiver::new(self)
}
fn type_id(&self) -> TypeId {
TypeId::of::<LocalSync<M>>()
}
fn stats(&self) -> ReceiverStats {
ReceiverStats {
name: std::any::type_name::<M>().into(),
fields: vec![
("buffer".into(), self.stats.buffer.load(Ordering::SeqCst)),
(
"buffer_total".into(),
self.stats.buffer_total.load(Ordering::SeqCst),
),
],
}
}
fn close(&self) {
self.tx.close();
}
fn sync(&self) {
self.tx.flush();
}
fn poll_synchronized(&self, _ctx: &mut Context<'_>) -> Poll<()> {
Poll::Ready(())
}
}

View File

@ -0,0 +1,196 @@
use std::{
any::TypeId,
marker::PhantomData,
pin::Pin,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
task::{Context, Poll},
};
use crate::{receiver::ReceiverStats, receivers::mpsc};
use futures::{Future, StreamExt};
use super::{LocalBatchedConfig, LocalBatchedStats};
use crate::{
builder::{ReceiverSubscriber, ReceiverSubscriberBuilder},
receiver::{AnyReceiver, ReceiverTrait, SendError, TypedReceiver},
Bus, LocalAsyncBatchHandler, Message, Untyped,
};
pub struct LocalBatchedAsyncSubscriber<T, M>
where
T: LocalAsyncBatchHandler<M> + 'static,
M: Message,
{
cfg: LocalBatchedConfig,
_m: PhantomData<(T, M)>,
}
impl<T, M> ReceiverSubscriber<T> for LocalBatchedAsyncSubscriber<T, M>
where
T: LocalAsyncBatchHandler<M> + 'static,
M: Message,
{
fn subscribe(
self,
) -> (
Arc<dyn ReceiverTrait>,
Box<
dyn FnOnce(Untyped) -> Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>,
>,
) {
let cfg = self.cfg;
let (tx, rx) = mpsc::channel(cfg.buffer_size);
let stats = Arc::new(LocalBatchedStats {
buffer: AtomicU64::new(0),
buffer_total: AtomicU64::new(cfg.buffer_size as _),
batch: AtomicU64::new(0),
batch_size: AtomicU64::new(cfg.batch_size as _),
});
let arc = Arc::new(LocalBatchedAsync::<M> {
tx,
stats: stats.clone(),
});
let poller = Box::new(move |ut| {
Box::new(move |bus| {
Box::pin(buffer_unordered_poller::<T, M>(rx, bus, ut, stats, cfg))
as Pin<Box<dyn Future<Output = ()> + Send>>
}) as Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>
});
(arc, poller)
}
}
async fn buffer_unordered_poller<T, M>(
rx: mpsc::Receiver<M>,
bus: Bus,
ut: Untyped,
stats: Arc<LocalBatchedStats>,
cfg: LocalBatchedConfig,
) where
T: LocalAsyncBatchHandler<M> + 'static,
M: Message,
{
let ut = ut.downcast_local::<T>().unwrap();
let rx = rx.inspect(|_| {
stats.buffer.fetch_sub(1, Ordering::Relaxed);
stats.batch.fetch_add(1, Ordering::Relaxed);
});
let mut rx = if cfg.when_ready {
rx.ready_chunks(cfg.batch_size).left_stream()
} else {
rx.chunks(cfg.batch_size).right_stream()
};
while let Some(msgs) = rx.next().await {
stats.batch.fetch_sub(msgs.len() as _, Ordering::Relaxed);
let bus_clone = bus.clone();
let ut = ut.clone();
ut.spawn_local(move |item| {
Box::pin(async move {
let _ = item.handle(msgs, &bus_clone).await;
})
})
.await;
}
let bus_clone = bus.clone();
ut.spawn_local(move |item| {
Box::pin(async move {
let _ = item.sync(&bus_clone).await;
})
})
.await;
println!("[EXIT] LocalBatchedAsync<{}>", std::any::type_name::<M>());
}
pub struct LocalBatchedAsync<M: Message> {
tx: mpsc::Sender<M>,
stats: Arc<LocalBatchedStats>,
}
impl<T, M> ReceiverSubscriberBuilder<M, T> for LocalBatchedAsync<M>
where
T: LocalAsyncBatchHandler<M> + 'static,
M: Message,
{
type Entry = LocalBatchedAsyncSubscriber<T, M>;
type Config = LocalBatchedConfig;
fn build(cfg: Self::Config) -> Self::Entry {
LocalBatchedAsyncSubscriber {
cfg,
_m: Default::default(),
}
}
}
impl<M: Message> TypedReceiver<M> for LocalBatchedAsync<M> {
fn poll_ready(&self, ctx: &mut Context<'_>) -> Poll<()> {
match self.tx.poll_ready(ctx) {
Poll::Ready(_) => Poll::Ready(()),
Poll::Pending => Poll::Pending,
}
}
fn try_send(&self, m: M) -> Result<(), SendError<M>> {
match self.tx.try_send(m) {
Ok(_) => {
self.stats.buffer.fetch_add(1, Ordering::Relaxed);
Ok(())
}
Err(err) => Err(err),
}
}
}
impl<M: Message> ReceiverTrait for LocalBatchedAsync<M> {
fn typed(&self) -> AnyReceiver<'_> {
AnyReceiver::new(self)
}
fn type_id(&self) -> TypeId {
TypeId::of::<Self>()
}
fn close(&self) {
self.tx.close();
}
fn stats(&self) -> ReceiverStats {
ReceiverStats {
name: std::any::type_name::<M>().into(),
fields: vec![
("buffer".into(), self.stats.buffer.load(Ordering::SeqCst)),
(
"buffer_total".into(),
self.stats.buffer_total.load(Ordering::SeqCst),
),
("batch".into(), self.stats.batch.load(Ordering::SeqCst)),
(
"batch_size".into(),
self.stats.batch_size.load(Ordering::SeqCst),
),
],
}
}
fn sync(&self) {
self.tx.flush();
}
fn poll_synchronized(&self, _ctx: &mut Context<'_>) -> Poll<()> {
Poll::Ready(())
}
}

View File

@ -0,0 +1,33 @@
mod r#async;
mod sync;
use std::sync::atomic::AtomicU64;
pub use sync::{LocalBatchedSync, LocalBatchedSyncSubscriber};
pub use r#async::{LocalBatchedAsync, LocalBatchedAsyncSubscriber};
#[derive(Debug)]
pub struct LocalBatchedStats {
pub buffer: AtomicU64,
pub buffer_total: AtomicU64,
pub batch: AtomicU64,
pub batch_size: AtomicU64,
}
#[derive(Copy, Clone, Debug)]
pub struct LocalBatchedConfig {
pub buffer_size: usize,
pub batch_size: usize,
pub when_ready: bool,
}
impl Default for LocalBatchedConfig {
fn default() -> Self {
Self {
buffer_size: 4,
batch_size: 16,
when_ready: false,
}
}
}

View File

@ -0,0 +1,193 @@
use super::{LocalBatchedConfig, LocalBatchedStats};
use crate::{
builder::{ReceiverSubscriber, ReceiverSubscriberBuilder},
receiver::{AnyReceiver, ReceiverTrait, SendError, TypedReceiver},
Bus, LocalBatchHandler, Message, Untyped,
};
use crate::{receiver::ReceiverStats, receivers::mpsc};
use futures::{Future, StreamExt};
use std::{
any::TypeId,
marker::PhantomData,
pin::Pin,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
task::{Context, Poll},
};
pub struct LocalBatchedSyncSubscriber<T, M>
where
T: LocalBatchHandler<M> + 'static,
M: Message,
{
cfg: LocalBatchedConfig,
_m: PhantomData<(M, T)>,
}
impl<T, M> ReceiverSubscriber<T> for LocalBatchedSyncSubscriber<T, M>
where
T: LocalBatchHandler<M> + 'static,
M: Message,
{
fn subscribe(
self,
) -> (
Arc<dyn ReceiverTrait>,
Box<
dyn FnOnce(Untyped) -> Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>,
>,
) {
let cfg = self.cfg;
let (tx, rx) = mpsc::channel(cfg.buffer_size);
let stats = Arc::new(LocalBatchedStats {
buffer: AtomicU64::new(0),
buffer_total: AtomicU64::new(cfg.buffer_size as _),
batch: AtomicU64::new(0),
batch_size: AtomicU64::new(cfg.batch_size as _),
});
let arc = Arc::new(LocalBatchedSync::<M> {
tx,
stats: stats.clone(),
});
let poller = Box::new(move |ut| {
Box::new(move |bus| {
Box::pin(buffer_unordered_poller::<T, M>(rx, bus, ut, stats, cfg))
as Pin<Box<dyn Future<Output = ()> + Send>>
}) as Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>
});
(arc, poller)
}
}
async fn buffer_unordered_poller<T, M>(
rx: mpsc::Receiver<M>,
bus: Bus,
ut: Untyped,
stats: Arc<LocalBatchedStats>,
cfg: LocalBatchedConfig,
) where
T: LocalBatchHandler<M> + 'static,
M: Message,
{
let ut = ut.downcast_local::<T>().unwrap();
let rx = rx.inspect(|_| {
stats.buffer.fetch_sub(1, Ordering::Relaxed);
stats.batch.fetch_add(1, Ordering::Relaxed);
});
let mut rx = if cfg.when_ready {
rx.ready_chunks(cfg.batch_size).left_stream()
} else {
rx.chunks(cfg.batch_size).right_stream()
};
while let Some(msgs) = rx.next().await {
stats.batch.fetch_sub(msgs.len() as _, Ordering::Relaxed);
let bus_clone = bus.clone();
let ut = ut.clone();
ut.spawn_local(move |item| {
let _ = item.handle(msgs, &bus_clone);
Box::pin(async move {})
})
.await;
}
let ut = ut.clone();
let bus_clone = bus.clone();
ut.spawn_local(move |item| {
let _ = item.sync(&bus_clone);
Box::pin(async move {})
})
.await;
println!("[EXIT] LocalBatchedSync<{}>", std::any::type_name::<M>());
}
pub struct LocalBatchedSync<M: Message> {
tx: mpsc::Sender<M>,
stats: Arc<LocalBatchedStats>,
}
impl<T, M> ReceiverSubscriberBuilder<M, T> for LocalBatchedSync<M>
where
T: LocalBatchHandler<M> + 'static,
M: Message,
{
type Entry = LocalBatchedSyncSubscriber<T, M>;
type Config = LocalBatchedConfig;
fn build(cfg: Self::Config) -> Self::Entry {
LocalBatchedSyncSubscriber {
cfg,
_m: Default::default(),
}
}
}
impl<M: Message> TypedReceiver<M> for LocalBatchedSync<M> {
fn poll_ready(&self, ctx: &mut Context<'_>) -> Poll<()> {
match self.tx.poll_ready(ctx) {
Poll::Ready(_) => Poll::Ready(()),
Poll::Pending => Poll::Pending,
}
}
fn try_send(&self, m: M) -> Result<(), SendError<M>> {
match self.tx.try_send(m) {
Ok(_) => {
self.stats.buffer.fetch_add(1, Ordering::Relaxed);
Ok(())
}
Err(err) => Err(err),
}
}
}
impl<M: Message> ReceiverTrait for LocalBatchedSync<M> {
fn typed(&self) -> AnyReceiver<'_> {
AnyReceiver::new(self)
}
fn type_id(&self) -> TypeId {
TypeId::of::<Self>()
}
fn stats(&self) -> ReceiverStats {
ReceiverStats {
name: std::any::type_name::<M>().into(),
fields: vec![
("buffer".into(), self.stats.buffer.load(Ordering::SeqCst)),
(
"buffer_total".into(),
self.stats.buffer_total.load(Ordering::SeqCst),
),
("batch".into(), self.stats.batch.load(Ordering::SeqCst)),
(
"batch_size".into(),
self.stats.batch_size.load(Ordering::SeqCst),
),
],
}
}
fn close(&self) {
self.tx.close();
}
fn sync(&self) {
self.tx.flush();
}
fn poll_synchronized(&self, _ctx: &mut Context<'_>) -> Poll<()> {
Poll::Ready(())
}
}

View File

@ -1,5 +1,7 @@
mod buffer_unordered;
mod buffer_unordered_batched;
mod local;
mod local_batched;
mod mpsc_futures;
mod synchronize_batched;
mod synchronized;
@ -14,8 +16,8 @@ pub use buffer_unordered::{
};
pub use buffer_unordered_batched::{
BufferUnorderedBatchedAsync, BufferUnorderedBatchedAsyncSubscriber, BufferUnorderedBatchedConfig,
BufferUnorderedBatchedSync, BufferUnorderedBatchedSyncSubscriber,
BufferUnorderedBatchedAsync, BufferUnorderedBatchedAsyncSubscriber,
BufferUnorderedBatchedConfig, BufferUnorderedBatchedSync, BufferUnorderedBatchedSyncSubscriber,
};
pub use synchronized::{
@ -27,3 +29,10 @@ pub use synchronize_batched::{
SynchronizeBatchedAsync, SynchronizeBatchedAsyncSubscriber, SynchronizeBatchedConfig,
SynchronizeBatchedSync, SynchronizeBatchedSyncSubscriber,
};
pub use local::{LocalAsync, LocalAsyncSubscriber, LocalConfig, LocalSync, LocalSyncSubscriber};
pub use local_batched::{
LocalBatchedAsync, LocalBatchedAsyncSubscriber, LocalBatchedConfig, LocalBatchedSync,
LocalBatchedSyncSubscriber,
};

View File

@ -99,7 +99,7 @@ impl<T> Stream for Receiver<T> {
}
Poll::Pending
},
}
}
}
}

View File

@ -11,7 +11,6 @@ use std::{
use crate::{receiver::ReceiverStats, receivers::mpsc};
use futures::{Future, StreamExt};
use tokio::sync::Mutex;
use super::{SynchronizeBatchedConfig, SynchronizeBatchedStats};
use crate::{
@ -78,20 +77,17 @@ async fn buffer_unordered_poller<T, M>(
T: AsyncBatchSynchronizedHandler<M> + 'static,
M: Message,
{
let ut = ut.downcast::<Mutex<T>>().unwrap();
let ut = ut.downcast_send::<T>().unwrap();
let rx = rx
.inspect(|_|{
let rx = rx.inspect(|_| {
stats.buffer.fetch_sub(1, Ordering::Relaxed);
stats.batch.fetch_add(1, Ordering::Relaxed);
});
let mut rx = if cfg.when_ready {
rx.ready_chunks(cfg.batch_size)
.left_stream()
rx.ready_chunks(cfg.batch_size).left_stream()
} else {
rx.chunks(cfg.batch_size)
.right_stream()
rx.chunks(cfg.batch_size).right_stream()
};
while let Some(msgs) = rx.next().await {
@ -101,8 +97,7 @@ async fn buffer_unordered_poller<T, M>(
let ut = ut.clone();
let res =
tokio::task::spawn(async move { ut.lock().await.handle(msgs, &bus_clone).await })
.await;
tokio::task::spawn(async move { ut.lock().await.handle(msgs, &bus_clone).await }).await;
match res {
Ok(Err(err)) => {

View File

@ -6,7 +6,7 @@ use crate::{
BatchSynchronizedHandler, Bus, Message, Untyped,
};
use crate::{receiver::ReceiverStats, receivers::mpsc};
use futures::{Future, StreamExt};
use futures::{executor::block_on, Future, StreamExt};
use std::{
any::TypeId,
marker::PhantomData,
@ -17,7 +17,6 @@ use std::{
},
task::{Context, Poll},
};
use tokio::sync::Mutex;
pub struct SynchronizeBatchedSyncSubscriber<T, M>
where
@ -75,20 +74,17 @@ async fn buffer_unordered_poller<T, M>(
T: BatchSynchronizedHandler<M> + 'static,
M: Message,
{
let ut = ut.downcast::<Mutex<T>>().unwrap();
let ut = ut.downcast_send::<T>().unwrap();
let rx = rx
.inspect(|_|{
let rx = rx.inspect(|_| {
stats.buffer.fetch_sub(1, Ordering::Relaxed);
stats.batch.fetch_add(1, Ordering::Relaxed);
});
let mut rx = if cfg.when_ready {
rx.ready_chunks(cfg.batch_size)
.left_stream()
rx.ready_chunks(cfg.batch_size).left_stream()
} else {
rx.chunks(cfg.batch_size)
.right_stream()
rx.chunks(cfg.batch_size).right_stream()
};
while let Some(msgs) = rx.next().await {
@ -98,10 +94,10 @@ async fn buffer_unordered_poller<T, M>(
let ut = ut.clone();
let res = tokio::task::spawn_blocking(move || {
let mut uut = futures::executor::block_on(ut.lock());
let mut uut = block_on(ut.lock());
uut.handle(msgs, &bus_clone)
}).await;
})
.await;
match res {
Ok(Err(err)) => {
@ -113,10 +109,7 @@ async fn buffer_unordered_poller<T, M>(
let ut = ut.clone();
let bus_clone = bus.clone();
let res = tokio::task::spawn_blocking(move || {
futures::executor::block_on(ut.lock()).sync(&bus_clone)
})
.await;
let res = tokio::task::spawn_blocking(move || block_on(ut.lock()).sync(&bus_clone)).await;
match res {
Ok(Err(err)) => {

View File

@ -11,7 +11,6 @@ use std::{
use crate::{receiver::ReceiverStats, receivers::mpsc};
use futures::{Future, StreamExt};
use tokio::sync::Mutex;
use super::{SynchronizedConfig, SynchronizedStats};
use crate::{
@ -76,10 +75,13 @@ async fn buffer_unordered_poller<T, M>(
T: AsyncSynchronizedHandler<M> + 'static,
M: Message,
{
let ut = ut.downcast::<Mutex<T>>().unwrap();
let mut x = rx.then(|msg| {
let bus = bus.clone();
let ut = ut.clone();
let ut = ut.downcast_send::<T>().unwrap();
let ut1 = ut.clone();
let bus1 = bus.clone();
let mut x = rx.then(move |msg| {
let bus = bus1.clone();
let ut = ut1.clone();
tokio::task::spawn(async move { ut.lock().await.handle(msg, &bus).await })
});

View File

@ -11,7 +11,6 @@ use std::{
},
task::{Context, Poll},
};
use tokio::sync::Mutex;
use crate::{
builder::{ReceiverSubscriber, ReceiverSubscriberBuilder},
@ -74,12 +73,15 @@ async fn buffer_unordered_poller<T, M>(
T: SynchronizedHandler<M> + 'static,
M: Message,
{
let ut = ut.downcast::<Mutex<T>>().unwrap();
let ut = ut.downcast_send::<T>().unwrap();
let mut x = rx.then(|msg| {
let ut = ut.clone();
let bus = bus.clone();
tokio::task::spawn_blocking(move || block_on(ut.lock()).handle(msg, &bus))
tokio::task::spawn_blocking(move || {
let mut uut = block_on(ut.lock());
uut.handle(msg, &bus)
})
});
while let Some(err) = x.next().await {
@ -95,10 +97,7 @@ async fn buffer_unordered_poller<T, M>(
let ut = ut.clone();
let bus_clone = bus.clone();
let res = tokio::task::spawn_blocking(move || {
futures::executor::block_on(ut.lock()).sync(&bus_clone)
})
.await;
let res = tokio::task::spawn_blocking(move || block_on(ut.lock()).sync(&bus_clone)).await;
match res {
Ok(Err(err)) => {
@ -107,10 +106,7 @@ async fn buffer_unordered_poller<T, M>(
_ => (),
}
println!(
"[EXIT] BufferUnorderedSync<{}>",
std::any::type_name::<M>()
);
println!("[EXIT] BufferUnorderedSync<{}>", std::any::type_name::<M>());
}
pub struct SynchronizedSync<M: Message> {

174
src/untyped.rs Normal file
View File

@ -0,0 +1,174 @@
use core::any::Any;
use core::future::Future;
use core::pin::Pin;
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio::sync::Notify;
use tokio::sync::{Mutex, MutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuard};
#[derive(Debug)]
pub enum Lock<'a, T> {
ReadOnly(&'a T),
RwRead(RwLockReadGuard<'a, T>),
RwWrite(RwLockWriteGuard<'a, T>),
WriteOnly(MutexGuard<'a, T>),
}
impl<'a, T> Lock<'a, T> {
pub fn get_ref(&self) -> &T {
match self {
Lock::ReadOnly(inner) => &inner,
Lock::RwRead(inner) => &inner,
Lock::RwWrite(inner) => &inner,
Lock::WriteOnly(inner) => &inner,
}
}
pub fn get_mut(&mut self) -> &mut T {
match self {
Lock::ReadOnly(_) => panic!("!!"),
Lock::RwRead(_) => panic!("!!"),
Lock::RwWrite(inner) => &mut *inner,
Lock::WriteOnly(inner) => &mut *inner,
}
}
}
pub enum Downcasted<T> {
ReadOnly(Arc<T>),
ReadWrite(Arc<RwLock<T>>),
WriteOnly(Arc<Mutex<T>>),
}
impl<T> Clone for Downcasted<T> {
fn clone(&self) -> Self {
match self {
Downcasted::ReadOnly(inner) => Downcasted::ReadOnly(inner.clone()),
Downcasted::ReadWrite(inner) => Downcasted::ReadWrite(inner.clone()),
Downcasted::WriteOnly(inner) => Downcasted::WriteOnly(inner.clone()),
}
}
}
impl<T: 'static> Downcasted<T> {
pub async fn lock_read(&self) -> Lock<'_, T> {
match self {
Downcasted::ReadOnly(inner) => Lock::ReadOnly(&inner),
Downcasted::ReadWrite(inner) => Lock::RwRead(inner.read().await),
Downcasted::WriteOnly(inner) => Lock::WriteOnly(inner.lock().await),
}
}
pub async fn lock_write(&self) -> Lock<'_, T> {
match self {
Downcasted::ReadOnly(_) => unimplemented!(),
Downcasted::ReadWrite(inner) => Lock::RwWrite(inner.write().await),
Downcasted::WriteOnly(inner) => Lock::WriteOnly(inner.lock().await),
}
}
}
#[derive(Clone)]
pub struct Untyped {
inner: Arc<dyn Any + Send + Sync>,
}
impl Untyped {
pub fn new_readonly<T: Send + Sync + 'static>(item: T) -> Self {
Self {
inner: Arc::new(item),
}
}
pub fn new_rwlock<T: Send + Sync + 'static>(item: T) -> Self {
Self {
inner: Arc::new(RwLock::new(item)),
}
}
pub fn new_mutex<T: Send + 'static>(item: T) -> Self {
Self {
inner: Arc::new(Mutex::new(item)),
}
}
pub fn new_local<T: 'static, F: FnOnce() -> T + Send + 'static>(f: F) -> Self {
Self {
inner: Arc::new(ThreadDedicated::new(f)),
}
}
pub fn downcast_sync<T: Send + Sync + 'static>(self) -> Option<Downcasted<T>> {
let item = match self.inner.clone().downcast::<RwLock<T>>() {
Ok(inner) => Downcasted::ReadWrite(inner),
Err(_) => return None,
};
Some(item)
}
pub fn downcast_send1<T: Send + 'static>(self) -> Option<Downcasted<T>> {
let item = match self.inner.clone().downcast::<Mutex<T>>() {
Ok(inner) => Downcasted::WriteOnly(inner),
Err(_) => return None,
};
Some(item)
}
pub fn downcast_send<T: Send + 'static>(self) -> Option<Arc<Mutex<T>>> {
self.inner.clone().downcast::<Mutex<T>>().ok()
}
#[inline]
pub fn downcast_local<T: 'static>(self) -> Option<Arc<ThreadDedicated<T>>> {
self.inner.clone().downcast::<ThreadDedicated<T>>().ok()
}
}
pub struct ThreadDedicated<T: 'static> {
sender: mpsc::Sender<
Box<dyn for<'a> FnOnce(&'a mut T) -> Pin<Box<dyn Future<Output = ()> + 'a>> + Send>,
>,
notify: Arc<Notify>,
}
impl<T: 'static> ThreadDedicated<T> {
pub fn new<F: FnOnce() -> T + Send + 'static>(builder: F) -> Self {
let notify = Arc::new(Notify::new());
let (sender, mut receiver) = mpsc::channel(1);
let sender: mpsc::Sender<
Box<dyn for<'a> FnOnce(&'a mut T) -> Pin<Box<dyn Future<Output = ()> + 'a>> + Send>,
> = sender;
let notify_clone = notify.clone();
std::thread::spawn(move || {
futures::executor::block_on(async move {
let mut obj = builder();
loop {
let cb = match receiver.recv().await {
Some(x) => x,
None => break,
};
cb(&mut obj).await;
notify_clone.notify_one();
}
});
});
Self { sender, notify }
}
pub async fn spawn_local<
F: for<'a> FnOnce(&'a mut T) -> Pin<Box<dyn Future<Output = ()> + 'a>> + Send + 'static,
>(
&self,
cb: F,
) {
self.sender.send(Box::new(cb)).await.ok().unwrap();
self.notify.notified().await
}
}