Request/Response

This commit is contained in:
Andrey Tkachenko 2021-06-22 18:43:06 +04:00
parent cf0a50a445
commit db1d22df7f
12 changed files with 639 additions and 372 deletions

View File

@ -11,17 +11,16 @@ exclude = [".gitignore", ".cargo/config", ".github/**", "codecov.yml"]
edition = "2018" edition = "2018"
[dependencies] [dependencies]
tokio = { version = "1", features = ["parking_lot", "rt-multi-thread", "io-util", "sync"] } tokio = { version = "1", features = ["parking_lot", "rt-multi-thread", "sync"] }
parking_lot = "0.11.1" parking_lot = "0.11.1"
async-trait = "0.1.42" async-trait = "0.1.42"
futures = "0.3.8" futures = "0.3.8"
anyhow = "1.0.34" anyhow = "1.0.34"
crossbeam = "0.8.1"
uuid = "0.8.2"
tokio-util = "0.6.7" tokio-util = "0.6.7"
async-stream = "0.3.2" async-stream = "0.3.2"
smallvec = "1.6.1" smallvec = "1.6.1"
log = "0.4.14" log = "0.4.14"
sharded-slab = "0.1.1"
[dev-dependencies] [dev-dependencies]
tokio = { version = "1", features = ["macros", "parking_lot", "rt-multi-thread", "io-util", "sync"] } tokio = { version = "1", features = ["macros", "parking_lot", "rt-multi-thread", "io-util", "sync"] }

View File

@ -1,79 +0,0 @@
use messagebus::{receivers, Bus, Handler, Result as MbusResult};
struct TmpReceiver;
struct TmpReceiver2;
impl Handler<f32> for TmpReceiver {
fn handle(&self, msg: f32, bus: &Bus) -> MbusResult {
bus.try_send(1u16).unwrap();
println!("---> f32 {}", msg);
Ok(())
}
}
impl Handler<u16> for TmpReceiver {
fn handle(&self, msg: u16, bus: &Bus) -> MbusResult {
bus.try_send(1u32).unwrap();
println!("---> u16 {}", msg);
Ok(())
}
}
impl Handler<u32> for TmpReceiver {
fn handle(&self, msg: u32, bus: &Bus) -> MbusResult {
bus.try_send(2i32).unwrap();
println!("---> u32 {}", msg);
Ok(())
}
}
impl Handler<i32> for TmpReceiver {
fn handle(&self, msg: i32, bus: &Bus) -> MbusResult {
bus.try_send(3i16).unwrap();
println!("---> i32 {}", msg);
Ok(())
}
}
impl Handler<i16> for TmpReceiver {
fn handle(&self, msg: i16, _bus: &Bus) -> MbusResult {
println!("---> i16 {}", msg);
Ok(())
}
}
impl Handler<i32> for TmpReceiver2 {
fn handle(&self, msg: i32, bus: &Bus) -> MbusResult {
bus.try_send(3i16).unwrap();
println!("---> 2 i32 {}", msg);
Ok(())
}
}
impl Handler<i16> for TmpReceiver2 {
fn handle(&self, msg: i16, _bus: &Bus) -> MbusResult {
println!("---> 2 i16 {}", msg);
Ok(())
}
}
#[tokio::main]
async fn main() {
let (b, poller) = Bus::build()
.register(TmpReceiver)
.subscribe::<f32, receivers::BufferUnorderedSync<_>>(Default::default())
.subscribe::<u16, receivers::BufferUnorderedSync<_>>(Default::default())
.subscribe::<u32, receivers::BufferUnorderedSync<_>>(Default::default())
.subscribe::<i32, receivers::BufferUnorderedSync<_>>(Default::default())
.subscribe::<i16, receivers::BufferUnorderedSync<_>>(Default::default())
.done()
.register(TmpReceiver2)
.subscribe::<i32, receivers::BufferUnorderedSync<_>>(Default::default())
.subscribe::<i16, receivers::BufferUnorderedSync<_>>(Default::default())
.done()
.build();
b.send(32f32).await.unwrap();
poller.await
}

View File

