Relays: Message and SharedMessage; messagebus_derive

This commit is contained in:
Andrey Tkachenko 2021-07-16 20:21:27 +04:00
parent 76cc57e7ae
commit 13d1d48a70
27 changed files with 1673 additions and 457 deletions

View File

@ -11,19 +11,22 @@ exclude = [".gitignore", ".cargo/config", ".github/**", "codecov.yml"]
edition = "2018"
[dependencies]
messagebus_derive = { path = "./derive" }
tokio = { version = "1", features = ["parking_lot", "rt-multi-thread", "sync"] }
parking_lot = "0.11.1"
async-trait = "0.1.42"
futures = "0.3.8"
smallvec = "1.6.1"
log = "0.4.14"
sharded-slab = "0.1.1"
thiserror = "1.0.25"
erased-serde = "0.3.16"
serde = "1.0.126"
serde_derive = "1.0.126"
parking_lot = "0.11"
async-trait = "0.1"
futures = "0.3"
smallvec = "1.6"
log = "0.4"
sharded-slab = "0.1"
thiserror = "1"
erased-serde = "0.3"
serde = "1"
serde_derive = "1"
[dev-dependencies]
anyhow = "1.0.41"
env_logger = "0.8.4"
serde_json = "1.0.64"
tokio = { version = "1", features = ["macros", "parking_lot", "rt-multi-thread", "io-util", "sync"] }

12
derive/Cargo.toml Normal file
View File

@ -0,0 +1,12 @@
[package]
name = "messagebus_derive"
version = "0.1.0"
edition = "2018"
[lib]
proc-macro = true
[dependencies]
proc-macro2 = "1"
quote = "1"
syn = "1"

204
derive/src/lib.rs Normal file
View File

@ -0,0 +1,204 @@
#![recursion_limit="128"]
extern crate proc_macro;
use proc_macro::TokenStream;
use proc_macro2::TokenTree;
use syn::{LitStr, Result, parenthesized};
use syn::{DeriveInput, parse_macro_input, punctuated::Punctuated, token::Comma};
use syn::parse::{Parse, ParseStream, Parser};
use quote::quote;
fn shared_part(_: &syn::Ident, has_shared: bool) -> proc_macro2::TokenStream {
if has_shared {
quote! {
fn as_shared_ref(&self) -> std::option::Option<&dyn messagebus::SharedMessage> {Some(self)}
fn as_shared_mut(&mut self) -> std::option::Option<&mut dyn messagebus::SharedMessage>{Some(self)}
fn as_shared_boxed(self: std::boxed::Box<Self>) -> Option<std::boxed::Box<dyn messagebus::SharedMessage>>{Some(self)}
fn as_shared_arc(self: std::sync::Arc<Self>) -> Option<std::sync::Arc<dyn messagebus::SharedMessage>>{Some(self)}
}
} else {
quote! {
fn as_shared_ref(&self) -> std::option::Option<&dyn messagebus::SharedMessage> {None}
fn as_shared_mut(&mut self) -> std::option::Option<&mut dyn messagebus::SharedMessage>{None}
fn as_shared_boxed(self: std::boxed::Box<Self>) -> Option<std::boxed::Box<dyn messagebus::SharedMessage>>{None}
fn as_shared_arc(self: std::sync::Arc<Self>) -> Option<std::sync::Arc<dyn messagebus::SharedMessage>>{None}
}
}
}
fn clone_part(name: &syn::Ident, has_clone: bool) -> proc_macro2::TokenStream {
if has_clone {
quote! {
fn try_clone_into(&self, into: &mut dyn core::any::Any) -> bool {
let into = if let Some(inner) = into.downcast_mut::<Option<#name>>() {
inner
} else {
return false;
};
into.replace(self.clone());
true
}
fn try_clone_boxed(&self) -> std::option::Option<std::boxed::Box<dyn messagebus::Message>>{
Some(Box::new(self.clone()))
}
}
} else {
quote! {
fn try_clone_into(&self, into: &mut dyn core::any::Any) -> bool {false}
fn try_clone_boxed(&self) -> std::option::Option<std::boxed::Box<dyn messagebus::Message>>{ None }
}
}
}
fn type_tag_part(name: &syn::Ident, type_tag: Option<LitStr>) -> proc_macro2::TokenStream {
if let Some(type_tag) = type_tag {
quote! {
impl messagebus::TypeTagged for #name {
fn type_tag_() -> messagebus::TypeTag { #type_tag.into() }
fn type_tag(&self) -> messagebus::TypeTag { #type_tag.into() }
fn type_name(&self) -> std::borrow::Cow<str> { #type_tag.into() }
}
}
} else {
quote! {
impl messagebus::TypeTagged for #name {
fn type_tag_() -> messagebus::TypeTag { std::any::type_name::<Self>().into() }
fn type_tag(&self) -> messagebus::TypeTag { std::any::type_name::<Self>().into() }
fn type_name(&self) -> std::borrow::Cow<str> { std::any::type_name::<Self>().into() }
}
}
}
}
struct TypeTag {
pub inner: syn::LitStr,
}
impl Parse for TypeTag {
fn parse(input: ParseStream) -> Result<Self> {
let mut inner = None;
let content;
parenthesized!(content in input);
let punctuated = Punctuated::<syn::LitStr, Comma>::parse_terminated(&content)?;
for pair in punctuated.pairs() {
inner = Some(pair.into_value());
break;
}
Ok(TypeTag { inner: inner.unwrap().to_owned() })
}
}
#[derive(Default)]
struct Tags {
has_clone: bool,
has_shared: bool,
}
impl Tags {
pub fn add(&mut self, other: Tags) {
self.has_clone = self.has_clone || other.has_clone;
self.has_shared = self.has_shared || other.has_shared;
}
}
impl Parse for Tags {
fn parse(input: ParseStream) -> Result<Self> {
let mut has_shared = false;
let mut has_clone = false;
let content;
parenthesized!(content in input);
let punctuated = Punctuated::<syn::Ident, Comma>::parse_terminated(&content)?;
for pair in punctuated.pairs() {
match pair.into_value().to_string().as_str() {
"shared" => has_shared = true,
"clone" => has_clone = true,
_ => ()
}
}
Ok(Tags {
has_clone,
has_shared,
})
}
}
#[proc_macro_derive(Message, attributes(type_tag, message))]
pub fn derive_message(input: TokenStream) -> TokenStream {
let mut tags = Tags::default();
let mut type_tag = None;
let ast: DeriveInput = syn::parse(input).unwrap();
let name = &ast.ident;
let attrs = ast.attrs;
for attr in attrs {
if let Some(i) = attr.path.get_ident() {
match i.to_string().as_str() {
"message" => {
let tt: Tags = syn::parse2(attr.tokens).unwrap();
tags.add(tt);
}
"type_tag" => {
let tt: TypeTag = syn::parse2(attr.tokens).unwrap();
type_tag = Some(tt.inner);
}
_ => ()
}
}
}
let type_tag_part = type_tag_part(name, type_tag);
let shared_part = shared_part(name, false);
let clone_part = clone_part(name, false);
let tokens = quote! {
#type_tag_part
impl messagebus::Message for #name {
fn as_any_ref(&self) -> &dyn core::any::Any {self}
fn as_any_mut(&mut self) -> &mut dyn core::any::Any {self}
fn as_any_boxed(self: std::boxed::Box<Self>) -> std::boxed::Box<dyn core::any::Any> {self}
fn as_any_arc(self: std::sync::Arc<Self>) -> std::sync::Arc<dyn core::any::Any> {self}
#shared_part
#clone_part
}
};
tokens.into()
}
#[proc_macro_derive(Error, attributes(type_tag))]
pub fn derive_error(input: TokenStream) -> TokenStream {
let mut type_tag = None;
let ast: DeriveInput = syn::parse(input).unwrap();
let name = &ast.ident;
for attr in ast.attrs {
if let Some(i) = attr.path.get_ident() {
match i.to_string().as_str() {
"type_tag" => {
let tt: TypeTag = syn::parse2(attr.tokens).unwrap();
type_tag = Some(tt.inner);
}
_ => ()
}
}
}
let type_tag_part = type_tag_part(name, type_tag);
let tokens = quote! {
#type_tag_part
};
tokens.into()
}

View File

