Compare commits
3 Commits
master
...
9c91d81e89
Author | SHA1 | Date | |
---|---|---|---|
|
9c91d81e89 | ||
|
1bf82e4cc4 | ||
|
2d35840044 |
14
Cargo.toml
14
Cargo.toml
@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "messagebus"
|
name = "messagebus"
|
||||||
version = "0.4.6"
|
version = "0.5.2"
|
||||||
authors = ["Andrey Tkachenko <andrey@aidev.ru>"]
|
authors = ["Andrey Tkachenko <andrey@aidev.ru>"]
|
||||||
repository = "https://github.com/andreytkachenko/messagebus.git"
|
repository = "https://github.com/andreytkachenko/messagebus.git"
|
||||||
keywords = ["futures", "async", "tokio", "message", "bus"]
|
keywords = ["futures", "async", "tokio", "message", "bus"]
|
||||||
@ -11,12 +11,12 @@ exclude = [".gitignore", ".cargo/config", ".github/**", "codecov.yml"]
|
|||||||
edition = "2018"
|
edition = "2018"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
tokio = { version = "0.2", features = ["parking_lot", "rt-threaded", "sync", "stream", "blocking"] }
|
tokio = { version = "1", features = ["parking_lot", "rt-multi-thread", "sync"] }
|
||||||
parking_lot = "0.11.1"
|
parking_lot = "0.11"
|
||||||
async-trait = "0.1.42"
|
async-trait = "0.1"
|
||||||
futures = "0.3.8"
|
futures = "0.3"
|
||||||
anyhow = "1.0.34"
|
anyhow = "1.0"
|
||||||
crossbeam = "0.8.0"
|
crossbeam = "0.8.0"
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
tokio = { version = "0.2", features = ["parking_lot", "rt-threaded", "sync", "stream", "macros"] }
|
tokio = { version = "1", features = ["parking_lot", "rt-multi-thread", "sync", "macros"] }
|
||||||
|
41
examples/demo_local.rs
Normal file
41
examples/demo_local.rs
Normal file
@ -0,0 +1,41 @@
|
|||||||
|
use std::rc::Rc;
|
||||||
|
use std::cell::Cell;
|
||||||
|
use async_trait::async_trait;
|
||||||
|
use messagebus::{receivers, Bus, LocalHandler, LocalAsyncHandler, Result as MbusResult};
|
||||||
|
|
||||||
|
struct TmpReceiver {
|
||||||
|
_inner: Rc<Cell<i32>>
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
#[async_trait(?Send)]
|
||||||
|
impl LocalAsyncHandler<f32> for TmpReceiver {
|
||||||
|
async fn handle(&mut self, msg: f32, bus: &Bus) -> MbusResult {
|
||||||
|
println!("---> f32 {}", msg);
|
||||||
|
|
||||||
|
bus.send(11u16).await.unwrap();
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl LocalHandler<u16> for TmpReceiver {
|
||||||
|
fn handle(&mut self, msg: u16, _bus: &Bus) -> MbusResult {
|
||||||
|
println!("---> u16 {}", msg);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main() {
|
||||||
|
let (b, poller) = Bus::build()
|
||||||
|
.register_local(||TmpReceiver {_inner: Rc::new(Cell::new(12))})
|
||||||
|
.subscribe::<f32, receivers::LocalAsync<_>>(Default::default())
|
||||||
|
.subscribe::<u16, receivers::LocalSync<_>>(Default::default())
|
||||||
|
.done()
|
||||||
|
.build();
|
||||||
|
|
||||||
|
b.send(32f32).await.unwrap();
|
||||||
|
|
||||||
|
poller.await
|
||||||
|
}
|
@ -2,7 +2,6 @@ use std::{any::TypeId, collections::HashMap, marker::PhantomData, pin::Pin, sync
|
|||||||
|
|
||||||
use futures::{Future, FutureExt};
|
use futures::{Future, FutureExt};
|
||||||
use receiver::ReceiverTrait;
|
use receiver::ReceiverTrait;
|
||||||
use tokio::sync::Mutex;
|
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
receiver::{self, Receiver},
|
receiver::{self, Receiver},
|
||||||
@ -30,6 +29,8 @@ pub trait ReceiverSubscriberBuilder<M, T: 'static> {
|
|||||||
pub struct SyncEntry;
|
pub struct SyncEntry;
|
||||||
pub struct UnsyncEntry;
|
pub struct UnsyncEntry;
|
||||||
|
|
||||||
|
pub struct LocalEntry;
|
||||||
|
|
||||||
#[must_use]
|
#[must_use]
|
||||||
pub struct RegisterEntry<K, T> {
|
pub struct RegisterEntry<K, T> {
|
||||||
item: Untyped,
|
item: Untyped,
|
||||||
@ -65,6 +66,27 @@ impl<K, T: 'static> RegisterEntry<K, T> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
impl<T: 'static> RegisterEntry<LocalEntry, T> {
|
||||||
|
pub fn subscribe<M, R>(mut self, cfg: R::Config) -> Self
|
||||||
|
where
|
||||||
|
T: 'static,
|
||||||
|
M: Message + 'static,
|
||||||
|
R: ReceiverSubscriberBuilder<M, T> + 'static,
|
||||||
|
{
|
||||||
|
let (inner, poller) = R::build(cfg).subscribe();
|
||||||
|
|
||||||
|
let receiver = Receiver::new(inner);
|
||||||
|
self.receivers
|
||||||
|
.entry(TypeId::of::<M>())
|
||||||
|
.or_insert_with(Vec::new)
|
||||||
|
.push((receiver, poller));
|
||||||
|
|
||||||
|
self
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
impl<T: Send + 'static> RegisterEntry<UnsyncEntry, T> {
|
impl<T: Send + 'static> RegisterEntry<UnsyncEntry, T> {
|
||||||
pub fn subscribe<M, R>(mut self, cfg: R::Config) -> Self
|
pub fn subscribe<M, R>(mut self, cfg: R::Config) -> Self
|
||||||
where
|
where
|
||||||
@ -118,7 +140,7 @@ impl BusBuilder {
|
|||||||
|
|
||||||
pub fn register<T: Send + Sync + 'static>(self, item: T) -> RegisterEntry<SyncEntry, T> {
|
pub fn register<T: Send + Sync + 'static>(self, item: T) -> RegisterEntry<SyncEntry, T> {
|
||||||
RegisterEntry {
|
RegisterEntry {
|
||||||
item: Arc::new(item) as Untyped,
|
item: Untyped::new_readonly(item),
|
||||||
builder: self,
|
builder: self,
|
||||||
receivers: HashMap::new(),
|
receivers: HashMap::new(),
|
||||||
_m: Default::default(),
|
_m: Default::default(),
|
||||||
@ -127,7 +149,19 @@ impl BusBuilder {
|
|||||||
|
|
||||||
pub fn register_unsync<T: Send + 'static>(self, item: T) -> RegisterEntry<UnsyncEntry, T> {
|
pub fn register_unsync<T: Send + 'static>(self, item: T) -> RegisterEntry<UnsyncEntry, T> {
|
||||||
RegisterEntry {
|
RegisterEntry {
|
||||||
item: Arc::new(Mutex::new(item)) as Untyped,
|
item: Untyped::new_mutex(item),
|
||||||
|
builder: self,
|
||||||
|
receivers: HashMap::new(),
|
||||||
|
_m: Default::default(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn register_local<T: 'static>(
|
||||||
|
self,
|
||||||
|
item: impl FnOnce() -> T + Send + 'static,
|
||||||
|
) -> RegisterEntry<LocalEntry, T> {
|
||||||
|
RegisterEntry {
|
||||||
|
item: Untyped::new_local(item),
|
||||||
builder: self,
|
builder: self,
|
||||||
receivers: HashMap::new(),
|
receivers: HashMap::new(),
|
||||||
_m: Default::default(),
|
_m: Default::default(),
|
||||||
|
@ -62,15 +62,15 @@ pub trait AsyncBatchSynchronizedHandler<M: Message>: Send {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub trait LocalHandler<M: Message> {
|
pub trait LocalHandler<M: Message> {
|
||||||
fn handle(&mut self, msg: Vec<M>, bus: &Bus) -> anyhow::Result<()>;
|
fn handle(&mut self, msg: M, bus: &Bus) -> anyhow::Result<()>;
|
||||||
fn sync(&mut self, _bus: &Bus) -> anyhow::Result<()> {
|
fn sync(&mut self, _bus: &Bus) -> anyhow::Result<()> {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait(?Send)]
|
||||||
pub trait LocalAsyncHandler<M: Message> {
|
pub trait LocalAsyncHandler<M: Message> {
|
||||||
async fn handle(&mut self, msg: Vec<M>, bus: &Bus) -> anyhow::Result<()>;
|
async fn handle(&mut self, msg: M, bus: &Bus) -> anyhow::Result<()>;
|
||||||
async fn sync(&mut self, _bus: &Bus) -> anyhow::Result<()> {
|
async fn sync(&mut self, _bus: &Bus) -> anyhow::Result<()> {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@ -83,7 +83,7 @@ pub trait LocalBatchHandler<M: Message> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait(?Send)]
|
||||||
pub trait LocalAsyncBatchHandler<M: Message> {
|
pub trait LocalAsyncBatchHandler<M: Message> {
|
||||||
async fn handle(&mut self, msg: Vec<M>, bus: &Bus) -> anyhow::Result<()>;
|
async fn handle(&mut self, msg: Vec<M>, bus: &Bus) -> anyhow::Result<()>;
|
||||||
async fn sync(&mut self, _bus: &Bus) -> anyhow::Result<()> {
|
async fn sync(&mut self, _bus: &Bus) -> anyhow::Result<()> {
|
||||||
|
12
src/lib.rs
12
src/lib.rs
@ -5,6 +5,7 @@ pub mod msgs;
|
|||||||
mod receiver;
|
mod receiver;
|
||||||
pub mod receivers;
|
pub mod receivers;
|
||||||
mod trait_object;
|
mod trait_object;
|
||||||
|
mod untyped;
|
||||||
mod utils;
|
mod utils;
|
||||||
|
|
||||||
use builder::BusBuilder;
|
use builder::BusBuilder;
|
||||||
@ -14,13 +15,13 @@ pub use receiver::SendError;
|
|||||||
use receiver::{Receiver, ReceiverStats};
|
use receiver::{Receiver, ReceiverStats};
|
||||||
use utils::binary_search_range_by_key;
|
use utils::binary_search_range_by_key;
|
||||||
|
|
||||||
use core::any::{Any, TypeId};
|
use core::any::TypeId;
|
||||||
use std::sync::{
|
use std::sync::{
|
||||||
atomic::{AtomicBool, Ordering},
|
atomic::{AtomicBool, Ordering},
|
||||||
Arc,
|
Arc,
|
||||||
};
|
};
|
||||||
|
|
||||||
pub type Untyped = Arc<dyn Any + Send + Sync>;
|
pub use untyped::Untyped;
|
||||||
pub type Result = anyhow::Result<()>;
|
pub type Result = anyhow::Result<()>;
|
||||||
|
|
||||||
pub struct BusInner {
|
pub struct BusInner {
|
||||||
@ -58,8 +59,7 @@ impl BusInner {
|
|||||||
|
|
||||||
pub fn try_send<M: Message>(&self, msg: M) -> core::result::Result<(), SendError<M>> {
|
pub fn try_send<M: Message>(&self, msg: M) -> core::result::Result<(), SendError<M>> {
|
||||||
if self.closed.load(Ordering::SeqCst) {
|
if self.closed.load(Ordering::SeqCst) {
|
||||||
println!("Bus closed. Skipping send!");
|
return Err(SendError::Closed(msg));
|
||||||
return Ok(());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
let tid = TypeId::of::<M>();
|
let tid = TypeId::of::<M>();
|
||||||
@ -70,7 +70,7 @@ impl BusInner {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if let Some((_, r)) = self.receivers.get(range.start) {
|
if let Some((_, r)) = self.receivers.get(range.start) {
|
||||||
r.try_broadcast(msg.clone())?;
|
r.try_broadcast(msg)?;
|
||||||
} else {
|
} else {
|
||||||
println!("Unhandled message {:?}", core::any::type_name::<M>());
|
println!("Unhandled message {:?}", core::any::type_name::<M>());
|
||||||
}
|
}
|
||||||
@ -96,7 +96,7 @@ impl BusInner {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if let Some((_, r)) = self.receivers.get(range.start) {
|
if let Some((_, r)) = self.receivers.get(range.start) {
|
||||||
r.broadcast(msg.clone()).await?;
|
r.broadcast(msg).await?;
|
||||||
} else {
|
} else {
|
||||||
println!("Unhandled message {:?}", core::any::type_name::<M>());
|
println!("Unhandled message {:?}", core::any::type_name::<M>());
|
||||||
}
|
}
|
||||||
|
@ -77,7 +77,7 @@ async fn buffer_unordered_poller<T, M>(
|
|||||||
T: AsyncHandler<M> + 'static,
|
T: AsyncHandler<M> + 'static,
|
||||||
M: Message,
|
M: Message,
|
||||||
{
|
{
|
||||||
let ut = ut.downcast::<T>().unwrap();
|
let ut = ut.downcast_sync::<T>().unwrap();
|
||||||
|
|
||||||
let mut x = rx
|
let mut x = rx
|
||||||
.map(|msg| {
|
.map(|msg| {
|
||||||
@ -86,7 +86,9 @@ async fn buffer_unordered_poller<T, M>(
|
|||||||
let bus = bus.clone();
|
let bus = bus.clone();
|
||||||
let ut = ut.clone();
|
let ut = ut.clone();
|
||||||
|
|
||||||
tokio::task::spawn(async move { ut.handle(msg, &bus).await })
|
tokio::task::spawn(
|
||||||
|
async move { ut.lock_read().await.get_ref().handle(msg, &bus).await },
|
||||||
|
)
|
||||||
})
|
})
|
||||||
.buffer_unordered(cfg.max_parallel);
|
.buffer_unordered(cfg.max_parallel);
|
||||||
|
|
||||||
@ -103,7 +105,9 @@ async fn buffer_unordered_poller<T, M>(
|
|||||||
|
|
||||||
let ut = ut.clone();
|
let ut = ut.clone();
|
||||||
let bus_clone = bus.clone();
|
let bus_clone = bus.clone();
|
||||||
let res = tokio::task::spawn(async move { ut.sync(&bus_clone).await }).await;
|
let res =
|
||||||
|
tokio::task::spawn(async move { ut.lock_read().await.get_ref().sync(&bus_clone).await })
|
||||||
|
.await;
|
||||||
|
|
||||||
match res {
|
match res {
|
||||||
Ok(Err(err)) => {
|
Ok(Err(err)) => {
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
use crate::{receiver::ReceiverStats, receivers::mpsc};
|
use crate::{receiver::ReceiverStats, receivers::mpsc};
|
||||||
use futures::{Future, StreamExt};
|
use futures::{executor::block_on, Future, StreamExt};
|
||||||
use std::{
|
use std::{
|
||||||
any::TypeId,
|
any::TypeId,
|
||||||
marker::PhantomData,
|
marker::PhantomData,
|
||||||
@ -76,7 +76,7 @@ async fn buffer_unordered_poller<T, M>(
|
|||||||
T: Handler<M> + 'static,
|
T: Handler<M> + 'static,
|
||||||
M: Message,
|
M: Message,
|
||||||
{
|
{
|
||||||
let ut = ut.downcast::<T>().unwrap();
|
let ut = ut.downcast_sync::<T>().unwrap();
|
||||||
|
|
||||||
let mut x = rx
|
let mut x = rx
|
||||||
.map(|msg| {
|
.map(|msg| {
|
||||||
@ -86,7 +86,9 @@ async fn buffer_unordered_poller<T, M>(
|
|||||||
let bus = bus.clone();
|
let bus = bus.clone();
|
||||||
let ut = ut.clone();
|
let ut = ut.clone();
|
||||||
|
|
||||||
tokio::task::spawn_blocking(move || ut.handle(msg, &bus))
|
tokio::task::spawn_blocking(move || {
|
||||||
|
block_on(ut.lock_read()).get_ref().handle(msg, &bus)
|
||||||
|
})
|
||||||
})
|
})
|
||||||
.buffer_unordered(cfg.max_parallel);
|
.buffer_unordered(cfg.max_parallel);
|
||||||
|
|
||||||
@ -103,7 +105,9 @@ async fn buffer_unordered_poller<T, M>(
|
|||||||
|
|
||||||
let ut = ut.clone();
|
let ut = ut.clone();
|
||||||
let bus_clone = bus.clone();
|
let bus_clone = bus.clone();
|
||||||
let res = tokio::task::spawn_blocking(move || ut.sync(&bus_clone)).await;
|
let res =
|
||||||
|
tokio::task::spawn_blocking(move || block_on(ut.lock_read()).get_ref().sync(&bus_clone))
|
||||||
|
.await;
|
||||||
|
|
||||||
match res {
|
match res {
|
||||||
Ok(Err(err)) => {
|
Ok(Err(err)) => {
|
||||||
@ -112,10 +116,7 @@ async fn buffer_unordered_poller<T, M>(
|
|||||||
_ => (),
|
_ => (),
|
||||||
}
|
}
|
||||||
|
|
||||||
println!(
|
println!("[EXIT] BufferUnorderedSync<{}>", std::any::type_name::<M>());
|
||||||
"[EXIT] BufferUnorderedSync<{}>",
|
|
||||||
std::any::type_name::<M>()
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct BufferUnorderedSync<M: Message> {
|
pub struct BufferUnorderedSync<M: Message> {
|
||||||
|
@ -79,19 +79,16 @@ async fn buffer_unordered_poller<T, M>(
|
|||||||
T: AsyncBatchHandler<M> + 'static,
|
T: AsyncBatchHandler<M> + 'static,
|
||||||
M: Message,
|
M: Message,
|
||||||
{
|
{
|
||||||
let ut = ut.downcast::<T>().unwrap();
|
let ut = ut.downcast_sync::<T>().unwrap();
|
||||||
let rx = rx
|
let rx = rx.inspect(|_| {
|
||||||
.inspect(|_| {
|
stats.buffer.fetch_sub(1, Ordering::Relaxed);
|
||||||
stats.buffer.fetch_sub(1, Ordering::Relaxed);
|
stats.batch.fetch_add(1, Ordering::Relaxed);
|
||||||
stats.batch.fetch_add(1, Ordering::Relaxed);
|
});
|
||||||
});
|
|
||||||
|
|
||||||
let rx = if cfg.when_ready {
|
let rx = if cfg.when_ready {
|
||||||
rx.ready_chunks(cfg.batch_size)
|
rx.ready_chunks(cfg.batch_size).left_stream()
|
||||||
.left_stream()
|
|
||||||
} else {
|
} else {
|
||||||
rx.chunks(cfg.batch_size)
|
rx.chunks(cfg.batch_size).right_stream()
|
||||||
.right_stream()
|
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut rx = rx
|
let mut rx = rx
|
||||||
@ -102,7 +99,13 @@ async fn buffer_unordered_poller<T, M>(
|
|||||||
let bus_clone = bus.clone();
|
let bus_clone = bus.clone();
|
||||||
let ut = ut.clone();
|
let ut = ut.clone();
|
||||||
|
|
||||||
tokio::task::spawn(async move { ut.handle(msgs, &bus_clone).await })
|
tokio::task::spawn(async move {
|
||||||
|
ut.lock_read()
|
||||||
|
.await
|
||||||
|
.get_ref()
|
||||||
|
.handle(msgs, &bus_clone)
|
||||||
|
.await
|
||||||
|
})
|
||||||
})
|
})
|
||||||
.buffer_unordered(cfg.max_parallel);
|
.buffer_unordered(cfg.max_parallel);
|
||||||
|
|
||||||
@ -119,7 +122,9 @@ async fn buffer_unordered_poller<T, M>(
|
|||||||
|
|
||||||
let ut = ut.clone();
|
let ut = ut.clone();
|
||||||
let bus_clone = bus.clone();
|
let bus_clone = bus.clone();
|
||||||
let res = tokio::task::spawn(async move { ut.sync(&bus_clone).await }).await;
|
let res =
|
||||||
|
tokio::task::spawn(async move { ut.lock_read().await.get_ref().sync(&bus_clone).await })
|
||||||
|
.await;
|
||||||
|
|
||||||
match res {
|
match res {
|
||||||
Ok(Err(err)) => {
|
Ok(Err(err)) => {
|
||||||
|
@ -27,9 +27,9 @@ pub struct BufferUnorderedBatchedConfig {
|
|||||||
impl Default for BufferUnorderedBatchedConfig {
|
impl Default for BufferUnorderedBatchedConfig {
|
||||||
fn default() -> Self {
|
fn default() -> Self {
|
||||||
Self {
|
Self {
|
||||||
buffer_size: 8,
|
buffer_size: 8,
|
||||||
max_parallel: 2,
|
max_parallel: 2,
|
||||||
batch_size: 8,
|
batch_size: 8,
|
||||||
when_ready: false,
|
when_ready: false,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
use crate::{receiver::ReceiverStats, receivers::mpsc};
|
use crate::{receiver::ReceiverStats, receivers::mpsc};
|
||||||
use futures::{Future, StreamExt};
|
use futures::{executor::block_on, Future, StreamExt};
|
||||||
use std::{
|
use std::{
|
||||||
any::TypeId,
|
any::TypeId,
|
||||||
marker::PhantomData,
|
marker::PhantomData,
|
||||||
@ -16,7 +16,7 @@ use crate::{
|
|||||||
builder::{ReceiverSubscriber, ReceiverSubscriberBuilder},
|
builder::{ReceiverSubscriber, ReceiverSubscriberBuilder},
|
||||||
msgs,
|
msgs,
|
||||||
receiver::{AnyReceiver, ReceiverTrait, SendError, TypedReceiver},
|
receiver::{AnyReceiver, ReceiverTrait, SendError, TypedReceiver},
|
||||||
Bus, BatchHandler, Message, Untyped,
|
BatchHandler, Bus, Message, Untyped,
|
||||||
};
|
};
|
||||||
|
|
||||||
pub struct BufferUnorderedBatchedSyncSubscriber<T, M>
|
pub struct BufferUnorderedBatchedSyncSubscriber<T, M>
|
||||||
@ -78,19 +78,16 @@ async fn buffer_unordered_poller<T, M>(
|
|||||||
T: BatchHandler<M> + 'static,
|
T: BatchHandler<M> + 'static,
|
||||||
M: Message,
|
M: Message,
|
||||||
{
|
{
|
||||||
let ut = ut.downcast::<T>().unwrap();
|
let ut = ut.downcast_sync::<T>().unwrap();
|
||||||
let rx = rx
|
let rx = rx.inspect(|_| {
|
||||||
.inspect(|_| {
|
stats.buffer.fetch_sub(1, Ordering::Relaxed);
|
||||||
stats.buffer.fetch_sub(1, Ordering::Relaxed);
|
stats.batch.fetch_add(1, Ordering::Relaxed);
|
||||||
stats.batch.fetch_add(1, Ordering::Relaxed);
|
});
|
||||||
});
|
|
||||||
|
|
||||||
let rx = if cfg.when_ready {
|
let rx = if cfg.when_ready {
|
||||||
rx.ready_chunks(cfg.batch_size)
|
rx.ready_chunks(cfg.batch_size).left_stream()
|
||||||
.left_stream()
|
|
||||||
} else {
|
} else {
|
||||||
rx.chunks(cfg.batch_size)
|
rx.chunks(cfg.batch_size).right_stream()
|
||||||
.right_stream()
|
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut rx = rx
|
let mut rx = rx
|
||||||
@ -101,7 +98,9 @@ async fn buffer_unordered_poller<T, M>(
|
|||||||
let bus = bus.clone();
|
let bus = bus.clone();
|
||||||
let ut = ut.clone();
|
let ut = ut.clone();
|
||||||
|
|
||||||
tokio::task::spawn_blocking(move || ut.handle(msgs, &bus))
|
tokio::task::spawn_blocking(move || {
|
||||||
|
block_on(ut.lock_read()).get_ref().handle(msgs, &bus)
|
||||||
|
})
|
||||||
})
|
})
|
||||||
.buffer_unordered(cfg.max_parallel);
|
.buffer_unordered(cfg.max_parallel);
|
||||||
|
|
||||||
@ -118,7 +117,9 @@ async fn buffer_unordered_poller<T, M>(
|
|||||||
|
|
||||||
let ut = ut.clone();
|
let ut = ut.clone();
|
||||||
let bus_clone = bus.clone();
|
let bus_clone = bus.clone();
|
||||||
let res = tokio::task::spawn_blocking(move || ut.sync(&bus_clone)).await;
|
let res =
|
||||||
|
tokio::task::spawn_blocking(move || block_on(ut.lock_read()).get_ref().sync(&bus_clone))
|
||||||
|
.await;
|
||||||
|
|
||||||
match res {
|
match res {
|
||||||
Ok(Err(err)) => {
|
Ok(Err(err)) => {
|
||||||
|
180
src/receivers/local/async.rs
Normal file
180
src/receivers/local/async.rs
Normal file
@ -0,0 +1,180 @@
|
|||||||
|
use std::{
|
||||||
|
any::TypeId,
|
||||||
|
marker::PhantomData,
|
||||||
|
pin::Pin,
|
||||||
|
sync::{
|
||||||
|
atomic::{AtomicU64, Ordering},
|
||||||
|
Arc,
|
||||||
|
},
|
||||||
|
task::{Context, Poll},
|
||||||
|
};
|
||||||
|
|
||||||
|
use crate::{receiver::ReceiverStats, receivers::mpsc};
|
||||||
|
use futures::{pin_mut, Future, StreamExt};
|
||||||
|
|
||||||
|
use super::{LocalConfig, LocalStats};
|
||||||
|
use crate::{
|
||||||
|
builder::{ReceiverSubscriber, ReceiverSubscriberBuilder},
|
||||||
|
receiver::{AnyReceiver, ReceiverTrait, SendError, TypedReceiver},
|
||||||
|
Bus, LocalAsyncHandler, Message, Untyped,
|
||||||
|
};
|
||||||
|
|
||||||
|
pub struct LocalAsyncSubscriber<T, M>
|
||||||
|
where
|
||||||
|
T: LocalAsyncHandler<M> + 'static,
|
||||||
|
M: Message,
|
||||||
|
{
|
||||||
|
cfg: LocalConfig,
|
||||||
|
_m: PhantomData<(T, M)>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T, M> ReceiverSubscriber<T> for LocalAsyncSubscriber<T, M>
|
||||||
|
where
|
||||||
|
T: LocalAsyncHandler<M> + 'static,
|
||||||
|
M: Message,
|
||||||
|
{
|
||||||
|
fn subscribe(
|
||||||
|
self,
|
||||||
|
) -> (
|
||||||
|
Arc<dyn ReceiverTrait>,
|
||||||
|
Box<
|
||||||
|
dyn FnOnce(Untyped) -> Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>,
|
||||||
|
>,
|
||||||
|
) {
|
||||||
|
let cfg = self.cfg;
|
||||||
|
let (tx, rx) = mpsc::channel(cfg.buffer_size);
|
||||||
|
let stats = Arc::new(LocalStats {
|
||||||
|
buffer: AtomicU64::new(0),
|
||||||
|
buffer_total: AtomicU64::new(cfg.buffer_size as _),
|
||||||
|
});
|
||||||
|
|
||||||
|
let arc = Arc::new(LocalAsync::<M> {
|
||||||
|
tx,
|
||||||
|
stats: stats.clone(),
|
||||||
|
});
|
||||||
|
|
||||||
|
let poller = Box::new(move |ut| {
|
||||||
|
Box::new(move |bus| {
|
||||||
|
Box::pin(buffer_unordered_poller::<T, M>(rx, bus, ut, stats, cfg))
|
||||||
|
as Pin<Box<dyn Future<Output = ()> + Send>>
|
||||||
|
}) as Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>
|
||||||
|
});
|
||||||
|
|
||||||
|
(arc, poller)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn buffer_unordered_poller<T, M>(
|
||||||
|
rx: mpsc::Receiver<M>,
|
||||||
|
bus: Bus,
|
||||||
|
ut: Untyped,
|
||||||
|
stats: Arc<LocalStats>,
|
||||||
|
_cfg: LocalConfig,
|
||||||
|
) where
|
||||||
|
T: LocalAsyncHandler<M> + 'static,
|
||||||
|
M: Message,
|
||||||
|
{
|
||||||
|
let ut = ut.downcast_local::<T>().unwrap();
|
||||||
|
let bus1 = bus.clone();
|
||||||
|
|
||||||
|
let x = rx.then(|msg| {
|
||||||
|
let bus1 = bus1.clone();
|
||||||
|
ut.spawn_local(move |item| {
|
||||||
|
Box::pin(async move {
|
||||||
|
let _ = item.handle(msg, &bus1).await;
|
||||||
|
})
|
||||||
|
})
|
||||||
|
});
|
||||||
|
|
||||||
|
pin_mut!(x);
|
||||||
|
|
||||||
|
while let Some(_) = x.next().await {
|
||||||
|
stats.buffer.fetch_sub(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
|
||||||
|
let bus_clone = bus.clone();
|
||||||
|
ut.spawn_local(move |item| {
|
||||||
|
Box::pin(async move {
|
||||||
|
let _ = item.sync(&bus_clone).await;
|
||||||
|
})
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
|
||||||
|
println!("[EXIT] LocalAsync<{}>", std::any::type_name::<M>());
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct LocalAsync<M: Message> {
|
||||||
|
tx: mpsc::Sender<M>,
|
||||||
|
stats: Arc<LocalStats>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T, M> ReceiverSubscriberBuilder<M, T> for LocalAsync<M>
|
||||||
|
where
|
||||||
|
T: LocalAsyncHandler<M> + 'static,
|
||||||
|
M: Message,
|
||||||
|
{
|
||||||
|
type Entry = LocalAsyncSubscriber<T, M>;
|
||||||
|
type Config = LocalConfig;
|
||||||
|
|
||||||
|
fn build(cfg: Self::Config) -> Self::Entry {
|
||||||
|
LocalAsyncSubscriber {
|
||||||
|
cfg,
|
||||||
|
_m: Default::default(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<M: Message> TypedReceiver<M> for LocalAsync<M> {
|
||||||
|
fn poll_ready(&self, ctx: &mut Context<'_>) -> Poll<()> {
|
||||||
|
match self.tx.poll_ready(ctx) {
|
||||||
|
Poll::Ready(_) => Poll::Ready(()),
|
||||||
|
Poll::Pending => Poll::Pending,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn try_send(&self, m: M) -> Result<(), SendError<M>> {
|
||||||
|
match self.tx.try_send(m) {
|
||||||
|
Ok(_) => {
|
||||||
|
self.stats.buffer.fetch_add(1, Ordering::Relaxed);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
Err(err) => Err(err),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<M: Message> ReceiverTrait for LocalAsync<M> {
|
||||||
|
fn typed(&self) -> AnyReceiver<'_> {
|
||||||
|
AnyReceiver::new(self)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn type_id(&self) -> TypeId {
|
||||||
|
TypeId::of::<LocalAsync<M>>()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn stats(&self) -> ReceiverStats {
|
||||||
|
ReceiverStats {
|
||||||
|
name: std::any::type_name::<M>().into(),
|
||||||
|
fields: vec![
|
||||||
|
("buffer".into(), self.stats.buffer.load(Ordering::SeqCst)),
|
||||||
|
(
|
||||||
|
"buffer_total".into(),
|
||||||
|
self.stats.buffer_total.load(Ordering::SeqCst),
|
||||||
|
),
|
||||||
|
],
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn close(&self) {
|
||||||
|
self.tx.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
fn sync(&self) {
|
||||||
|
self.tx.flush();
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_synchronized(&self, _ctx: &mut Context<'_>) -> Poll<()> {
|
||||||
|
Poll::Ready(())
|
||||||
|
}
|
||||||
|
}
|
25
src/receivers/local/mod.rs
Normal file
25
src/receivers/local/mod.rs
Normal file
@ -0,0 +1,25 @@
|
|||||||
|
mod r#async;
|
||||||
|
mod sync;
|
||||||
|
|
||||||
|
use std::sync::atomic::AtomicU64;
|
||||||
|
|
||||||
|
pub use sync::{LocalSync, LocalSyncSubscriber};
|
||||||
|
|
||||||
|
pub use r#async::{LocalAsync, LocalAsyncSubscriber};
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct LocalStats {
|
||||||
|
pub buffer: AtomicU64,
|
||||||
|
pub buffer_total: AtomicU64,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Copy, Clone, Debug)]
|
||||||
|
pub struct LocalConfig {
|
||||||
|
pub buffer_size: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for LocalConfig {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self { buffer_size: 1 }
|
||||||
|
}
|
||||||
|
}
|
177
src/receivers/local/sync.rs
Normal file
177
src/receivers/local/sync.rs
Normal file
@ -0,0 +1,177 @@
|
|||||||
|
use super::{LocalConfig, LocalStats};
|
||||||
|
use crate::{receiver::ReceiverStats, receivers::mpsc};
|
||||||
|
use futures::{pin_mut, Future, StreamExt};
|
||||||
|
use std::{
|
||||||
|
any::TypeId,
|
||||||
|
marker::PhantomData,
|
||||||
|
pin::Pin,
|
||||||
|
sync::{
|
||||||
|
atomic::{AtomicU64, Ordering},
|
||||||
|
Arc,
|
||||||
|
},
|
||||||
|
task::{Context, Poll},
|
||||||
|
};
|
||||||
|
|
||||||
|
use crate::{
|
||||||
|
builder::{ReceiverSubscriber, ReceiverSubscriberBuilder},
|
||||||
|
receiver::{AnyReceiver, ReceiverTrait, SendError, TypedReceiver},
|
||||||
|
Bus, LocalHandler, Message, Untyped,
|
||||||
|
};
|
||||||
|
|
||||||
|
pub struct LocalSyncSubscriber<T, M>
|
||||||
|
where
|
||||||
|
T: LocalHandler<M> + 'static,
|
||||||
|
M: Message,
|
||||||
|
{
|
||||||
|
cfg: LocalConfig,
|
||||||
|
_m: PhantomData<(M, T)>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T, M> ReceiverSubscriber<T> for LocalSyncSubscriber<T, M>
|
||||||
|
where
|
||||||
|
T: LocalHandler<M> + 'static,
|
||||||
|
M: Message,
|
||||||
|
{
|
||||||
|
fn subscribe(
|
||||||
|
self,
|
||||||
|
) -> (
|
||||||
|
Arc<dyn ReceiverTrait>,
|
||||||
|
Box<
|
||||||
|
dyn FnOnce(Untyped) -> Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>,
|
||||||
|
>,
|
||||||
|
) {
|
||||||
|
let cfg = self.cfg;
|
||||||
|
let (tx, rx) = mpsc::channel(cfg.buffer_size);
|
||||||
|
let stats = Arc::new(LocalStats {
|
||||||
|
buffer: AtomicU64::new(0),
|
||||||
|
buffer_total: AtomicU64::new(cfg.buffer_size as _),
|
||||||
|
});
|
||||||
|
|
||||||
|
let arc = Arc::new(LocalSync::<M> {
|
||||||
|
tx,
|
||||||
|
stats: stats.clone(),
|
||||||
|
});
|
||||||
|
let poller = Box::new(move |ut| {
|
||||||
|
Box::new(move |bus| {
|
||||||
|
Box::pin(buffer_unordered_poller::<T, M>(rx, bus, ut, stats, cfg))
|
||||||
|
as Pin<Box<dyn Future<Output = ()> + Send>>
|
||||||
|
}) as Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>
|
||||||
|
});
|
||||||
|
|
||||||
|
(arc, poller)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn buffer_unordered_poller<T, M>(
|
||||||
|
rx: mpsc::Receiver<M>,
|
||||||
|
bus: Bus,
|
||||||
|
ut: Untyped,
|
||||||
|
stats: Arc<LocalStats>,
|
||||||
|
_cfg: LocalConfig,
|
||||||
|
) where
|
||||||
|
T: LocalHandler<M> + 'static,
|
||||||
|
M: Message,
|
||||||
|
{
|
||||||
|
let ut = ut.downcast_local::<T>().unwrap();
|
||||||
|
let bus1 = bus.clone();
|
||||||
|
|
||||||
|
let x = rx.then(|msg| {
|
||||||
|
let bus1 = bus1.clone();
|
||||||
|
ut.spawn_local(move |item| {
|
||||||
|
let _ = item.handle(msg, &bus1);
|
||||||
|
|
||||||
|
Box::pin(async move {})
|
||||||
|
})
|
||||||
|
});
|
||||||
|
|
||||||
|
pin_mut!(x);
|
||||||
|
|
||||||
|
while let Some(_) = x.next().await {
|
||||||
|
stats.buffer.fetch_sub(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
|
||||||
|
let bus_clone = bus.clone();
|
||||||
|
ut.spawn_local(move |item| {
|
||||||
|
let _ = item.sync(&bus_clone);
|
||||||
|
Box::pin(async move {})
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
|
||||||
|
println!("[EXIT] LocalAsync<{}>", std::any::type_name::<M>());
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct LocalSync<M: Message> {
|
||||||
|
tx: mpsc::Sender<M>,
|
||||||
|
stats: Arc<LocalStats>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T, M> ReceiverSubscriberBuilder<M, T> for LocalSync<M>
|
||||||
|
where
|
||||||
|
T: LocalHandler<M> + 'static,
|
||||||
|
M: Message,
|
||||||
|
{
|
||||||
|
type Entry = LocalSyncSubscriber<T, M>;
|
||||||
|
type Config = LocalConfig;
|
||||||
|
|
||||||
|
fn build(cfg: Self::Config) -> Self::Entry {
|
||||||
|
LocalSyncSubscriber {
|
||||||
|
cfg,
|
||||||
|
_m: Default::default(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<M: Message> TypedReceiver<M> for LocalSync<M> {
|
||||||
|
fn poll_ready(&self, ctx: &mut Context<'_>) -> Poll<()> {
|
||||||
|
match self.tx.poll_ready(ctx) {
|
||||||
|
Poll::Ready(_) => Poll::Ready(()),
|
||||||
|
Poll::Pending => Poll::Pending,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn try_send(&self, m: M) -> Result<(), SendError<M>> {
|
||||||
|
match self.tx.try_send(m) {
|
||||||
|
Ok(_) => {
|
||||||
|
self.stats.buffer.fetch_add(1, Ordering::Relaxed);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
Err(err) => Err(err),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<M: Message> ReceiverTrait for LocalSync<M> {
|
||||||
|
fn typed(&self) -> AnyReceiver<'_> {
|
||||||
|
AnyReceiver::new(self)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn type_id(&self) -> TypeId {
|
||||||
|
TypeId::of::<LocalSync<M>>()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn stats(&self) -> ReceiverStats {
|
||||||
|
ReceiverStats {
|
||||||
|
name: std::any::type_name::<M>().into(),
|
||||||
|
fields: vec![
|
||||||
|
("buffer".into(), self.stats.buffer.load(Ordering::SeqCst)),
|
||||||
|
(
|
||||||
|
"buffer_total".into(),
|
||||||
|
self.stats.buffer_total.load(Ordering::SeqCst),
|
||||||
|
),
|
||||||
|
],
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn close(&self) {
|
||||||
|
self.tx.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
fn sync(&self) {
|
||||||
|
self.tx.flush();
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_synchronized(&self, _ctx: &mut Context<'_>) -> Poll<()> {
|
||||||
|
Poll::Ready(())
|
||||||
|
}
|
||||||
|
}
|
196
src/receivers/local_batched/async.rs
Normal file
196
src/receivers/local_batched/async.rs
Normal file
@ -0,0 +1,196 @@
|
|||||||
|
use std::{
|
||||||
|
any::TypeId,
|
||||||
|
marker::PhantomData,
|
||||||
|
pin::Pin,
|
||||||
|
sync::{
|
||||||
|
atomic::{AtomicU64, Ordering},
|
||||||
|
Arc,
|
||||||
|
},
|
||||||
|
task::{Context, Poll},
|
||||||
|
};
|
||||||
|
|
||||||
|
use crate::{receiver::ReceiverStats, receivers::mpsc};
|
||||||
|
use futures::{Future, StreamExt};
|
||||||
|
|
||||||
|
use super::{LocalBatchedConfig, LocalBatchedStats};
|
||||||
|
use crate::{
|
||||||
|
builder::{ReceiverSubscriber, ReceiverSubscriberBuilder},
|
||||||
|
receiver::{AnyReceiver, ReceiverTrait, SendError, TypedReceiver},
|
||||||
|
Bus, LocalAsyncBatchHandler, Message, Untyped,
|
||||||
|
};
|
||||||
|
|
||||||
|
pub struct LocalBatchedAsyncSubscriber<T, M>
|
||||||
|
where
|
||||||
|
T: LocalAsyncBatchHandler<M> + 'static,
|
||||||
|
M: Message,
|
||||||
|
{
|
||||||
|
cfg: LocalBatchedConfig,
|
||||||
|
_m: PhantomData<(T, M)>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T, M> ReceiverSubscriber<T> for LocalBatchedAsyncSubscriber<T, M>
|
||||||
|
where
|
||||||
|
T: LocalAsyncBatchHandler<M> + 'static,
|
||||||
|
M: Message,
|
||||||
|
{
|
||||||
|
fn subscribe(
|
||||||
|
self,
|
||||||
|
) -> (
|
||||||
|
Arc<dyn ReceiverTrait>,
|
||||||
|
Box<
|
||||||
|
dyn FnOnce(Untyped) -> Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>,
|
||||||
|
>,
|
||||||
|
) {
|
||||||
|
let cfg = self.cfg;
|
||||||
|
let (tx, rx) = mpsc::channel(cfg.buffer_size);
|
||||||
|
let stats = Arc::new(LocalBatchedStats {
|
||||||
|
buffer: AtomicU64::new(0),
|
||||||
|
buffer_total: AtomicU64::new(cfg.buffer_size as _),
|
||||||
|
batch: AtomicU64::new(0),
|
||||||
|
batch_size: AtomicU64::new(cfg.batch_size as _),
|
||||||
|
});
|
||||||
|
|
||||||
|
let arc = Arc::new(LocalBatchedAsync::<M> {
|
||||||
|
tx,
|
||||||
|
stats: stats.clone(),
|
||||||
|
});
|
||||||
|
|
||||||
|
let poller = Box::new(move |ut| {
|
||||||
|
Box::new(move |bus| {
|
||||||
|
Box::pin(buffer_unordered_poller::<T, M>(rx, bus, ut, stats, cfg))
|
||||||
|
as Pin<Box<dyn Future<Output = ()> + Send>>
|
||||||
|
}) as Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>
|
||||||
|
});
|
||||||
|
|
||||||
|
(arc, poller)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn buffer_unordered_poller<T, M>(
|
||||||
|
rx: mpsc::Receiver<M>,
|
||||||
|
bus: Bus,
|
||||||
|
ut: Untyped,
|
||||||
|
stats: Arc<LocalBatchedStats>,
|
||||||
|
cfg: LocalBatchedConfig,
|
||||||
|
) where
|
||||||
|
T: LocalAsyncBatchHandler<M> + 'static,
|
||||||
|
M: Message,
|
||||||
|
{
|
||||||
|
let ut = ut.downcast_local::<T>().unwrap();
|
||||||
|
|
||||||
|
let rx = rx.inspect(|_| {
|
||||||
|
stats.buffer.fetch_sub(1, Ordering::Relaxed);
|
||||||
|
stats.batch.fetch_add(1, Ordering::Relaxed);
|
||||||
|
});
|
||||||
|
|
||||||
|
let mut rx = if cfg.when_ready {
|
||||||
|
rx.ready_chunks(cfg.batch_size).left_stream()
|
||||||
|
} else {
|
||||||
|
rx.chunks(cfg.batch_size).right_stream()
|
||||||
|
};
|
||||||
|
|
||||||
|
while let Some(msgs) = rx.next().await {
|
||||||
|
stats.batch.fetch_sub(msgs.len() as _, Ordering::Relaxed);
|
||||||
|
|
||||||
|
let bus_clone = bus.clone();
|
||||||
|
let ut = ut.clone();
|
||||||
|
|
||||||
|
ut.spawn_local(move |item| {
|
||||||
|
Box::pin(async move {
|
||||||
|
let _ = item.handle(msgs, &bus_clone).await;
|
||||||
|
})
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
|
||||||
|
let bus_clone = bus.clone();
|
||||||
|
ut.spawn_local(move |item| {
|
||||||
|
Box::pin(async move {
|
||||||
|
let _ = item.sync(&bus_clone).await;
|
||||||
|
})
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
|
||||||
|
println!("[EXIT] LocalBatchedAsync<{}>", std::any::type_name::<M>());
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct LocalBatchedAsync<M: Message> {
|
||||||
|
tx: mpsc::Sender<M>,
|
||||||
|
stats: Arc<LocalBatchedStats>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T, M> ReceiverSubscriberBuilder<M, T> for LocalBatchedAsync<M>
|
||||||
|
where
|
||||||
|
T: LocalAsyncBatchHandler<M> + 'static,
|
||||||
|
M: Message,
|
||||||
|
{
|
||||||
|
type Entry = LocalBatchedAsyncSubscriber<T, M>;
|
||||||
|
type Config = LocalBatchedConfig;
|
||||||
|
|
||||||
|
fn build(cfg: Self::Config) -> Self::Entry {
|
||||||
|
LocalBatchedAsyncSubscriber {
|
||||||
|
cfg,
|
||||||
|
_m: Default::default(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<M: Message> TypedReceiver<M> for LocalBatchedAsync<M> {
|
||||||
|
fn poll_ready(&self, ctx: &mut Context<'_>) -> Poll<()> {
|
||||||
|
match self.tx.poll_ready(ctx) {
|
||||||
|
Poll::Ready(_) => Poll::Ready(()),
|
||||||
|
Poll::Pending => Poll::Pending,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn try_send(&self, m: M) -> Result<(), SendError<M>> {
|
||||||
|
match self.tx.try_send(m) {
|
||||||
|
Ok(_) => {
|
||||||
|
self.stats.buffer.fetch_add(1, Ordering::Relaxed);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
Err(err) => Err(err),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<M: Message> ReceiverTrait for LocalBatchedAsync<M> {
|
||||||
|
fn typed(&self) -> AnyReceiver<'_> {
|
||||||
|
AnyReceiver::new(self)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn type_id(&self) -> TypeId {
|
||||||
|
TypeId::of::<Self>()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn close(&self) {
|
||||||
|
self.tx.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
fn stats(&self) -> ReceiverStats {
|
||||||
|
ReceiverStats {
|
||||||
|
name: std::any::type_name::<M>().into(),
|
||||||
|
fields: vec![
|
||||||
|
("buffer".into(), self.stats.buffer.load(Ordering::SeqCst)),
|
||||||
|
(
|
||||||
|
"buffer_total".into(),
|
||||||
|
self.stats.buffer_total.load(Ordering::SeqCst),
|
||||||
|
),
|
||||||
|
("batch".into(), self.stats.batch.load(Ordering::SeqCst)),
|
||||||
|
(
|
||||||
|
"batch_size".into(),
|
||||||
|
self.stats.batch_size.load(Ordering::SeqCst),
|
||||||
|
),
|
||||||
|
],
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn sync(&self) {
|
||||||
|
self.tx.flush();
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_synchronized(&self, _ctx: &mut Context<'_>) -> Poll<()> {
|
||||||
|
Poll::Ready(())
|
||||||
|
}
|
||||||
|
}
|
33
src/receivers/local_batched/mod.rs
Normal file
33
src/receivers/local_batched/mod.rs
Normal file
@ -0,0 +1,33 @@
|
|||||||
|
mod r#async;
|
||||||
|
mod sync;
|
||||||
|
|
||||||
|
use std::sync::atomic::AtomicU64;
|
||||||
|
|
||||||
|
pub use sync::{LocalBatchedSync, LocalBatchedSyncSubscriber};
|
||||||
|
|
||||||
|
pub use r#async::{LocalBatchedAsync, LocalBatchedAsyncSubscriber};
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct LocalBatchedStats {
|
||||||
|
pub buffer: AtomicU64,
|
||||||
|
pub buffer_total: AtomicU64,
|
||||||
|
pub batch: AtomicU64,
|
||||||
|
pub batch_size: AtomicU64,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Copy, Clone, Debug)]
|
||||||
|
pub struct LocalBatchedConfig {
|
||||||
|
pub buffer_size: usize,
|
||||||
|
pub batch_size: usize,
|
||||||
|
pub when_ready: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for LocalBatchedConfig {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self {
|
||||||
|
buffer_size: 4,
|
||||||
|
batch_size: 16,
|
||||||
|
when_ready: false,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
193
src/receivers/local_batched/sync.rs
Normal file
193
src/receivers/local_batched/sync.rs
Normal file
@ -0,0 +1,193 @@
|
|||||||
|
use super::{LocalBatchedConfig, LocalBatchedStats};
|
||||||
|
use crate::{
|
||||||
|
builder::{ReceiverSubscriber, ReceiverSubscriberBuilder},
|
||||||
|
receiver::{AnyReceiver, ReceiverTrait, SendError, TypedReceiver},
|
||||||
|
Bus, LocalBatchHandler, Message, Untyped,
|
||||||
|
};
|
||||||
|
use crate::{receiver::ReceiverStats, receivers::mpsc};
|
||||||
|
use futures::{Future, StreamExt};
|
||||||
|
use std::{
|
||||||
|
any::TypeId,
|
||||||
|
marker::PhantomData,
|
||||||
|
pin::Pin,
|
||||||
|
sync::{
|
||||||
|
atomic::{AtomicU64, Ordering},
|
||||||
|
Arc,
|
||||||
|
},
|
||||||
|
task::{Context, Poll},
|
||||||
|
};
|
||||||
|
|
||||||
|
pub struct LocalBatchedSyncSubscriber<T, M>
|
||||||
|
where
|
||||||
|
T: LocalBatchHandler<M> + 'static,
|
||||||
|
M: Message,
|
||||||
|
{
|
||||||
|
cfg: LocalBatchedConfig,
|
||||||
|
_m: PhantomData<(M, T)>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T, M> ReceiverSubscriber<T> for LocalBatchedSyncSubscriber<T, M>
|
||||||
|
where
|
||||||
|
T: LocalBatchHandler<M> + 'static,
|
||||||
|
M: Message,
|
||||||
|
{
|
||||||
|
fn subscribe(
|
||||||
|
self,
|
||||||
|
) -> (
|
||||||
|
Arc<dyn ReceiverTrait>,
|
||||||
|
Box<
|
||||||
|
dyn FnOnce(Untyped) -> Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>,
|
||||||
|
>,
|
||||||
|
) {
|
||||||
|
let cfg = self.cfg;
|
||||||
|
let (tx, rx) = mpsc::channel(cfg.buffer_size);
|
||||||
|
let stats = Arc::new(LocalBatchedStats {
|
||||||
|
buffer: AtomicU64::new(0),
|
||||||
|
buffer_total: AtomicU64::new(cfg.buffer_size as _),
|
||||||
|
batch: AtomicU64::new(0),
|
||||||
|
batch_size: AtomicU64::new(cfg.batch_size as _),
|
||||||
|
});
|
||||||
|
|
||||||
|
let arc = Arc::new(LocalBatchedSync::<M> {
|
||||||
|
tx,
|
||||||
|
stats: stats.clone(),
|
||||||
|
});
|
||||||
|
let poller = Box::new(move |ut| {
|
||||||
|
Box::new(move |bus| {
|
||||||
|
Box::pin(buffer_unordered_poller::<T, M>(rx, bus, ut, stats, cfg))
|
||||||
|
as Pin<Box<dyn Future<Output = ()> + Send>>
|
||||||
|
}) as Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>
|
||||||
|
});
|
||||||
|
|
||||||
|
(arc, poller)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn buffer_unordered_poller<T, M>(
|
||||||
|
rx: mpsc::Receiver<M>,
|
||||||
|
bus: Bus,
|
||||||
|
ut: Untyped,
|
||||||
|
stats: Arc<LocalBatchedStats>,
|
||||||
|
cfg: LocalBatchedConfig,
|
||||||
|
) where
|
||||||
|
T: LocalBatchHandler<M> + 'static,
|
||||||
|
M: Message,
|
||||||
|
{
|
||||||
|
let ut = ut.downcast_local::<T>().unwrap();
|
||||||
|
|
||||||
|
let rx = rx.inspect(|_| {
|
||||||
|
stats.buffer.fetch_sub(1, Ordering::Relaxed);
|
||||||
|
stats.batch.fetch_add(1, Ordering::Relaxed);
|
||||||
|
});
|
||||||
|
|
||||||
|
let mut rx = if cfg.when_ready {
|
||||||
|
rx.ready_chunks(cfg.batch_size).left_stream()
|
||||||
|
} else {
|
||||||
|
rx.chunks(cfg.batch_size).right_stream()
|
||||||
|
};
|
||||||
|
|
||||||
|
while let Some(msgs) = rx.next().await {
|
||||||
|
stats.batch.fetch_sub(msgs.len() as _, Ordering::Relaxed);
|
||||||
|
|
||||||
|
let bus_clone = bus.clone();
|
||||||
|
let ut = ut.clone();
|
||||||
|
|
||||||
|
ut.spawn_local(move |item| {
|
||||||
|
let _ = item.handle(msgs, &bus_clone);
|
||||||
|
|
||||||
|
Box::pin(async move {})
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
|
||||||
|
let ut = ut.clone();
|
||||||
|
let bus_clone = bus.clone();
|
||||||
|
ut.spawn_local(move |item| {
|
||||||
|
let _ = item.sync(&bus_clone);
|
||||||
|
Box::pin(async move {})
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
|
||||||
|
println!("[EXIT] LocalBatchedSync<{}>", std::any::type_name::<M>());
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct LocalBatchedSync<M: Message> {
|
||||||
|
tx: mpsc::Sender<M>,
|
||||||
|
stats: Arc<LocalBatchedStats>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T, M> ReceiverSubscriberBuilder<M, T> for LocalBatchedSync<M>
|
||||||
|
where
|
||||||
|
T: LocalBatchHandler<M> + 'static,
|
||||||
|
M: Message,
|
||||||
|
{
|
||||||
|
type Entry = LocalBatchedSyncSubscriber<T, M>;
|
||||||
|
type Config = LocalBatchedConfig;
|
||||||
|
|
||||||
|
fn build(cfg: Self::Config) -> Self::Entry {
|
||||||
|
LocalBatchedSyncSubscriber {
|
||||||
|
cfg,
|
||||||
|
_m: Default::default(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<M: Message> TypedReceiver<M> for LocalBatchedSync<M> {
|
||||||
|
fn poll_ready(&self, ctx: &mut Context<'_>) -> Poll<()> {
|
||||||
|
match self.tx.poll_ready(ctx) {
|
||||||
|
Poll::Ready(_) => Poll::Ready(()),
|
||||||
|
Poll::Pending => Poll::Pending,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn try_send(&self, m: M) -> Result<(), SendError<M>> {
|
||||||
|
match self.tx.try_send(m) {
|
||||||
|
Ok(_) => {
|
||||||
|
self.stats.buffer.fetch_add(1, Ordering::Relaxed);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
Err(err) => Err(err),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<M: Message> ReceiverTrait for LocalBatchedSync<M> {
|
||||||
|
fn typed(&self) -> AnyReceiver<'_> {
|
||||||
|
AnyReceiver::new(self)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn type_id(&self) -> TypeId {
|
||||||
|
TypeId::of::<Self>()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn stats(&self) -> ReceiverStats {
|
||||||
|
ReceiverStats {
|
||||||
|
name: std::any::type_name::<M>().into(),
|
||||||
|
fields: vec![
|
||||||
|
("buffer".into(), self.stats.buffer.load(Ordering::SeqCst)),
|
||||||
|
(
|
||||||
|
"buffer_total".into(),
|
||||||
|
self.stats.buffer_total.load(Ordering::SeqCst),
|
||||||
|
),
|
||||||
|
("batch".into(), self.stats.batch.load(Ordering::SeqCst)),
|
||||||
|
(
|
||||||
|
"batch_size".into(),
|
||||||
|
self.stats.batch_size.load(Ordering::SeqCst),
|
||||||
|
),
|
||||||
|
],
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn close(&self) {
|
||||||
|
self.tx.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
fn sync(&self) {
|
||||||
|
self.tx.flush();
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_synchronized(&self, _ctx: &mut Context<'_>) -> Poll<()> {
|
||||||
|
Poll::Ready(())
|
||||||
|
}
|
||||||
|
}
|
@ -1,5 +1,7 @@
|
|||||||
mod buffer_unordered;
|
mod buffer_unordered;
|
||||||
mod buffer_unordered_batched;
|
mod buffer_unordered_batched;
|
||||||
|
mod local;
|
||||||
|
mod local_batched;
|
||||||
mod mpsc_futures;
|
mod mpsc_futures;
|
||||||
mod synchronize_batched;
|
mod synchronize_batched;
|
||||||
mod synchronized;
|
mod synchronized;
|
||||||
@ -14,8 +16,8 @@ pub use buffer_unordered::{
|
|||||||
};
|
};
|
||||||
|
|
||||||
pub use buffer_unordered_batched::{
|
pub use buffer_unordered_batched::{
|
||||||
BufferUnorderedBatchedAsync, BufferUnorderedBatchedAsyncSubscriber, BufferUnorderedBatchedConfig,
|
BufferUnorderedBatchedAsync, BufferUnorderedBatchedAsyncSubscriber,
|
||||||
BufferUnorderedBatchedSync, BufferUnorderedBatchedSyncSubscriber,
|
BufferUnorderedBatchedConfig, BufferUnorderedBatchedSync, BufferUnorderedBatchedSyncSubscriber,
|
||||||
};
|
};
|
||||||
|
|
||||||
pub use synchronized::{
|
pub use synchronized::{
|
||||||
@ -27,3 +29,10 @@ pub use synchronize_batched::{
|
|||||||
SynchronizeBatchedAsync, SynchronizeBatchedAsyncSubscriber, SynchronizeBatchedConfig,
|
SynchronizeBatchedAsync, SynchronizeBatchedAsyncSubscriber, SynchronizeBatchedConfig,
|
||||||
SynchronizeBatchedSync, SynchronizeBatchedSyncSubscriber,
|
SynchronizeBatchedSync, SynchronizeBatchedSyncSubscriber,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
pub use local::{LocalAsync, LocalAsyncSubscriber, LocalConfig, LocalSync, LocalSyncSubscriber};
|
||||||
|
|
||||||
|
pub use local_batched::{
|
||||||
|
LocalBatchedAsync, LocalBatchedAsyncSubscriber, LocalBatchedConfig, LocalBatchedSync,
|
||||||
|
LocalBatchedSyncSubscriber,
|
||||||
|
};
|
||||||
|
@ -99,7 +99,7 @@ impl<T> Stream for Receiver<T> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
Poll::Pending
|
Poll::Pending
|
||||||
},
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -11,7 +11,6 @@ use std::{
|
|||||||
|
|
||||||
use crate::{receiver::ReceiverStats, receivers::mpsc};
|
use crate::{receiver::ReceiverStats, receivers::mpsc};
|
||||||
use futures::{Future, StreamExt};
|
use futures::{Future, StreamExt};
|
||||||
use tokio::sync::Mutex;
|
|
||||||
|
|
||||||
use super::{SynchronizeBatchedConfig, SynchronizeBatchedStats};
|
use super::{SynchronizeBatchedConfig, SynchronizeBatchedStats};
|
||||||
use crate::{
|
use crate::{
|
||||||
@ -78,20 +77,17 @@ async fn buffer_unordered_poller<T, M>(
|
|||||||
T: AsyncBatchSynchronizedHandler<M> + 'static,
|
T: AsyncBatchSynchronizedHandler<M> + 'static,
|
||||||
M: Message,
|
M: Message,
|
||||||
{
|
{
|
||||||
let ut = ut.downcast::<Mutex<T>>().unwrap();
|
let ut = ut.downcast_send::<T>().unwrap();
|
||||||
|
|
||||||
let rx = rx
|
let rx = rx.inspect(|_| {
|
||||||
.inspect(|_|{
|
stats.buffer.fetch_sub(1, Ordering::Relaxed);
|
||||||
stats.buffer.fetch_sub(1, Ordering::Relaxed);
|
stats.batch.fetch_add(1, Ordering::Relaxed);
|
||||||
stats.batch.fetch_add(1, Ordering::Relaxed);
|
});
|
||||||
});
|
|
||||||
|
|
||||||
let mut rx = if cfg.when_ready {
|
let mut rx = if cfg.when_ready {
|
||||||
rx.ready_chunks(cfg.batch_size)
|
rx.ready_chunks(cfg.batch_size).left_stream()
|
||||||
.left_stream()
|
|
||||||
} else {
|
} else {
|
||||||
rx.chunks(cfg.batch_size)
|
rx.chunks(cfg.batch_size).right_stream()
|
||||||
.right_stream()
|
|
||||||
};
|
};
|
||||||
|
|
||||||
while let Some(msgs) = rx.next().await {
|
while let Some(msgs) = rx.next().await {
|
||||||
@ -101,8 +97,7 @@ async fn buffer_unordered_poller<T, M>(
|
|||||||
let ut = ut.clone();
|
let ut = ut.clone();
|
||||||
|
|
||||||
let res =
|
let res =
|
||||||
tokio::task::spawn(async move { ut.lock().await.handle(msgs, &bus_clone).await })
|
tokio::task::spawn(async move { ut.lock().await.handle(msgs, &bus_clone).await }).await;
|
||||||
.await;
|
|
||||||
|
|
||||||
match res {
|
match res {
|
||||||
Ok(Err(err)) => {
|
Ok(Err(err)) => {
|
||||||
|
@ -6,7 +6,7 @@ use crate::{
|
|||||||
BatchSynchronizedHandler, Bus, Message, Untyped,
|
BatchSynchronizedHandler, Bus, Message, Untyped,
|
||||||
};
|
};
|
||||||
use crate::{receiver::ReceiverStats, receivers::mpsc};
|
use crate::{receiver::ReceiverStats, receivers::mpsc};
|
||||||
use futures::{Future, StreamExt};
|
use futures::{executor::block_on, Future, StreamExt};
|
||||||
use std::{
|
use std::{
|
||||||
any::TypeId,
|
any::TypeId,
|
||||||
marker::PhantomData,
|
marker::PhantomData,
|
||||||
@ -17,7 +17,6 @@ use std::{
|
|||||||
},
|
},
|
||||||
task::{Context, Poll},
|
task::{Context, Poll},
|
||||||
};
|
};
|
||||||
use tokio::sync::Mutex;
|
|
||||||
|
|
||||||
pub struct SynchronizeBatchedSyncSubscriber<T, M>
|
pub struct SynchronizeBatchedSyncSubscriber<T, M>
|
||||||
where
|
where
|
||||||
@ -75,20 +74,17 @@ async fn buffer_unordered_poller<T, M>(
|
|||||||
T: BatchSynchronizedHandler<M> + 'static,
|
T: BatchSynchronizedHandler<M> + 'static,
|
||||||
M: Message,
|
M: Message,
|
||||||
{
|
{
|
||||||
let ut = ut.downcast::<Mutex<T>>().unwrap();
|
let ut = ut.downcast_send::<T>().unwrap();
|
||||||
|
|
||||||
let rx = rx
|
let rx = rx.inspect(|_| {
|
||||||
.inspect(|_|{
|
stats.buffer.fetch_sub(1, Ordering::Relaxed);
|
||||||
stats.buffer.fetch_sub(1, Ordering::Relaxed);
|
stats.batch.fetch_add(1, Ordering::Relaxed);
|
||||||
stats.batch.fetch_add(1, Ordering::Relaxed);
|
});
|
||||||
});
|
|
||||||
|
|
||||||
let mut rx = if cfg.when_ready {
|
let mut rx = if cfg.when_ready {
|
||||||
rx.ready_chunks(cfg.batch_size)
|
rx.ready_chunks(cfg.batch_size).left_stream()
|
||||||
.left_stream()
|
|
||||||
} else {
|
} else {
|
||||||
rx.chunks(cfg.batch_size)
|
rx.chunks(cfg.batch_size).right_stream()
|
||||||
.right_stream()
|
|
||||||
};
|
};
|
||||||
|
|
||||||
while let Some(msgs) = rx.next().await {
|
while let Some(msgs) = rx.next().await {
|
||||||
@ -98,10 +94,10 @@ async fn buffer_unordered_poller<T, M>(
|
|||||||
let ut = ut.clone();
|
let ut = ut.clone();
|
||||||
|
|
||||||
let res = tokio::task::spawn_blocking(move || {
|
let res = tokio::task::spawn_blocking(move || {
|
||||||
let mut uut = futures::executor::block_on(ut.lock());
|
let mut uut = block_on(ut.lock());
|
||||||
|
|
||||||
uut.handle(msgs, &bus_clone)
|
uut.handle(msgs, &bus_clone)
|
||||||
}).await;
|
})
|
||||||
|
.await;
|
||||||
|
|
||||||
match res {
|
match res {
|
||||||
Ok(Err(err)) => {
|
Ok(Err(err)) => {
|
||||||
@ -113,10 +109,7 @@ async fn buffer_unordered_poller<T, M>(
|
|||||||
|
|
||||||
let ut = ut.clone();
|
let ut = ut.clone();
|
||||||
let bus_clone = bus.clone();
|
let bus_clone = bus.clone();
|
||||||
let res = tokio::task::spawn_blocking(move || {
|
let res = tokio::task::spawn_blocking(move || block_on(ut.lock()).sync(&bus_clone)).await;
|
||||||
futures::executor::block_on(ut.lock()).sync(&bus_clone)
|
|
||||||
})
|
|
||||||
.await;
|
|
||||||
|
|
||||||
match res {
|
match res {
|
||||||
Ok(Err(err)) => {
|
Ok(Err(err)) => {
|
||||||
|
@ -11,7 +11,6 @@ use std::{
|
|||||||
|
|
||||||
use crate::{receiver::ReceiverStats, receivers::mpsc};
|
use crate::{receiver::ReceiverStats, receivers::mpsc};
|
||||||
use futures::{Future, StreamExt};
|
use futures::{Future, StreamExt};
|
||||||
use tokio::sync::Mutex;
|
|
||||||
|
|
||||||
use super::{SynchronizedConfig, SynchronizedStats};
|
use super::{SynchronizedConfig, SynchronizedStats};
|
||||||
use crate::{
|
use crate::{
|
||||||
@ -76,10 +75,13 @@ async fn buffer_unordered_poller<T, M>(
|
|||||||
T: AsyncSynchronizedHandler<M> + 'static,
|
T: AsyncSynchronizedHandler<M> + 'static,
|
||||||
M: Message,
|
M: Message,
|
||||||
{
|
{
|
||||||
let ut = ut.downcast::<Mutex<T>>().unwrap();
|
let ut = ut.downcast_send::<T>().unwrap();
|
||||||
let mut x = rx.then(|msg| {
|
let ut1 = ut.clone();
|
||||||
let bus = bus.clone();
|
let bus1 = bus.clone();
|
||||||
let ut = ut.clone();
|
|
||||||
|
let mut x = rx.then(move |msg| {
|
||||||
|
let bus = bus1.clone();
|
||||||
|
let ut = ut1.clone();
|
||||||
|
|
||||||
tokio::task::spawn(async move { ut.lock().await.handle(msg, &bus).await })
|
tokio::task::spawn(async move { ut.lock().await.handle(msg, &bus).await })
|
||||||
});
|
});
|
||||||
|
@ -11,7 +11,6 @@ use std::{
|
|||||||
},
|
},
|
||||||
task::{Context, Poll},
|
task::{Context, Poll},
|
||||||
};
|
};
|
||||||
use tokio::sync::Mutex;
|
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
builder::{ReceiverSubscriber, ReceiverSubscriberBuilder},
|
builder::{ReceiverSubscriber, ReceiverSubscriberBuilder},
|
||||||
@ -74,12 +73,15 @@ async fn buffer_unordered_poller<T, M>(
|
|||||||
T: SynchronizedHandler<M> + 'static,
|
T: SynchronizedHandler<M> + 'static,
|
||||||
M: Message,
|
M: Message,
|
||||||
{
|
{
|
||||||
let ut = ut.downcast::<Mutex<T>>().unwrap();
|
let ut = ut.downcast_send::<T>().unwrap();
|
||||||
let mut x = rx.then(|msg| {
|
let mut x = rx.then(|msg| {
|
||||||
let ut = ut.clone();
|
let ut = ut.clone();
|
||||||
let bus = bus.clone();
|
let bus = bus.clone();
|
||||||
|
|
||||||
tokio::task::spawn_blocking(move || block_on(ut.lock()).handle(msg, &bus))
|
tokio::task::spawn_blocking(move || {
|
||||||
|
let mut uut = block_on(ut.lock());
|
||||||
|
uut.handle(msg, &bus)
|
||||||
|
})
|
||||||
});
|
});
|
||||||
|
|
||||||
while let Some(err) = x.next().await {
|
while let Some(err) = x.next().await {
|
||||||
@ -95,10 +97,7 @@ async fn buffer_unordered_poller<T, M>(
|
|||||||
|
|
||||||
let ut = ut.clone();
|
let ut = ut.clone();
|
||||||
let bus_clone = bus.clone();
|
let bus_clone = bus.clone();
|
||||||
let res = tokio::task::spawn_blocking(move || {
|
let res = tokio::task::spawn_blocking(move || block_on(ut.lock()).sync(&bus_clone)).await;
|
||||||
futures::executor::block_on(ut.lock()).sync(&bus_clone)
|
|
||||||
})
|
|
||||||
.await;
|
|
||||||
|
|
||||||
match res {
|
match res {
|
||||||
Ok(Err(err)) => {
|
Ok(Err(err)) => {
|
||||||
@ -107,10 +106,7 @@ async fn buffer_unordered_poller<T, M>(
|
|||||||
_ => (),
|
_ => (),
|
||||||
}
|
}
|
||||||
|
|
||||||
println!(
|
println!("[EXIT] BufferUnorderedSync<{}>", std::any::type_name::<M>());
|
||||||
"[EXIT] BufferUnorderedSync<{}>",
|
|
||||||
std::any::type_name::<M>()
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct SynchronizedSync<M: Message> {
|
pub struct SynchronizedSync<M: Message> {
|
||||||
|
174
src/untyped.rs
Normal file
174
src/untyped.rs
Normal file
@ -0,0 +1,174 @@
|
|||||||
|
use core::any::Any;
|
||||||
|
use core::future::Future;
|
||||||
|
use core::pin::Pin;
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use tokio::sync::mpsc;
|
||||||
|
use tokio::sync::Notify;
|
||||||
|
use tokio::sync::{Mutex, MutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuard};
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub enum Lock<'a, T> {
|
||||||
|
ReadOnly(&'a T),
|
||||||
|
RwRead(RwLockReadGuard<'a, T>),
|
||||||
|
RwWrite(RwLockWriteGuard<'a, T>),
|
||||||
|
WriteOnly(MutexGuard<'a, T>),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a, T> Lock<'a, T> {
|
||||||
|
pub fn get_ref(&self) -> &T {
|
||||||
|
match self {
|
||||||
|
Lock::ReadOnly(inner) => &inner,
|
||||||
|
Lock::RwRead(inner) => &inner,
|
||||||
|
Lock::RwWrite(inner) => &inner,
|
||||||
|
Lock::WriteOnly(inner) => &inner,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_mut(&mut self) -> &mut T {
|
||||||
|
match self {
|
||||||
|
Lock::ReadOnly(_) => panic!("!!"),
|
||||||
|
Lock::RwRead(_) => panic!("!!"),
|
||||||
|
Lock::RwWrite(inner) => &mut *inner,
|
||||||
|
Lock::WriteOnly(inner) => &mut *inner,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub enum Downcasted<T> {
|
||||||
|
ReadOnly(Arc<T>),
|
||||||
|
ReadWrite(Arc<RwLock<T>>),
|
||||||
|
WriteOnly(Arc<Mutex<T>>),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> Clone for Downcasted<T> {
|
||||||
|
fn clone(&self) -> Self {
|
||||||
|
match self {
|
||||||
|
Downcasted::ReadOnly(inner) => Downcasted::ReadOnly(inner.clone()),
|
||||||
|
Downcasted::ReadWrite(inner) => Downcasted::ReadWrite(inner.clone()),
|
||||||
|
Downcasted::WriteOnly(inner) => Downcasted::WriteOnly(inner.clone()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: 'static> Downcasted<T> {
|
||||||
|
pub async fn lock_read(&self) -> Lock<'_, T> {
|
||||||
|
match self {
|
||||||
|
Downcasted::ReadOnly(inner) => Lock::ReadOnly(&inner),
|
||||||
|
Downcasted::ReadWrite(inner) => Lock::RwRead(inner.read().await),
|
||||||
|
Downcasted::WriteOnly(inner) => Lock::WriteOnly(inner.lock().await),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub async fn lock_write(&self) -> Lock<'_, T> {
|
||||||
|
match self {
|
||||||
|
Downcasted::ReadOnly(_) => unimplemented!(),
|
||||||
|
Downcasted::ReadWrite(inner) => Lock::RwWrite(inner.write().await),
|
||||||
|
Downcasted::WriteOnly(inner) => Lock::WriteOnly(inner.lock().await),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct Untyped {
|
||||||
|
inner: Arc<dyn Any + Send + Sync>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Untyped {
|
||||||
|
pub fn new_readonly<T: Send + Sync + 'static>(item: T) -> Self {
|
||||||
|
Self {
|
||||||
|
inner: Arc::new(item),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn new_rwlock<T: Send + Sync + 'static>(item: T) -> Self {
|
||||||
|
Self {
|
||||||
|
inner: Arc::new(RwLock::new(item)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn new_mutex<T: Send + 'static>(item: T) -> Self {
|
||||||
|
Self {
|
||||||
|
inner: Arc::new(Mutex::new(item)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn new_local<T: 'static, F: FnOnce() -> T + Send + 'static>(f: F) -> Self {
|
||||||
|
Self {
|
||||||
|
inner: Arc::new(ThreadDedicated::new(f)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn downcast_sync<T: Send + Sync + 'static>(self) -> Option<Downcasted<T>> {
|
||||||
|
let item = match self.inner.clone().downcast::<RwLock<T>>() {
|
||||||
|
Ok(inner) => Downcasted::ReadWrite(inner),
|
||||||
|
Err(_) => return None,
|
||||||
|
};
|
||||||
|
|
||||||
|
Some(item)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn downcast_send1<T: Send + 'static>(self) -> Option<Downcasted<T>> {
|
||||||
|
let item = match self.inner.clone().downcast::<Mutex<T>>() {
|
||||||
|
Ok(inner) => Downcasted::WriteOnly(inner),
|
||||||
|
Err(_) => return None,
|
||||||
|
};
|
||||||
|
|
||||||
|
Some(item)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn downcast_send<T: Send + 'static>(self) -> Option<Arc<Mutex<T>>> {
|
||||||
|
self.inner.clone().downcast::<Mutex<T>>().ok()
|
||||||
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
pub fn downcast_local<T: 'static>(self) -> Option<Arc<ThreadDedicated<T>>> {
|
||||||
|
self.inner.clone().downcast::<ThreadDedicated<T>>().ok()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct ThreadDedicated<T: 'static> {
|
||||||
|
sender: mpsc::Sender<
|
||||||
|
Box<dyn for<'a> FnOnce(&'a mut T) -> Pin<Box<dyn Future<Output = ()> + 'a>> + Send>,
|
||||||
|
>,
|
||||||
|
notify: Arc<Notify>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: 'static> ThreadDedicated<T> {
|
||||||
|
pub fn new<F: FnOnce() -> T + Send + 'static>(builder: F) -> Self {
|
||||||
|
let notify = Arc::new(Notify::new());
|
||||||
|
let (sender, mut receiver) = mpsc::channel(1);
|
||||||
|
|
||||||
|
let sender: mpsc::Sender<
|
||||||
|
Box<dyn for<'a> FnOnce(&'a mut T) -> Pin<Box<dyn Future<Output = ()> + 'a>> + Send>,
|
||||||
|
> = sender;
|
||||||
|
let notify_clone = notify.clone();
|
||||||
|
std::thread::spawn(move || {
|
||||||
|
futures::executor::block_on(async move {
|
||||||
|
let mut obj = builder();
|
||||||
|
|
||||||
|
loop {
|
||||||
|
let cb = match receiver.recv().await {
|
||||||
|
Some(x) => x,
|
||||||
|
None => break,
|
||||||
|
};
|
||||||
|
|
||||||
|
cb(&mut obj).await;
|
||||||
|
notify_clone.notify_one();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
Self { sender, notify }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn spawn_local<
|
||||||
|
F: for<'a> FnOnce(&'a mut T) -> Pin<Box<dyn Future<Output = ()> + 'a>> + Send + 'static,
|
||||||
|
>(
|
||||||
|
&self,
|
||||||
|
cb: F,
|
||||||
|
) {
|
||||||
|
self.sender.send(Box::new(cb)).await.ok().unwrap();
|
||||||
|
|
||||||
|
self.notify.notified().await
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user