Quic relay begin

This commit is contained in:
Andrey Tkachenko 2021-08-02 18:08:41 +04:00
parent 9a887d4821
commit db34774acd
16 changed files with 173 additions and 145 deletions

View File

@ -10,6 +10,10 @@ license = "MIT OR Apache-2.0"
exclude = [".gitignore", ".cargo/config", ".github/**", "codecov.yml"] exclude = [".gitignore", ".cargo/config", ".github/**", "codecov.yml"]
edition = "2018" edition = "2018"
[features]
quic = ["quinn"]
[dependencies] [dependencies]
messagebus_derive = { path = "./derive" } messagebus_derive = { path = "./derive" }
tokio = { version = "1", features = ["parking_lot", "rt-multi-thread", "sync"] } tokio = { version = "1", features = ["parking_lot", "rt-multi-thread", "sync"] }
@ -23,6 +27,8 @@ thiserror = "1"
erased-serde = "0.3" erased-serde = "0.3"
serde = "1" serde = "1"
serde_derive = "1" serde_derive = "1"
dashmap = "4.0.2"
quinn = { version = "0.7", optional = true }
[dev-dependencies] [dev-dependencies]

View File

@ -254,12 +254,8 @@ impl Module {
self self
} }
pub fn register_relay<S: Relay + Send + Sync + 'static>( pub fn register_relay<S: Relay + Send + Sync + 'static>(mut self, inner: S) -> Self {
mut self, let receiver = Receiver::new_relay::<S>(inner);
inner: S,
queue: u64,
) -> Self {
let receiver = Receiver::new_relay::<S>(queue, inner);
self.pollings.push(receiver.start_polling()); self.pollings.push(receiver.start_polling());
let mut receiver_added = false; let mut receiver_added = false;
@ -358,8 +354,8 @@ impl BusBuilder {
BusBuilder { inner } BusBuilder { inner }
} }
pub fn register_relay<S: Relay + Send + Sync + 'static>(self, inner: S, queue: u64) -> Self { pub fn register_relay<S: Relay + Send + Sync + 'static>(self, inner: S) -> Self {
let inner = self.inner.register_relay(inner, queue); let inner = self.inner.register_relay(inner);
BusBuilder { inner } BusBuilder { inner }
} }

View File

@ -112,47 +112,6 @@ impl Message for () {
} }
} }
// impl<T: Message> Message for Arc<T> {
// fn as_any_ref(&self) -> &dyn Any {
// T::as_any_ref(&*self)
// }
// fn as_any_mut(&mut self) -> &mut dyn Any {
// unimplemented!()
// }
// fn as_any_boxed(self: Box<Self>) -> Box<dyn Any> {
// unimplemented!()
// }
// fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any> {
// self
// }
// fn as_shared_ref(&self) -> Option<&dyn SharedMessage> {
// Some(self)
// }
// fn as_shared_mut(&mut self) -> Option<&mut dyn SharedMessage> {
// Some(self)
// }
// fn as_shared_boxed(self: Box<Self>) -> Option<Box<dyn SharedMessage>> {
// Some(self)
// }
// fn as_shared_arc(self: Arc<Self>) -> Option<Arc<dyn SharedMessage>> {
// Some(self)
// }
// fn try_clone_into(&self, into: &mut dyn Any) -> bool {
// let into = if let Some(inner) = into.downcast_mut::<Option<()>>() {
// inner
// } else {
// return false;
// };
// into.replace(self.clone());
// true
// }
// fn try_clone_boxed(&self) -> Option<Box<dyn Message>> {
// Some(Box::new(self.clone()))
// }
// }
pub trait IntoBoxedMessage { pub trait IntoBoxedMessage {
fn into_boxed(self) -> Box<dyn Message>; fn into_boxed(self) -> Box<dyn Message>;
} }

View File