@ -1,8 +1,8 @@
use async_trait::async_trait;
use messagebus::{error, AsyncHandler, Bus, Handler, Message};
use messagebus::{derive::Message, error, AsyncHandler, Bus, Handler, Message, TypeTagged};
use thiserror::Error;
#[derive(Debug, Error)]
#[derive(Debug, Error, messagebus::derive::Error)]
enum Error {
#[error("Error({0})")]
Error(anyhow::Error),
@ -17,15 +17,35 @@ impl<M: Message> From<error::Error<M>> for Error {
struct TmpReceiver;
struct TmpReceiver2;
#[derive(Debug, Clone, Message)]
#[message(clone)]
struct MsgF32(f32);
#[derive(Debug, Clone, Message)]
#[message(clone)]
struct MsgU16(u16);
#[derive(Debug, Clone, Message)]
#[message(clone)]
struct MsgU32(u32);
#[derive(Debug, Clone, Message)]
#[message(clone)]
struct MsgI32(i32);
#[derive(Debug, Clone, Message)]
#[message(clone)]
struct MsgI16(i16);
#[async_trait]
impl AsyncHandler<f32> for TmpReceiver {
impl AsyncHandler<MsgF32> for TmpReceiver {
type Error = Error;
type Response = ();
async fn handle(&self, msg: f32, bus: &Bus) -> Result<Self::Response, Self::Error> {
bus.send(1u16).await?;
async fn handle(&self, msg: MsgF32, bus: &Bus) -> Result<Self::Response, Self::Error> {
bus.send(MsgU16(1)).await?;
println!("TmpReceiver ---> f32 {}", msg);
println!("TmpReceiver ---> {:?} {}", msg, msg.type_tag());
Ok(())
}
@ -38,13 +58,13 @@ impl AsyncHandler<f32> for TmpReceiver {
}
#[async_trait]
impl AsyncHandler<u16> for TmpReceiver {
impl AsyncHandler<MsgU16> for TmpReceiver {
type Error = Error;
type Response = ();
async fn handle(&self, msg: u16, bus: &Bus) -> Result<Self::Response, Self::Error> {
bus.send(2u32).await?;
println!("TmpReceiver ---> u16 {}", msg);
async fn handle(&self, msg: MsgU16, bus: &Bus) -> Result<Self::Response, Self::Error> {
bus.send(MsgU32(2)).await?;
println!("TmpReceiver ---> {:?}", msg);
Ok(())
}
@ -57,13 +77,13 @@ impl AsyncHandler<u16> for TmpReceiver {
}
#[async_trait]
impl AsyncHandler<u32> for TmpReceiver {
impl AsyncHandler<MsgU32> for TmpReceiver {
type Error = Error;
type Response = ();
async fn handle(&self, msg: u32, bus: &Bus) -> Result<Self::Response, Self::Error> {
bus.send(3i32).await?;
println!("TmpReceiver ---> u32 {}", msg);
async fn handle(&self, msg: MsgU32, bus: &Bus) -> Result<Self::Response, Self::Error> {
bus.send(MsgI32(3)).await?;
println!("TmpReceiver ---> {:?}", msg);
Ok(())
}
@ -75,13 +95,13 @@ impl AsyncHandler<u32> for TmpReceiver {
}
#[async_trait]
impl AsyncHandler<i32> for TmpReceiver {
impl AsyncHandler<MsgI32> for TmpReceiver {
type Error = Error;
type Response = ();
async fn handle(&self, msg: i32, bus: &Bus) -> Result<Self::Response, Self::Error> {
bus.send(4i16).await?;
println!("TmpReceiver ---> i32 {}", msg);
async fn handle(&self, msg: MsgI32, bus: &Bus) -> Result<Self::Response, Self::Error> {
bus.send(MsgI16(4)).await?;
println!("TmpReceiver ---> {:?}", msg);
Ok(())
}
@ -94,12 +114,12 @@ impl AsyncHandler<i32> for TmpReceiver {
}
#[async_trait]
impl AsyncHandler<i16> for TmpReceiver {
impl AsyncHandler<MsgI16> for TmpReceiver {
type Error = Error;
type Response = ();
async fn handle(&self, msg: i16, _bus: &Bus) -> Result<Self::Response, Self::Error> {
println!("TmpReceiver ---> i16 {}", msg);
async fn handle(&self, msg: MsgI16, _bus: &Bus) -> Result<Self::Response, Self::Error> {
println!("TmpReceiver ---> {:?}", msg);
Ok(())
}
@ -111,14 +131,14 @@ impl AsyncHandler<i16> for TmpReceiver {
}
#[async_trait]
impl AsyncHandler<i32> for TmpReceiver2 {
impl AsyncHandler<MsgI32> for TmpReceiver2 {
type Error = Error;
type Response = ();
async fn handle(&self, msg: i32, bus: &Bus) -> Result<Self::Response, Self::Error> {
println!("!!!! TmpReceiver2: ---> 2 i32 {}", msg);
async fn handle(&self, msg: MsgI32, bus: &Bus) -> Result<Self::Response, Self::Error> {
println!("TmpReceiver2: ---> {:?}", msg);
bus.send(5i16).await?;
bus.send(MsgI16(5)).await?;
Ok(())
}
@ -129,12 +149,12 @@ impl AsyncHandler<i32> for TmpReceiver2 {
}
}
impl Handler<i16> for TmpReceiver2 {
impl Handler<MsgI16> for TmpReceiver2 {
type Error = Error;
type Response = ();
fn handle(&self, msg: i16, _bus: &Bus) -> Result<Self::Response, Self::Error> {
println!("TmpReceiver2: ---> 2 i16 {}", msg);
fn handle(&self, msg: MsgI16, _bus: &Bus) -> Result<Self::Response, Self::Error> {
println!("TmpReceiver2: ---> {:?}", msg);
Ok(())
}
@ -152,19 +172,19 @@ async fn main() {
let (b, poller) = Bus::build()
.register(TmpReceiver)
.subscribe_async::<f32>(8, Default::default())
.subscribe_async::<u16>(8, Default::default())
.subscribe_async::<u32>(8, Default::default())
.subscribe_async::<i32>(8, Default::default())
.subscribe_async::<i16>(8, Default::default())
.subscribe_async::<MsgF32>(8, Default::default())
.subscribe_async::<MsgU16>(8, Default::default())
.subscribe_async::<MsgU32>(8, Default::default())
.subscribe_async::<MsgI32>(8, Default::default())
.subscribe_async::<MsgI16>(8, Default::default())
.done()
.register(TmpReceiver2)
.subscribe_async::<i32>(8, Default::default())
.subscribe_sync::<i16>(8, Default::default())
.subscribe_async::<MsgI32>(8, Default::default())
.subscribe_sync::<MsgI16>(8, Default::default())
.done()
.build();
b.send(0f32).await.unwrap();
b.send(MsgF32(0.)).await.unwrap();
println!("flush");
b.flush().await;

View File

@ -1,8 +1,11 @@
use async_trait::async_trait;
use messagebus::{error, receivers, AsyncHandler, Bus, Message};
use messagebus::{
derive::{Error as MbError, Message},
error, receivers, AsyncHandler, Bus, Message,
};
use thiserror::Error;
#[derive(Debug, Error)]
#[derive(Debug, Error, MbError)]
enum Error {
#[error("Error({0})")]
Error(anyhow::Error),
@ -14,15 +17,18 @@ impl<M: Message> From<error::Error<M>> for Error {
}
}
#[derive(Debug, Clone, Message)]
struct MsgF32(pub f32);
struct TmpReceiver;
#[async_trait]
impl AsyncHandler<f32> for TmpReceiver {
impl AsyncHandler<MsgF32> for TmpReceiver {
type Error = Error;
type Response = ();
async fn handle(&self, msg: f32, _bus: &Bus) -> Result<Self::Response, Self::Error> {
println!("---> f32 {}", msg);
async fn handle(&self, msg: MsgF32, _bus: &Bus) -> Result<Self::Response, Self::Error> {
println!("---> f32 {:?}", msg);
std::thread::sleep(std::time::Duration::from_secs(1));
@ -35,42 +41,48 @@ impl AsyncHandler<f32> for TmpReceiver {
async fn main() {
let (b, poller) = Bus::build()
.register(TmpReceiver)
.subscribe_async::<f32>(1, receivers::BufferUnorderedConfig { buffer_size: 1, max_parallel: 1 })
.subscribe_async::<MsgF32>(
1,
receivers::BufferUnorderedConfig {
buffer_size: 1,
max_parallel: 1,
},
)
.done()
.build();
println!("sending 1");
b.send(32f32).await.unwrap();
b.send(MsgF32(32f32)).await.unwrap();
println!("sending 2");
b.send(32f32).await.unwrap();
b.send(MsgF32(32f32)).await.unwrap();
println!("sending 3");
b.send(32f32).await.unwrap();
b.send(MsgF32(32f32)).await.unwrap();
println!("sending 4");
b.send(32f32).await.unwrap();
b.send(MsgF32(32f32)).await.unwrap();
println!("sending 5");
b.send(32f32).await.unwrap();
b.send(MsgF32(32f32)).await.unwrap();
println!("sending 6");
b.send(32f32).await.unwrap();
b.send(MsgF32(32f32)).await.unwrap();
println!("sending 7");
b.send(32f32).await.unwrap();
b.send(MsgF32(32f32)).await.unwrap();
println!("sending 8");
b.send(32f32).await.unwrap();
b.send(MsgF32(32f32)).await.unwrap();
println!("sending 9");
b.send(32f32).await.unwrap();
b.send(MsgF32(32f32)).await.unwrap();
println!("sending 10");
b.send(32f32).await.unwrap();
b.send(MsgF32(32f32)).await.unwrap();
println!("sending 11");
b.send(32f32).await.unwrap();
b.send(MsgF32(32f32)).await.unwrap();
println!("flush");
b.flush().await;

View File

@ -1,10 +1,13 @@
use std::sync::Arc;
use async_trait::async_trait;
use messagebus::{error, AsyncBatchHandler, BatchHandler, Bus, Message};
use messagebus::{
derive::{Error as MbError, Message},
error, AsyncBatchHandler, BatchHandler, Bus, Message,
};
use thiserror::Error;
#[derive(Debug, Error, Clone)]
#[derive(Debug, Error, Clone, MbError)]
enum Error {
#[error("Error({0})")]
Error(Arc<anyhow::Error>),
@ -15,29 +18,42 @@ impl<M: Message> From<error::Error<M>> for Error {
Self::Error(Arc::new(err.into()))
}
}
#[derive(Debug, Clone, Message)]
#[message(clone)]
struct MsgI32(i32);
#[derive(Debug, Clone, Message)]
#[message(clone)]
struct MsgI16(i16);
struct TmpReceiver;
#[async_trait]
impl AsyncBatchHandler<i32> for TmpReceiver {
impl AsyncBatchHandler<MsgI32> for TmpReceiver {
type Error = Error;
type Response = ();
type InBatch = Vec<i32>;
type InBatch = Vec<MsgI32>;
type OutBatch = Vec<()>;
async fn handle(&self, msg: Vec<i32>, _bus: &Bus) -> Result<Vec<Self::Response>, Self::Error> {
async fn handle(
&self,
msg: Vec<MsgI32>,
_bus: &Bus,
) -> Result<Vec<Self::Response>, Self::Error> {
println!("---> [i32; {}] {:?}", msg.len(), msg);
Ok(vec![])
}
}
impl BatchHandler<i16> for TmpReceiver {
impl BatchHandler<MsgI16> for TmpReceiver {
type Error = Error;
type Response = ();
type InBatch = Vec<i16>;
type InBatch = Vec<MsgI16>;
type OutBatch = Vec<()>;
fn handle(&self, msg: Vec<i16>, _bus: &Bus) -> Result<Vec<Self::Response>, Self::Error> {
fn handle(&self, msg: Vec<MsgI16>, _bus: &Bus) -> Result<Vec<Self::Response>, Self::Error> {
println!("---> [i16; {}] {:?}", msg.len(), msg);
Ok(vec![])
}
@ -47,22 +63,22 @@ impl BatchHandler<i16> for TmpReceiver {
async fn main() {
let (b, poller) = Bus::build()
.register(TmpReceiver)
.subscribe_batch_async::<i32>(16, Default::default())
.subscribe_batch_sync::<i16>(16, Default::default())
.subscribe_batch_async::<MsgI32>(16, Default::default())
.subscribe_batch_sync::<MsgI16>(16, Default::default())
.done()
.build();
for i in 1..100i32 {
b.send(i).await.unwrap();
b.send(MsgI32(i)).await.unwrap();
}
b.send(1i16).await.unwrap();
b.send(2i16).await.unwrap();
b.send(3i16).await.unwrap();
b.send(4i16).await.unwrap();
b.send(5i16).await.unwrap();
b.send(6i16).await.unwrap();
b.send(7i16).await.unwrap();
b.send(MsgI16(1i16)).await.unwrap();
b.send(MsgI16(2i16)).await.unwrap();
b.send(MsgI16(3i16)).await.unwrap();
b.send(MsgI16(4i16)).await.unwrap();
b.send(MsgI16(5i16)).await.unwrap();
b.send(MsgI16(6i16)).await.unwrap();
b.send(MsgI16(7i16)).await.unwrap();
println!("flush");
b.flush().await;

211
examples/demo_boxed.rs Normal file
View File

@ -0,0 +1,211 @@
use async_trait::async_trait;
use messagebus::{
derive::{Error as MbError, Message},
error, AsyncHandler, Bus, Handler, Message,
};
use thiserror::Error;
#[derive(Debug, Error, MbError)]
enum Error {
#[error("Error({0})")]
Error(anyhow::Error),
}
impl<M: Message> From<error::Error<M>> for Error {
fn from(err: error::Error<M>) -> Self {
Self::Error(err.into())
}
}
#[derive(Debug, Clone, Message)]
#[message(clone)]
struct MsgF32(f32);
#[derive(Debug, Clone, Message)]
#[message(clone)]
struct MsgU16(u16);
#[derive(Debug, Clone, Message)]
#[message(clone)]
struct MsgU32(u32);
#[derive(Debug, Clone, Message)]
#[message(clone)]
struct MsgI32(i32);
#[derive(Debug, Clone, Message)]
#[message(clone)]
struct MsgI16(i16);
struct TmpReceiver;
struct TmpReceiver2;
#[async_trait]
impl AsyncHandler<MsgF32> for TmpReceiver {
type Error = Error;
type Response = ();
async fn handle(&self, msg: MsgF32, bus: &Bus) -> Result<Self::Response, Self::Error> {
bus.send(MsgU16(1u16)).await?;
println!("TmpReceiver ---> {:?}", msg);
Ok(())
}
async fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> {
println!("TmpReceiver f32: sync");
Ok(())
}
}
#[async_trait]
impl AsyncHandler<MsgU16> for TmpReceiver {
type Error = Error;
type Response = ();
async fn handle(&self, msg: MsgU16, bus: &Bus) -> Result<Self::Response, Self::Error> {
bus.send(MsgU32(2u32)).await?;
println!("TmpReceiver ---> {:?}", msg);
Ok(())
}
async fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> {
println!("TmpReceiver u16: sync");
Ok(())
}
}
#[async_trait]
impl AsyncHandler<MsgU32> for TmpReceiver {
type Error = Error;
type Response = ();
async fn handle(&self, msg: MsgU32, bus: &Bus) -> Result<Self::Response, Self::Error> {
bus.send(MsgI32(3i32)).await?;
println!("TmpReceiver ---> {:?}", msg);
Ok(())
}
async fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> {
println!("TmpReceiver u32: sync");
Ok(())
}
}
#[async_trait]
impl AsyncHandler<MsgI32> for TmpReceiver {
type Error = Error;
type Response = ();
async fn handle(&self, msg: MsgI32, bus: &Bus) -> Result<Self::Response, Self::Error> {
bus.send(MsgI16(4i16)).await?;
println!("TmpReceiver ---> {:?}", msg);
Ok(())
}
async fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> {
println!("TmpReceiver i32: sync");
Ok(())
}
}
#[async_trait]
impl AsyncHandler<MsgI16> for TmpReceiver {
type Error = Error;
type Response = ();
async fn handle(&self, msg: MsgI16, _bus: &Bus) -> Result<Self::Response, Self::Error> {
println!("TmpReceiver ---> {:?}", msg);
Ok(())
}
async fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> {
println!("TmpReceiver i16: sync");
Ok(())
}
}
#[async_trait]
impl AsyncHandler<MsgI32> for TmpReceiver2 {
type Error = Error;
type Response = ();
async fn handle(&self, msg: MsgI32, bus: &Bus) -> Result<Self::Response, Self::Error> {
println!("TmpReceiver2: ---> {:?}", msg);
bus.send(MsgI16(5i16)).await?;
Ok(())
}
async fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> {
println!("TmpReceiver2: i32: sync");
Ok(())
}
}
impl Handler<MsgI16> for TmpReceiver2 {
type Error = Error;
type Response = ();
fn handle(&self, msg: MsgI16, _bus: &Bus) -> Result<Self::Response, Self::Error> {
println!("TmpReceiver2: ---> {:?}", msg);
Ok(())
}
fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> {
println!("TmpReceiver2: i16: sync");
Ok(())
}
}
#[tokio::main]
async fn main() {
env_logger::init();
let (b, poller) = Bus::build()
.register(TmpReceiver)
.subscribe_async::<MsgF32>(8, Default::default())
.subscribe_async::<MsgU16>(8, Default::default())
.subscribe_async::<MsgU32>(8, Default::default())
.subscribe_async::<MsgI32>(8, Default::default())
.subscribe_async::<MsgI16>(8, Default::default())
.done()
.register(TmpReceiver2)
.subscribe_async::<MsgI32>(8, Default::default())
.subscribe_sync::<MsgI16>(8, Default::default())
.done()
.build();
b.send(MsgF32(0f32)).await.unwrap();
println!("flush");
b.flush().await;
println!("sending boxed variant");
b.send_boxed(Box::new(MsgF32(0f32)), Default::default())
.await
.unwrap();
println!("flush");
b.flush().await;
println!("close");
b.close().await;
println!("closed");
poller.await;
println!("[done]");
}

View File

@ -28,7 +28,7 @@ impl Handler<u32> for TmpReceiver {
fn module() -> Module {
Module::new()
.register(TmpReceiver)
.subscribe_sync::<u32>(8, Default::default())
.subscribe_sync::<u32>(8, Default::default())
.done()
}

View File

@ -2,12 +2,13 @@ use core::f32;
use async_trait::async_trait;
use messagebus::{
derive::Message,
error::{self, StdSyncSendError},
AsyncHandler, Bus, Message,
};
use thiserror::Error;
#[derive(Debug, Error)]
#[derive(Debug, Error, messagebus::derive::Error)]
enum Error {
#[error("Error({0})")]
Error(anyhow::Error),
@ -19,19 +20,55 @@ impl<M: Message, E: StdSyncSendError> From<error::Error<M, E>> for Error {
}
}
#[derive(Debug, Clone, Message)]
#[message(clone)]
struct MsgF64(pub f64);
#[derive(Debug, Clone, Message)]
#[message(clone)]
struct MsgF32(pub f32);
#[derive(Debug, Clone, Message)]
#[message(clone)]
struct MsgI32(pub i32);
#[derive(Debug, Clone, Message)]
#[message(clone)]
struct MsgU32(pub u32);
#[derive(Debug, Clone, Message)]
#[message(clone)]
struct MsgU16(pub u16);
#[derive(Debug, Clone, Message)]
#[message(clone)]
struct MsgI16(pub i16);
#[derive(Debug, Clone, Message)]
#[message(clone)]
struct MsgU8(pub u8);
#[derive(Debug, Clone, Message)]
#[message(clone)]
struct MsgI8(pub i8);
struct TmpReceiver1;
struct TmpReceiver2;
#[async_trait]
impl AsyncHandler<i32> for TmpReceiver1 {
impl AsyncHandler<MsgI32> for TmpReceiver1 {
type Error = Error;
type Response = f32;
type Response = MsgF32;
async fn handle(&self, msg: i32, bus: &Bus) -> Result<Self::Response, Self::Error> {
let resp1 = bus.request::<_, f32>(10i16, Default::default()).await?;
let resp2 = bus.request::<_, f32>(20u16, Default::default()).await?;
async fn handle(&self, msg: MsgI32, bus: &Bus) -> Result<Self::Response, Self::Error> {
let resp1 = bus
.request::<_, MsgF32>(MsgI16(10i16), Default::default())
.await?;
let resp2 = bus
.request::<_, MsgF32>(MsgU16(20u16), Default::default())
.await?;
Ok(msg as f32 + resp1 + resp2)
Ok(MsgF32(msg.0 as f32 + resp1.0 + resp2.0))
}
async fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> {
@ -42,12 +79,12 @@ impl AsyncHandler<i32> for TmpReceiver1 {
}
#[async_trait]
impl AsyncHandler<u32> for TmpReceiver1 {
impl AsyncHandler<MsgU32> for TmpReceiver1 {
type Error = Error;
type Response = f32;
type Response = MsgF32;
async fn handle(&self, msg: u32, _bus: &Bus) -> Result<Self::Response, Self::Error> {
Ok(msg as f32)
async fn handle(&self, msg: MsgU32, _bus: &Bus) -> Result<Self::Response, Self::Error> {
Ok(MsgF32(msg.0 as _))
}
async fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> {
println!("TmpReceiver1 u32: sync");
@ -57,15 +94,19 @@ impl AsyncHandler<u32> for TmpReceiver1 {
}
#[async_trait]
impl AsyncHandler<i16> for TmpReceiver1 {
impl AsyncHandler<MsgI16> for TmpReceiver1 {
type Error = Error;
type Response = f32;
type Response = MsgF32;
async fn handle(&self, msg: i16, bus: &Bus) -> Result<Self::Response, Self::Error> {
let resp1 = bus.request::<_, f32>(1i8, Default::default()).await?;
let resp2 = bus.request::<_, f32>(2u8, Default::default()).await?;
async fn handle(&self, msg: MsgI16, bus: &Bus) -> Result<Self::Response, Self::Error> {
let resp1 = bus
.request::<_, MsgF32>(MsgI8(1i8), Default::default())
.await?;
let resp2 = bus
.request::<_, MsgF32>(MsgU8(2u8), Default::default())
.await?;
Ok(msg as f32 + resp1 + resp2)
Ok(MsgF32(msg.0 as f32 + resp1.0 + resp2.0))
}
async fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> {
@ -76,12 +117,12 @@ impl AsyncHandler<i16> for TmpReceiver1 {
}
#[async_trait]
impl AsyncHandler<u16> for TmpReceiver1 {
impl AsyncHandler<MsgU16> for TmpReceiver1 {
type Error = Error;
type Response = f32;
type Response = MsgF32;
async fn handle(&self, msg: u16, _bus: &Bus) -> Result<Self::Response, Self::Error> {
Ok(msg as f32)
async fn handle(&self, msg: MsgU16, _bus: &Bus) -> Result<Self::Response, Self::Error> {
Ok(MsgF32(msg.0 as _))
}
async fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> {
@ -92,12 +133,12 @@ impl AsyncHandler<u16> for TmpReceiver1 {
}
#[async_trait]
impl AsyncHandler<i8> for TmpReceiver1 {
impl AsyncHandler<MsgI8> for TmpReceiver1 {
type Error = Error;
type Response = f32;
type Response = MsgF32;
async fn handle(&self, msg: i8, _bus: &Bus) -> Result<Self::Response, Self::Error> {
Ok(msg as f32)
async fn handle(&self, msg: MsgI8, _bus: &Bus) -> Result<Self::Response, Self::Error> {
Ok(MsgF32(msg.0 as _))
}
async fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> {
@ -108,12 +149,12 @@ impl AsyncHandler<i8> for TmpReceiver1 {
}
#[async_trait]
impl AsyncHandler<u8> for TmpReceiver1 {
impl AsyncHandler<MsgU8> for TmpReceiver1 {
type Error = Error;
type Response = f32;
type Response = MsgF32;
async fn handle(&self, msg: u8, _bus: &Bus) -> Result<Self::Response, Self::Error> {
Ok(msg as f32)
async fn handle(&self, msg: MsgU8, _bus: &Bus) -> Result<Self::Response, Self::Error> {
Ok(MsgF32(msg.0 as _))
}
async fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> {
println!("TmpReceiver1 u8: sync");
@ -123,16 +164,25 @@ impl AsyncHandler<u8> for TmpReceiver1 {
}
#[async_trait]
impl AsyncHandler<f64> for TmpReceiver2 {
impl AsyncHandler<MsgF64> for TmpReceiver2 {
type Error = Error;
type Response = f64;
type Response = MsgF64;
async fn handle(&self, msg: f64, bus: &Bus) -> Result<Self::Response, Self::Error> {
let resp1 = bus.request::<_, f32>(100i32, Default::default()).await? as f64;
let resp2 = bus.request::<_, f32>(200u32, Default::default()).await? as f64;
let resp3 = bus.request::<_, f32>(300f32, Default::default()).await? as f64;
async fn handle(&self, msg: MsgF64, bus: &Bus) -> Result<Self::Response, Self::Error> {
let resp1 = bus
.request::<_, MsgF32>(MsgI32(100i32), Default::default())
.await?
.0 as f64;
let resp2 = bus
.request::<_, MsgF32>(MsgU32(200u32), Default::default())
.await?
.0 as f64;
let resp3 = bus
.request::<_, MsgF32>(MsgF32(300f32), Default::default())
.await?
.0 as f64;
Ok(msg + resp1 + resp2 + resp3)
Ok(MsgF64(msg.0 + resp1 + resp2 + resp3))
}
async fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> {
@ -143,11 +193,11 @@ impl AsyncHandler<f64> for TmpReceiver2 {
}
#[async_trait]
impl AsyncHandler<f32> for TmpReceiver2 {
impl AsyncHandler<MsgF32> for TmpReceiver2 {
type Error = Error;
type Response = f32;
type Response = MsgF32;
async fn handle(&self, msg: f32, _bus: &Bus) -> Result<Self::Response, Self::Error> {
async fn handle(&self, msg: MsgF32, _bus: &Bus) -> Result<Self::Response, Self::Error> {
Ok(msg)
}
async fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> {
@ -161,22 +211,28 @@ impl AsyncHandler<f32> for TmpReceiver2 {
async fn main() {
let (b, poller) = Bus::build()
.register(TmpReceiver1)
.subscribe_async::<i32>(8, Default::default())
.subscribe_async::<u32>(8, Default::default())
.subscribe_async::<i16>(8, Default::default())
.subscribe_async::<u16>(8, Default::default())
.subscribe_async::<i8>(8, Default::default())
.subscribe_async::<u8>(8, Default::default())
.subscribe_async::<MsgI32>(8, Default::default())
.subscribe_async::<MsgU32>(8, Default::default())
.subscribe_async::<MsgI16>(8, Default::default())
.subscribe_async::<MsgU16>(8, Default::default())
.subscribe_async::<MsgI8>(8, Default::default())
.subscribe_async::<MsgU8>(8, Default::default())
.done()
.register(TmpReceiver2)
.subscribe_async::<f32>(8, Default::default())
.subscribe_async::<f64>(8, Default::default())
.subscribe_async::<MsgF32>(8, Default::default())
.subscribe_async::<MsgF64>(8, Default::default())
.done()
.build();
println!(
"{:?}",
b.request_local_we::<_, f64, Error>(1000f64, Default::default())
"plain {:?}",
b.request_we::<_, MsgF64, Error>(MsgF64(1000f64), Default::default())
.await
);
println!(
"boxed {:?}",
b.request_boxed(Box::new(MsgF64(1000.)), Default::default())
.await
);

53
examples/demo_shared.rs Normal file
View File

@ -0,0 +1,53 @@
use async_trait::async_trait;
use messagebus::{error, AsyncHandler, Bus, Message};
use thiserror::Error;
#[derive(Debug, Error)]
enum Error {
#[error("Error({0})")]
Error(anyhow::Error),
}
impl<M: Message> From<error::Error<M>> for Error {
fn from(err: error::Error<M>) -> Self {
Self::Error(err.into())
}
}
#[derive(Debug, Clone)]
struct Msg;
struct TmpReceiver;
#[async_trait]
impl AsyncHandler<Msg> for TmpReceiver {
type Error = Error;
type Response = ();
async fn handle(&self, msg: Msg, _bus: &Bus) -> Result<Self::Response, Self::Error> {
println!("---> f32 {:?}", msg);
Ok(())
}
}
#[tokio::main]
async fn main() {
let (b, poller) = Bus::build()
.register(TmpReceiver)
.subscribe_local_async::<Msg>(8, Default::default())
.done()
.build();
b.send_local_one(Msg).await.unwrap();
println!("flushing");
b.flush().await;
println!("closing");
b.close().await;
println!("closed");
poller.await;
println!("[done]");
}

View File

@ -1,7 +1,10 @@
use messagebus::{error, Bus, Handler, Message, Module};
use messagebus::{
derive::{Error as MbError, Message},
error, Bus, Handler, Message, Module,
};
use thiserror::Error;
#[derive(Debug, Error)]
#[derive(Debug, Error, MbError)]
enum Error {
#[error("Error({0})")]
Error(anyhow::Error),
@ -13,14 +16,23 @@ impl<M: Message> From<error::Error<M>> for Error {
}
}
#[derive(Debug, Clone, Message)]
struct MsgF32(pub f32);
#[derive(Debug, Clone, Message)]
struct MsgU32(pub u32);
#[derive(Debug, Clone, Message)]
struct MsgU16(pub u16);
struct TmpReceiver;
impl Handler<f32> for TmpReceiver {
impl Handler<MsgF32> for TmpReceiver {
type Error = Error;
type Response = ();
fn handle(&self, msg: f32, _bus: &Bus) -> Result<Self::Response, Self::Error> {
println!("---> f32 {}", msg);
fn handle(&self, msg: MsgF32, _bus: &Bus) -> Result<Self::Response, Self::Error> {
println!("---> f32 {:?}", msg);
std::thread::sleep(std::time::Duration::from_secs(5));
@ -30,22 +42,22 @@ impl Handler<f32> for TmpReceiver {
}
}
impl Handler<u16> for TmpReceiver {
impl Handler<MsgU16> for TmpReceiver {
type Error = Error;
type Response = ();
fn handle(&self, msg: u16, _bus: &Bus) -> Result<Self::Response, Self::Error> {
println!("---> u16 {}", msg);
fn handle(&self, msg: MsgU16, _bus: &Bus) -> Result<Self::Response, Self::Error> {
println!("---> u16 {:?}", msg);
Ok(())
}
}
impl Handler<u32> for TmpReceiver {
impl Handler<MsgU32> for TmpReceiver {
type Error = Error;
type Response = ();
fn handle(&self, msg: u32, _bus: &Bus) -> Result<Self::Response, Self::Error> {
println!("---> u32 {}", msg);
fn handle(&self, msg: MsgU32, _bus: &Bus) -> Result<Self::Response, Self::Error> {
println!("---> u32 {:?}", msg);
Ok(())
}
}
@ -53,9 +65,9 @@ impl Handler<u32> for TmpReceiver {
fn module() -> Module {
Module::new()
.register(TmpReceiver)
.subscribe_sync::<f32>(8, Default::default())
.subscribe_sync::<u16>(8, Default::default())
.subscribe_sync::<u32>(8, Default::default())
.subscribe_sync::<MsgF32>(8, Default::default())
.subscribe_sync::<MsgU16>(8, Default::default())
.subscribe_sync::<MsgU32>(8, Default::default())
.done()
}
@ -63,9 +75,9 @@ fn module() -> Module {
async fn main() {
let (b, poller) = Bus::build().add_module(module()).build();
b.send(32f32).await.unwrap();
b.send(11u16).await.unwrap();
b.send(32u32).await.unwrap();
b.send(MsgF32(32f32)).await.unwrap();
b.send(MsgU16(11u16)).await.unwrap();
b.send(MsgU32(32u32)).await.unwrap();
println!("flush");
b.flush().await;

View File

@ -2,11 +2,12 @@ use std::sync::Arc;
use async_trait::async_trait;
use messagebus::{
derive::{Error as MbError, Message},
error, AsyncBatchSynchronizedHandler, BatchSynchronizedHandler, Bus, Message,
};
use thiserror::Error;
#[derive(Debug, Error, Clone)]
#[derive(Debug, Error, Clone, MbError)]
enum Error {
#[error("Error({0})")]
Error(Arc<anyhow::Error>),
@ -18,19 +19,26 @@ impl<M: Message> From<error::Error<M>> for Error {
}
}
#[derive(Debug, Clone, Message)]
#[message(clone)]
struct MsgI32(i32);
#[derive(Debug, Clone, Message)]
#[message(clone)]
struct MsgI16(i16);
struct TmpReceiver;
#[async_trait]
impl AsyncBatchSynchronizedHandler<i32> for TmpReceiver {
impl AsyncBatchSynchronizedHandler<MsgI32> for TmpReceiver {
type Error = Error;
type Response = ();
type InBatch = Vec<i32>;
type InBatch = Vec<MsgI32>;
type OutBatch = Vec<()>;
async fn handle(
&mut self,
msg: Vec<i32>,
msg: Vec<MsgI32>,
_bus: &Bus,
) -> Result<Vec<Self::Response>, Self::Error> {
println!("---> [i32; {}] {:?}", msg.len(), msg);
@ -39,13 +47,13 @@ impl AsyncBatchSynchronizedHandler<i32> for TmpReceiver {
}
}
impl BatchSynchronizedHandler<i16> for TmpReceiver {
impl BatchSynchronizedHandler<MsgI16> for TmpReceiver {
type Error = Error;
type Response = ();
type InBatch = Vec<i16>;
type InBatch = Vec<MsgI16>;
type OutBatch = Vec<()>;
fn handle(&mut self, msg: Vec<i16>, _bus: &Bus) -> Result<Vec<Self::Response>, Self::Error> {
fn handle(&mut self, msg: Vec<MsgI16>, _bus: &Bus) -> Result<Vec<Self::Response>, Self::Error> {
println!("---> [i16; {}] {:?}", msg.len(), msg);
Ok(vec![])
}
@ -55,22 +63,22 @@ impl BatchSynchronizedHandler<i16> for TmpReceiver {
async fn main() {
let (b, poller) = Bus::build()
.register_unsync(TmpReceiver)
.subscribe_batch_async::<i32>(16, Default::default())
.subscribe_batch_sync::<i16>(16, Default::default())
.subscribe_batch_async::<MsgI32>(16, Default::default())
.subscribe_batch_sync::<MsgI16>(16, Default::default())
.done()
.build();
for i in 1..100i32 {
b.send(i).await.unwrap();
b.send(MsgI32(i)).await.unwrap();
}
b.send(1i16).await.unwrap();
b.send(2i16).await.unwrap();
b.send(3i16).await.unwrap();
b.send(4i16).await.unwrap();
b.send(5i16).await.unwrap();
b.send(6i16).await.unwrap();
b.send(7i16).await.unwrap();
b.send(MsgI16(1i16)).await.unwrap();
b.send(MsgI16(2i16)).await.unwrap();
b.send(MsgI16(3i16)).await.unwrap();
b.send(MsgI16(4i16)).await.unwrap();
b.send(MsgI16(5i16)).await.unwrap();
b.send(MsgI16(6i16)).await.unwrap();
b.send(MsgI16(7i16)).await.unwrap();
println!("flush");
b.flush().await;

View File

@ -1,8 +1,11 @@
use async_trait::async_trait;
use messagebus::{error, receivers, AsyncSynchronizedHandler, Bus, Message, SynchronizedHandler};
use messagebus::{
derive::{Error as MbError, Message},
error, AsyncSynchronizedHandler, Bus, Message, SynchronizedHandler,
};
use thiserror::Error;
#[derive(Debug, Error)]
#[derive(Debug, Error, MbError)]
enum Error {
#[error("Error({0})")]
Error(anyhow::Error),
@ -14,15 +17,21 @@ impl<M: Message> From<error::Error<M>> for Error {
}
}
#[derive(Debug, Clone, Message)]
struct MsgF32(pub f32);
#[derive(Debug, Clone, Message)]
struct MsgI16(pub i16);
struct TmpReceiver;
impl SynchronizedHandler<f32> for TmpReceiver {
impl SynchronizedHandler<MsgF32> for TmpReceiver {
type Error = Error;
type Response = ();
fn handle(&mut self, msg: f32, _bus: &Bus) -> Result<Self::Response, Self::Error> {
fn handle(&mut self, msg: MsgF32, _bus: &Bus) -> Result<Self::Response, Self::Error> {
// std::thread::sleep(std::time::Duration::from_millis(100));
println!("---> f32 {}", msg);
println!("---> f32 {:?}", msg);
println!("done");
Ok(())
@ -30,13 +39,13 @@ impl SynchronizedHandler<f32> for TmpReceiver {
}
#[async_trait]
impl AsyncSynchronizedHandler<i16> for TmpReceiver {
impl AsyncSynchronizedHandler<MsgI16> for TmpReceiver {
type Error = Error;
type Response = ();
async fn handle(&mut self, msg: i16, _bus: &Bus) -> Result<Self::Response, Self::Error> {
async fn handle(&mut self, msg: MsgI16, _bus: &Bus) -> Result<Self::Response, Self::Error> {
std::thread::sleep(std::time::Duration::from_millis(100));
println!("---> i16 {}", msg);
println!("---> i16 {:?}", msg);
println!("done");
Ok(())
@ -47,30 +56,30 @@ impl AsyncSynchronizedHandler<i16> for TmpReceiver {
async fn main() {
let (b, poller) = Bus::build()
.register_unsync(TmpReceiver)
.subscribe::<f32, receivers::SynchronizedSync<_, _, _>, _, _>(8, Default::default())
.subscribe::<i16, receivers::SynchronizedAsync<_, _, _>, _, _>(8, Default::default())
.subscribe_sync::<MsgF32>(8, Default::default())
.subscribe_async::<MsgI16>(8, Default::default())
.done()
.build();
b.send(12.0f32).await.unwrap();
b.send(1i16).await.unwrap();
b.send(12.0f32).await.unwrap();
b.send(1i16).await.unwrap();
b.send(12.0f32).await.unwrap();
b.send(1i16).await.unwrap();
b.send(12.0f32).await.unwrap();
b.send(1i16).await.unwrap();
b.send(12.0f32).await.unwrap();
b.send(1i16).await.unwrap();
b.send(12.0f32).await.unwrap();
b.send(1i16).await.unwrap();
b.send(12.0f32).await.unwrap();
b.send(1i16).await.unwrap();
b.send(12.0f32).await.unwrap();
b.send(1i16).await.unwrap();
b.send(MsgF32(12.0f32)).await.unwrap();
b.send(MsgI16(1i16)).await.unwrap();
b.send(MsgF32(12.0f32)).await.unwrap();
b.send(MsgI16(1i16)).await.unwrap();
b.send(MsgF32(12.0f32)).await.unwrap();
b.send(MsgI16(1i16)).await.unwrap();
b.send(MsgF32(12.0f32)).await.unwrap();
b.send(MsgI16(1i16)).await.unwrap();
b.send(MsgF32(12.0f32)).await.unwrap();
b.send(MsgI16(1i16)).await.unwrap();
b.send(MsgF32(12.0f32)).await.unwrap();
b.send(MsgI16(1i16)).await.unwrap();
b.send(MsgF32(12.0f32)).await.unwrap();
b.send(MsgI16(1i16)).await.unwrap();
b.send(MsgF32(12.0f32)).await.unwrap();
b.send(MsgI16(1i16)).await.unwrap();
b.send(12.0f32).await.unwrap();
b.send(1i16).await.unwrap();
b.send(MsgF32(12.0f32)).await.unwrap();
b.send(MsgI16(1i16)).await.unwrap();
println!("flush");

View File

@ -1,9 +1,16 @@
use std::{any::TypeId, collections::HashMap, marker::PhantomData, pin::Pin, sync::Arc};
use std::{collections::HashMap, marker::PhantomData, pin::Pin, sync::Arc};
use futures::{Future, FutureExt};
use tokio::sync::Mutex;
use crate::{AsyncBatchHandler, AsyncBatchSynchronizedHandler, AsyncHandler, AsyncSynchronizedHandler, BatchHandler, BatchSynchronizedHandler, Bus, BusInner, Handler, Message, SynchronizedHandler, Untyped, error::StdSyncSendError, receiver::{Receiver, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver}, receivers};
use crate::{
envelop::TypeTag,
error::StdSyncSendError,
receiver::{Receiver, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver},
receivers, AsyncBatchHandler, AsyncBatchSynchronizedHandler, AsyncHandler,
AsyncSynchronizedHandler, BatchHandler, BatchSynchronizedHandler, Bus, BusInner, Handler,
Message, SynchronizedHandler, Untyped,
};
pub trait ReceiverSubscriberBuilder<T, M, R, E>:
SendUntypedReceiver + SendTypedReceiver<M> + ReciveTypedReceiver<R, E>
@ -36,7 +43,7 @@ pub struct RegisterEntry<K, T, F, B> {
payload: B,
builder: F,
receivers: HashMap<
TypeId,
TypeTag,
Vec<(
Receiver,
Box<
@ -55,7 +62,7 @@ impl<K, T: 'static, F, B> RegisterEntry<K, T, F, B>
where
F: FnMut(
&mut B,
(TypeId, Receiver),
(TypeTag, Receiver),
Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>,
Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>,
),
@ -65,7 +72,7 @@ where
for (r, poller, poller2) in v {
let poller = poller(self.item.clone());
(self.builder)(&mut self.payload, (tid, r), poller, poller2);
(self.builder)(&mut self.payload, (tid.clone(), r), poller, poller2);
}
}
@ -87,7 +94,7 @@ impl<T, F, B> RegisterEntry<UnsyncEntry, T, F, B> {
let receiver = Receiver::new::<M, R, E, S>(queue, inner);
let poller2 = receiver.start_polling_events::<R, E>();
self.receivers
.entry(TypeId::of::<M>())
.entry(M::type_tag_())
.or_insert_with(Vec::new)
.push((receiver, poller, poller2));
@ -99,6 +106,7 @@ impl<T, F, B> RegisterEntry<UnsyncEntry, T, F, B> {
where
T: SynchronizedHandler<M> + Send + 'static,
M: Message,
T::Response: Message,
{
self.subscribe::<M, receivers::SynchronizedSync<M, T::Response, T::Error>, T::Response, T::Error>(queue, cfg)
}
@ -108,24 +116,35 @@ impl<T, F, B> RegisterEntry<UnsyncEntry, T, F, B> {
where
T: AsyncSynchronizedHandler<M> + Send + 'static,
M: Message,
T::Response: Message,
{
self.subscribe::<M, receivers::SynchronizedAsync<M, T::Response, T::Error>, T::Response, T::Error>(queue, cfg)
}
#[inline]
pub fn subscribe_batch_sync<M>(self, queue: u64, cfg: receivers::SynchronizedBatchedConfig) -> Self
pub fn subscribe_batch_sync<M>(
self,
queue: u64,
cfg: receivers::SynchronizedBatchedConfig,
) -> Self
where
T: BatchSynchronizedHandler<M> + Send + 'static,
M: Message,
T::Response: Message,
{
self.subscribe::<M, receivers::SynchronizedBatchedSync<M, T::Response, T::Error>, T::Response, T::Error>(queue, cfg)
}
#[inline]
pub fn subscribe_batch_async<M>(self, queue: u64, cfg: receivers::SynchronizedBatchedConfig) -> Self
pub fn subscribe_batch_async<M>(
self,
queue: u64,
cfg: receivers::SynchronizedBatchedConfig,
) -> Self
where
T: AsyncBatchSynchronizedHandler<M> + Send + 'static,
M: Message,
T::Response: Message,
{
self.subscribe::<M, receivers::SynchronizedBatchedAsync<M, T::Response, T::Error>, T::Response, T::Error>(queue, cfg)
}
@ -145,7 +164,7 @@ impl<T, F, B> RegisterEntry<SyncEntry, T, F, B> {
let receiver = Receiver::new::<M, R, E, S>(queue, inner);
let poller2 = receiver.start_polling_events::<R, E>();
self.receivers
.entry(TypeId::of::<M>())
.entry(M::type_tag_())
.or_insert_with(Vec::new)
.push((receiver, poller, poller2));
@ -157,6 +176,7 @@ impl<T, F, B> RegisterEntry<SyncEntry, T, F, B> {
where
T: Handler<M> + Send + Sync + 'static,
M: Message,
T::Response: Message,
{
self.subscribe::<M, receivers::BufferUnorderedSync<M, T::Response, T::Error>, T::Response, T::Error>(queue, cfg)
}
@ -173,26 +193,36 @@ impl<T, F, B> RegisterEntry<SyncEntry, T, F, B> {
}
#[inline]
pub fn subscribe_batch_sync<M>(self, queue: u64, cfg: receivers::BufferUnorderedBatchedConfig) -> Self
pub fn subscribe_batch_sync<M>(
self,
queue: u64,
cfg: receivers::BufferUnorderedBatchedConfig,
) -> Self
where
T: BatchHandler<M> + Send + 'static,
M: Message,
T::Response: Message,
{
self.subscribe::<M, receivers::BufferUnorderedBatchedSync<M, T::Response, T::Error>, T::Response, T::Error>(queue, cfg)
}
#[inline]
pub fn subscribe_batch_async<M>(self, queue: u64, cfg: receivers::BufferUnorderedBatchedConfig) -> Self
pub fn subscribe_batch_async<M>(
self,
queue: u64,
cfg: receivers::BufferUnorderedBatchedConfig,
) -> Self
where
T: AsyncBatchHandler<M> + Send + 'static,
M: Message,
T::Response: Message,
{
self.subscribe::<M, receivers::BufferUnorderedBatchedAsync<M, T::Response, T::Error>, T::Response, T::Error>(queue, cfg)
}
}
pub struct Module {
receivers: Vec<(TypeId, Receiver)>,
receivers: Vec<(TypeTag, Receiver)>,
pollings: Vec<Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>>,
}
@ -212,7 +242,7 @@ impl Module {
T,
impl FnMut(
&mut Self,
(TypeId, Receiver),
(TypeTag, Receiver),
Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>,
Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>,
),
@ -239,7 +269,7 @@ impl Module {
T,
impl FnMut(
&mut Self,
(TypeId, Receiver),
(TypeTag, Receiver),
Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>,
Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>,
),
@ -285,7 +315,7 @@ impl BusBuilder {
T,
impl FnMut(
&mut Self,
(TypeId, Receiver),
(TypeTag, Receiver),
Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>,
Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>,
),
@ -312,7 +342,7 @@ impl BusBuilder {
T,
impl FnMut(
&mut Self,
(TypeId, Receiver),
(TypeTag, Receiver),
Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>,
Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>,
),

View File

@ -1,45 +1,338 @@
use core::any::{Any, type_name};
use core::any::Any;
use core::fmt;
use serde::{de::DeserializeOwned, Serialize};
use std::any::type_name;
use std::borrow::Cow;
use std::sync::Arc;
pub trait Message:
fmt::Debug + Unpin + Send + Sync + 'static
{
fn type_name(&self) -> &str;
#[derive(Debug, Copy, Clone)]
#[repr(C)]
pub struct TraitObject {
pub data: *mut (),
pub vtable: *mut (),
}
impl<T: fmt::Debug + Unpin + Send + Sync + 'static> Message for T {
fn type_name(&self) -> &str {
type_name::<T>()
pub type TypeTag = Cow<'static, str>;
pub trait TypeTagged {
fn type_tag_() -> TypeTag
where
Self: Sized;
fn type_tag(&self) -> TypeTag;
fn type_name(&self) -> Cow<str>;
}
pub trait Message: TypeTagged + fmt::Debug + Unpin + Send + Sync + 'static {
fn as_any_ref(&self) -> &dyn Any;
fn as_any_mut(&mut self) -> &mut dyn Any;
fn as_any_boxed(self: Box<Self>) -> Box<dyn Any>;
fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any>;
fn as_shared_ref(&self) -> Option<&dyn SharedMessage>;
fn as_shared_mut(&mut self) -> Option<&mut dyn SharedMessage>;
fn as_shared_boxed(self: Box<Self>) -> Option<Box<dyn SharedMessage>>;
fn as_shared_arc(self: Arc<Self>) -> Option<Arc<dyn SharedMessage>>;
fn try_clone_into(&self, into: &mut dyn Any) -> bool;
fn try_clone_boxed(&self) -> Option<Box<dyn Message>>;
}
impl TypeTagged for () {
fn type_tag_() -> TypeTag {
type_name::<Self>().into()
}
fn type_tag(&self) -> TypeTag {
type_name::<Self>().into()
}
fn type_name(&self) -> Cow<str> {
type_name::<Self>().into()
}
}
pub trait TransferableMessage: Message + Serialize + DeserializeOwned
{
fn into_boxed(self) -> BoxedMessage;
}
impl<T: Message + Serialize + DeserializeOwned> TransferableMessage for T {
fn into_boxed(self) -> BoxedMessage {
BoxedMessage(Box::new(self) as _)
impl Message for () {
fn as_any_ref(&self) -> &dyn Any {
self
}
fn as_any_mut(&mut self) -> &mut dyn Any {
self
}
fn as_any_boxed(self: Box<Self>) -> Box<dyn Any> {
self
}
fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any> {
self
}
fn as_shared_ref(&self) -> Option<&dyn SharedMessage> {
Some(self)
}
fn as_shared_mut(&mut self) -> Option<&mut dyn SharedMessage> {
Some(self)
}
fn as_shared_boxed(self: Box<Self>) -> Option<Box<dyn SharedMessage>> {
Some(self)
}
fn as_shared_arc(self: Arc<Self>) -> Option<Arc<dyn SharedMessage>> {
Some(self)
}
fn try_clone_into(&self, into: &mut dyn Any) -> bool {
let into = if let Some(inner) = into.downcast_mut::<Option<()>>() {
inner
} else {
return false;
};
into.replace(self.clone());
true
}
fn try_clone_boxed(&self) -> Option<Box<dyn Message>> {
Some(Box::new(self.clone()))
}
}
pub trait SafeMessage:
Any + fmt::Debug + erased_serde::Serialize + Unpin + Send + Sync + 'static
{
fn type_name(&self) -> &str;
pub trait IntoBoxedMessage {
fn into_boxed(self) -> Box<dyn Message>;
}
impl<T: Any + fmt::Debug + erased_serde::Serialize + Unpin + Send + Sync> SafeMessage for T {
fn type_name(&self) -> &str {
type_name::<T>()
impl<T: Message> IntoBoxedMessage for T {
fn into_boxed(self) -> Box<dyn Message> {
Box::new(self)
}
}
#[derive(Debug)]
pub struct BoxedMessage(Box<dyn SafeMessage>);
pub trait SharedMessage: Message + erased_serde::Serialize {}
impl<T: Message + erased_serde::Serialize> SharedMessage for T {}
impl<M: TransferableMessage> From<M> for BoxedMessage {
fn from(m: M) -> Self {
BoxedMessage(Box::new(m))
// pub trait IntoTakeable {
// fn into_takeable(&mut self) -> Takeable<'_>;
// }
// impl<T: 'static> IntoTakeable for Option<T> {
// fn into_takeable(&mut self) -> Takeable<'_> {
// Takeable {
// inner_ref: self
// }
// }
// }
// pub struct Takeable<'a> {
// inner_ref: &'a mut dyn Any,
// }
// impl Takeable<'_> {
// pub fn take<M: Message>(&mut self) -> Option<M> {
// let m = self.inner_ref.downcast_mut::<Option<M>>()?;
// m.take()
// }
// }
#[cfg(test)]
mod tests {
use super::*;
use erased_serde::Serializer;
use std::{any::type_name, borrow::Cow};
#[derive(Debug, Clone)]
struct Msg0;
impl TypeTagged for Msg0 {
fn type_tag_() -> TypeTag {
type_name::<Self>().into()
}
fn type_tag(&self) -> TypeTag {
type_name::<Self>().into()
}
fn type_name(&self) -> Cow<str> {
type_name::<Self>().into()
}
}
impl Message for Msg0 {
fn as_any_ref(&self) -> &dyn Any {
self
}
fn as_any_mut(&mut self) -> &mut dyn Any {
self
}
fn as_any_boxed(self: Box<Self>) -> Box<dyn Any> {
self
}
fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any> {
self
}
fn as_shared_ref(&self) -> Option<&dyn SharedMessage> {
None
}
fn as_shared_mut(&mut self) -> Option<&mut dyn SharedMessage> {
None
}
fn as_shared_boxed(self: Box<Self>) -> Option<Box<dyn SharedMessage>> {
None
}
fn as_shared_arc(self: Arc<Self>) -> Option<Arc<dyn SharedMessage>> {
None
}
fn try_clone_into(&self, _: &mut dyn Any) -> bool {
false
}
fn try_clone_boxed(&self) -> Option<Box<dyn Message>> {
None
}
}
#[derive(Debug, Clone)]
struct Msg1;
impl TypeTagged for Msg1 {
fn type_tag_() -> TypeTag {
type_name::<Self>().into()
}
fn type_tag(&self) -> TypeTag {
type_name::<Self>().into()
}
fn type_name(&self) -> Cow<str> {
type_name::<Self>().into()
}
}
impl Message for Msg1 {
fn as_any_ref(&self) -> &dyn Any {
self
}
fn as_any_mut(&mut self) -> &mut dyn Any {
self
}
fn as_any_boxed(self: Box<Self>) -> Box<dyn Any> {
self
}
fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any> {
self
}
fn as_shared_ref(&self) -> Option<&dyn SharedMessage> {
None
}
fn as_shared_mut(&mut self) -> Option<&mut dyn SharedMessage> {
None
}
fn as_shared_boxed(self: Box<Self>) -> Option<Box<dyn SharedMessage>> {
None
}
fn as_shared_arc(self: Arc<Self>) -> Option<Arc<dyn SharedMessage>> {
None
}
fn try_clone_into(&self, into: &mut dyn Any) -> bool {
let into = if let Some(inner) = into.downcast_mut::<Option<Msg1>>() {
inner
} else {
return false;
};
into.replace(self.clone());
true
}
fn try_clone_boxed(&self) -> Option<Box<dyn Message>> {
Some(Box::new(self.clone()))
}
}
#[derive(Debug, Clone, serde_derive::Serialize, serde_derive::Deserialize)]
struct Msg2 {
inner: [i32; 2],
}
impl TypeTagged for Msg2 {
fn type_tag_() -> TypeTag {
type_name::<Self>().into()
}
fn type_tag(&self) -> TypeTag {
type_name::<Self>().into()
}
fn type_name(&self) -> Cow<str> {
type_name::<Self>().into()
}
}
impl Message for Msg2 {
fn as_any_ref(&self) -> &dyn Any {
self
}
fn as_any_mut(&mut self) -> &mut dyn Any {
self
}
fn as_any_boxed(self: Box<Self>) -> Box<dyn Any> {
self
}
fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any> {
self
}
fn as_shared_ref(&self) -> Option<&dyn SharedMessage> {
Some(self)
}
fn as_shared_mut(&mut self) -> Option<&mut dyn SharedMessage> {
Some(self)
}
fn as_shared_boxed(self: Box<Self>) -> Option<Box<dyn SharedMessage>> {
Some(self)
}
fn as_shared_arc(self: Arc<Self>) -> Option<Arc<dyn SharedMessage>> {
Some(self)
}
fn try_clone_into(&self, into: &mut dyn Any) -> bool {
let into = if let Some(inner) = into.downcast_mut::<Option<Msg2>>() {
inner
} else {
return false;
};
into.replace(self.clone());
true
}
fn try_clone_boxed(&self) -> Option<Box<dyn Message>> {
Some(Box::new(self.clone()))
}
}
#[test]
fn test_static_upcast() {
let mut buff: Vec<u8> = Vec::new();
let json = &mut serde_json::Serializer::new(&mut buff);
let mut json = <dyn Serializer>::erase(json);
let x = Msg1;
let y = Msg2 { inner: [12, 13] };
assert!(x.as_shared_ref().is_none());
assert!(y.as_shared_ref().is_some());
assert!(y
.as_shared_ref()
.unwrap()
.erased_serialize(&mut json)
.is_ok());
assert_eq!(buff.as_slice(), b"{\"inner\":[12,13]}");
}
#[test]
fn test_dyn_upcast() {
let mut buff: Vec<u8> = Vec::new();
let json = &mut serde_json::Serializer::new(&mut buff);
let mut json = <dyn Serializer>::erase(json);
let x = Msg1;
let y = Msg2 { inner: [12, 13] };
let x_dyn: &dyn Message = &x;
let y_dyn: &dyn Message = &y;
assert!(x_dyn.as_shared_ref().is_none());
assert!(y_dyn.as_shared_ref().is_some());
assert!(y_dyn
.as_shared_ref()
.unwrap()
.erased_serialize(&mut json)
.is_ok());
assert_eq!(buff.as_slice(), b"{\"inner\":[12,13]}");
}
}

View File

@ -1,16 +1,37 @@
use core::fmt;
use std::{any::type_name, borrow::Cow};
use thiserror::Error;
use tokio::sync::oneshot;
use crate::{Message, envelop::{BoxedMessage, TransferableMessage}};
use crate::{
envelop::{IntoBoxedMessage, TypeTag, TypeTagged},
Message,
};
pub trait StdSyncSendError: std::error::Error + Send + Sync + Unpin + 'static {}
impl<T: std::error::Error + Send + Sync + Unpin + 'static> StdSyncSendError for T {}
pub trait StdSyncSendError: std::error::Error + TypeTagged + Send + Sync + Unpin + 'static {}
impl<T: std::error::Error + TypeTagged + Send + Sync + Unpin + 'static> StdSyncSendError for T {}
#[derive(Debug, Error)]
pub enum VoidError {}
impl TypeTagged for VoidError {
fn type_name(&self) -> Cow<str> {
type_name::<VoidError>().into()
}
fn type_tag(&self) -> TypeTag {
type_name::<VoidError>().into()
}
fn type_tag_() -> TypeTag
where
Self: Sized,
{
type_name::<VoidError>().into()
}
}
#[derive(Debug, Error)]
pub enum SendError<M: fmt::Debug> {
#[error("Closed")]
@ -20,11 +41,11 @@ pub enum SendError<M: fmt::Debug> {
Full(M),
}
impl<M: TransferableMessage> SendError<M> {
pub fn into_boxed(self) -> SendError<BoxedMessage> {
impl<M: Message> SendError<M> {
pub fn into_boxed(self) -> SendError<Box<dyn Message>> {
match self {
SendError::Closed(m) => SendError::Closed(BoxedMessage::from(m)),
SendError::Full(m) => SendError::Closed(BoxedMessage::from(m)),
SendError::Closed(m) => SendError::Closed(m.into_boxed()),
SendError::Full(m) => SendError::Closed(m.into_boxed()),
}
}
}
@ -40,6 +61,12 @@ pub enum Error<M: fmt::Debug + 'static = (), E: StdSyncSendError = VoidError> {
#[error("NoReceivers")]
NoReceivers,
#[error("AddListenerError")]
AddListenerError,
#[error("MessageCastError")]
MessageCastError,
#[error("Other({0})")]
Other(E),
@ -48,6 +75,9 @@ pub enum Error<M: fmt::Debug + 'static = (), E: StdSyncSendError = VoidError> {
#[error("Other({0})")]
OtherBoxed(Box<dyn StdSyncSendError>),
#[error("WrongMessageType()")]
WrongMessageType(M),
}
impl<M: Message, E: StdSyncSendError> Error<M, E> {
@ -59,6 +89,9 @@ impl<M: Message, E: StdSyncSendError> Error<M, E> {
Error::Serialization(s) => Error::Serialization(s),
Error::Other(inner) => Error::OtherBoxed(Box::new(inner) as _),
Error::OtherBoxed(inner) => Error::OtherBoxed(inner),
Error::WrongMessageType(inner) => Error::WrongMessageType(inner),
Error::AddListenerError => Error::AddListenerError,
Error::MessageCastError => Error::MessageCastError,
}
}
@ -70,25 +103,52 @@ impl<M: Message, E: StdSyncSendError> Error<M, E> {
Error::Serialization(s) => Error::Serialization(s),
Error::Other(_) => panic!("expected boxed error!"),
Error::OtherBoxed(inner) => Error::Other(inner.into()),
Error::WrongMessageType(inner) => Error::WrongMessageType(inner),
Error::AddListenerError => Error::AddListenerError,
Error::MessageCastError => Error::MessageCastError,
}
}
}
impl<E: StdSyncSendError> Error<(), E> {
pub fn specify<M: Message>(self) -> Error<M, E> {
pub fn specify<M: fmt::Debug>(self) -> Error<M, E> {
match self {
Error::SendError(_) => panic!("cannot specify type on typed error"),
Error::WrongMessageType(_) => panic!("cannot specify type on typed error"),
Error::NoResponse => Error::NoReceivers,
Error::NoReceivers => Error::NoReceivers,
Error::Serialization(s) => Error::Serialization(s),
Error::Other(inner) => Error::Other(inner),
Error::OtherBoxed(inner) => Error::OtherBoxed(inner),
Error::AddListenerError => Error::AddListenerError,
Error::MessageCastError => Error::MessageCastError,
}
}
}
impl<M: Message, E: StdSyncSendError> From<oneshot::error::RecvError> for Error<M, E> {
impl<M: fmt::Debug, E: StdSyncSendError> From<oneshot::error::RecvError> for Error<M, E> {
fn from(_: oneshot::error::RecvError) -> Self {
Error::NoResponse
}
}
impl Error<Box<dyn Message>> {
pub fn from_typed<M: Message>(err: Error<M>) -> Self {
match err {
Error::SendError(SendError::Closed(m)) => {
Error::SendError(SendError::Closed(m.into_boxed()))
}
Error::SendError(SendError::Full(m)) => {
Error::SendError(SendError::Full(m.into_boxed()))
}
Error::WrongMessageType(m) => Error::WrongMessageType(m.into_boxed()),
Error::NoResponse => Error::NoReceivers,
Error::NoReceivers => Error::NoReceivers,
Error::Serialization(s) => Error::Serialization(s),
Error::Other(inner) => Error::Other(inner),
Error::OtherBoxed(inner) => Error::OtherBoxed(inner),
Error::AddListenerError => Error::AddListenerError,
Error::MessageCastError => Error::MessageCastError,
}
}
}

View File

@ -89,7 +89,11 @@ pub trait AsyncBatchSynchronizedHandler<M: Message>: Send {
type InBatch: FromIterator<M> + Send;
type OutBatch: IntoIterator<Item = Self::Response> + Send;
async fn handle(&mut self, msg: Self::InBatch, bus: &Bus) -> Result<Self::OutBatch, Self::Error>;
async fn handle(
&mut self,
msg: Self::InBatch,
bus: &Bus,
) -> Result<Self::OutBatch, Self::Error>;
async fn sync(&mut self, _bus: &Bus) -> Result<(), Self::Error> {
Ok(())
}
@ -135,7 +139,11 @@ pub trait LocalAsyncBatchHandler<M: Message> {
type InBatch: FromIterator<M> + Send;
type OutBatch: IntoIterator<Item = Self::Response> + Send;
async fn handle(&mut self, msg: Self::InBatch, bus: &Bus) -> Result<Self::OutBatch, Self::Error>;
async fn handle(
&mut self,
msg: Self::InBatch,
bus: &Bus,
) -> Result<Self::OutBatch, Self::Error>;
async fn sync(&mut self, _bus: &Bus) -> Result<(), Self::Error> {
Ok(())
}

View File

@ -4,21 +4,25 @@ pub mod error;
mod handler;
mod receiver;
pub mod receivers;
mod trait_object;
pub mod relay;
mod trait_object;
#[macro_use]
extern crate log;
pub mod derive {
pub use messagebus_derive::*;
}
use crate::receiver::Permit;
use builder::BusBuilder;
pub use builder::Module;
pub use relay::RelayTrait;
use core::any::{Any, TypeId};
pub use envelop::{BoxedMessage, TransferableMessage, Message};
use core::any::Any;
pub use envelop::{Message, SharedMessage, TypeTag, TypeTagged};
use error::{Error, SendError, StdSyncSendError};
pub use handler::*;
use receiver::Receiver;
pub use relay::RelayTrait;
use smallvec::SmallVec;
use std::{
collections::HashMap,
@ -48,12 +52,12 @@ impl Default for SendOptions {
}
pub struct BusInner {
receivers: HashMap<TypeId, SmallVec<[Receiver; 4]>>,
receivers: HashMap<TypeTag, SmallVec<[Receiver; 4]>>,
closed: AtomicBool,
}
impl BusInner {
pub(crate) fn new(input: Vec<(TypeId, Receiver)>) -> Self {
pub(crate) fn new(input: Vec<(TypeTag, Receiver)>) -> Self {
let mut receivers = HashMap::new();
for (key, value) in input {
@ -136,11 +140,11 @@ impl BusInner {
}
#[inline]
pub fn try_send<M: TransferableMessage + Clone>(&self, msg: M) -> Result<(), Error<M>> {
pub fn try_send<M: Message + Clone>(&self, msg: M) -> Result<(), Error<M>> {
self.try_send_ext(msg, SendOptions::Broadcast)
}
pub fn try_send_ext<M: TransferableMessage + Clone>(
pub fn try_send_ext<M: Message + Clone>(
&self,
msg: M,
_options: SendOptions,
@ -149,10 +153,10 @@ impl BusInner {
return Err(SendError::Closed(msg).into());
}
let tt = msg.type_tag();
let mid = ID_COUNTER.fetch_add(1, Ordering::Relaxed);
let tid = TypeId::of::<M>();
if let Some(rs) = self.receivers.get(&tid) {
if let Some(rs) = self.receivers.get(&tt) {
let permits = if let Some(x) = self.try_reserve(rs) {
x
} else {
@ -185,12 +189,12 @@ impl BusInner {
}
#[inline]
pub fn send_blocking<M: TransferableMessage + Clone>(&self, msg: M) -> Result<(), Error<M>> {
pub fn send_blocking<M: Message + Clone>(&self, msg: M) -> Result<(), Error<M>> {
self.send_blocking_ext(msg, SendOptions::Broadcast)
}
#[inline]
pub fn send_blocking_ext<M: TransferableMessage + Clone>(
pub fn send_blocking_ext<M: Message + Clone>(
&self,
msg: M,
options: SendOptions,
@ -199,11 +203,11 @@ impl BusInner {
}
#[inline]
pub async fn send<M: TransferableMessage + Clone>(&self, msg: M) -> core::result::Result<(), Error<M>> {
pub async fn send<M: Message + Clone>(&self, msg: M) -> core::result::Result<(), Error<M>> {
Ok(self.send_ext(msg, SendOptions::Broadcast).await?)
}
pub async fn send_ext<M: TransferableMessage + Clone>(
pub async fn send_ext<M: Message + Clone>(
&self,
msg: M,
_options: SendOptions,
@ -212,10 +216,10 @@ impl BusInner {
return Err(SendError::Closed(msg).into());
}
let tt = msg.type_tag();
let mid = ID_COUNTER.fetch_add(1, Ordering::Relaxed);
let tid = TypeId::of::<M>();
if let Some(rs) = self.receivers.get(&tid) {
if let Some(rs) = self.receivers.get(&tt) {
if let Some((last, head)) = rs.split_last() {
for r in head {
let _ = r.send(mid, msg.clone(), r.reserve().await);
@ -236,11 +240,11 @@ impl BusInner {
}
#[inline]
pub fn force_send<M: TransferableMessage + Clone>(&self, msg: M) -> Result<(), Error<M>> {
pub fn force_send<M: Message + Clone>(&self, msg: M) -> Result<(), Error<M>> {
self.force_send_ext(msg, SendOptions::Broadcast)
}
pub fn force_send_ext<M: TransferableMessage + Clone>(
pub fn force_send_ext<M: Message + Clone>(
&self,
msg: M,
_options: SendOptions,
@ -249,10 +253,10 @@ impl BusInner {
return Err(SendError::Closed(msg).into());
}
let tt = msg.type_tag();
let mid = ID_COUNTER.fetch_add(1, Ordering::Relaxed);
let tid = TypeId::of::<M>();
if let Some(rs) = self.receivers.get(&tid) {
if let Some(rs) = self.receivers.get(&tt) {
if let Some((last, head)) = rs.split_last() {
for r in head {
let _ = r.force_send(mid, msg.clone());
@ -273,15 +277,15 @@ impl BusInner {
}
#[inline]
pub fn try_send_one<M: TransferableMessage>(&self, msg: M) -> Result<(), Error<M>> {
pub fn try_send_one<M: Message>(&self, msg: M) -> Result<(), Error<M>> {
if self.closed.load(Ordering::SeqCst) {
return Err(SendError::Closed(msg).into());
}
let tt = msg.type_tag();
let mid = ID_COUNTER.fetch_add(1, Ordering::Relaxed);
let tid = TypeId::of::<M>();
if let Some(rs) = self.receivers.get(&tid).and_then(|rs| rs.first()) {
if let Some(rs) = self.receivers.get(&tt).and_then(|rs| rs.first()) {
let permits = if let Some(x) = rs.try_reserve() {
x
} else {
@ -294,30 +298,15 @@ impl BusInner {
}
}
pub async fn send_one<M: TransferableMessage>(&self, msg: M) -> Result<(), Error<M>> {
pub async fn send_one<M: Message>(&self, msg: M) -> Result<(), Error<M>> {
if self.closed.load(Ordering::SeqCst) {
return Err(SendError::Closed(msg).into());
}
let tt = msg.type_tag();
let mid = ID_COUNTER.fetch_add(1, Ordering::Relaxed);
let tid = TypeId::of::<M>();
if let Some(rs) = self.receivers.get(&tid).and_then(|rs| rs.first()) {
Ok(rs.send(mid, msg, rs.reserve().await)?)
} else {
Err(Error::NoReceivers)
}
}
pub async fn send_local_one<M: Message>(&self, msg: M) -> Result<(), Error<M>> {
if self.closed.load(Ordering::SeqCst) {
return Err(SendError::Closed(msg).into());
}
let mid = ID_COUNTER.fetch_add(1, Ordering::Relaxed);
let tid = TypeId::of::<M>();
if let Some(rs) = self.receivers.get(&tid).and_then(|rs| rs.first()) {
if let Some(rs) = self.receivers.get(&tt).and_then(|rs| rs.first()) {
Ok(rs.send(mid, msg, rs.reserve().await)?)
} else {
Err(Error::NoReceivers)
@ -325,19 +314,19 @@ impl BusInner {
}
#[inline]
pub fn send_one_blocking<M: TransferableMessage>(&self, msg: M) -> Result<(), Error<M>> {
pub fn send_one_blocking<M: Message>(&self, msg: M) -> Result<(), Error<M>> {
futures::executor::block_on(self.send_one(msg))
}
pub async fn request<M: TransferableMessage, R: TransferableMessage>(
pub async fn request<M: Message, R: Message>(
&self,
req: M,
options: SendOptions,
) -> Result<R, Error<M>> {
let tid = TypeId::of::<M>();
let rid = TypeId::of::<R>();
let tid = req.type_tag();
let rid = R::type_tag_();
let mut iter = self.select_receivers(tid, options, Some(rid), None);
let mut iter = self.select_receivers(&tid, options, Some(&rid), None);
if let Some(rc) = iter.next() {
let (tx, rx) = oneshot::channel();
let mid = (rc.add_response_waiter(tx).unwrap() | 1 << (usize::BITS - 1)) as u64;
@ -349,17 +338,17 @@ impl BusInner {
}
}
pub async fn request_local_we<M, R, E>(&self, req: M, options: SendOptions) -> Result<R, Error<M, E>>
pub async fn request_we<M, R, E>(&self, req: M, options: SendOptions) -> Result<R, Error<M, E>>
where
M: Message,
R: Message,
E: StdSyncSendError,
{
let tid = TypeId::of::<M>();
let rid = TypeId::of::<R>();
let eid = TypeId::of::<E>();
let tid = M::type_tag_();
let rid = R::type_tag_();
let eid = E::type_tag_();
let mut iter = self.select_receivers(tid, options, Some(rid), Some(eid));
let mut iter = self.select_receivers(&tid, options, Some(&rid), Some(&eid));
if let Some(rc) = iter.next() {
let (tx, rx) = oneshot::channel();
let mid = (rc.add_response_waiter_we(tx).unwrap() | 1 << (usize::BITS - 1)) as u64;
@ -371,23 +360,94 @@ impl BusInner {
}
}
#[inline]
fn select_receivers(
pub async fn send_boxed(
&self,
tid: TypeId,
msg: Box<dyn Message>,
_options: SendOptions,
rid: Option<TypeId>,
eid: Option<TypeId>,
) -> impl Iterator<Item = &Receiver> + '_ {
) -> Result<(), Error<Box<dyn Message>>> {
if self.closed.load(Ordering::SeqCst) {
return Err(SendError::Closed(msg).into());
}
let tt = msg.type_tag();
let mid = ID_COUNTER.fetch_add(1, Ordering::Relaxed);
if let Some(rs) = self.receivers.get(&tt) {
if let Some((last, head)) = rs.split_last() {
for r in head {
let _ = r.send_boxed(mid, msg.try_clone_boxed().unwrap(), r.reserve().await);
}
let _ = last.send_boxed(mid, msg, last.reserve().await);
return Ok(());
}
}
warn!("Unhandled message: no receivers");
Ok(())
}
pub async fn send_boxed_one(
&self,
msg: Box<dyn Message>,
_options: SendOptions,
) -> Result<(), Error<Box<dyn Message>>> {
if self.closed.load(Ordering::SeqCst) {
return Err(SendError::Closed(msg).into());
}
let tt = msg.type_tag();
let mid = ID_COUNTER.fetch_add(1, Ordering::Relaxed);
if let Some(rs) = self.receivers.get(&tt).and_then(|rs| rs.first()) {
Ok(rs.send_boxed(mid, msg, rs.reserve().await)?)
} else {
Err(Error::NoReceivers)
}
}
pub async fn request_boxed(
&self,
req: Box<dyn Message>,
options: SendOptions,
) -> Result<Box<dyn Message>, Error<Box<dyn Message>>> {
if self.closed.load(Ordering::SeqCst) {
return Err(SendError::Closed(req).into());
}
let tt = req.type_tag();
let mut iter = self.select_receivers(&tt, options, None, None);
if let Some(rc) = iter.next() {
let (tx, rx) = oneshot::channel();
let mid = (rc.add_response_waiter_boxed(tx).unwrap() | 1 << (usize::BITS - 1)) as u64;
rc.send_boxed(mid, req, rc.reserve().await)?;
rx.await?.map_err(|x| x.specify::<Box<dyn Message>>())
} else {
Err(Error::NoReceivers)
}
}
#[inline]
fn select_receivers<'a, 'b: 'a, 'c: 'a, 'd: 'a>(
&'a self,
tid: &'b TypeTag,
_options: SendOptions,
rid: Option<&'c TypeTag>,
eid: Option<&'d TypeTag>,
) -> impl Iterator<Item = &Receiver> + 'a {
self.receivers
.get(&tid)
.get(tid)
.into_iter()
.map(|item| item.iter())
.flatten()
.filter(move |x| match (rid, eid) {
(Some(r), Some(e)) => x.resp_type_id() == r && x.err_type_id() == e,
(Some(r), None) => x.resp_type_id() == r,
(None, Some(e)) => x.err_type_id() == e,
(Some(r), Some(e)) => x.resp_type_tag() == r && x.err_type_tag() == e,
(Some(r), None) => x.resp_type_tag() == r,
(None, Some(e)) => x.err_type_tag() == e,
(None, None) => true,
})
}

View File

@ -1,4 +1,9 @@
use crate::{Bus, Error, Message, envelop::{BoxedMessage, TransferableMessage}, error::{SendError, StdSyncSendError}, trait_object::TraitObject};
use crate::{
envelop::{IntoBoxedMessage, TypeTag},
error::{SendError, StdSyncSendError},
trait_object::TraitObject,
Bus, Error, Message,
};
use core::{
any::TypeId,
fmt,
@ -7,7 +12,6 @@ use core::{
pin::Pin,
task::{Context, Poll},
};
use erased_serde::Deserializer;
use futures::future::poll_fn;
use futures::Future;
use std::{
@ -43,20 +47,36 @@ where
fn poll_events(&self, ctx: &mut Context<'_>) -> Poll<Event<M, E>>;
}
pub trait WrapperReturnTypeOnly<R: Message>: Send + Sync {
fn add_response_listener(
&self,
listener: oneshot::Sender<Result<R, Error>>,
) -> Result<usize, Error>;
}
pub trait WrapperReturnTypeAndError<R: Message, E: StdSyncSendError>: Send + Sync {
fn add_response_listener(
&self,
listener: oneshot::Sender<Result<R, Error<(), E>>>,
) -> Result<usize, Error>;
fn response(&self, mid: u64, resp: Result<R, Error<(), E>>) -> Result<(), Error>;
}
pub trait ReceiverTrait: Send + Sync {
fn typed(&self) -> AnyReceiver<'_>;
fn poller(&self) -> AnyPoller<'_>;
fn wrapper(&self) -> AnyWrapper<'_>;
fn name(&self) -> &str;
fn send_boxed(&self, mid: u64, msg: Box<dyn Message>) -> Result<(), Error<Box<dyn Message>>>;
fn add_response_listener(
&self,
listener: oneshot::Sender<Result<Box<dyn Message>, Error>>,
) -> Result<usize, Error>;
fn stats(&self) -> Result<(), Error<Action>>;
fn close(&self) -> Result<(), Error<Action>>;
fn sync(&self) -> Result<(), Error<Action>>;
fn flush(&self) -> Result<(), Error<Action>>;
}
pub trait TransferableReceiverTrait: Send + Sync {
fn send(&self, mid: u64, de: &mut dyn Deserializer) -> Result<(), Error<BoxedMessage>>;
}
pub trait ReceiverPollerBuilder {
fn build(bus: Bus) -> Box<dyn Future<Output = ()>>;
}
@ -107,9 +127,62 @@ where
S: 'static,
{
inner: S,
waiters: Slab<Waiter<R, E>>,
_m: PhantomData<(M, R, E)>,
}
impl<M, R, E, S> WrapperReturnTypeAndError<R, E> for ReceiverWrapper<M, R, E, S>
where
M: Message,
R: Message,
E: StdSyncSendError,
S: Send + Sync + 'static,
{
fn add_response_listener(
&self,
listener: oneshot::Sender<Result<R, Error<(), E>>>,
) -> Result<usize, Error> {
Ok(self
.waiters
.insert(Waiter::WithErrorType(listener))
.ok_or_else(|| Error::AddListenerError)?)
}
fn response(&self, mid: u64, resp: Result<R, Error<(), E>>) -> Result<(), Error> {
if let Some(waiter) = self.waiters.take(mid as _) {
match waiter {
Waiter::WithErrorType(sender) => sender.send(resp).unwrap(),
Waiter::WithoutErrorType(sender) => {
sender.send(resp.map_err(|e| e.into_dyn())).unwrap()
}
Waiter::Boxed(sender) => sender
.send(resp.map_err(|e| e.into_dyn()).map(|x| x.into_boxed()))
.unwrap(),
}
}
Ok(())
}
}
impl<M, R, E, S> WrapperReturnTypeOnly<R> for ReceiverWrapper<M, R, E, S>
where
M: Message,
R: Message,
E: StdSyncSendError,
S: Send + Sync + 'static,
{
fn add_response_listener(
&self,
listener: oneshot::Sender<Result<R, Error>>,
) -> Result<usize, Error> {
Ok(self
.waiters
.insert(Waiter::WithoutErrorType(listener))
.ok_or_else(|| Error::AddListenerError)?)
}
}
impl<M, R, E, S> ReceiverTrait for ReceiverWrapper<M, R, E, S>
where
M: Message,
@ -125,8 +198,22 @@ where
AnyReceiver::new(&self.inner)
}
fn poller(&self) -> AnyPoller<'_> {
AnyPoller::new(&self.inner)
fn wrapper(&self) -> AnyWrapper<'_> {
AnyWrapper::new(self)
}
fn send_boxed(
&self,
mid: u64,
boxed_msg: Box<dyn Message>,
) -> Result<(), Error<Box<dyn Message>>> {
let boxed = boxed_msg
.as_any_boxed()
.downcast::<M>()
.map_err(|_| Error::MessageCastError)?;
Ok(SendTypedReceiver::send(&self.inner, mid, *boxed)
.map_err(|err| Error::from(err.into_boxed()))?)
}
fn stats(&self) -> Result<(), Error<Action>> {
@ -144,21 +231,18 @@ where
fn flush(&self) -> Result<(), Error<Action>> {
Ok(SendUntypedReceiver::send(&self.inner, Action::Flush)?)
}
}
impl<M, R, E, S> TransferableReceiverTrait for ReceiverWrapper<M, R, E, S>
where
M: TransferableMessage,
R: TransferableMessage,
E: StdSyncSendError,
S: SendUntypedReceiver + SendTypedReceiver<M> + ReciveTypedReceiver<R, E> + 'static,
{
fn send(&self, mid: u64, de: &mut dyn Deserializer) -> Result<(), Error<BoxedMessage>> {
unimplemented!()
fn add_response_listener(
&self,
listener: oneshot::Sender<Result<Box<dyn Message>, Error>>,
) -> Result<usize, Error> {
Ok(self
.waiters
.insert(Waiter::Boxed(listener))
.ok_or_else(|| Error::AddListenerError)?)
}
}
pub struct Permit {
pub(crate) fuse: bool,
pub(crate) inner: Arc<dyn PermitDrop>,
@ -173,64 +257,134 @@ impl Drop for Permit {
}
pub struct AnyReceiver<'a> {
dyn_typed_receiver_trait_object: TraitObject,
type_id: TypeId,
data: *mut (),
typed: (TypeId, *mut ()),
poller: (TypeId, *mut ()),
_m: PhantomData<&'a usize>,
}
impl<'a> AnyReceiver<'a> {
pub fn new<M, R, E, S>(rcvr: &'a S) -> Self
where
M: Message,
R: Message,
E: StdSyncSendError,
S: SendTypedReceiver<M> + ReciveTypedReceiver<R, E> + 'static,
{
let send_typed_receiver = rcvr as &(dyn SendTypedReceiver<M>);
let recive_typed_receiver = rcvr as &(dyn ReciveTypedReceiver<R, E>);
let send_typed_receiver: TraitObject = unsafe { mem::transmute(send_typed_receiver) };
let recive_typed_receiver: TraitObject = unsafe { mem::transmute(recive_typed_receiver) };
Self {
data: send_typed_receiver.data,
typed: (
TypeId::of::<dyn SendTypedReceiver<M>>(),
send_typed_receiver.vtable,
),
poller: (
TypeId::of::<dyn ReciveTypedReceiver<R, E>>(),
recive_typed_receiver.vtable,
),
_m: Default::default(),
}
}
pub fn cast_send_typed<M: Message>(&'a self) -> &'a dyn SendTypedReceiver<M> {
assert_eq!(self.typed.0, TypeId::of::<dyn SendTypedReceiver<M>>());
unsafe {
mem::transmute(TraitObject {
data: self.data,
vtable: self.typed.1,
})
}
}
pub fn cast_recive_typed<R: Message, E: StdSyncSendError>(
&'a self,
) -> &'a dyn ReciveTypedReceiver<R, E> {
assert_eq!(self.poller.0, TypeId::of::<dyn ReciveTypedReceiver<R, E>>());
unsafe {
mem::transmute(TraitObject {
data: self.data,
vtable: self.poller.1,
})
}
}
}
unsafe impl Send for AnyReceiver<'_> {}
impl<'a> AnyReceiver<'a> {
pub fn new<M: Message, R: SendTypedReceiver<M> + 'static>(rcvr: &'a R) -> Self {
let trcvr = rcvr as &(dyn SendTypedReceiver<M>);
Self {
dyn_typed_receiver_trait_object: unsafe { mem::transmute(trcvr) },
type_id: TypeId::of::<dyn SendTypedReceiver<M>>(),
_m: Default::default(),
}
}
pub fn dyn_typed_receiver<M: Message>(&'a self) -> &'a dyn SendTypedReceiver<M> {
assert_eq!(self.type_id, TypeId::of::<dyn SendTypedReceiver<M>>());
unsafe { mem::transmute(self.dyn_typed_receiver_trait_object) }
}
}
pub struct AnyPoller<'a> {
dyn_typed_receiver_trait_object: TraitObject,
type_id: TypeId,
pub struct AnyWrapper<'a> {
data: *mut (),
wrapper_r: (TypeId, *mut ()),
wrapper_re: (TypeId, *mut ()),
_m: PhantomData<&'a usize>,
}
unsafe impl Send for AnyPoller<'_> {}
impl<'a> AnyPoller<'a> {
pub fn new<M, E, R>(rcvr: &'a R) -> Self
impl<'a> AnyWrapper<'a> {
pub fn new<R, E, S>(rcvr: &'a S) -> Self
where
M: Message,
R: Message,
E: StdSyncSendError,
R: ReciveTypedReceiver<M, E> + 'static,
S: WrapperReturnTypeOnly<R> + WrapperReturnTypeAndError<R, E> + 'static,
{
let trcvr = rcvr as &(dyn ReciveTypedReceiver<M, E>);
let wrapper_r = rcvr as &(dyn WrapperReturnTypeOnly<R>);
let wrapper_re = rcvr as &(dyn WrapperReturnTypeAndError<R, E>);
let wrapper_r: TraitObject = unsafe { mem::transmute(wrapper_r) };
let wrapper_re: TraitObject = unsafe { mem::transmute(wrapper_re) };
Self {
dyn_typed_receiver_trait_object: unsafe { mem::transmute(trcvr) },
type_id: TypeId::of::<dyn ReciveTypedReceiver<M, E>>(),
data: wrapper_r.data,
wrapper_r: (
TypeId::of::<dyn WrapperReturnTypeOnly<R>>(),
wrapper_r.vtable,
),
wrapper_re: (
TypeId::of::<dyn WrapperReturnTypeAndError<R, E>>(),
wrapper_re.vtable,
),
_m: Default::default(),
}
}
pub fn dyn_typed_receiver<M: Message, E: StdSyncSendError>(
&'a self,
) -> &'a dyn ReciveTypedReceiver<M, E> {
assert_eq!(self.type_id, TypeId::of::<dyn ReciveTypedReceiver<M, E>>());
pub fn cast_ret_only<R: Message>(&'a self) -> &'a dyn WrapperReturnTypeOnly<R> {
assert_eq!(
self.wrapper_r.0,
TypeId::of::<dyn WrapperReturnTypeOnly<R>>()
);
unsafe { mem::transmute(self.dyn_typed_receiver_trait_object) }
unsafe {
mem::transmute(TraitObject {
data: self.data,
vtable: self.wrapper_r.1,
})
}
}
pub fn cast_ret_and_error<R: Message, E: StdSyncSendError>(
&'a self,
) -> &'a dyn WrapperReturnTypeAndError<R, E> {
assert_eq!(
self.wrapper_re.0,
TypeId::of::<dyn WrapperReturnTypeAndError<R, E>>()
);
unsafe {
mem::transmute(TraitObject {
data: self.data,
vtable: self.wrapper_re.1,
})
}
}
}
unsafe impl Send for AnyWrapper<'_> {}
#[derive(Debug, Clone)]
pub struct ReceiverStats {
pub name: Cow<'static, str>,
@ -255,8 +409,8 @@ impl fmt::Display for ReceiverStats {
}
struct ReceiverContext {
resp_type_id: TypeId,
err_type_id: TypeId,
resp_type_tag: TypeTag,
err_type_tag: TypeTag,
limit: u64,
processing: AtomicU64,
need_flush: AtomicBool,
@ -272,11 +426,15 @@ impl PermitDrop for ReceiverContext {
}
}
enum Waiter<R: Message, E: StdSyncSendError> {
WithErrorType(oneshot::Sender<Result<R, Error<(), E>>>),
WithoutErrorType(oneshot::Sender<Result<R, Error>>),
Boxed(oneshot::Sender<Result<Box<dyn Message>, Error>>),
}
pub struct Receiver {
inner: Arc<dyn ReceiverTrait>,
context: Arc<ReceiverContext>,
waiters: Arc<dyn Any + Send + Sync>,
waiters_void: Arc<dyn Any + Send + Sync>,
}
impl fmt::Debug for Receiver {
@ -305,8 +463,8 @@ impl Receiver {
{
Self {
context: Arc::new(ReceiverContext {
resp_type_id: TypeId::of::<R>(),
err_type_id: TypeId::of::<E>(),
resp_type_tag: R::type_tag_(),
err_type_tag: E::type_tag_(),
limit,
processing: AtomicU64::new(0),
need_flush: AtomicBool::new(false),
@ -317,29 +475,20 @@ impl Receiver {
}),
inner: Arc::new(ReceiverWrapper {
inner,
waiters: sharded_slab::Slab::<Waiter<R, E>>::new_with_config::<SlabCfg>(),
_m: Default::default(),
}),
waiters: Arc::new(
sharded_slab::Slab::<oneshot::Sender<Result<R, Error<(), E>>>>::new_with_config::<
SlabCfg,
>(),
),
waiters_void: Arc::new(
sharded_slab::Slab::<oneshot::Sender<Result<R, Error<()>>>>::new_with_config::<
SlabCfg,
>(),
),
}
}
#[inline]
pub fn resp_type_id(&self) -> TypeId {
self.context.resp_type_id
pub fn resp_type_tag(&self) -> &TypeTag {
&self.context.resp_type_tag
}
#[inline]
pub fn err_type_id(&self) -> TypeId {
self.context.err_type_id
pub fn err_type_tag(&self) -> &TypeTag {
&self.context.err_type_tag
}
#[inline]
@ -404,7 +553,7 @@ impl Receiver {
mut permit: Permit,
) -> Result<(), SendError<M>> {
let any_receiver = self.inner.typed();
let receiver = any_receiver.dyn_typed_receiver::<M>();
let receiver = any_receiver.cast_send_typed::<M>();
let res = receiver.send(mid, msg);
permit.fuse = true;
@ -418,7 +567,7 @@ impl Receiver {
#[inline]
pub fn force_send<M: Message + Clone>(&self, mid: u64, msg: M) -> Result<(), SendError<M>> {
let any_receiver = self.inner.typed();
let receiver = any_receiver.dyn_typed_receiver::<M>();
let receiver = any_receiver.cast_send_typed::<M>();
let res = receiver.send(mid, msg);
self.context.processing.fetch_add(1, Ordering::SeqCst);
@ -429,6 +578,23 @@ impl Receiver {
res
}
pub fn send_boxed(
&self,
mid: u64,
msg: Box<dyn Message>,
mut permit: Permit,
) -> Result<(), Error<Box<dyn Message>>> {
self.context.processing.fetch_add(1, Ordering::SeqCst);
let res = self.inner.send_boxed(mid, msg);
permit.fuse = true;
if !res.is_err() {
self.context.need_flush.store(true, Ordering::SeqCst);
}
Ok(())
}
pub fn start_polling_events<R, E>(
&self,
) -> Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>
@ -439,22 +605,13 @@ impl Receiver {
let ctx_clone = self.context.clone();
let inner_clone = self.inner.clone();
let waiters = self
.waiters
.clone()
.downcast::<Slab<oneshot::Sender<Result<R, Error<(), E>>>>>()
.unwrap();
let waiters_void = self
.waiters_void
.clone()
.downcast::<Slab<oneshot::Sender<Result<R, Error<()>>>>>()
.unwrap();
Box::new(move |_| {
Box::pin(async move {
let any_receiver = inner_clone.poller();
let receiver = any_receiver.dyn_typed_receiver::<R, E>();
let any_receiver = inner_clone.typed();
let receiver = any_receiver.cast_recive_typed::<R, E>();
let any_wrapper = inner_clone.wrapper();
let wrapper = any_wrapper.cast_ret_and_error::<R, E>();
loop {
let event = poll_fn(move |ctx| receiver.poll_events(ctx)).await;
@ -469,16 +626,8 @@ impl Receiver {
ctx_clone.processing.fetch_sub(1, Ordering::SeqCst);
ctx_clone.response.notify_one();
if let Some(waiter) = waiters.take(mid as usize) {
if waiter.send(resp).is_err() {
error!("Response cannot be processed!");
}
} else if let Some(waiter) = waiters_void.take(mid as usize) {
if waiter.send(resp.map_err(|x| x.into_dyn())).is_err() {
error!("Response cannot be processed!");
}
} else if TypeId::of::<R>() != TypeId::of::<()>() {
warn!("Non-void response has no waiters!");
if let Err(err) = wrapper.response(mid, resp) {
error!("Response error: {}", err);
}
}
@ -489,48 +638,36 @@ impl Receiver {
})
}
#[inline]
pub(crate) fn add_response_waiter_boxed(
&self,
waiter: oneshot::Sender<Result<Box<dyn Message>, Error>>,
) -> Result<usize, Error> {
self.inner.add_response_listener(waiter)
}
#[inline]
pub(crate) fn add_response_waiter<R: Message>(
&self,
waiter: oneshot::Sender<Result<R, Error<()>>>,
) -> Option<usize> {
let idx = self
.waiters_void
.downcast_ref::<Slab<oneshot::Sender<Result<R, Error<()>>>>>()
.unwrap()
.insert(waiter)?;
waiter: oneshot::Sender<Result<R, Error>>,
) -> Result<usize, Error> {
let any_receiver = self.inner.wrapper();
let receiver = any_receiver.cast_ret_only::<R>();
Some(idx)
receiver.add_response_listener(waiter)
}
#[inline]
pub(crate) fn add_response_waiter_we<R: Message, E: StdSyncSendError>(
&self,
waiter: oneshot::Sender<Result<R, Error<(), E>>>,
) -> Option<usize> {
let idx = self
.waiters
.downcast_ref::<Slab<oneshot::Sender<Result<R, Error<(), E>>>>>()
.unwrap()
.insert(waiter)?;
) -> Result<usize, Error> {
let any_receiver = self.inner.wrapper();
let receiver = any_receiver.cast_ret_and_error::<R, E>();
Some(idx)
receiver.add_response_listener(waiter)
}
// #[inline]
// pub(crate) fn add_response_waiter_dyn(
// &self,
// waiter: oneshot::Sender<Result<R, Error<(), E>>>,
// ) -> Option<usize> {
// let idx = self
// .waiters
// .downcast_ref::<Slab<oneshot::Sender<Result<R, Error<(), E>>>>>()
// .unwrap()
// .insert(waiter)?;
// Some(idx)
// }
#[inline]
pub async fn close(&self) {
let notified = self.context.closed.notified();

View File

@ -4,8 +4,8 @@ mod sync;
use std::sync::atomic::AtomicU64;
pub use r#async::BufferUnorderedAsync;
use serde_derive::{Deserialize, Serialize};
pub use sync::BufferUnorderedSync;
use serde_derive::{Serialize, Deserialize};
#[derive(Debug)]
pub struct BufferUnorderedStats {

View File

@ -4,8 +4,8 @@ mod sync;
use std::sync::atomic::AtomicU64;
pub use r#async::BufferUnorderedBatchedAsync;
use serde_derive::{Deserialize, Serialize};
pub use sync::BufferUnorderedBatchedSync;
use serde_derive::{Serialize, Deserialize};
#[derive(Debug)]
pub struct BufferUnorderedBatchedStats {

View File

@ -29,7 +29,9 @@ where
}
#[inline(always)]
pub(crate) fn fix_into_iter<I, T: IntoIterator<Item = I> + Send>(x: T) -> impl IntoIterator<Item = I> + Send {
pub(crate) fn fix_into_iter<I, T: IntoIterator<Item = I> + Send>(
x: T,
) -> impl IntoIterator<Item = I> + Send {
x
}

View File

@ -9,7 +9,7 @@ use crate::{
builder::ReceiverSubscriberBuilder,
error::{Error, SendError, StdSyncSendError},
receiver::{Action, Event, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver},
receivers::{fix_type, fix_into_iter, Request},
receivers::{fix_into_iter, fix_type, Request},
AsyncBatchSynchronizedHandler, Bus, Message, Untyped,
};

View File

@ -4,9 +4,8 @@ mod sync;
use std::sync::atomic::AtomicU64;
pub use r#async::SynchronizedBatchedAsync;
use serde_derive::{Deserialize, Serialize};
pub use sync::SynchronizedBatchedSync;
use serde_derive::{Serialize, Deserialize};
#[derive(Debug)]
pub struct SynchronizedBatchedStats {

View File

@ -9,7 +9,7 @@ use crate::{
builder::ReceiverSubscriberBuilder,
error::{Error, SendError, StdSyncSendError},
receiver::{Action, Event, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver},
receivers::{fix_type, fix_into_iter, Request},
receivers::{fix_into_iter, fix_type, Request},
BatchSynchronizedHandler, Bus, Message, Untyped,
};

View File

@ -4,8 +4,8 @@ mod sync;
use std::sync::atomic::AtomicU64;
pub use r#async::SynchronizedAsync;
use serde_derive::{Deserialize, Serialize};
pub use sync::SynchronizedSync;
use serde_derive::{Serialize, Deserialize};
#[derive(Debug)]
pub struct SynchronizedStats {

View File

@ -1,14 +1,25 @@
use std::{any::TypeId, borrow::Cow, collections::HashMap, sync::atomic::{AtomicU64, Ordering}};
use std::{
any::TypeId,
borrow::Cow,
collections::HashMap,
sync::atomic::{AtomicU64, Ordering},
};
use tokio::sync::oneshot::Sender;
use sharded_slab::Slab;
use tokio::sync::oneshot::Sender;
use crate::{Bus, Message, envelop::SafeMessage, error::{Error, SendError}, receiver::Permit};
use crate::{
error::{Error, SendError},
receiver::Permit,
Bus, Message,
};
pub trait RelayTrait {
// fn handle_message(&self, mid: u64, msg: &dyn SafeMessage, tx: Option<Sender<>>, bus: &Bus);
fn start_relay(&self, bus: &Bus) -> Result<(), Error> ;
fn stop_relay(&self);
type Context;
// fn handle_message(&self, mid: u64, msg: &dyn SafeMessage, ctx: Self::Context, bus: &Bus);
// fn handle_request(&self, mid: u64, msg: &dyn SafeMessage, ctx: Self::Context, bus: &Bus);
fn start_relay(&self, bus: &Bus) -> Result<Self::Context, Error>;
fn stop_relay(&self, ctx: Self::Context);
}
pub struct Relay {
@ -119,4 +130,4 @@ impl Relay {
// warn!("flush failed!");
// }
// }
}
}