Relays + example (test)

This commit is contained in:
Andrey Tkachenko 2021-07-30 16:58:30 +04:00
parent 3bb2fe7492
commit 9a887d4821
10 changed files with 732 additions and 418 deletions

1
.gitignore vendored
View File

@ -1,2 +1,3 @@
/target /target
/derive/target
Cargo.lock Cargo.lock

View File

@ -1,6 +1,6 @@
[package] [package]
name = "messagebus" name = "messagebus"
version = "0.6.5" version = "0.8.0"
authors = ["Andrey Tkachenko <andrey@aidev.ru>"] authors = ["Andrey Tkachenko <andrey@aidev.ru>"]
repository = "https://github.com/andreytkachenko/messagebus.git" repository = "https://github.com/andreytkachenko/messagebus.git"
keywords = ["futures", "async", "tokio", "message", "bus"] keywords = ["futures", "async", "tokio", "message", "bus"]

View File

@ -1,9 +1,17 @@
use std::{collections::HashMap, marker::PhantomData, pin::Pin, sync::Arc}; use std::{collections::HashMap, marker::PhantomData, pin::Pin, sync::Arc};
use futures::{Future, FutureExt}; use futures::{Future, FutureExt};
use smallvec::SmallVec;
use tokio::sync::Mutex; use tokio::sync::Mutex;
use crate::{AsyncBatchHandler, AsyncBatchSynchronizedHandler, AsyncHandler, AsyncSynchronizedHandler, BatchHandler, BatchSynchronizedHandler, Bus, BusInner, Handler, IntoBoxedMessage, Message, SynchronizedHandler, Untyped, envelop::TypeTag, error::{Error, StdSyncSendError}, receiver::{Receiver, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver}, receivers, relay::TypeMap}; use crate::{
envelop::TypeTag,
error::{Error, StdSyncSendError},
receiver::{Receiver, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver},
receivers, AsyncBatchHandler, AsyncBatchSynchronizedHandler, AsyncHandler,
AsyncSynchronizedHandler, BatchHandler, BatchSynchronizedHandler, Bus, BusInner, Handler,
IntoBoxedMessage, Message, Relay, SynchronizedHandler, Untyped,
};
pub trait ReceiverSubscriberBuilder<T, M, R, E>: pub trait ReceiverSubscriberBuilder<T, M, R, E>:
SendUntypedReceiver + SendTypedReceiver<M> + ReciveTypedReceiver<R, E> SendUntypedReceiver + SendTypedReceiver<M> + ReciveTypedReceiver<R, E>
@ -31,49 +39,35 @@ pub struct SyncEntry;
pub struct UnsyncEntry; pub struct UnsyncEntry;
#[must_use] #[must_use]
pub struct RegisterEntry<K, T, F, B> { pub struct RegisterEntry<K, T, F, P, B> {
item: Untyped, item: Untyped,
payload: B, payload: B,
builder: F, builder: F,
receivers: HashMap< poller: P,
TypeTag, receivers: HashMap<TypeTag, Receiver>,
Vec<( pollers: Vec<Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>>,
Receiver,
Box<
dyn FnOnce(
Untyped,
)
-> Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>,
>,
Option<Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>>,
)>,
>,
_m: PhantomData<(K, T)>, _m: PhantomData<(K, T)>,
} }
impl<K, T: 'static, F, B> RegisterEntry<K, T, F, B> impl<K, T: 'static, F, P, B> RegisterEntry<K, T, F, P, B>
where where
F: FnMut( F: FnMut(&mut B, TypeTag, Receiver),
&mut B, P: FnMut(&mut B, Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>),
(TypeTag, Receiver),
Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>,
Option<Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>>,
),
{ {
pub fn done(mut self) -> B { pub fn done(mut self) -> B {
for (tid, v) in self.receivers { for (tid, v) in self.receivers {
for (r, poller, poller2) in v { (self.builder)(&mut self.payload, tid, v);
let poller = poller(self.item.clone());
(self.builder)(&mut self.payload, (tid.clone(), r), poller, poller2);
} }
for p in self.pollers {
(self.poller)(&mut self.payload, p);
} }
self.payload self.payload
} }
} }
impl<T, F, B> RegisterEntry<UnsyncEntry, T, F, B> { impl<T, F, P, B> RegisterEntry<UnsyncEntry, T, F, P, B> {
pub fn subscribe<M, S, R, E>(mut self, queue: u64, cfg: S::Config) -> Self pub fn subscribe<M, S, R, E>(mut self, queue: u64, cfg: S::Config) -> Self
where where
T: Send + 'static, T: Send + 'static,
@ -86,10 +80,9 @@ impl<T, F, B> RegisterEntry<UnsyncEntry, T, F, B> {
let receiver = Receiver::new::<M, R, E, S>(queue, inner); let receiver = Receiver::new::<M, R, E, S>(queue, inner);
let poller2 = receiver.start_polling(); let poller2 = receiver.start_polling();
self.receivers self.receivers.insert(M::type_tag_(), receiver);
.entry(M::type_tag_()) self.pollers.push(poller(self.item.clone()));
.or_insert_with(Vec::new) self.pollers.push(poller2);
.push((receiver, poller, poller2));
self self
} }
@ -143,7 +136,7 @@ impl<T, F, B> RegisterEntry<UnsyncEntry, T, F, B> {
} }
} }
impl<T, F, B> RegisterEntry<SyncEntry, T, F, B> { impl<T, F, P, B> RegisterEntry<SyncEntry, T, F, P, B> {
pub fn subscribe<M, S, R, E>(mut self, queue: u64, cfg: S::Config) -> Self pub fn subscribe<M, S, R, E>(mut self, queue: u64, cfg: S::Config) -> Self
where where
T: Send + Sync + 'static, T: Send + Sync + 'static,
@ -156,10 +149,9 @@ impl<T, F, B> RegisterEntry<SyncEntry, T, F, B> {
let receiver = Receiver::new::<M, R, E, S>(queue, inner); let receiver = Receiver::new::<M, R, E, S>(queue, inner);
let poller2 = receiver.start_polling(); let poller2 = receiver.start_polling();
self.receivers self.receivers.insert(M::type_tag_(), receiver);
.entry(M::type_tag_()) self.pollers.push(poller(self.item.clone()));
.or_insert_with(Vec::new) self.pollers.push(poller2);
.push((receiver, poller, poller2));
self self
} }
@ -233,16 +225,16 @@ impl MessageTypeDescriptor {
} }
pub struct Module { pub struct Module {
message_types: Vec<(TypeTag, MessageTypeDescriptor)>, message_types: HashMap<TypeTag, MessageTypeDescriptor>,
receivers: Vec<(TypeTag, Receiver)>, receivers: HashMap<TypeTag, SmallVec<[Receiver; 4]>>,
pollings: Vec<Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>>, pollings: Vec<Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>>,
} }
impl Module { impl Module {
pub fn new() -> Self { pub fn new() -> Self {
Self { Self {
message_types: Vec::new(), message_types: HashMap::new(),
receivers: Vec::new(), receivers: HashMap::new(),
pollings: Vec::new(), pollings: Vec::new(),
} }
} }
@ -252,24 +244,40 @@ impl Module {
>( >(
mut self, mut self,
) -> Self { ) -> Self {
self.message_types.push(( self.message_types.insert(
M::type_tag_(), M::type_tag_(),
MessageTypeDescriptor { MessageTypeDescriptor {
de: Box::new(move |de| Ok(M::deserialize(de)?.into_boxed())), de: Box::new(move |de| Ok(M::deserialize(de)?.into_boxed())),
}, },
)); );
self self
} }
pub fn register_relay<S: SendUntypedReceiver + Send + Sync + 'static>(queue: u64, map: TypeMap, inner: S) { pub fn register_relay<S: Relay + Send + Sync + 'static>(
let receiver = Receiver::new_relay::<S>(queue, map, inner); mut self,
let poller2 = receiver.start_polling(); inner: S,
queue: u64,
) -> Self {
let receiver = Receiver::new_relay::<S>(queue, inner);
self.pollings.push(receiver.start_polling());
// self.receivers let mut receiver_added = false;
// .entry(M::type_tag_()) receiver.iter_types(&mut |msg, _, _| {
// .or_insert_with(Vec::new) self.receivers
// .push((receiver, poller, poller2)); .entry(msg.clone())
.or_insert_with(SmallVec::new)
.push(receiver.clone());
if !receiver_added {
receiver_added = true;
false
} else {
true
}
});
self
} }
pub fn register<T: Send + Sync + 'static>( pub fn register<T: Send + Sync + 'static>(
@ -278,25 +286,19 @@ impl Module {
) -> RegisterEntry< ) -> RegisterEntry<
SyncEntry, SyncEntry,
T, T,
impl FnMut( impl FnMut(&mut Self, TypeTag, Receiver),
&mut Self, impl FnMut(&mut Self, Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>),
(TypeTag, Receiver),
Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>,
Option<Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>>,
),
Self, Self,
> { > {
RegisterEntry { RegisterEntry {
item: Arc::new(item) as Untyped, item: Arc::new(item) as Untyped,
payload: self, payload: self,
builder: |p: &mut Self, val, poller, poller2| { builder: |p: &mut Self, tt, r| {
p.receivers.push(val); p.receivers.entry(tt).or_insert_with(SmallVec::new).push(r);
p.pollings.push(poller);
if let Some(poller2) = poller2 {
p.pollings.push(poller2);
}
}, },
poller: |p: &mut Self, poller| p.pollings.push(poller),
receivers: HashMap::new(), receivers: HashMap::new(),
pollers: Vec::new(),
_m: Default::default(), _m: Default::default(),
} }
} }
@ -307,25 +309,21 @@ impl Module {
) -> RegisterEntry< ) -> RegisterEntry<
UnsyncEntry, UnsyncEntry,
T, T,
impl FnMut( impl FnMut(&mut Self, TypeTag, Receiver),
&mut Self, impl FnMut(&mut Self, Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>),
(TypeTag, Receiver),
Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>,
Option<Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>>,
),
Self, Self,
> { > {
let item = Arc::new(Mutex::new(item)) as Untyped;
RegisterEntry { RegisterEntry {
item: Arc::new(Mutex::new(item)) as Untyped, item,
payload: self, payload: self,
builder: |p: &mut Self, val, poller, poller2| { builder: |p: &mut Self, tt, r| {
p.receivers.push(val); p.receivers.entry(tt).or_insert_with(SmallVec::new).push(r);
p.pollings.push(poller);
if let Some(poller2) = poller2 {
p.pollings.push(poller2);
}
}, },
poller: |p: &mut Self, poller| p.pollings.push(poller),
receivers: HashMap::new(), receivers: HashMap::new(),
pollers: Vec::new(),
_m: Default::default(), _m: Default::default(),
} }
} }
@ -360,31 +358,35 @@ impl BusBuilder {
BusBuilder { inner } BusBuilder { inner }
} }
pub fn register_relay<S: Relay + Send + Sync + 'static>(self, inner: S, queue: u64) -> Self {
let inner = self.inner.register_relay(inner, queue);
BusBuilder { inner }
}
pub fn register<T: Send + Sync + 'static>( pub fn register<T: Send + Sync + 'static>(
self, self,
item: T, item: T,
) -> RegisterEntry< ) -> RegisterEntry<
SyncEntry, SyncEntry,
T, T,
impl FnMut( impl FnMut(&mut Self, TypeTag, Receiver),
&mut Self, impl FnMut(&mut Self, Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>),
(TypeTag, Receiver),
Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>,
Option<Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>>,
),
Self, Self,
> { > {
RegisterEntry { RegisterEntry {
item: Arc::new(item) as Untyped, item: Arc::new(item) as Untyped,
payload: self, payload: self,
builder: |p: &mut Self, val, poller, poller2| { builder: |p: &mut Self, tt, r| {
p.inner.receivers.push(val); p.inner
p.inner.pollings.push(poller); .receivers
if let Some(poller2) = poller2 { .entry(tt)
p.inner.pollings.push(poller2); .or_insert_with(SmallVec::new)
} .push(r);
}, },
poller: |p: &mut Self, poller| p.inner.pollings.push(poller),
receivers: HashMap::new(), receivers: HashMap::new(),
pollers: Vec::new(),
_m: Default::default(), _m: Default::default(),
} }
} }
@ -395,26 +397,23 @@ impl BusBuilder {
) -> RegisterEntry< ) -> RegisterEntry<
UnsyncEntry, UnsyncEntry,
T, T,
impl FnMut( impl FnMut(&mut Self, TypeTag, Receiver),
&mut Self, impl FnMut(&mut Self, Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>),
(TypeTag, Receiver),
Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>,
Option<Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>>,
),
Self, Self,
> { > {
RegisterEntry { RegisterEntry {
item: Arc::new(Mutex::new(item)) as Untyped, item: Arc::new(Mutex::new(item)) as Untyped,
payload: self, payload: self,
builder: |p: &mut Self, val, poller, poller2| { builder: |p: &mut Self, tt, r| {
p.inner.receivers.push(val); p.inner
p.inner.pollings.push(poller); .receivers
.entry(tt)
if let Some(poller2) = poller2 { .or_insert_with(SmallVec::new)
p.inner.pollings.push(poller2); .push(r);
}
}, },
poller: |p: &mut Self, poller| p.inner.pollings.push(poller),
receivers: HashMap::new(), receivers: HashMap::new(),
pollers: Vec::new(),
_m: Default::default(), _m: Default::default(),
} }
} }
@ -426,11 +425,19 @@ impl BusBuilder {
} }
pub fn build(self) -> (Bus, impl Future<Output = ()>) { pub fn build(self) -> (Bus, impl Future<Output = ()>) {
let mut receivers = HashMap::new();
for (key, values) in self.inner.receivers {
for v in values {
receivers
.entry(key.clone())
.or_insert_with(SmallVec::new)
.push(v);
}
}
let bus = Bus { let bus = Bus {
inner: Arc::new(BusInner::new( inner: Arc::new(BusInner::new(receivers, self.inner.message_types)),
self.inner.receivers,
self.inner.message_types,
)),
}; };
let mut futs = Vec::with_capacity(self.inner.pollings.len() * 2); let mut futs = Vec::with_capacity(self.inner.pollings.len() * 2);

View File

@ -58,6 +58,19 @@ impl<T: TypeTagged> TypeTagged for Arc<T> {
} }
} }
impl<T: TypeTagged> TypeTagged for Box<T> {
fn type_tag_() -> TypeTag {
T::type_tag_()
}
fn type_tag(&self) -> TypeTag {
T::type_tag(&*self)
}
fn type_name(&self) -> Cow<str> {
T::type_name(&*self)
}
}
impl Message for () { impl Message for () {
fn as_any_ref(&self) -> &dyn Any { fn as_any_ref(&self) -> &dyn Any {
self self

View File

@ -9,26 +9,41 @@ use crate::{
Message, Message,
}; };
pub trait DynError: TypeTagged {
fn description(&self) -> String;
}
pub trait StdSyncSendError: std::error::Error + TypeTagged + Send + Sync + Unpin + 'static {} pub trait StdSyncSendError: std::error::Error + TypeTagged + Send + Sync + Unpin + 'static {}
impl<T: std::error::Error + TypeTagged + Send + Sync + Unpin + 'static> StdSyncSendError for T {} impl<T: std::error::Error + TypeTagged + Send + Sync + Unpin + 'static> StdSyncSendError for T {}
#[derive(Debug, Error)] #[derive(Debug)]
pub enum VoidError {} pub struct GenericError {
pub type_tag: TypeTag,
pub description: String,
}
impl TypeTagged for VoidError { impl fmt::Display for GenericError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "GenericError({}): {}", self.type_tag, self.description)
}
}
impl std::error::Error for GenericError {}
impl TypeTagged for GenericError {
fn type_name(&self) -> Cow<str> { fn type_name(&self) -> Cow<str> {
type_name::<VoidError>().into() type_name::<GenericError>().into()
} }
fn type_tag(&self) -> TypeTag { fn type_tag(&self) -> TypeTag {
type_name::<VoidError>().into() type_name::<GenericError>().into()
} }
fn type_tag_() -> TypeTag fn type_tag_() -> TypeTag
where where
Self: Sized, Self: Sized,
{ {
type_name::<VoidError>().into() type_name::<GenericError>().into()
} }
} }
@ -60,7 +75,7 @@ impl<M: Message> SendError<M> {
} }
#[derive(Debug, Error)] #[derive(Debug, Error)]
pub enum Error<M: fmt::Debug + 'static = (), E: StdSyncSendError = VoidError> { pub enum Error<M: fmt::Debug + 'static = (), E: StdSyncSendError = GenericError> {
#[error("Message Send Error: {0}")] #[error("Message Send Error: {0}")]
SendError(#[from] SendError<M>), SendError(#[from] SendError<M>),

View File

@ -4,8 +4,8 @@ pub mod error;
mod handler; mod handler;
mod receiver; mod receiver;
pub mod receivers; pub mod receivers;
mod trait_object;
mod relay; mod relay;
mod trait_object;
#[macro_use] #[macro_use]
extern crate log; extern crate log;
@ -22,6 +22,11 @@ pub use envelop::{IntoBoxedMessage, Message, MessageBounds, SharedMessage, TypeT
use error::{Error, SendError, StdSyncSendError}; use error::{Error, SendError, StdSyncSendError};
pub use handler::*; pub use handler::*;
use receiver::Receiver; use receiver::Receiver;
pub use receiver::{
Action, Event, ReciveTypedReceiver, ReciveUnypedReceiver, SendTypedReceiver,
SendUntypedReceiver, TypeTagAccept,
};
pub use relay::Relay;
use smallvec::SmallVec; use smallvec::SmallVec;
use std::{ use std::{
collections::HashMap, collections::HashMap,
@ -57,19 +62,9 @@ pub struct BusInner {
impl BusInner { impl BusInner {
pub(crate) fn new( pub(crate) fn new(
input: Vec<(TypeTag, Receiver)>, receivers: HashMap<TypeTag, SmallVec<[Receiver; 4]>>,
mt: Vec<(TypeTag, MessageTypeDescriptor)>, message_types: HashMap<TypeTag, MessageTypeDescriptor>,
) -> Self { ) -> Self {
let mut receivers = HashMap::new();
let message_types: HashMap<TypeTag, MessageTypeDescriptor> = mt.into_iter().collect();
for (key, value) in input {
receivers
.entry(key)
.or_insert_with(SmallVec::new)
.push(value);
}
Self { Self {
message_types, message_types,
receivers, receivers,
@ -220,6 +215,13 @@ impl BusInner {
return Err(SendError::Closed(msg).into()); return Err(SendError::Closed(msg).into());
} }
for r in &self.receivers {
println!("{:?}: ", r.0);
for i in r.1 {
println!(" {:?}: ", i.name());
}
}
let tt = msg.type_tag(); let tt = msg.type_tag();
let mid = ID_COUNTER.fetch_add(1, Ordering::Relaxed); let mid = ID_COUNTER.fetch_add(1, Ordering::Relaxed);
@ -332,7 +334,8 @@ impl BusInner {
let mut iter = self.select_receivers(&tid, options, Some(&rid), None); let mut iter = self.select_receivers(&tid, options, Some(&rid), None);
if let Some(rc) = iter.next() { if let Some(rc) = iter.next() {
let (mid, rx) = rc.add_response_waiter::<R>() let (mid, rx) = rc
.add_response_waiter::<R>()
.map_err(|x| x.specify::<M>())?; .map_err(|x| x.specify::<M>())?;
let mid = mid | 1 << (u64::BITS - 1); let mid = mid | 1 << (u64::BITS - 1);
@ -356,8 +359,10 @@ impl BusInner {
let mut iter = self.select_receivers(&tid, options, Some(&rid), Some(&eid)); let mut iter = self.select_receivers(&tid, options, Some(&rid), Some(&eid));
if let Some(rc) = iter.next() { if let Some(rc) = iter.next() {
let (mid, rx) = rc.add_response_waiter_we::<R, E>() let (mid, rx) = rc.add_response_waiter_we::<R, E>().map_err(|x| {
.map_err(|x| x.map_err(|_| unimplemented!()).map_msg(|_| unimplemented!()))?; x.map_err(|_| unimplemented!())
.map_msg(|_| unimplemented!())
})?;
rc.send(mid | 1 << (u64::BITS - 1), req, rc.reserve().await) rc.send(mid | 1 << (u64::BITS - 1), req, rc.reserve().await)
.map_err(|x| x.map_err(|_| unimplemented!()))?; .map_err(|x| x.map_err(|_| unimplemented!()))?;
@ -429,8 +434,10 @@ impl BusInner {
let mut iter = self.select_receivers(&tt, options, None, None); let mut iter = self.select_receivers(&tt, options, None, None);
if let Some(rc) = iter.next() { if let Some(rc) = iter.next() {
let (mid, rx) = rc.add_response_waiter_boxed() let (mid, rx) = rc.add_response_waiter_boxed().map_err(|x| {
.map_err(|x| x.map_err(|_| unimplemented!()).map_msg(|_| unimplemented!()))?; x.map_err(|_| unimplemented!())
.map_msg(|_| unimplemented!())
})?;
rc.send_boxed(mid | 1 << (usize::BITS - 1), req, rc.reserve().await)?; rc.send_boxed(mid | 1 << (usize::BITS - 1), req, rc.reserve().await)?;

View File

@ -1,4 +1,10 @@
use crate::{Bus, Error, Message, envelop::{IntoBoxedMessage, TypeTag}, error::{SendError, StdSyncSendError}, relay::TypeMap, trait_object::TraitObject}; use crate::relay::RelayWrapper;
use crate::{
envelop::{IntoBoxedMessage, TypeTag},
error::{GenericError, SendError, StdSyncSendError},
trait_object::TraitObject,
Bus, Error, Message, Relay,
};
use core::{ use core::{
any::TypeId, any::TypeId,
fmt, fmt,
@ -7,8 +13,8 @@ use core::{
pin::Pin, pin::Pin,
task::{Context, Poll}, task::{Context, Poll},
}; };
use futures::{FutureExt, future::poll_fn};
use futures::Future; use futures::Future;
use futures::{future::poll_fn, FutureExt};
use std::{ use std::{
any::Any, any::Any,
borrow::Cow, borrow::Cow,
@ -18,7 +24,6 @@ use std::{
}, },
}; };
use tokio::sync::{oneshot, Notify}; use tokio::sync::{oneshot, Notify};
use crate::relay::RelayWrapper;
struct SlabCfg; struct SlabCfg;
impl sharded_slab::Config for SlabCfg { impl sharded_slab::Config for SlabCfg {
const RESERVED_BITS: usize = 1; const RESERVED_BITS: usize = 1;
@ -28,7 +33,11 @@ type Slab<T> = sharded_slab::Slab<T, SlabCfg>;
pub trait SendUntypedReceiver: Send + Sync { pub trait SendUntypedReceiver: Send + Sync {
fn send(&self, msg: Action) -> Result<(), SendError<Action>>; fn send(&self, msg: Action) -> Result<(), SendError<Action>>;
fn send_msg(&self, _mid: u64, _msg: Box<dyn Message>) -> Result<(), SendError<Box<dyn Message>>> { fn send_msg(
&self,
_mid: u64,
_msg: Box<dyn Message>,
) -> Result<(), SendError<Box<dyn Message>>> {
unimplemented!() unimplemented!()
} }
} }
@ -45,6 +54,10 @@ where
fn poll_events(&self, ctx: &mut Context<'_>) -> Poll<Event<M, E>>; fn poll_events(&self, ctx: &mut Context<'_>) -> Poll<Event<M, E>>;
} }
pub trait ReciveUnypedReceiver: Sync {
fn poll_events(&self, ctx: &mut Context<'_>) -> Poll<Event<Box<dyn Message>, GenericError>>;
}
pub trait WrapperReturnTypeOnly<R: Message>: Send + Sync { pub trait WrapperReturnTypeOnly<R: Message>: Send + Sync {
fn add_response_listener( fn add_response_listener(
&self, &self,
@ -53,7 +66,9 @@ pub trait WrapperReturnTypeOnly<R: Message>: Send + Sync {
} }
pub trait WrapperReturnTypeAndError<R: Message, E: StdSyncSendError>: Send + Sync { pub trait WrapperReturnTypeAndError<R: Message, E: StdSyncSendError>: Send + Sync {
fn start_polling_events(self: Arc<Self>) -> Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>; fn start_polling_events(
self: Arc<Self>,
) -> Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>;
fn add_response_listener( fn add_response_listener(
&self, &self,
listener: oneshot::Sender<Result<R, Error<(), E>>>, listener: oneshot::Sender<Result<R, Error<(), E>>>,
@ -61,7 +76,12 @@ pub trait WrapperReturnTypeAndError<R: Message, E: StdSyncSendError>: Send + Syn
fn response(&self, mid: u64, resp: Result<R, Error<(), E>>) -> Result<(), Error>; fn response(&self, mid: u64, resp: Result<R, Error<(), E>>) -> Result<(), Error>;
} }
pub trait ReceiverTrait: Send + Sync { pub trait TypeTagAccept {
fn accept(&self, msg: &TypeTag, resp: Option<&TypeTag>, err: Option<&TypeTag>) -> bool;
fn iter_types(&self, cb: &mut dyn FnMut(&TypeTag, &TypeTag, &TypeTag) -> bool);
}
pub trait ReceiverTrait: TypeTagAccept + Send + Sync {
fn name(&self) -> &str; fn name(&self) -> &str;
fn typed(&self) -> Option<AnyReceiver<'_>>; fn typed(&self) -> Option<AnyReceiver<'_>>;
fn wrapper(&self) -> Option<AnyWrapperRef<'_>>; fn wrapper(&self) -> Option<AnyWrapperRef<'_>>;
@ -89,8 +109,9 @@ pub trait ReceiverTrait: Send + Sync {
fn try_reserve(&self) -> Option<Permit>; fn try_reserve(&self) -> Option<Permit>;
fn reserve_notify(&self) -> &Notify; fn reserve_notify(&self) -> &Notify;
fn accept(&self, msg: &TypeTag, resp: Option<&TypeTag>, err: Option<&TypeTag>) -> bool; fn start_polling(
fn start_polling(self: Arc<Self>) -> Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>; self: Arc<Self>,
) -> Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>;
} }
pub trait ReceiverPollerBuilder { pub trait ReceiverPollerBuilder {
@ -155,8 +176,9 @@ where
E: StdSyncSendError, E: StdSyncSendError,
S: ReciveTypedReceiver<R, E> + Send + Sync + 'static, S: ReciveTypedReceiver<R, E> + Send + Sync + 'static,
{ {
fn start_polling_events(
fn start_polling_events(self: Arc<Self>) -> Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>> { self: Arc<Self>,
) -> Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>> {
Box::new(move |_| { Box::new(move |_| {
Box::pin(async move { Box::pin(async move {
loop { loop {
@ -231,6 +253,34 @@ where
} }
} }
impl<M, R, E, S> TypeTagAccept for ReceiverWrapper<M, R, E, S>
where
M: Message,
R: Message,
E: StdSyncSendError,
S: ReciveTypedReceiver<R, E> + Send + Sync + 'static,
{
fn iter_types(&self, cb: &mut dyn FnMut(&TypeTag, &TypeTag, &TypeTag) -> bool) {
let _ = cb(&M::type_tag_(), &R::type_tag_(), &E::type_tag_());
}
fn accept(&self, msg: &TypeTag, resp: Option<&TypeTag>, err: Option<&TypeTag>) -> bool {
if let Some(resp) = resp {
if resp.as_ref() != R::type_tag_().as_ref() {
return false;
}
}
if let Some(err) = err {
if err.as_ref() != E::type_tag_().as_ref() {
return false;
}
}
msg.as_ref() == M::type_tag_().as_ref()
}
}
impl<M, R, E, S> ReceiverTrait for ReceiverWrapper<M, R, E, S> impl<M, R, E, S> ReceiverTrait for ReceiverWrapper<M, R, E, S>
where where
M: Message, M: Message,
@ -339,23 +389,9 @@ where
&self.context.response &self.context.response
} }
fn accept(&self, msg: &TypeTag, resp: Option<&TypeTag>, err: Option<&TypeTag>) -> bool { fn start_polling(
if let Some(resp) = resp { self: Arc<Self>,
if resp.as_ref() != R::type_tag_().as_ref() { ) -> Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>> {
return false;
}
}
if let Some(err) = err {
if err.as_ref() != E::type_tag_().as_ref() {
return false;
}
}
msg.as_ref() == M::type_tag_().as_ref()
}
fn start_polling(self: Arc<Self>) -> Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>> {
self.start_polling_events() self.start_polling_events()
} }
} }
@ -454,7 +490,7 @@ impl<'a> AnyWrapperRef<'a> {
#[inline] #[inline]
pub fn cast_ret_only<R: Message>(&'a self) -> Option<&'a dyn WrapperReturnTypeOnly<R>> { pub fn cast_ret_only<R: Message>(&'a self) -> Option<&'a dyn WrapperReturnTypeOnly<R>> {
if self.wrapper_r.0 != TypeId::of::<dyn WrapperReturnTypeOnly<R>>() { if self.wrapper_r.0 != TypeId::of::<dyn WrapperReturnTypeOnly<R>>() {
return None return None;
} }
Some(unsafe { Some(unsafe {
@ -497,16 +533,18 @@ impl AnyWrapperArc {
{ {
let wrapper_re = Box::new(rcvr as Arc<dyn WrapperReturnTypeAndError<R, E>>); let wrapper_re = Box::new(rcvr as Arc<dyn WrapperReturnTypeAndError<R, E>>);
Self { Self { wrapper_re }
wrapper_re,
}
} }
#[inline] #[inline]
pub fn cast_ret_and_error<R: Message, E: StdSyncSendError>( pub fn cast_ret_and_error<R: Message, E: StdSyncSendError>(
&self, &self,
) -> Option<Arc<dyn WrapperReturnTypeAndError<R, E>>> { ) -> Option<Arc<dyn WrapperReturnTypeAndError<R, E>>> {
Some(self.wrapper_re.downcast_ref::<Arc<dyn WrapperReturnTypeAndError<R, E>>>()?.clone()) Some(
self.wrapper_re
.downcast_ref::<Arc<dyn WrapperReturnTypeAndError<R, E>>>()?
.clone(),
)
} }
} }
@ -557,6 +595,7 @@ enum Waiter<R: Message, E: StdSyncSendError> {
Boxed(oneshot::Sender<Result<Box<dyn Message>, Error>>), Boxed(oneshot::Sender<Result<Box<dyn Message>, Error>>),
} }
#[derive(Clone)]
pub struct Receiver { pub struct Receiver {
inner: Arc<dyn ReceiverTrait>, inner: Arc<dyn ReceiverTrait>,
} }
@ -604,15 +643,20 @@ impl Receiver {
} }
#[inline] #[inline]
pub(crate) fn new_relay<S>(limit: u64, type_map: TypeMap, inner: S) -> Self pub(crate) fn new_relay<S>(limit: u64, inner: S) -> Self
where where
S: SendUntypedReceiver + 'static, S: Relay + Send + Sync + 'static,
{ {
Self { Self {
inner: Arc::new(RelayWrapper::new(inner, type_map, limit)), inner: Arc::new(RelayWrapper::new(inner, limit)),
} }
} }
#[inline]
pub fn name(&self) -> &str {
self.inner.name()
}
#[inline] #[inline]
pub fn accept(&self, msg: &TypeTag, resp: Option<&TypeTag>, err: Option<&TypeTag>) -> bool { pub fn accept(&self, msg: &TypeTag, resp: Option<&TypeTag>, err: Option<&TypeTag>) -> bool {
self.inner.accept(msg, resp, err) self.inner.accept(msg, resp, err)
@ -640,20 +684,18 @@ impl Receiver {
} }
#[inline] #[inline]
pub fn send<M: Message>( pub fn send<M: Message>(&self, mid: u64, msg: M, mut permit: Permit) -> Result<(), Error<M>> {
&self,
mid: u64,
msg: M,
mut permit: Permit,
) -> Result<(), Error<M>> {
let res = if let Some(any_receiver) = self.inner.typed() { let res = if let Some(any_receiver) = self.inner.typed() {
any_receiver.cast_send_typed::<M>().unwrap() any_receiver
.cast_send_typed::<M>()
.unwrap()
.send(mid, msg) .send(mid, msg)
.map_err(Into::into) .map_err(Into::into)
} else { } else {
self.inner.send_boxed(mid, msg.into_boxed()) self.inner
.map_err(|err| err.map_msg(|b|*b.as_any_boxed().downcast::<M>().unwrap())) .send_boxed(mid, msg.into_boxed())
.map(|_|()) .map_err(|err| err.map_msg(|b| *b.as_any_boxed().downcast::<M>().unwrap()))
.map(|_| ())
}; };
permit.fuse = true; permit.fuse = true;
@ -664,13 +706,16 @@ impl Receiver {
#[inline] #[inline]
pub fn force_send<M: Message + Clone>(&self, mid: u64, msg: M) -> Result<(), Error<M>> { pub fn force_send<M: Message + Clone>(&self, mid: u64, msg: M) -> Result<(), Error<M>> {
let res = if let Some(any_receiver) = self.inner.typed() { let res = if let Some(any_receiver) = self.inner.typed() {
any_receiver.cast_send_typed::<M>().unwrap() any_receiver
.cast_send_typed::<M>()
.unwrap()
.send(mid, msg) .send(mid, msg)
.map_err(Into::into) .map_err(Into::into)
} else { } else {
self.inner.send_boxed(mid, msg.into_boxed()) self.inner
.map_err(|err| err.map_msg(|b|*b.as_any_boxed().downcast::<M>().unwrap())) .send_boxed(mid, msg.into_boxed())
.map(|_|()) .map_err(|err| err.map_msg(|b| *b.as_any_boxed().downcast::<M>().unwrap()))
.map(|_| ())
}; };
res res
@ -689,8 +734,10 @@ impl Receiver {
} }
#[inline] #[inline]
pub fn start_polling(&self) -> Option<Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>> { pub fn start_polling(
Some(self.inner.clone().start_polling()) &self,
) -> Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>> {
self.inner.clone().start_polling()
} }
#[inline] #[inline]
@ -703,7 +750,7 @@ impl Receiver {
Ok((mid, async move { Ok((mid, async move {
match rx.await { match rx.await {
Ok(x) => x, Ok(x) => x,
Err(err) => Err(Error::from(err)) Err(err) => Err(Error::from(err)),
} }
})) }))
} }
@ -714,27 +761,36 @@ impl Receiver {
) -> Result<(u64, impl Future<Output = Result<R, Error>>), Error> { ) -> Result<(u64, impl Future<Output = Result<R, Error>>), Error> {
if let Some(any_receiver) = self.inner.wrapper() { if let Some(any_receiver) = self.inner.wrapper() {
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
let mid = any_receiver.cast_ret_only::<R>().unwrap() let mid = any_receiver
.cast_ret_only::<R>()
.unwrap()
.add_response_listener(tx)?; .add_response_listener(tx)?;
Ok((mid, async move { Ok((
mid,
async move {
match rx.await { match rx.await {
Ok(x) => x, Ok(x) => x,
Err(err) => Err(Error::from(err)) Err(err) => Err(Error::from(err)),
} }
}.left_future())) }
.left_future(),
))
} else { } else {
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
let mid = self.inner let mid = self.inner.add_response_listener(tx)?;
.add_response_listener(tx)?;
Ok((mid, async move { Ok((
mid,
async move {
match rx.await { match rx.await {
Ok(Ok(x)) => Ok(*x.as_any_boxed().downcast::<R>().unwrap()), Ok(Ok(x)) => Ok(*x.as_any_boxed().downcast::<R>().unwrap()),
Ok(Err(x)) => Err(x), Ok(Err(x)) => Err(x),
Err(err) => Err(Error::from(err)) Err(err) => Err(Error::from(err)),
} }
}.right_future())) }
.right_future(),
))
} }
} }
@ -744,13 +800,15 @@ impl Receiver {
) -> Result<(u64, impl Future<Output = Result<R, Error<(), E>>>), Error> { ) -> Result<(u64, impl Future<Output = Result<R, Error<(), E>>>), Error> {
if let Some(any_wrapper) = self.inner.wrapper() { if let Some(any_wrapper) = self.inner.wrapper() {
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
let mid = any_wrapper.cast_ret_and_error::<R, E>().unwrap() let mid = any_wrapper
.cast_ret_and_error::<R, E>()
.unwrap()
.add_response_listener(tx)?; .add_response_listener(tx)?;
Ok((mid, async move { Ok((mid, async move {
match rx.await { match rx.await {
Ok(x) => x, Ok(x) => x,
Err(err) => Err(Error::from(err)) Err(err) => Err(Error::from(err)),
} }
})) }))
} else { } else {
@ -790,4 +848,9 @@ impl Receiver {
warn!("flush failed!"); warn!("flush failed!");
} }
} }
#[inline]
pub fn iter_types(&self, cb: &mut dyn FnMut(&TypeTag, &TypeTag, &TypeTag) -> bool) {
self.inner.iter_types(cb)
}
} }

View File

@ -1,9 +1,23 @@
use std::{collections::HashMap, pin::Pin, sync::{Arc, atomic::{AtomicBool, AtomicU64, Ordering}}}; use crate::{
use futures::Future; error::Error,
use tokio::sync::{Notify, oneshot}; receiver::{
use crate::{Bus, Message, Permit, TypeTag, TypeTagged, error::Error, receiver::{Action, AnyReceiver, AnyWrapperArc, AnyWrapperRef, PermitDrop, ReceiverTrait, SendUntypedReceiver, Stats}}; Action, AnyReceiver, AnyWrapperArc, AnyWrapperRef, PermitDrop, ReceiverTrait,
SendUntypedReceiver, Stats, TypeTagAccept,
},
Bus, Event, Message, Permit, ReciveUnypedReceiver, TypeTag,
};
use futures::{future::poll_fn, Future};
use std::{
pin::Pin,
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Arc,
},
};
use tokio::sync::{oneshot, Notify};
pub trait Relay: TypeTagAccept + SendUntypedReceiver + ReciveUnypedReceiver + 'static {}
impl<T: TypeTagAccept + SendUntypedReceiver + ReciveUnypedReceiver + 'static> Relay for T {}
struct SlabCfg; struct SlabCfg;
impl sharded_slab::Config for SlabCfg { impl sharded_slab::Config for SlabCfg {
@ -12,140 +26,6 @@ impl sharded_slab::Config for SlabCfg {
type Slab<T> = sharded_slab::Slab<T, SlabCfg>; type Slab<T> = sharded_slab::Slab<T, SlabCfg>;
#[macro_export]
macro_rules! type_map {
(inbound : $($tt:tt)*) => {{
let mut type_map = TypeMap::new();
type_map!(@inbound type_map, $($tt)*);
type_map
}};
(outbound : $($tt:tt)*) => {
let mut type_map = TypeMap::new();
type_map!(@outbound type_map, $($tt)*);
type_map
};
(@inbound $tm: ident, $msg:ty => $resp: ty, $($tt:tt)*) => {
$tm.add_inbound(<$msg as TypeTagged>::type_tag_(), <$resp as TypeTagged>::type_tag_(), None);
type_map!(@inbound $tm, $($tt)*)
};
(@inbound $tm: ident, $msg:ty => ($resp:ty) throws $err: ty, $($tt:tt)*) => {
$tm.add_inbound(<$msg as TypeTagged>::type_tag_(), <$resp as TypeTagged>::type_tag_(), Some(<$err as TypeTagged>::type_tag_()));
type_map!(@inbound $tm, $($tt)*)
};
(@inbound $tm: ident, $msg:ty => $resp: ty) => {
$tm.add_inbound(<$msg as TypeTagged>::type_tag_(), <$resp as TypeTagged>::type_tag_(), None);
};
(@inbound $tm: ident,) => {};
(@inbound $tm: ident, outbound : $($tt:tt)*) => {
type_map!(@outbound $tm, $($tt)*)
};
(@outbound $tm: ident, $msg:ty => ($resp:ty) throws $err: ty, $($tt:tt)*) => {
$tm.add_outbound(<$msg as TypeTagged>::type_tag_(), <$resp as TypeTagged>::type_tag_(), Some(<$err as TypeTagged>::type_tag_()));
type_map!(@outbound $tm, $($tt)*)
};
(@outbound $tm: ident, $msg:ty => $resp: ty, $($tt:tt)*) => {
$tm.add_outbound(<$msg as TypeTagged>::type_tag_(), <$resp as TypeTagged>::type_tag_(), None);
type_map!(@outbound $tm, $($tt)*)
};
(@outbound $tm: ident, $msg:ty => $resp: ty) => {
$tm.add_outbound(<$msg as TypeTagged>::type_tag_(), <$resp as TypeTagged>::type_tag_(), None);
};
(@outbound $tm: ident,) => {};
(@outbound $tm: ident, inbound : $($tt:tt)*) => {
type_map!(@inbound $tm, $($tt)*)
};
}
#[derive(Debug, Clone)]
pub struct TypeMap {
inbound: HashMap<TypeTag, Vec<(TypeTag, Option<TypeTag>)>>,
outbound: HashMap<TypeTag, Vec<(TypeTag, Option<TypeTag>)>>,
}
impl TypeMap {
pub fn new() -> Self {
Self {
inbound: Default::default(),
outbound: Default::default(),
}
}
pub fn add_inbound(&mut self, msg: TypeTag, resp: TypeTag, err: Option<TypeTag>) -> &mut Self {
let vec = self.inbound.entry(msg)
.or_insert_with(Vec::new);
vec.push((resp, err));
self
}
pub fn add_outbound(&mut self, msg: TypeTag, resp: TypeTag, err: Option<TypeTag>) -> &mut Self {
let vec = self.outbound.entry(msg)
.or_insert_with(Vec::new);
vec.push((resp, err));
self
}
pub fn accept_inbound(&self, msg: &TypeTag, resp: Option<&TypeTag>, err: Option<&TypeTag>) -> bool {
if let Some(vec) = self.inbound.get(msg) {
if let Some(rr) = resp {
vec.iter().find(|(r, e)| {
let ee = if let Some(e) = e {
if let Some(te) = err {
te.as_ref() == e.as_ref()
} else {
true
}
} else {
err.is_none()
};
r.as_ref() == rr.as_ref() && ee
}).is_some()
} else {
true
}
} else {
false
}
}
pub fn accept_outbound(&self, msg: &TypeTag, resp: Option<&TypeTag>, err: Option<&TypeTag>) -> bool {
if let Some(vec) = self.outbound.get(msg) {
if let Some(rr) = resp {
vec.iter().find(|(r, e)| {
let ee = if let Some(e) = e {
if let Some(te) = err {
te.as_ref() == e.as_ref()
} else {
true
}
} else {
err.is_none()
};
r.as_ref() == rr.as_ref() && ee
}).is_some()
} else {
true
}
} else {
false
}
}
}
pub(crate) struct RelayContext { pub(crate) struct RelayContext {
limit: u64, limit: u64,
processing: AtomicU64, processing: AtomicU64,
@ -154,7 +34,6 @@ pub(crate) struct RelayContext {
synchronized: Notify, synchronized: Notify,
closed: Notify, closed: Notify,
response: Notify, response: Notify,
type_map: TypeMap,
} }
impl PermitDrop for RelayContext { impl PermitDrop for RelayContext {
@ -163,7 +42,6 @@ impl PermitDrop for RelayContext {
} }
} }
pub(crate) struct RelayWrapper<S> pub(crate) struct RelayWrapper<S>
where where
S: 'static, S: 'static,
@ -173,7 +51,7 @@ where
waiters: Slab<oneshot::Sender<Result<Box<dyn Message>, Error>>>, waiters: Slab<oneshot::Sender<Result<Box<dyn Message>, Error>>>,
} }
impl<S> RelayWrapper<S> { impl<S> RelayWrapper<S> {
pub fn new(inner: S, type_map: TypeMap, limit: u64) -> Self { pub fn new(inner: S, limit: u64) -> Self {
Self { Self {
inner, inner,
context: Arc::new(RelayContext { context: Arc::new(RelayContext {
@ -184,23 +62,42 @@ impl<S> RelayWrapper<S> {
synchronized: Notify::new(), synchronized: Notify::new(),
closed: Notify::new(), closed: Notify::new(),
response: Notify::new(), response: Notify::new(),
type_map,
}), }),
waiters: sharded_slab::Slab::new_with_config::<SlabCfg>(), waiters: sharded_slab::Slab::new_with_config::<SlabCfg>(),
} }
} }
} }
impl<S> TypeTagAccept for RelayWrapper<S>
where
S: Relay + Send + Sync + 'static,
{
fn iter_types(&self, cb: &mut dyn FnMut(&TypeTag, &TypeTag, &TypeTag) -> bool) {
self.inner.iter_types(cb)
}
fn accept(&self, msg: &TypeTag, resp: Option<&TypeTag>, err: Option<&TypeTag>) -> bool {
self.inner.accept(msg, resp, err)
}
}
impl<S> ReceiverTrait for RelayWrapper<S> impl<S> ReceiverTrait for RelayWrapper<S>
where where
S: SendUntypedReceiver + 'static, S: Relay + Send + Sync + 'static,
{ {
fn name(&self) -> &str { fn name(&self) -> &str {
std::any::type_name::<Self>() std::any::type_name::<Self>()
} }
fn typed(&self) -> Option<AnyReceiver<'_>> { None } fn typed(&self) -> Option<AnyReceiver<'_>> {
fn wrapper(&self) -> Option<AnyWrapperRef<'_>> { None } None
fn wrapper_arc(self: Arc<Self>) -> Option<AnyWrapperArc> { None } }
fn wrapper(&self) -> Option<AnyWrapperRef<'_>> {
None
}
fn wrapper_arc(self: Arc<Self>) -> Option<AnyWrapperArc> {
None
}
fn send_boxed( fn send_boxed(
&self, &self,
mid: u64, mid: u64,
@ -280,65 +177,39 @@ where
&self.context.response &self.context.response
} }
fn accept(&self, msg: &TypeTag, resp: Option<&TypeTag>, err: Option<&TypeTag>) -> bool { fn start_polling(
self.context.type_map.accept_inbound(msg, resp, err) self: Arc<Self>,
) -> Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>> {
Box::new(move |_| {
Box::pin(async move {
loop {
let this = self.clone();
let event = poll_fn(move |ctx| this.inner.poll_events(ctx)).await;
match event {
Event::Exited => {
self.context.closed.notify_waiters();
break;
}
Event::Flushed => self.context.flushed.notify_waiters(),
Event::Synchronized(_res) => self.context.synchronized.notify_waiters(),
Event::Response(mid, resp) => {
self.context.processing.fetch_sub(1, Ordering::SeqCst);
self.context.response.notify_one();
if let Some(chan) = self.waiters.take(mid as _) {
if let Err(err) = chan.send(resp) {
error!("Response error for mid({}): {:?}", mid, err);
}
} else {
warn!("No waiters for mid({})", mid);
}
} }
fn start_polling(self: Arc<Self>) -> Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>> { _ => unimplemented!(),
Box::new(move |_| Box::pin(async move { }
}
})) })
} })
}
#[cfg(test)]
mod tests {
use super::*;
use crate as messagebus;
use crate::derive::{ Message, Error };
#[derive(Debug, Message)]
struct Test1;
#[derive(Debug, Message)]
struct Test2;
#[derive(Debug, Message)]
struct Test3;
#[derive(Debug, Message)]
struct Test4;
#[derive(Debug, Error)]
struct Error1;
#[derive(Debug, Error)]
struct Error2;
#[test]
fn test_type_map() {
let tm = type_map! {
inbound:
Test1 => (Test2) throws Error1,
Test2 => (Test3) throws Error1,
outbound:
Test3 => (Test4) throws Error2,
};
assert!(tm.accept_inbound(&Test1::type_tag_(), None, None));
assert!(tm.accept_inbound(&Test2::type_tag_(), None, None));
assert!(!tm.accept_inbound(&Test3::type_tag_(), None, None));
assert!(tm.accept_outbound(&Test3::type_tag_(), None, None));
assert!(tm.accept_inbound(&Test1::type_tag_(), Some(&Test2::type_tag_()), None));
assert!(!tm.accept_inbound(&Test1::type_tag_(), Some(&Test3::type_tag_()), None));
assert!(!tm.accept_inbound(&Test2::type_tag_(), Some(&Test2::type_tag_()), None));
assert!(tm.accept_inbound(&Test2::type_tag_(), Some(&Test3::type_tag_()), None));
assert!(tm.accept_inbound(&Test1::type_tag_(), Some(&Test2::type_tag_()), Some(&Error1::type_tag_())));
assert!(!tm.accept_inbound(&Test1::type_tag_(), Some(&Test2::type_tag_()), Some(&Error2::type_tag_())));
assert!(tm.accept_outbound(&Test3::type_tag_(), Some(&Test4::type_tag_()), Some(&Error2::type_tag_())));
} }
} }

332
tests/test_relay.rs Normal file
View File

@ -0,0 +1,332 @@
use std::task::{Context, Poll};
use async_trait::async_trait;
use messagebus::{
derive::{Error as MbError, Message},
error::{self, GenericError},
receivers, Action, AsyncHandler, Bus, Event, Message, MessageBounds, ReciveUnypedReceiver,
SendUntypedReceiver, TypeTagAccept, TypeTagged,
};
use parking_lot::Mutex;
use thiserror::Error;
use tokio::sync::mpsc;
// macro_rules! type_map {
// (inbound : $($tt:tt)*) => {{
// let mut type_map = TypeMap::new();
// type_map!(@inbound type_map, $($tt)*);
// type_map
// }};
// (outbound : $($tt:tt)*) => {
// let mut type_map = TypeMap::new();
// type_map!(@outbound type_map, $($tt)*);
// type_map
// };
// (@inbound $tm: ident, $msg:ty => $resp: ty, $($tt:tt)*) => {
// $tm.add_inbound(<$msg as TypeTagged>::type_tag_(), <$resp as TypeTagged>::type_tag_(), None);
// type_map!(@inbound $tm, $($tt)*)
// };
// (@inbound $tm: ident, $msg:ty => ($resp:ty) throws $err: ty, $($tt:tt)*) => {
// $tm.add_inbound(<$msg as messagebus::TypeTagged>::type_tag_(), <$resp as messagebus:TypeTagged>::type_tag_(), Some(<$err as messagebus::TypeTagged>::type_tag_()));
// type_map!(@inbound $tm, $($tt)*)
// };
// (@inbound $tm: ident, $msg:ty => $resp: ty) => {
// $tm.add_inbound(<$msg as TypeTagged>::type_tag_(), <$resp as TypeTagged>::type_tag_(), None);
// };
// (@inbound $tm: ident, outbound : $($tt:tt)*) => {
// type_map!(@outbound $tm, $($tt)*)
// };
// (@inbound $tm: ident,) => {};
// (@outbound $tm: ident, $msg:ty => ($resp:ty) throws $err: ty, $($tt:tt)*) => {
// $tm.add_outbound(<$msg as TypeTagged>::type_tag_(), <$resp as TypeTagged>::type_tag_(), Some(<$err as TypeTagged>::type_tag_()));
// type_map!(@outbound $tm, $($tt)*)
// };
// (@outbound $tm: ident, $msg:ty => $resp: ty, $($tt:tt)*) => {
// $tm.add_outbound(<$msg as TypeTagged>::type_tag_(), <$resp as TypeTagged>::type_tag_(), None);
// type_map!(@outbound $tm, $($tt)*)
// };
// (@outbound $tm: ident, $msg:ty => $resp: ty) => {
// $tm.add_outbound(<$msg as TypeTagged>::type_tag_(), <$resp as TypeTagged>::type_tag_(), None);
// };
// (@outbound $tm: ident, inbound : $($tt:tt)*) => {
// type_map!(@inbound $tm, $($tt)*)
// };
// (@outbound $tm: ident,) => {};
// }
// #[derive(Debug, Clone)]
// pub struct TypeMap {
// inbound: HashMap<TypeTag, Vec<(TypeTag, Option<TypeTag>)>>,
// outbound: HashMap<TypeTag, Vec<(TypeTag, Option<TypeTag>)>>,
// }
// impl TypeMap {
// pub fn new() -> Self {
// Self {
// inbound: Default::default(),
// outbound: Default::default(),
// }
// }
// pub fn add_inbound(&mut self, msg: TypeTag, resp: TypeTag, err: Option<TypeTag>) -> &mut Self {
// let vec = self.inbound.entry(msg)
// .or_insert_with(Vec::new);
// vec.push((resp, err));
// self
// }
// pub fn add_outbound(&mut self, msg: TypeTag, resp: TypeTag, err: Option<TypeTag>) -> &mut Self {
// let vec = self.outbound.entry(msg)
// .or_insert_with(Vec::new);
// vec.push((resp, err));
// self
// }
// pub fn accept_inbound(&self, msg: &TypeTag, resp: Option<&TypeTag>, err: Option<&TypeTag>) -> bool {
// if let Some(vec) = self.inbound.get(msg) {
// if let Some(rr) = resp {
// vec.iter().find(|(r, e)| {
// let ee = if let Some(e) = e {
// if let Some(te) = err {
// te.as_ref() == e.as_ref()
// } else {
// true
// }
// } else {
// err.is_none()
// };
// r.as_ref() == rr.as_ref() && ee
// }).is_some()
// } else {
// true
// }
// } else {
// false
// }
// }
// pub fn accept_outbound(&self, msg: &TypeTag, resp: Option<&TypeTag>, err: Option<&TypeTag>) -> bool {
// if let Some(vec) = self.outbound.get(msg) {
// if let Some(rr) = resp {
// vec.iter().find(|(r, e)| {
// let ee = if let Some(e) = e {
// if let Some(te) = err {
// te.as_ref() == e.as_ref()
// } else {
// true
// }
// } else {
// err.is_none()
// };
// r.as_ref() == rr.as_ref() && ee
// }).is_some()
// } else {
// true
// }
// } else {
// false
// }
// }
// #[inline]
// pub fn inbound_iter(&self) -> impl Iterator<Item = (&TypeTag, &TypeTag, Option<&TypeTag>)> {
// self.inbound.iter().map(|(k, v)| v.into_iter().map(move |(r, e)|(k, r, e.as_ref()))).flatten()
// }
// }
#[derive(Debug, Error, MbError)]
enum Error {
#[error("Error({0})")]
Error(anyhow::Error),
}
impl<M: Message> From<error::Error<M>> for Error {
fn from(err: error::Error<M>) -> Self {
Self::Error(err.into())
}
}
#[derive(Debug, Clone, Message)]
pub struct Msg<F: MessageBounds + Clone>(pub F);
struct TmpReceiver;
#[async_trait]
impl AsyncHandler<Msg<i32>> for TmpReceiver {
type Error = Error;
type Response = ();
async fn handle(&self, msg: Msg<i32>, _bus: &Bus) -> Result<Self::Response, Self::Error> {
println!("TmpReceiver::handle {:?}", msg);
Ok(())
}
async fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> {
println!("TmpReceiver::sync");
Ok(())
}
}
pub struct TestRelay {
stx: mpsc::UnboundedSender<Event<Box<dyn Message>, GenericError>>,
srx: Mutex<mpsc::UnboundedReceiver<Event<Box<dyn Message>, GenericError>>>,
}
impl TypeTagAccept for TestRelay {
fn accept(
&self,
msg: &messagebus::TypeTag,
resp: Option<&messagebus::TypeTag>,
err: Option<&messagebus::TypeTag>,
) -> bool {
if msg.as_ref() == Msg::<i16>::type_tag_().as_ref() {
if let Some(resp) = resp {
if resp.as_ref() != Msg::<u8>::type_tag_().as_ref() {
return false;
}
}
}
if msg.as_ref() == Msg::<i32>::type_tag_().as_ref() {
if let Some(resp) = resp {
if resp.as_ref() != Msg::<i64>::type_tag_().as_ref()
&& resp.as_ref() != Msg::<()>::type_tag_().as_ref()
{
return false;
}
}
}
true
}
fn iter_types(
&self,
cb: &mut dyn FnMut(
&messagebus::TypeTag,
&messagebus::TypeTag,
&messagebus::TypeTag,
) -> bool,
) {
if !cb(
&Msg::<i16>::type_tag_(),
&Msg::<u8>::type_tag_(),
&Error::type_tag_(),
) {
return;
}
if !cb(
&Msg::<i32>::type_tag_(),
&Msg::<()>::type_tag_(),
&Error::type_tag_(),
) {
return;
}
if !cb(
&Msg::<i32>::type_tag_(),
&Msg::<i64>::type_tag_(),
&Error::type_tag_(),
) {
return;
}
}
}
impl SendUntypedReceiver for TestRelay {
fn send(&self, msg: Action) -> Result<(), error::SendError<Action>> {
match msg {
Action::Close => {
self.stx.send(Event::Exited).unwrap();
}
Action::Flush => {
self.stx.send(Event::Flushed).unwrap();
}
Action::Sync => {
self.stx.send(Event::Synchronized(Ok(()))).unwrap();
}
_ => unimplemented!(),
}
println!("TestRelay::send {:?}", msg);
Ok(())
}
fn send_msg(
&self,
mid: u64,
msg: Box<dyn Message>,
) -> Result<(), error::SendError<Box<dyn Message>>> {
println!("TestRelay::send_msg [{}] {:?}", mid, msg);
if msg.type_tag().as_ref() == Msg::<i16>::type_tag_().as_ref() {
self.stx
.send(Event::Response(mid, Ok(Box::new(Msg(9u8)))))
.unwrap();
} else {
self.stx
.send(Event::Response(mid, Ok(Box::new(()))))
.unwrap();
}
Ok(())
}
}
impl ReciveUnypedReceiver for TestRelay {
fn poll_events(
&self,
ctx: &mut Context<'_>,
) -> Poll<Event<Box<dyn Message>, error::GenericError>> {
let poll = self.srx.lock().poll_recv(ctx);
match poll {
Poll::Pending => Poll::Pending,
Poll::Ready(Some(event)) => Poll::Ready(event),
Poll::Ready(None) => Poll::Ready(Event::Exited),
}
}
}
#[tokio::test]
async fn test_relay() {
let (stx, srx) = mpsc::unbounded_channel();
let relay = TestRelay {
stx,
srx: Mutex::new(srx),
};
let (b, poller) = Bus::build()
.register_relay(relay, 1)
.register(TmpReceiver)
.subscribe_async::<Msg<i32>>(
1,
receivers::BufferUnorderedConfig {
buffer_size: 1,
max_parallel: 1,
},
)
.done()
.build();
b.send(Msg(32i32)).await.unwrap();
let res: Msg<u8> = b.request(Msg(12i16), Default::default()).await.unwrap();
assert_eq!(res.0, 9u8);
b.flush().await;
b.close().await;
poller.await;
}

View File

@ -1,4 +1,7 @@
use std::sync::{Arc, atomic::{AtomicBool, Ordering}}; use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
use async_trait::async_trait; use async_trait::async_trait;
use messagebus::{ use messagebus::{
@ -42,13 +45,16 @@ impl AsyncHandler<SharedMsg<f32>> for TmpReceiver {
type Error = Error; type Error = Error;
type Response = (); type Response = ();
async fn handle(&self, _msg: SharedMsg<f32>, _bus: &Bus) -> Result<Self::Response, Self::Error> { async fn handle(
&self,
_msg: SharedMsg<f32>,
_bus: &Bus,
) -> Result<Self::Response, Self::Error> {
self.ctx.sync1.store(true, Ordering::Relaxed); self.ctx.sync1.store(true, Ordering::Relaxed);
Ok(()) Ok(())
} }
} }
#[async_trait] #[async_trait]
impl AsyncHandler<Msg> for TmpReceiver { impl AsyncHandler<Msg> for TmpReceiver {
type Error = Error; type Error = Error;
@ -60,7 +66,6 @@ impl AsyncHandler<Msg> for TmpReceiver {
} }
} }
#[tokio::test] #[tokio::test]
async fn test_shared() { async fn test_shared() {
let ctx = Arc::new(TmpReceiverContext { let ctx = Arc::new(TmpReceiverContext {