@ -79,6 +79,9 @@ pub enum Error<M: fmt::Debug + 'static = (), E: StdSyncSendError = GenericError>
#[error("Message Send Error: {0}")] #[error("Message Send Error: {0}")]
SendError(#[from] SendError<M>), SendError(#[from] SendError<M>),
#[error("Message receiver dropped try again another receiver")]
TryAgain(M),
#[error("NoResponse")] #[error("NoResponse")]
NoResponse, NoResponse,
@ -91,6 +94,9 @@ pub enum Error<M: fmt::Debug + 'static = (), E: StdSyncSendError = GenericError>
#[error("MessageCastError")] #[error("MessageCastError")]
MessageCastError, MessageCastError,
#[error("Not Ready")]
NotReady,
#[error("Other({0})")] #[error("Other({0})")]
Other(E), Other(E),
@ -111,7 +117,8 @@ impl<M: fmt::Debug + 'static, E: StdSyncSendError> Error<M, E> {
pub fn map_msg<UM: fmt::Debug + 'static, F: FnOnce(M) -> UM>(self, f: F) -> Error<UM, E> { pub fn map_msg<UM: fmt::Debug + 'static, F: FnOnce(M) -> UM>(self, f: F) -> Error<UM, E> {
match self { match self {
Error::SendError(inner) => Error::SendError(inner.map_msg(f)), Error::SendError(inner) => Error::SendError(inner.map_msg(f)),
Error::NoResponse => Error::NoReceivers, Error::TryAgain(inner) => Error::TryAgain(f(inner)),
Error::NoResponse => Error::NoResponse,
Error::NoReceivers => Error::NoReceivers, Error::NoReceivers => Error::NoReceivers,
Error::Serialization(s) => Error::Serialization(s), Error::Serialization(s) => Error::Serialization(s),
Error::Other(inner) => Error::Other(inner), Error::Other(inner) => Error::Other(inner),
@ -120,13 +127,15 @@ impl<M: fmt::Debug + 'static, E: StdSyncSendError> Error<M, E> {
Error::AddListenerError => Error::AddListenerError, Error::AddListenerError => Error::AddListenerError,
Error::MessageCastError => Error::MessageCastError, Error::MessageCastError => Error::MessageCastError,
Error::TypeTagNotRegistered(tt) => Error::TypeTagNotRegistered(tt), Error::TypeTagNotRegistered(tt) => Error::TypeTagNotRegistered(tt),
Error::NotReady => Error::NotReady,
} }
} }
pub fn map_err<UE: StdSyncSendError, F: FnOnce(E) -> UE>(self, f: F) -> Error<M, UE> { pub fn map_err<UE: StdSyncSendError, F: FnOnce(E) -> UE>(self, f: F) -> Error<M, UE> {
match self { match self {
Error::SendError(inner) => Error::SendError(inner), Error::SendError(inner) => Error::SendError(inner),
Error::NoResponse => Error::NoReceivers, Error::TryAgain(inner) => Error::TryAgain(inner),
Error::NoResponse => Error::NoResponse,
Error::NoReceivers => Error::NoReceivers, Error::NoReceivers => Error::NoReceivers,
Error::Serialization(s) => Error::Serialization(s), Error::Serialization(s) => Error::Serialization(s),
Error::Other(inner) => Error::Other(f(inner)), Error::Other(inner) => Error::Other(f(inner)),
@ -135,6 +144,7 @@ impl<M: fmt::Debug + 'static, E: StdSyncSendError> Error<M, E> {
Error::AddListenerError => Error::AddListenerError, Error::AddListenerError => Error::AddListenerError,
Error::MessageCastError => Error::MessageCastError, Error::MessageCastError => Error::MessageCastError,
Error::TypeTagNotRegistered(tt) => Error::TypeTagNotRegistered(tt), Error::TypeTagNotRegistered(tt) => Error::TypeTagNotRegistered(tt),
Error::NotReady => Error::NotReady,
} }
} }
} }
@ -143,7 +153,8 @@ impl<M: Message, E: StdSyncSendError> Error<M, E> {
pub fn into_dyn(self) -> Error<M> { pub fn into_dyn(self) -> Error<M> {
match self { match self {
Error::SendError(inner) => Error::SendError(inner), Error::SendError(inner) => Error::SendError(inner),
Error::NoResponse => Error::NoReceivers, Error::TryAgain(inner) => Error::TryAgain(inner),
Error::NoResponse => Error::NoResponse,
Error::NoReceivers => Error::NoReceivers, Error::NoReceivers => Error::NoReceivers,
Error::Serialization(s) => Error::Serialization(s), Error::Serialization(s) => Error::Serialization(s),
Error::Other(inner) => Error::OtherBoxed(Box::new(inner) as _), Error::Other(inner) => Error::OtherBoxed(Box::new(inner) as _),
@ -152,13 +163,15 @@ impl<M: Message, E: StdSyncSendError> Error<M, E> {
Error::AddListenerError => Error::AddListenerError, Error::AddListenerError => Error::AddListenerError,
Error::MessageCastError => Error::MessageCastError, Error::MessageCastError => Error::MessageCastError,
Error::TypeTagNotRegistered(tt) => Error::TypeTagNotRegistered(tt), Error::TypeTagNotRegistered(tt) => Error::TypeTagNotRegistered(tt),
Error::NotReady => Error::NotReady,
} }
} }
pub fn map<U: From<Box<dyn StdSyncSendError>> + StdSyncSendError>(self) -> Error<M, U> { pub fn map<U: From<Box<dyn StdSyncSendError>> + StdSyncSendError>(self) -> Error<M, U> {
match self { match self {
Error::SendError(inner) => Error::SendError(inner), Error::SendError(inner) => Error::SendError(inner),
Error::NoResponse => Error::NoReceivers, Error::TryAgain(inner) => Error::TryAgain(inner),
Error::NoResponse => Error::NoResponse,
Error::NoReceivers => Error::NoReceivers, Error::NoReceivers => Error::NoReceivers,
Error::Serialization(s) => Error::Serialization(s), Error::Serialization(s) => Error::Serialization(s),
Error::Other(_) => panic!("expected boxed error!"), Error::Other(_) => panic!("expected boxed error!"),
@ -167,6 +180,7 @@ impl<M: Message, E: StdSyncSendError> Error<M, E> {
Error::AddListenerError => Error::AddListenerError, Error::AddListenerError => Error::AddListenerError,
Error::MessageCastError => Error::MessageCastError, Error::MessageCastError => Error::MessageCastError,
Error::TypeTagNotRegistered(tt) => Error::TypeTagNotRegistered(tt), Error::TypeTagNotRegistered(tt) => Error::TypeTagNotRegistered(tt),
Error::NotReady => Error::NotReady,
} }
} }
} }
@ -175,8 +189,9 @@ impl<E: StdSyncSendError> Error<(), E> {
pub fn specify<M: fmt::Debug>(self) -> Error<M, E> { pub fn specify<M: fmt::Debug>(self) -> Error<M, E> {
match self { match self {
Error::SendError(_) => panic!("cannot specify type on typed error"), Error::SendError(_) => panic!("cannot specify type on typed error"),
Error::TryAgain(_) => panic!("cannot specify type on typed error"),
Error::WrongMessageType(_) => panic!("cannot specify type on typed error"), Error::WrongMessageType(_) => panic!("cannot specify type on typed error"),
Error::NoResponse => Error::NoReceivers, Error::NoResponse => Error::NoResponse,
Error::NoReceivers => Error::NoReceivers, Error::NoReceivers => Error::NoReceivers,
Error::Serialization(s) => Error::Serialization(s), Error::Serialization(s) => Error::Serialization(s),
Error::Other(inner) => Error::Other(inner), Error::Other(inner) => Error::Other(inner),
@ -184,6 +199,7 @@ impl<E: StdSyncSendError> Error<(), E> {
Error::AddListenerError => Error::AddListenerError, Error::AddListenerError => Error::AddListenerError,
Error::MessageCastError => Error::MessageCastError, Error::MessageCastError => Error::MessageCastError,
Error::TypeTagNotRegistered(tt) => Error::TypeTagNotRegistered(tt), Error::TypeTagNotRegistered(tt) => Error::TypeTagNotRegistered(tt),
Error::NotReady => Error::NotReady,
} }
} }
} }
@ -203,8 +219,9 @@ impl Error<Box<dyn Message>> {
Error::SendError(SendError::Full(m)) => { Error::SendError(SendError::Full(m)) => {
Error::SendError(SendError::Full(m.into_boxed())) Error::SendError(SendError::Full(m.into_boxed()))
} }
Error::TryAgain(inner) => Error::TryAgain(inner.into_boxed()),
Error::WrongMessageType(m) => Error::WrongMessageType(m.into_boxed()), Error::WrongMessageType(m) => Error::WrongMessageType(m.into_boxed()),
Error::NoResponse => Error::NoReceivers, Error::NoResponse => Error::NoResponse,
Error::NoReceivers => Error::NoReceivers, Error::NoReceivers => Error::NoReceivers,
Error::Serialization(s) => Error::Serialization(s), Error::Serialization(s) => Error::Serialization(s),
Error::Other(inner) => Error::Other(inner), Error::Other(inner) => Error::Other(inner),
@ -212,6 +229,7 @@ impl Error<Box<dyn Message>> {
Error::AddListenerError => Error::AddListenerError, Error::AddListenerError => Error::AddListenerError,
Error::MessageCastError => Error::MessageCastError, Error::MessageCastError => Error::MessageCastError,
Error::TypeTagNotRegistered(tt) => Error::TypeTagNotRegistered(tt), Error::TypeTagNotRegistered(tt) => Error::TypeTagNotRegistered(tt),
Error::NotReady => Error::NotReady,
} }
} }
} }

