Relays: rewrite relay trat
This commit is contained in:
parent
993ff4a46d
commit
3bb2fe7492
@ -3,14 +3,7 @@ use std::{collections::HashMap, marker::PhantomData, pin::Pin, sync::Arc};
|
||||
use futures::{Future, FutureExt};
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
use crate::{
|
||||
envelop::TypeTag,
|
||||
error::{Error, StdSyncSendError},
|
||||
receiver::{Receiver, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver},
|
||||
receivers, AsyncBatchHandler, AsyncBatchSynchronizedHandler, AsyncHandler,
|
||||
AsyncSynchronizedHandler, BatchHandler, BatchSynchronizedHandler, Bus, BusInner, Handler,
|
||||
IntoBoxedMessage, Message, SynchronizedHandler, Untyped,
|
||||
};
|
||||
use crate::{AsyncBatchHandler, AsyncBatchSynchronizedHandler, AsyncHandler, AsyncSynchronizedHandler, BatchHandler, BatchSynchronizedHandler, Bus, BusInner, Handler, IntoBoxedMessage, Message, SynchronizedHandler, Untyped, envelop::TypeTag, error::{Error, StdSyncSendError}, receiver::{Receiver, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver}, receivers, relay::TypeMap};
|
||||
|
||||
pub trait ReceiverSubscriberBuilder<T, M, R, E>:
|
||||
SendUntypedReceiver + SendTypedReceiver<M> + ReciveTypedReceiver<R, E>
|
||||
@ -92,7 +85,7 @@ impl<T, F, B> RegisterEntry<UnsyncEntry, T, F, B> {
|
||||
let (inner, poller) = S::build(cfg);
|
||||
|
||||
let receiver = Receiver::new::<M, R, E, S>(queue, inner);
|
||||
let poller2 = receiver.start_polling_events::<R, E>();
|
||||
let poller2 = receiver.start_polling();
|
||||
self.receivers
|
||||
.entry(M::type_tag_())
|
||||
.or_insert_with(Vec::new)
|
||||
@ -162,7 +155,7 @@ impl<T, F, B> RegisterEntry<SyncEntry, T, F, B> {
|
||||
let (inner, poller) = S::build(cfg);
|
||||
|
||||
let receiver = Receiver::new::<M, R, E, S>(queue, inner);
|
||||
let poller2 = receiver.start_polling_events::<R, E>();
|
||||
let poller2 = receiver.start_polling();
|
||||
self.receivers
|
||||
.entry(M::type_tag_())
|
||||
.or_insert_with(Vec::new)
|
||||
@ -269,8 +262,14 @@ impl Module {
|
||||
self
|
||||
}
|
||||
|
||||
pub fn register_relay<T: Send + Sync + 'static, P: IntoIterator<Item = TypeTag>>(pat: P, obj: T) {
|
||||
|
||||
pub fn register_relay<S: SendUntypedReceiver + Send + Sync + 'static>(queue: u64, map: TypeMap, inner: S) {
|
||||
let receiver = Receiver::new_relay::<S>(queue, map, inner);
|
||||
let poller2 = receiver.start_polling();
|
||||
|
||||
// self.receivers
|
||||
// .entry(M::type_tag_())
|
||||
// .or_insert_with(Vec::new)
|
||||
// .push((receiver, poller, poller2));
|
||||
}
|
||||
|
||||
pub fn register<T: Send + Sync + 'static>(
|
||||
|
@ -512,12 +512,7 @@ impl BusInner {
|
||||
.into_iter()
|
||||
.map(|item| item.iter())
|
||||
.flatten()
|
||||
.filter(move |x| match (rid, eid) {
|
||||
(Some(r), Some(e)) => x.accept_response_type(r) && x.accept_error_type(e),
|
||||
(Some(r), None) => x.accept_response_type(r),
|
||||
(None, Some(e)) => x.accept_error_type(e),
|
||||
(None, None) => true,
|
||||
})
|
||||
.filter(move |x| x.accept(tid, rid, eid))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1,9 +1,4 @@
|
||||
use crate::{
|
||||
envelop::{IntoBoxedMessage, TypeTag},
|
||||
error::{SendError, StdSyncSendError},
|
||||
trait_object::TraitObject,
|
||||
Bus, Error, Message,
|
||||
};
|
||||
use crate::{Bus, Error, Message, envelop::{IntoBoxedMessage, TypeTag}, error::{SendError, StdSyncSendError}, relay::TypeMap, trait_object::TraitObject};
|
||||
use core::{
|
||||
any::TypeId,
|
||||
fmt,
|
||||
@ -94,9 +89,8 @@ pub trait ReceiverTrait: Send + Sync {
|
||||
fn try_reserve(&self) -> Option<Permit>;
|
||||
fn reserve_notify(&self) -> &Notify;
|
||||
|
||||
fn accept_message_type(&self, tt: &TypeTag) -> bool;
|
||||
fn accept_response_type(&self, tt: &TypeTag) -> bool;
|
||||
fn accept_error_type(&self, tt: &TypeTag) -> bool;
|
||||
fn accept(&self, msg: &TypeTag, resp: Option<&TypeTag>, err: Option<&TypeTag>) -> bool;
|
||||
fn start_polling(self: Arc<Self>) -> Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>;
|
||||
}
|
||||
|
||||
pub trait ReceiverPollerBuilder {
|
||||
@ -345,16 +339,24 @@ where
|
||||
&self.context.response
|
||||
}
|
||||
|
||||
fn accept_message_type(&self, tt: &TypeTag) -> bool {
|
||||
M::type_tag_().as_ref() == tt.as_ref()
|
||||
fn accept(&self, msg: &TypeTag, resp: Option<&TypeTag>, err: Option<&TypeTag>) -> bool {
|
||||
if let Some(resp) = resp {
|
||||
if resp.as_ref() != R::type_tag_().as_ref() {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(err) = err {
|
||||
if err.as_ref() != E::type_tag_().as_ref() {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
msg.as_ref() == M::type_tag_().as_ref()
|
||||
}
|
||||
|
||||
fn accept_response_type(&self, tt: &TypeTag) -> bool {
|
||||
R::type_tag_().as_ref() == tt.as_ref()
|
||||
}
|
||||
|
||||
fn accept_error_type(&self, tt: &TypeTag) -> bool {
|
||||
E::type_tag_().as_ref() == tt.as_ref()
|
||||
fn start_polling(self: Arc<Self>) -> Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>> {
|
||||
self.start_polling_events()
|
||||
}
|
||||
}
|
||||
|
||||
@ -602,28 +604,18 @@ impl Receiver {
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub(crate) fn new_relay<S>(limit: u64, inner: S) -> Self
|
||||
pub(crate) fn new_relay<S>(limit: u64, type_map: TypeMap, inner: S) -> Self
|
||||
where
|
||||
S: SendUntypedReceiver + 'static,
|
||||
{
|
||||
Self {
|
||||
inner: Arc::new(RelayWrapper::new(inner, limit)),
|
||||
inner: Arc::new(RelayWrapper::new(inner, type_map, limit)),
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn accept_message_type(&self, tt: &TypeTag) -> bool {
|
||||
self.inner.accept_message_type(tt)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn accept_response_type(&self, tt: &TypeTag) -> bool {
|
||||
self.inner.accept_response_type(tt)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn accept_error_type(&self, tt: &TypeTag) -> bool {
|
||||
self.inner.accept_error_type(tt)
|
||||
pub fn accept(&self, msg: &TypeTag, resp: Option<&TypeTag>, err: Option<&TypeTag>) -> bool {
|
||||
self.inner.accept(msg, resp, err)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
@ -697,12 +689,8 @@ impl Receiver {
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn start_polling_events<R: Message, E: StdSyncSendError>(&self) -> Option<Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>> {
|
||||
Some(self.inner
|
||||
.clone()
|
||||
.wrapper_arc()?
|
||||
.cast_ret_and_error::<R, E>().unwrap()
|
||||
.start_polling_events())
|
||||
pub fn start_polling(&self) -> Option<Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>> {
|
||||
Some(self.inner.clone().start_polling())
|
||||
}
|
||||
|
||||
#[inline]
|
||||
|
268
src/relay.rs
268
src/relay.rs
@ -1,6 +1,9 @@
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
use std::{collections::HashMap, pin::Pin, sync::{Arc, atomic::{AtomicBool, AtomicU64, Ordering}}};
|
||||
use futures::Future;
|
||||
use tokio::sync::{Notify, oneshot};
|
||||
use crate::{Message, Permit, TypeTag, error::Error, receiver::{Action, AnyReceiver, AnyWrapperArc, AnyWrapperRef, ReceiverTrait, SendUntypedReceiver, Stats}};
|
||||
use crate::{Bus, Message, Permit, TypeTag, TypeTagged, error::Error, receiver::{Action, AnyReceiver, AnyWrapperArc, AnyWrapperRef, PermitDrop, ReceiverTrait, SendUntypedReceiver, Stats}};
|
||||
|
||||
|
||||
|
||||
struct SlabCfg;
|
||||
impl sharded_slab::Config for SlabCfg {
|
||||
@ -9,26 +12,158 @@ impl sharded_slab::Config for SlabCfg {
|
||||
|
||||
type Slab<T> = sharded_slab::Slab<T, SlabCfg>;
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! type_map {
|
||||
(inbound : $($tt:tt)*) => {{
|
||||
let mut type_map = TypeMap::new();
|
||||
type_map!(@inbound type_map, $($tt)*);
|
||||
type_map
|
||||
}};
|
||||
|
||||
pub enum TypeTagKind {
|
||||
Message,
|
||||
Response,
|
||||
Error,
|
||||
}
|
||||
|
||||
pub struct TypeDescriptor {
|
||||
kind: TypeTagKind,
|
||||
|
||||
(outbound : $($tt:tt)*) => {
|
||||
let mut type_map = TypeMap::new();
|
||||
type_map!(@outbound type_map, $($tt)*);
|
||||
type_map
|
||||
};
|
||||
|
||||
(@inbound $tm: ident, $msg:ty => $resp: ty, $($tt:tt)*) => {
|
||||
$tm.add_inbound(<$msg as TypeTagged>::type_tag_(), <$resp as TypeTagged>::type_tag_(), None);
|
||||
type_map!(@inbound $tm, $($tt)*)
|
||||
};
|
||||
|
||||
(@inbound $tm: ident, $msg:ty => ($resp:ty) throws $err: ty, $($tt:tt)*) => {
|
||||
$tm.add_inbound(<$msg as TypeTagged>::type_tag_(), <$resp as TypeTagged>::type_tag_(), Some(<$err as TypeTagged>::type_tag_()));
|
||||
type_map!(@inbound $tm, $($tt)*)
|
||||
};
|
||||
|
||||
(@inbound $tm: ident, $msg:ty => $resp: ty) => {
|
||||
$tm.add_inbound(<$msg as TypeTagged>::type_tag_(), <$resp as TypeTagged>::type_tag_(), None);
|
||||
};
|
||||
|
||||
(@inbound $tm: ident,) => {};
|
||||
|
||||
(@inbound $tm: ident, outbound : $($tt:tt)*) => {
|
||||
type_map!(@outbound $tm, $($tt)*)
|
||||
};
|
||||
|
||||
(@outbound $tm: ident, $msg:ty => ($resp:ty) throws $err: ty, $($tt:tt)*) => {
|
||||
$tm.add_outbound(<$msg as TypeTagged>::type_tag_(), <$resp as TypeTagged>::type_tag_(), Some(<$err as TypeTagged>::type_tag_()));
|
||||
type_map!(@outbound $tm, $($tt)*)
|
||||
};
|
||||
|
||||
(@outbound $tm: ident, $msg:ty => $resp: ty, $($tt:tt)*) => {
|
||||
$tm.add_outbound(<$msg as TypeTagged>::type_tag_(), <$resp as TypeTagged>::type_tag_(), None);
|
||||
type_map!(@outbound $tm, $($tt)*)
|
||||
};
|
||||
|
||||
(@outbound $tm: ident, $msg:ty => $resp: ty) => {
|
||||
$tm.add_outbound(<$msg as TypeTagged>::type_tag_(), <$resp as TypeTagged>::type_tag_(), None);
|
||||
};
|
||||
|
||||
(@outbound $tm: ident,) => {};
|
||||
|
||||
(@outbound $tm: ident, inbound : $($tt:tt)*) => {
|
||||
type_map!(@inbound $tm, $($tt)*)
|
||||
};
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct TypeMap {
|
||||
hash_map: HashMap<TypeTag, TypeDescriptor>
|
||||
inbound: HashMap<TypeTag, Vec<(TypeTag, Option<TypeTag>)>>,
|
||||
outbound: HashMap<TypeTag, Vec<(TypeTag, Option<TypeTag>)>>,
|
||||
}
|
||||
|
||||
impl TypeMap {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
inbound: Default::default(),
|
||||
outbound: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn add_inbound(&mut self, msg: TypeTag, resp: TypeTag, err: Option<TypeTag>) -> &mut Self {
|
||||
let vec = self.inbound.entry(msg)
|
||||
.or_insert_with(Vec::new);
|
||||
|
||||
vec.push((resp, err));
|
||||
self
|
||||
}
|
||||
|
||||
pub fn add_outbound(&mut self, msg: TypeTag, resp: TypeTag, err: Option<TypeTag>) -> &mut Self {
|
||||
let vec = self.outbound.entry(msg)
|
||||
.or_insert_with(Vec::new);
|
||||
|
||||
vec.push((resp, err));
|
||||
self
|
||||
}
|
||||
|
||||
pub fn accept_inbound(&self, msg: &TypeTag, resp: Option<&TypeTag>, err: Option<&TypeTag>) -> bool {
|
||||
if let Some(vec) = self.inbound.get(msg) {
|
||||
if let Some(rr) = resp {
|
||||
vec.iter().find(|(r, e)| {
|
||||
let ee = if let Some(e) = e {
|
||||
if let Some(te) = err {
|
||||
te.as_ref() == e.as_ref()
|
||||
} else {
|
||||
true
|
||||
}
|
||||
} else {
|
||||
err.is_none()
|
||||
};
|
||||
|
||||
r.as_ref() == rr.as_ref() && ee
|
||||
}).is_some()
|
||||
} else {
|
||||
true
|
||||
}
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
pub fn accept_outbound(&self, msg: &TypeTag, resp: Option<&TypeTag>, err: Option<&TypeTag>) -> bool {
|
||||
if let Some(vec) = self.outbound.get(msg) {
|
||||
if let Some(rr) = resp {
|
||||
vec.iter().find(|(r, e)| {
|
||||
let ee = if let Some(e) = e {
|
||||
if let Some(te) = err {
|
||||
te.as_ref() == e.as_ref()
|
||||
} else {
|
||||
true
|
||||
}
|
||||
} else {
|
||||
err.is_none()
|
||||
};
|
||||
|
||||
r.as_ref() == rr.as_ref() && ee
|
||||
}).is_some()
|
||||
} else {
|
||||
true
|
||||
}
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct RelayContext {
|
||||
|
||||
limit: u64,
|
||||
processing: AtomicU64,
|
||||
need_flush: AtomicBool,
|
||||
flushed: Notify,
|
||||
synchronized: Notify,
|
||||
closed: Notify,
|
||||
response: Notify,
|
||||
type_map: TypeMap,
|
||||
}
|
||||
|
||||
impl PermitDrop for RelayContext {
|
||||
fn permit_drop(&self) {
|
||||
self.processing.fetch_sub(1, Ordering::SeqCst);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
pub(crate) struct RelayWrapper<S>
|
||||
where
|
||||
S: 'static,
|
||||
@ -38,11 +173,18 @@ where
|
||||
waiters: Slab<oneshot::Sender<Result<Box<dyn Message>, Error>>>,
|
||||
}
|
||||
impl<S> RelayWrapper<S> {
|
||||
pub fn new(inner: S, limit: u64) -> Self {
|
||||
pub fn new(inner: S, type_map: TypeMap, limit: u64) -> Self {
|
||||
Self {
|
||||
inner,
|
||||
context: Arc::new(RelayContext {
|
||||
|
||||
limit,
|
||||
processing: AtomicU64::new(0),
|
||||
need_flush: AtomicBool::new(false),
|
||||
flushed: Notify::new(),
|
||||
synchronized: Notify::new(),
|
||||
closed: Notify::new(),
|
||||
response: Notify::new(),
|
||||
type_map,
|
||||
}),
|
||||
waiters: sharded_slab::Slab::new_with_config::<SlabCfg>(),
|
||||
}
|
||||
@ -59,7 +201,6 @@ where
|
||||
fn typed(&self) -> Option<AnyReceiver<'_>> { None }
|
||||
fn wrapper(&self) -> Option<AnyWrapperRef<'_>> { None }
|
||||
fn wrapper_arc(self: Arc<Self>) -> Option<AnyWrapperArc> { None }
|
||||
|
||||
fn send_boxed(
|
||||
&self,
|
||||
mid: u64,
|
||||
@ -69,7 +210,7 @@ where
|
||||
}
|
||||
|
||||
fn need_flush(&self) -> bool {
|
||||
false
|
||||
self.context.need_flush.load(Ordering::SeqCst)
|
||||
}
|
||||
|
||||
fn stats(&self) -> Result<Stats, Error<Action>> {
|
||||
@ -81,7 +222,7 @@ where
|
||||
}
|
||||
|
||||
fn close_notify(&self) -> &Notify {
|
||||
unimplemented!()
|
||||
&self.context.closed
|
||||
}
|
||||
|
||||
fn sync(&self) -> Result<(), Error<Action>> {
|
||||
@ -89,7 +230,7 @@ where
|
||||
}
|
||||
|
||||
fn sync_notify(&self) -> &Notify {
|
||||
unimplemented!()
|
||||
&self.context.synchronized
|
||||
}
|
||||
|
||||
fn flush(&self) -> Result<(), Error<Action>> {
|
||||
@ -97,7 +238,7 @@ where
|
||||
}
|
||||
|
||||
fn flush_notify(&self) -> &Notify {
|
||||
unimplemented!()
|
||||
&self.context.flushed
|
||||
}
|
||||
|
||||
fn add_response_listener(
|
||||
@ -111,22 +252,93 @@ where
|
||||
}
|
||||
|
||||
fn try_reserve(&self) -> Option<Permit> {
|
||||
None
|
||||
loop {
|
||||
let count = self.context.processing.load(Ordering::Relaxed);
|
||||
|
||||
if count < self.context.limit {
|
||||
let res = self.context.processing.compare_exchange(
|
||||
count,
|
||||
count + 1,
|
||||
Ordering::SeqCst,
|
||||
Ordering::SeqCst,
|
||||
);
|
||||
if res.is_ok() {
|
||||
break Some(Permit {
|
||||
fuse: false,
|
||||
inner: self.context.clone(),
|
||||
});
|
||||
}
|
||||
|
||||
// continue
|
||||
} else {
|
||||
break None;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn reserve_notify(&self) -> &Notify {
|
||||
unimplemented!()
|
||||
&self.context.response
|
||||
}
|
||||
|
||||
fn accept_message_type(&self, tt: &TypeTag) -> bool {
|
||||
false
|
||||
fn accept(&self, msg: &TypeTag, resp: Option<&TypeTag>, err: Option<&TypeTag>) -> bool {
|
||||
self.context.type_map.accept_inbound(msg, resp, err)
|
||||
}
|
||||
|
||||
fn accept_response_type(&self, tt: &TypeTag) -> bool {
|
||||
false
|
||||
fn start_polling(self: Arc<Self>) -> Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>> {
|
||||
Box::new(move |_| Box::pin(async move {
|
||||
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
fn accept_error_type(&self, tt: &TypeTag) -> bool {
|
||||
false
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
use crate as messagebus;
|
||||
|
||||
use crate::derive::{ Message, Error };
|
||||
|
||||
#[derive(Debug, Message)]
|
||||
struct Test1;
|
||||
|
||||
#[derive(Debug, Message)]
|
||||
struct Test2;
|
||||
|
||||
#[derive(Debug, Message)]
|
||||
struct Test3;
|
||||
|
||||
#[derive(Debug, Message)]
|
||||
struct Test4;
|
||||
#[derive(Debug, Error)]
|
||||
struct Error1;
|
||||
#[derive(Debug, Error)]
|
||||
struct Error2;
|
||||
#[test]
|
||||
fn test_type_map() {
|
||||
let tm = type_map! {
|
||||
inbound:
|
||||
Test1 => (Test2) throws Error1,
|
||||
Test2 => (Test3) throws Error1,
|
||||
|
||||
outbound:
|
||||
Test3 => (Test4) throws Error2,
|
||||
};
|
||||
|
||||
assert!(tm.accept_inbound(&Test1::type_tag_(), None, None));
|
||||
assert!(tm.accept_inbound(&Test2::type_tag_(), None, None));
|
||||
assert!(!tm.accept_inbound(&Test3::type_tag_(), None, None));
|
||||
assert!(tm.accept_outbound(&Test3::type_tag_(), None, None));
|
||||
|
||||
assert!(tm.accept_inbound(&Test1::type_tag_(), Some(&Test2::type_tag_()), None));
|
||||
assert!(!tm.accept_inbound(&Test1::type_tag_(), Some(&Test3::type_tag_()), None));
|
||||
|
||||
assert!(!tm.accept_inbound(&Test2::type_tag_(), Some(&Test2::type_tag_()), None));
|
||||
assert!(tm.accept_inbound(&Test2::type_tag_(), Some(&Test3::type_tag_()), None));
|
||||
|
||||
assert!(tm.accept_inbound(&Test1::type_tag_(), Some(&Test2::type_tag_()), Some(&Error1::type_tag_())));
|
||||
assert!(!tm.accept_inbound(&Test1::type_tag_(), Some(&Test2::type_tag_()), Some(&Error2::type_tag_())));
|
||||
|
||||
assert!(tm.accept_outbound(&Test3::type_tag_(), Some(&Test4::type_tag_()), Some(&Error2::type_tag_())));
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user