diff --git a/Cargo.toml b/Cargo.toml index 1892474..a9ee165 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,10 @@ license = "MIT OR Apache-2.0" exclude = [".gitignore", ".cargo/config", ".github/**", "codecov.yml"] edition = "2018" +[features] +quic = ["quinn"] + + [dependencies] messagebus_derive = { path = "./derive" } tokio = { version = "1", features = ["parking_lot", "rt-multi-thread", "sync"] } @@ -23,6 +27,8 @@ thiserror = "1" erased-serde = "0.3" serde = "1" serde_derive = "1" +dashmap = "4.0.2" +quinn = { version = "0.7", optional = true } [dev-dependencies] diff --git a/src/builder.rs b/src/builder.rs index 608cef3..40f977c 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -254,12 +254,8 @@ impl Module { self } - pub fn register_relay( - mut self, - inner: S, - queue: u64, - ) -> Self { - let receiver = Receiver::new_relay::(queue, inner); + pub fn register_relay(mut self, inner: S) -> Self { + let receiver = Receiver::new_relay::(inner); self.pollings.push(receiver.start_polling()); let mut receiver_added = false; @@ -358,8 +354,8 @@ impl BusBuilder { BusBuilder { inner } } - pub fn register_relay(self, inner: S, queue: u64) -> Self { - let inner = self.inner.register_relay(inner, queue); + pub fn register_relay(self, inner: S) -> Self { + let inner = self.inner.register_relay(inner); BusBuilder { inner } } diff --git a/src/envelop.rs b/src/envelop.rs index 1e7258e..3da4981 100644 --- a/src/envelop.rs +++ b/src/envelop.rs @@ -112,47 +112,6 @@ impl Message for () { } } -// impl Message for Arc { -// fn as_any_ref(&self) -> &dyn Any { -// T::as_any_ref(&*self) -// } -// fn as_any_mut(&mut self) -> &mut dyn Any { -// unimplemented!() -// } -// fn as_any_boxed(self: Box) -> Box { -// unimplemented!() -// } -// fn as_any_arc(self: Arc) -> Arc { -// self -// } - -// fn as_shared_ref(&self) -> Option<&dyn SharedMessage> { -// Some(self) -// } -// fn as_shared_mut(&mut self) -> Option<&mut dyn SharedMessage> { -// Some(self) -// } -// fn as_shared_boxed(self: Box) -> Option> { -// Some(self) -// } -// fn as_shared_arc(self: Arc) -> Option> { -// Some(self) -// } -// fn try_clone_into(&self, into: &mut dyn Any) -> bool { -// let into = if let Some(inner) = into.downcast_mut::>() { -// inner -// } else { -// return false; -// }; - -// into.replace(self.clone()); -// true -// } -// fn try_clone_boxed(&self) -> Option> { -// Some(Box::new(self.clone())) -// } -// } - pub trait IntoBoxedMessage { fn into_boxed(self) -> Box; } diff --git a/src/error.rs b/src/error.rs index e0621c2..9e85270 100644 --- a/src/error.rs +++ b/src/error.rs @@ -79,6 +79,9 @@ pub enum Error #[error("Message Send Error: {0}")] SendError(#[from] SendError), + #[error("Message receiver dropped try again another receiver")] + TryAgain(M), + #[error("NoResponse")] NoResponse, @@ -91,6 +94,9 @@ pub enum Error #[error("MessageCastError")] MessageCastError, + #[error("Not Ready")] + NotReady, + #[error("Other({0})")] Other(E), @@ -111,7 +117,8 @@ impl Error { pub fn map_msg UM>(self, f: F) -> Error { match self { Error::SendError(inner) => Error::SendError(inner.map_msg(f)), - Error::NoResponse => Error::NoReceivers, + Error::TryAgain(inner) => Error::TryAgain(f(inner)), + Error::NoResponse => Error::NoResponse, Error::NoReceivers => Error::NoReceivers, Error::Serialization(s) => Error::Serialization(s), Error::Other(inner) => Error::Other(inner), @@ -120,13 +127,15 @@ impl Error { Error::AddListenerError => Error::AddListenerError, Error::MessageCastError => Error::MessageCastError, Error::TypeTagNotRegistered(tt) => Error::TypeTagNotRegistered(tt), + Error::NotReady => Error::NotReady, } } pub fn map_err UE>(self, f: F) -> Error { match self { Error::SendError(inner) => Error::SendError(inner), - Error::NoResponse => Error::NoReceivers, + Error::TryAgain(inner) => Error::TryAgain(inner), + Error::NoResponse => Error::NoResponse, Error::NoReceivers => Error::NoReceivers, Error::Serialization(s) => Error::Serialization(s), Error::Other(inner) => Error::Other(f(inner)), @@ -135,6 +144,7 @@ impl Error { Error::AddListenerError => Error::AddListenerError, Error::MessageCastError => Error::MessageCastError, Error::TypeTagNotRegistered(tt) => Error::TypeTagNotRegistered(tt), + Error::NotReady => Error::NotReady, } } } @@ -143,7 +153,8 @@ impl Error { pub fn into_dyn(self) -> Error { match self { Error::SendError(inner) => Error::SendError(inner), - Error::NoResponse => Error::NoReceivers, + Error::TryAgain(inner) => Error::TryAgain(inner), + Error::NoResponse => Error::NoResponse, Error::NoReceivers => Error::NoReceivers, Error::Serialization(s) => Error::Serialization(s), Error::Other(inner) => Error::OtherBoxed(Box::new(inner) as _), @@ -152,13 +163,15 @@ impl Error { Error::AddListenerError => Error::AddListenerError, Error::MessageCastError => Error::MessageCastError, Error::TypeTagNotRegistered(tt) => Error::TypeTagNotRegistered(tt), + Error::NotReady => Error::NotReady, } } pub fn map> + StdSyncSendError>(self) -> Error { match self { Error::SendError(inner) => Error::SendError(inner), - Error::NoResponse => Error::NoReceivers, + Error::TryAgain(inner) => Error::TryAgain(inner), + Error::NoResponse => Error::NoResponse, Error::NoReceivers => Error::NoReceivers, Error::Serialization(s) => Error::Serialization(s), Error::Other(_) => panic!("expected boxed error!"), @@ -167,6 +180,7 @@ impl Error { Error::AddListenerError => Error::AddListenerError, Error::MessageCastError => Error::MessageCastError, Error::TypeTagNotRegistered(tt) => Error::TypeTagNotRegistered(tt), + Error::NotReady => Error::NotReady, } } } @@ -175,8 +189,9 @@ impl Error<(), E> { pub fn specify(self) -> Error { match self { Error::SendError(_) => panic!("cannot specify type on typed error"), + Error::TryAgain(_) => panic!("cannot specify type on typed error"), Error::WrongMessageType(_) => panic!("cannot specify type on typed error"), - Error::NoResponse => Error::NoReceivers, + Error::NoResponse => Error::NoResponse, Error::NoReceivers => Error::NoReceivers, Error::Serialization(s) => Error::Serialization(s), Error::Other(inner) => Error::Other(inner), @@ -184,6 +199,7 @@ impl Error<(), E> { Error::AddListenerError => Error::AddListenerError, Error::MessageCastError => Error::MessageCastError, Error::TypeTagNotRegistered(tt) => Error::TypeTagNotRegistered(tt), + Error::NotReady => Error::NotReady, } } } @@ -203,8 +219,9 @@ impl Error> { Error::SendError(SendError::Full(m)) => { Error::SendError(SendError::Full(m.into_boxed())) } + Error::TryAgain(inner) => Error::TryAgain(inner.into_boxed()), Error::WrongMessageType(m) => Error::WrongMessageType(m.into_boxed()), - Error::NoResponse => Error::NoReceivers, + Error::NoResponse => Error::NoResponse, Error::NoReceivers => Error::NoReceivers, Error::Serialization(s) => Error::Serialization(s), Error::Other(inner) => Error::Other(inner), @@ -212,6 +229,7 @@ impl Error> { Error::AddListenerError => Error::AddListenerError, Error::MessageCastError => Error::MessageCastError, Error::TypeTagNotRegistered(tt) => Error::TypeTagNotRegistered(tt), + Error::NotReady => Error::NotReady, } } } diff --git a/src/lib.rs b/src/lib.rs index c415803..cd6671a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,6 +5,7 @@ mod handler; mod receiver; pub mod receivers; mod relay; +pub mod relays; mod trait_object; #[macro_use] @@ -124,11 +125,11 @@ impl BusInner { } } - fn try_reserve(&self, rs: &[Receiver]) -> Option> { + fn try_reserve(&self, tt: &TypeTag, rs: &[Receiver]) -> Option> { let mut permits = SmallVec::<[Permit; 32]>::new(); for r in rs { - if let Some(prmt) = r.try_reserve() { + if let Some(prmt) = r.try_reserve(tt) { permits.push(prmt); } else { return None; @@ -156,7 +157,7 @@ impl BusInner { let mid = ID_COUNTER.fetch_add(1, Ordering::Relaxed); if let Some(rs) = self.receivers.get(&tt) { - let permits = if let Some(x) = self.try_reserve(rs) { + let permits = if let Some(x) = self.try_reserve(&tt, rs) { x } else { return Err(SendError::Full(msg).into()); @@ -228,10 +229,10 @@ impl BusInner { if let Some(rs) = self.receivers.get(&tt) { if let Some((last, head)) = rs.split_last() { for r in head { - let _ = r.send(mid, msg.clone(), r.reserve().await); + let _ = r.send(mid, msg.clone(), r.reserve(&tt).await); } - let _ = last.send(mid, msg, last.reserve().await); + let _ = last.send(mid, msg, last.reserve(&tt).await); return Ok(()); } @@ -292,7 +293,7 @@ impl BusInner { let mid = ID_COUNTER.fetch_add(1, Ordering::Relaxed); if let Some(rs) = self.receivers.get(&tt).and_then(|rs| rs.first()) { - let permits = if let Some(x) = rs.try_reserve() { + let permits = if let Some(x) = rs.try_reserve(&tt) { x } else { return Err(SendError::Full(msg).into()); @@ -313,7 +314,7 @@ impl BusInner { let mid = ID_COUNTER.fetch_add(1, Ordering::Relaxed); if let Some(rs) = self.receivers.get(&tt).and_then(|rs| rs.first()) { - Ok(rs.send(mid, msg, rs.reserve().await)?) + Ok(rs.send(mid, msg, rs.reserve(&tt).await)?) } else { Err(Error::NoReceivers) } @@ -340,7 +341,7 @@ impl BusInner { let mid = mid | 1 << (u64::BITS - 1); - rc.send(mid, req, rc.reserve().await)?; + rc.send(mid, req, rc.reserve(&tid).await)?; rx.await.map_err(|x| x.specify::()) } else { Err(Error::NoReceivers) @@ -364,7 +365,7 @@ impl BusInner { .map_msg(|_| unimplemented!()) })?; - rc.send(mid | 1 << (u64::BITS - 1), req, rc.reserve().await) + rc.send(mid | 1 << (u64::BITS - 1), req, rc.reserve(&tid).await) .map_err(|x| x.map_err(|_| unimplemented!()))?; rx.await.map_err(|x| x.specify::()) @@ -388,10 +389,10 @@ impl BusInner { if let Some(rs) = self.receivers.get(&tt) { if let Some((last, head)) = rs.split_last() { for r in head { - let _ = r.send_boxed(mid, msg.try_clone_boxed().unwrap(), r.reserve().await); + let _ = r.send_boxed(mid, msg.try_clone_boxed().unwrap(), r.reserve(&tt).await); } - let _ = last.send_boxed(mid, msg, last.reserve().await); + let _ = last.send_boxed(mid, msg, last.reserve(&tt).await); return Ok(()); } @@ -415,7 +416,7 @@ impl BusInner { let mid = ID_COUNTER.fetch_add(1, Ordering::Relaxed); if let Some(rs) = self.receivers.get(&tt).and_then(|rs| rs.first()) { - Ok(rs.send_boxed(mid, msg, rs.reserve().await)?) + Ok(rs.send_boxed(mid, msg, rs.reserve(&tt).await)?) } else { Err(Error::NoReceivers) } @@ -439,7 +440,7 @@ impl BusInner { .map_msg(|_| unimplemented!()) })?; - rc.send_boxed(mid | 1 << (usize::BITS - 1), req, rc.reserve().await)?; + rc.send_boxed(mid | 1 << (usize::BITS - 1), req, rc.reserve(&tt).await)?; rx.await.map_err(|x| x.specify::>()) } else { @@ -462,7 +463,7 @@ impl BusInner { if let Some(rs) = self.receivers.get(&tt).and_then(|rs| rs.first()) { let msg = self.deserialize_message(tt.clone(), de)?; - Ok(rs.send_boxed(mid, msg, rs.reserve().await)?) + Ok(rs.send_boxed(mid, msg, rs.reserve(&tt).await)?) } else { Err(Error::NoReceivers) } @@ -484,7 +485,7 @@ impl BusInner { let (mid, rx) = rc.add_response_waiter_boxed().unwrap(); let msg = self.deserialize_message(tt.clone(), de)?; - rc.send_boxed(mid | 1 << (usize::BITS - 1), msg, rc.reserve().await)?; + rc.send_boxed(mid | 1 << (usize::BITS - 1), msg, rc.reserve(&tt).await)?; rx.await.map_err(|x| x.specify::>()) } else { diff --git a/src/receiver.rs b/src/receiver.rs index d284dd2..b59bbf7 100644 --- a/src/receiver.rs +++ b/src/receiver.rs @@ -85,7 +85,6 @@ pub trait ReceiverTrait: TypeTagAccept + Send + Sync { fn name(&self) -> &str; fn typed(&self) -> Option>; fn wrapper(&self) -> Option>; - fn wrapper_arc(self: Arc) -> Option; fn send_boxed(&self, mid: u64, msg: Box) -> Result<(), Error>>; fn add_response_listener( @@ -106,8 +105,10 @@ pub trait ReceiverTrait: TypeTagAccept + Send + Sync { fn need_flush(&self) -> bool; - fn try_reserve(&self) -> Option; - fn reserve_notify(&self) -> &Notify; + fn try_reserve(&self, tt: &TypeTag) -> Option; + fn reserve_notify(&self, tt: &TypeTag) -> Arc; + + fn ready(&self) -> bool; fn start_polling( self: Arc, @@ -140,6 +141,7 @@ pub struct Stats { #[non_exhaustive] #[derive(Debug, Clone)] pub enum Action { + Init, Flush, Sync, Close, @@ -154,6 +156,8 @@ pub enum Event { Stats(Stats), Flushed, Exited, + Ready, + Pause, } struct ReceiverWrapper @@ -186,6 +190,8 @@ where let event = poll_fn(move |ctx| this.inner.poll_events(ctx)).await; match event { + Event::Pause => self.context.ready.store(false, Ordering::SeqCst), + Event::Ready => self.context.ready.store(true, Ordering::SeqCst), Event::Exited => { self.context.closed.notify_waiters(); break; @@ -300,10 +306,6 @@ where Some(AnyWrapperRef::new(self)) } - fn wrapper_arc(self: Arc) -> Option { - Some(AnyWrapperArc::new(self)) - } - fn send_boxed( &self, mid: u64, @@ -360,7 +362,7 @@ where self.context.need_flush.load(Ordering::SeqCst) } - fn try_reserve(&self) -> Option { + fn try_reserve(&self, _: &TypeTag) -> Option { loop { let count = self.context.processing.load(Ordering::Relaxed); @@ -385,8 +387,8 @@ where } } - fn reserve_notify(&self) -> &Notify { - &self.context.response + fn reserve_notify(&self, _: &TypeTag) -> Arc { + self.context.response.clone() } fn start_polling( @@ -394,6 +396,10 @@ where ) -> Box Pin + Send>>> { self.start_polling_events() } + + fn ready(&self) -> bool { + self.context.ready.load(Ordering::SeqCst) + } } pub struct Permit { @@ -520,36 +526,6 @@ impl<'a> AnyWrapperRef<'a> { unsafe impl Send for AnyWrapperRef<'_> {} -pub struct AnyWrapperArc { - wrapper_re: Box, -} - -impl AnyWrapperArc { - pub fn new(rcvr: Arc) -> Self - where - R: Message, - E: StdSyncSendError, - S: WrapperReturnTypeAndError + 'static, - { - let wrapper_re = Box::new(rcvr as Arc>); - - Self { wrapper_re } - } - - #[inline] - pub fn cast_ret_and_error( - &self, - ) -> Option>> { - Some( - self.wrapper_re - .downcast_ref::>>()? - .clone(), - ) - } -} - -unsafe impl Send for AnyWrapperArc {} - #[derive(Debug, Clone)] pub struct ReceiverStats { pub name: Cow<'static, str>, @@ -577,10 +553,11 @@ struct ReceiverContext { limit: u64, processing: AtomicU64, need_flush: AtomicBool, + ready: AtomicBool, flushed: Notify, synchronized: Notify, closed: Notify, - response: Notify, + response: Arc, } impl PermitDrop for ReceiverContext { @@ -632,10 +609,11 @@ impl Receiver { limit, processing: AtomicU64::new(0), need_flush: AtomicBool::new(false), + ready: AtomicBool::new(false), flushed: Notify::new(), synchronized: Notify::new(), closed: Notify::new(), - response: Notify::new(), + response: Arc::new(Notify::new()), }), _m: Default::default(), }), @@ -643,12 +621,12 @@ impl Receiver { } #[inline] - pub(crate) fn new_relay(limit: u64, inner: S) -> Self + pub(crate) fn new_relay(inner: S) -> Self where S: Relay + Send + Sync + 'static, { Self { - inner: Arc::new(RelayWrapper::new(inner, limit)), + inner: Arc::new(RelayWrapper::new(inner)), } } @@ -668,19 +646,19 @@ impl Receiver { } #[inline] - pub async fn reserve(&self) -> Permit { + pub async fn reserve(&self, tt: &TypeTag) -> Permit { loop { - if let Some(p) = self.inner.try_reserve() { + if let Some(p) = self.inner.try_reserve(tt) { return p; } else { - self.inner.reserve_notify().notified().await + self.inner.reserve_notify(tt).notified().await } } } #[inline] - pub fn try_reserve(&self) -> Option { - self.inner.try_reserve() + pub fn try_reserve(&self, tt: &TypeTag) -> Option { + self.inner.try_reserve(tt) } #[inline] @@ -730,7 +708,7 @@ impl Receiver { ) -> Result<(), Error>> { let res = self.inner.send_boxed(mid, msg); permit.fuse = true; - Ok(()) + res } #[inline] diff --git a/src/receivers/buffer_unordered/mod.rs b/src/receivers/buffer_unordered/mod.rs index bd7ac88..7decacf 100644 --- a/src/receivers/buffer_unordered/mod.rs +++ b/src/receivers/buffer_unordered/mod.rs @@ -70,6 +70,9 @@ macro_rules! buffer_unordered_poller_macro { stats.clone(), )); } + Request::Action(Action::Init) => { + stx.send(Event::Ready).ok(); + }, Request::Action(Action::Flush) => need_flush = true, Request::Action(Action::Close) => rx.close(), Request::Action(Action::Sync) => { diff --git a/src/receivers/buffer_unordered_batched/mod.rs b/src/receivers/buffer_unordered_batched/mod.rs index bab0091..8b5fdd9 100644 --- a/src/receivers/buffer_unordered_batched/mod.rs +++ b/src/receivers/buffer_unordered_batched/mod.rs @@ -76,6 +76,9 @@ macro_rules! buffer_unordered_batch_poller_macro { buffer_mid.push(mid); buffer.push(msg); } + Request::Action(Action::Init) => { + stx.send(Event::Ready).ok(); + }, Request::Action(Action::Flush) => need_flush = true, Request::Action(Action::Close) => rx.close(), Request::Action(Action::Sync) => { diff --git a/src/receivers/synchronize_batched/mod.rs b/src/receivers/synchronize_batched/mod.rs index c14cfc4..3dbf0fc 100644 --- a/src/receivers/synchronize_batched/mod.rs +++ b/src/receivers/synchronize_batched/mod.rs @@ -102,6 +102,9 @@ macro_rules! batch_synchronized_poller_macro { buffer_mid.push(mid); buffer.push(msg); } + Request::Action(Action::Init) => { + stx.send(Event::Ready).ok(); + }, Request::Action(Action::Flush) => need_flush = true, Request::Action(Action::Sync) => need_sync = true, Request::Action(Action::Close) => rx.close(), diff --git a/src/receivers/synchronized/mod.rs b/src/receivers/synchronized/mod.rs index 786e060..d7976d5 100644 --- a/src/receivers/synchronized/mod.rs +++ b/src/receivers/synchronized/mod.rs @@ -70,6 +70,9 @@ macro_rules! synchronized_poller_macro { stx.send(Event::Flushed).ok(); continue; } + Request::Action(Action::Init) => { + stx.send(Event::Ready).ok(); + } Request::Action(Action::Sync) => need_sync = true, Request::Action(Action::Close) => { rx.close(); diff --git a/src/relay.rs b/src/relay.rs index 79fe539..0956bc5 100644 --- a/src/relay.rs +++ b/src/relay.rs @@ -1,11 +1,12 @@ use crate::{ error::Error, receiver::{ - Action, AnyReceiver, AnyWrapperArc, AnyWrapperRef, PermitDrop, ReceiverTrait, - SendUntypedReceiver, Stats, TypeTagAccept, + Action, AnyReceiver, AnyWrapperRef, PermitDrop, ReceiverTrait, SendUntypedReceiver, Stats, + TypeTagAccept, }, Bus, Event, Message, Permit, ReciveUnypedReceiver, TypeTag, }; +use dashmap::DashMap; use futures::{future::poll_fn, Future}; use std::{ pin::Pin, @@ -27,16 +28,31 @@ impl sharded_slab::Config for SlabCfg { type Slab = sharded_slab::Slab; pub(crate) struct RelayContext { - limit: u64, - processing: AtomicU64, + receivers: DashMap>, need_flush: AtomicBool, + ready: AtomicBool, flushed: Notify, synchronized: Notify, closed: Notify, - response: Notify, } -impl PermitDrop for RelayContext { +pub struct RelayReceiverContext { + limit: u64, + processing: AtomicU64, + response: Arc, +} + +impl RelayReceiverContext { + fn new(limit: u64) -> Self { + Self { + limit, + processing: Default::default(), + response: Arc::new(Notify::new()), + } + } +} + +impl PermitDrop for RelayReceiverContext { fn permit_drop(&self) { self.processing.fetch_sub(1, Ordering::SeqCst); } @@ -51,17 +67,16 @@ where waiters: Slab, Error>>>, } impl RelayWrapper { - pub fn new(inner: S, limit: u64) -> Self { + pub fn new(inner: S) -> Self { Self { inner, context: Arc::new(RelayContext { - limit, - processing: AtomicU64::new(0), + receivers: DashMap::new(), need_flush: AtomicBool::new(false), + ready: AtomicBool::new(false), flushed: Notify::new(), synchronized: Notify::new(), closed: Notify::new(), - response: Notify::new(), }), waiters: sharded_slab::Slab::new_with_config::(), } @@ -95,9 +110,7 @@ where fn wrapper(&self) -> Option> { None } - fn wrapper_arc(self: Arc) -> Option { - None - } + fn send_boxed( &self, mid: u64, @@ -148,12 +161,19 @@ where .ok_or_else(|| Error::AddListenerError)? as _) } - fn try_reserve(&self) -> Option { - loop { - let count = self.context.processing.load(Ordering::Relaxed); + fn try_reserve(&self, tt: &TypeTag) -> Option { + if !self.context.receivers.contains_key(tt) { + self.context + .receivers + .insert(tt.clone(), Arc::new(RelayReceiverContext::new(16))); + } - if count < self.context.limit { - let res = self.context.processing.compare_exchange( + loop { + let context = self.context.receivers.get(tt).unwrap(); + let count = context.processing.load(Ordering::Relaxed); + + if count < context.limit { + let res = context.processing.compare_exchange( count, count + 1, Ordering::SeqCst, @@ -162,7 +182,7 @@ where if res.is_ok() { break Some(Permit { fuse: false, - inner: self.context.clone(), + inner: context.clone(), }); } @@ -173,8 +193,18 @@ where } } - fn reserve_notify(&self) -> &Notify { - &self.context.response + fn reserve_notify(&self, tt: &TypeTag) -> Arc { + if !self.context.receivers.contains_key(tt) { + self.context + .receivers + .insert(tt.clone(), Arc::new(RelayReceiverContext::new(16))); + } + + self.context.receivers.get(tt).unwrap().response.clone() + } + + fn ready(&self) -> bool { + self.context.ready.load(Ordering::SeqCst) } fn start_polling( @@ -187,6 +217,8 @@ where let event = poll_fn(move |ctx| this.inner.poll_events(ctx)).await; match event { + Event::Pause => self.context.ready.store(false, Ordering::SeqCst), + Event::Ready => self.context.ready.store(true, Ordering::SeqCst), Event::Exited => { self.context.closed.notify_waiters(); break; @@ -194,8 +226,11 @@ where Event::Flushed => self.context.flushed.notify_waiters(), Event::Synchronized(_res) => self.context.synchronized.notify_waiters(), Event::Response(mid, resp) => { - self.context.processing.fetch_sub(1, Ordering::SeqCst); - self.context.response.notify_one(); + let tt = if let Ok(bm) = &resp { + Some(bm.type_tag()) + } else { + None + }; if let Some(chan) = self.waiters.take(mid as _) { if let Err(err) = chan.send(resp) { @@ -203,6 +238,13 @@ where } } else { warn!("No waiters for mid({})", mid); + }; + + if let Some(tt) = tt { + if let Some(ctx) = self.context.receivers.get(&tt) { + ctx.processing.fetch_sub(1, Ordering::SeqCst); + ctx.response.notify_one(); + } } } diff --git a/src/relays/mod.rs b/src/relays/mod.rs new file mode 100644 index 0000000..e79c1ec --- /dev/null +++ b/src/relays/mod.rs @@ -0,0 +1,9 @@ +#[cfg(feature = "quic")] +mod quic; + +pub enum AuthKind { + Token(String), +} + +#[cfg(feature = "quic")] +pub use quic::*; diff --git a/src/relays/quic/client.rs b/src/relays/quic/client.rs new file mode 100644 index 0000000..e34db86 --- /dev/null +++ b/src/relays/quic/client.rs @@ -0,0 +1 @@ +pub struct QuicClientRelay {} diff --git a/src/relays/quic/mod.rs b/src/relays/quic/mod.rs new file mode 100644 index 0000000..67a445b --- /dev/null +++ b/src/relays/quic/mod.rs @@ -0,0 +1,5 @@ +mod client; +mod server; + +pub use client::QuicClientRelay; +pub use server::QuicServerRelay; diff --git a/src/relays/quic/server.rs b/src/relays/quic/server.rs new file mode 100644 index 0000000..5efbf15 --- /dev/null +++ b/src/relays/quic/server.rs @@ -0,0 +1 @@ +pub struct QuicServerRelay {} diff --git a/tests/test_relay.rs b/tests/test_relay.rs index 6f6e23d..85ee57f 100644 --- a/tests/test_relay.rs +++ b/tests/test_relay.rs @@ -192,7 +192,7 @@ impl TypeTagAccept for TestRelay { &self, msg: &messagebus::TypeTag, resp: Option<&messagebus::TypeTag>, - err: Option<&messagebus::TypeTag>, + _err: Option<&messagebus::TypeTag>, ) -> bool { if msg.as_ref() == Msg::::type_tag_().as_ref() { if let Some(resp) = resp { @@ -309,7 +309,7 @@ async fn test_relay() { }; let (b, poller) = Bus::build() - .register_relay(relay, 1) + .register_relay(relay) .register(TmpReceiver) .subscribe_async::>( 1,