@ -144,10 +144,10 @@ async fn main() {
.subscribe::<i16, receivers::BufferUnorderedAsync<_>, _, _>(8, Default::default()) .subscribe::<i16, receivers::BufferUnorderedAsync<_>, _, _>(8, Default::default())
.done() .done()
.register(TmpReceiver2) .register(TmpReceiver2)
.subscribe::<i32, receivers::BufferUnorderedAsync<_>, _, _>(8, Default::default()) .subscribe::<i32, receivers::BufferUnorderedAsync<_>, _, _>(8, Default::default())
.subscribe::<i16, receivers::BufferUnorderedSync<_>, _, _>(8, Default::default()) .subscribe::<i16, receivers::BufferUnorderedSync<_>, _, _>(8, Default::default())
.done() .done()
.build(); .build();
b.send(0f32).await.unwrap(); b.send(0f32).await.unwrap();

View File

@ -1,5 +1,5 @@
use async_trait::async_trait; use async_trait::async_trait;
use messagebus::{receivers, Bus, AsyncHandler}; use messagebus::{receivers, AsyncHandler, Bus};
struct TmpReceiver; struct TmpReceiver;
@ -22,10 +22,13 @@ impl AsyncHandler<f32> for TmpReceiver {
async fn main() { async fn main() {
let (b, poller) = Bus::build() let (b, poller) = Bus::build()
.register(TmpReceiver) .register(TmpReceiver)
.subscribe::<f32, receivers::BufferUnorderedAsync<_>, _, _>(1, receivers::BufferUnorderedConfig { .subscribe::<f32, receivers::BufferUnorderedAsync<_>, _, _>(
buffer_size: 1, 1,
max_parallel: 1, receivers::BufferUnorderedConfig {
}) buffer_size: 1,
max_parallel: 1,
},
)
.done() .done()
.build(); .build();

172
examples/demo_req_resp.rs Normal file
View File

@ -0,0 +1,172 @@
use async_trait::async_trait;
use messagebus::{receivers, AsyncHandler, Bus};
struct TmpReceiver1;
struct TmpReceiver2;
#[async_trait]
impl AsyncHandler<i32> for TmpReceiver1 {
type Error = anyhow::Error;
type Response = f32;
async fn handle(&self, msg: i32, bus: &Bus) -> Result<Self::Response, Self::Error> {
let resp1 = bus.request::<_, f32>(10i16, Default::default()).await?;
let resp2 = bus.request::<_, f32>(20u16, Default::default()).await?;
Ok(msg as f32 + resp1 + resp2)
}
async fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> {
println!("TmpReceiver1 i32: sync");
Ok(())
}
}
#[async_trait]
impl AsyncHandler<u32> for TmpReceiver1 {
type Error = anyhow::Error;
type Response = f32;
async fn handle(&self, msg: u32, _bus: &Bus) -> Result<Self::Response, Self::Error> {
Ok(msg as f32)
}
async fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> {
println!("TmpReceiver1 u32: sync");
Ok(())
}
}
#[async_trait]
impl AsyncHandler<i16> for TmpReceiver1 {
type Error = anyhow::Error;
type Response = f32;
async fn handle(&self, msg: i16, bus: &Bus) -> Result<Self::Response, Self::Error> {
let resp1 = bus.request::<_, f32>(1i8, Default::default()).await?;
let resp2 = bus.request::<_, f32>(2u8, Default::default()).await?;
Ok(msg as f32 + resp1 + resp2)
}
async fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> {
println!("TmpReceiver i16: sync");
Ok(())
}
}
#[async_trait]
impl AsyncHandler<u16> for TmpReceiver1 {
type Error = anyhow::Error;
type Response = f32;
async fn handle(&self, msg: u16, _bus: &Bus) -> Result<Self::Response, Self::Error> {
Ok(msg as f32)
}
async fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> {
println!("TmpReceiver i16: sync");
Ok(())
}
}
#[async_trait]
impl AsyncHandler<i8> for TmpReceiver1 {
type Error = anyhow::Error;
type Response = f32;
async fn handle(&self, msg: i8, _bus: &Bus) -> Result<Self::Response, Self::Error> {
Ok(msg as f32)
}
async fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> {
println!("TmpReceiver1 i8: sync");
Ok(())
}
}
#[async_trait]
impl AsyncHandler<u8> for TmpReceiver1 {
type Error = anyhow::Error;
type Response = f32;
async fn handle(&self, msg: u8, _bus: &Bus) -> Result<Self::Response, Self::Error> {
Ok(msg as f32)
}
async fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> {
println!("TmpReceiver1 u8: sync");
Ok(())
}
}
#[async_trait]
impl AsyncHandler<f64> for TmpReceiver2 {
type Error = anyhow::Error;
type Response = f64;
async fn handle(&self, msg: f64, bus: &Bus) -> Result<Self::Response, Self::Error> {
let resp1 = bus.request::<_, f32>(100i32, Default::default()).await? as f64;
let resp2 = bus.request::<_, f32>(200u32, Default::default()).await? as f64;
let resp3 = bus.request::<_, f32>(300f32, Default::default()).await? as f64;
Ok(msg + resp1 + resp2 + resp3)
}
async fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> {
println!("TmpReceiver1 f64: sync");
Ok(())
}
}
#[async_trait]
impl AsyncHandler<f32> for TmpReceiver2 {
type Error = anyhow::Error;
type Response = f32;
async fn handle(&self, msg: f32, _bus: &Bus) -> Result<Self::Response, Self::Error> {
Ok(msg)
}
async fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> {
println!("TmpReceiver2: f32: sync");
Ok(())
}
}
#[tokio::main]
async fn main() {
let (b, poller) = Bus::build()
.register(TmpReceiver1)
.subscribe::<i32, receivers::BufferUnorderedAsync<_, f32>, _, _>(8, Default::default())
.subscribe::<u32, receivers::BufferUnorderedAsync<_, f32>, _, _>(8, Default::default())
.subscribe::<i16, receivers::BufferUnorderedAsync<_, f32>, _, _>(8, Default::default())
.subscribe::<u16, receivers::BufferUnorderedAsync<_, f32>, _, _>(8, Default::default())
.subscribe::<i8, receivers::BufferUnorderedAsync<_, f32>, _, _>(8, Default::default())
.subscribe::<u8, receivers::BufferUnorderedAsync<_, f32>, _, _>(8, Default::default())
.done()
.register(TmpReceiver2)
.subscribe::<f32, receivers::BufferUnorderedAsync<_, f32>, _, _>(8, Default::default())
.subscribe::<f64, receivers::BufferUnorderedAsync<_, f64>, _, _>(8, Default::default())
.done()
.build();
println!(
"{:?}",
b.request::<_, f64>(1000f64, Default::default()).await
);
println!("flush");
b.flush().await;
println!("close");
b.close().await;
poller.await;
println!("[done]");
}

View File

@ -3,19 +3,31 @@ use std::{any::TypeId, collections::HashMap, marker::PhantomData, pin::Pin, sync
use futures::{Future, FutureExt}; use futures::{Future, FutureExt};
use tokio::sync::Mutex; use tokio::sync::Mutex;
use crate::{Bus, BusInner, Message, Untyped, receiver::{Receiver, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver}}; use crate::{
receiver::{Receiver, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver},
Bus, BusInner, Message, Untyped,
};
pub trait ReceiverSubscriberBuilder<T, M, R, E>:
pub trait ReceiverSubscriberBuilder<T, M, R, E>: SendUntypedReceiver + SendTypedReceiver<M> + ReciveTypedReceiver<R, E> SendUntypedReceiver + SendTypedReceiver<M> + ReciveTypedReceiver<R, E>
where where
T: 'static, T: 'static,
M: Message, M: Message,
R: Message, R: Message,
E: crate::Error E: crate::Error,
{ {
type Config: Default; type Config: Default;
fn build(cfg: Self::Config) -> (Self, Box<dyn FnOnce(Untyped) -> Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>>) where Self: Sized; fn build(
cfg: Self::Config,
) -> (
Self,
Box<
dyn FnOnce(Untyped) -> Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>,
>,
)
where
Self: Sized;
} }
pub struct SyncEntry; pub struct SyncEntry;
@ -35,7 +47,7 @@ pub struct RegisterEntry<K, T> {
) )
-> Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>, -> Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>,
>, >,
Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>> Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>,
)>, )>,
>, >,
_m: PhantomData<(K, T)>, _m: PhantomData<(K, T)>,
@ -68,7 +80,7 @@ impl<T> RegisterEntry<UnsyncEntry, T> {
{ {
let (inner, poller) = S::build(cfg); let (inner, poller) = S::build(cfg);
let receiver = Receiver::new(queue, inner); let receiver = Receiver::new::<M, R, E, S>(queue, inner);
let poller2 = receiver.start_polling_events::<R, E>(); let poller2 = receiver.start_polling_events::<R, E>();
self.receivers self.receivers
.entry(TypeId::of::<M>()) .entry(TypeId::of::<M>())
@ -90,7 +102,7 @@ impl<T> RegisterEntry<SyncEntry, T> {
{ {
let (inner, poller) = S::build(cfg); let (inner, poller) = S::build(cfg);
let receiver = Receiver::new(queue, inner); let receiver = Receiver::new::<M, R, E, S>(queue, inner);
let poller2 = receiver.start_polling_events::<R, E>(); let poller2 = receiver.start_polling_events::<R, E>();
self.receivers self.receivers
.entry(TypeId::of::<M>()) .entry(TypeId::of::<M>())

View File

@ -106,7 +106,6 @@ pub trait LocalAsyncHandler<M: Message> {
} }
} }
pub trait LocalBatchHandler<M: Message> { pub trait LocalBatchHandler<M: Message> {
type Error: crate::Error; type Error: crate::Error;
type Response: Message; type Response: Message;

View File

@ -9,24 +9,30 @@ mod trait_object;
#[macro_use] #[macro_use]
extern crate log; extern crate log;
use anyhow::bail;
use builder::BusBuilder; use builder::BusBuilder;
pub use envelop::Message; pub use envelop::Message;
use futures::{Future, FutureExt, future::poll_fn};
pub use handler::*; pub use handler::*;
pub use receiver::SendError; pub use receiver::SendError;
use receiver::{Receiver, ReceiverStats}; use receiver::{Receiver, ReceiverStats};
use smallvec::SmallVec; use smallvec::SmallVec;
use tokio::sync::{mpsc, oneshot}; use tokio::sync::oneshot;
use core::any::{Any, TypeId};
use std::{collections::HashMap, sync::{Arc, atomic::{AtomicBool, AtomicU64, Ordering}}};
use crate::receiver::Permit; use crate::receiver::Permit;
use core::any::{Any, TypeId};
use std::{
collections::HashMap,
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Arc,
},
};
pub type Untyped = Arc<dyn Any + Send + Sync>; pub type Untyped = Arc<dyn Any + Send + Sync>;
// pub trait ErrorTrait: std::error::Error + Send + Sync + 'static {} // pub trait ErrorTrait: std::error::Error + Send + Sync + 'static {}
pub trait Error: Into<anyhow::Error> + Send + Sync + 'static {} pub trait Error: Into<anyhow::Error> + Send + Sync + 'static {}
impl <T: Into<anyhow::Error> + Send + Sync + 'static> Error for T {} impl<T: Into<anyhow::Error> + Send + Sync + 'static> Error for T {}
static ID_COUNTER: AtomicU64 = AtomicU64::new(1); static ID_COUNTER: AtomicU64 = AtomicU64::new(1);
@ -54,7 +60,8 @@ impl BusInner {
let mut receivers = HashMap::new(); let mut receivers = HashMap::new();
for (key, value) in input { for (key, value) in input {
receivers.entry(key) receivers
.entry(key)
.or_insert_with(SmallVec::new) .or_insert_with(SmallVec::new)
.push(value); .push(value);
} }
@ -83,10 +90,9 @@ impl BusInner {
iters += 1; iters += 1;
let mut flushed = false; let mut flushed = false;
for (_, rs) in &self.receivers { for (_, rs) in &self.receivers {
for r in rs { for r in rs {
if r.need_flush() { if r.need_flush() {
flushed = true; flushed = true;
r.flush().await; r.flush().await;
} }
} }
@ -99,11 +105,14 @@ impl BusInner {
} }
if !breaked { if !breaked {
warn!("!!! WARNING: unable to reach equilibrium in {} iterations !!!", fuse_count); warn!(
"!!! WARNING: unable to reach equilibrium in {} iterations !!!",
fuse_count
);
} else { } else {
info!("flushed in {} iterations !!!", iters); info!("flushed in {} iterations !!!", iters);
} }
} }
pub async fn flash_and_sync(&self) { pub async fn flash_and_sync(&self) {
self.flush().await; self.flush().await;
@ -122,7 +131,7 @@ impl BusInner {
// .map(|r| r.stats()) // .map(|r| r.stats())
// } // }
fn try_reserve(&self, rs: &[Receiver]) -> Option<SmallVec::<[Permit; 32]>> { fn try_reserve(&self, rs: &[Receiver]) -> Option<SmallVec<[Permit; 32]>> {
let mut permits = SmallVec::<[Permit; 32]>::new(); let mut permits = SmallVec::<[Permit; 32]>::new();
for r in rs { for r in rs {
@ -135,13 +144,17 @@ impl BusInner {
Some(permits) Some(permits)
} }
#[inline] #[inline]
pub fn try_send<M: Message>(&self, msg: M) -> core::result::Result<(), SendError<M>> { pub fn try_send<M: Message>(&self, msg: M) -> core::result::Result<(), SendError<M>> {
self.try_send_ext(msg, SendOptions::Broadcast) self.try_send_ext(msg, SendOptions::Broadcast)
} }
pub fn try_send_ext<M: Message>(&self, msg: M, _options: SendOptions) -> core::result::Result<(), SendError<M>> { pub fn try_send_ext<M: Message>(
&self,
msg: M,
_options: SendOptions,
) -> core::result::Result<(), SendError<M>> {
if self.closed.load(Ordering::SeqCst) { if self.closed.load(Ordering::SeqCst) {
warn!("Bus closed. Skipping send!"); warn!("Bus closed. Skipping send!");
return Ok(()); return Ok(());
@ -174,7 +187,10 @@ impl BusInner {
} }
} }
warn!("Unhandled message {:?}: no receivers", core::any::type_name::<M>()); warn!(
"Unhandled message {:?}: no receivers",
core::any::type_name::<M>()
);
Ok(()) Ok(())
} }
@ -185,16 +201,24 @@ impl BusInner {
} }
#[inline] #[inline]
pub fn send_blocking_ext<M: Message>(&self, msg: M, options: SendOptions) -> core::result::Result<(), SendError<M>> { pub fn send_blocking_ext<M: Message>(
&self,
msg: M,
options: SendOptions,
) -> core::result::Result<(), SendError<M>> {
futures::executor::block_on(self.send_ext(msg, options)) futures::executor::block_on(self.send_ext(msg, options))
} }
#[inline] #[inline]
pub async fn send<M: Message>(&self, msg: M, ) -> core::result::Result<(), SendError<M>> { pub async fn send<M: Message>(&self, msg: M) -> core::result::Result<(), SendError<M>> {
self.send_ext(msg, SendOptions::Broadcast).await self.send_ext(msg, SendOptions::Broadcast).await
} }
pub async fn send_ext<M: Message>(&self, msg: M, _options: SendOptions) -> core::result::Result<(), SendError<M>> { pub async fn send_ext<M: Message>(
&self,
msg: M,
_options: SendOptions,
) -> core::result::Result<(), SendError<M>> {
if self.closed.load(Ordering::SeqCst) { if self.closed.load(Ordering::SeqCst) {
return Err(SendError::Closed(msg)); return Err(SendError::Closed(msg));
} }
@ -214,7 +238,10 @@ impl BusInner {
} }
} }
warn!("Unhandled message {:?}: no receivers", core::any::type_name::<M>()); warn!(
"Unhandled message {:?}: no receivers",
core::any::type_name::<M>()
);
Ok(()) Ok(())
} }
@ -224,7 +251,11 @@ impl BusInner {
self.force_send_ext(msg, SendOptions::Broadcast) self.force_send_ext(msg, SendOptions::Broadcast)
} }
pub fn force_send_ext<M: Message>(&self, msg: M, _options: SendOptions) -> core::result::Result<(), SendError<M>> { pub fn force_send_ext<M: Message>(
&self,
msg: M,
_options: SendOptions,
) -> core::result::Result<(), SendError<M>> {
if self.closed.load(Ordering::SeqCst) { if self.closed.load(Ordering::SeqCst) {
return Err(SendError::Closed(msg)); return Err(SendError::Closed(msg));
} }
@ -244,32 +275,56 @@ impl BusInner {
} }
} }
warn!("Unhandled message {:?}: no receivers", core::any::type_name::<M>()); warn!(
"Unhandled message {:?}: no receivers",
core::any::type_name::<M>()
);
Ok(()) Ok(())
} }
// pub fn request<M: Message, R: Message>(&self, req: M, options: SendOptions) -> impl Future<Output = anyhow::Result<R>> { pub async fn request<M: Message, R: Message>(
// let mid = ID_COUNTER.fetch_add(1, Ordering::Relaxed); &self,
// let tid = TypeId::of::<M>(); req: M,
// let rid = TypeId::of::<R>(); options: SendOptions,
) -> anyhow::Result<R> {
let tid = TypeId::of::<M>();
let rid = TypeId::of::<R>();
// let mut iter = self.select_receivers(options, Some(rid)); let mut iter = self.select_receivers(tid, options, Some(rid));
// let first = iter.next(); if let Some(rc) = iter.next() {
let (tx, rx) = oneshot::channel();
let mid = (rc.add_response_waiter(tx).unwrap() | 1 << (usize::BITS - 1)) as u64;
rc.send(mid, rc.reserve().await, req)?;
// for rs in iter { Ok(rx.await?)
// let _ = rs.send(mid, rs.reserve().await, req.clone()); } else {
// } bail!("No Receivers!");
}
// first.send(mid, first.reserve().await, req); }
// let (tx, rx) = tokio::sync::oneshot::channel(); #[inline]
// self.response_waiters.insert(mid, tx); fn select_receivers(
&self,
tid: TypeId,
_options: SendOptions,
rid: Option<TypeId>,
) -> impl Iterator<Item = &Receiver> + '_ {
self.receivers
.get(&tid)
.into_iter()
.map(|item| item.iter())
.flatten()
.filter(move |x| {
let ret_ty = if let Some(rid) = rid {
x.resp_type_id() == rid
} else {
true
};
// poll_fn(move |cx| { ret_ty
// rx.poll_unpin(cx) })
// }) }
// }
} }
#[derive(Clone)] #[derive(Clone)]

View File

@ -1,10 +1,30 @@
use crate::{Bus, Error, Message, msgs, trait_object::TraitObject}; use crate::{msgs, trait_object::TraitObject, Bus, Error, Message};
use core::{any::TypeId, fmt, marker::PhantomData, mem, pin::Pin, task::{Context, Poll}}; use core::{
any::TypeId,
fmt,
marker::PhantomData,
mem,
pin::Pin,
task::{Context, Poll},
};
use futures::future::poll_fn; use futures::future::poll_fn;
use tokio::sync::Notify;
use std::{borrow::Cow, sync::{Arc, atomic::{AtomicBool, AtomicU64, Ordering}}};
use futures::Future; use futures::Future;
use std::{
any::Any,
borrow::Cow,
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Arc,
},
};
use tokio::sync::{oneshot, Notify};
struct SlabCfg;
impl sharded_slab::Config for SlabCfg {
const RESERVED_BITS: usize = 1;
}
type Slab<T> = sharded_slab::Slab<T, SlabCfg>;
pub trait SendUntypedReceiver: Send + Sync { pub trait SendUntypedReceiver: Send + Sync {
fn send(&self, msg: Action) -> Result<(), SendError<Action>>; fn send(&self, msg: Action) -> Result<(), SendError<Action>>;
@ -14,9 +34,10 @@ pub trait SendTypedReceiver<M: Message>: Sync {
fn send(&self, mid: u64, msg: M) -> Result<(), SendError<M>>; fn send(&self, mid: u64, msg: M) -> Result<(), SendError<M>>;
} }
pub trait ReciveTypedReceiver<M, E>: Sync pub trait ReciveTypedReceiver<M, E>: Sync
where M: Message, where
E: crate::Error M: Message,
E: crate::Error,
{ {
fn poll_events(&self, ctx: &mut Context<'_>) -> Poll<Event<M, E>>; fn poll_events(&self, ctx: &mut Context<'_>) -> Poll<Event<M, E>>;
} }
@ -24,7 +45,6 @@ pub trait ReciveTypedReceiver<M, E>: Sync
pub trait ReceiverTrait: Send + Sync { pub trait ReceiverTrait: Send + Sync {
fn typed(&self) -> AnyReceiver<'_>; fn typed(&self) -> AnyReceiver<'_>;
fn poller(&self) -> AnyPoller<'_>; fn poller(&self) -> AnyPoller<'_>;
fn type_id(&self) -> TypeId;
fn stats(&self) -> Result<(), SendError<()>>; fn stats(&self) -> Result<(), SendError<()>>;
fn close(&self) -> Result<(), SendError<()>>; fn close(&self) -> Result<(), SendError<()>>;
fn sync(&self) -> Result<(), SendError<()>>; fn sync(&self) -> Result<(), SendError<()>>;
@ -39,7 +59,6 @@ pub trait PermitDrop {
fn permit_drop(&self); fn permit_drop(&self);
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct Stats { pub struct Stats {
pub has_queue: bool, pub has_queue: bool,
@ -75,20 +94,22 @@ pub enum Event<M, E> {
} }
struct ReceiverWrapper<M, R, E, S> struct ReceiverWrapper<M, R, E, S>
where M: Message, where
R: Message, M: Message,
E: Error, R: Message,
S: 'static E: Error,
{ S: 'static,
inner: S, {
_m: PhantomData<(M, R, E)> inner: S,
_m: PhantomData<(M, R, E)>,
} }
impl<M, R, E, S> ReceiverTrait for ReceiverWrapper<M, R, E, S> impl<M, R, E, S> ReceiverTrait for ReceiverWrapper<M, R, E, S>
where M: Message, where
R: Message, M: Message,
E: Error, R: Message,
S: SendUntypedReceiver + SendTypedReceiver<M> + ReciveTypedReceiver<R, E> + 'static E: Error,
S: SendUntypedReceiver + SendTypedReceiver<M> + ReciveTypedReceiver<R, E> + 'static,
{ {
fn typed(&self) -> AnyReceiver<'_> { fn typed(&self) -> AnyReceiver<'_> {
AnyReceiver::new(&self.inner) AnyReceiver::new(&self.inner)
@ -98,30 +119,26 @@ impl<M, R, E, S> ReceiverTrait for ReceiverWrapper<M, R, E, S>
AnyPoller::new(&self.inner) AnyPoller::new(&self.inner)
} }
fn type_id(&self) -> TypeId {
TypeId::of::<S>()
}
fn stats(&self) -> Result<(), SendError<()>> { fn stats(&self) -> Result<(), SendError<()>> {
SendUntypedReceiver::send(&self.inner, Action::Stats).map_err(|_|SendError::Closed(())) SendUntypedReceiver::send(&self.inner, Action::Stats).map_err(|_| SendError::Closed(()))
} }
fn close(&self) -> Result<(), SendError<()>> { fn close(&self) -> Result<(), SendError<()>> {
SendUntypedReceiver::send(&self.inner, Action::Close).map_err(|_|SendError::Closed(())) SendUntypedReceiver::send(&self.inner, Action::Close).map_err(|_| SendError::Closed(()))
} }
fn sync(&self) -> Result<(), SendError<()>> { fn sync(&self) -> Result<(), SendError<()>> {
SendUntypedReceiver::send(&self.inner, Action::Sync).map_err(|_|SendError::Closed(())) SendUntypedReceiver::send(&self.inner, Action::Sync).map_err(|_| SendError::Closed(()))
} }
fn flush(&self) -> Result<(), SendError<()>> { fn flush(&self) -> Result<(), SendError<()>> {
SendUntypedReceiver::send(&self.inner, Action::Flush).map_err(|_|SendError::Closed(())) SendUntypedReceiver::send(&self.inner, Action::Flush).map_err(|_| SendError::Closed(()))
} }
} }
pub struct Permit { pub struct Permit {
pub(crate) fuse: bool, pub(crate) fuse: bool,
pub(crate) inner: Arc<dyn PermitDrop> pub(crate) inner: Arc<dyn PermitDrop>,
} }
impl Drop for Permit { impl Drop for Permit {
@ -167,11 +184,11 @@ pub struct AnyPoller<'a> {
unsafe impl Send for AnyPoller<'_> {} unsafe impl Send for AnyPoller<'_> {}
impl<'a> AnyPoller<'a> { impl<'a> AnyPoller<'a> {
pub fn new<M, E, R>(rcvr: &'a R) -> Self pub fn new<M, E, R>(rcvr: &'a R) -> Self
where where
M: Message, M: Message,
E: crate::Error, E: crate::Error,
R: ReciveTypedReceiver<M, E> + 'static R: ReciveTypedReceiver<M, E> + 'static,
{ {
let trcvr = rcvr as &(dyn ReciveTypedReceiver<M, E>); let trcvr = rcvr as &(dyn ReciveTypedReceiver<M, E>);
@ -182,7 +199,9 @@ impl<'a> AnyPoller<'a> {
} }
} }
pub fn dyn_typed_receiver<M: Message, E: crate::Error>(&'a self) -> &'a dyn ReciveTypedReceiver<M, E> { pub fn dyn_typed_receiver<M: Message, E: crate::Error>(
&'a self,
) -> &'a dyn ReciveTypedReceiver<M, E> {
assert_eq!(self.type_id, TypeId::of::<dyn ReciveTypedReceiver<M, E>>()); assert_eq!(self.type_id, TypeId::of::<dyn ReciveTypedReceiver<M, E>>());
unsafe { mem::transmute(self.dyn_typed_receiver_trait_object) } unsafe { mem::transmute(self.dyn_typed_receiver_trait_object) }
@ -242,6 +261,7 @@ impl fmt::Display for ReceiverStats {
} }
struct ReceiverContext { struct ReceiverContext {
resp_type_id: TypeId,
limit: u64, limit: u64,
processing: AtomicU64, processing: AtomicU64,
need_flush: AtomicBool, need_flush: AtomicBool,
@ -261,6 +281,7 @@ impl PermitDrop for ReceiverContext {
pub struct Receiver { pub struct Receiver {
inner: Arc<dyn ReceiverTrait>, inner: Arc<dyn ReceiverTrait>,
context: Arc<ReceiverContext>, context: Arc<ReceiverContext>,
waiters: Arc<dyn Any + Send + Sync>,
} }
impl fmt::Debug for Receiver { impl fmt::Debug for Receiver {
@ -280,32 +301,38 @@ impl core::cmp::Eq for Receiver {}
impl Receiver { impl Receiver {
#[inline] #[inline]
pub(crate) fn new<M, R, E, S>(limit: u64, inner: S) -> Self pub(crate) fn new<M, R, E, S>(limit: u64, inner: S) -> Self
where M: Message, where
R: Message, M: Message,
E: Error, R: Message,
S: SendUntypedReceiver + SendTypedReceiver<M> + ReciveTypedReceiver<R, E> + 'static E: Error,
S: SendUntypedReceiver + SendTypedReceiver<M> + ReciveTypedReceiver<R, E> + 'static,
{ {
let context = Arc::new(ReceiverContext { Self {
limit, context: Arc::new(ReceiverContext {
processing: AtomicU64::new(0), resp_type_id: TypeId::of::<R>(),
need_flush: AtomicBool::new(false), limit,
flushed: Notify::new(), processing: AtomicU64::new(0),
synchronized: Notify::new(), need_flush: AtomicBool::new(false),
closed: Notify::new(), flushed: Notify::new(),
response: Notify::new(), synchronized: Notify::new(),
statistics: Notify::new(), closed: Notify::new(),
}); response: Notify::new(),
statistics: Notify::new(),
Self { inner: Arc::new(ReceiverWrapper{ }),
inner, inner: Arc::new(ReceiverWrapper {
_m: Default::default() inner,
}), context } _m: Default::default(),
}),
waiters: Arc::new(sharded_slab::Slab::<oneshot::Sender<R>>::new_with_config::<
SlabCfg,
>()),
}
} }
#[inline] #[inline]
pub fn type_id(&self) -> TypeId { pub fn resp_type_id(&self) -> TypeId {
self.inner.type_id() self.context.resp_type_id
} }
#[inline] #[inline]
@ -313,12 +340,16 @@ impl Receiver {
self.context.need_flush.load(Ordering::SeqCst) self.context.need_flush.load(Ordering::SeqCst)
} }
#[inline]
pub async fn reserve(&self) -> Permit { pub async fn reserve(&self) -> Permit {
loop { loop {
let count = self.context.processing.load(Ordering::Relaxed); let count = self.context.processing.load(Ordering::Relaxed);
if count < self.context.limit { if count < self.context.limit {
let res = self.context.processing.compare_exchange(count, count + 1, Ordering::SeqCst, Ordering::SeqCst); let res = self.context.processing.compare_exchange(
count,
count + 1,
Ordering::SeqCst,
Ordering::SeqCst,
);
if res.is_ok() { if res.is_ok() {
break Permit { break Permit {
fuse: false, fuse: false,
@ -328,19 +359,22 @@ impl Receiver {
// continue // continue
} else { } else {
self.context.response.notified() self.context.response.notified().await
.await
} }
} }
} }
#[inline]
pub fn try_reserve(&self) -> Option<Permit> { pub fn try_reserve(&self) -> Option<Permit> {
loop { loop {
let count = self.context.processing.load(Ordering::Relaxed); let count = self.context.processing.load(Ordering::Relaxed);
if count < self.context.limit { if count < self.context.limit {
let res = self.context.processing.compare_exchange(count, count + 1, Ordering::SeqCst, Ordering::SeqCst); let res = self.context.processing.compare_exchange(
count,
count + 1,
Ordering::SeqCst,
Ordering::SeqCst,
);
if res.is_ok() { if res.is_ok() {
break Some(Permit { break Some(Permit {
fuse: false, fuse: false,
@ -356,7 +390,12 @@ impl Receiver {
} }
#[inline] #[inline]
pub fn send<M: Message>(&self, mid: u64, mut permit: Permit, msg: M) -> Result<(), SendError<M>> { pub fn send<M: Message>(
&self,
mid: u64,
mut permit: Permit,
msg: M,
) -> Result<(), SendError<M>> {
let any_receiver = self.inner.typed(); let any_receiver = self.inner.typed();
let receiver = any_receiver.dyn_typed_receiver::<M>(); let receiver = any_receiver.dyn_typed_receiver::<M>();
let res = receiver.send(mid, msg); let res = receiver.send(mid, msg);
@ -374,6 +413,7 @@ impl Receiver {
let any_receiver = self.inner.typed(); let any_receiver = self.inner.typed();
let receiver = any_receiver.dyn_typed_receiver::<M>(); let receiver = any_receiver.dyn_typed_receiver::<M>();
let res = receiver.send(mid, msg); let res = receiver.send(mid, msg);
self.context.processing.fetch_add(1, Ordering::SeqCst);
if !res.is_err() { if !res.is_err() {
self.context.need_flush.store(true, Ordering::SeqCst); self.context.need_flush.store(true, Ordering::SeqCst);
@ -382,43 +422,75 @@ impl Receiver {
res res
} }
pub fn start_polling_events<M, E>(&self) -> Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>> pub fn start_polling_events<R, E>(
where &self,
M: Message, ) -> Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>
E: crate::Error where
R: Message,
E: crate::Error,
{ {
let ctx_clone = self.context.clone(); let ctx_clone = self.context.clone();
let inner_clone = self.inner.clone(); let inner_clone = self.inner.clone();
let waiters = self
.waiters
.clone()
.downcast::<Slab<oneshot::Sender<R>>>()
.unwrap();
Box::new(move |bus| Box::pin(async move { Box::new(move |bus| {
let any_receiver = inner_clone.poller(); Box::pin(async move {
let receiver = any_receiver.dyn_typed_receiver::<M, E>(); let any_receiver = inner_clone.poller();
let receiver = any_receiver.dyn_typed_receiver::<R, E>();
loop { loop {
let event = poll_fn(move |ctx| receiver.poll_events(ctx)) let event = poll_fn(move |ctx| receiver.poll_events(ctx)).await;
.await;
match event { match event {
Event::Exited => { Event::Exited => {
ctx_clone.closed.notify_waiters(); ctx_clone.closed.notify_waiters();
break; break;
},
Event::Flushed => ctx_clone.flushed.notify_waiters(),
Event::Synchronized(_res) => ctx_clone.synchronized.notify_waiters(),
Event::Response(_mid, resp) => {
ctx_clone.processing.fetch_sub(1, Ordering::SeqCst);
ctx_clone.response.notify_one();
match resp {
Ok(_msg) => (),
Err(err) => { bus.try_send(msgs::Error(Arc::new(err.into()))).ok(); }
} }
},
_ => unimplemented!() Event::Flushed => ctx_clone.flushed.notify_waiters(),
Event::Synchronized(_res) => ctx_clone.synchronized.notify_waiters(),
Event::Response(mid, resp) => {
ctx_clone.processing.fetch_sub(1, Ordering::SeqCst);
ctx_clone.response.notify_one();
match resp {
Ok(msg) => {
if let Some(waiter) = waiters.take(mid as usize) {
if let Err(_msg) = waiter.send(msg) {
error!("Response cannot be processed!");
}
} else if TypeId::of::<R>() != TypeId::of::<()>() {
warn!("Non-void response has no listeners!");
}
}
Err(err) => {
bus.try_send(msgs::Error(Arc::new(err.into()))).ok();
}
}
}
_ => unimplemented!(),
}
} }
} })
})) })
}
#[inline]
pub(crate) fn add_response_waiter<R: Message>(
&self,
waiter: oneshot::Sender<R>,
) -> Option<usize> {
let idx = self
.waiters
.downcast_ref::<Slab<oneshot::Sender<R>>>()
.unwrap()
.insert(waiter)?;
Some(idx)
} }
// #[inline] // #[inline]
@ -434,8 +506,7 @@ impl Receiver {
#[inline] #[inline]
pub async fn close(&self) { pub async fn close(&self) {
if self.inner.close().is_ok() { if self.inner.close().is_ok() {
self.context.closed.notified() self.context.closed.notified().await
.await
} else { } else {
warn!("close failed!"); warn!("close failed!");
} }
@ -444,8 +515,7 @@ impl Receiver {
#[inline] #[inline]
pub async fn sync(&self) { pub async fn sync(&self) {
if self.inner.sync().is_ok() { if self.inner.sync().is_ok() {
self.context.synchronized.notified() self.context.synchronized.notified().await
.await
} else { } else {
warn!("sync failed!"); warn!("sync failed!");
} }
@ -454,9 +524,8 @@ impl Receiver {
#[inline] #[inline]
pub async fn flush(&self) { pub async fn flush(&self) {
if self.inner.flush().is_ok() { if self.inner.flush().is_ok() {
self.context.flushed.notified() self.context.flushed.notified().await;
.await;
self.context.need_flush.store(false, Ordering::SeqCst); self.context.need_flush.store(false, Ordering::SeqCst);
} else { } else {
warn!("flush failed!"); warn!("flush failed!");

View File

@ -7,9 +7,12 @@ use std::{
task::{Context, Poll}, task::{Context, Poll},
}; };
use crate::{receiver::{Action, Event, ReceiverStats, ReciveTypedReceiver, SendUntypedReceiver}, receivers::Request}; use crate::{
receiver::{Action, Event, ReceiverStats, ReciveTypedReceiver, SendUntypedReceiver},
receivers::Request,
};
use anyhow::Result; use anyhow::Result;
use futures::{Future, StreamExt, stream::FuturesUnordered}; use futures::{stream::FuturesUnordered, Future, StreamExt};
use super::{BufferUnorderedConfig, BufferUnorderedStats}; use super::{BufferUnorderedConfig, BufferUnorderedStats};
use crate::{ use crate::{
@ -20,7 +23,6 @@ use crate::{
use parking_lot::Mutex; use parking_lot::Mutex;
use tokio::sync::mpsc; use tokio::sync::mpsc;
fn buffer_unordered_poller<T, M, R, E>( fn buffer_unordered_poller<T, M, R, E>(
mut rx: mpsc::UnboundedReceiver<Request<M>>, mut rx: mpsc::UnboundedReceiver<Request<M>>,
bus: Bus, bus: Bus,
@ -33,7 +35,7 @@ where
T: AsyncHandler<M, Response = R, Error = E> + 'static, T: AsyncHandler<M, Response = R, Error = E> + 'static,
M: Message, M: Message,
R: Message, R: Message,
E: crate::Error E: crate::Error,
{ {
let ut = ut.downcast::<T>().unwrap(); let ut = ut.downcast::<T>().unwrap();
let mut queue = FuturesUnordered::new(); let mut queue = FuturesUnordered::new();
@ -46,33 +48,33 @@ where
if !rx_closed && !need_flush && !need_sync { if !rx_closed && !need_flush && !need_sync {
while queue.len() < cfg.max_parallel { while queue.len() < cfg.max_parallel {
match rx.poll_recv(cx) { match rx.poll_recv(cx) {
Poll::Ready(Some(a)) => { Poll::Ready(Some(a)) => match a {
match a { Request::Request(mid, msg) => {
Request::Request(mid, msg) => { stats.buffer.fetch_sub(1, Ordering::Relaxed);
stats.buffer.fetch_sub(1, Ordering::Relaxed); stats.parallel.fetch_add(1, Ordering::Relaxed);
stats.parallel.fetch_add(1, Ordering::Relaxed);
let bus = bus.clone(); let bus = bus.clone();
let ut = ut.clone(); let ut = ut.clone();
queue.push(tokio::task::spawn(async move { (mid, ut.handle(msg, &bus).await) })); queue.push(tokio::task::spawn(async move {
}, (mid, ut.handle(msg, &bus).await)
Request::Action(Action::Flush) => need_flush = true, }));
Request::Action(Action::Sync) => need_sync = true,
Request::Action(Action::Close) => rx.close(),
_ => unimplemented!()
} }
Request::Action(Action::Flush) => need_flush = true,
Request::Action(Action::Sync) => need_sync = true,
Request::Action(Action::Close) => rx.close(),
_ => unimplemented!(),
}, },
Poll::Ready(None) => { Poll::Ready(None) => {
need_sync = true; need_sync = true;
rx_closed = true; rx_closed = true;
}, }
Poll::Pending => break, Poll::Pending => break,
} }
} }
} }
let queue_len = queue.len(); let queue_len = queue.len();
loop { loop {
if queue_len != 0 { if queue_len != 0 {
loop { loop {
@ -80,7 +82,7 @@ where
Poll::Pending => return Poll::Pending, Poll::Pending => return Poll::Pending,
Poll::Ready(Some(Ok((mid, res)))) => { Poll::Ready(Some(Ok((mid, res)))) => {
stx.send(Event::Response(mid, res)).ok(); stx.send(Event::Response(mid, res)).ok();
}, }
Poll::Ready(None) => break, Poll::Ready(None) => break,
_ => {} _ => {}
} }
@ -98,7 +100,7 @@ where
Poll::Pending => { Poll::Pending => {
sync_future = Some(fut); sync_future = Some(fut);
return Poll::Pending; return Poll::Pending;
}, }
Poll::Ready(res) => { Poll::Ready(res) => {
need_sync = false; need_sync = false;
stx.send(Event::Synchronized(res)).ok(); stx.send(Event::Synchronized(res)).ok();
@ -107,9 +109,7 @@ where
} else { } else {
let ut = ut.clone(); let ut = ut.clone();
let bus_clone = bus.clone(); let bus_clone = bus.clone();
sync_future.replace(Box::pin(async move { sync_future.replace(Box::pin(async move { ut.sync(&bus_clone).await }));
ut.sync(&bus_clone).await
}));
} }
} else { } else {
break; break;
@ -117,16 +117,20 @@ where
} }
if queue_len == queue.len() { if queue_len == queue.len() {
return if rx_closed { Poll::Ready(()) } else { Poll::Pending }; return if rx_closed {
Poll::Ready(())
} else {
Poll::Pending
};
} }
}) })
} }
pub struct BufferUnorderedAsync<M, R = (), E = anyhow::Error> pub struct BufferUnorderedAsync<M, R = (), E = anyhow::Error>
where where
M: Message, M: Message,
R: Message, R: Message,
E: crate::Error E: crate::Error,
{ {
tx: mpsc::UnboundedSender<Request<M>>, tx: mpsc::UnboundedSender<Request<M>>,
stats: Arc<BufferUnorderedStats>, stats: Arc<BufferUnorderedStats>,
@ -134,16 +138,22 @@ pub struct BufferUnorderedAsync<M, R = (), E = anyhow::Error>
} }
impl<T, M, R, E> ReceiverSubscriberBuilder<T, M, R, E> for BufferUnorderedAsync<M, R, E> impl<T, M, R, E> ReceiverSubscriberBuilder<T, M, R, E> for BufferUnorderedAsync<M, R, E>
where where
T: AsyncHandler<M, Response = R, Error = E> + 'static, T: AsyncHandler<M, Response = R, Error = E> + 'static,
R: Message, R: Message,
M: Message, M: Message,
E: crate::Error E: crate::Error,
{ {
type Config = BufferUnorderedConfig; type Config = BufferUnorderedConfig;
fn build(cfg: Self::Config) -> (Self, Box<dyn FnOnce(Untyped) -> Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>>) fn build(
{ cfg: Self::Config,
) -> (
Self,
Box<
dyn FnOnce(Untyped) -> Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>,
>,
) {
let stats = Arc::new(BufferUnorderedStats { let stats = Arc::new(BufferUnorderedStats {
buffer: AtomicU64::new(0), buffer: AtomicU64::new(0),
buffer_total: AtomicU64::new(cfg.buffer_size as _), buffer_total: AtomicU64::new(cfg.buffer_size as _),
@ -157,39 +167,48 @@ impl<T, M, R, E> ReceiverSubscriberBuilder<T, M, R, E> for BufferUnorderedAsync<
let poller = Box::new(move |ut| { let poller = Box::new(move |ut| {
Box::new(move |bus| { Box::new(move |bus| {
Box::pin(buffer_unordered_poller::<T, M, R, E>(rx, bus, ut, stats_clone, cfg, stx)) Box::pin(buffer_unordered_poller::<T, M, R, E>(
as Pin<Box<dyn Future<Output = ()> + Send>> rx,
bus,
ut,
stats_clone,
cfg,
stx,
)) as Pin<Box<dyn Future<Output = ()> + Send>>
}) as Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>> }) as Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>
}); });
(BufferUnorderedAsync::<M, R, E> { (
tx, BufferUnorderedAsync::<M, R, E> {
stats, tx,
srx: Mutex::new(srx), stats,
}, poller) srx: Mutex::new(srx),
},
poller,
)
} }
} }
impl<M, R, E> SendUntypedReceiver for BufferUnorderedAsync<M, R, E> impl<M, R, E> SendUntypedReceiver for BufferUnorderedAsync<M, R, E>
where where
M: Message, M: Message,
R: Message, R: Message,
E: crate::Error E: crate::Error,
{ {
fn send(&self, m: Action) -> Result<(), SendError<Action>> { fn send(&self, m: Action) -> Result<(), SendError<Action>> {
match self.tx.send(Request::Action(m)) { match self.tx.send(Request::Action(m)) {
Ok(_) => Ok(()), Ok(_) => Ok(()),
Err(mpsc::error::SendError(Request::Action(msg))) => Err(SendError::Closed(msg)), Err(mpsc::error::SendError(Request::Action(msg))) => Err(SendError::Closed(msg)),
_ => unimplemented!() _ => unimplemented!(),
} }
} }
} }
impl<M, R, E> SendTypedReceiver<M> for BufferUnorderedAsync<M, R, E> impl<M, R, E> SendTypedReceiver<M> for BufferUnorderedAsync<M, R, E>
where where
M: Message, M: Message,
R: Message, R: Message,
E: crate::Error E: crate::Error,
{ {
fn send(&self, mid: u64, m: M) -> Result<(), SendError<M>> { fn send(&self, mid: u64, m: M) -> Result<(), SendError<M>> {
match self.tx.send(Request::Request(mid, m)) { match self.tx.send(Request::Request(mid, m)) {
@ -199,16 +218,16 @@ impl<M, R, E> SendTypedReceiver<M> for BufferUnorderedAsync<M, R, E>
Ok(()) Ok(())
} }
Err(mpsc::error::SendError(Request::Request(_, msg))) => Err(SendError::Closed(msg)), Err(mpsc::error::SendError(Request::Request(_, msg))) => Err(SendError::Closed(msg)),
_ => unimplemented!() _ => unimplemented!(),
} }
} }
} }
impl<M, R, E> ReciveTypedReceiver<R, E> for BufferUnorderedAsync<M, R, E> impl<M, R, E> ReciveTypedReceiver<R, E> for BufferUnorderedAsync<M, R, E>
where where
M: Message, M: Message,
R: Message, R: Message,
E: crate::Error E: crate::Error,
{ {
fn poll_events(&self, ctx: &mut Context<'_>) -> Poll<Event<R, E>> { fn poll_events(&self, ctx: &mut Context<'_>) -> Poll<Event<R, E>> {
let poll = self.srx.lock().poll_recv(ctx); let poll = self.srx.lock().poll_recv(ctx);
@ -218,4 +237,4 @@ impl<M, R, E> ReciveTypedReceiver<R, E> for BufferUnorderedAsync<M, R, E>
Poll::Ready(None) => Poll::Ready(Event::Exited), Poll::Ready(None) => Poll::Ready(Event::Exited),
} }
} }
} }

View File

@ -7,20 +7,22 @@ use std::{
task::{Context, Poll}, task::{Context, Poll},
}; };
use crate::{receiver::{Action, Event, ReceiverStats, ReciveTypedReceiver, SendUntypedReceiver}, receivers::Request}; use crate::{
receiver::{Action, Event, ReceiverStats, ReciveTypedReceiver, SendUntypedReceiver},
receivers::Request,
};
use anyhow::Result; use anyhow::Result;
use futures::{Future, StreamExt, stream::FuturesUnordered}; use futures::{stream::FuturesUnordered, Future, StreamExt};
use super::{BufferUnorderedConfig, BufferUnorderedStats}; use super::{BufferUnorderedConfig, BufferUnorderedStats};
use crate::{ use crate::{
builder::ReceiverSubscriberBuilder, builder::ReceiverSubscriberBuilder,
receiver::{SendError, SendTypedReceiver}, receiver::{SendError, SendTypedReceiver},
Handler, Bus, Message, Untyped, Bus, Handler, Message, Untyped,
}; };
use parking_lot::Mutex; use parking_lot::Mutex;
use tokio::sync::mpsc; use tokio::sync::mpsc;
fn buffer_unordered_poller<T, M, R, E>( fn buffer_unordered_poller<T, M, R, E>(
mut rx: mpsc::UnboundedReceiver<Request<M>>, mut rx: mpsc::UnboundedReceiver<Request<M>>,
bus: Bus, bus: Bus,
@ -33,7 +35,7 @@ where
T: Handler<M, Response = R, Error = E> + 'static, T: Handler<M, Response = R, Error = E> + 'static,
M: Message, M: Message,
R: Message, R: Message,
E: crate::Error E: crate::Error,
{ {
let ut = ut.downcast::<T>().unwrap(); let ut = ut.downcast::<T>().unwrap();
let mut queue = FuturesUnordered::new(); let mut queue = FuturesUnordered::new();
@ -46,33 +48,33 @@ where
if !rx_closed && !need_flush && !need_sync { if !rx_closed && !need_flush && !need_sync {
while queue.len() < cfg.max_parallel { while queue.len() < cfg.max_parallel {
match rx.poll_recv(cx) { match rx.poll_recv(cx) {
Poll::Ready(Some(a)) => { Poll::Ready(Some(a)) => match a {
match a { Request::Request(mid, msg) => {
Request::Request(mid, msg) => { stats.buffer.fetch_sub(1, Ordering::Relaxed);
stats.buffer.fetch_sub(1, Ordering::Relaxed); stats.parallel.fetch_add(1, Ordering::Relaxed);
stats.parallel.fetch_add(1, Ordering::Relaxed);
let bus = bus.clone(); let bus = bus.clone();
let ut = ut.clone(); let ut = ut.clone();
queue.push( tokio::task::spawn_blocking(move || (mid, ut.handle(msg, &bus)))); queue.push(tokio::task::spawn_blocking(move || {
}, (mid, ut.handle(msg, &bus))
Request::Action(Action::Flush) => need_flush = true, }));
Request::Action(Action::Sync) => need_sync = true,
Request::Action(Action::Close) => rx.close(),
_ => unimplemented!()
} }
Request::Action(Action::Flush) => need_flush = true,
Request::Action(Action::Sync) => need_sync = true,
Request::Action(Action::Close) => rx.close(),
_ => unimplemented!(),
}, },
Poll::Ready(None) => { Poll::Ready(None) => {
need_sync = true; need_sync = true;
rx_closed = true; rx_closed = true;
}, }
Poll::Pending => break, Poll::Pending => break,
} }
} }
} }
let queue_len = queue.len(); let queue_len = queue.len();
loop { loop {
if queue_len != 0 { if queue_len != 0 {
loop { loop {
@ -80,7 +82,7 @@ where
Poll::Pending => return Poll::Pending, Poll::Pending => return Poll::Pending,
Poll::Ready(Some(Ok((mid, res)))) => { Poll::Ready(Some(Ok((mid, res)))) => {
stx.send(Event::Response(mid, res)).ok(); stx.send(Event::Response(mid, res)).ok();
}, }
Poll::Ready(None) => break, Poll::Ready(None) => break,
_ => {} _ => {}
} }
@ -98,7 +100,7 @@ where
Poll::Pending => { Poll::Pending => {
sync_future = Some(fut); sync_future = Some(fut);
return Poll::Pending; return Poll::Pending;
}, }
Poll::Ready(res) => { Poll::Ready(res) => {
need_sync = false; need_sync = false;
stx.send(Event::Synchronized(res)).ok(); stx.send(Event::Synchronized(res)).ok();
@ -108,7 +110,9 @@ where
let ut = ut.clone(); let ut = ut.clone();
let bus_clone = bus.clone(); let bus_clone = bus.clone();
sync_future.replace(Box::pin(async move { sync_future.replace(Box::pin(async move {
tokio::task::spawn_blocking(move || ut.sync(&bus_clone)).await.unwrap() tokio::task::spawn_blocking(move || ut.sync(&bus_clone))
.await
.unwrap()
})); }));
} }
} else { } else {
@ -117,16 +121,20 @@ where
} }
if queue_len == queue.len() { if queue_len == queue.len() {
return if rx_closed { Poll::Ready(()) } else { Poll::Pending }; return if rx_closed {
Poll::Ready(())
} else {
Poll::Pending
};
} }
}) })
} }
pub struct BufferUnorderedSync<M, R = (), E = anyhow::Error> pub struct BufferUnorderedSync<M, R = (), E = anyhow::Error>
where where
M: Message, M: Message,
R: Message, R: Message,
E: crate::Error E: crate::Error,
{ {
tx: mpsc::UnboundedSender<Request<M>>, tx: mpsc::UnboundedSender<Request<M>>,
stats: Arc<BufferUnorderedStats>, stats: Arc<BufferUnorderedStats>,
@ -134,15 +142,22 @@ pub struct BufferUnorderedSync<M, R = (), E = anyhow::Error>
} }
impl<T, M, R, E> ReceiverSubscriberBuilder<T, M, R, E> for BufferUnorderedSync<M, R, E> impl<T, M, R, E> ReceiverSubscriberBuilder<T, M, R, E> for BufferUnorderedSync<M, R, E>
where where
T: Handler<M, Response = R, Error = E> + 'static, T: Handler<M, Response = R, Error = E> + 'static,
R: Message, R: Message,
M: Message, M: Message,
E: crate::Error E: crate::Error,
{ {
type Config = BufferUnorderedConfig; type Config = BufferUnorderedConfig;
fn build(cfg: Self::Config) -> (Self, Box<dyn FnOnce(Untyped) -> Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>>) { fn build(
cfg: Self::Config,
) -> (
Self,
Box<
dyn FnOnce(Untyped) -> Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>,
>,
) {
let stats = Arc::new(BufferUnorderedStats { let stats = Arc::new(BufferUnorderedStats {
buffer: AtomicU64::new(0), buffer: AtomicU64::new(0),
buffer_total: AtomicU64::new(cfg.buffer_size as _), buffer_total: AtomicU64::new(cfg.buffer_size as _),
@ -156,40 +171,48 @@ impl<T, M, R, E> ReceiverSubscriberBuilder<T, M, R, E> for BufferUnorderedSync<M
let poller = Box::new(move |ut| { let poller = Box::new(move |ut| {
Box::new(move |bus| { Box::new(move |bus| {
Box::pin(buffer_unordered_poller::<T, M, R, E>(rx, bus, ut, stats_clone, cfg, stx)) Box::pin(buffer_unordered_poller::<T, M, R, E>(
as Pin<Box<dyn Future<Output = ()> + Send>> rx,
bus,
ut,
stats_clone,
cfg,
stx,
)) as Pin<Box<dyn Future<Output = ()> + Send>>
}) as Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>> }) as Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>
}); });
(BufferUnorderedSync::<M, R, E> { (
tx, BufferUnorderedSync::<M, R, E> {
stats, tx,
srx: Mutex::new(srx), stats,
}, poller) srx: Mutex::new(srx),
},
poller,
)
} }
} }
impl<M, R, E> SendUntypedReceiver for BufferUnorderedSync<M, R, E> impl<M, R, E> SendUntypedReceiver for BufferUnorderedSync<M, R, E>
where where
M: Message, M: Message,
R: Message, R: Message,
E: crate::Error E: crate::Error,
{ {
fn send(&self, msg: Action) -> Result<(), SendError<Action>> { fn send(&self, msg: Action) -> Result<(), SendError<Action>> {
match self.tx.send(Request::Action(msg)) { match self.tx.send(Request::Action(msg)) {
Ok(_) => Ok(()), Ok(_) => Ok(()),
Err(mpsc::error::SendError(Request::Action(msg))) => Err(SendError::Closed(msg)), Err(mpsc::error::SendError(Request::Action(msg))) => Err(SendError::Closed(msg)),
_ => unimplemented!() _ => unimplemented!(),
} }
} }
} }
impl<M, R, E> SendTypedReceiver<M> for BufferUnorderedSync<M, R, E>
impl<M, R, E> SendTypedReceiver<M> for BufferUnorderedSync<M, R, E> where
where M: Message,
M: Message, R: Message,
R: Message, E: crate::Error,
E: crate::Error
{ {
fn send(&self, mid: u64, m: M) -> Result<(), SendError<M>> { fn send(&self, mid: u64, m: M) -> Result<(), SendError<M>> {
match self.tx.send(Request::Request(mid, m)) { match self.tx.send(Request::Request(mid, m)) {
@ -199,16 +222,16 @@ impl<M, R, E> SendTypedReceiver<M> for BufferUnorderedSync<M, R, E>
Ok(()) Ok(())
} }
Err(mpsc::error::SendError(Request::Request(_, msg))) => Err(SendError::Closed(msg)), Err(mpsc::error::SendError(Request::Request(_, msg))) => Err(SendError::Closed(msg)),
_ => unimplemented!() _ => unimplemented!(),
} }
} }
} }
impl<M, R, E> ReciveTypedReceiver<R, E> for BufferUnorderedSync<M, R, E> impl<M, R, E> ReciveTypedReceiver<R, E> for BufferUnorderedSync<M, R, E>
where where
M: Message, M: Message,
R: Message, R: Message,
E: crate::Error E: crate::Error,
{ {
fn poll_events(&self, ctx: &mut Context<'_>) -> Poll<Event<R, E>> { fn poll_events(&self, ctx: &mut Context<'_>) -> Poll<Event<R, E>> {
let poll = self.srx.lock().poll_recv(ctx); let poll = self.srx.lock().poll_recv(ctx);
@ -218,4 +241,4 @@ impl<M, R, E> ReciveTypedReceiver<R, E> for BufferUnorderedSync<M, R, E>
Poll::Ready(None) => Poll::Ready(Event::Exited), Poll::Ready(None) => Poll::Ready(Event::Exited),
} }
} }
} }

View File

@ -4,25 +4,20 @@ mod buffer_unordered;
// mod synchronize_batched; // mod synchronize_batched;
// mod synchronized; // mod synchronized;
// mod mpsc; // mod mpsc;
// mod mpsc { // mod mpsc {
// pub use super::mpsc_futures::*; // pub use super::mpsc_futures::*;
// } // }
pub use buffer_unordered::{ pub use buffer_unordered::{BufferUnorderedAsync, BufferUnorderedConfig, BufferUnorderedSync};
BufferUnorderedAsync, BufferUnorderedConfig,
BufferUnorderedSync,
};
use crate::receiver::Action; use crate::receiver::Action;
pub(crate) enum Request<M> { pub(crate) enum Request<M> {
Action(Action), Action(Action),
Request(u64, M) Request(u64, M),
} }
// pub use buffer_unordered_batched::{ // pub use buffer_unordered_batched::{
// BufferUnorderedBatchedAsync, BufferUnorderedBatchedAsyncSubscriber, BufferUnorderedBatchedConfig, // BufferUnorderedBatchedAsync, BufferUnorderedBatchedAsyncSubscriber, BufferUnorderedBatchedConfig,
// BufferUnorderedBatchedSync, BufferUnorderedBatchedSyncSubscriber, // BufferUnorderedBatchedSync, BufferUnorderedBatchedSyncSubscriber,