Message response and error types; fix issues

This commit is contained in:
Andrey Tkachenko 2021-06-18 21:01:13 +04:00
parent 6b24f39c50
commit dc56bd8a8d
14 changed files with 1071 additions and 686 deletions

View File

@ -17,6 +17,11 @@ async-trait = "0.1.42"
futures = "0.3.8"
anyhow = "1.0.34"
crossbeam = "0.8.1"
uuid = "0.8.2"
tokio-util = "0.6.7"
async-stream = "0.3.2"
smallvec = "1.6.1"
log = "0.4.14"
[dev-dependencies]
tokio = { version = "1", features = ["macros", "parking_lot", "rt-multi-thread", "io-util", "sync"] }

View File

@ -1,15 +1,24 @@
use async_trait::async_trait;
use messagebus::{receivers, AsyncHandler, Bus, Handler, Result as MbusResult};
use messagebus::{receivers, AsyncHandler, Bus, Handler};
struct TmpReceiver;
struct TmpReceiver2;
#[async_trait]
impl AsyncHandler<f32> for TmpReceiver {
async fn handle(&self, msg: f32, bus: &Bus) -> MbusResult {
type Error = anyhow::Error;
type Response = ();
async fn handle(&self, msg: f32, bus: &Bus) -> Result<Self::Response, Self::Error> {
bus.send(1u16).await?;
println!("---> f32 {}", msg);
println!("TmpReceiver ---> f32 {}", msg);
Ok(())
}
async fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> {
println!("TmpReceiver f32: sync");
Ok(())
}
@ -17,9 +26,18 @@ impl AsyncHandler<f32> for TmpReceiver {
#[async_trait]
impl AsyncHandler<u16> for TmpReceiver {
async fn handle(&self, msg: u16, bus: &Bus) -> MbusResult {
type Error = anyhow::Error;
type Response = ();
async fn handle(&self, msg: u16, bus: &Bus) -> Result<Self::Response, Self::Error> {
bus.send(2u32).await?;
println!("---> u16 {}", msg);
println!("TmpReceiver ---> u16 {}", msg);
Ok(())
}
async fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> {
println!("TmpReceiver u16: sync");
Ok(())
}
@ -27,9 +45,17 @@ impl AsyncHandler<u16> for TmpReceiver {
#[async_trait]
impl AsyncHandler<u32> for TmpReceiver {
async fn handle(&self, msg: u32, bus: &Bus) -> MbusResult {
type Error = anyhow::Error;
type Response = ();
async fn handle(&self, msg: u32, bus: &Bus) -> Result<Self::Response, Self::Error> {
bus.send(3i32).await?;
println!("---> u32 {}", msg);
println!("TmpReceiver ---> u32 {}", msg);
Ok(())
}
async fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> {
println!("TmpReceiver u32: sync");
Ok(())
}
@ -37,9 +63,18 @@ impl AsyncHandler<u32> for TmpReceiver {
#[async_trait]
impl AsyncHandler<i32> for TmpReceiver {
async fn handle(&self, msg: i32, bus: &Bus) -> MbusResult {
type Error = anyhow::Error;
type Response = ();
async fn handle(&self, msg: i32, bus: &Bus) -> Result<Self::Response, Self::Error> {
bus.send(4i16).await?;
println!("---> i32 {}", msg);
println!("TmpReceiver ---> i32 {}", msg);
Ok(())
}
async fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> {
println!("TmpReceiver i32: sync");
Ok(())
}
@ -47,8 +82,16 @@ impl AsyncHandler<i32> for TmpReceiver {
#[async_trait]
impl AsyncHandler<i16> for TmpReceiver {
async fn handle(&self, msg: i16, _bus: &Bus) -> MbusResult {
println!("---> i16 {}", msg);
type Error = anyhow::Error;
type Response = ();
async fn handle(&self, msg: i16, _bus: &Bus) -> Result<Self::Response, Self::Error> {
println!("TmpReceiver ---> i16 {}", msg);
Ok(())
}
async fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> {
println!("TmpReceiver i16: sync");
Ok(())
}
@ -56,17 +99,35 @@ impl AsyncHandler<i16> for TmpReceiver {
#[async_trait]
impl AsyncHandler<i32> for TmpReceiver2 {
async fn handle(&self, msg: i32, bus: &Bus) -> MbusResult {
type Error = anyhow::Error;
type Response = ();
async fn handle(&self, msg: i32, bus: &Bus) -> Result<Self::Response, Self::Error> {
println!("!!!! TmpReceiver2: ---> 2 i32 {}", msg);
bus.send(5i16).await?;
println!("---> 2 i32 {}", msg);
Ok(())
}
async fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> {
println!("TmpReceiver2: i32: sync");
Ok(())
}
}
impl Handler<i16> for TmpReceiver2 {
fn handle(&self, msg: i16, _bus: &Bus) -> MbusResult {
println!("---> 2 i16 {}", msg);
type Error = anyhow::Error;
type Response = ();
fn handle(&self, msg: i16, _bus: &Bus) -> Result<Self::Response, Self::Error> {
println!("TmpReceiver2: ---> 2 i16 {}", msg);
Ok(())
}
fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> {
println!("TmpReceiver2: i16: sync");
Ok(())
}
@ -76,18 +137,26 @@ impl Handler<i16> for TmpReceiver2 {
async fn main() {
let (b, poller) = Bus::build()
.register(TmpReceiver)
.subscribe::<f32, receivers::BufferUnorderedAsync<_>>(Default::default())
.subscribe::<u16, receivers::BufferUnorderedAsync<_>>(Default::default())
.subscribe::<u32, receivers::BufferUnorderedAsync<_>>(Default::default())
.subscribe::<i32, receivers::BufferUnorderedAsync<_>>(Default::default())
.subscribe::<i16, receivers::BufferUnorderedAsync<_>>(Default::default())
.subscribe::<f32, receivers::BufferUnorderedAsync<_>, _, _>(8, Default::default())
.subscribe::<u16, receivers::BufferUnorderedAsync<_>, _, _>(8, Default::default())
.subscribe::<u32, receivers::BufferUnorderedAsync<_>, _, _>(8, Default::default())
.subscribe::<i32, receivers::BufferUnorderedAsync<_>, _, _>(8, Default::default())
.subscribe::<i16, receivers::BufferUnorderedAsync<_>, _, _>(8, Default::default())
.done()
.register(TmpReceiver2)
.subscribe::<i32, receivers::BufferUnorderedAsync<_>>(Default::default())
.subscribe::<i16, receivers::BufferUnorderedSync<_>>(Default::default())
.subscribe::<i32, receivers::BufferUnorderedAsync<_>, _, _>(8, Default::default())
.subscribe::<i16, receivers::BufferUnorderedSync<_>, _, _>(8, Default::default())
.done()
.build();
.build();
b.send(0f32).await.unwrap();
poller.await
println!("flush");
b.flush().await;
println!("close");
b.close().await;
poller.await;
println!("[done]");
}

View File

@ -1,9 +1,14 @@
use messagebus::{receivers, Bus, Handler, Result as MbusResult};
use async_trait::async_trait;
use messagebus::{receivers, Bus, AsyncHandler};
struct TmpReceiver;
impl Handler<f32> for TmpReceiver {
fn handle(&self, msg: f32, _bus: &Bus) -> MbusResult {
#[async_trait]
impl AsyncHandler<f32> for TmpReceiver {
type Error = anyhow::Error;
type Response = ();
async fn handle(&self, msg: f32, _bus: &Bus) -> Result<Self::Response, Self::Error> {
println!("---> f32 {}", msg);
std::thread::sleep(std::time::Duration::from_secs(1));
@ -17,7 +22,7 @@ impl Handler<f32> for TmpReceiver {
async fn main() {
let (b, poller) = Bus::build()
.register(TmpReceiver)
.subscribe::<f32, receivers::BufferUnorderedSync<_>>(receivers::BufferUnorderedConfig {
.subscribe::<f32, receivers::BufferUnorderedAsync<_>, _, _>(1, receivers::BufferUnorderedConfig {
buffer_size: 1,
max_parallel: 1,
})
@ -57,7 +62,12 @@ async fn main() {
println!("sending 11");
b.send(32f32).await.unwrap();
println!("finish");
println!("flush");
b.flush().await;
println!("close");
b.close().await;
poller.await;
println!("[done]");
}

View File

@ -1,9 +1,12 @@
use messagebus::{receivers, Bus, Handler, Result as MbusResult};
use messagebus::{receivers, Bus, Handler};
struct TmpReceiver;
impl Handler<f32> for TmpReceiver {
fn handle(&self, msg: f32, _bus: &Bus) -> MbusResult {
type Error = anyhow::Error;
type Response = ();
fn handle(&self, msg: f32, _bus: &Bus) -> Result<Self::Response, Self::Error> {
println!("---> f32 {}", msg);
std::thread::sleep(std::time::Duration::from_secs(5));
@ -15,14 +18,20 @@ impl Handler<f32> for TmpReceiver {
}
impl Handler<u16> for TmpReceiver {
fn handle(&self, msg: u16, _bus: &Bus) -> MbusResult {
type Error = anyhow::Error;
type Response = ();
fn handle(&self, msg: u16, _bus: &Bus) -> Result<Self::Response, Self::Error> {
println!("---> u16 {}", msg);
Ok(())
}
}
impl Handler<u32> for TmpReceiver {
fn handle(&self, msg: u32, _bus: &Bus) -> MbusResult {
type Error = anyhow::Error;
type Response = ();
fn handle(&self, msg: u32, _bus: &Bus) -> Result<Self::Response, Self::Error> {
println!("---> u32 {}", msg);
Ok(())
}
@ -32,9 +41,9 @@ impl Handler<u32> for TmpReceiver {
async fn main() {
let (b, poller) = Bus::build()
.register(TmpReceiver)
.subscribe::<f32, receivers::BufferUnorderedSync<_>>(Default::default())
.subscribe::<u16, receivers::BufferUnorderedSync<_>>(Default::default())
.subscribe::<u32, receivers::BufferUnorderedSync<_>>(Default::default())
.subscribe::<f32, receivers::BufferUnorderedSync<_>, _, _>(8, Default::default())
.subscribe::<u16, receivers::BufferUnorderedSync<_>, _, _>(8, Default::default())
.subscribe::<u32, receivers::BufferUnorderedSync<_>, _, _>(8, Default::default())
.done()
.build();
@ -42,5 +51,12 @@ async fn main() {
b.send(11u16).await.unwrap();
b.send(32u32).await.unwrap();
poller.await
println!("flush");
b.flush().await;
println!("close");
b.close().await;
poller.await;
println!("[done]");
}

View File

@ -20,7 +20,13 @@ pub trait ReceiverSubscriber<T: 'static> {
);
}
pub trait ReceiverSubscriberBuilder<M, T: 'static> {
pub trait ReceiverSubscriberBuilder<T, M, R, E>
where
T: 'static,
M: Message,
R: Message,
E: crate::Error
{
type Entry: ReceiverSubscriber<T>;
type Config: Default;
@ -44,6 +50,7 @@ pub struct RegisterEntry<K, T> {
)
-> Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>,
>,
Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>
)>,
>,
_m: PhantomData<(K, T)>,
@ -54,10 +61,10 @@ impl<K, T: 'static> RegisterEntry<K, T> {
let mut builder = self.builder;
for (tid, v) in self.receivers {
for (r, poller) in v {
for (r, poller, poller2) in v {
let poller = poller(self.item.clone());
builder.add_recevier((tid, r), poller);
builder.add_recevier((tid, r), poller, poller2);
}
}
@ -65,39 +72,45 @@ impl<K, T: 'static> RegisterEntry<K, T> {
}
}
impl<T: Send + 'static> RegisterEntry<UnsyncEntry, T> {
pub fn subscribe<M, R>(mut self, cfg: R::Config) -> Self
impl<T> RegisterEntry<UnsyncEntry, T> {
pub fn subscribe<M, S, R, E>(mut self, queue: u64, cfg: S::Config) -> Self
where
T: Send + 'static,
M: Message + 'static,
R: ReceiverSubscriberBuilder<M, T> + 'static,
M: Message,
R: Message,
E: crate::Error,
S: ReceiverSubscriberBuilder<T, M, R, E> + 'static,
{
let (inner, poller) = R::build(cfg).subscribe();
let (inner, poller) = S::build(cfg).subscribe();
let receiver = Receiver::new(inner);
let receiver = Receiver::new(queue, inner);
let poller2 = receiver.start_polling_events::<R, E>();
self.receivers
.entry(TypeId::of::<M>())
.or_insert_with(Vec::new)
.push((receiver, poller));
.push((receiver, poller, poller2));
self
}
}
impl<T: Send + Sync + 'static> RegisterEntry<SyncEntry, T> {
pub fn subscribe<M, R>(mut self, cfg: R::Config) -> Self
impl<T> RegisterEntry<SyncEntry, T> {
pub fn subscribe<M, S, R, E>(mut self, queue: u64, cfg: S::Config) -> Self
where
T: Send + 'static,
M: Message + 'static,
R: ReceiverSubscriberBuilder<M, T> + 'static,
T: Send + Sync + 'static,
M: Message,
R: Message,
E: crate::Error,
S: ReceiverSubscriberBuilder<T, M, R, E> + 'static,
{
let (inner, poller) = R::build(cfg).subscribe();
let (inner, poller) = S::build(cfg).subscribe();
let receiver = Receiver::new(inner);
let receiver = Receiver::new(queue, inner);
let poller2 = receiver.start_polling_events::<R, E>();
self.receivers
.entry(TypeId::of::<M>())
.or_insert_with(Vec::new)
.push((receiver, poller));
.push((receiver, poller, poller2));
self
}
@ -138,9 +151,11 @@ impl BusBuilder {
&mut self,
val: (TypeId, Receiver),
poller: Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>,
poller2: Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>,
) {
self.receivers.push(val);
self.pollings.push(poller);
self.pollings.push(poller2);
}
pub fn build(self) -> (Bus, impl Future<Output = ()>) {
@ -148,7 +163,7 @@ impl BusBuilder {
inner: Arc::new(BusInner::new(self.receivers)),
};
let mut futs = Vec::with_capacity(self.pollings.len());
let mut futs = Vec::with_capacity(self.pollings.len() * 2);
for poller in self.pollings {
futs.push(tokio::task::spawn(poller(bus.clone())));
}

View File

@ -2,91 +2,128 @@ use crate::{Bus, Message};
use async_trait::async_trait;
pub trait Handler<M: Message>: Send + Sync {
fn handle(&self, msg: M, bus: &Bus) -> anyhow::Result<()>;
fn sync(&self, _bus: &Bus) -> anyhow::Result<()> {
type Error: crate::Error;
type Response: Message;
fn handle(&self, msg: M, bus: &Bus) -> Result<Self::Response, Self::Error>;
fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> {
Ok(())
}
}
#[async_trait]
pub trait AsyncHandler<M: Message>: Send + Sync {
async fn handle(&self, msg: M, bus: &Bus) -> anyhow::Result<()>;
async fn sync(&self, _bus: &Bus) -> anyhow::Result<()> {
type Error: crate::Error;
type Response: Message;
async fn handle(&self, msg: M, bus: &Bus) -> Result<Self::Response, Self::Error>;
async fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> {
Ok(())
}
}
pub trait SynchronizedHandler<M: Message>: Send {
fn handle(&mut self, msg: M, bus: &Bus) -> anyhow::Result<()>;
fn sync(&mut self, _bus: &Bus) -> anyhow::Result<()> {
type Error: crate::Error;
type Response: Message;
fn handle(&mut self, msg: M, bus: &Bus) -> Result<Self::Response, Self::Error>;
fn sync(&mut self, _bus: &Bus) -> Result<(), Self::Error> {
Ok(())
}
}
#[async_trait]
pub trait AsyncSynchronizedHandler<M: Message>: Send {
async fn handle(&mut self, msg: M, bus: &Bus) -> anyhow::Result<()>;
async fn sync(&mut self, _bus: &Bus) -> anyhow::Result<()> {
type Error: crate::Error;
type Response: Message;
async fn handle(&mut self, msg: M, bus: &Bus) -> Result<Self::Response, Self::Error>;
async fn sync(&mut self, _bus: &Bus) -> Result<(), Self::Error> {
Ok(())
}
}
pub trait BatchHandler<M: Message>: Send + Sync {
fn handle(&self, msg: Vec<M>, bus: &Bus) -> anyhow::Result<()>;
fn sync(&self, _bus: &Bus) -> anyhow::Result<()> {
type Error: crate::Error;
type Response: Message;
fn handle(&self, msg: Vec<M>, bus: &Bus) -> Result<Vec<Self::Response>, Self::Error>;
fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> {
Ok(())
}
}
#[async_trait]
pub trait AsyncBatchHandler<M: Message>: Send + Sync {
async fn handle(&self, msg: Vec<M>, bus: &Bus) -> anyhow::Result<()>;
async fn sync(&self, _bus: &Bus) -> anyhow::Result<()> {
type Error: crate::Error;
type Response: Message;
async fn handle(&self, msg: Vec<M>, bus: &Bus) -> Result<Vec<Self::Response>, Self::Error>;
async fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> {
Ok(())
}
}
pub trait BatchSynchronizedHandler<M: Message>: Send {
fn handle(&mut self, msg: Vec<M>, bus: &Bus) -> anyhow::Result<()>;
fn sync(&mut self, _bus: &Bus) -> anyhow::Result<()> {
type Error: crate::Error;
type Response: Message;
fn handle(&mut self, msg: Vec<M>, bus: &Bus) -> Result<Vec<Self::Response>, Self::Error>;
fn sync(&mut self, _bus: &Bus) -> Result<(), Self::Error> {
Ok(())
}
}
#[async_trait]
pub trait AsyncBatchSynchronizedHandler<M: Message>: Send {
async fn handle(&mut self, msg: Vec<M>, bus: &Bus) -> anyhow::Result<()>;
async fn sync(&mut self, _bus: &Bus) -> anyhow::Result<()> {
type Error: crate::Error;
type Response: Message;
async fn handle(&mut self, msg: Vec<M>, bus: &Bus) -> Result<Vec<Self::Response>, Self::Error>;
async fn sync(&mut self, _bus: &Bus) -> Result<(), Self::Error> {
Ok(())
}
}
pub trait LocalHandler<M: Message> {
fn handle(&mut self, msg: Vec<M>, bus: &Bus) -> anyhow::Result<()>;
fn sync(&mut self, _bus: &Bus) -> anyhow::Result<()> {
type Error: crate::Error;
type Response: Message;
fn handle(&mut self, msg: Vec<M>, bus: &Bus) -> Result<Self::Response, Self::Error>;
fn sync(&mut self, _bus: &Bus) -> Result<(), Self::Error> {
Ok(())
}
}
#[async_trait]
pub trait LocalAsyncHandler<M: Message> {
async fn handle(&mut self, msg: Vec<M>, bus: &Bus) -> anyhow::Result<()>;
async fn sync(&mut self, _bus: &Bus) -> anyhow::Result<()> {
type Error: crate::Error;
type Response: Message;
async fn handle(&mut self, msg: Vec<M>, bus: &Bus) -> Result<Self::Response, Self::Error>;
async fn sync(&mut self, _bus: &Bus) -> Result<(), Self::Error> {
Ok(())
}
}
pub trait LocalBatchHandler<M: Message> {
fn handle(&mut self, msg: Vec<M>, bus: &Bus) -> anyhow::Result<()>;
fn sync(&mut self, _bus: &Bus) -> anyhow::Result<()> {
type Error: crate::Error;
type Response: Message;
fn handle(&mut self, msg: Vec<M>, bus: &Bus) -> Result<Vec<Self::Response>, Self::Error>;
fn sync(&mut self, _bus: &Bus) -> Result<(), Self::Error> {
Ok(())
}
}
#[async_trait]
pub trait LocalAsyncBatchHandler<M: Message> {
async fn handle(&mut self, msg: Vec<M>, bus: &Bus) -> anyhow::Result<()>;
async fn sync(&mut self, _bus: &Bus) -> anyhow::Result<()> {
type Error: crate::Error;
type Response: Message;
async fn handle(&mut self, msg: Vec<M>, bus: &Bus) -> Result<Vec<Self::Response>, Self::Error>;
async fn sync(&mut self, _bus: &Bus) -> Result<(), Self::Error> {
Ok(())
}
}

View File

@ -5,32 +5,44 @@ pub mod msgs;
mod receiver;
pub mod receivers;
mod trait_object;
mod utils;
#[macro_use]
extern crate log;
use builder::BusBuilder;
pub use envelop::Message;
pub use handler::*;
pub use receiver::SendError;
use receiver::{Receiver, ReceiverStats};
use utils::binary_search_range_by_key;
use smallvec::SmallVec;
use core::any::{Any, TypeId};
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
use std::{collections::HashMap, sync::{Arc, atomic::{AtomicBool, AtomicU64, Ordering}}};
use crate::receivers::Permit;
pub type Untyped = Arc<dyn Any + Send + Sync>;
pub type Result = anyhow::Result<()>;
// pub trait ErrorTrait: std::error::Error + Send + Sync + 'static {}
pub trait Error: Into<anyhow::Error> + Send + Sync + 'static {}
impl <T: Into<anyhow::Error> + Send + Sync + 'static> Error for T {}
static ID_COUNTER: AtomicU64 = AtomicU64::new(1);
pub struct BusInner {
receivers: Vec<(TypeId, Receiver)>,
receivers: HashMap<TypeId, SmallVec<[Receiver; 4]>>,
closed: AtomicBool,
}
impl BusInner {
pub(crate) fn new(mut receivers: Vec<(TypeId, Receiver)>) -> Self {
receivers.sort_unstable_by_key(|(k, _)| *k);
pub(crate) fn new(input: Vec<(TypeId, Receiver)>) -> Self {
let mut receivers = HashMap::new();
for (key, value) in input {
receivers.entry(key)
.or_insert_with(SmallVec::new)
.push(value);
}
Self {
receivers,
@ -38,42 +50,101 @@ impl BusInner {
}
}
pub fn close(&self) {
pub async fn close(&self) {
self.closed.store(true, Ordering::SeqCst);
for (_, r) in &self.receivers {
r.close();
for (_, rs) in &self.receivers {
for r in rs {
r.close().await;
}
}
}
pub async fn sync(&self) {
for (_, r) in &self.receivers {
r.sync().await;
pub async fn flush(&self) {
let fuse_count = 32i32;
let mut breaked = false;
let mut iters = 0usize;
for _ in 0..fuse_count {
iters += 1;
let mut flushed = false;
for (_, rs) in &self.receivers {
for r in rs {
if r.need_flush() {
flushed = true;
r.flush().await;
}
}
}
if !flushed {
breaked = true;
break;
}
}
if !breaked {
warn!("!!! WARNING: unable to reach equilibrium in {} iterations !!!", fuse_count);
} else {
info!("flushed in {} iterations !!!", iters);
}
}
pub fn stats(&self) -> impl Iterator<Item = ReceiverStats> + '_ {
self.receivers.iter().map(|(_, r)| r.stats())
pub async fn flash_and_sync(&self) {
self.flush().await;
for (_, rs) in &self.receivers {
for r in rs {
r.sync().await;
}
}
}
// pub fn stats(&self) -> impl Iterator<Item = ReceiverStats> + '_ {
// self.receivers.iter()
// .map(|(_, i)|i.iter())
// .flatten()
// .map(|r| r.stats())
// }
pub fn try_send<M: Message>(&self, msg: M) -> core::result::Result<(), SendError<M>> {
if self.closed.load(Ordering::SeqCst) {
println!("Bus closed. Skipping send!");
warn!("Bus closed. Skipping send!");
return Ok(());
}
let mid = ID_COUNTER.fetch_add(1, Ordering::Relaxed);
let tid = TypeId::of::<M>();
let range = binary_search_range_by_key(&self.receivers, &tid, |(k, _)| *k);
for i in (range.start + 1)..range.end {
self.receivers[i].1.try_broadcast(msg.clone())?;
if let Some(rs) = self.receivers.get(&tid) {
let mut permits = SmallVec::<[Permit; 32]>::new();
for r in rs {
if let Some(prmt) = r.try_reserve() {
permits.push(prmt);
} else {
return Err(SendError::Full(msg));
};
}
let mut iter = permits.into_iter().zip(rs.iter());
let mut counter = 1;
let total = rs.len();
while counter < total {
let (p, r) = iter.next().unwrap();
let _ = r.send(mid, p, msg.clone());
counter += 1;
}
if let Some((p, r)) = iter.next() {
let _ = r.send(mid, p, msg);
return Ok(());
}
}
if let Some((_, r)) = self.receivers.get(range.start) {
r.try_broadcast(msg.clone())?;
} else {
println!("Unhandled message {:?}", core::any::type_name::<M>());
}
warn!("Unhandled message {:?}: no receivers", core::any::type_name::<M>());
Ok(())
}
@ -88,19 +159,48 @@ impl BusInner {
return Err(SendError::Closed(msg));
}
let mid = ID_COUNTER.fetch_add(1, Ordering::Relaxed);
let tid = TypeId::of::<M>();
let range = binary_search_range_by_key(&self.receivers, &tid, |(k, _)| *k);
for i in (range.start + 1)..range.end {
self.receivers[i].1.broadcast(msg.clone()).await?;
if let Some(rs) = self.receivers.get(&tid) {
if let Some((last, head)) = rs.split_last() {
for r in head {
let _ = r.send(mid, r.reserve().await, msg.clone());
}
let _ = last.send(mid, last.reserve().await, msg.clone());
return Ok(());
}
}
if let Some((_, r)) = self.receivers.get(range.start) {
r.broadcast(msg.clone()).await?;
} else {
println!("Unhandled message {:?}", core::any::type_name::<M>());
warn!("Unhandled message {:?}: no receivers", core::any::type_name::<M>());
Ok(())
}
pub async fn force_send<M: Message>(&self, msg: M) -> core::result::Result<(), SendError<M>> {
if self.closed.load(Ordering::SeqCst) {
return Err(SendError::Closed(msg));
}
let mid = ID_COUNTER.fetch_add(1, Ordering::Relaxed);
let tid = TypeId::of::<M>();
if let Some(rs) = self.receivers.get(&tid) {
if let Some((last, head)) = rs.split_last() {
for r in head {
let _ = r.force_send(mid, msg.clone());
}
let _ = last.force_send(mid, msg.clone());
return Ok(());
}
}
warn!("Unhandled message {:?}: no receivers", core::any::type_name::<M>());
Ok(())
}
}

View File

@ -1,15 +1,10 @@
use crate::{trait_object::TraitObject, Bus, Message};
use core::{
any::TypeId,
fmt,
future::Future,
marker::PhantomData,
mem,
pin::Pin,
task::{Context, Poll},
};
use crate::{Bus, Message, msgs, receivers::{Event, Permit, PermitDrop}, trait_object::TraitObject};
use core::{any::TypeId, fmt, marker::PhantomData, mem, pin::Pin, task::{Context, Poll}};
use futures::future::poll_fn;
use std::{borrow::Cow, sync::Arc};
use tokio::sync::Notify;
use std::{borrow::Cow, sync::{Arc, atomic::{AtomicBool, AtomicU64, Ordering}}};
use futures::Future;
pub struct AnyReceiver<'a> {
dyn_typed_receiver_trait_object: TraitObject,
@ -17,19 +12,52 @@ pub struct AnyReceiver<'a> {
_m: PhantomData<&'a usize>,
}
unsafe impl Send for AnyReceiver<'_> {}
impl<'a> AnyReceiver<'a> {
pub fn new<M: Message, R: TypedReceiver<M> + 'static>(rcvr: &'a R) -> Self {
let trcvr = rcvr as &(dyn TypedReceiver<M>);
pub fn new<M: Message, R: SendTypedReceiver<M> + 'static>(rcvr: &'a R) -> Self {
let trcvr = rcvr as &(dyn SendTypedReceiver<M>);
Self {
dyn_typed_receiver_trait_object: unsafe { mem::transmute(trcvr) },
type_id: TypeId::of::<dyn TypedReceiver<M>>(),
type_id: TypeId::of::<dyn SendTypedReceiver<M>>(),
_m: Default::default(),
}
}
pub fn dyn_typed_receiver<M: Message>(&'a self) -> &'a dyn TypedReceiver<M> {
assert_eq!(self.type_id, TypeId::of::<dyn TypedReceiver<M>>());
pub fn dyn_typed_receiver<M: Message>(&'a self) -> &'a dyn SendTypedReceiver<M> {
assert_eq!(self.type_id, TypeId::of::<dyn SendTypedReceiver<M>>());
unsafe { mem::transmute(self.dyn_typed_receiver_trait_object) }
}
}
pub struct AnyPoller<'a> {
dyn_typed_receiver_trait_object: TraitObject,
type_id: TypeId,
_m: PhantomData<&'a usize>,
}
unsafe impl Send for AnyPoller<'_> {}
impl<'a> AnyPoller<'a> {
pub fn new<M, E, R>(rcvr: &'a R) -> Self
where
M: Message,
E: crate::Error,
R: ReciveTypedReceiver<M, E> + 'static
{
let trcvr = rcvr as &(dyn ReciveTypedReceiver<M, E>);
Self {
dyn_typed_receiver_trait_object: unsafe { mem::transmute(trcvr) },
type_id: TypeId::of::<dyn ReciveTypedReceiver<M, E>>(),
_m: Default::default(),
}
}
pub fn dyn_typed_receiver<M: Message, E: crate::Error>(&'a self) -> &'a dyn ReciveTypedReceiver<M, E> {
assert_eq!(self.type_id, TypeId::of::<dyn ReciveTypedReceiver<M, E>>());
unsafe { mem::transmute(self.dyn_typed_receiver_trait_object) }
}
@ -87,26 +115,51 @@ impl fmt::Display for ReceiverStats {
}
}
pub trait TypedReceiver<M: Message>: Sync {
fn poll_ready(&self, ctx: &mut Context<'_>) -> Poll<()>;
fn try_send(&self, msg: M) -> Result<(), SendError<M>>;
pub trait SendTypedReceiver<M: Message>: Sync {
fn send(&self, mid: u64, msg: M) -> Result<(), SendError<M>>;
}
pub trait ReciveTypedReceiver<M, E>: Sync
where M: Message,
E: crate::Error
{
fn poll_events(&self, ctx: &mut Context<'_>) -> Poll<Event<M, E>>;
}
pub trait ReceiverTrait: Send + Sync {
fn typed(&self) -> AnyReceiver<'_>;
fn poller(&self) -> AnyPoller<'_>;
fn type_id(&self) -> TypeId;
fn close(&self);
fn stats(&self) -> ReceiverStats;
fn sync(&self);
fn poll_synchronized(&self, ctx: &mut Context<'_>) -> Poll<()>;
fn stats(&self) -> Result<(), SendError<()>>;
fn close(&self) -> Result<(), SendError<()>>;
fn sync(&self) -> Result<(), SendError<()>>;
fn flush(&self) -> Result<(), SendError<()>>;
}
pub trait ReceiverPollerBuilder {
fn build(bus: Bus) -> Box<dyn Future<Output = ()>>;
}
struct ReceiverContext {
limit: u64,
processing: AtomicU64,
need_flush: AtomicBool,
flushed: Notify,
synchronized: Notify,
closed: Notify,
response: Notify,
statistics: Notify,
}
impl PermitDrop for ReceiverContext {
fn permit_drop(&self) {
self.processing.fetch_sub(1, Ordering::SeqCst);
}
}
pub struct Receiver {
inner: Arc<dyn ReceiverTrait>,
context: Arc<ReceiverContext>,
}
impl fmt::Debug for Receiver {
@ -124,49 +177,21 @@ impl core::cmp::PartialEq for Receiver {
impl core::cmp::Eq for Receiver {}
pub struct ReceiverPoller<'a, M: Message> {
inner: &'a dyn ReceiverTrait,
msg: Option<M>,
}
impl<'a, M: Message> Unpin for ReceiverPoller<'a, M> {}
impl<'a, M: Message> Future for ReceiverPoller<'a, M> {
type Output = Result<(), SendError<M>>;
fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
let mut msg = if let Some(msg) = this.msg.take() {
msg
} else {
return Poll::Ready(Ok(()));
};
let any_receiver = this.inner.typed();
let receiver = any_receiver.dyn_typed_receiver::<M>();
loop {
match receiver.poll_ready(ctx) {
Poll::Ready(_) => (),
Poll::Pending => {
this.msg = Some(msg);
return Poll::Pending;
}
}
msg = match receiver.try_send(msg) {
Ok(_) => break Poll::Ready(Ok(())),
Err(SendError::Full(m)) => m,
Err(err) => break Poll::Ready(Err(err)),
}
}
}
}
impl Receiver {
#[inline]
pub(crate) fn new(inner: Arc<dyn ReceiverTrait>) -> Self {
Self { inner }
pub(crate) fn new(limit: u64, inner: Arc<dyn ReceiverTrait>) -> Self {
let context = Arc::new(ReceiverContext {
limit,
processing: AtomicU64::new(0),
need_flush: AtomicBool::new(false),
flushed: Notify::new(),
synchronized: Notify::new(),
closed: Notify::new(),
response: Notify::new(),
statistics: Notify::new(),
});
Self { inner, context }
}
#[inline]
@ -175,38 +200,157 @@ impl Receiver {
}
#[inline]
pub fn broadcast<M: Message>(
&self,
msg: M,
) -> impl Future<Output = Result<(), SendError<M>>> + '_ {
ReceiverPoller {
inner: self.inner.as_ref(),
msg: Some(msg),
pub fn need_flush(&self) -> bool {
self.context.need_flush.load(Ordering::SeqCst)
}
#[inline]
pub async fn reserve(&self) -> Permit {
loop {
let count = self.context.processing.load(Ordering::Relaxed);
if count < self.context.limit {
let res = self.context.processing.compare_exchange(count, count + 1, Ordering::SeqCst, Ordering::SeqCst);
if res.is_ok() {
break Permit {
fuse: false,
inner: self.context.clone(),
};
}
// continue
} else {
self.context.response.notified()
.await
}
}
}
#[inline]
pub fn try_broadcast<M: Message>(&self, msg: M) -> Result<(), SendError<M>> {
pub fn try_reserve(&self) -> Option<Permit> {
loop {
let count = self.context.processing.load(Ordering::Relaxed);
if count < self.context.limit {
let res = self.context.processing.compare_exchange(count, count + 1, Ordering::SeqCst, Ordering::SeqCst);
if res.is_ok() {
break Some(Permit {
fuse: false,
inner: self.context.clone(),
});
}
// continue
} else {
break None;
}
}
}
#[inline]
pub fn send<M: Message>(&self, mid: u64, mut permit: Permit, msg: M) -> Result<(), SendError<M>> {
let any_receiver = self.inner.typed();
let receiver = any_receiver.dyn_typed_receiver::<M>();
let res = receiver.send(mid, msg);
permit.fuse = true;
receiver.try_send(msg)
if !res.is_err() {
self.context.need_flush.store(true, Ordering::SeqCst);
}
res
}
#[inline]
pub fn close(&self) {
self.inner.close();
pub fn force_send<M: Message>(&self, mid: u64, msg: M) -> Result<(), SendError<M>> {
let any_receiver = self.inner.typed();
let receiver = any_receiver.dyn_typed_receiver::<M>();
let res = receiver.send(mid, msg);
if !res.is_err() {
self.context.need_flush.store(true, Ordering::SeqCst);
}
res
}
pub fn start_polling_events<M, E>(&self) -> Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>
where
M: Message,
E: crate::Error
{
let ctx_clone = self.context.clone();
let inner_clone = self.inner.clone();
Box::new(move |bus| Box::pin(async move {
let any_receiver = inner_clone.poller();
let receiver = any_receiver.dyn_typed_receiver::<M, E>();
loop {
let event = poll_fn(move |ctx| receiver.poll_events(ctx))
.await;
match event {
Event::Exited => {
ctx_clone.closed.notify_waiters();
break;
},
Event::Flushed => ctx_clone.flushed.notify_waiters(),
Event::Synchronized => ctx_clone.synchronized.notify_waiters(),
Event::Response(_mid, resp) => {
ctx_clone.processing.fetch_sub(1, Ordering::SeqCst);
ctx_clone.response.notify_one();
match resp {
Ok(_msg) => (),
Err(err) => { bus.try_send(msgs::Error(Arc::new(err.into()))).ok(); }
}
},
_ => unimplemented!()
}
}
}))
}
// #[inline]
// pub fn stats(&self) -> ReceiverStats {
// if self.inner.stats().is_ok() {
// self.context.stats.notified()
// .await
// } else {
// warn!("close failed!");
// }
// }
#[inline]
pub async fn close(&self) {
if self.inner.close().is_ok() {
self.context.closed.notified()
.await
} else {
warn!("close failed!");
}
}
#[inline]
pub fn stats(&self) -> ReceiverStats {
self.inner.stats()
pub async fn sync(&self) {
if self.inner.sync().is_ok() {
self.context.synchronized.notified()
.await
} else {
warn!("sync failed!");
}
}
#[inline]
pub fn sync(&self) -> impl Future<Output = ()> + '_ {
self.inner.sync();
pub async fn flush(&self) {
if self.inner.flush().is_ok() {
self.context.flushed.notified()
.await;
poll_fn(move |ctx| self.inner.poll_synchronized(ctx))
self.context.need_flush.store(false, Ordering::SeqCst);
} else {
warn!("flush failed!");
}
}
}

View File

@ -9,30 +9,37 @@ use std::{
task::{Context, Poll},
};
use crate::{receiver::ReceiverStats, receivers::mpsc};
use futures::{Future, StreamExt};
use crate::{receiver::{AnyPoller, ReceiverStats, ReciveTypedReceiver}, receivers::{Action, Event}};
use anyhow::Result;
use futures::{Future, StreamExt, stream::FuturesUnordered};
use super::{BufferUnorderedConfig, BufferUnorderedStats};
use crate::{
builder::{ReceiverSubscriber, ReceiverSubscriberBuilder},
msgs,
receiver::{AnyReceiver, ReceiverTrait, SendError, TypedReceiver},
receiver::{AnyReceiver, ReceiverTrait, SendError, SendTypedReceiver},
AsyncHandler, Bus, Message, Untyped,
};
use parking_lot::Mutex;
use tokio::sync::mpsc;
pub struct BufferUnorderedAsyncSubscriber<T, M>
pub struct BufferUnorderedAsyncSubscriber<T, M, R, E>
where
T: AsyncHandler<M> + 'static,
T: AsyncHandler<M, Response = R, Error = E> + 'static,
M: Message,
R: Message,
E: crate::Error
{
cfg: BufferUnorderedConfig,
_m: PhantomData<(T, M)>,
}
impl<T, M> ReceiverSubscriber<T> for BufferUnorderedAsyncSubscriber<T, M>
impl<T, M, R, E> ReceiverSubscriber<T> for BufferUnorderedAsyncSubscriber<T, M, R, E>
where
T: AsyncHandler<M> + 'static,
T: AsyncHandler<M, Response = R, Error = E> + 'static,
M: Message,
R: Message,
E: crate::Error
{
fn subscribe(
self,
@ -50,15 +57,17 @@ where
parallel_total: AtomicU64::new(cfg.max_parallel as _),
});
let (tx, rx) = mpsc::channel(cfg.buffer_size);
let arc = Arc::new(BufferUnorderedAsync::<M> {
let (stx, srx) = mpsc::unbounded_channel();
let (tx, rx) = mpsc::unbounded_channel();
let arc = Arc::new(BufferUnorderedAsync::<M, R, E> {
tx,
stats: stats.clone(),
srx: Mutex::new(srx),
});
let poller = Box::new(move |ut| {
Box::new(move |bus| {
Box::pin(buffer_unordered_poller::<T, M>(rx, bus, ut, stats, cfg))
Box::pin(buffer_unordered_poller::<T, M, R, E>(rx, bus, ut, stats, cfg, stx))
as Pin<Box<dyn Future<Output = ()> + Send>>
}) as Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>
});
@ -67,68 +76,129 @@ where
}
}
async fn buffer_unordered_poller<T, M>(
rx: mpsc::Receiver<M>,
fn buffer_unordered_poller<T, M, R, E>(
mut rx: mpsc::UnboundedReceiver<Action<M>>,
bus: Bus,
ut: Untyped,
stats: Arc<BufferUnorderedStats>,
cfg: BufferUnorderedConfig,
) where
T: AsyncHandler<M> + 'static,
stx: mpsc::UnboundedSender<Event<R, E>>,
) -> impl Future<Output = ()>
where
T: AsyncHandler<M, Response = R, Error = E> + 'static,
M: Message,
R: Message,
E: crate::Error
{
let ut = ut.downcast::<T>().unwrap();
let mut queue = FuturesUnordered::new();
let mut sync_future: Option<Pin<Box<dyn Future<Output = Result<(), E>> + Send>>> = None;
let mut need_sync = false;
let mut rx_closed = false;
let mut need_flush = false;
let mut x = rx
.map(|msg| {
stats.buffer.fetch_sub(1, Ordering::Relaxed);
stats.parallel.fetch_add(1, Ordering::Relaxed);
let bus = bus.clone();
let ut = ut.clone();
futures::future::poll_fn(move |cx| loop {
if !rx_closed && !need_flush && !need_sync {
while queue.len() < cfg.max_parallel {
match rx.poll_recv(cx) {
Poll::Ready(Some(a)) => {
match a {
Action::Request(mid, msg) => {
stats.buffer.fetch_sub(1, Ordering::Relaxed);
stats.parallel.fetch_add(1, Ordering::Relaxed);
tokio::task::spawn(async move { ut.handle(msg, &bus).await })
})
.buffer_unordered(cfg.max_parallel);
while let Some(err) = x.next().await {
stats.parallel.fetch_sub(1, Ordering::Relaxed);
match err {
Ok(Err(err)) => {
let _ = bus.send(msgs::Error(Arc::new(err))).await;
let bus = bus.clone();
let ut = ut.clone();
queue.push(tokio::task::spawn(async move { (mid, ut.handle(msg, &bus).await) }));
},
Action::Flush => need_flush = true,
Action::Sync => need_sync = true,
Action::Close => rx.close(),
_ => unimplemented!()
}
},
Poll::Ready(None) => {
need_sync = true;
rx_closed = true;
},
Poll::Pending => break,
}
}
_ => (),
}
}
let ut = ut.clone();
let bus_clone = bus.clone();
let res = tokio::task::spawn(async move { ut.sync(&bus_clone).await }).await;
let queue_len = queue.len();
match res {
Ok(Err(err)) => {
let _ = bus.send(msgs::Error(Arc::new(err))).await;
loop {
if queue_len != 0 {
loop {
match queue.poll_next_unpin(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(Some(Ok((mid, res)))) => {
stx.send(Event::Response(mid, res)).ok();
},
Poll::Ready(None) => break,
_ => {}
}
}
}
if need_flush {
need_flush = false;
stx.send(Event::Flushed).ok();
}
if need_sync {
if let Some(mut fut) = sync_future.take() {
match fut.as_mut().poll(cx) {
Poll::Pending => {
sync_future = Some(fut);
return Poll::Pending;
},
Poll::Ready(res) => {
need_sync = false;
if let Err(err) = res {
stx.send(Event::SyncResponse(err)).ok();
}
}
}
} else {
let ut = ut.clone();
let bus_clone = bus.clone();
sync_future.replace(Box::pin(async move {
ut.sync(&bus_clone).await
}));
}
} else {
break;
}
}
_ => (),
}
println!(
"[EXIT] BufferUnorderedAsync<{}>",
std::any::type_name::<M>()
);
if queue_len == queue.len() {
return if rx_closed { Poll::Ready(()) } else { Poll::Pending };
}
})
}
pub struct BufferUnorderedAsync<M: Message> {
tx: mpsc::Sender<M>,
stats: Arc<BufferUnorderedStats>,
}
impl<T, M> ReceiverSubscriberBuilder<M, T> for BufferUnorderedAsync<M>
where
T: AsyncHandler<M> + 'static,
M: Message,
pub struct BufferUnorderedAsync<M, R = (), E = anyhow::Error>
where
M: Message,
R: Message,
E: crate::Error
{
type Entry = BufferUnorderedAsyncSubscriber<T, M>;
tx: mpsc::UnboundedSender<Action<M>>,
stats: Arc<BufferUnorderedStats>,
srx: Mutex<mpsc::UnboundedReceiver<Event<R, E>>>,
}
impl<T, M, R, E> ReceiverSubscriberBuilder<T, M, R, E> for BufferUnorderedAsync<M, R, E>
where
T: AsyncHandler<M, Response = R, Error = E> + 'static,
R: Message,
M: Message,
E: crate::Error
{
type Entry = BufferUnorderedAsyncSubscriber<T, M, R, E>;
type Config = BufferUnorderedConfig;
fn build(cfg: Self::Config) -> Self::Entry {
@ -139,65 +209,102 @@ where
}
}
impl<M: Message> TypedReceiver<M> for BufferUnorderedAsync<M> {
fn poll_ready(&self, ctx: &mut Context<'_>) -> Poll<()> {
match self.tx.poll_ready(ctx) {
Poll::Ready(_) => Poll::Ready(()),
Poll::Pending => Poll::Pending,
}
}
fn try_send(&self, m: M) -> Result<(), SendError<M>> {
match self.tx.try_send(m) {
impl<M, R, E> SendTypedReceiver<M> for BufferUnorderedAsync<M, R, E>
where
M: Message,
R: Message,
E: crate::Error
{
fn send(&self, mid: u64, m: M) -> Result<(), SendError<M>> {
match self.tx.send(Action::Request(mid, m)) {
Ok(_) => {
self.stats.buffer.fetch_add(1, Ordering::Relaxed);
Ok(())
}
Err(err) => Err(err),
Err(mpsc::error::SendError(Action::Request(_, msg))) => Err(SendError::Closed(msg)),
_ => unimplemented!()
}
}
}
impl<M: Message> ReceiverTrait for BufferUnorderedAsync<M> {
impl<M, R, E> ReciveTypedReceiver<R, E> for BufferUnorderedAsync<M, R, E>
where
M: Message,
R: Message,
E: crate::Error
{
fn poll_events(&self, ctx: &mut Context<'_>) -> Poll<Event<R, E>> {
let poll = self.srx.lock().poll_recv(ctx);
match poll {
Poll::Pending => Poll::Pending,
Poll::Ready(Some(event)) => Poll::Ready(event),
Poll::Ready(None) => Poll::Ready(Event::Exited),
}
}
}
impl<M, R, E> ReceiverTrait for BufferUnorderedAsync<M, R, E>
where
M: Message,
R: Message,
E: crate::Error
{
fn typed(&self) -> AnyReceiver<'_> {
AnyReceiver::new(self)
}
fn poller(&self) -> AnyPoller<'_> {
AnyPoller::new(self)
}
fn type_id(&self) -> TypeId {
TypeId::of::<BufferUnorderedAsync<M>>()
TypeId::of::<BufferUnorderedAsync<M, R, E>>()
}
fn close(&self) {
self.tx.close();
fn stats(&self) -> Result<(), SendError<()>> {
match self.tx.send(Action::Stats) {
Ok(_) => Ok(()),
Err(_) => Err(SendError::Closed(()))
}
// ReceiverStats {
// name: std::any::type_name::<M>().into(),
// fields: vec![
// ("buffer".into(), self.stats.buffer.load(Ordering::SeqCst)),
// (
// "buffer_total".into(),
// self.stats.buffer_total.load(Ordering::SeqCst),
// ),
// (
// "parallel".into(),
// self.stats.parallel.load(Ordering::SeqCst),
// ),
// (
// "parallel_total".into(),
// self.stats.parallel_total.load(Ordering::SeqCst),
// ),
// ],
// }
}
fn stats(&self) -> ReceiverStats {
ReceiverStats {
name: std::any::type_name::<M>().into(),
fields: vec![
("buffer".into(), self.stats.buffer.load(Ordering::SeqCst)),
(
"buffer_total".into(),
self.stats.buffer_total.load(Ordering::SeqCst),
),
(
"parallel".into(),
self.stats.parallel.load(Ordering::SeqCst),
),
(
"parallel_total".into(),
self.stats.parallel_total.load(Ordering::SeqCst),
),
],
fn close(&self) -> Result<(), SendError<()>> {
match self.tx.send(Action::Close) {
Ok(_) => Ok(()),
Err(_) => Err(SendError::Closed(()))
}
}
fn sync(&self) {
self.tx.flush();
fn sync(&self) -> Result<(), SendError<()>> {
match self.tx.send(Action::Sync) {
Ok(_) => Ok(()),
Err(_) => Err(SendError::Closed(()))
}
}
fn poll_synchronized(&self, _ctx: &mut Context<'_>) -> Poll<()> {
Poll::Ready(())
fn flush(&self) -> Result<(), SendError<()>> {
match self.tx.send(Action::Flush) {
Ok(_) => Ok(()),
Err(_) => Err(SendError::Closed(()))
}
}
}

View File

@ -1,5 +1,3 @@
use crate::{receiver::ReceiverStats, receivers::mpsc};
use futures::{Future, StreamExt};
use std::{
any::TypeId,
marker::PhantomData,
@ -11,27 +9,37 @@ use std::{
task::{Context, Poll},
};
use crate::{receiver::{AnyPoller, ReceiverStats, ReciveTypedReceiver}, receivers::{Action, Event}};
use anyhow::Result;
use futures::{Future, StreamExt, stream::FuturesUnordered};
use super::{BufferUnorderedConfig, BufferUnorderedStats};
use crate::{
builder::{ReceiverSubscriber, ReceiverSubscriberBuilder},
msgs,
receiver::{AnyReceiver, ReceiverTrait, SendError, TypedReceiver},
Bus, Handler, Message, Untyped,
receiver::{AnyReceiver, ReceiverTrait, SendError, SendTypedReceiver},
Handler, Bus, Message, Untyped,
};
use parking_lot::Mutex;
use tokio::sync::mpsc;
pub struct BufferUnorderedSyncSubscriber<T, M>
pub struct BufferUnorderedSyncSubscriber<T, M, R, E>
where
T: Handler<M> + 'static,
T: Handler<M, Response = R, Error = E> + 'static,
M: Message,
R: Message,
E: crate::Error
{
cfg: BufferUnorderedConfig,
_m: PhantomData<(M, T)>,
_m: PhantomData<(T, M)>,
}
impl<T, M> ReceiverSubscriber<T> for BufferUnorderedSyncSubscriber<T, M>
impl<T, M, R, E> ReceiverSubscriber<T> for BufferUnorderedSyncSubscriber<T, M, R, E>
where
T: Handler<M> + 'static,
T: Handler<M, Response = R, Error = E> + 'static,
M: Message,
R: Message,
E: crate::Error
{
fn subscribe(
self,
@ -42,7 +50,6 @@ where
>,
) {
let cfg = self.cfg;
let (tx, rx) = mpsc::channel(cfg.buffer_size);
let stats = Arc::new(BufferUnorderedStats {
buffer: AtomicU64::new(0),
buffer_total: AtomicU64::new(cfg.buffer_size as _),
@ -50,14 +57,17 @@ where
parallel_total: AtomicU64::new(cfg.max_parallel as _),
});
let arc = Arc::new(BufferUnorderedSync::<M> {
let (stx, srx) = mpsc::unbounded_channel();
let (tx, rx) = mpsc::unbounded_channel();
let arc = Arc::new(BufferUnorderedSync::<M, R, E> {
tx,
stats: stats.clone(),
srx: Mutex::new(srx),
});
let poller = Box::new(move |ut| {
Box::new(move |bus| {
Box::pin(buffer_unordered_poller::<T, M>(rx, bus, ut, stats, cfg))
Box::pin(buffer_unordered_poller::<T, M, R, E>(rx, bus, ut, stats, cfg, stx))
as Pin<Box<dyn Future<Output = ()> + Send>>
}) as Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>
});
@ -66,69 +76,129 @@ where
}
}
async fn buffer_unordered_poller<T, M>(
rx: mpsc::Receiver<M>,
fn buffer_unordered_poller<T, M, R, E>(
mut rx: mpsc::UnboundedReceiver<Action<M>>,
bus: Bus,
ut: Untyped,
stats: Arc<BufferUnorderedStats>,
cfg: BufferUnorderedConfig,
) where
T: Handler<M> + 'static,
stx: mpsc::UnboundedSender<Event<R, E>>,
) -> impl Future<Output = ()>
where
T: Handler<M, Response = R, Error = E> + 'static,
M: Message,
R: Message,
E: crate::Error
{
let ut = ut.downcast::<T>().unwrap();
let mut queue = FuturesUnordered::new();
let mut sync_future: Option<Pin<Box<dyn Future<Output = Result<(), E>> + Send>>> = None;
let mut need_sync = false;
let mut rx_closed = false;
let mut need_flush = false;
let mut x = rx
.map(|msg| {
stats.buffer.fetch_sub(1, Ordering::Relaxed);
stats.parallel.fetch_add(1, Ordering::Relaxed);
futures::future::poll_fn(move |cx| loop {
if !rx_closed && !need_flush && !need_sync {
while queue.len() < cfg.max_parallel {
match rx.poll_recv(cx) {
Poll::Ready(Some(a)) => {
match a {
Action::Request(mid, msg) => {
stats.buffer.fetch_sub(1, Ordering::Relaxed);
stats.parallel.fetch_add(1, Ordering::Relaxed);
let bus = bus.clone();
let ut = ut.clone();
tokio::task::spawn_blocking(move || ut.handle(msg, &bus))
})
.buffer_unordered(cfg.max_parallel);
while let Some(err) = x.next().await {
stats.parallel.fetch_sub(1, Ordering::Relaxed);
match err {
Ok(Err(err)) => {
let _ = bus.send(msgs::Error(Arc::new(err))).await;
let bus = bus.clone();
let ut = ut.clone();
queue.push( tokio::task::spawn_blocking(move || (mid, ut.handle(msg, &bus))));
},
Action::Flush => need_flush = true,
Action::Sync => need_sync = true,
Action::Close => rx.close(),
_ => unimplemented!()
}
},
Poll::Ready(None) => {
need_sync = true;
rx_closed = true;
},
Poll::Pending => break,
}
}
_ => (),
}
}
let ut = ut.clone();
let bus_clone = bus.clone();
let res = tokio::task::spawn_blocking(move || ut.sync(&bus_clone)).await;
let queue_len = queue.len();
match res {
Ok(Err(err)) => {
let _ = bus.send(msgs::Error(Arc::new(err))).await;
loop {
if queue_len != 0 {
loop {
match queue.poll_next_unpin(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(Some(Ok((mid, res)))) => {
stx.send(Event::Response(mid, res)).ok();
},
Poll::Ready(None) => break,
_ => {}
}
}
}
if need_flush {
need_flush = false;
stx.send(Event::Flushed).ok();
}
if need_sync {
if let Some(mut fut) = sync_future.take() {
match fut.as_mut().poll(cx) {
Poll::Pending => {
sync_future = Some(fut);
return Poll::Pending;
},
Poll::Ready(res) => {
need_sync = false;
if let Err(err) = res {
stx.send(Event::SyncResponse(err)).ok();
}
}
}
} else {
let ut = ut.clone();
let bus_clone = bus.clone();
sync_future.replace(Box::pin(async move {
tokio::task::spawn_blocking(move || ut.sync(&bus_clone)).await.unwrap()
}));
}
} else {
break;
}
}
_ => (),
}
println!(
"[EXIT] BufferUnorderedSync<{}>",
std::any::type_name::<M>()
);
if queue_len == queue.len() {
return if rx_closed { Poll::Ready(()) } else { Poll::Pending };
}
})
}
pub struct BufferUnorderedSync<M: Message> {
tx: mpsc::Sender<M>,
stats: Arc<BufferUnorderedStats>,
}
impl<T, M> ReceiverSubscriberBuilder<M, T> for BufferUnorderedSync<M>
where
T: Handler<M> + 'static,
M: Message,
pub struct BufferUnorderedSync<M, R = (), E = anyhow::Error>
where
M: Message,
R: Message,
E: crate::Error
{
type Entry = BufferUnorderedSyncSubscriber<T, M>;
tx: mpsc::UnboundedSender<Action<M>>,
stats: Arc<BufferUnorderedStats>,
srx: Mutex<mpsc::UnboundedReceiver<Event<R, E>>>,
}
impl<T, M, R, E> ReceiverSubscriberBuilder<T, M, R, E> for BufferUnorderedSync<M, R, E>
where
T: Handler<M, Response = R, Error = E> + 'static,
R: Message,
M: Message,
E: crate::Error
{
type Entry = BufferUnorderedSyncSubscriber<T, M, R, E>;
type Config = BufferUnorderedConfig;
fn build(cfg: Self::Config) -> Self::Entry {
@ -139,65 +209,102 @@ where
}
}
impl<M: Message> TypedReceiver<M> for BufferUnorderedSync<M> {
fn poll_ready(&self, ctx: &mut Context<'_>) -> Poll<()> {
match self.tx.poll_ready(ctx) {
Poll::Ready(_) => Poll::Ready(()),
Poll::Pending => Poll::Pending,
}
}
fn try_send(&self, m: M) -> Result<(), SendError<M>> {
match self.tx.try_send(m) {
impl<M, R, E> SendTypedReceiver<M> for BufferUnorderedSync<M, R, E>
where
M: Message,
R: Message,
E: crate::Error
{
fn send(&self, mid: u64, m: M) -> Result<(), SendError<M>> {
match self.tx.send(Action::Request(mid, m)) {
Ok(_) => {
self.stats.buffer.fetch_add(1, Ordering::Relaxed);
Ok(())
}
Err(err) => Err(err),
Err(mpsc::error::SendError(Action::Request(_, msg))) => Err(SendError::Closed(msg)),
_ => unimplemented!()
}
}
}
impl<M: Message> ReceiverTrait for BufferUnorderedSync<M> {
impl<M, R, E> ReciveTypedReceiver<R, E> for BufferUnorderedSync<M, R, E>
where
M: Message,
R: Message,
E: crate::Error
{
fn poll_events(&self, ctx: &mut Context<'_>) -> Poll<Event<R, E>> {
let poll = self.srx.lock().poll_recv(ctx);
match poll {
Poll::Pending => Poll::Pending,
Poll::Ready(Some(event)) => Poll::Ready(event),
Poll::Ready(None) => Poll::Ready(Event::Exited),
}
}
}
impl<M, R, E> ReceiverTrait for BufferUnorderedSync<M, R, E>
where
M: Message,
R: Message,
E: crate::Error
{
fn typed(&self) -> AnyReceiver<'_> {
AnyReceiver::new(self)
}
fn type_id(&self) -> TypeId {
TypeId::of::<BufferUnorderedSync<M>>()
fn poller(&self) -> AnyPoller<'_> {
AnyPoller::new(self)
}
fn stats(&self) -> ReceiverStats {
ReceiverStats {
name: std::any::type_name::<M>().into(),
fields: vec![
("buffer".into(), self.stats.buffer.load(Ordering::SeqCst)),
(
"buffer_total".into(),
self.stats.buffer_total.load(Ordering::SeqCst),
),
(
"parallel".into(),
self.stats.parallel.load(Ordering::SeqCst),
),
(
"parallel_total".into(),
self.stats.parallel_total.load(Ordering::SeqCst),
),
],
fn type_id(&self) -> TypeId {
TypeId::of::<BufferUnorderedSync<M, R, E>>()
}
fn stats(&self) -> Result<(), SendError<()>> {
match self.tx.send(Action::Stats) {
Ok(_) => Ok(()),
Err(_) => Err(SendError::Closed(()))
}
// ReceiverStats {
// name: std::any::type_name::<M>().into(),
// fields: vec![
// ("buffer".into(), self.stats.buffer.load(Ordering::SeqCst)),
// (
// "buffer_total".into(),
// self.stats.buffer_total.load(Ordering::SeqCst),
// ),
// (
// "parallel".into(),
// self.stats.parallel.load(Ordering::SeqCst),
// ),
// (
// "parallel_total".into(),
// self.stats.parallel_total.load(Ordering::SeqCst),
// ),
// ],
// }
}
fn close(&self) -> Result<(), SendError<()>> {
match self.tx.send(Action::Close) {
Ok(_) => Ok(()),
Err(_) => Err(SendError::Closed(()))
}
}
fn close(&self) {
self.tx.close();
fn sync(&self) -> Result<(), SendError<()>> {
match self.tx.send(Action::Sync) {
Ok(_) => Ok(()),
Err(_) => Err(SendError::Closed(()))
}
}
fn sync(&self) {
self.tx.flush();
}
fn poll_synchronized(&self, _ctx: &mut Context<'_>) -> Poll<()> {
Poll::Ready(())
fn flush(&self) -> Result<(), SendError<()>> {
match self.tx.send(Action::Flush) {
Ok(_) => Ok(()),
Err(_) => Err(SendError::Closed(()))
}
}
}

View File

@ -1,11 +1,68 @@
mod buffer_unordered;
mod buffer_unordered_batched;
mod mpsc_futures;
mod synchronize_batched;
mod synchronized;
// mod buffer_unordered_batched;
// mod mpsc_futures;
// mod synchronize_batched;
// mod synchronized;
mod mpsc {
pub use super::mpsc_futures::*;
// mod mpsc;
// mod mpsc {
// pub use super::mpsc_futures::*;
// }
use std::sync::Arc;
#[derive(Debug, Clone)]
pub struct Stats {
pub has_queue: bool,
pub queue_capacity: u64,
pub queue_size: u64,
pub has_parallel: bool,
pub parallel_capacity: u64,
pub parallel_size: u64,
pub has_batch: bool,
pub batch_capacity: u64,
pub batch_size: u64,
}
#[non_exhaustive]
#[derive(Debug)]
pub enum Action<M> {
Request(u64, M),
Flush,
Sync,
Close,
Stats,
}
#[non_exhaustive]
#[derive(Debug, Clone)]
pub enum Event<M, E> {
Response(u64, Result<M, E>),
SyncResponse(E),
Stats(Stats),
Synchronized,
Flushed,
Exited,
}
pub struct Permit {
pub(crate) fuse: bool,
pub(crate) inner: Arc<dyn PermitDrop>
}
pub trait PermitDrop {
fn permit_drop(&self);
}
impl Drop for Permit {
fn drop(&mut self) {
if !self.fuse {
self.inner.permit_drop();
}
}
}
pub use buffer_unordered::{
@ -13,17 +70,17 @@ pub use buffer_unordered::{
BufferUnorderedSync, BufferUnorderedSyncSubscriber,
};
pub use buffer_unordered_batched::{
BufferUnorderedBatchedAsync, BufferUnorderedBatchedAsyncSubscriber, BufferUnorderedBatchedConfig,
BufferUnorderedBatchedSync, BufferUnorderedBatchedSyncSubscriber,
};
// pub use buffer_unordered_batched::{
// BufferUnorderedBatchedAsync, BufferUnorderedBatchedAsyncSubscriber, BufferUnorderedBatchedConfig,
// BufferUnorderedBatchedSync, BufferUnorderedBatchedSyncSubscriber,
// };
pub use synchronized::{
SynchronizedAsync, SynchronizedAsyncSubscriber, SynchronizedConfig, SynchronizedSync,
SynchronizedSyncSubscriber,
};
// pub use synchronized::{
// SynchronizedAsync, SynchronizedAsyncSubscriber, SynchronizedConfig, SynchronizedSync,
// SynchronizedSyncSubscriber,
// };
pub use synchronize_batched::{
SynchronizeBatchedAsync, SynchronizeBatchedAsyncSubscriber, SynchronizeBatchedConfig,
SynchronizeBatchedSync, SynchronizeBatchedSyncSubscriber,
};
// pub use synchronize_batched::{
// SynchronizeBatchedAsync, SynchronizeBatchedAsyncSubscriber, SynchronizeBatchedConfig,
// SynchronizeBatchedSync, SynchronizeBatchedSyncSubscriber,
// };

View File

@ -1,142 +0,0 @@
use futures::{Stream, StreamExt};
use core::pin::Pin;
use crossbeam::queue::ArrayQueue;
use crossbeam::atomic::AtomicCell;
use core::task::{Waker, Context, Poll};
use std::sync::{Arc, atomic::*};
use crate::receiver::SendError;
struct ChannelInner<T> {
queue: ArrayQueue<T>,
send_waker: AtomicCell<Option<Box<Waker>>>,
recv_waker: AtomicCell<Option<Box<Waker>>>,
closed: AtomicBool,
}
pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
let inner = Arc::new(ChannelInner {
queue: ArrayQueue::new(buffer),
send_waker: AtomicCell::new(None),
recv_waker: AtomicCell::new(None),
closed: AtomicBool::new(false),
});
(
Sender {
inner: inner.clone(),
},
Receiver {
inner,
}
)
}
#[derive(Clone)]
pub struct Sender<T> {
inner: Arc<ChannelInner<T>>
}
impl <T> Sender<T> {
pub fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<()> {
if self.inner.closed.load(Ordering::SeqCst) {
return Poll::Ready(());
}
if self.inner.queue.is_full() {
self.inner.send_waker.store(Some(Box::new(cx.waker().clone())));
}
let mut counter = 4;
loop {
if self.inner.queue.is_full() {
if counter > 0 {
counter -= 1;
continue;
} else {
break Poll::Pending;
}
} else {
break Poll::Ready(());
}
}
}
pub fn try_send(&self, mut item: T) -> Result<(), SendError<T>> {
if self.inner.closed.load(Ordering::SeqCst) {
return Err(SendError::Closed(item));
}
let mut counter = 0;
loop {
match self.inner.queue.push(item) {
Ok(_) => {
if let Some(waker) = self.inner.recv_waker.take() {
waker.wake();
}
break Ok(());
}
Err(inner) => {
if counter >= 4 {
break Err(SendError::Full(inner));
} else {
item = inner;
counter += 1;
}
}
}
}
}
pub fn close(&self) {
self.inner.closed.store(true, Ordering::SeqCst);
if let Some(waker) = self.inner.recv_waker.take() {
waker.wake();
}
}
}
pub struct Receiver<T> {
inner: Arc<ChannelInner<T>>
}
impl <T> Stream for Receiver<T> {
type Item = T;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
let mut counter = 0;
loop {
match this.inner.queue.pop() {
Some(inner) => {
if let Some(waker) = this.inner.send_waker.take() {
waker.wake();
}
break Poll::Ready(Some(inner));
},
None => {
if this.inner.closed.load(Ordering::SeqCst) {
break Poll::Ready(None);
} else {
if counter == 0 {
this.inner.recv_waker.store(Some(Box::new(cx.waker().clone())));
}
if counter >= 8 {
break Poll::Pending;
} else {
counter += 1;
}
}
}
}
}
}
}

View File

@ -1,107 +0,0 @@
use crate::receiver::SendError;
use core::pin::Pin;
use core::task::{Context, Poll};
use crossbeam::queue::SegQueue;
use futures::{channel::mpsc, Stream};
use std::{
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
task::Waker,
};
pub struct State {
buffer: usize,
counter: AtomicUsize,
send_wakers: SegQueue<Waker>,
}
pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
let state = Arc::new(State {
buffer,
counter: AtomicUsize::new(0),
send_wakers: SegQueue::new(),
});
let (tx, rx) = mpsc::unbounded();
(
Sender {
inner: tx,
state: state.clone(),
},
Receiver { inner: rx, state },
)
}
pub struct Sender<T> {
inner: mpsc::UnboundedSender<T>,
state: Arc<State>,
}
impl<T> Sender<T> {
pub fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<()> {
if self.state.counter.load(Ordering::SeqCst) >= self.state.buffer {
self.state.send_wakers.push(cx.waker().clone());
return Poll::Pending;
}
Poll::Ready(())
}
pub fn try_send(&self, item: T) -> Result<(), SendError<T>> {
if self.state.counter.load(Ordering::Relaxed) >= self.state.buffer {
return Err(SendError::Full(item));
}
self.state.counter.fetch_add(1, Ordering::SeqCst);
match self.inner.unbounded_send(item) {
Ok(_) => Ok(()),
Err(err) if err.is_full() => Err(SendError::Full(err.into_inner())),
Err(err) => Err(SendError::Closed(err.into_inner())),
}
}
#[inline]
pub fn flush(&self) {}
#[inline]
pub fn close(&self) {
self.inner.close_channel();
}
}
pub struct Receiver<T> {
inner: mpsc::UnboundedReceiver<T>,
state: Arc<State>,
}
impl<T> Stream for Receiver<T> {
type Item = T;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
match Pin::new(&mut this.inner).poll_next(cx) {
Poll::Ready(inner) => {
let val = this.state.buffer - this.state.counter.fetch_sub(1, Ordering::SeqCst) + 1;
for _ in 0..val {
if let Some(waker) = this.state.send_wakers.pop() {
waker.wake();
} else {
break;
}
}
Poll::Ready(inner)
}
Poll::Pending => {
while let Some(waker) = this.state.send_wakers.pop() {
waker.wake();
}
Poll::Pending
},
}
}
}

View File

@ -1,33 +0,0 @@
use core::cmp::{Ord, Ordering};
use core::ops::Range;
pub fn binary_search_range_by_key<'a, T, B, F>(data: &'a [T], item: &B, mut f: F) -> Range<usize>
where
F: FnMut(&'a T) -> B,
B: Ord,
{
if let Ok(index) = data.binary_search_by_key(item, &mut f) {
let mut begin = index;
let mut end = index + 1;
for i in (0..index).rev() {
if f(unsafe { data.get_unchecked(i) }).cmp(item) != Ordering::Equal {
break;
}
begin = i;
}
for i in end..data.len() {
end = i;
if f(unsafe { data.get_unchecked(i) }).cmp(item) != Ordering::Equal {
break;
}
}
begin..end
} else {
data.len()..data.len()
}
}