Compare commits
3 Commits
master
...
9c91d81e89
Author | SHA1 | Date | |
---|---|---|---|
|
9c91d81e89 | ||
|
1bf82e4cc4 | ||
|
2d35840044 |
14
Cargo.toml
14
Cargo.toml
@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "messagebus"
|
||||
version = "0.4.6"
|
||||
version = "0.5.2"
|
||||
authors = ["Andrey Tkachenko <andrey@aidev.ru>"]
|
||||
repository = "https://github.com/andreytkachenko/messagebus.git"
|
||||
keywords = ["futures", "async", "tokio", "message", "bus"]
|
||||
@ -11,12 +11,12 @@ exclude = [".gitignore", ".cargo/config", ".github/**", "codecov.yml"]
|
||||
edition = "2018"
|
||||
|
||||
[dependencies]
|
||||
tokio = { version = "0.2", features = ["parking_lot", "rt-threaded", "sync", "stream", "blocking"] }
|
||||
parking_lot = "0.11.1"
|
||||
async-trait = "0.1.42"
|
||||
futures = "0.3.8"
|
||||
anyhow = "1.0.34"
|
||||
tokio = { version = "1", features = ["parking_lot", "rt-multi-thread", "sync"] }
|
||||
parking_lot = "0.11"
|
||||
async-trait = "0.1"
|
||||
futures = "0.3"
|
||||
anyhow = "1.0"
|
||||
crossbeam = "0.8.0"
|
||||
|
||||
[dev-dependencies]
|
||||
tokio = { version = "0.2", features = ["parking_lot", "rt-threaded", "sync", "stream", "macros"] }
|
||||
tokio = { version = "1", features = ["parking_lot", "rt-multi-thread", "sync", "macros"] }
|
||||
|
41
examples/demo_local.rs
Normal file
41
examples/demo_local.rs
Normal file
@ -0,0 +1,41 @@
|
||||
use std::rc::Rc;
|
||||
use std::cell::Cell;
|
||||
use async_trait::async_trait;
|
||||
use messagebus::{receivers, Bus, LocalHandler, LocalAsyncHandler, Result as MbusResult};
|
||||
|
||||
struct TmpReceiver {
|
||||
_inner: Rc<Cell<i32>>
|
||||
}
|
||||
|
||||
|
||||
#[async_trait(?Send)]
|
||||
impl LocalAsyncHandler<f32> for TmpReceiver {
|
||||
async fn handle(&mut self, msg: f32, bus: &Bus) -> MbusResult {
|
||||
println!("---> f32 {}", msg);
|
||||
|
||||
bus.send(11u16).await.unwrap();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl LocalHandler<u16> for TmpReceiver {
|
||||
fn handle(&mut self, msg: u16, _bus: &Bus) -> MbusResult {
|
||||
println!("---> u16 {}", msg);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
let (b, poller) = Bus::build()
|
||||
.register_local(||TmpReceiver {_inner: Rc::new(Cell::new(12))})
|
||||
.subscribe::<f32, receivers::LocalAsync<_>>(Default::default())
|
||||
.subscribe::<u16, receivers::LocalSync<_>>(Default::default())
|
||||
.done()
|
||||
.build();
|
||||
|
||||
b.send(32f32).await.unwrap();
|
||||
|
||||
poller.await
|
||||
}
|
@ -2,7 +2,6 @@ use std::{any::TypeId, collections::HashMap, marker::PhantomData, pin::Pin, sync
|
||||
|
||||
use futures::{Future, FutureExt};
|
||||
use receiver::ReceiverTrait;
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
use crate::{
|
||||
receiver::{self, Receiver},
|
||||
@ -30,6 +29,8 @@ pub trait ReceiverSubscriberBuilder<M, T: 'static> {
|
||||
pub struct SyncEntry;
|
||||
pub struct UnsyncEntry;
|
||||
|
||||
pub struct LocalEntry;
|
||||
|
||||
#[must_use]
|
||||
pub struct RegisterEntry<K, T> {
|
||||
item: Untyped,
|
||||
@ -65,6 +66,27 @@ impl<K, T: 'static> RegisterEntry<K, T> {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
impl<T: 'static> RegisterEntry<LocalEntry, T> {
|
||||
pub fn subscribe<M, R>(mut self, cfg: R::Config) -> Self
|
||||
where
|
||||
T: 'static,
|
||||
M: Message + 'static,
|
||||
R: ReceiverSubscriberBuilder<M, T> + 'static,
|
||||
{
|
||||
let (inner, poller) = R::build(cfg).subscribe();
|
||||
|
||||
let receiver = Receiver::new(inner);
|
||||
self.receivers
|
||||
.entry(TypeId::of::<M>())
|
||||
.or_insert_with(Vec::new)
|
||||
.push((receiver, poller));
|
||||
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
impl<T: Send + 'static> RegisterEntry<UnsyncEntry, T> {
|
||||
pub fn subscribe<M, R>(mut self, cfg: R::Config) -> Self
|
||||
where
|
||||
@ -118,7 +140,7 @@ impl BusBuilder {
|
||||
|
||||
pub fn register<T: Send + Sync + 'static>(self, item: T) -> RegisterEntry<SyncEntry, T> {
|
||||
RegisterEntry {
|
||||
item: Arc::new(item) as Untyped,
|
||||
item: Untyped::new_readonly(item),
|
||||
builder: self,
|
||||
receivers: HashMap::new(),
|
||||
_m: Default::default(),
|
||||
@ -127,7 +149,19 @@ impl BusBuilder {
|
||||
|
||||
pub fn register_unsync<T: Send + 'static>(self, item: T) -> RegisterEntry<UnsyncEntry, T> {
|
||||
RegisterEntry {
|
||||
item: Arc::new(Mutex::new(item)) as Untyped,
|
||||
item: Untyped::new_mutex(item),
|
||||
builder: self,
|
||||
receivers: HashMap::new(),
|
||||
_m: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn register_local<T: 'static>(
|
||||
self,
|
||||
item: impl FnOnce() -> T + Send + 'static,
|
||||
) -> RegisterEntry<LocalEntry, T> {
|
||||
RegisterEntry {
|
||||
item: Untyped::new_local(item),
|
||||
builder: self,
|
||||
receivers: HashMap::new(),
|
||||
_m: Default::default(),
|
||||
|
@ -62,15 +62,15 @@ pub trait AsyncBatchSynchronizedHandler<M: Message>: Send {
|
||||
}
|
||||
|
||||
pub trait LocalHandler<M: Message> {
|
||||
fn handle(&mut self, msg: Vec<M>, bus: &Bus) -> anyhow::Result<()>;
|
||||
fn handle(&mut self, msg: M, bus: &Bus) -> anyhow::Result<()>;
|
||||
fn sync(&mut self, _bus: &Bus) -> anyhow::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
#[async_trait(?Send)]
|
||||
pub trait LocalAsyncHandler<M: Message> {
|
||||
async fn handle(&mut self, msg: Vec<M>, bus: &Bus) -> anyhow::Result<()>;
|
||||
async fn handle(&mut self, msg: M, bus: &Bus) -> anyhow::Result<()>;
|
||||
async fn sync(&mut self, _bus: &Bus) -> anyhow::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
@ -83,7 +83,7 @@ pub trait LocalBatchHandler<M: Message> {
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
#[async_trait(?Send)]
|
||||
pub trait LocalAsyncBatchHandler<M: Message> {
|
||||
async fn handle(&mut self, msg: Vec<M>, bus: &Bus) -> anyhow::Result<()>;
|
||||
async fn sync(&mut self, _bus: &Bus) -> anyhow::Result<()> {
|
||||
|
12
src/lib.rs
12
src/lib.rs
@ -5,6 +5,7 @@ pub mod msgs;
|
||||
mod receiver;
|
||||
pub mod receivers;
|
||||
mod trait_object;
|
||||
mod untyped;
|
||||
mod utils;
|
||||
|
||||
use builder::BusBuilder;
|
||||
@ -14,13 +15,13 @@ pub use receiver::SendError;
|
||||
use receiver::{Receiver, ReceiverStats};
|
||||
use utils::binary_search_range_by_key;
|
||||
|
||||
use core::any::{Any, TypeId};
|
||||
use core::any::TypeId;
|
||||
use std::sync::{
|
||||
atomic::{AtomicBool, Ordering},
|
||||
Arc,
|
||||
};
|
||||
|
||||
pub type Untyped = Arc<dyn Any + Send + Sync>;
|
||||
pub use untyped::Untyped;
|
||||
pub type Result = anyhow::Result<()>;
|
||||
|
||||
pub struct BusInner {
|
||||
@ -58,8 +59,7 @@ impl BusInner {
|
||||
|
||||
pub fn try_send<M: Message>(&self, msg: M) -> core::result::Result<(), SendError<M>> {
|
||||
if self.closed.load(Ordering::SeqCst) {
|
||||
println!("Bus closed. Skipping send!");
|
||||
return Ok(());
|
||||
return Err(SendError::Closed(msg));
|
||||
}
|
||||
|
||||
let tid = TypeId::of::<M>();
|
||||
@ -70,7 +70,7 @@ impl BusInner {
|
||||
}
|
||||
|
||||
if let Some((_, r)) = self.receivers.get(range.start) {
|
||||
r.try_broadcast(msg.clone())?;
|
||||
r.try_broadcast(msg)?;
|
||||
} else {
|
||||
println!("Unhandled message {:?}", core::any::type_name::<M>());
|
||||
}
|
||||
@ -96,7 +96,7 @@ impl BusInner {
|
||||
}
|
||||
|
||||
if let Some((_, r)) = self.receivers.get(range.start) {
|
||||
r.broadcast(msg.clone()).await?;
|
||||
r.broadcast(msg).await?;
|
||||
} else {
|
||||
println!("Unhandled message {:?}", core::any::type_name::<M>());
|
||||
}
|
||||
|
@ -77,7 +77,7 @@ async fn buffer_unordered_poller<T, M>(
|
||||
T: AsyncHandler<M> + 'static,
|
||||
M: Message,
|
||||
{
|
||||
let ut = ut.downcast::<T>().unwrap();
|
||||
let ut = ut.downcast_sync::<T>().unwrap();
|
||||
|
||||
let mut x = rx
|
||||
.map(|msg| {
|
||||
@ -86,7 +86,9 @@ async fn buffer_unordered_poller<T, M>(
|
||||
let bus = bus.clone();
|
||||
let ut = ut.clone();
|
||||
|
||||
tokio::task::spawn(async move { ut.handle(msg, &bus).await })
|
||||
tokio::task::spawn(
|
||||
async move { ut.lock_read().await.get_ref().handle(msg, &bus).await },
|
||||
)
|
||||
})
|
||||
.buffer_unordered(cfg.max_parallel);
|
||||
|
||||
@ -103,7 +105,9 @@ async fn buffer_unordered_poller<T, M>(
|
||||
|
||||
let ut = ut.clone();
|
||||
let bus_clone = bus.clone();
|
||||
let res = tokio::task::spawn(async move { ut.sync(&bus_clone).await }).await;
|
||||
let res =
|
||||
tokio::task::spawn(async move { ut.lock_read().await.get_ref().sync(&bus_clone).await })
|
||||
.await;
|
||||
|
||||
match res {
|
||||
Ok(Err(err)) => {
|
||||
|
@ -1,5 +1,5 @@
|
||||
use crate::{receiver::ReceiverStats, receivers::mpsc};
|
||||
use futures::{Future, StreamExt};
|
||||
use futures::{executor::block_on, Future, StreamExt};
|
||||
use std::{
|
||||
any::TypeId,
|
||||
marker::PhantomData,
|
||||
@ -76,7 +76,7 @@ async fn buffer_unordered_poller<T, M>(
|
||||
T: Handler<M> + 'static,
|
||||
M: Message,
|
||||
{
|
||||
let ut = ut.downcast::<T>().unwrap();
|
||||
let ut = ut.downcast_sync::<T>().unwrap();
|
||||
|
||||
let mut x = rx
|
||||
.map(|msg| {
|
||||
@ -86,7 +86,9 @@ async fn buffer_unordered_poller<T, M>(
|
||||
let bus = bus.clone();
|
||||
let ut = ut.clone();
|
||||
|
||||
tokio::task::spawn_blocking(move || ut.handle(msg, &bus))
|
||||
tokio::task::spawn_blocking(move || {
|
||||
block_on(ut.lock_read()).get_ref().handle(msg, &bus)
|
||||
})
|
||||
})
|
||||
.buffer_unordered(cfg.max_parallel);
|
||||
|
||||
@ -103,7 +105,9 @@ async fn buffer_unordered_poller<T, M>(
|
||||
|
||||
let ut = ut.clone();
|
||||
let bus_clone = bus.clone();
|
||||
let res = tokio::task::spawn_blocking(move || ut.sync(&bus_clone)).await;
|
||||
let res =
|
||||
tokio::task::spawn_blocking(move || block_on(ut.lock_read()).get_ref().sync(&bus_clone))
|
||||
.await;
|
||||
|
||||
match res {
|
||||
Ok(Err(err)) => {
|
||||
@ -112,10 +116,7 @@ async fn buffer_unordered_poller<T, M>(
|
||||
_ => (),
|
||||
}
|
||||
|
||||
println!(
|
||||
"[EXIT] BufferUnorderedSync<{}>",
|
||||
std::any::type_name::<M>()
|
||||
);
|
||||
println!("[EXIT] BufferUnorderedSync<{}>", std::any::type_name::<M>());
|
||||
}
|
||||
|
||||
pub struct BufferUnorderedSync<M: Message> {
|
||||
|
@ -79,19 +79,16 @@ async fn buffer_unordered_poller<T, M>(
|
||||
T: AsyncBatchHandler<M> + 'static,
|
||||
M: Message,
|
||||
{
|
||||
let ut = ut.downcast::<T>().unwrap();
|
||||
let rx = rx
|
||||
.inspect(|_| {
|
||||
stats.buffer.fetch_sub(1, Ordering::Relaxed);
|
||||
stats.batch.fetch_add(1, Ordering::Relaxed);
|
||||
});
|
||||
let ut = ut.downcast_sync::<T>().unwrap();
|
||||
let rx = rx.inspect(|_| {
|
||||
stats.buffer.fetch_sub(1, Ordering::Relaxed);
|
||||
stats.batch.fetch_add(1, Ordering::Relaxed);
|
||||
});
|
||||
|
||||
let rx = if cfg.when_ready {
|
||||
rx.ready_chunks(cfg.batch_size)
|
||||
.left_stream()
|
||||
rx.ready_chunks(cfg.batch_size).left_stream()
|
||||
} else {
|
||||
rx.chunks(cfg.batch_size)
|
||||
.right_stream()
|
||||
rx.chunks(cfg.batch_size).right_stream()
|
||||
};
|
||||
|
||||
let mut rx = rx
|
||||
@ -102,7 +99,13 @@ async fn buffer_unordered_poller<T, M>(
|
||||
let bus_clone = bus.clone();
|
||||
let ut = ut.clone();
|
||||
|
||||
tokio::task::spawn(async move { ut.handle(msgs, &bus_clone).await })
|
||||
tokio::task::spawn(async move {
|
||||
ut.lock_read()
|
||||
.await
|
||||
.get_ref()
|
||||
.handle(msgs, &bus_clone)
|
||||
.await
|
||||
})
|
||||
})
|
||||
.buffer_unordered(cfg.max_parallel);
|
||||
|
||||
@ -119,7 +122,9 @@ async fn buffer_unordered_poller<T, M>(
|
||||
|
||||
let ut = ut.clone();
|
||||
let bus_clone = bus.clone();
|
||||
let res = tokio::task::spawn(async move { ut.sync(&bus_clone).await }).await;
|
||||
let res =
|
||||
tokio::task::spawn(async move { ut.lock_read().await.get_ref().sync(&bus_clone).await })
|
||||
.await;
|
||||
|
||||
match res {
|
||||
Ok(Err(err)) => {
|
||||
|
@ -27,9 +27,9 @@ pub struct BufferUnorderedBatchedConfig {
|
||||
impl Default for BufferUnorderedBatchedConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
buffer_size: 8,
|
||||
buffer_size: 8,
|
||||
max_parallel: 2,
|
||||
batch_size: 8,
|
||||
batch_size: 8,
|
||||
when_ready: false,
|
||||
}
|
||||
}
|
||||
|
@ -1,5 +1,5 @@
|
||||
use crate::{receiver::ReceiverStats, receivers::mpsc};
|
||||
use futures::{Future, StreamExt};
|
||||
use futures::{executor::block_on, Future, StreamExt};
|
||||
use std::{
|
||||
any::TypeId,
|
||||
marker::PhantomData,
|
||||
@ -16,7 +16,7 @@ use crate::{
|
||||
builder::{ReceiverSubscriber, ReceiverSubscriberBuilder},
|
||||
msgs,
|
||||
receiver::{AnyReceiver, ReceiverTrait, SendError, TypedReceiver},
|
||||
Bus, BatchHandler, Message, Untyped,
|
||||
BatchHandler, Bus, Message, Untyped,
|
||||
};
|
||||
|
||||
pub struct BufferUnorderedBatchedSyncSubscriber<T, M>
|
||||
@ -78,19 +78,16 @@ async fn buffer_unordered_poller<T, M>(
|
||||
T: BatchHandler<M> + 'static,
|
||||
M: Message,
|
||||
{
|
||||
let ut = ut.downcast::<T>().unwrap();
|
||||
let rx = rx
|
||||
.inspect(|_| {
|
||||
stats.buffer.fetch_sub(1, Ordering::Relaxed);
|
||||
stats.batch.fetch_add(1, Ordering::Relaxed);
|
||||
});
|
||||
let ut = ut.downcast_sync::<T>().unwrap();
|
||||
let rx = rx.inspect(|_| {
|
||||
stats.buffer.fetch_sub(1, Ordering::Relaxed);
|
||||
stats.batch.fetch_add(1, Ordering::Relaxed);
|
||||
});
|
||||
|
||||
let rx = if cfg.when_ready {
|
||||
rx.ready_chunks(cfg.batch_size)
|
||||
.left_stream()
|
||||
rx.ready_chunks(cfg.batch_size).left_stream()
|
||||
} else {
|
||||
rx.chunks(cfg.batch_size)
|
||||
.right_stream()
|
||||
rx.chunks(cfg.batch_size).right_stream()
|
||||
};
|
||||
|
||||
let mut rx = rx
|
||||
@ -101,7 +98,9 @@ async fn buffer_unordered_poller<T, M>(
|
||||
let bus = bus.clone();
|
||||
let ut = ut.clone();
|
||||
|
||||
tokio::task::spawn_blocking(move || ut.handle(msgs, &bus))
|
||||
tokio::task::spawn_blocking(move || {
|
||||
block_on(ut.lock_read()).get_ref().handle(msgs, &bus)
|
||||
})
|
||||
})
|
||||
.buffer_unordered(cfg.max_parallel);
|
||||
|
||||
@ -118,7 +117,9 @@ async fn buffer_unordered_poller<T, M>(
|
||||
|
||||
let ut = ut.clone();
|
||||
let bus_clone = bus.clone();
|
||||
let res = tokio::task::spawn_blocking(move || ut.sync(&bus_clone)).await;
|
||||
let res =
|
||||
tokio::task::spawn_blocking(move || block_on(ut.lock_read()).get_ref().sync(&bus_clone))
|
||||
.await;
|
||||
|
||||
match res {
|
||||
Ok(Err(err)) => {
|
||||
|
180
src/receivers/local/async.rs
Normal file
180
src/receivers/local/async.rs
Normal file
@ -0,0 +1,180 @@
|
||||
use std::{
|
||||
any::TypeId,
|
||||
marker::PhantomData,
|
||||
pin::Pin,
|
||||
sync::{
|
||||
atomic::{AtomicU64, Ordering},
|
||||
Arc,
|
||||
},
|
||||
task::{Context, Poll},
|
||||
};
|
||||
|
||||
use crate::{receiver::ReceiverStats, receivers::mpsc};
|
||||
use futures::{pin_mut, Future, StreamExt};
|
||||
|
||||
use super::{LocalConfig, LocalStats};
|
||||
use crate::{
|
||||
builder::{ReceiverSubscriber, ReceiverSubscriberBuilder},
|
||||
receiver::{AnyReceiver, ReceiverTrait, SendError, TypedReceiver},
|
||||
Bus, LocalAsyncHandler, Message, Untyped,
|
||||
};
|
||||
|
||||
pub struct LocalAsyncSubscriber<T, M>
|
||||
where
|
||||
T: LocalAsyncHandler<M> + 'static,
|
||||
M: Message,
|
||||
{
|
||||
cfg: LocalConfig,
|
||||
_m: PhantomData<(T, M)>,
|
||||
}
|
||||
|
||||
impl<T, M> ReceiverSubscriber<T> for LocalAsyncSubscriber<T, M>
|
||||
where
|
||||
T: LocalAsyncHandler<M> + 'static,
|
||||
M: Message,
|
||||
{
|
||||
fn subscribe(
|
||||
self,
|
||||
) -> (
|
||||
Arc<dyn ReceiverTrait>,
|
||||
Box<
|
||||
dyn FnOnce(Untyped) -> Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>,
|
||||
>,
|
||||
) {
|
||||
let cfg = self.cfg;
|
||||
let (tx, rx) = mpsc::channel(cfg.buffer_size);
|
||||
let stats = Arc::new(LocalStats {
|
||||
buffer: AtomicU64::new(0),
|
||||
buffer_total: AtomicU64::new(cfg.buffer_size as _),
|
||||
});
|
||||
|
||||
let arc = Arc::new(LocalAsync::<M> {
|
||||
tx,
|
||||
stats: stats.clone(),
|
||||
});
|
||||
|
||||
let poller = Box::new(move |ut| {
|
||||
Box::new(move |bus| {
|
||||
Box::pin(buffer_unordered_poller::<T, M>(rx, bus, ut, stats, cfg))
|
||||
as Pin<Box<dyn Future<Output = ()> + Send>>
|
||||
}) as Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>
|
||||
});
|
||||
|
||||
(arc, poller)
|
||||
}
|
||||
}
|
||||
|
||||
async fn buffer_unordered_poller<T, M>(
|
||||
rx: mpsc::Receiver<M>,
|
||||
bus: Bus,
|
||||
ut: Untyped,
|
||||
stats: Arc<LocalStats>,
|
||||
_cfg: LocalConfig,
|
||||
) where
|
||||
T: LocalAsyncHandler<M> + 'static,
|
||||
M: Message,
|
||||
{
|
||||
let ut = ut.downcast_local::<T>().unwrap();
|
||||
let bus1 = bus.clone();
|
||||
|
||||
let x = rx.then(|msg| {
|
||||
let bus1 = bus1.clone();
|
||||
ut.spawn_local(move |item| {
|
||||
Box::pin(async move {
|
||||
let _ = item.handle(msg, &bus1).await;
|
||||
})
|
||||
})
|
||||
});
|
||||
|
||||
pin_mut!(x);
|
||||
|
||||
while let Some(_) = x.next().await {
|
||||
stats.buffer.fetch_sub(1, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
let bus_clone = bus.clone();
|
||||
ut.spawn_local(move |item| {
|
||||
Box::pin(async move {
|
||||
let _ = item.sync(&bus_clone).await;
|
||||
})
|
||||
})
|
||||
.await;
|
||||
|
||||
println!("[EXIT] LocalAsync<{}>", std::any::type_name::<M>());
|
||||
}
|
||||
|
||||
pub struct LocalAsync<M: Message> {
|
||||
tx: mpsc::Sender<M>,
|
||||
stats: Arc<LocalStats>,
|
||||
}
|
||||
|
||||
impl<T, M> ReceiverSubscriberBuilder<M, T> for LocalAsync<M>
|
||||
where
|
||||
T: LocalAsyncHandler<M> + 'static,
|
||||
M: Message,
|
||||
{
|
||||
type Entry = LocalAsyncSubscriber<T, M>;
|
||||
type Config = LocalConfig;
|
||||
|
||||
fn build(cfg: Self::Config) -> Self::Entry {
|
||||
LocalAsyncSubscriber {
|
||||
cfg,
|
||||
_m: Default::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<M: Message> TypedReceiver<M> for LocalAsync<M> {
|
||||
fn poll_ready(&self, ctx: &mut Context<'_>) -> Poll<()> {
|
||||
match self.tx.poll_ready(ctx) {
|
||||
Poll::Ready(_) => Poll::Ready(()),
|
||||
Poll::Pending => Poll::Pending,
|
||||
}
|
||||
}
|
||||
|
||||
fn try_send(&self, m: M) -> Result<(), SendError<M>> {
|
||||
match self.tx.try_send(m) {
|
||||
Ok(_) => {
|
||||
self.stats.buffer.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
Err(err) => Err(err),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<M: Message> ReceiverTrait for LocalAsync<M> {
|
||||
fn typed(&self) -> AnyReceiver<'_> {
|
||||
AnyReceiver::new(self)
|
||||
}
|
||||
|
||||
fn type_id(&self) -> TypeId {
|
||||
TypeId::of::<LocalAsync<M>>()
|
||||
}
|
||||
|
||||
fn stats(&self) -> ReceiverStats {
|
||||
ReceiverStats {
|
||||
name: std::any::type_name::<M>().into(),
|
||||
fields: vec![
|
||||
("buffer".into(), self.stats.buffer.load(Ordering::SeqCst)),
|
||||
(
|
||||
"buffer_total".into(),
|
||||
self.stats.buffer_total.load(Ordering::SeqCst),
|
||||
),
|
||||
],
|
||||
}
|
||||
}
|
||||
|
||||
fn close(&self) {
|
||||
self.tx.close();
|
||||
}
|
||||
|
||||
fn sync(&self) {
|
||||
self.tx.flush();
|
||||
}
|
||||
|
||||
fn poll_synchronized(&self, _ctx: &mut Context<'_>) -> Poll<()> {
|
||||
Poll::Ready(())
|
||||
}
|
||||
}
|
25
src/receivers/local/mod.rs
Normal file
25
src/receivers/local/mod.rs
Normal file
@ -0,0 +1,25 @@
|
||||
mod r#async;
|
||||
mod sync;
|
||||
|
||||
use std::sync::atomic::AtomicU64;
|
||||
|
||||
pub use sync::{LocalSync, LocalSyncSubscriber};
|
||||
|
||||
pub use r#async::{LocalAsync, LocalAsyncSubscriber};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct LocalStats {
|
||||
pub buffer: AtomicU64,
|
||||
pub buffer_total: AtomicU64,
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Debug)]
|
||||
pub struct LocalConfig {
|
||||
pub buffer_size: usize,
|
||||
}
|
||||
|
||||
impl Default for LocalConfig {
|
||||
fn default() -> Self {
|
||||
Self { buffer_size: 1 }
|
||||
}
|
||||
}
|
177
src/receivers/local/sync.rs
Normal file
177
src/receivers/local/sync.rs
Normal file
@ -0,0 +1,177 @@
|
||||
use super::{LocalConfig, LocalStats};
|
||||
use crate::{receiver::ReceiverStats, receivers::mpsc};
|
||||
use futures::{pin_mut, Future, StreamExt};
|
||||
use std::{
|
||||
any::TypeId,
|
||||
marker::PhantomData,
|
||||
pin::Pin,
|
||||
sync::{
|
||||
atomic::{AtomicU64, Ordering},
|
||||
Arc,
|
||||
},
|
||||
task::{Context, Poll},
|
||||
};
|
||||
|
||||
use crate::{
|
||||
builder::{ReceiverSubscriber, ReceiverSubscriberBuilder},
|
||||
receiver::{AnyReceiver, ReceiverTrait, SendError, TypedReceiver},
|
||||
Bus, LocalHandler, Message, Untyped,
|
||||
};
|
||||
|
||||
pub struct LocalSyncSubscriber<T, M>
|
||||
where
|
||||
T: LocalHandler<M> + 'static,
|
||||
M: Message,
|
||||
{
|
||||
cfg: LocalConfig,
|
||||
_m: PhantomData<(M, T)>,
|
||||
}
|
||||
|
||||
impl<T, M> ReceiverSubscriber<T> for LocalSyncSubscriber<T, M>
|
||||
where
|
||||
T: LocalHandler<M> + 'static,
|
||||
M: Message,
|
||||
{
|
||||
fn subscribe(
|
||||
self,
|
||||
) -> (
|
||||
Arc<dyn ReceiverTrait>,
|
||||
Box<
|
||||
dyn FnOnce(Untyped) -> Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>,
|
||||
>,
|
||||
) {
|
||||
let cfg = self.cfg;
|
||||
let (tx, rx) = mpsc::channel(cfg.buffer_size);
|
||||
let stats = Arc::new(LocalStats {
|
||||
buffer: AtomicU64::new(0),
|
||||
buffer_total: AtomicU64::new(cfg.buffer_size as _),
|
||||
});
|
||||
|
||||
let arc = Arc::new(LocalSync::<M> {
|
||||
tx,
|
||||
stats: stats.clone(),
|
||||
});
|
||||
let poller = Box::new(move |ut| {
|
||||
Box::new(move |bus| {
|
||||
Box::pin(buffer_unordered_poller::<T, M>(rx, bus, ut, stats, cfg))
|
||||
as Pin<Box<dyn Future<Output = ()> + Send>>
|
||||
}) as Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>
|
||||
});
|
||||
|
||||
(arc, poller)
|
||||
}
|
||||
}
|
||||
|
||||
async fn buffer_unordered_poller<T, M>(
|
||||
rx: mpsc::Receiver<M>,
|
||||
bus: Bus,
|
||||
ut: Untyped,
|
||||
stats: Arc<LocalStats>,
|
||||
_cfg: LocalConfig,
|
||||
) where
|
||||
T: LocalHandler<M> + 'static,
|
||||
M: Message,
|
||||
{
|
||||
let ut = ut.downcast_local::<T>().unwrap();
|
||||
let bus1 = bus.clone();
|
||||
|
||||
let x = rx.then(|msg| {
|
||||
let bus1 = bus1.clone();
|
||||
ut.spawn_local(move |item| {
|
||||
let _ = item.handle(msg, &bus1);
|
||||
|
||||
Box::pin(async move {})
|
||||
})
|
||||
});
|
||||
|
||||
pin_mut!(x);
|
||||
|
||||
while let Some(_) = x.next().await {
|
||||
stats.buffer.fetch_sub(1, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
let bus_clone = bus.clone();
|
||||
ut.spawn_local(move |item| {
|
||||
let _ = item.sync(&bus_clone);
|
||||
Box::pin(async move {})
|
||||
})
|
||||
.await;
|
||||
|
||||
println!("[EXIT] LocalAsync<{}>", std::any::type_name::<M>());
|
||||
}
|
||||
|
||||
pub struct LocalSync<M: Message> {
|
||||
tx: mpsc::Sender<M>,
|
||||
stats: Arc<LocalStats>,
|
||||
}
|
||||
|
||||
impl<T, M> ReceiverSubscriberBuilder<M, T> for LocalSync<M>
|
||||
where
|
||||
T: LocalHandler<M> + 'static,
|
||||
M: Message,
|
||||
{
|
||||
type Entry = LocalSyncSubscriber<T, M>;
|
||||
type Config = LocalConfig;
|
||||
|
||||
fn build(cfg: Self::Config) -> Self::Entry {
|
||||
LocalSyncSubscriber {
|
||||
cfg,
|
||||
_m: Default::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<M: Message> TypedReceiver<M> for LocalSync<M> {
|
||||
fn poll_ready(&self, ctx: &mut Context<'_>) -> Poll<()> {
|
||||
match self.tx.poll_ready(ctx) {
|
||||
Poll::Ready(_) => Poll::Ready(()),
|
||||
Poll::Pending => Poll::Pending,
|
||||
}
|
||||
}
|
||||
|
||||
fn try_send(&self, m: M) -> Result<(), SendError<M>> {
|
||||
match self.tx.try_send(m) {
|
||||
Ok(_) => {
|
||||
self.stats.buffer.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
Err(err) => Err(err),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<M: Message> ReceiverTrait for LocalSync<M> {
|
||||
fn typed(&self) -> AnyReceiver<'_> {
|
||||
AnyReceiver::new(self)
|
||||
}
|
||||
|
||||
fn type_id(&self) -> TypeId {
|
||||
TypeId::of::<LocalSync<M>>()
|
||||
}
|
||||
|
||||
fn stats(&self) -> ReceiverStats {
|
||||
ReceiverStats {
|
||||
name: std::any::type_name::<M>().into(),
|
||||
fields: vec![
|
||||
("buffer".into(), self.stats.buffer.load(Ordering::SeqCst)),
|
||||
(
|
||||
"buffer_total".into(),
|
||||
self.stats.buffer_total.load(Ordering::SeqCst),
|
||||
),
|
||||
],
|
||||
}
|
||||
}
|
||||
|
||||
fn close(&self) {
|
||||
self.tx.close();
|
||||
}
|
||||
|
||||
fn sync(&self) {
|
||||
self.tx.flush();
|
||||
}
|
||||
|
||||
fn poll_synchronized(&self, _ctx: &mut Context<'_>) -> Poll<()> {
|
||||
Poll::Ready(())
|
||||
}
|
||||
}
|
196
src/receivers/local_batched/async.rs
Normal file
196
src/receivers/local_batched/async.rs
Normal file
@ -0,0 +1,196 @@
|
||||
use std::{
|
||||
any::TypeId,
|
||||
marker::PhantomData,
|
||||
pin::Pin,
|
||||
sync::{
|
||||
atomic::{AtomicU64, Ordering},
|
||||
Arc,
|
||||
},
|
||||
task::{Context, Poll},
|
||||
};
|
||||
|
||||
use crate::{receiver::ReceiverStats, receivers::mpsc};
|
||||
use futures::{Future, StreamExt};
|
||||
|
||||
use super::{LocalBatchedConfig, LocalBatchedStats};
|
||||
use crate::{
|
||||
builder::{ReceiverSubscriber, ReceiverSubscriberBuilder},
|
||||
receiver::{AnyReceiver, ReceiverTrait, SendError, TypedReceiver},
|
||||
Bus, LocalAsyncBatchHandler, Message, Untyped,
|
||||
};
|
||||
|
||||
pub struct LocalBatchedAsyncSubscriber<T, M>
|
||||
where
|
||||
T: LocalAsyncBatchHandler<M> + 'static,
|
||||
M: Message,
|
||||
{
|
||||
cfg: LocalBatchedConfig,
|
||||
_m: PhantomData<(T, M)>,
|
||||
}
|
||||
|
||||
impl<T, M> ReceiverSubscriber<T> for LocalBatchedAsyncSubscriber<T, M>
|
||||
where
|
||||
T: LocalAsyncBatchHandler<M> + 'static,
|
||||
M: Message,
|
||||
{
|
||||
fn subscribe(
|
||||
self,
|
||||
) -> (
|
||||
Arc<dyn ReceiverTrait>,
|
||||
Box<
|
||||
dyn FnOnce(Untyped) -> Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>,
|
||||
>,
|
||||
) {
|
||||
let cfg = self.cfg;
|
||||
let (tx, rx) = mpsc::channel(cfg.buffer_size);
|
||||
let stats = Arc::new(LocalBatchedStats {
|
||||
buffer: AtomicU64::new(0),
|
||||
buffer_total: AtomicU64::new(cfg.buffer_size as _),
|
||||
batch: AtomicU64::new(0),
|
||||
batch_size: AtomicU64::new(cfg.batch_size as _),
|
||||
});
|
||||
|
||||
let arc = Arc::new(LocalBatchedAsync::<M> {
|
||||
tx,
|
||||
stats: stats.clone(),
|
||||
});
|
||||
|
||||
let poller = Box::new(move |ut| {
|
||||
Box::new(move |bus| {
|
||||
Box::pin(buffer_unordered_poller::<T, M>(rx, bus, ut, stats, cfg))
|
||||
as Pin<Box<dyn Future<Output = ()> + Send>>
|
||||
}) as Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>
|
||||
});
|
||||
|
||||
(arc, poller)
|
||||
}
|
||||
}
|
||||
|
||||
async fn buffer_unordered_poller<T, M>(
|
||||
rx: mpsc::Receiver<M>,
|
||||
bus: Bus,
|
||||
ut: Untyped,
|
||||
stats: Arc<LocalBatchedStats>,
|
||||
cfg: LocalBatchedConfig,
|
||||
) where
|
||||
T: LocalAsyncBatchHandler<M> + 'static,
|
||||
M: Message,
|
||||
{
|
||||
let ut = ut.downcast_local::<T>().unwrap();
|
||||
|
||||
let rx = rx.inspect(|_| {
|
||||
stats.buffer.fetch_sub(1, Ordering::Relaxed);
|
||||
stats.batch.fetch_add(1, Ordering::Relaxed);
|
||||
});
|
||||
|
||||
let mut rx = if cfg.when_ready {
|
||||
rx.ready_chunks(cfg.batch_size).left_stream()
|
||||
} else {
|
||||
rx.chunks(cfg.batch_size).right_stream()
|
||||
};
|
||||
|
||||
while let Some(msgs) = rx.next().await {
|
||||
stats.batch.fetch_sub(msgs.len() as _, Ordering::Relaxed);
|
||||
|
||||
let bus_clone = bus.clone();
|
||||
let ut = ut.clone();
|
||||
|
||||
ut.spawn_local(move |item| {
|
||||
Box::pin(async move {
|
||||
let _ = item.handle(msgs, &bus_clone).await;
|
||||
})
|
||||
})
|
||||
.await;
|
||||
}
|
||||
|
||||
let bus_clone = bus.clone();
|
||||
ut.spawn_local(move |item| {
|
||||
Box::pin(async move {
|
||||
let _ = item.sync(&bus_clone).await;
|
||||
})
|
||||
})
|
||||
.await;
|
||||
|
||||
println!("[EXIT] LocalBatchedAsync<{}>", std::any::type_name::<M>());
|
||||
}
|
||||
|
||||
pub struct LocalBatchedAsync<M: Message> {
|
||||
tx: mpsc::Sender<M>,
|
||||
stats: Arc<LocalBatchedStats>,
|
||||
}
|
||||
|
||||
impl<T, M> ReceiverSubscriberBuilder<M, T> for LocalBatchedAsync<M>
|
||||
where
|
||||
T: LocalAsyncBatchHandler<M> + 'static,
|
||||
M: Message,
|
||||
{
|
||||
type Entry = LocalBatchedAsyncSubscriber<T, M>;
|
||||
type Config = LocalBatchedConfig;
|
||||
|
||||
fn build(cfg: Self::Config) -> Self::Entry {
|
||||
LocalBatchedAsyncSubscriber {
|
||||
cfg,
|
||||
_m: Default::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<M: Message> TypedReceiver<M> for LocalBatchedAsync<M> {
|
||||
fn poll_ready(&self, ctx: &mut Context<'_>) -> Poll<()> {
|
||||
match self.tx.poll_ready(ctx) {
|
||||
Poll::Ready(_) => Poll::Ready(()),
|
||||
Poll::Pending => Poll::Pending,
|
||||
}
|
||||
}
|
||||
|
||||
fn try_send(&self, m: M) -> Result<(), SendError<M>> {
|
||||
match self.tx.try_send(m) {
|
||||
Ok(_) => {
|
||||
self.stats.buffer.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
Err(err) => Err(err),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<M: Message> ReceiverTrait for LocalBatchedAsync<M> {
|
||||
fn typed(&self) -> AnyReceiver<'_> {
|
||||
AnyReceiver::new(self)
|
||||
}
|
||||
|
||||
fn type_id(&self) -> TypeId {
|
||||
TypeId::of::<Self>()
|
||||
}
|
||||
|
||||
fn close(&self) {
|
||||
self.tx.close();
|
||||
}
|
||||
|
||||
fn stats(&self) -> ReceiverStats {
|
||||
ReceiverStats {
|
||||
name: std::any::type_name::<M>().into(),
|
||||
fields: vec![
|
||||
("buffer".into(), self.stats.buffer.load(Ordering::SeqCst)),
|
||||
(
|
||||
"buffer_total".into(),
|
||||
self.stats.buffer_total.load(Ordering::SeqCst),
|
||||
),
|
||||
("batch".into(), self.stats.batch.load(Ordering::SeqCst)),
|
||||
(
|
||||
"batch_size".into(),
|
||||
self.stats.batch_size.load(Ordering::SeqCst),
|
||||
),
|
||||
],
|
||||
}
|
||||
}
|
||||
|
||||
fn sync(&self) {
|
||||
self.tx.flush();
|
||||
}
|
||||
|
||||
fn poll_synchronized(&self, _ctx: &mut Context<'_>) -> Poll<()> {
|
||||
Poll::Ready(())
|
||||
}
|
||||
}
|
33
src/receivers/local_batched/mod.rs
Normal file
33
src/receivers/local_batched/mod.rs
Normal file
@ -0,0 +1,33 @@
|
||||
mod r#async;
|
||||
mod sync;
|
||||
|
||||
use std::sync::atomic::AtomicU64;
|
||||
|
||||
pub use sync::{LocalBatchedSync, LocalBatchedSyncSubscriber};
|
||||
|
||||
pub use r#async::{LocalBatchedAsync, LocalBatchedAsyncSubscriber};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct LocalBatchedStats {
|
||||
pub buffer: AtomicU64,
|
||||
pub buffer_total: AtomicU64,
|
||||
pub batch: AtomicU64,
|
||||
pub batch_size: AtomicU64,
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Debug)]
|
||||
pub struct LocalBatchedConfig {
|
||||
pub buffer_size: usize,
|
||||
pub batch_size: usize,
|
||||
pub when_ready: bool,
|
||||
}
|
||||
|
||||
impl Default for LocalBatchedConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
buffer_size: 4,
|
||||
batch_size: 16,
|
||||
when_ready: false,
|
||||
}
|
||||
}
|
||||
}
|
193
src/receivers/local_batched/sync.rs
Normal file
193
src/receivers/local_batched/sync.rs
Normal file
@ -0,0 +1,193 @@
|
||||
use super::{LocalBatchedConfig, LocalBatchedStats};
|
||||
use crate::{
|
||||
builder::{ReceiverSubscriber, ReceiverSubscriberBuilder},
|
||||
receiver::{AnyReceiver, ReceiverTrait, SendError, TypedReceiver},
|
||||
Bus, LocalBatchHandler, Message, Untyped,
|
||||
};
|
||||
use crate::{receiver::ReceiverStats, receivers::mpsc};
|
||||
use futures::{Future, StreamExt};
|
||||
use std::{
|
||||
any::TypeId,
|
||||
marker::PhantomData,
|
||||
pin::Pin,
|
||||
sync::{
|
||||
atomic::{AtomicU64, Ordering},
|
||||
Arc,
|
||||
},
|
||||
task::{Context, Poll},
|
||||
};
|
||||
|
||||
pub struct LocalBatchedSyncSubscriber<T, M>
|
||||
where
|
||||
T: LocalBatchHandler<M> + 'static,
|
||||
M: Message,
|
||||
{
|
||||
cfg: LocalBatchedConfig,
|
||||
_m: PhantomData<(M, T)>,
|
||||
}
|
||||
|
||||
impl<T, M> ReceiverSubscriber<T> for LocalBatchedSyncSubscriber<T, M>
|
||||
where
|
||||
T: LocalBatchHandler<M> + 'static,
|
||||
M: Message,
|
||||
{
|
||||
fn subscribe(
|
||||
self,
|
||||
) -> (
|
||||
Arc<dyn ReceiverTrait>,
|
||||
Box<
|
||||
dyn FnOnce(Untyped) -> Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>,
|
||||
>,
|
||||
) {
|
||||
let cfg = self.cfg;
|
||||
let (tx, rx) = mpsc::channel(cfg.buffer_size);
|
||||
let stats = Arc::new(LocalBatchedStats {
|
||||
buffer: AtomicU64::new(0),
|
||||
buffer_total: AtomicU64::new(cfg.buffer_size as _),
|
||||
batch: AtomicU64::new(0),
|
||||
batch_size: AtomicU64::new(cfg.batch_size as _),
|
||||
});
|
||||
|
||||
let arc = Arc::new(LocalBatchedSync::<M> {
|
||||
tx,
|
||||
stats: stats.clone(),
|
||||
});
|
||||
let poller = Box::new(move |ut| {
|
||||
Box::new(move |bus| {
|
||||
Box::pin(buffer_unordered_poller::<T, M>(rx, bus, ut, stats, cfg))
|
||||
as Pin<Box<dyn Future<Output = ()> + Send>>
|
||||
}) as Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>
|
||||
});
|
||||
|
||||
(arc, poller)
|
||||
}
|
||||
}
|
||||
|
||||
async fn buffer_unordered_poller<T, M>(
|
||||
rx: mpsc::Receiver<M>,
|
||||
bus: Bus,
|
||||
ut: Untyped,
|
||||
stats: Arc<LocalBatchedStats>,
|
||||
cfg: LocalBatchedConfig,
|
||||
) where
|
||||
T: LocalBatchHandler<M> + 'static,
|
||||
M: Message,
|
||||
{
|
||||
let ut = ut.downcast_local::<T>().unwrap();
|
||||
|
||||
let rx = rx.inspect(|_| {
|
||||
stats.buffer.fetch_sub(1, Ordering::Relaxed);
|
||||
stats.batch.fetch_add(1, Ordering::Relaxed);
|
||||
});
|
||||
|
||||
let mut rx = if cfg.when_ready {
|
||||
rx.ready_chunks(cfg.batch_size).left_stream()
|
||||
} else {
|
||||
rx.chunks(cfg.batch_size).right_stream()
|
||||
};
|
||||
|
||||
while let Some(msgs) = rx.next().await {
|
||||
stats.batch.fetch_sub(msgs.len() as _, Ordering::Relaxed);
|
||||
|
||||
let bus_clone = bus.clone();
|
||||
let ut = ut.clone();
|
||||
|
||||
ut.spawn_local(move |item| {
|
||||
let _ = item.handle(msgs, &bus_clone);
|
||||
|
||||
Box::pin(async move {})
|
||||
})
|
||||
.await;
|
||||
}
|
||||
|
||||
let ut = ut.clone();
|
||||
let bus_clone = bus.clone();
|
||||
ut.spawn_local(move |item| {
|
||||
let _ = item.sync(&bus_clone);
|
||||
Box::pin(async move {})
|
||||
})
|
||||
.await;
|
||||
|
||||
println!("[EXIT] LocalBatchedSync<{}>", std::any::type_name::<M>());
|
||||
}
|
||||
|
||||
pub struct LocalBatchedSync<M: Message> {
|
||||
tx: mpsc::Sender<M>,
|
||||
stats: Arc<LocalBatchedStats>,
|
||||
}
|
||||
|
||||
impl<T, M> ReceiverSubscriberBuilder<M, T> for LocalBatchedSync<M>
|
||||
where
|
||||
T: LocalBatchHandler<M> + 'static,
|
||||
M: Message,
|
||||
{
|
||||
type Entry = LocalBatchedSyncSubscriber<T, M>;
|
||||
type Config = LocalBatchedConfig;
|
||||
|
||||
fn build(cfg: Self::Config) -> Self::Entry {
|
||||
LocalBatchedSyncSubscriber {
|
||||
cfg,
|
||||
_m: Default::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<M: Message> TypedReceiver<M> for LocalBatchedSync<M> {
|
||||
fn poll_ready(&self, ctx: &mut Context<'_>) -> Poll<()> {
|
||||
match self.tx.poll_ready(ctx) {
|
||||
Poll::Ready(_) => Poll::Ready(()),
|
||||
Poll::Pending => Poll::Pending,
|
||||
}
|
||||
}
|
||||
|
||||
fn try_send(&self, m: M) -> Result<(), SendError<M>> {
|
||||
match self.tx.try_send(m) {
|
||||
Ok(_) => {
|
||||
self.stats.buffer.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
Err(err) => Err(err),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<M: Message> ReceiverTrait for LocalBatchedSync<M> {
|
||||
fn typed(&self) -> AnyReceiver<'_> {
|
||||
AnyReceiver::new(self)
|
||||
}
|
||||
|
||||
fn type_id(&self) -> TypeId {
|
||||
TypeId::of::<Self>()
|
||||
}
|
||||
|
||||
fn stats(&self) -> ReceiverStats {
|
||||
ReceiverStats {
|
||||
name: std::any::type_name::<M>().into(),
|
||||
fields: vec![
|
||||
("buffer".into(), self.stats.buffer.load(Ordering::SeqCst)),
|
||||
(
|
||||
"buffer_total".into(),
|
||||
self.stats.buffer_total.load(Ordering::SeqCst),
|
||||
),
|
||||
("batch".into(), self.stats.batch.load(Ordering::SeqCst)),
|
||||
(
|
||||
"batch_size".into(),
|
||||
self.stats.batch_size.load(Ordering::SeqCst),
|
||||
),
|
||||
],
|
||||
}
|
||||
}
|
||||
|
||||
fn close(&self) {
|
||||
self.tx.close();
|
||||
}
|
||||
|
||||
fn sync(&self) {
|
||||
self.tx.flush();
|
||||
}
|
||||
|
||||
fn poll_synchronized(&self, _ctx: &mut Context<'_>) -> Poll<()> {
|
||||
Poll::Ready(())
|
||||
}
|
||||
}
|
@ -1,5 +1,7 @@
|
||||
mod buffer_unordered;
|
||||
mod buffer_unordered_batched;
|
||||
mod local;
|
||||
mod local_batched;
|
||||
mod mpsc_futures;
|
||||
mod synchronize_batched;
|
||||
mod synchronized;
|
||||
@ -14,8 +16,8 @@ pub use buffer_unordered::{
|
||||
};
|
||||
|
||||
pub use buffer_unordered_batched::{
|
||||
BufferUnorderedBatchedAsync, BufferUnorderedBatchedAsyncSubscriber, BufferUnorderedBatchedConfig,
|
||||
BufferUnorderedBatchedSync, BufferUnorderedBatchedSyncSubscriber,
|
||||
BufferUnorderedBatchedAsync, BufferUnorderedBatchedAsyncSubscriber,
|
||||
BufferUnorderedBatchedConfig, BufferUnorderedBatchedSync, BufferUnorderedBatchedSyncSubscriber,
|
||||
};
|
||||
|
||||
pub use synchronized::{
|
||||
@ -27,3 +29,10 @@ pub use synchronize_batched::{
|
||||
SynchronizeBatchedAsync, SynchronizeBatchedAsyncSubscriber, SynchronizeBatchedConfig,
|
||||
SynchronizeBatchedSync, SynchronizeBatchedSyncSubscriber,
|
||||
};
|
||||
|
||||
pub use local::{LocalAsync, LocalAsyncSubscriber, LocalConfig, LocalSync, LocalSyncSubscriber};
|
||||
|
||||
pub use local_batched::{
|
||||
LocalBatchedAsync, LocalBatchedAsyncSubscriber, LocalBatchedConfig, LocalBatchedSync,
|
||||
LocalBatchedSyncSubscriber,
|
||||
};
|
||||
|
@ -99,7 +99,7 @@ impl<T> Stream for Receiver<T> {
|
||||
}
|
||||
|
||||
Poll::Pending
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -11,7 +11,6 @@ use std::{
|
||||
|
||||
use crate::{receiver::ReceiverStats, receivers::mpsc};
|
||||
use futures::{Future, StreamExt};
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
use super::{SynchronizeBatchedConfig, SynchronizeBatchedStats};
|
||||
use crate::{
|
||||
@ -78,20 +77,17 @@ async fn buffer_unordered_poller<T, M>(
|
||||
T: AsyncBatchSynchronizedHandler<M> + 'static,
|
||||
M: Message,
|
||||
{
|
||||
let ut = ut.downcast::<Mutex<T>>().unwrap();
|
||||
let ut = ut.downcast_send::<T>().unwrap();
|
||||
|
||||
let rx = rx
|
||||
.inspect(|_|{
|
||||
stats.buffer.fetch_sub(1, Ordering::Relaxed);
|
||||
stats.batch.fetch_add(1, Ordering::Relaxed);
|
||||
});
|
||||
let rx = rx.inspect(|_| {
|
||||
stats.buffer.fetch_sub(1, Ordering::Relaxed);
|
||||
stats.batch.fetch_add(1, Ordering::Relaxed);
|
||||
});
|
||||
|
||||
let mut rx = if cfg.when_ready {
|
||||
rx.ready_chunks(cfg.batch_size)
|
||||
.left_stream()
|
||||
rx.ready_chunks(cfg.batch_size).left_stream()
|
||||
} else {
|
||||
rx.chunks(cfg.batch_size)
|
||||
.right_stream()
|
||||
rx.chunks(cfg.batch_size).right_stream()
|
||||
};
|
||||
|
||||
while let Some(msgs) = rx.next().await {
|
||||
@ -101,8 +97,7 @@ async fn buffer_unordered_poller<T, M>(
|
||||
let ut = ut.clone();
|
||||
|
||||
let res =
|
||||
tokio::task::spawn(async move { ut.lock().await.handle(msgs, &bus_clone).await })
|
||||
.await;
|
||||
tokio::task::spawn(async move { ut.lock().await.handle(msgs, &bus_clone).await }).await;
|
||||
|
||||
match res {
|
||||
Ok(Err(err)) => {
|
||||
|
@ -6,7 +6,7 @@ use crate::{
|
||||
BatchSynchronizedHandler, Bus, Message, Untyped,
|
||||
};
|
||||
use crate::{receiver::ReceiverStats, receivers::mpsc};
|
||||
use futures::{Future, StreamExt};
|
||||
use futures::{executor::block_on, Future, StreamExt};
|
||||
use std::{
|
||||
any::TypeId,
|
||||
marker::PhantomData,
|
||||
@ -17,7 +17,6 @@ use std::{
|
||||
},
|
||||
task::{Context, Poll},
|
||||
};
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
pub struct SynchronizeBatchedSyncSubscriber<T, M>
|
||||
where
|
||||
@ -75,20 +74,17 @@ async fn buffer_unordered_poller<T, M>(
|
||||
T: BatchSynchronizedHandler<M> + 'static,
|
||||
M: Message,
|
||||
{
|
||||
let ut = ut.downcast::<Mutex<T>>().unwrap();
|
||||
let ut = ut.downcast_send::<T>().unwrap();
|
||||
|
||||
let rx = rx
|
||||
.inspect(|_|{
|
||||
stats.buffer.fetch_sub(1, Ordering::Relaxed);
|
||||
stats.batch.fetch_add(1, Ordering::Relaxed);
|
||||
});
|
||||
let rx = rx.inspect(|_| {
|
||||
stats.buffer.fetch_sub(1, Ordering::Relaxed);
|
||||
stats.batch.fetch_add(1, Ordering::Relaxed);
|
||||
});
|
||||
|
||||
let mut rx = if cfg.when_ready {
|
||||
rx.ready_chunks(cfg.batch_size)
|
||||
.left_stream()
|
||||
rx.ready_chunks(cfg.batch_size).left_stream()
|
||||
} else {
|
||||
rx.chunks(cfg.batch_size)
|
||||
.right_stream()
|
||||
rx.chunks(cfg.batch_size).right_stream()
|
||||
};
|
||||
|
||||
while let Some(msgs) = rx.next().await {
|
||||
@ -98,10 +94,10 @@ async fn buffer_unordered_poller<T, M>(
|
||||
let ut = ut.clone();
|
||||
|
||||
let res = tokio::task::spawn_blocking(move || {
|
||||
let mut uut = futures::executor::block_on(ut.lock());
|
||||
|
||||
let mut uut = block_on(ut.lock());
|
||||
uut.handle(msgs, &bus_clone)
|
||||
}).await;
|
||||
})
|
||||
.await;
|
||||
|
||||
match res {
|
||||
Ok(Err(err)) => {
|
||||
@ -113,10 +109,7 @@ async fn buffer_unordered_poller<T, M>(
|
||||
|
||||
let ut = ut.clone();
|
||||
let bus_clone = bus.clone();
|
||||
let res = tokio::task::spawn_blocking(move || {
|
||||
futures::executor::block_on(ut.lock()).sync(&bus_clone)
|
||||
})
|
||||
.await;
|
||||
let res = tokio::task::spawn_blocking(move || block_on(ut.lock()).sync(&bus_clone)).await;
|
||||
|
||||
match res {
|
||||
Ok(Err(err)) => {
|
||||
|
@ -11,7 +11,6 @@ use std::{
|
||||
|
||||
use crate::{receiver::ReceiverStats, receivers::mpsc};
|
||||
use futures::{Future, StreamExt};
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
use super::{SynchronizedConfig, SynchronizedStats};
|
||||
use crate::{
|
||||
@ -76,10 +75,13 @@ async fn buffer_unordered_poller<T, M>(
|
||||
T: AsyncSynchronizedHandler<M> + 'static,
|
||||
M: Message,
|
||||
{
|
||||
let ut = ut.downcast::<Mutex<T>>().unwrap();
|
||||
let mut x = rx.then(|msg| {
|
||||
let bus = bus.clone();
|
||||
let ut = ut.clone();
|
||||
let ut = ut.downcast_send::<T>().unwrap();
|
||||
let ut1 = ut.clone();
|
||||
let bus1 = bus.clone();
|
||||
|
||||
let mut x = rx.then(move |msg| {
|
||||
let bus = bus1.clone();
|
||||
let ut = ut1.clone();
|
||||
|
||||
tokio::task::spawn(async move { ut.lock().await.handle(msg, &bus).await })
|
||||
});
|
||||
|
@ -11,7 +11,6 @@ use std::{
|
||||
},
|
||||
task::{Context, Poll},
|
||||
};
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
use crate::{
|
||||
builder::{ReceiverSubscriber, ReceiverSubscriberBuilder},
|
||||
@ -74,12 +73,15 @@ async fn buffer_unordered_poller<T, M>(
|
||||
T: SynchronizedHandler<M> + 'static,
|
||||
M: Message,
|
||||
{
|
||||
let ut = ut.downcast::<Mutex<T>>().unwrap();
|
||||
let ut = ut.downcast_send::<T>().unwrap();
|
||||
let mut x = rx.then(|msg| {
|
||||
let ut = ut.clone();
|
||||
let bus = bus.clone();
|
||||
|
||||
tokio::task::spawn_blocking(move || block_on(ut.lock()).handle(msg, &bus))
|
||||
tokio::task::spawn_blocking(move || {
|
||||
let mut uut = block_on(ut.lock());
|
||||
uut.handle(msg, &bus)
|
||||
})
|
||||
});
|
||||
|
||||
while let Some(err) = x.next().await {
|
||||
@ -95,10 +97,7 @@ async fn buffer_unordered_poller<T, M>(
|
||||
|
||||
let ut = ut.clone();
|
||||
let bus_clone = bus.clone();
|
||||
let res = tokio::task::spawn_blocking(move || {
|
||||
futures::executor::block_on(ut.lock()).sync(&bus_clone)
|
||||
})
|
||||
.await;
|
||||
let res = tokio::task::spawn_blocking(move || block_on(ut.lock()).sync(&bus_clone)).await;
|
||||
|
||||
match res {
|
||||
Ok(Err(err)) => {
|
||||
@ -107,10 +106,7 @@ async fn buffer_unordered_poller<T, M>(
|
||||
_ => (),
|
||||
}
|
||||
|
||||
println!(
|
||||
"[EXIT] BufferUnorderedSync<{}>",
|
||||
std::any::type_name::<M>()
|
||||
);
|
||||
println!("[EXIT] BufferUnorderedSync<{}>", std::any::type_name::<M>());
|
||||
}
|
||||
|
||||
pub struct SynchronizedSync<M: Message> {
|
||||
|
174
src/untyped.rs
Normal file
174
src/untyped.rs
Normal file
@ -0,0 +1,174 @@
|
||||
use core::any::Any;
|
||||
use core::future::Future;
|
||||
use core::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::Notify;
|
||||
use tokio::sync::{Mutex, MutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuard};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum Lock<'a, T> {
|
||||
ReadOnly(&'a T),
|
||||
RwRead(RwLockReadGuard<'a, T>),
|
||||
RwWrite(RwLockWriteGuard<'a, T>),
|
||||
WriteOnly(MutexGuard<'a, T>),
|
||||
}
|
||||
|
||||
impl<'a, T> Lock<'a, T> {
|
||||
pub fn get_ref(&self) -> &T {
|
||||
match self {
|
||||
Lock::ReadOnly(inner) => &inner,
|
||||
Lock::RwRead(inner) => &inner,
|
||||
Lock::RwWrite(inner) => &inner,
|
||||
Lock::WriteOnly(inner) => &inner,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_mut(&mut self) -> &mut T {
|
||||
match self {
|
||||
Lock::ReadOnly(_) => panic!("!!"),
|
||||
Lock::RwRead(_) => panic!("!!"),
|
||||
Lock::RwWrite(inner) => &mut *inner,
|
||||
Lock::WriteOnly(inner) => &mut *inner,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub enum Downcasted<T> {
|
||||
ReadOnly(Arc<T>),
|
||||
ReadWrite(Arc<RwLock<T>>),
|
||||
WriteOnly(Arc<Mutex<T>>),
|
||||
}
|
||||
|
||||
impl<T> Clone for Downcasted<T> {
|
||||
fn clone(&self) -> Self {
|
||||
match self {
|
||||
Downcasted::ReadOnly(inner) => Downcasted::ReadOnly(inner.clone()),
|
||||
Downcasted::ReadWrite(inner) => Downcasted::ReadWrite(inner.clone()),
|
||||
Downcasted::WriteOnly(inner) => Downcasted::WriteOnly(inner.clone()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: 'static> Downcasted<T> {
|
||||
pub async fn lock_read(&self) -> Lock<'_, T> {
|
||||
match self {
|
||||
Downcasted::ReadOnly(inner) => Lock::ReadOnly(&inner),
|
||||
Downcasted::ReadWrite(inner) => Lock::RwRead(inner.read().await),
|
||||
Downcasted::WriteOnly(inner) => Lock::WriteOnly(inner.lock().await),
|
||||
}
|
||||
}
|
||||
pub async fn lock_write(&self) -> Lock<'_, T> {
|
||||
match self {
|
||||
Downcasted::ReadOnly(_) => unimplemented!(),
|
||||
Downcasted::ReadWrite(inner) => Lock::RwWrite(inner.write().await),
|
||||
Downcasted::WriteOnly(inner) => Lock::WriteOnly(inner.lock().await),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Untyped {
|
||||
inner: Arc<dyn Any + Send + Sync>,
|
||||
}
|
||||
|
||||
impl Untyped {
|
||||
pub fn new_readonly<T: Send + Sync + 'static>(item: T) -> Self {
|
||||
Self {
|
||||
inner: Arc::new(item),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_rwlock<T: Send + Sync + 'static>(item: T) -> Self {
|
||||
Self {
|
||||
inner: Arc::new(RwLock::new(item)),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_mutex<T: Send + 'static>(item: T) -> Self {
|
||||
Self {
|
||||
inner: Arc::new(Mutex::new(item)),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_local<T: 'static, F: FnOnce() -> T + Send + 'static>(f: F) -> Self {
|
||||
Self {
|
||||
inner: Arc::new(ThreadDedicated::new(f)),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn downcast_sync<T: Send + Sync + 'static>(self) -> Option<Downcasted<T>> {
|
||||
let item = match self.inner.clone().downcast::<RwLock<T>>() {
|
||||
Ok(inner) => Downcasted::ReadWrite(inner),
|
||||
Err(_) => return None,
|
||||
};
|
||||
|
||||
Some(item)
|
||||
}
|
||||
|
||||
pub fn downcast_send1<T: Send + 'static>(self) -> Option<Downcasted<T>> {
|
||||
let item = match self.inner.clone().downcast::<Mutex<T>>() {
|
||||
Ok(inner) => Downcasted::WriteOnly(inner),
|
||||
Err(_) => return None,
|
||||
};
|
||||
|
||||
Some(item)
|
||||
}
|
||||
|
||||
pub fn downcast_send<T: Send + 'static>(self) -> Option<Arc<Mutex<T>>> {
|
||||
self.inner.clone().downcast::<Mutex<T>>().ok()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn downcast_local<T: 'static>(self) -> Option<Arc<ThreadDedicated<T>>> {
|
||||
self.inner.clone().downcast::<ThreadDedicated<T>>().ok()
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ThreadDedicated<T: 'static> {
|
||||
sender: mpsc::Sender<
|
||||
Box<dyn for<'a> FnOnce(&'a mut T) -> Pin<Box<dyn Future<Output = ()> + 'a>> + Send>,
|
||||
>,
|
||||
notify: Arc<Notify>,
|
||||
}
|
||||
|
||||
impl<T: 'static> ThreadDedicated<T> {
|
||||
pub fn new<F: FnOnce() -> T + Send + 'static>(builder: F) -> Self {
|
||||
let notify = Arc::new(Notify::new());
|
||||
let (sender, mut receiver) = mpsc::channel(1);
|
||||
|
||||
let sender: mpsc::Sender<
|
||||
Box<dyn for<'a> FnOnce(&'a mut T) -> Pin<Box<dyn Future<Output = ()> + 'a>> + Send>,
|
||||
> = sender;
|
||||
let notify_clone = notify.clone();
|
||||
std::thread::spawn(move || {
|
||||
futures::executor::block_on(async move {
|
||||
let mut obj = builder();
|
||||
|
||||
loop {
|
||||
let cb = match receiver.recv().await {
|
||||
Some(x) => x,
|
||||
None => break,
|
||||
};
|
||||
|
||||
cb(&mut obj).await;
|
||||
notify_clone.notify_one();
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
Self { sender, notify }
|
||||
}
|
||||
|
||||
pub async fn spawn_local<
|
||||
F: for<'a> FnOnce(&'a mut T) -> Pin<Box<dyn Future<Output = ()> + 'a>> + Send + 'static,
|
||||
>(
|
||||
&self,
|
||||
cb: F,
|
||||
) {
|
||||
self.sender.send(Box::new(cb)).await.ok().unwrap();
|
||||
|
||||
self.notify.notified().await
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user