Compare commits

...

2 Commits

SHA1 Message Date
629e03df8d unordered buffer variant 2021-09-20 17:18:47 +04:00
90d0f781a5 fmt + bechmark example 2021-09-20 17:06:32 +04:00
24 changed files with 478 additions and 261 deletions

View File

@ -8,7 +8,7 @@ categories = ["network-programming", "asynchronous"]
description = "MessageBus allows intercommunicate with messages between modules"
license = "MIT OR Apache-2.0"
exclude = [".gitignore", ".cargo/config", ".github/**", "codecov.yml"]
edition = "2018"
edition = "2021"
[workspace]
members = [

View File

@ -8,7 +8,7 @@ categories = ["network-programming", "asynchronous"]
description = "MessageBus allows intercommunicate with messages between modules"
license = "MIT OR Apache-2.0"
exclude = [".gitignore", ".cargo/config", ".github/**", "codecov.yml"]
edition = "2018"
edition = "2021"
[lib]
proc-macro = true

View File

@ -8,7 +8,7 @@ categories = ["network-programming", "asynchronous"]
description = "MessageBus remote allows intercommunicate by messages between instances"
license = "MIT OR Apache-2.0"
exclude = [".gitignore", ".cargo/config", ".github/**", "codecov.yml"]
edition = "2018"
edition = "2021"
# [features]
# quic = ["quinn"]
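
The three manifest hunks above (the root crate, the proc-macro derive crate, and the remote crate) all bump edition = "2018" to edition = "2021". As a general aside rather than part of this diff, the usual migration flow, assuming the crate already builds cleanly on the old edition, is:

cargo fix --edition        # apply mechanical edition fixes while still on 2018
# then set edition = "2021" in Cargo.toml
cargo build && cargo test  # confirm nothing regressed

Edition 2021 mainly changes closure capture, array IntoIterator, and the prelude, so a crate like this one typically compiles unchanged.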

examples/benchmark.rs Normal file (168 additions)
View File

@ -0,0 +1,168 @@
use std::time::Instant;
use async_trait::async_trait;
use messagebus::{
derive::Message, error, receivers::BufferUnorderedConfig, AsyncHandler, Bus, Message,
};
use thiserror::Error;
#[derive(Debug, Error, messagebus::derive::Error)]
enum Error {
#[error("Error({0})")]
Error(anyhow::Error),
}
impl<M: Message> From<error::Error<M>> for Error {
fn from(err: error::Error<M>) -> Self {
Self::Error(err.into())
}
}
struct TmpReceiver;
struct TmpReceiver2;
#[derive(Debug, Clone, Message)]
#[message(clone)]
struct MsgF32(f32);
#[derive(Debug, Clone, Message)]
#[message(clone)]
struct MsgU16(u16);
#[derive(Debug, Clone, Message)]
#[message(clone)]
struct MsgU32(u32);
#[derive(Debug, Clone, Message)]
#[message(clone)]
struct MsgI32(i32);
#[derive(Debug, Clone, Message)]
#[message(clone)]
struct MsgI16(i16);
#[async_trait]
impl AsyncHandler<MsgF32> for TmpReceiver {
type Error = Error;
type Response = ();
async fn handle(&self, _msg: MsgF32, bus: &Bus) -> Result<Self::Response, Self::Error> {
tokio::time::sleep(std::time::Duration::from_millis(1)).await;
bus.send(MsgU16(1)).await?;
Ok(())
}
}
#[async_trait]
impl AsyncHandler<MsgU16> for TmpReceiver {
type Error = Error;
type Response = ();
async fn handle(&self, _msg: MsgU16, bus: &Bus) -> Result<Self::Response, Self::Error> {
tokio::time::sleep(std::time::Duration::from_millis(1)).await;
bus.send(MsgU32(2)).await?;
Ok(())
}
}
#[async_trait]
impl AsyncHandler<MsgU32> for TmpReceiver {
type Error = Error;
type Response = ();
async fn handle(&self, _msg: MsgU32, bus: &Bus) -> Result<Self::Response, Self::Error> {
tokio::time::sleep(std::time::Duration::from_millis(1)).await;
bus.send(MsgI32(3)).await?;
Ok(())
}
}
#[async_trait]
impl AsyncHandler<MsgI32> for TmpReceiver {
type Error = Error;
type Response = ();
async fn handle(&self, _msg: MsgI32, bus: &Bus) -> Result<Self::Response, Self::Error> {
tokio::time::sleep(std::time::Duration::from_millis(1)).await;
bus.send(MsgI16(4)).await?;
Ok(())
}
}
#[async_trait]
impl AsyncHandler<MsgI16> for TmpReceiver {
type Error = Error;
type Response = ();
async fn handle(&self, _msg: MsgI16, _bus: &Bus) -> Result<Self::Response, Self::Error> {
Ok(())
}
}
#[async_trait]
impl AsyncHandler<MsgI32> for TmpReceiver2 {
type Error = Error;
type Response = ();
async fn handle(&self, _msg: MsgI32, bus: &Bus) -> Result<Self::Response, Self::Error> {
tokio::time::sleep(std::time::Duration::from_millis(1)).await;
bus.send(MsgI16(5)).await?;
Ok(())
}
}
async fn iter(bus: &Bus) {
for _ in 0..10_000 {
bus.send(MsgF32(0.)).await.unwrap();
}
bus.flush().await;
}
#[tokio::main]
async fn main() {
let cfg = BufferUnorderedConfig {
buffer_size: 8,
max_parallel: 8,
};
let (b, poller) = Bus::build()
.register(TmpReceiver)
.subscribe_async::<MsgF32>(cfg.buffer_size as _, cfg)
.subscribe_async::<MsgU16>(cfg.buffer_size as _, cfg)
.subscribe_async::<MsgU32>(cfg.buffer_size as _, cfg)
.subscribe_async::<MsgI32>(cfg.buffer_size as _, cfg)
.subscribe_async::<MsgI16>(cfg.buffer_size as _, cfg)
.done()
.register(TmpReceiver2)
.subscribe_async::<MsgI32>(cfg.buffer_size as _, cfg)
.done()
.build();
iter(&b).await;
let count = 5;
let mut time_sum = 0;
for _ in 0..count {
let inst = Instant::now();
iter(&b).await;
let diff = inst.elapsed();
time_sum += diff.as_micros();
}
println!("Avg time: {:.4}", time_sum as f64 / (count as f64 * 1000.0));
println!("flush");
b.flush().await;
println!("close");
b.close().await;
println!("closed");
poller.await;
println!("[done]");
}
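
The new example sends 10,000 MsgF32 messages per iteration through a chain of async handlers (MsgF32 -> MsgU16 -> MsgU32 -> MsgI32 -> MsgI16, with TmpReceiver2 also subscribed to MsgI32), flushes the bus, and averages five timed iterations after one warm-up pass. Since the file lives under examples/, the standard Cargo invocation applies (use --release, otherwise the timings are dominated by debug overhead):

cargo run --release --example benchmark

Note the units: time_sum accumulates microseconds, so dividing by count * 1000.0 prints the average in milliseconds per 10,000-message iteration.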

View File

@ -1,7 +1,4 @@
use core::{
marker::PhantomData,
pin::Pin
};
use core::{marker::PhantomData, pin::Pin};
use std::{collections::HashMap, sync::Arc};

View File

@ -1,12 +1,9 @@
use core::{
any::{type_name, Any},
fmt,
any::{Any, type_name},
};
use std::{
borrow::Cow,
sync::Arc,
};
use std::{borrow::Cow, sync::Arc};
pub trait MessageBounds: TypeTagged + fmt::Debug + Unpin + Send + Sync + 'static {}
impl<T: TypeTagged + fmt::Debug + Unpin + Send + Sync + 'static> MessageBounds for T {}

View File

@ -21,29 +21,24 @@ use core::{
sync::atomic::{AtomicBool, AtomicU64, Ordering},
time::Duration,
};
use std::{
collections::HashMap,
sync::Arc,
};
use tokio::sync::Mutex;
use smallvec::SmallVec;
use std::{collections::HashMap, sync::Arc};
use tokio::sync::Mutex;
use builder::{BusBuilder, MessageTypeDescriptor};
use receiver::{Receiver, Permit};
use error::{Error, SendError, StdSyncSendError};
use receiver::{Permit, Receiver};
use stats::Stats;
// public
pub use builder::Module;
pub use envelop::{IntoBoxedMessage, Message, MessageBounds, SharedMessage, TypeTag, TypeTagged};
pub use handler::*;
pub use relay::Relay;
pub use receiver::{
Action, Event, ReciveTypedReceiver,
ReciveUntypedReceiver, SendTypedReceiver,
Action, Event, ReciveTypedReceiver, ReciveUntypedReceiver, SendTypedReceiver,
SendUntypedReceiver, TypeTagAccept,
};
pub use relay::Relay;
pub type Untyped = Arc<dyn Any + Send + Sync>;

View File

@ -8,21 +8,17 @@ use crate::{
};
use core::{
any::{Any, TypeId},
sync::atomic::{AtomicBool, AtomicI64, Ordering},
fmt,
marker::PhantomData,
mem,
pin::Pin,
sync::atomic::{AtomicBool, AtomicI64, Ordering},
task::{Context, Poll},
};
use futures::{Future, future::poll_fn, FutureExt};
use std::{
borrow::Cow,
sync::Arc,
};
use futures::{future::poll_fn, Future, FutureExt};
use std::{borrow::Cow, sync::Arc};
use tokio::sync::{oneshot, Notify};
struct SlabCfg;
impl sharded_slab::Config for SlabCfg {
const RESERVED_BITS: usize = 1;
@ -805,7 +801,7 @@ impl Receiver {
req: bool,
) -> Result<(), Error<M>> {
self.inner.increment_processing(&M::type_tag_());
let res = if let Some(any_receiver) = self.inner.typed() {
any_receiver
.cast_send_typed::<M>()

View File

@ -25,13 +25,9 @@ use tokio::sync::mpsc::{self, UnboundedSender};
buffer_unordered_poller_macro!(
T,
AsyncHandler,
|mid, msg, bus, ut: Arc<T>, stx: UnboundedSender<_>, task_permit, flush_permit| {
|mid, msg, bus, ut: Arc<T>, stx: UnboundedSender<_>| {
tokio::spawn(async move {
let resp = ut.handle(msg, &bus).await;
drop(task_permit);
drop(flush_permit);
stx.send(Event::Response(mid, resp.map_err(Error::Other)))
.unwrap();
})
@ -146,9 +142,7 @@ where
let poll = self.srx.lock().poll_recv(ctx);
match poll {
Poll::Pending => Poll::Pending,
Poll::Ready(Some(event)) => {
Poll::Ready(event)
}
Poll::Ready(Some(event)) => Poll::Ready(event),
Poll::Ready(None) => Poll::Ready(Event::Exited),
}
}

View File

@ -1,12 +1,11 @@
mod r#async;
mod sync;
use std::sync::{Arc, atomic::AtomicU64};
use std::sync::atomic::AtomicU64;
pub use r#async::BufferUnorderedAsync;
use serde_derive::{Deserialize, Serialize};
pub use sync::BufferUnorderedSync;
use tokio::sync::{RwLock, Semaphore};
#[derive(Debug)]
pub struct BufferUnorderedStats {
@ -31,70 +30,114 @@ impl Default for BufferUnorderedConfig {
}
}
#[derive(Clone)]
struct ConcurrentState {
flush_lock: Arc<RwLock<()>>,
semaphore: Arc<Semaphore>,
}
#[macro_export]
macro_rules! buffer_unordered_poller_macro {
($t: tt, $h: tt, $st1: expr, $st2: expr) => {
fn buffer_unordered_poller<$t, M, R, E>(
async fn buffer_unordered_poller<$t, M, R, E>(
mut rx: mpsc::UnboundedReceiver<Request<M>>,
bus: Bus,
ut: Untyped,
_stats: Arc<BufferUnorderedStats>,
cfg: BufferUnorderedConfig,
stx: mpsc::UnboundedSender<Event<R, E>>,
) -> impl Future<Output = ()>
where
) where
$t: $h<M, Response = R, Error = E> + 'static,
M: Message,
R: Message,
E: StdSyncSendError,
{
use futures::StreamExt;
#[inline(always)]
unsafe fn fix_type<'a, F, T>(x: &'a mut Option<F>) -> Option<Pin<&'a mut (impl Future<Output = T> + Send)>>
where
F: Future<Output = T> + Send,
{
Some(Pin::new_unchecked(x.as_mut()?))
}
let ut = ut.downcast::<$t>().unwrap();
let state = super::ConcurrentState {
flush_lock: Arc::new(tokio::sync::RwLock::new(())),
semaphore: Arc::new(tokio::sync::Semaphore::new(cfg.max_parallel)),
};
let mut queue = futures::stream::FuturesUnordered::new();
let mut sync_future = None;
let mut flushing = false;
let mut closing = false;
let mut count = cfg.max_parallel;
async move {
while let Some(msg) = rx.recv().await {
match msg {
Request::Request(mid, msg, _req) => {
let bus = bus.clone();
let ut = ut.clone();
let stx = stx.clone();
let state = state.clone();
let flush_permit = state.flush_lock.read_owned().await;
let task_permit = state.semaphore.acquire_owned().await;
let _ = ($st1)(mid, msg, bus, ut, stx, task_permit, flush_permit);
}
Request::Action(Action::Init) => { stx.send(Event::Ready).unwrap(); }
Request::Action(Action::Close) => { rx.close(); }
Request::Action(Action::Flush) => {
state.flush_lock.write().await;
stx.send(Event::Flushed).unwrap();
}
Request::Action(Action::Sync) => {
let lock = state.flush_lock.write().await;
let resp = ($st2)(bus.clone(), ut.clone()).await;
drop(lock);
stx.send(Event::Synchronized(resp.map_err(Error::Other)))
.unwrap();
}
_ => unimplemented!(),
futures::future::poll_fn(move |ctx| 'main: loop {
while let Poll::Ready(res) = queue.poll_next_unpin(ctx) {
if res.is_some() {
count += 1;
} else {
break;
}
}
}
if (closing || flushing || sync_future.is_some()) && count < cfg.max_parallel {
return Poll::Pending
}
if flushing {
flushing = false;
stx.send(Event::Flushed).unwrap();
}
if let Some(fut) = unsafe { fix_type(&mut sync_future) } {
let resp: Result<_, E> = futures::ready!(fut.poll(ctx));
stx.send(Event::Synchronized(resp.map_err(Error::Other)))
.unwrap();
let _ = sync_future.take();
}
if closing {
return Poll::Ready(());
}
while count > 0 {
let msg = futures::ready!(rx.poll_recv(ctx));
if let Some(msg) = msg {
match msg {
Request::Request(mid, msg, _req) => {
count -= 1;
queue.push(($st1)(
mid,
msg,
bus.clone(),
ut.clone(),
stx.clone(),
));
ctx.waker().wake_by_ref();
}
Request::Action(Action::Init) => {
stx.send(Event::Ready).unwrap();
}
Request::Action(Action::Close) => {
rx.close();
}
Request::Action(Action::Flush) => {
flushing = true;
continue 'main;
}
Request::Action(Action::Sync) => {
sync_future.replace(($st2)(bus.clone(), ut.clone()));
continue 'main;
}
_ => unimplemented!(),
}
} else {
closing = true;
continue 'main;
}
}
return Poll::Pending;
}).await;
}
};
}
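
This macro is the core of the "unordered buffer variant" commit. The old poller was a plain while let Some(msg) = rx.recv().await loop that throttled work with an owned Semaphore permit, serialized flushes through a flush_lock RwLock, and handed both permits into the spawned handler. The new poller is an async fn built around a single futures::future::poll_fn state machine: completed handler futures are drained from a FuturesUnordered, a count of free slots caps parallelism, and the flushing / closing / sync_future flags defer those actions until in-flight work has drained. The sketch below is not the crate's code, just a minimal standalone illustration of the same pattern, assuming tokio (full feature set) and futures as dependencies, with a 1 ms sleep standing in for a handler:

use std::task::Poll;
use std::time::Duration;

use futures::stream::{FuturesUnordered, StreamExt};

#[tokio::main]
async fn main() {
    let max_parallel = 8usize;
    let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<u32>();
    for i in 0..32u32 {
        tx.send(i).unwrap();
    }
    drop(tx); // a closed channel plays the role of Action::Close here

    let mut queue = FuturesUnordered::new();
    let mut in_flight = 0usize;
    let mut closed = false;

    futures::future::poll_fn(move |cx| {
        'main: loop {
            // 1. Drain completed "handler" futures, freeing capacity.
            while let Poll::Ready(Some(msg)) = queue.poll_next_unpin(cx) {
                in_flight -= 1;
                println!("handled {}", msg);
            }
            // 2. At capacity, or draining after close: wait for completions.
            if in_flight >= max_parallel || (closed && in_flight > 0) {
                return Poll::Pending;
            }
            if closed {
                return Poll::Ready(());
            }
            // 3. Pull the next message, then loop back so the new future is
            //    polled (and its waker registered) before this task yields.
            match rx.poll_recv(cx) {
                Poll::Ready(Some(msg)) => {
                    in_flight += 1;
                    queue.push(async move {
                        tokio::time::sleep(Duration::from_millis(1)).await;
                        msg
                    });
                    continue 'main;
                }
                Poll::Ready(None) => {
                    closed = true;
                    continue 'main;
                }
                Poll::Pending => return Poll::Pending,
            }
        }
    })
    .await;
    println!("poller exited");
}

The detail worth mirroring from the macro is that after pushing new work the poll function is re-entered before yielding (the macro wakes itself with ctx.waker().wake_by_ref(); the sketch uses continue), so freshly pushed futures get polled and register their wakers.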

View File

@ -24,16 +24,14 @@ use tokio::sync::mpsc::{self, UnboundedSender};
buffer_unordered_poller_macro!(
T,
Handler,
|mid, msg, bus, ut: Arc<T>, stx: UnboundedSender<_>, task_permit, flush_permit|
|mid, msg, bus, ut: Arc<T>, stx: UnboundedSender<_>| {
tokio::task::spawn_blocking(move || {
let resp = ut.handle(msg, &bus);
drop(task_permit);
drop(flush_permit);
stx.send(Event::Response(mid, resp.map_err(Error::Other)))
.unwrap();
}),
})
},
|bus, ut: Arc<T>| async move {
tokio::task::spawn_blocking(move || ut.sync(&bus))
.await

View File

@ -24,12 +24,10 @@ use tokio::sync::mpsc::{self, UnboundedSender};
buffer_unordered_batch_poller_macro!(
T,
AsyncBatchHandler,
|mids: Vec<_>, msgs, bus, ut: Arc<T>, flush_permit, task_permit, stx: UnboundedSender<_>| {
|mids: Vec<_>, msgs, bus, ut: Arc<T>, task_permit, stx: UnboundedSender<_>| {
tokio::spawn(async move {
let resp = ut.handle(msgs, &bus).await;
drop(task_permit);
drop(flush_permit);
crate::process_batch_result!(resp, mids, stx);
})

View File

@ -1,12 +1,11 @@
mod r#async;
mod sync;
use std::sync::{Arc, atomic::AtomicU64};
use std::sync::atomic::AtomicU64;
pub use r#async::BufferUnorderedBatchedAsync;
use serde_derive::{Deserialize, Serialize};
pub use sync::BufferUnorderedBatchedSync;
use tokio::sync::{RwLock, Semaphore};
#[derive(Debug)]
pub struct BufferUnorderedBatchedStats {
@ -37,89 +36,80 @@ impl Default for BufferUnorderedBatchedConfig {
}
}
#[derive(Clone)]
struct ConcurrentState {
flush_lock: Arc<RwLock<()>>,
semaphore: Arc<Semaphore>,
}
#[macro_export]
macro_rules! buffer_unordered_batch_poller_macro {
($t: tt, $h: tt, $st1: expr, $st2: expr) => {
fn buffer_unordered_batch_poller<$t, M, R>(
async fn buffer_unordered_batch_poller<$t, M, R>(
mut rx: mpsc::UnboundedReceiver<Request<M>>,
bus: Bus,
ut: Untyped,
_stats: Arc<BufferUnorderedBatchedStats>,
cfg: BufferUnorderedBatchedConfig,
stx: mpsc::UnboundedSender<Event<R, $t::Error>>,
) -> impl Future<Output = ()>
where
) where
$t: $h<M, Response = R> + 'static,
M: Message,
R: Message,
{
let ut = ut.downcast::<$t>().unwrap();
let semaphore = Arc::new(tokio::sync::Semaphore::new(cfg.max_parallel));
let state = super::ConcurrentState {
flush_lock: Arc::new(tokio::sync::RwLock::new(())),
semaphore: Arc::new(tokio::sync::Semaphore::new(cfg.max_parallel)),
};
let mut buffer_mid = Vec::with_capacity(cfg.batch_size);
let mut buffer = Vec::with_capacity(cfg.batch_size);
async move {
let mut buffer_mid = Vec::with_capacity(cfg.batch_size);
let mut buffer = Vec::with_capacity(cfg.batch_size);
while let Some(msg) = rx.recv().await {
let bus = bus.clone();
let ut = ut.clone();
let semaphore = semaphore.clone();
let stx = stx.clone();
while let Some(msg) = rx.recv().await {
let bus = bus.clone();
let ut = ut.clone();
let state = state.clone();
let stx = stx.clone();
match msg {
Request::Request(mid, msg, req) => {
buffer_mid.push((mid, req));
buffer.push(msg);
match msg {
Request::Request(mid, msg, req) => {
buffer_mid.push((mid, req));
buffer.push(msg);
if buffer_mid.len() >= cfg.batch_size {
let task_permit = state.semaphore.acquire_owned().await;
let flush_permit = state.flush_lock.read_owned().await;
if buffer_mid.len() >= cfg.batch_size {
let task_permit = semaphore.acquire_owned().await;
let buffer_mid_clone = buffer_mid.drain(..).collect::<Vec<_>>();
let buffer_clone = buffer.drain(..).collect();
let buffer_mid_clone = buffer_mid.drain(..).collect::<Vec<_>>();
let buffer_clone = buffer.drain(..).collect();
let _ = ($st1)(buffer_mid_clone, buffer_clone, bus, ut, flush_permit, task_permit, stx);
}
let _ =
($st1)(buffer_mid_clone, buffer_clone, bus, ut, task_permit, stx);
}
Request::Action(Action::Init) => { stx.send(Event::Ready).unwrap(); }
Request::Action(Action::Close) => { rx.close(); }
Request::Action(Action::Flush) => {
let stx_clone = stx.clone();
if !buffer_mid.is_empty() {
let buffer_mid_clone = buffer_mid.drain(..).collect::<Vec<_>>();
let buffer_clone = buffer.drain(..).collect();
let flush_permit = state.flush_lock.clone().read_owned().await;
let task_permit = state.semaphore.acquire_owned().await;
let _ = ($st1)(buffer_mid_clone, buffer_clone, bus, ut, flush_permit, task_permit, stx);
}
state.flush_lock.write().await;
stx_clone.send(Event::Flushed).unwrap();
}
Request::Action(Action::Sync) => {
let lock = state.flush_lock.write().await;
let resp = ($st2)(bus.clone(), ut.clone()).await;
drop(lock);
stx.send(Event::Synchronized(resp.map_err(Error::Other)))
.unwrap();
}
_ => unimplemented!(),
}
Request::Action(Action::Init) => {
stx.send(Event::Ready).unwrap();
}
Request::Action(Action::Close) => {
rx.close();
}
Request::Action(Action::Flush) => {
let stx_clone = stx.clone();
if !buffer_mid.is_empty() {
let buffer_mid_clone = buffer_mid.drain(..).collect::<Vec<_>>();
let buffer_clone = buffer.drain(..).collect();
let task_permit = semaphore.clone().acquire_owned().await;
let _ =
($st1)(buffer_mid_clone, buffer_clone, bus, ut, task_permit, stx);
}
let _ = semaphore.acquire_many(cfg.max_parallel as _).await;
stx_clone.send(Event::Flushed).unwrap();
}
Request::Action(Action::Sync) => {
let lock = semaphore.acquire_many(cfg.max_parallel as _).await;
let resp = ($st2)(bus.clone(), ut.clone()).await;
drop(lock);
stx.send(Event::Synchronized(resp.map_err(Error::Other)))
.unwrap();
}
_ => unimplemented!(),
}
}
}
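
The batched poller keeps the spawn-per-batch model but drops the separate flush RwLock: a single Semaphore both caps parallelism (each in-flight batch holds one owned permit) and doubles as the flush/sync barrier, because acquire_many(max_parallel) can only succeed once every outstanding permit has been returned. A minimal standalone sketch of that idea, not the crate's code, assuming tokio as a dependency and with a sleep standing in for a batch handler:

use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    let max_parallel: u32 = 4;
    let semaphore = Arc::new(Semaphore::new(max_parallel as usize));

    // Each "batch" task holds one permit for as long as it runs,
    // so at most max_parallel batches are in flight at once.
    for batch in 0..8u32 {
        let permit = semaphore.clone().acquire_owned().await.unwrap();
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(10)).await;
            println!("batch {} done", batch);
            drop(permit); // returning the permit signals completion
        });
    }

    // Flush barrier: taking every permit at once blocks until all
    // in-flight batches have finished and released theirs.
    let _all = semaphore.acquire_many(max_parallel).await.unwrap();
    println!("flushed");
}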

View File

@ -24,12 +24,10 @@ use tokio::sync::mpsc::{self, UnboundedSender};
buffer_unordered_batch_poller_macro!(
T,
BatchHandler,
|mids: Vec<_>, msgs, bus, ut: Arc<T>, flush_permit, task_permit, stx: UnboundedSender<_>| {
|mids: Vec<_>, msgs, bus, ut: Arc<T>, task_permit, stx: UnboundedSender<_>| {
tokio::task::spawn_blocking(move || {
let resp = ut.handle(msgs, &bus);
drop(task_permit);
drop(flush_permit);
crate::process_batch_result!(resp, mids, stx);
})

View File

@ -15,7 +15,6 @@ pub use synchronize_batched::{
use crate::receiver::Action;
#[macro_export]
macro_rules! process_batch_result {
($resp: expr, $mids: expr, $stx: expr) => {
@ -28,8 +27,7 @@ macro_rules! process_batch_result {
while let Some((mid, _req)) = mids.next() {
if let Some(r) = re.next() {
$stx.send(Event::Response(mid, Ok(r)))
.unwrap();
$stx.send(Event::Response(mid, Ok(r))).unwrap();
} else {
$stx.send(Event::Response(mid, Err(Error::NoResponse)))
.unwrap();
@ -38,10 +36,8 @@ macro_rules! process_batch_result {
}
Err(er) => {
for (mid, _req) in mids {
$stx.send(Event::Response(
mid,
Err(Error::Other(er.clone())),
)).unwrap();
$stx.send(Event::Response(mid, Err(Error::Other(er.clone()))))
.unwrap();
}
$stx.send(Event::Error(er)).unwrap();
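
The process_batch_result macro (only reformatted by this commit) zips each buffered message id with the matching element of the batch response, substitutes Error::NoResponse when the handler returned fewer items than it was given, and on a batch-level error sends that error to every id followed by an Event::Error. A minimal sketch of just the pairing logic, with hypothetical stand-in types (Outcome is not a crate type):

// Stand-in for the crate's Event/Error plumbing.
#[derive(Debug)]
enum Outcome {
    Ok(i32),
    NoResponse,
}

fn pair_batch(mids: Vec<u64>, responses: Vec<i32>) -> Vec<(u64, Outcome)> {
    let mut re = responses.into_iter();
    mids.into_iter()
        .map(|mid| match re.next() {
            Some(r) => (mid, Outcome::Ok(r)),
            None => (mid, Outcome::NoResponse),
        })
        .collect()
}

fn main() {
    // 3 messages in, only 2 responses back: the last id gets NoResponse.
    println!("{:?}", pair_batch(vec![1, 2, 3], vec![10, 20]));
}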

View File

@ -17,7 +17,10 @@ use futures::Future;
use super::SynchronizedBatchedConfig;
use tokio::sync::{Mutex, mpsc::{self, UnboundedSender}};
use tokio::sync::{
mpsc::{self, UnboundedSender},
Mutex,
};
batch_synchronized_poller_macro! {
T,

View File

@ -35,15 +35,14 @@ impl Default for SynchronizedBatchedConfig {
#[macro_export]
macro_rules! batch_synchronized_poller_macro {
($t: tt, $h: tt, $st1: expr, $st2: expr) => {
fn batch_synchronized_poller<$t, M, R>(
async fn batch_synchronized_poller<$t, M, R>(
mut rx: mpsc::UnboundedReceiver<Request<M>>,
bus: Bus,
ut: Untyped,
// stats: Arc<SynchronizedBatchedStats>,
cfg: SynchronizedBatchedConfig,
stx: mpsc::UnboundedSender<Event<R, $t::Error>>,
) -> impl Future<Output = ()>
where
) where
$t: $h<M, Response = R> + 'static,
$t::Error: StdSyncSendError + Clone,
M: Message,
@ -51,50 +50,52 @@ macro_rules! batch_synchronized_poller_macro {
{
let ut = ut.downcast::<Mutex<T>>().unwrap();
async move {
let mut buffer_mid = Vec::with_capacity(cfg.batch_size);
let mut buffer = Vec::with_capacity(cfg.batch_size);
let mut buffer_mid = Vec::with_capacity(cfg.batch_size);
let mut buffer = Vec::with_capacity(cfg.batch_size);
while let Some(msg) = rx.recv().await {
let bus = bus.clone();
let ut = ut.clone();
let stx = stx.clone();
match msg {
Request::Request(mid, msg, req) => {
buffer_mid.push((mid, req));
buffer.push(msg);
while let Some(msg) = rx.recv().await {
let bus = bus.clone();
let ut = ut.clone();
let stx = stx.clone();
if buffer_mid.len() >= cfg.batch_size {
let buffer_mid_clone = buffer_mid.drain(..).collect::<Vec<_>>();
let buffer_clone = buffer.drain(..).collect();
match msg {
Request::Request(mid, msg, req) => {
buffer_mid.push((mid, req));
buffer.push(msg);
let _ = ($st1)(buffer_mid_clone, buffer_clone, bus, ut, stx);
}
if buffer_mid.len() >= cfg.batch_size {
let buffer_mid_clone = buffer_mid.drain(..).collect::<Vec<_>>();
let buffer_clone = buffer.drain(..).collect();
let _ = ($st1)(buffer_mid_clone, buffer_clone, bus, ut, stx);
}
Request::Action(Action::Init) => { stx.send(Event::Ready).unwrap(); }
Request::Action(Action::Close) => { rx.close(); }
Request::Action(Action::Flush) => {
let stx_clone = stx.clone();
if !buffer_mid.is_empty() {
let buffer_mid_clone = buffer_mid.drain(..).collect::<Vec<_>>();
let buffer_clone = buffer.drain(..).collect();
let _ = ($st1)(buffer_mid_clone, buffer_clone, bus, ut, stx);
}
stx_clone.send(Event::Flushed).unwrap();
}
Request::Action(Action::Sync) => {
let resp = ($st2)(bus.clone(), ut.clone()).await;
stx.send(Event::Synchronized(resp.map_err(Error::Other)))
.unwrap();
}
_ => unimplemented!(),
}
Request::Action(Action::Init) => {
stx.send(Event::Ready).unwrap();
}
Request::Action(Action::Close) => {
rx.close();
}
Request::Action(Action::Flush) => {
let stx_clone = stx.clone();
if !buffer_mid.is_empty() {
let buffer_mid_clone = buffer_mid.drain(..).collect::<Vec<_>>();
let buffer_clone = buffer.drain(..).collect();
let _ = ($st1)(buffer_mid_clone, buffer_clone, bus, ut, stx);
}
stx_clone.send(Event::Flushed).unwrap();
}
Request::Action(Action::Sync) => {
let resp = ($st2)(bus.clone(), ut.clone()).await;
stx.send(Event::Synchronized(resp.map_err(Error::Other)))
.unwrap();
}
_ => unimplemented!(),
}
}
}

View File

@ -15,7 +15,10 @@ use crate::{
use super::SynchronizedBatchedConfig;
use futures::{executor::block_on, Future};
use tokio::sync::{Mutex, mpsc::{self, UnboundedSender}};
use tokio::sync::{
mpsc::{self, UnboundedSender},
Mutex,
};
batch_synchronized_poller_macro! {
T,

View File

@ -15,7 +15,10 @@ use crate::{
receivers::Request,
AsyncSynchronizedHandler, Bus, Message, Untyped,
};
use tokio::sync::{Mutex, mpsc::{self, UnboundedSender}};
use tokio::sync::{
mpsc::{self, UnboundedSender},
Mutex,
};
synchronized_poller_macro! {
T,

View File

@ -27,13 +27,12 @@ impl Default for SynchronizedConfig {
#[macro_export]
macro_rules! synchronized_poller_macro {
($t: tt, $h: tt, $st1: expr, $st2: expr) => {
fn synchronized_poller<$t, M, R>(
async fn synchronized_poller<$t, M, R>(
mut rx: mpsc::UnboundedReceiver<Request<M>>,
bus: Bus,
ut: Untyped,
stx: mpsc::UnboundedSender<Event<R, $t::Error>>,
) -> impl Future<Output = ()>
where
) where
$t: $h<M, Response = R> + 'static,
$t::Error: StdSyncSendError,
M: Message,
@ -41,25 +40,29 @@ macro_rules! synchronized_poller_macro {
{
let ut = ut.downcast::<Mutex<T>>().unwrap();
async move {
while let Some(msg) = rx.recv().await {
match msg {
Request::Request(mid, msg, _req) => {
($st1)(mid, msg, bus.clone(), ut.clone(), stx.clone())
.await
.unwrap()
}
Request::Action(Action::Init) => { stx.send(Event::Ready).unwrap(); }
Request::Action(Action::Close) => { rx.close(); }
Request::Action(Action::Flush) => { stx.send(Event::Flushed).unwrap(); }
Request::Action(Action::Sync) => {
let resp = ($st2)(bus.clone(), ut.clone()).await;
stx.send(Event::Synchronized(resp.map_err(Error::Other)))
.unwrap();
}
_ => unimplemented!(),
while let Some(msg) = rx.recv().await {
match msg {
Request::Request(mid, msg, _req) => {
($st1)(mid, msg, bus.clone(), ut.clone(), stx.clone())
.await
.unwrap()
}
Request::Action(Action::Init) => {
stx.send(Event::Ready).unwrap();
}
Request::Action(Action::Close) => {
rx.close();
}
Request::Action(Action::Flush) => {
stx.send(Event::Flushed).unwrap();
}
Request::Action(Action::Sync) => {
let resp = ($st2)(bus.clone(), ut.clone()).await;
stx.send(Event::Synchronized(resp.map_err(Error::Other)))
.unwrap();
}
_ => unimplemented!(),
}
}
}
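
Each poller macro in this changeset (buffer_unordered, its batched variant, synchronized, batch_synchronized) also switches its generated function from fn ... -> impl Future<Output = ()>, which returned an inner async move block, to a plain async fn. Both shapes return an anonymous Future, but the async fn also moves the setup code (downcasting ut, allocating buffers) into the future body so it runs lazily on first poll. A small illustration of that difference, independent of this crate:

use std::future::Future;

// Old shape: setup runs eagerly when the function is called,
// only the async move block is deferred.
fn poller_old(n: u32) -> impl Future<Output = ()> {
    let doubled = n * 2; // eager
    async move { println!("old: {}", doubled) }
}

// New shape: the whole body, setup included, runs when the future is polled.
async fn poller_new(n: u32) {
    let doubled = n * 2; // lazy
    println!("new: {}", doubled);
}

#[tokio::main]
async fn main() {
    poller_old(1).await;
    poller_new(2).await;
}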

View File

@ -15,7 +15,10 @@ use crate::{
receivers::Request,
Bus, Message, SynchronizedHandler, Untyped,
};
use tokio::sync::{Mutex, mpsc::{self, UnboundedSender}};
use tokio::sync::{
mpsc::{self, UnboundedSender},
Mutex,
};
synchronized_poller_macro! {
T,

View File

@ -7,13 +7,13 @@ use crate::{
stats::Stats,
Bus, Event, Message, Permit, ReciveUntypedReceiver, TypeTag,
};
use dashmap::DashMap;
use std::sync::Arc;
use core::{
pin::Pin,
sync::atomic::{AtomicBool, AtomicU64, Ordering},
};
use dashmap::DashMap;
use futures::{future::poll_fn, Future};
use std::sync::Arc;
use tokio::sync::{oneshot, Notify};
pub trait Relay: TypeTagAccept + SendUntypedReceiver + ReciveUntypedReceiver + 'static {}
@ -217,7 +217,10 @@ where
}
fn increment_processing(&self, tt: &TypeTag) {
self.context.receivers.get(tt).map(|r|r.processing.fetch_add(1, Ordering::SeqCst));
self.context
.receivers
.get(tt)
.map(|r| r.processing.fetch_add(1, Ordering::SeqCst));
}
fn start_polling(

View File

@ -1,7 +1,12 @@
use std::sync::Arc;
use async_trait::async_trait;
use messagebus::{AsyncBatchHandler, Bus, Message, derive::{Error as MbError, Message}, error, receivers::BufferUnorderedBatchedConfig};
use messagebus::{
derive::{Error as MbError, Message},
error,
receivers::BufferUnorderedBatchedConfig,
AsyncBatchHandler, Bus, Message,
};
use parking_lot::Mutex;
use thiserror::Error;
@ -26,7 +31,7 @@ struct MsgI32(i32);
struct MsgI16(i16);
struct TmpReceiver {
batches: Arc<Mutex<Vec<Vec<i32>>>>
batches: Arc<Mutex<Vec<Vec<i32>>>>,
}
#[async_trait]
@ -41,7 +46,9 @@ impl AsyncBatchHandler<MsgI32> for TmpReceiver {
msg: Vec<MsgI32>,
_bus: &Bus,
) -> Result<Vec<Self::Response>, Self::Error> {
self.batches.lock().push(msg.into_iter().map(|x|x.0).collect());
self.batches
.lock()
.push(msg.into_iter().map(|x| x.0).collect());
Ok(vec![])
}
@ -52,18 +59,23 @@ async fn test_batch() {
let batches = Arc::new(Mutex::new(Vec::new()));
let (b, poller) = Bus::build()
.register(TmpReceiver { batches: batches.clone() })
.subscribe_batch_async::<MsgI32>(16, BufferUnorderedBatchedConfig {
batch_size: 8,
..Default::default()
.register(TmpReceiver {
batches: batches.clone(),
})
.subscribe_batch_async::<MsgI32>(
16,
BufferUnorderedBatchedConfig {
batch_size: 8,
..Default::default()
},
)
.done()
.build();
for i in 1..100i32 {
b.send(MsgI32(i)).await.unwrap();
}
let mut re = Vec::new();
let mut counter = 1i32;
for _ in 1..100i32 {

View File

@ -1,9 +1,13 @@
use std::{sync::atomic::AtomicU32, time::Duration};
use messagebus::{AsyncHandler, Bus, Message, Module, derive::{Error as MbError, Message}, error, receivers::BufferUnorderedConfig};
use thiserror::Error;
use async_trait::async_trait;
use messagebus::{
derive::{Error as MbError, Message},
error,
receivers::BufferUnorderedConfig,
AsyncHandler, Bus, Message, Module,
};
use thiserror::Error;
#[derive(Debug, Error, MbError)]
enum Error {
@ -20,7 +24,6 @@ impl<M: Message> From<error::Error<M>> for Error {
}
}
#[derive(Debug, Clone, Message)]
struct Req(pub u32);
@ -34,7 +37,7 @@ struct GetCount;
struct CountResult(pub u32);
struct TmpReceiver {
counter: AtomicU32
counter: AtomicU32,
}
#[async_trait]
@ -43,10 +46,9 @@ impl AsyncHandler<Req> for TmpReceiver {
type Response = ();
async fn handle(&self, msg: Req, bus: &Bus) -> Result<Self::Response, Self::Error> {
tokio::time::sleep(Duration::from_millis((msg.0 % 20) as _))
.await;
tokio::time::sleep(Duration::from_millis((msg.0 % 20) as _)).await;
if msg.0 % 128 == 0 {
if msg.0 % 128 == 0 {
return Err(Error::MyError);
} else {
bus.send(Resp(msg.0)).await?;
@ -62,7 +64,8 @@ impl AsyncHandler<Resp> for TmpReceiver {
type Response = ();
async fn handle(&self, _msg: Resp, _bus: &Bus) -> Result<Self::Response, Self::Error> {
self.counter.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
self.counter
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
Ok(())
}
}
@ -73,17 +76,24 @@ impl AsyncHandler<GetCount> for TmpReceiver {
type Response = CountResult;
async fn handle(&self, _: GetCount, _bus: &Bus) -> Result<Self::Response, Self::Error> {
Ok(CountResult(self.counter.load(std::sync::atomic::Ordering::SeqCst)))
Ok(CountResult(
self.counter.load(std::sync::atomic::Ordering::SeqCst),
))
}
}
fn module() -> Module {
Module::new()
.register(TmpReceiver { counter: AtomicU32::new(0) })
.subscribe_async::<Req>(1024, BufferUnorderedConfig {
buffer_size: 1024,
max_parallel: 1024,
.register(TmpReceiver {
counter: AtomicU32::new(0),
})
.subscribe_async::<Req>(
1024,
BufferUnorderedConfig {
buffer_size: 1024,
max_parallel: 1024,
},
)
.subscribe_async::<Resp>(1024, Default::default())
.subscribe_async::<GetCount>(8, Default::default())
.done()
@ -93,7 +103,7 @@ fn module() -> Module {
async fn test_sync() {
let (b, poller) = Bus::build().add_module(module()).build();
let cnt = 4u32;
for i in 0..cnt{
for i in 0..cnt {
for j in 0..32768 {
b.send(Req(i * 128 + j)).await.unwrap();
}
@ -106,8 +116,14 @@ async fn test_sync() {
b.flush().await;
println!("flushed");
assert_eq!(b.request_we::<_, CountResult, Error>(GetCount, Default::default()).await.unwrap().0, cnt * 32768 - cnt * 256);
assert_eq!(
b.request_we::<_, CountResult, Error>(GetCount, Default::default())
.await
.unwrap()
.0,
cnt * 32768 - cnt * 256
);
b.close().await;