Compare commits
2 Commits
master
...
629e03df8d
Author | SHA1 | Date | |
---|---|---|---|
629e03df8d | |||
90d0f781a5 |
@ -8,7 +8,7 @@ categories = ["network-programming", "asynchronous"]
|
||||
description = "MessageBus allows intercommunicate with messages between modules"
|
||||
license = "MIT OR Apache-2.0"
|
||||
exclude = [".gitignore", ".cargo/config", ".github/**", "codecov.yml"]
|
||||
edition = "2018"
|
||||
edition = "2021"
|
||||
|
||||
[workspace]
|
||||
members = [
|
||||
|
@ -8,7 +8,7 @@ categories = ["network-programming", "asynchronous"]
|
||||
description = "MessageBus allows intercommunicate with messages between modules"
|
||||
license = "MIT OR Apache-2.0"
|
||||
exclude = [".gitignore", ".cargo/config", ".github/**", "codecov.yml"]
|
||||
edition = "2018"
|
||||
edition = "2021"
|
||||
|
||||
[lib]
|
||||
proc-macro = true
|
||||
|
@ -8,7 +8,7 @@ categories = ["network-programming", "asynchronous"]
|
||||
description = "MessageBus remote allows intercommunicate by messages between instances"
|
||||
license = "MIT OR Apache-2.0"
|
||||
exclude = [".gitignore", ".cargo/config", ".github/**", "codecov.yml"]
|
||||
edition = "2018"
|
||||
edition = "2021"
|
||||
|
||||
# [features]
|
||||
# quic = ["quinn"]
|
||||
|
168
examples/benchmark.rs
Normal file
168
examples/benchmark.rs
Normal file
@ -0,0 +1,168 @@
|
||||
use std::time::Instant;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use messagebus::{
|
||||
derive::Message, error, receivers::BufferUnorderedConfig, AsyncHandler, Bus, Message,
|
||||
};
|
||||
use thiserror::Error;
|
||||
|
||||
#[derive(Debug, Error, messagebus::derive::Error)]
|
||||
enum Error {
|
||||
#[error("Error({0})")]
|
||||
Error(anyhow::Error),
|
||||
}
|
||||
|
||||
impl<M: Message> From<error::Error<M>> for Error {
|
||||
fn from(err: error::Error<M>) -> Self {
|
||||
Self::Error(err.into())
|
||||
}
|
||||
}
|
||||
|
||||
struct TmpReceiver;
|
||||
struct TmpReceiver2;
|
||||
|
||||
#[derive(Debug, Clone, Message)]
|
||||
#[message(clone)]
|
||||
struct MsgF32(f32);
|
||||
|
||||
#[derive(Debug, Clone, Message)]
|
||||
#[message(clone)]
|
||||
struct MsgU16(u16);
|
||||
|
||||
#[derive(Debug, Clone, Message)]
|
||||
#[message(clone)]
|
||||
struct MsgU32(u32);
|
||||
|
||||
#[derive(Debug, Clone, Message)]
|
||||
#[message(clone)]
|
||||
struct MsgI32(i32);
|
||||
|
||||
#[derive(Debug, Clone, Message)]
|
||||
#[message(clone)]
|
||||
struct MsgI16(i16);
|
||||
|
||||
#[async_trait]
|
||||
impl AsyncHandler<MsgF32> for TmpReceiver {
|
||||
type Error = Error;
|
||||
type Response = ();
|
||||
|
||||
async fn handle(&self, _msg: MsgF32, bus: &Bus) -> Result<Self::Response, Self::Error> {
|
||||
tokio::time::sleep(std::time::Duration::from_millis(1)).await;
|
||||
bus.send(MsgU16(1)).await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl AsyncHandler<MsgU16> for TmpReceiver {
|
||||
type Error = Error;
|
||||
type Response = ();
|
||||
|
||||
async fn handle(&self, _msg: MsgU16, bus: &Bus) -> Result<Self::Response, Self::Error> {
|
||||
tokio::time::sleep(std::time::Duration::from_millis(1)).await;
|
||||
bus.send(MsgU32(2)).await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl AsyncHandler<MsgU32> for TmpReceiver {
|
||||
type Error = Error;
|
||||
type Response = ();
|
||||
|
||||
async fn handle(&self, _msg: MsgU32, bus: &Bus) -> Result<Self::Response, Self::Error> {
|
||||
tokio::time::sleep(std::time::Duration::from_millis(1)).await;
|
||||
bus.send(MsgI32(3)).await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl AsyncHandler<MsgI32> for TmpReceiver {
|
||||
type Error = Error;
|
||||
type Response = ();
|
||||
|
||||
async fn handle(&self, _msg: MsgI32, bus: &Bus) -> Result<Self::Response, Self::Error> {
|
||||
tokio::time::sleep(std::time::Duration::from_millis(1)).await;
|
||||
bus.send(MsgI16(4)).await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl AsyncHandler<MsgI16> for TmpReceiver {
|
||||
type Error = Error;
|
||||
type Response = ();
|
||||
|
||||
async fn handle(&self, _msg: MsgI16, _bus: &Bus) -> Result<Self::Response, Self::Error> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl AsyncHandler<MsgI32> for TmpReceiver2 {
|
||||
type Error = Error;
|
||||
type Response = ();
|
||||
|
||||
async fn handle(&self, _msg: MsgI32, bus: &Bus) -> Result<Self::Response, Self::Error> {
|
||||
tokio::time::sleep(std::time::Duration::from_millis(1)).await;
|
||||
bus.send(MsgI16(5)).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
async fn iter(bus: &Bus) {
|
||||
for _ in 0..10_000 {
|
||||
bus.send(MsgF32(0.)).await.unwrap();
|
||||
}
|
||||
|
||||
bus.flush().await;
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
let cfg = BufferUnorderedConfig {
|
||||
buffer_size: 8,
|
||||
max_parallel: 8,
|
||||
};
|
||||
|
||||
let (b, poller) = Bus::build()
|
||||
.register(TmpReceiver)
|
||||
.subscribe_async::<MsgF32>(cfg.buffer_size as _, cfg)
|
||||
.subscribe_async::<MsgU16>(cfg.buffer_size as _, cfg)
|
||||
.subscribe_async::<MsgU32>(cfg.buffer_size as _, cfg)
|
||||
.subscribe_async::<MsgI32>(cfg.buffer_size as _, cfg)
|
||||
.subscribe_async::<MsgI16>(cfg.buffer_size as _, cfg)
|
||||
.done()
|
||||
.register(TmpReceiver2)
|
||||
.subscribe_async::<MsgI32>(cfg.buffer_size as _, cfg)
|
||||
.done()
|
||||
.build();
|
||||
|
||||
iter(&b).await;
|
||||
|
||||
let count = 5;
|
||||
|
||||
let mut time_sum = 0;
|
||||
for _ in 0..count {
|
||||
let inst = Instant::now();
|
||||
iter(&b).await;
|
||||
let diff = inst.elapsed();
|
||||
|
||||
time_sum += diff.as_micros();
|
||||
}
|
||||
|
||||
println!("Avg time: {:.4}", time_sum as f64 / (count as f64 * 1000.0));
|
||||
|
||||
println!("flush");
|
||||
b.flush().await;
|
||||
|
||||
println!("close");
|
||||
b.close().await;
|
||||
|
||||
println!("closed");
|
||||
|
||||
poller.await;
|
||||
println!("[done]");
|
||||
}
|
@ -1,7 +1,4 @@
|
||||
use core::{
|
||||
marker::PhantomData,
|
||||
pin::Pin
|
||||
};
|
||||
use core::{marker::PhantomData, pin::Pin};
|
||||
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
|
||||
|
@ -1,12 +1,9 @@
|
||||
use core::{
|
||||
any::{type_name, Any},
|
||||
fmt,
|
||||
any::{Any, type_name},
|
||||
};
|
||||
|
||||
use std::{
|
||||
borrow::Cow,
|
||||
sync::Arc,
|
||||
};
|
||||
use std::{borrow::Cow, sync::Arc};
|
||||
|
||||
pub trait MessageBounds: TypeTagged + fmt::Debug + Unpin + Send + Sync + 'static {}
|
||||
impl<T: TypeTagged + fmt::Debug + Unpin + Send + Sync + 'static> MessageBounds for T {}
|
||||
|
15
src/lib.rs
15
src/lib.rs
@ -21,29 +21,24 @@ use core::{
|
||||
sync::atomic::{AtomicBool, AtomicU64, Ordering},
|
||||
time::Duration,
|
||||
};
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
sync::Arc,
|
||||
};
|
||||
use tokio::sync::Mutex;
|
||||
use smallvec::SmallVec;
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
use builder::{BusBuilder, MessageTypeDescriptor};
|
||||
use receiver::{Receiver, Permit};
|
||||
use error::{Error, SendError, StdSyncSendError};
|
||||
use receiver::{Permit, Receiver};
|
||||
use stats::Stats;
|
||||
|
||||
|
||||
// public
|
||||
pub use builder::Module;
|
||||
pub use envelop::{IntoBoxedMessage, Message, MessageBounds, SharedMessage, TypeTag, TypeTagged};
|
||||
pub use handler::*;
|
||||
pub use relay::Relay;
|
||||
pub use receiver::{
|
||||
Action, Event, ReciveTypedReceiver,
|
||||
ReciveUntypedReceiver, SendTypedReceiver,
|
||||
Action, Event, ReciveTypedReceiver, ReciveUntypedReceiver, SendTypedReceiver,
|
||||
SendUntypedReceiver, TypeTagAccept,
|
||||
};
|
||||
pub use relay::Relay;
|
||||
|
||||
pub type Untyped = Arc<dyn Any + Send + Sync>;
|
||||
|
||||
|
@ -8,21 +8,17 @@ use crate::{
|
||||
};
|
||||
use core::{
|
||||
any::{Any, TypeId},
|
||||
sync::atomic::{AtomicBool, AtomicI64, Ordering},
|
||||
fmt,
|
||||
marker::PhantomData,
|
||||
mem,
|
||||
pin::Pin,
|
||||
sync::atomic::{AtomicBool, AtomicI64, Ordering},
|
||||
task::{Context, Poll},
|
||||
};
|
||||
use futures::{Future, future::poll_fn, FutureExt};
|
||||
use std::{
|
||||
borrow::Cow,
|
||||
sync::Arc,
|
||||
};
|
||||
use futures::{future::poll_fn, Future, FutureExt};
|
||||
use std::{borrow::Cow, sync::Arc};
|
||||
use tokio::sync::{oneshot, Notify};
|
||||
|
||||
|
||||
struct SlabCfg;
|
||||
impl sharded_slab::Config for SlabCfg {
|
||||
const RESERVED_BITS: usize = 1;
|
||||
|
@ -25,13 +25,9 @@ use tokio::sync::mpsc::{self, UnboundedSender};
|
||||
buffer_unordered_poller_macro!(
|
||||
T,
|
||||
AsyncHandler,
|
||||
|mid, msg, bus, ut: Arc<T>, stx: UnboundedSender<_>, task_permit, flush_permit| {
|
||||
|mid, msg, bus, ut: Arc<T>, stx: UnboundedSender<_>| {
|
||||
tokio::spawn(async move {
|
||||
let resp = ut.handle(msg, &bus).await;
|
||||
|
||||
drop(task_permit);
|
||||
drop(flush_permit);
|
||||
|
||||
stx.send(Event::Response(mid, resp.map_err(Error::Other)))
|
||||
.unwrap();
|
||||
})
|
||||
@ -146,9 +142,7 @@ where
|
||||
let poll = self.srx.lock().poll_recv(ctx);
|
||||
match poll {
|
||||
Poll::Pending => Poll::Pending,
|
||||
Poll::Ready(Some(event)) => {
|
||||
Poll::Ready(event)
|
||||
}
|
||||
Poll::Ready(Some(event)) => Poll::Ready(event),
|
||||
Poll::Ready(None) => Poll::Ready(Event::Exited),
|
||||
}
|
||||
}
|
||||
|
@ -1,12 +1,11 @@
|
||||
mod r#async;
|
||||
mod sync;
|
||||
|
||||
use std::sync::{Arc, atomic::AtomicU64};
|
||||
use std::sync::atomic::AtomicU64;
|
||||
|
||||
pub use r#async::BufferUnorderedAsync;
|
||||
use serde_derive::{Deserialize, Serialize};
|
||||
pub use sync::BufferUnorderedSync;
|
||||
use tokio::sync::{RwLock, Semaphore};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct BufferUnorderedStats {
|
||||
@ -31,70 +30,114 @@ impl Default for BufferUnorderedConfig {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct ConcurrentState {
|
||||
flush_lock: Arc<RwLock<()>>,
|
||||
semaphore: Arc<Semaphore>,
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! buffer_unordered_poller_macro {
|
||||
($t: tt, $h: tt, $st1: expr, $st2: expr) => {
|
||||
fn buffer_unordered_poller<$t, M, R, E>(
|
||||
async fn buffer_unordered_poller<$t, M, R, E>(
|
||||
mut rx: mpsc::UnboundedReceiver<Request<M>>,
|
||||
bus: Bus,
|
||||
ut: Untyped,
|
||||
_stats: Arc<BufferUnorderedStats>,
|
||||
cfg: BufferUnorderedConfig,
|
||||
stx: mpsc::UnboundedSender<Event<R, E>>,
|
||||
) -> impl Future<Output = ()>
|
||||
where
|
||||
) where
|
||||
$t: $h<M, Response = R, Error = E> + 'static,
|
||||
M: Message,
|
||||
R: Message,
|
||||
E: StdSyncSendError,
|
||||
{
|
||||
use futures::StreamExt;
|
||||
|
||||
#[inline(always)]
|
||||
unsafe fn fix_type<'a, F, T>(x: &'a mut Option<F>) -> Option<Pin<&'a mut (impl Future<Output = T> + Send)>>
|
||||
where
|
||||
F: Future<Output = T> + Send,
|
||||
{
|
||||
Some(Pin::new_unchecked(x.as_mut()?))
|
||||
}
|
||||
|
||||
let ut = ut.downcast::<$t>().unwrap();
|
||||
|
||||
let state = super::ConcurrentState {
|
||||
flush_lock: Arc::new(tokio::sync::RwLock::new(())),
|
||||
semaphore: Arc::new(tokio::sync::Semaphore::new(cfg.max_parallel)),
|
||||
};
|
||||
let mut queue = futures::stream::FuturesUnordered::new();
|
||||
let mut sync_future = None;
|
||||
let mut flushing = false;
|
||||
let mut closing = false;
|
||||
let mut count = cfg.max_parallel;
|
||||
|
||||
async move {
|
||||
while let Some(msg) = rx.recv().await {
|
||||
match msg {
|
||||
Request::Request(mid, msg, _req) => {
|
||||
let bus = bus.clone();
|
||||
let ut = ut.clone();
|
||||
let stx = stx.clone();
|
||||
let state = state.clone();
|
||||
|
||||
let flush_permit = state.flush_lock.read_owned().await;
|
||||
let task_permit = state.semaphore.acquire_owned().await;
|
||||
|
||||
let _ = ($st1)(mid, msg, bus, ut, stx, task_permit, flush_permit);
|
||||
}
|
||||
Request::Action(Action::Init) => { stx.send(Event::Ready).unwrap(); }
|
||||
Request::Action(Action::Close) => { rx.close(); }
|
||||
Request::Action(Action::Flush) => {
|
||||
state.flush_lock.write().await;
|
||||
stx.send(Event::Flushed).unwrap();
|
||||
}
|
||||
|
||||
Request::Action(Action::Sync) => {
|
||||
let lock = state.flush_lock.write().await;
|
||||
let resp = ($st2)(bus.clone(), ut.clone()).await;
|
||||
drop(lock);
|
||||
|
||||
stx.send(Event::Synchronized(resp.map_err(Error::Other)))
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
_ => unimplemented!(),
|
||||
futures::future::poll_fn(move |ctx| 'main: loop {
|
||||
while let Poll::Ready(res) = queue.poll_next_unpin(ctx) {
|
||||
if res.is_some() {
|
||||
count += 1;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (closing || flushing || sync_future.is_some()) && count < cfg.max_parallel {
|
||||
return Poll::Pending
|
||||
}
|
||||
|
||||
if flushing {
|
||||
flushing = false;
|
||||
stx.send(Event::Flushed).unwrap();
|
||||
}
|
||||
|
||||
if let Some(fut) = unsafe { fix_type(&mut sync_future) } {
|
||||
let resp: Result<_, E> = futures::ready!(fut.poll(ctx));
|
||||
stx.send(Event::Synchronized(resp.map_err(Error::Other)))
|
||||
.unwrap();
|
||||
|
||||
let _ = sync_future.take();
|
||||
}
|
||||
|
||||
if closing {
|
||||
return Poll::Ready(());
|
||||
}
|
||||
|
||||
while count > 0 {
|
||||
let msg = futures::ready!(rx.poll_recv(ctx));
|
||||
|
||||
if let Some(msg) = msg {
|
||||
match msg {
|
||||
Request::Request(mid, msg, _req) => {
|
||||
count -= 1;
|
||||
|
||||
queue.push(($st1)(
|
||||
mid,
|
||||
msg,
|
||||
bus.clone(),
|
||||
ut.clone(),
|
||||
stx.clone(),
|
||||
));
|
||||
|
||||
ctx.waker().wake_by_ref();
|
||||
}
|
||||
Request::Action(Action::Init) => {
|
||||
stx.send(Event::Ready).unwrap();
|
||||
}
|
||||
Request::Action(Action::Close) => {
|
||||
rx.close();
|
||||
}
|
||||
Request::Action(Action::Flush) => {
|
||||
flushing = true;
|
||||
continue 'main;
|
||||
}
|
||||
|
||||
Request::Action(Action::Sync) => {
|
||||
sync_future.replace(($st2)(bus.clone(), ut.clone()));
|
||||
continue 'main;
|
||||
}
|
||||
|
||||
_ => unimplemented!(),
|
||||
}
|
||||
} else {
|
||||
closing = true;
|
||||
continue 'main;
|
||||
}
|
||||
}
|
||||
|
||||
return Poll::Pending;
|
||||
}).await;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
@ -24,16 +24,14 @@ use tokio::sync::mpsc::{self, UnboundedSender};
|
||||
buffer_unordered_poller_macro!(
|
||||
T,
|
||||
Handler,
|
||||
|mid, msg, bus, ut: Arc<T>, stx: UnboundedSender<_>, task_permit, flush_permit|
|
||||
|mid, msg, bus, ut: Arc<T>, stx: UnboundedSender<_>| {
|
||||
tokio::task::spawn_blocking(move || {
|
||||
let resp = ut.handle(msg, &bus);
|
||||
|
||||
drop(task_permit);
|
||||
drop(flush_permit);
|
||||
|
||||
stx.send(Event::Response(mid, resp.map_err(Error::Other)))
|
||||
.unwrap();
|
||||
}),
|
||||
})
|
||||
},
|
||||
|bus, ut: Arc<T>| async move {
|
||||
tokio::task::spawn_blocking(move || ut.sync(&bus))
|
||||
.await
|
||||
|
@ -24,12 +24,10 @@ use tokio::sync::mpsc::{self, UnboundedSender};
|
||||
buffer_unordered_batch_poller_macro!(
|
||||
T,
|
||||
AsyncBatchHandler,
|
||||
|mids: Vec<_>, msgs, bus, ut: Arc<T>, flush_permit, task_permit, stx: UnboundedSender<_>| {
|
||||
|mids: Vec<_>, msgs, bus, ut: Arc<T>, task_permit, stx: UnboundedSender<_>| {
|
||||
tokio::spawn(async move {
|
||||
let resp = ut.handle(msgs, &bus).await;
|
||||
|
||||
drop(task_permit);
|
||||
drop(flush_permit);
|
||||
|
||||
crate::process_batch_result!(resp, mids, stx);
|
||||
})
|
||||
|
@ -1,12 +1,11 @@
|
||||
mod r#async;
|
||||
mod sync;
|
||||
|
||||
use std::sync::{Arc, atomic::AtomicU64};
|
||||
use std::sync::atomic::AtomicU64;
|
||||
|
||||
pub use r#async::BufferUnorderedBatchedAsync;
|
||||
use serde_derive::{Deserialize, Serialize};
|
||||
pub use sync::BufferUnorderedBatchedSync;
|
||||
use tokio::sync::{RwLock, Semaphore};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct BufferUnorderedBatchedStats {
|
||||
@ -37,89 +36,80 @@ impl Default for BufferUnorderedBatchedConfig {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct ConcurrentState {
|
||||
flush_lock: Arc<RwLock<()>>,
|
||||
semaphore: Arc<Semaphore>,
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! buffer_unordered_batch_poller_macro {
|
||||
($t: tt, $h: tt, $st1: expr, $st2: expr) => {
|
||||
fn buffer_unordered_batch_poller<$t, M, R>(
|
||||
async fn buffer_unordered_batch_poller<$t, M, R>(
|
||||
mut rx: mpsc::UnboundedReceiver<Request<M>>,
|
||||
bus: Bus,
|
||||
ut: Untyped,
|
||||
_stats: Arc<BufferUnorderedBatchedStats>,
|
||||
cfg: BufferUnorderedBatchedConfig,
|
||||
stx: mpsc::UnboundedSender<Event<R, $t::Error>>,
|
||||
) -> impl Future<Output = ()>
|
||||
where
|
||||
) where
|
||||
$t: $h<M, Response = R> + 'static,
|
||||
M: Message,
|
||||
R: Message,
|
||||
{
|
||||
let ut = ut.downcast::<$t>().unwrap();
|
||||
let semaphore = Arc::new(tokio::sync::Semaphore::new(cfg.max_parallel));
|
||||
|
||||
let state = super::ConcurrentState {
|
||||
flush_lock: Arc::new(tokio::sync::RwLock::new(())),
|
||||
semaphore: Arc::new(tokio::sync::Semaphore::new(cfg.max_parallel)),
|
||||
};
|
||||
let mut buffer_mid = Vec::with_capacity(cfg.batch_size);
|
||||
let mut buffer = Vec::with_capacity(cfg.batch_size);
|
||||
|
||||
async move {
|
||||
let mut buffer_mid = Vec::with_capacity(cfg.batch_size);
|
||||
let mut buffer = Vec::with_capacity(cfg.batch_size);
|
||||
while let Some(msg) = rx.recv().await {
|
||||
let bus = bus.clone();
|
||||
let ut = ut.clone();
|
||||
let semaphore = semaphore.clone();
|
||||
let stx = stx.clone();
|
||||
|
||||
while let Some(msg) = rx.recv().await {
|
||||
let bus = bus.clone();
|
||||
let ut = ut.clone();
|
||||
let state = state.clone();
|
||||
let stx = stx.clone();
|
||||
match msg {
|
||||
Request::Request(mid, msg, req) => {
|
||||
buffer_mid.push((mid, req));
|
||||
buffer.push(msg);
|
||||
|
||||
match msg {
|
||||
Request::Request(mid, msg, req) => {
|
||||
buffer_mid.push((mid, req));
|
||||
buffer.push(msg);
|
||||
if buffer_mid.len() >= cfg.batch_size {
|
||||
let task_permit = semaphore.acquire_owned().await;
|
||||
|
||||
if buffer_mid.len() >= cfg.batch_size {
|
||||
let task_permit = state.semaphore.acquire_owned().await;
|
||||
let flush_permit = state.flush_lock.read_owned().await;
|
||||
let buffer_mid_clone = buffer_mid.drain(..).collect::<Vec<_>>();
|
||||
let buffer_clone = buffer.drain(..).collect();
|
||||
|
||||
let buffer_mid_clone = buffer_mid.drain(..).collect::<Vec<_>>();
|
||||
let buffer_clone = buffer.drain(..).collect();
|
||||
|
||||
let _ = ($st1)(buffer_mid_clone, buffer_clone, bus, ut, flush_permit, task_permit, stx);
|
||||
}
|
||||
let _ =
|
||||
($st1)(buffer_mid_clone, buffer_clone, bus, ut, task_permit, stx);
|
||||
}
|
||||
Request::Action(Action::Init) => { stx.send(Event::Ready).unwrap(); }
|
||||
Request::Action(Action::Close) => { rx.close(); }
|
||||
Request::Action(Action::Flush) => {
|
||||
let stx_clone = stx.clone();
|
||||
|
||||
if !buffer_mid.is_empty() {
|
||||
let buffer_mid_clone = buffer_mid.drain(..).collect::<Vec<_>>();
|
||||
let buffer_clone = buffer.drain(..).collect();
|
||||
|
||||
let flush_permit = state.flush_lock.clone().read_owned().await;
|
||||
let task_permit = state.semaphore.acquire_owned().await;
|
||||
|
||||
let _ = ($st1)(buffer_mid_clone, buffer_clone, bus, ut, flush_permit, task_permit, stx);
|
||||
}
|
||||
|
||||
state.flush_lock.write().await;
|
||||
stx_clone.send(Event::Flushed).unwrap();
|
||||
}
|
||||
|
||||
Request::Action(Action::Sync) => {
|
||||
let lock = state.flush_lock.write().await;
|
||||
let resp = ($st2)(bus.clone(), ut.clone()).await;
|
||||
drop(lock);
|
||||
stx.send(Event::Synchronized(resp.map_err(Error::Other)))
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
_ => unimplemented!(),
|
||||
}
|
||||
Request::Action(Action::Init) => {
|
||||
stx.send(Event::Ready).unwrap();
|
||||
}
|
||||
Request::Action(Action::Close) => {
|
||||
rx.close();
|
||||
}
|
||||
Request::Action(Action::Flush) => {
|
||||
let stx_clone = stx.clone();
|
||||
|
||||
if !buffer_mid.is_empty() {
|
||||
let buffer_mid_clone = buffer_mid.drain(..).collect::<Vec<_>>();
|
||||
let buffer_clone = buffer.drain(..).collect();
|
||||
let task_permit = semaphore.clone().acquire_owned().await;
|
||||
|
||||
let _ =
|
||||
($st1)(buffer_mid_clone, buffer_clone, bus, ut, task_permit, stx);
|
||||
}
|
||||
|
||||
let _ = semaphore.acquire_many(cfg.max_parallel as _).await;
|
||||
stx_clone.send(Event::Flushed).unwrap();
|
||||
}
|
||||
|
||||
Request::Action(Action::Sync) => {
|
||||
let lock = semaphore.acquire_many(cfg.max_parallel as _).await;
|
||||
let resp = ($st2)(bus.clone(), ut.clone()).await;
|
||||
drop(lock);
|
||||
|
||||
stx.send(Event::Synchronized(resp.map_err(Error::Other)))
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
_ => unimplemented!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -24,12 +24,10 @@ use tokio::sync::mpsc::{self, UnboundedSender};
|
||||
buffer_unordered_batch_poller_macro!(
|
||||
T,
|
||||
BatchHandler,
|
||||
|mids: Vec<_>, msgs, bus, ut: Arc<T>, flush_permit, task_permit, stx: UnboundedSender<_>| {
|
||||
|mids: Vec<_>, msgs, bus, ut: Arc<T>, task_permit, stx: UnboundedSender<_>| {
|
||||
tokio::task::spawn_blocking(move || {
|
||||
let resp = ut.handle(msgs, &bus);
|
||||
|
||||
drop(task_permit);
|
||||
drop(flush_permit);
|
||||
|
||||
crate::process_batch_result!(resp, mids, stx);
|
||||
})
|
||||
|
@ -15,7 +15,6 @@ pub use synchronize_batched::{
|
||||
|
||||
use crate::receiver::Action;
|
||||
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! process_batch_result {
|
||||
($resp: expr, $mids: expr, $stx: expr) => {
|
||||
@ -28,8 +27,7 @@ macro_rules! process_batch_result {
|
||||
|
||||
while let Some((mid, _req)) = mids.next() {
|
||||
if let Some(r) = re.next() {
|
||||
$stx.send(Event::Response(mid, Ok(r)))
|
||||
.unwrap();
|
||||
$stx.send(Event::Response(mid, Ok(r))).unwrap();
|
||||
} else {
|
||||
$stx.send(Event::Response(mid, Err(Error::NoResponse)))
|
||||
.unwrap();
|
||||
@ -38,10 +36,8 @@ macro_rules! process_batch_result {
|
||||
}
|
||||
Err(er) => {
|
||||
for (mid, _req) in mids {
|
||||
$stx.send(Event::Response(
|
||||
mid,
|
||||
Err(Error::Other(er.clone())),
|
||||
)).unwrap();
|
||||
$stx.send(Event::Response(mid, Err(Error::Other(er.clone()))))
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
$stx.send(Event::Error(er)).unwrap();
|
||||
|
@ -17,7 +17,10 @@ use futures::Future;
|
||||
|
||||
use super::SynchronizedBatchedConfig;
|
||||
|
||||
use tokio::sync::{Mutex, mpsc::{self, UnboundedSender}};
|
||||
use tokio::sync::{
|
||||
mpsc::{self, UnboundedSender},
|
||||
Mutex,
|
||||
};
|
||||
|
||||
batch_synchronized_poller_macro! {
|
||||
T,
|
||||
|
@ -35,15 +35,14 @@ impl Default for SynchronizedBatchedConfig {
|
||||
#[macro_export]
|
||||
macro_rules! batch_synchronized_poller_macro {
|
||||
($t: tt, $h: tt, $st1: expr, $st2: expr) => {
|
||||
fn batch_synchronized_poller<$t, M, R>(
|
||||
async fn batch_synchronized_poller<$t, M, R>(
|
||||
mut rx: mpsc::UnboundedReceiver<Request<M>>,
|
||||
bus: Bus,
|
||||
ut: Untyped,
|
||||
// stats: Arc<SynchronizedBatchedStats>,
|
||||
cfg: SynchronizedBatchedConfig,
|
||||
stx: mpsc::UnboundedSender<Event<R, $t::Error>>,
|
||||
) -> impl Future<Output = ()>
|
||||
where
|
||||
) where
|
||||
$t: $h<M, Response = R> + 'static,
|
||||
$t::Error: StdSyncSendError + Clone,
|
||||
M: Message,
|
||||
@ -51,50 +50,52 @@ macro_rules! batch_synchronized_poller_macro {
|
||||
{
|
||||
let ut = ut.downcast::<Mutex<T>>().unwrap();
|
||||
|
||||
async move {
|
||||
let mut buffer_mid = Vec::with_capacity(cfg.batch_size);
|
||||
let mut buffer = Vec::with_capacity(cfg.batch_size);
|
||||
let mut buffer_mid = Vec::with_capacity(cfg.batch_size);
|
||||
let mut buffer = Vec::with_capacity(cfg.batch_size);
|
||||
|
||||
while let Some(msg) = rx.recv().await {
|
||||
let bus = bus.clone();
|
||||
let ut = ut.clone();
|
||||
let stx = stx.clone();
|
||||
while let Some(msg) = rx.recv().await {
|
||||
let bus = bus.clone();
|
||||
let ut = ut.clone();
|
||||
let stx = stx.clone();
|
||||
|
||||
match msg {
|
||||
Request::Request(mid, msg, req) => {
|
||||
buffer_mid.push((mid, req));
|
||||
buffer.push(msg);
|
||||
match msg {
|
||||
Request::Request(mid, msg, req) => {
|
||||
buffer_mid.push((mid, req));
|
||||
buffer.push(msg);
|
||||
|
||||
if buffer_mid.len() >= cfg.batch_size {
|
||||
let buffer_mid_clone = buffer_mid.drain(..).collect::<Vec<_>>();
|
||||
let buffer_clone = buffer.drain(..).collect();
|
||||
if buffer_mid.len() >= cfg.batch_size {
|
||||
let buffer_mid_clone = buffer_mid.drain(..).collect::<Vec<_>>();
|
||||
let buffer_clone = buffer.drain(..).collect();
|
||||
|
||||
let _ = ($st1)(buffer_mid_clone, buffer_clone, bus, ut, stx);
|
||||
}
|
||||
let _ = ($st1)(buffer_mid_clone, buffer_clone, bus, ut, stx);
|
||||
}
|
||||
Request::Action(Action::Init) => { stx.send(Event::Ready).unwrap(); }
|
||||
Request::Action(Action::Close) => { rx.close(); }
|
||||
Request::Action(Action::Flush) => {
|
||||
let stx_clone = stx.clone();
|
||||
|
||||
if !buffer_mid.is_empty() {
|
||||
let buffer_mid_clone = buffer_mid.drain(..).collect::<Vec<_>>();
|
||||
let buffer_clone = buffer.drain(..).collect();
|
||||
|
||||
let _ = ($st1)(buffer_mid_clone, buffer_clone, bus, ut, stx);
|
||||
}
|
||||
|
||||
stx_clone.send(Event::Flushed).unwrap();
|
||||
}
|
||||
|
||||
Request::Action(Action::Sync) => {
|
||||
let resp = ($st2)(bus.clone(), ut.clone()).await;
|
||||
stx.send(Event::Synchronized(resp.map_err(Error::Other)))
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
_ => unimplemented!(),
|
||||
}
|
||||
Request::Action(Action::Init) => {
|
||||
stx.send(Event::Ready).unwrap();
|
||||
}
|
||||
Request::Action(Action::Close) => {
|
||||
rx.close();
|
||||
}
|
||||
Request::Action(Action::Flush) => {
|
||||
let stx_clone = stx.clone();
|
||||
|
||||
if !buffer_mid.is_empty() {
|
||||
let buffer_mid_clone = buffer_mid.drain(..).collect::<Vec<_>>();
|
||||
let buffer_clone = buffer.drain(..).collect();
|
||||
|
||||
let _ = ($st1)(buffer_mid_clone, buffer_clone, bus, ut, stx);
|
||||
}
|
||||
|
||||
stx_clone.send(Event::Flushed).unwrap();
|
||||
}
|
||||
|
||||
Request::Action(Action::Sync) => {
|
||||
let resp = ($st2)(bus.clone(), ut.clone()).await;
|
||||
stx.send(Event::Synchronized(resp.map_err(Error::Other)))
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
_ => unimplemented!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -15,7 +15,10 @@ use crate::{
|
||||
|
||||
use super::SynchronizedBatchedConfig;
|
||||
use futures::{executor::block_on, Future};
|
||||
use tokio::sync::{Mutex, mpsc::{self, UnboundedSender}};
|
||||
use tokio::sync::{
|
||||
mpsc::{self, UnboundedSender},
|
||||
Mutex,
|
||||
};
|
||||
|
||||
batch_synchronized_poller_macro! {
|
||||
T,
|
||||
|
@ -15,7 +15,10 @@ use crate::{
|
||||
receivers::Request,
|
||||
AsyncSynchronizedHandler, Bus, Message, Untyped,
|
||||
};
|
||||
use tokio::sync::{Mutex, mpsc::{self, UnboundedSender}};
|
||||
use tokio::sync::{
|
||||
mpsc::{self, UnboundedSender},
|
||||
Mutex,
|
||||
};
|
||||
|
||||
synchronized_poller_macro! {
|
||||
T,
|
||||
|
@ -27,13 +27,12 @@ impl Default for SynchronizedConfig {
|
||||
#[macro_export]
|
||||
macro_rules! synchronized_poller_macro {
|
||||
($t: tt, $h: tt, $st1: expr, $st2: expr) => {
|
||||
fn synchronized_poller<$t, M, R>(
|
||||
async fn synchronized_poller<$t, M, R>(
|
||||
mut rx: mpsc::UnboundedReceiver<Request<M>>,
|
||||
bus: Bus,
|
||||
ut: Untyped,
|
||||
stx: mpsc::UnboundedSender<Event<R, $t::Error>>,
|
||||
) -> impl Future<Output = ()>
|
||||
where
|
||||
) where
|
||||
$t: $h<M, Response = R> + 'static,
|
||||
$t::Error: StdSyncSendError,
|
||||
M: Message,
|
||||
@ -41,25 +40,29 @@ macro_rules! synchronized_poller_macro {
|
||||
{
|
||||
let ut = ut.downcast::<Mutex<T>>().unwrap();
|
||||
|
||||
async move {
|
||||
while let Some(msg) = rx.recv().await {
|
||||
match msg {
|
||||
Request::Request(mid, msg, _req) => {
|
||||
($st1)(mid, msg, bus.clone(), ut.clone(), stx.clone())
|
||||
.await
|
||||
.unwrap()
|
||||
}
|
||||
Request::Action(Action::Init) => { stx.send(Event::Ready).unwrap(); }
|
||||
Request::Action(Action::Close) => { rx.close(); }
|
||||
Request::Action(Action::Flush) => { stx.send(Event::Flushed).unwrap(); }
|
||||
Request::Action(Action::Sync) => {
|
||||
let resp = ($st2)(bus.clone(), ut.clone()).await;
|
||||
stx.send(Event::Synchronized(resp.map_err(Error::Other)))
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
_ => unimplemented!(),
|
||||
while let Some(msg) = rx.recv().await {
|
||||
match msg {
|
||||
Request::Request(mid, msg, _req) => {
|
||||
($st1)(mid, msg, bus.clone(), ut.clone(), stx.clone())
|
||||
.await
|
||||
.unwrap()
|
||||
}
|
||||
Request::Action(Action::Init) => {
|
||||
stx.send(Event::Ready).unwrap();
|
||||
}
|
||||
Request::Action(Action::Close) => {
|
||||
rx.close();
|
||||
}
|
||||
Request::Action(Action::Flush) => {
|
||||
stx.send(Event::Flushed).unwrap();
|
||||
}
|
||||
Request::Action(Action::Sync) => {
|
||||
let resp = ($st2)(bus.clone(), ut.clone()).await;
|
||||
stx.send(Event::Synchronized(resp.map_err(Error::Other)))
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
_ => unimplemented!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -15,7 +15,10 @@ use crate::{
|
||||
receivers::Request,
|
||||
Bus, Message, SynchronizedHandler, Untyped,
|
||||
};
|
||||
use tokio::sync::{Mutex, mpsc::{self, UnboundedSender}};
|
||||
use tokio::sync::{
|
||||
mpsc::{self, UnboundedSender},
|
||||
Mutex,
|
||||
};
|
||||
|
||||
synchronized_poller_macro! {
|
||||
T,
|
||||
|
@ -7,13 +7,13 @@ use crate::{
|
||||
stats::Stats,
|
||||
Bus, Event, Message, Permit, ReciveUntypedReceiver, TypeTag,
|
||||
};
|
||||
use dashmap::DashMap;
|
||||
use std::sync::Arc;
|
||||
use core::{
|
||||
pin::Pin,
|
||||
sync::atomic::{AtomicBool, AtomicU64, Ordering},
|
||||
};
|
||||
use dashmap::DashMap;
|
||||
use futures::{future::poll_fn, Future};
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::{oneshot, Notify};
|
||||
|
||||
pub trait Relay: TypeTagAccept + SendUntypedReceiver + ReciveUntypedReceiver + 'static {}
|
||||
@ -217,7 +217,10 @@ where
|
||||
}
|
||||
|
||||
fn increment_processing(&self, tt: &TypeTag) {
|
||||
self.context.receivers.get(tt).map(|r|r.processing.fetch_add(1, Ordering::SeqCst));
|
||||
self.context
|
||||
.receivers
|
||||
.get(tt)
|
||||
.map(|r| r.processing.fetch_add(1, Ordering::SeqCst));
|
||||
}
|
||||
|
||||
fn start_polling(
|
||||
|
@ -1,7 +1,12 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use messagebus::{AsyncBatchHandler, Bus, Message, derive::{Error as MbError, Message}, error, receivers::BufferUnorderedBatchedConfig};
|
||||
use messagebus::{
|
||||
derive::{Error as MbError, Message},
|
||||
error,
|
||||
receivers::BufferUnorderedBatchedConfig,
|
||||
AsyncBatchHandler, Bus, Message,
|
||||
};
|
||||
use parking_lot::Mutex;
|
||||
use thiserror::Error;
|
||||
|
||||
@ -26,7 +31,7 @@ struct MsgI32(i32);
|
||||
struct MsgI16(i16);
|
||||
|
||||
struct TmpReceiver {
|
||||
batches: Arc<Mutex<Vec<Vec<i32>>>>
|
||||
batches: Arc<Mutex<Vec<Vec<i32>>>>,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
@ -41,7 +46,9 @@ impl AsyncBatchHandler<MsgI32> for TmpReceiver {
|
||||
msg: Vec<MsgI32>,
|
||||
_bus: &Bus,
|
||||
) -> Result<Vec<Self::Response>, Self::Error> {
|
||||
self.batches.lock().push(msg.into_iter().map(|x|x.0).collect());
|
||||
self.batches
|
||||
.lock()
|
||||
.push(msg.into_iter().map(|x| x.0).collect());
|
||||
|
||||
Ok(vec![])
|
||||
}
|
||||
@ -52,11 +59,16 @@ async fn test_batch() {
|
||||
let batches = Arc::new(Mutex::new(Vec::new()));
|
||||
|
||||
let (b, poller) = Bus::build()
|
||||
.register(TmpReceiver { batches: batches.clone() })
|
||||
.subscribe_batch_async::<MsgI32>(16, BufferUnorderedBatchedConfig {
|
||||
batch_size: 8,
|
||||
..Default::default()
|
||||
.register(TmpReceiver {
|
||||
batches: batches.clone(),
|
||||
})
|
||||
.subscribe_batch_async::<MsgI32>(
|
||||
16,
|
||||
BufferUnorderedBatchedConfig {
|
||||
batch_size: 8,
|
||||
..Default::default()
|
||||
},
|
||||
)
|
||||
.done()
|
||||
.build();
|
||||
|
||||
|
@ -1,9 +1,13 @@
|
||||
use std::{sync::atomic::AtomicU32, time::Duration};
|
||||
|
||||
use messagebus::{AsyncHandler, Bus, Message, Module, derive::{Error as MbError, Message}, error, receivers::BufferUnorderedConfig};
|
||||
use thiserror::Error;
|
||||
use async_trait::async_trait;
|
||||
|
||||
use messagebus::{
|
||||
derive::{Error as MbError, Message},
|
||||
error,
|
||||
receivers::BufferUnorderedConfig,
|
||||
AsyncHandler, Bus, Message, Module,
|
||||
};
|
||||
use thiserror::Error;
|
||||
|
||||
#[derive(Debug, Error, MbError)]
|
||||
enum Error {
|
||||
@ -20,7 +24,6 @@ impl<M: Message> From<error::Error<M>> for Error {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
#[derive(Debug, Clone, Message)]
|
||||
struct Req(pub u32);
|
||||
|
||||
@ -34,7 +37,7 @@ struct GetCount;
|
||||
struct CountResult(pub u32);
|
||||
|
||||
struct TmpReceiver {
|
||||
counter: AtomicU32
|
||||
counter: AtomicU32,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
@ -43,10 +46,9 @@ impl AsyncHandler<Req> for TmpReceiver {
|
||||
type Response = ();
|
||||
|
||||
async fn handle(&self, msg: Req, bus: &Bus) -> Result<Self::Response, Self::Error> {
|
||||
tokio::time::sleep(Duration::from_millis((msg.0 % 20) as _))
|
||||
.await;
|
||||
tokio::time::sleep(Duration::from_millis((msg.0 % 20) as _)).await;
|
||||
|
||||
if msg.0 % 128 == 0 {
|
||||
if msg.0 % 128 == 0 {
|
||||
return Err(Error::MyError);
|
||||
} else {
|
||||
bus.send(Resp(msg.0)).await?;
|
||||
@ -62,7 +64,8 @@ impl AsyncHandler<Resp> for TmpReceiver {
|
||||
type Response = ();
|
||||
|
||||
async fn handle(&self, _msg: Resp, _bus: &Bus) -> Result<Self::Response, Self::Error> {
|
||||
self.counter.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
|
||||
self.counter
|
||||
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@ -73,17 +76,24 @@ impl AsyncHandler<GetCount> for TmpReceiver {
|
||||
type Response = CountResult;
|
||||
|
||||
async fn handle(&self, _: GetCount, _bus: &Bus) -> Result<Self::Response, Self::Error> {
|
||||
Ok(CountResult(self.counter.load(std::sync::atomic::Ordering::SeqCst)))
|
||||
Ok(CountResult(
|
||||
self.counter.load(std::sync::atomic::Ordering::SeqCst),
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
fn module() -> Module {
|
||||
Module::new()
|
||||
.register(TmpReceiver { counter: AtomicU32::new(0) })
|
||||
.subscribe_async::<Req>(1024, BufferUnorderedConfig {
|
||||
buffer_size: 1024,
|
||||
max_parallel: 1024,
|
||||
.register(TmpReceiver {
|
||||
counter: AtomicU32::new(0),
|
||||
})
|
||||
.subscribe_async::<Req>(
|
||||
1024,
|
||||
BufferUnorderedConfig {
|
||||
buffer_size: 1024,
|
||||
max_parallel: 1024,
|
||||
},
|
||||
)
|
||||
.subscribe_async::<Resp>(1024, Default::default())
|
||||
.subscribe_async::<GetCount>(8, Default::default())
|
||||
.done()
|
||||
@ -93,7 +103,7 @@ fn module() -> Module {
|
||||
async fn test_sync() {
|
||||
let (b, poller) = Bus::build().add_module(module()).build();
|
||||
let cnt = 4u32;
|
||||
for i in 0..cnt{
|
||||
for i in 0..cnt {
|
||||
for j in 0..32768 {
|
||||
b.send(Req(i * 128 + j)).await.unwrap();
|
||||
}
|
||||
@ -107,7 +117,13 @@ async fn test_sync() {
|
||||
|
||||
println!("flushed");
|
||||
|
||||
assert_eq!(b.request_we::<_, CountResult, Error>(GetCount, Default::default()).await.unwrap().0, cnt * 32768 - cnt * 256);
|
||||
assert_eq!(
|
||||
b.request_we::<_, CountResult, Error>(GetCount, Default::default())
|
||||
.await
|
||||
.unwrap()
|
||||
.0,
|
||||
cnt * 32768 - cnt * 256
|
||||
);
|
||||
|
||||
b.close().await;
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user