View File

@ -5,6 +5,7 @@ mod handler;
mod receiver; mod receiver;
pub mod receivers; pub mod receivers;
mod relay; mod relay;
pub mod relays;
mod trait_object; mod trait_object;
#[macro_use] #[macro_use]
@ -124,11 +125,11 @@ impl BusInner {
} }
} }
fn try_reserve(&self, rs: &[Receiver]) -> Option<SmallVec<[Permit; 32]>> { fn try_reserve(&self, tt: &TypeTag, rs: &[Receiver]) -> Option<SmallVec<[Permit; 32]>> {
let mut permits = SmallVec::<[Permit; 32]>::new(); let mut permits = SmallVec::<[Permit; 32]>::new();
for r in rs { for r in rs {
if let Some(prmt) = r.try_reserve() { if let Some(prmt) = r.try_reserve(tt) {
permits.push(prmt); permits.push(prmt);
} else { } else {
return None; return None;
@ -156,7 +157,7 @@ impl BusInner {
let mid = ID_COUNTER.fetch_add(1, Ordering::Relaxed); let mid = ID_COUNTER.fetch_add(1, Ordering::Relaxed);
if let Some(rs) = self.receivers.get(&tt) { if let Some(rs) = self.receivers.get(&tt) {
let permits = if let Some(x) = self.try_reserve(rs) { let permits = if let Some(x) = self.try_reserve(&tt, rs) {
x x
} else { } else {
return Err(SendError::Full(msg).into()); return Err(SendError::Full(msg).into());
@ -228,10 +229,10 @@ impl BusInner {
if let Some(rs) = self.receivers.get(&tt) { if let Some(rs) = self.receivers.get(&tt) {
if let Some((last, head)) = rs.split_last() { if let Some((last, head)) = rs.split_last() {
for r in head { for r in head {
let _ = r.send(mid, msg.clone(), r.reserve().await); let _ = r.send(mid, msg.clone(), r.reserve(&tt).await);
} }
let _ = last.send(mid, msg, last.reserve().await); let _ = last.send(mid, msg, last.reserve(&tt).await);
return Ok(()); return Ok(());
} }
@ -292,7 +293,7 @@ impl BusInner {
let mid = ID_COUNTER.fetch_add(1, Ordering::Relaxed); let mid = ID_COUNTER.fetch_add(1, Ordering::Relaxed);
if let Some(rs) = self.receivers.get(&tt).and_then(|rs| rs.first()) { if let Some(rs) = self.receivers.get(&tt).and_then(|rs| rs.first()) {
let permits = if let Some(x) = rs.try_reserve() { let permits = if let Some(x) = rs.try_reserve(&tt) {
x x
} else { } else {
return Err(SendError::Full(msg).into()); return Err(SendError::Full(msg).into());
@ -313,7 +314,7 @@ impl BusInner {
let mid = ID_COUNTER.fetch_add(1, Ordering::Relaxed); let mid = ID_COUNTER.fetch_add(1, Ordering::Relaxed);
if let Some(rs) = self.receivers.get(&tt).and_then(|rs| rs.first()) { if let Some(rs) = self.receivers.get(&tt).and_then(|rs| rs.first()) {
Ok(rs.send(mid, msg, rs.reserve().await)?) Ok(rs.send(mid, msg, rs.reserve(&tt).await)?)
} else { } else {
Err(Error::NoReceivers) Err(Error::NoReceivers)
} }
@ -340,7 +341,7 @@ impl BusInner {
let mid = mid | 1 << (u64::BITS - 1); let mid = mid | 1 << (u64::BITS - 1);
rc.send(mid, req, rc.reserve().await)?; rc.send(mid, req, rc.reserve(&tid).await)?;
rx.await.map_err(|x| x.specify::<M>()) rx.await.map_err(|x| x.specify::<M>())
} else { } else {
Err(Error::NoReceivers) Err(Error::NoReceivers)
@ -364,7 +365,7 @@ impl BusInner {
.map_msg(|_| unimplemented!()) .map_msg(|_| unimplemented!())
})?; })?;
rc.send(mid | 1 << (u64::BITS - 1), req, rc.reserve().await) rc.send(mid | 1 << (u64::BITS - 1), req, rc.reserve(&tid).await)
.map_err(|x| x.map_err(|_| unimplemented!()))?; .map_err(|x| x.map_err(|_| unimplemented!()))?;
rx.await.map_err(|x| x.specify::<M>()) rx.await.map_err(|x| x.specify::<M>())
@ -388,10 +389,10 @@ impl BusInner {
if let Some(rs) = self.receivers.get(&tt) { if let Some(rs) = self.receivers.get(&tt) {
if let Some((last, head)) = rs.split_last() { if let Some((last, head)) = rs.split_last() {
for r in head { for r in head {
let _ = r.send_boxed(mid, msg.try_clone_boxed().unwrap(), r.reserve().await); let _ = r.send_boxed(mid, msg.try_clone_boxed().unwrap(), r.reserve(&tt).await);
} }
let _ = last.send_boxed(mid, msg, last.reserve().await); let _ = last.send_boxed(mid, msg, last.reserve(&tt).await);
return Ok(()); return Ok(());
} }
@ -415,7 +416,7 @@ impl BusInner {
let mid = ID_COUNTER.fetch_add(1, Ordering::Relaxed); let mid = ID_COUNTER.fetch_add(1, Ordering::Relaxed);
if let Some(rs) = self.receivers.get(&tt).and_then(|rs| rs.first()) { if let Some(rs) = self.receivers.get(&tt).and_then(|rs| rs.first()) {
Ok(rs.send_boxed(mid, msg, rs.reserve().await)?) Ok(rs.send_boxed(mid, msg, rs.reserve(&tt).await)?)
} else { } else {
Err(Error::NoReceivers) Err(Error::NoReceivers)
} }
@ -439,7 +440,7 @@ impl BusInner {
.map_msg(|_| unimplemented!()) .map_msg(|_| unimplemented!())
})?; })?;
rc.send_boxed(mid | 1 << (usize::BITS - 1), req, rc.reserve().await)?; rc.send_boxed(mid | 1 << (usize::BITS - 1), req, rc.reserve(&tt).await)?;
rx.await.map_err(|x| x.specify::<Box<dyn Message>>()) rx.await.map_err(|x| x.specify::<Box<dyn Message>>())
} else { } else {
@ -462,7 +463,7 @@ impl BusInner {
if let Some(rs) = self.receivers.get(&tt).and_then(|rs| rs.first()) { if let Some(rs) = self.receivers.get(&tt).and_then(|rs| rs.first()) {
let msg = self.deserialize_message(tt.clone(), de)?; let msg = self.deserialize_message(tt.clone(), de)?;
Ok(rs.send_boxed(mid, msg, rs.reserve().await)?) Ok(rs.send_boxed(mid, msg, rs.reserve(&tt).await)?)
} else { } else {
Err(Error::NoReceivers) Err(Error::NoReceivers)
} }
@ -484,7 +485,7 @@ impl BusInner {
let (mid, rx) = rc.add_response_waiter_boxed().unwrap(); let (mid, rx) = rc.add_response_waiter_boxed().unwrap();
let msg = self.deserialize_message(tt.clone(), de)?; let msg = self.deserialize_message(tt.clone(), de)?;
rc.send_boxed(mid | 1 << (usize::BITS - 1), msg, rc.reserve().await)?; rc.send_boxed(mid | 1 << (usize::BITS - 1), msg, rc.reserve(&tt).await)?;
rx.await.map_err(|x| x.specify::<Box<dyn Message>>()) rx.await.map_err(|x| x.specify::<Box<dyn Message>>())
} else { } else {

View File

@ -85,7 +85,6 @@ pub trait ReceiverTrait: TypeTagAccept + Send + Sync {
fn name(&self) -> &str; fn name(&self) -> &str;
fn typed(&self) -> Option<AnyReceiver<'_>>; fn typed(&self) -> Option<AnyReceiver<'_>>;
fn wrapper(&self) -> Option<AnyWrapperRef<'_>>; fn wrapper(&self) -> Option<AnyWrapperRef<'_>>;
fn wrapper_arc(self: Arc<Self>) -> Option<AnyWrapperArc>;
fn send_boxed(&self, mid: u64, msg: Box<dyn Message>) -> Result<(), Error<Box<dyn Message>>>; fn send_boxed(&self, mid: u64, msg: Box<dyn Message>) -> Result<(), Error<Box<dyn Message>>>;
fn add_response_listener( fn add_response_listener(
@ -106,8 +105,10 @@ pub trait ReceiverTrait: TypeTagAccept + Send + Sync {
fn need_flush(&self) -> bool; fn need_flush(&self) -> bool;
fn try_reserve(&self) -> Option<Permit>; fn try_reserve(&self, tt: &TypeTag) -> Option<Permit>;
fn reserve_notify(&self) -> &Notify; fn reserve_notify(&self, tt: &TypeTag) -> Arc<Notify>;
fn ready(&self) -> bool;
fn start_polling( fn start_polling(
self: Arc<Self>, self: Arc<Self>,
@ -140,6 +141,7 @@ pub struct Stats {
#[non_exhaustive] #[non_exhaustive]
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub enum Action { pub enum Action {
Init,
Flush, Flush,
Sync, Sync,
Close, Close,
@ -154,6 +156,8 @@ pub enum Event<M, E: StdSyncSendError> {
Stats(Stats), Stats(Stats),
Flushed, Flushed,
Exited, Exited,
Ready,
Pause,
} }
struct ReceiverWrapper<M, R, E, S> struct ReceiverWrapper<M, R, E, S>
@ -186,6 +190,8 @@ where
let event = poll_fn(move |ctx| this.inner.poll_events(ctx)).await; let event = poll_fn(move |ctx| this.inner.poll_events(ctx)).await;
match event { match event {
Event::Pause => self.context.ready.store(false, Ordering::SeqCst),
Event::Ready => self.context.ready.store(true, Ordering::SeqCst),
Event::Exited => { Event::Exited => {
self.context.closed.notify_waiters(); self.context.closed.notify_waiters();
break; break;
@ -300,10 +306,6 @@ where
Some(AnyWrapperRef::new(self)) Some(AnyWrapperRef::new(self))
} }
fn wrapper_arc(self: Arc<Self>) -> Option<AnyWrapperArc> {
Some(AnyWrapperArc::new(self))
}
fn send_boxed( fn send_boxed(
&self, &self,
mid: u64, mid: u64,
@ -360,7 +362,7 @@ where
self.context.need_flush.load(Ordering::SeqCst) self.context.need_flush.load(Ordering::SeqCst)
} }
fn try_reserve(&self) -> Option<Permit> { fn try_reserve(&self, _: &TypeTag) -> Option<Permit> {
loop { loop {
let count = self.context.processing.load(Ordering::Relaxed); let count = self.context.processing.load(Ordering::Relaxed);
@ -385,8 +387,8 @@ where
} }
} }
fn reserve_notify(&self) -> &Notify { fn reserve_notify(&self, _: &TypeTag) -> Arc<Notify> {
&self.context.response self.context.response.clone()
} }
fn start_polling( fn start_polling(
@ -394,6 +396,10 @@ where
) -> Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>> { ) -> Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>> {
self.start_polling_events() self.start_polling_events()
} }
fn ready(&self) -> bool {
self.context.ready.load(Ordering::SeqCst)
}
} }
pub struct Permit { pub struct Permit {
@ -520,36 +526,6 @@ impl<'a> AnyWrapperRef<'a> {
unsafe impl Send for AnyWrapperRef<'_> {} unsafe impl Send for AnyWrapperRef<'_> {}
pub struct AnyWrapperArc {
wrapper_re: Box<dyn Any>,
}
impl AnyWrapperArc {
pub fn new<R, E, S>(rcvr: Arc<S>) -> Self
where
R: Message,
E: StdSyncSendError,
S: WrapperReturnTypeAndError<R, E> + 'static,
{
let wrapper_re = Box::new(rcvr as Arc<dyn WrapperReturnTypeAndError<R, E>>);
Self { wrapper_re }
}
#[inline]
pub fn cast_ret_and_error<R: Message, E: StdSyncSendError>(
&self,
) -> Option<Arc<dyn WrapperReturnTypeAndError<R, E>>> {
Some(
self.wrapper_re
.downcast_ref::<Arc<dyn WrapperReturnTypeAndError<R, E>>>()?
.clone(),
)
}
}
unsafe impl Send for AnyWrapperArc {}
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct ReceiverStats { pub struct ReceiverStats {
pub name: Cow<'static, str>, pub name: Cow<'static, str>,
@ -577,10 +553,11 @@ struct ReceiverContext {
limit: u64, limit: u64,
processing: AtomicU64, processing: AtomicU64,
need_flush: AtomicBool, need_flush: AtomicBool,
ready: AtomicBool,
flushed: Notify, flushed: Notify,
synchronized: Notify, synchronized: Notify,
closed: Notify, closed: Notify,
response: Notify, response: Arc<Notify>,
} }
impl PermitDrop for ReceiverContext { impl PermitDrop for ReceiverContext {
@ -632,10 +609,11 @@ impl Receiver {
limit, limit,
processing: AtomicU64::new(0), processing: AtomicU64::new(0),
need_flush: AtomicBool::new(false), need_flush: AtomicBool::new(false),
ready: AtomicBool::new(false),
flushed: Notify::new(), flushed: Notify::new(),
synchronized: Notify::new(), synchronized: Notify::new(),
closed: Notify::new(), closed: Notify::new(),
response: Notify::new(), response: Arc::new(Notify::new()),
}), }),
_m: Default::default(), _m: Default::default(),
}), }),
@ -643,12 +621,12 @@ impl Receiver {
} }
#[inline] #[inline]
pub(crate) fn new_relay<S>(limit: u64, inner: S) -> Self pub(crate) fn new_relay<S>(inner: S) -> Self
where where
S: Relay + Send + Sync + 'static, S: Relay + Send + Sync + 'static,
{ {
Self { Self {
inner: Arc::new(RelayWrapper::new(inner, limit)), inner: Arc::new(RelayWrapper::new(inner)),
} }
} }
@ -668,19 +646,19 @@ impl Receiver {
} }
#[inline] #[inline]
pub async fn reserve(&self) -> Permit { pub async fn reserve(&self, tt: &TypeTag) -> Permit {
loop { loop {
if let Some(p) = self.inner.try_reserve() { if let Some(p) = self.inner.try_reserve(tt) {
return p; return p;
} else { } else {
self.inner.reserve_notify().notified().await self.inner.reserve_notify(tt).notified().await
} }
} }
} }
#[inline] #[inline]
pub fn try_reserve(&self) -> Option<Permit> { pub fn try_reserve(&self, tt: &TypeTag) -> Option<Permit> {
self.inner.try_reserve() self.inner.try_reserve(tt)
} }
#[inline] #[inline]
@ -730,7 +708,7 @@ impl Receiver {
) -> Result<(), Error<Box<dyn Message>>> { ) -> Result<(), Error<Box<dyn Message>>> {
let res = self.inner.send_boxed(mid, msg); let res = self.inner.send_boxed(mid, msg);
permit.fuse = true; permit.fuse = true;
Ok(()) res
} }
#[inline] #[inline]

View File

@ -70,6 +70,9 @@ macro_rules! buffer_unordered_poller_macro {
stats.clone(), stats.clone(),
)); ));
} }
Request::Action(Action::Init) => {
stx.send(Event::Ready).ok();
},
Request::Action(Action::Flush) => need_flush = true, Request::Action(Action::Flush) => need_flush = true,
Request::Action(Action::Close) => rx.close(), Request::Action(Action::Close) => rx.close(),
Request::Action(Action::Sync) => { Request::Action(Action::Sync) => {

View File

@ -76,6 +76,9 @@ macro_rules! buffer_unordered_batch_poller_macro {
buffer_mid.push(mid); buffer_mid.push(mid);
buffer.push(msg); buffer.push(msg);
} }
Request::Action(Action::Init) => {
stx.send(Event::Ready).ok();
},
Request::Action(Action::Flush) => need_flush = true, Request::Action(Action::Flush) => need_flush = true,
Request::Action(Action::Close) => rx.close(), Request::Action(Action::Close) => rx.close(),
Request::Action(Action::Sync) => { Request::Action(Action::Sync) => {

View File

@ -102,6 +102,9 @@ macro_rules! batch_synchronized_poller_macro {
buffer_mid.push(mid); buffer_mid.push(mid);
buffer.push(msg); buffer.push(msg);
} }
Request::Action(Action::Init) => {
stx.send(Event::Ready).ok();
},
Request::Action(Action::Flush) => need_flush = true, Request::Action(Action::Flush) => need_flush = true,
Request::Action(Action::Sync) => need_sync = true, Request::Action(Action::Sync) => need_sync = true,
Request::Action(Action::Close) => rx.close(), Request::Action(Action::Close) => rx.close(),

View File

@ -70,6 +70,9 @@ macro_rules! synchronized_poller_macro {
stx.send(Event::Flushed).ok(); stx.send(Event::Flushed).ok();
continue; continue;
} }
Request::Action(Action::Init) => {
stx.send(Event::Ready).ok();
}
Request::Action(Action::Sync) => need_sync = true, Request::Action(Action::Sync) => need_sync = true,
Request::Action(Action::Close) => { Request::Action(Action::Close) => {
rx.close(); rx.close();

View File

@ -1,11 +1,12 @@
use crate::{ use crate::{
error::Error, error::Error,
receiver::{ receiver::{
Action, AnyReceiver, AnyWrapperArc, AnyWrapperRef, PermitDrop, ReceiverTrait, Action, AnyReceiver, AnyWrapperRef, PermitDrop, ReceiverTrait, SendUntypedReceiver, Stats,
SendUntypedReceiver, Stats, TypeTagAccept, TypeTagAccept,
}, },
Bus, Event, Message, Permit, ReciveUnypedReceiver, TypeTag, Bus, Event, Message, Permit, ReciveUnypedReceiver, TypeTag,
}; };
use dashmap::DashMap;
use futures::{future::poll_fn, Future}; use futures::{future::poll_fn, Future};
use std::{ use std::{
pin::Pin, pin::Pin,
@ -27,16 +28,31 @@ impl sharded_slab::Config for SlabCfg {
type Slab<T> = sharded_slab::Slab<T, SlabCfg>; type Slab<T> = sharded_slab::Slab<T, SlabCfg>;
pub(crate) struct RelayContext { pub(crate) struct RelayContext {
limit: u64, receivers: DashMap<TypeTag, Arc<RelayReceiverContext>>,
processing: AtomicU64,
need_flush: AtomicBool, need_flush: AtomicBool,
ready: AtomicBool,
flushed: Notify, flushed: Notify,
synchronized: Notify, synchronized: Notify,
closed: Notify, closed: Notify,
response: Notify,
} }
impl PermitDrop for RelayContext { pub struct RelayReceiverContext {
limit: u64,
processing: AtomicU64,
response: Arc<Notify>,
}
impl RelayReceiverContext {
fn new(limit: u64) -> Self {
Self {
limit,
processing: Default::default(),
response: Arc::new(Notify::new()),
}
}
}
impl PermitDrop for RelayReceiverContext {
fn permit_drop(&self) { fn permit_drop(&self) {
self.processing.fetch_sub(1, Ordering::SeqCst); self.processing.fetch_sub(1, Ordering::SeqCst);
} }
@ -51,17 +67,16 @@ where
waiters: Slab<oneshot::Sender<Result<Box<dyn Message>, Error>>>, waiters: Slab<oneshot::Sender<Result<Box<dyn Message>, Error>>>,
} }
impl<S> RelayWrapper<S> { impl<S> RelayWrapper<S> {
pub fn new(inner: S, limit: u64) -> Self { pub fn new(inner: S) -> Self {
Self { Self {
inner, inner,
context: Arc::new(RelayContext { context: Arc::new(RelayContext {
limit, receivers: DashMap::new(),
processing: AtomicU64::new(0),
need_flush: AtomicBool::new(false), need_flush: AtomicBool::new(false),
ready: AtomicBool::new(false),
flushed: Notify::new(), flushed: Notify::new(),
synchronized: Notify::new(), synchronized: Notify::new(),
closed: Notify::new(), closed: Notify::new(),
response: Notify::new(),
}), }),
waiters: sharded_slab::Slab::new_with_config::<SlabCfg>(), waiters: sharded_slab::Slab::new_with_config::<SlabCfg>(),
} }
@ -95,9 +110,7 @@ where
fn wrapper(&self) -> Option<AnyWrapperRef<'_>> { fn wrapper(&self) -> Option<AnyWrapperRef<'_>> {
None None
} }
fn wrapper_arc(self: Arc<Self>) -> Option<AnyWrapperArc> {
None
}
fn send_boxed( fn send_boxed(
&self, &self,
mid: u64, mid: u64,
@ -148,12 +161,19 @@ where
.ok_or_else(|| Error::AddListenerError)? as _) .ok_or_else(|| Error::AddListenerError)? as _)
} }
fn try_reserve(&self) -> Option<Permit> { fn try_reserve(&self, tt: &TypeTag) -> Option<Permit> {
loop { if !self.context.receivers.contains_key(tt) {
let count = self.context.processing.load(Ordering::Relaxed); self.context
.receivers
.insert(tt.clone(), Arc::new(RelayReceiverContext::new(16)));
}
if count < self.context.limit { loop {
let res = self.context.processing.compare_exchange( let context = self.context.receivers.get(tt).unwrap();
let count = context.processing.load(Ordering::Relaxed);
if count < context.limit {
let res = context.processing.compare_exchange(
count, count,
count + 1, count + 1,
Ordering::SeqCst, Ordering::SeqCst,
@ -162,7 +182,7 @@ where
if res.is_ok() { if res.is_ok() {
break Some(Permit { break Some(Permit {
fuse: false, fuse: false,
inner: self.context.clone(), inner: context.clone(),
}); });
} }
@ -173,8 +193,18 @@ where
} }
} }
fn reserve_notify(&self) -> &Notify { fn reserve_notify(&self, tt: &TypeTag) -> Arc<Notify> {
&self.context.response if !self.context.receivers.contains_key(tt) {
self.context
.receivers
.insert(tt.clone(), Arc::new(RelayReceiverContext::new(16)));
}
self.context.receivers.get(tt).unwrap().response.clone()
}
fn ready(&self) -> bool {
self.context.ready.load(Ordering::SeqCst)
} }
fn start_polling( fn start_polling(
@ -187,6 +217,8 @@ where
let event = poll_fn(move |ctx| this.inner.poll_events(ctx)).await; let event = poll_fn(move |ctx| this.inner.poll_events(ctx)).await;
match event { match event {
Event::Pause => self.context.ready.store(false, Ordering::SeqCst),
Event::Ready => self.context.ready.store(true, Ordering::SeqCst),
Event::Exited => { Event::Exited => {
self.context.closed.notify_waiters(); self.context.closed.notify_waiters();
break; break;
@ -194,8 +226,11 @@ where
Event::Flushed => self.context.flushed.notify_waiters(), Event::Flushed => self.context.flushed.notify_waiters(),
Event::Synchronized(_res) => self.context.synchronized.notify_waiters(), Event::Synchronized(_res) => self.context.synchronized.notify_waiters(),
Event::Response(mid, resp) => { Event::Response(mid, resp) => {
self.context.processing.fetch_sub(1, Ordering::SeqCst); let tt = if let Ok(bm) = &resp {
self.context.response.notify_one(); Some(bm.type_tag())
} else {
None
};
if let Some(chan) = self.waiters.take(mid as _) { if let Some(chan) = self.waiters.take(mid as _) {
if let Err(err) = chan.send(resp) { if let Err(err) = chan.send(resp) {
@ -203,6 +238,13 @@ where
} }
} else { } else {
warn!("No waiters for mid({})", mid); warn!("No waiters for mid({})", mid);
};
if let Some(tt) = tt {
if let Some(ctx) = self.context.receivers.get(&tt) {
ctx.processing.fetch_sub(1, Ordering::SeqCst);
ctx.response.notify_one();
}
} }
} }

9
src/relays/mod.rs Normal file
View File

@ -0,0 +1,9 @@
#[cfg(feature = "quic")]
mod quic;
pub enum AuthKind {
Token(String),
}
#[cfg(feature = "quic")]
pub use quic::*;

View File

@ -0,0 +1 @@
pub struct QuicClientRelay {}

5
src/relays/quic/mod.rs Normal file
View File

@ -0,0 +1,5 @@
mod client;
mod server;
pub use client::QuicClientRelay;
pub use server::QuicServerRelay;

View File

@ -0,0 +1 @@
pub struct QuicServerRelay {}

View File

@ -192,7 +192,7 @@ impl TypeTagAccept for TestRelay {
&self, &self,
msg: &messagebus::TypeTag, msg: &messagebus::TypeTag,
resp: Option<&messagebus::TypeTag>, resp: Option<&messagebus::TypeTag>,
err: Option<&messagebus::TypeTag>, _err: Option<&messagebus::TypeTag>,
) -> bool { ) -> bool {
if msg.as_ref() == Msg::<i16>::type_tag_().as_ref() { if msg.as_ref() == Msg::<i16>::type_tag_().as_ref() {
if let Some(resp) = resp { if let Some(resp) = resp {
@ -309,7 +309,7 @@ async fn test_relay() {
}; };
let (b, poller) = Bus::build() let (b, poller) = Bus::build()
.register_relay(relay, 1) .register_relay(relay)
.register(TmpReceiver) .register(TmpReceiver)
.subscribe_async::<Msg<i32>>( .subscribe_async::<Msg<i32>>(
1, 1,