From 13d1d48a70744a1ce114108a6454a9c8e88670be Mon Sep 17 00:00:00 2001 From: Andrey Tkachenko Date: Fri, 16 Jul 2021 20:21:27 +0400 Subject: [PATCH] Relays: Message and SharedMessage; messagebus_derive --- Cargo.toml | 23 +- derive/Cargo.toml | 12 + derive/src/lib.rs | 204 +++++++++ examples/demo_async.rs | 92 ++-- examples/demo_backpressure.rs | 46 +- examples/demo_batch.rs | 52 ++- examples/demo_boxed.rs | 211 +++++++++ examples/demo_relay.rs | 2 +- examples/demo_req_resp.rs | 154 ++++--- examples/demo_shared.rs | 53 +++ examples/demo_slow.rs | 46 +- examples/demo_sync_batch.rs | 44 +- examples/non_sync.rs | 65 +-- src/builder.rs | 62 ++- src/envelop.rs | 349 +++++++++++++-- src/error.rs | 78 +++- src/handler.rs | 12 +- src/lib.rs | 182 +++++--- src/receiver.rs | 401 ++++++++++++------ src/receivers/buffer_unordered/mod.rs | 2 +- src/receivers/buffer_unordered_batched/mod.rs | 2 +- src/receivers/mod.rs | 4 +- src/receivers/synchronize_batched/async.rs | 2 +- src/receivers/synchronize_batched/mod.rs | 3 +- src/receivers/synchronize_batched/sync.rs | 2 +- src/receivers/synchronized/mod.rs | 2 +- src/relay.rs | 25 +- 27 files changed, 1673 insertions(+), 457 deletions(-) create mode 100644 derive/Cargo.toml create mode 100644 derive/src/lib.rs create mode 100644 examples/demo_boxed.rs create mode 100644 examples/demo_shared.rs diff --git a/Cargo.toml b/Cargo.toml index 04e5b8d..9e1fc72 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,19 +11,22 @@ exclude = [".gitignore", ".cargo/config", ".github/**", "codecov.yml"] edition = "2018" [dependencies] +messagebus_derive = { path = "./derive" } tokio = { version = "1", features = ["parking_lot", "rt-multi-thread", "sync"] } -parking_lot = "0.11.1" -async-trait = "0.1.42" -futures = "0.3.8" -smallvec = "1.6.1" -log = "0.4.14" -sharded-slab = "0.1.1" -thiserror = "1.0.25" -erased-serde = "0.3.16" -serde = "1.0.126" -serde_derive = "1.0.126" +parking_lot = "0.11" +async-trait = "0.1" +futures = "0.3" +smallvec = "1.6" +log = "0.4" +sharded-slab = "0.1" +thiserror = "1" +erased-serde = "0.3" +serde = "1" +serde_derive = "1" + [dev-dependencies] anyhow = "1.0.41" env_logger = "0.8.4" +serde_json = "1.0.64" tokio = { version = "1", features = ["macros", "parking_lot", "rt-multi-thread", "io-util", "sync"] } diff --git a/derive/Cargo.toml b/derive/Cargo.toml new file mode 100644 index 0000000..012bfcc --- /dev/null +++ b/derive/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "messagebus_derive" +version = "0.1.0" +edition = "2018" + +[lib] +proc-macro = true + +[dependencies] +proc-macro2 = "1" +quote = "1" +syn = "1" diff --git a/derive/src/lib.rs b/derive/src/lib.rs new file mode 100644 index 0000000..5fdd195 --- /dev/null +++ b/derive/src/lib.rs @@ -0,0 +1,204 @@ +#![recursion_limit="128"] + +extern crate proc_macro; + +use proc_macro::TokenStream; +use proc_macro2::TokenTree; +use syn::{LitStr, Result, parenthesized}; +use syn::{DeriveInput, parse_macro_input, punctuated::Punctuated, token::Comma}; +use syn::parse::{Parse, ParseStream, Parser}; +use quote::quote; + +fn shared_part(_: &syn::Ident, has_shared: bool) -> proc_macro2::TokenStream { + if has_shared { + quote! { + fn as_shared_ref(&self) -> std::option::Option<&dyn messagebus::SharedMessage> {Some(self)} + fn as_shared_mut(&mut self) -> std::option::Option<&mut dyn messagebus::SharedMessage>{Some(self)} + fn as_shared_boxed(self: std::boxed::Box) -> Option>{Some(self)} + fn as_shared_arc(self: std::sync::Arc) -> Option>{Some(self)} + } + } else { + quote! { + fn as_shared_ref(&self) -> std::option::Option<&dyn messagebus::SharedMessage> {None} + fn as_shared_mut(&mut self) -> std::option::Option<&mut dyn messagebus::SharedMessage>{None} + fn as_shared_boxed(self: std::boxed::Box) -> Option>{None} + fn as_shared_arc(self: std::sync::Arc) -> Option>{None} + } + } +} + +fn clone_part(name: &syn::Ident, has_clone: bool) -> proc_macro2::TokenStream { + if has_clone { + quote! { + fn try_clone_into(&self, into: &mut dyn core::any::Any) -> bool { + let into = if let Some(inner) = into.downcast_mut::>() { + inner + } else { + return false; + }; + + into.replace(self.clone()); + true + } + fn try_clone_boxed(&self) -> std::option::Option>{ + Some(Box::new(self.clone())) + } + } + } else { + quote! { + fn try_clone_into(&self, into: &mut dyn core::any::Any) -> bool {false} + fn try_clone_boxed(&self) -> std::option::Option>{ None } + } + } +} + +fn type_tag_part(name: &syn::Ident, type_tag: Option) -> proc_macro2::TokenStream { + if let Some(type_tag) = type_tag { + quote! { + impl messagebus::TypeTagged for #name { + fn type_tag_() -> messagebus::TypeTag { #type_tag.into() } + fn type_tag(&self) -> messagebus::TypeTag { #type_tag.into() } + fn type_name(&self) -> std::borrow::Cow { #type_tag.into() } + } + } + } else { + quote! { + impl messagebus::TypeTagged for #name { + fn type_tag_() -> messagebus::TypeTag { std::any::type_name::().into() } + fn type_tag(&self) -> messagebus::TypeTag { std::any::type_name::().into() } + fn type_name(&self) -> std::borrow::Cow { std::any::type_name::().into() } + } + } + } +} + +struct TypeTag { + pub inner: syn::LitStr, +} + +impl Parse for TypeTag { + fn parse(input: ParseStream) -> Result { + let mut inner = None; + let content; + parenthesized!(content in input); + let punctuated = Punctuated::::parse_terminated(&content)?; + + for pair in punctuated.pairs() { + inner = Some(pair.into_value()); + break; + } + + Ok(TypeTag { inner: inner.unwrap().to_owned() }) + } +} + +#[derive(Default)] +struct Tags { + has_clone: bool, + has_shared: bool, +} + +impl Tags { + pub fn add(&mut self, other: Tags) { + self.has_clone = self.has_clone || other.has_clone; + self.has_shared = self.has_shared || other.has_shared; + } +} + +impl Parse for Tags { + fn parse(input: ParseStream) -> Result { + let mut has_shared = false; + let mut has_clone = false; + + let content; + parenthesized!(content in input); + let punctuated = Punctuated::::parse_terminated(&content)?; + + for pair in punctuated.pairs() { + match pair.into_value().to_string().as_str() { + "shared" => has_shared = true, + "clone" => has_clone = true, + _ => () + } + } + + Ok(Tags { + has_clone, + has_shared, + }) + } +} + +#[proc_macro_derive(Message, attributes(type_tag, message))] +pub fn derive_message(input: TokenStream) -> TokenStream { + let mut tags = Tags::default(); + let mut type_tag = None; + + let ast: DeriveInput = syn::parse(input).unwrap(); + let name = &ast.ident; + let attrs = ast.attrs; + for attr in attrs { + if let Some(i) = attr.path.get_ident() { + match i.to_string().as_str() { + "message" => { + let tt: Tags = syn::parse2(attr.tokens).unwrap(); + tags.add(tt); + } + + "type_tag" => { + let tt: TypeTag = syn::parse2(attr.tokens).unwrap(); + type_tag = Some(tt.inner); + } + + _ => () + } + } + } + + let type_tag_part = type_tag_part(name, type_tag); + let shared_part = shared_part(name, false); + let clone_part = clone_part(name, false); + + let tokens = quote! { + #type_tag_part + + impl messagebus::Message for #name { + fn as_any_ref(&self) -> &dyn core::any::Any {self} + fn as_any_mut(&mut self) -> &mut dyn core::any::Any {self} + fn as_any_boxed(self: std::boxed::Box) -> std::boxed::Box {self} + fn as_any_arc(self: std::sync::Arc) -> std::sync::Arc {self} + + #shared_part + #clone_part + } + }; + + tokens.into() +} + +#[proc_macro_derive(Error, attributes(type_tag))] +pub fn derive_error(input: TokenStream) -> TokenStream { + let mut type_tag = None; + let ast: DeriveInput = syn::parse(input).unwrap(); + let name = &ast.ident; + for attr in ast.attrs { + if let Some(i) = attr.path.get_ident() { + match i.to_string().as_str() { + "type_tag" => { + let tt: TypeTag = syn::parse2(attr.tokens).unwrap(); + type_tag = Some(tt.inner); + } + + _ => () + } + } + } + + let type_tag_part = type_tag_part(name, type_tag); + + let tokens = quote! { + #type_tag_part + }; + + tokens.into() +} \ No newline at end of file diff --git a/examples/demo_async.rs b/examples/demo_async.rs index 4a6d96b..2d44ae6 100644 --- a/examples/demo_async.rs +++ b/examples/demo_async.rs @@ -1,8 +1,8 @@ use async_trait::async_trait; -use messagebus::{error, AsyncHandler, Bus, Handler, Message}; +use messagebus::{derive::Message, error, AsyncHandler, Bus, Handler, Message, TypeTagged}; use thiserror::Error; -#[derive(Debug, Error)] +#[derive(Debug, Error, messagebus::derive::Error)] enum Error { #[error("Error({0})")] Error(anyhow::Error), @@ -17,15 +17,35 @@ impl From> for Error { struct TmpReceiver; struct TmpReceiver2; +#[derive(Debug, Clone, Message)] +#[message(clone)] +struct MsgF32(f32); + +#[derive(Debug, Clone, Message)] +#[message(clone)] +struct MsgU16(u16); + +#[derive(Debug, Clone, Message)] +#[message(clone)] +struct MsgU32(u32); + +#[derive(Debug, Clone, Message)] +#[message(clone)] +struct MsgI32(i32); + +#[derive(Debug, Clone, Message)] +#[message(clone)] +struct MsgI16(i16); + #[async_trait] -impl AsyncHandler for TmpReceiver { +impl AsyncHandler for TmpReceiver { type Error = Error; type Response = (); - async fn handle(&self, msg: f32, bus: &Bus) -> Result { - bus.send(1u16).await?; + async fn handle(&self, msg: MsgF32, bus: &Bus) -> Result { + bus.send(MsgU16(1)).await?; - println!("TmpReceiver ---> f32 {}", msg); + println!("TmpReceiver ---> {:?} {}", msg, msg.type_tag()); Ok(()) } @@ -38,13 +58,13 @@ impl AsyncHandler for TmpReceiver { } #[async_trait] -impl AsyncHandler for TmpReceiver { +impl AsyncHandler for TmpReceiver { type Error = Error; type Response = (); - async fn handle(&self, msg: u16, bus: &Bus) -> Result { - bus.send(2u32).await?; - println!("TmpReceiver ---> u16 {}", msg); + async fn handle(&self, msg: MsgU16, bus: &Bus) -> Result { + bus.send(MsgU32(2)).await?; + println!("TmpReceiver ---> {:?}", msg); Ok(()) } @@ -57,13 +77,13 @@ impl AsyncHandler for TmpReceiver { } #[async_trait] -impl AsyncHandler for TmpReceiver { +impl AsyncHandler for TmpReceiver { type Error = Error; type Response = (); - async fn handle(&self, msg: u32, bus: &Bus) -> Result { - bus.send(3i32).await?; - println!("TmpReceiver ---> u32 {}", msg); + async fn handle(&self, msg: MsgU32, bus: &Bus) -> Result { + bus.send(MsgI32(3)).await?; + println!("TmpReceiver ---> {:?}", msg); Ok(()) } @@ -75,13 +95,13 @@ impl AsyncHandler for TmpReceiver { } #[async_trait] -impl AsyncHandler for TmpReceiver { +impl AsyncHandler for TmpReceiver { type Error = Error; type Response = (); - async fn handle(&self, msg: i32, bus: &Bus) -> Result { - bus.send(4i16).await?; - println!("TmpReceiver ---> i32 {}", msg); + async fn handle(&self, msg: MsgI32, bus: &Bus) -> Result { + bus.send(MsgI16(4)).await?; + println!("TmpReceiver ---> {:?}", msg); Ok(()) } @@ -94,12 +114,12 @@ impl AsyncHandler for TmpReceiver { } #[async_trait] -impl AsyncHandler for TmpReceiver { +impl AsyncHandler for TmpReceiver { type Error = Error; type Response = (); - async fn handle(&self, msg: i16, _bus: &Bus) -> Result { - println!("TmpReceiver ---> i16 {}", msg); + async fn handle(&self, msg: MsgI16, _bus: &Bus) -> Result { + println!("TmpReceiver ---> {:?}", msg); Ok(()) } @@ -111,14 +131,14 @@ impl AsyncHandler for TmpReceiver { } #[async_trait] -impl AsyncHandler for TmpReceiver2 { +impl AsyncHandler for TmpReceiver2 { type Error = Error; type Response = (); - async fn handle(&self, msg: i32, bus: &Bus) -> Result { - println!("!!!! TmpReceiver2: ---> 2 i32 {}", msg); + async fn handle(&self, msg: MsgI32, bus: &Bus) -> Result { + println!("TmpReceiver2: ---> {:?}", msg); - bus.send(5i16).await?; + bus.send(MsgI16(5)).await?; Ok(()) } @@ -129,12 +149,12 @@ impl AsyncHandler for TmpReceiver2 { } } -impl Handler for TmpReceiver2 { +impl Handler for TmpReceiver2 { type Error = Error; type Response = (); - fn handle(&self, msg: i16, _bus: &Bus) -> Result { - println!("TmpReceiver2: ---> 2 i16 {}", msg); + fn handle(&self, msg: MsgI16, _bus: &Bus) -> Result { + println!("TmpReceiver2: ---> {:?}", msg); Ok(()) } @@ -152,19 +172,19 @@ async fn main() { let (b, poller) = Bus::build() .register(TmpReceiver) - .subscribe_async::(8, Default::default()) - .subscribe_async::(8, Default::default()) - .subscribe_async::(8, Default::default()) - .subscribe_async::(8, Default::default()) - .subscribe_async::(8, Default::default()) + .subscribe_async::(8, Default::default()) + .subscribe_async::(8, Default::default()) + .subscribe_async::(8, Default::default()) + .subscribe_async::(8, Default::default()) + .subscribe_async::(8, Default::default()) .done() .register(TmpReceiver2) - .subscribe_async::(8, Default::default()) - .subscribe_sync::(8, Default::default()) + .subscribe_async::(8, Default::default()) + .subscribe_sync::(8, Default::default()) .done() .build(); - b.send(0f32).await.unwrap(); + b.send(MsgF32(0.)).await.unwrap(); println!("flush"); b.flush().await; diff --git a/examples/demo_backpressure.rs b/examples/demo_backpressure.rs index fe32649..18fb760 100644 --- a/examples/demo_backpressure.rs +++ b/examples/demo_backpressure.rs @@ -1,8 +1,11 @@ use async_trait::async_trait; -use messagebus::{error, receivers, AsyncHandler, Bus, Message}; +use messagebus::{ + derive::{Error as MbError, Message}, + error, receivers, AsyncHandler, Bus, Message, +}; use thiserror::Error; -#[derive(Debug, Error)] +#[derive(Debug, Error, MbError)] enum Error { #[error("Error({0})")] Error(anyhow::Error), @@ -14,15 +17,18 @@ impl From> for Error { } } +#[derive(Debug, Clone, Message)] +struct MsgF32(pub f32); + struct TmpReceiver; #[async_trait] -impl AsyncHandler for TmpReceiver { +impl AsyncHandler for TmpReceiver { type Error = Error; type Response = (); - async fn handle(&self, msg: f32, _bus: &Bus) -> Result { - println!("---> f32 {}", msg); + async fn handle(&self, msg: MsgF32, _bus: &Bus) -> Result { + println!("---> f32 {:?}", msg); std::thread::sleep(std::time::Duration::from_secs(1)); @@ -35,42 +41,48 @@ impl AsyncHandler for TmpReceiver { async fn main() { let (b, poller) = Bus::build() .register(TmpReceiver) - .subscribe_async::(1, receivers::BufferUnorderedConfig { buffer_size: 1, max_parallel: 1 }) + .subscribe_async::( + 1, + receivers::BufferUnorderedConfig { + buffer_size: 1, + max_parallel: 1, + }, + ) .done() .build(); println!("sending 1"); - b.send(32f32).await.unwrap(); + b.send(MsgF32(32f32)).await.unwrap(); println!("sending 2"); - b.send(32f32).await.unwrap(); + b.send(MsgF32(32f32)).await.unwrap(); println!("sending 3"); - b.send(32f32).await.unwrap(); + b.send(MsgF32(32f32)).await.unwrap(); println!("sending 4"); - b.send(32f32).await.unwrap(); + b.send(MsgF32(32f32)).await.unwrap(); println!("sending 5"); - b.send(32f32).await.unwrap(); + b.send(MsgF32(32f32)).await.unwrap(); println!("sending 6"); - b.send(32f32).await.unwrap(); + b.send(MsgF32(32f32)).await.unwrap(); println!("sending 7"); - b.send(32f32).await.unwrap(); + b.send(MsgF32(32f32)).await.unwrap(); println!("sending 8"); - b.send(32f32).await.unwrap(); + b.send(MsgF32(32f32)).await.unwrap(); println!("sending 9"); - b.send(32f32).await.unwrap(); + b.send(MsgF32(32f32)).await.unwrap(); println!("sending 10"); - b.send(32f32).await.unwrap(); + b.send(MsgF32(32f32)).await.unwrap(); println!("sending 11"); - b.send(32f32).await.unwrap(); + b.send(MsgF32(32f32)).await.unwrap(); println!("flush"); b.flush().await; diff --git a/examples/demo_batch.rs b/examples/demo_batch.rs index 438bcbb..3988523 100644 --- a/examples/demo_batch.rs +++ b/examples/demo_batch.rs @@ -1,10 +1,13 @@ use std::sync::Arc; use async_trait::async_trait; -use messagebus::{error, AsyncBatchHandler, BatchHandler, Bus, Message}; +use messagebus::{ + derive::{Error as MbError, Message}, + error, AsyncBatchHandler, BatchHandler, Bus, Message, +}; use thiserror::Error; -#[derive(Debug, Error, Clone)] +#[derive(Debug, Error, Clone, MbError)] enum Error { #[error("Error({0})")] Error(Arc), @@ -15,29 +18,42 @@ impl From> for Error { Self::Error(Arc::new(err.into())) } } + +#[derive(Debug, Clone, Message)] +#[message(clone)] +struct MsgI32(i32); + +#[derive(Debug, Clone, Message)] +#[message(clone)] +struct MsgI16(i16); + struct TmpReceiver; #[async_trait] -impl AsyncBatchHandler for TmpReceiver { +impl AsyncBatchHandler for TmpReceiver { type Error = Error; type Response = (); - type InBatch = Vec; + type InBatch = Vec; type OutBatch = Vec<()>; - async fn handle(&self, msg: Vec, _bus: &Bus) -> Result, Self::Error> { + async fn handle( + &self, + msg: Vec, + _bus: &Bus, + ) -> Result, Self::Error> { println!("---> [i32; {}] {:?}", msg.len(), msg); Ok(vec![]) } } -impl BatchHandler for TmpReceiver { +impl BatchHandler for TmpReceiver { type Error = Error; type Response = (); - type InBatch = Vec; + type InBatch = Vec; type OutBatch = Vec<()>; - fn handle(&self, msg: Vec, _bus: &Bus) -> Result, Self::Error> { + fn handle(&self, msg: Vec, _bus: &Bus) -> Result, Self::Error> { println!("---> [i16; {}] {:?}", msg.len(), msg); Ok(vec![]) } @@ -47,22 +63,22 @@ impl BatchHandler for TmpReceiver { async fn main() { let (b, poller) = Bus::build() .register(TmpReceiver) - .subscribe_batch_async::(16, Default::default()) - .subscribe_batch_sync::(16, Default::default()) + .subscribe_batch_async::(16, Default::default()) + .subscribe_batch_sync::(16, Default::default()) .done() .build(); for i in 1..100i32 { - b.send(i).await.unwrap(); + b.send(MsgI32(i)).await.unwrap(); } - b.send(1i16).await.unwrap(); - b.send(2i16).await.unwrap(); - b.send(3i16).await.unwrap(); - b.send(4i16).await.unwrap(); - b.send(5i16).await.unwrap(); - b.send(6i16).await.unwrap(); - b.send(7i16).await.unwrap(); + b.send(MsgI16(1i16)).await.unwrap(); + b.send(MsgI16(2i16)).await.unwrap(); + b.send(MsgI16(3i16)).await.unwrap(); + b.send(MsgI16(4i16)).await.unwrap(); + b.send(MsgI16(5i16)).await.unwrap(); + b.send(MsgI16(6i16)).await.unwrap(); + b.send(MsgI16(7i16)).await.unwrap(); println!("flush"); b.flush().await; diff --git a/examples/demo_boxed.rs b/examples/demo_boxed.rs new file mode 100644 index 0000000..4d033e7 --- /dev/null +++ b/examples/demo_boxed.rs @@ -0,0 +1,211 @@ +use async_trait::async_trait; +use messagebus::{ + derive::{Error as MbError, Message}, + error, AsyncHandler, Bus, Handler, Message, +}; +use thiserror::Error; + +#[derive(Debug, Error, MbError)] +enum Error { + #[error("Error({0})")] + Error(anyhow::Error), +} + +impl From> for Error { + fn from(err: error::Error) -> Self { + Self::Error(err.into()) + } +} + +#[derive(Debug, Clone, Message)] +#[message(clone)] +struct MsgF32(f32); + +#[derive(Debug, Clone, Message)] +#[message(clone)] +struct MsgU16(u16); + +#[derive(Debug, Clone, Message)] +#[message(clone)] +struct MsgU32(u32); + +#[derive(Debug, Clone, Message)] +#[message(clone)] +struct MsgI32(i32); + +#[derive(Debug, Clone, Message)] +#[message(clone)] +struct MsgI16(i16); + +struct TmpReceiver; +struct TmpReceiver2; + +#[async_trait] +impl AsyncHandler for TmpReceiver { + type Error = Error; + type Response = (); + + async fn handle(&self, msg: MsgF32, bus: &Bus) -> Result { + bus.send(MsgU16(1u16)).await?; + + println!("TmpReceiver ---> {:?}", msg); + + Ok(()) + } + + async fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> { + println!("TmpReceiver f32: sync"); + + Ok(()) + } +} + +#[async_trait] +impl AsyncHandler for TmpReceiver { + type Error = Error; + type Response = (); + + async fn handle(&self, msg: MsgU16, bus: &Bus) -> Result { + bus.send(MsgU32(2u32)).await?; + println!("TmpReceiver ---> {:?}", msg); + + Ok(()) + } + + async fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> { + println!("TmpReceiver u16: sync"); + + Ok(()) + } +} + +#[async_trait] +impl AsyncHandler for TmpReceiver { + type Error = Error; + type Response = (); + + async fn handle(&self, msg: MsgU32, bus: &Bus) -> Result { + bus.send(MsgI32(3i32)).await?; + println!("TmpReceiver ---> {:?}", msg); + + Ok(()) + } + async fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> { + println!("TmpReceiver u32: sync"); + + Ok(()) + } +} + +#[async_trait] +impl AsyncHandler for TmpReceiver { + type Error = Error; + type Response = (); + + async fn handle(&self, msg: MsgI32, bus: &Bus) -> Result { + bus.send(MsgI16(4i16)).await?; + println!("TmpReceiver ---> {:?}", msg); + + Ok(()) + } + + async fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> { + println!("TmpReceiver i32: sync"); + + Ok(()) + } +} + +#[async_trait] +impl AsyncHandler for TmpReceiver { + type Error = Error; + type Response = (); + + async fn handle(&self, msg: MsgI16, _bus: &Bus) -> Result { + println!("TmpReceiver ---> {:?}", msg); + + Ok(()) + } + async fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> { + println!("TmpReceiver i16: sync"); + + Ok(()) + } +} + +#[async_trait] +impl AsyncHandler for TmpReceiver2 { + type Error = Error; + type Response = (); + + async fn handle(&self, msg: MsgI32, bus: &Bus) -> Result { + println!("TmpReceiver2: ---> {:?}", msg); + + bus.send(MsgI16(5i16)).await?; + + Ok(()) + } + async fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> { + println!("TmpReceiver2: i32: sync"); + + Ok(()) + } +} + +impl Handler for TmpReceiver2 { + type Error = Error; + type Response = (); + + fn handle(&self, msg: MsgI16, _bus: &Bus) -> Result { + println!("TmpReceiver2: ---> {:?}", msg); + + Ok(()) + } + + fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> { + println!("TmpReceiver2: i16: sync"); + + Ok(()) + } +} + +#[tokio::main] +async fn main() { + env_logger::init(); + + let (b, poller) = Bus::build() + .register(TmpReceiver) + .subscribe_async::(8, Default::default()) + .subscribe_async::(8, Default::default()) + .subscribe_async::(8, Default::default()) + .subscribe_async::(8, Default::default()) + .subscribe_async::(8, Default::default()) + .done() + .register(TmpReceiver2) + .subscribe_async::(8, Default::default()) + .subscribe_sync::(8, Default::default()) + .done() + .build(); + + b.send(MsgF32(0f32)).await.unwrap(); + println!("flush"); + + b.flush().await; + + println!("sending boxed variant"); + + b.send_boxed(Box::new(MsgF32(0f32)), Default::default()) + .await + .unwrap(); + + println!("flush"); + b.flush().await; + + println!("close"); + b.close().await; + + println!("closed"); + + poller.await; + println!("[done]"); +} diff --git a/examples/demo_relay.rs b/examples/demo_relay.rs index fcb767f..ae2705a 100644 --- a/examples/demo_relay.rs +++ b/examples/demo_relay.rs @@ -28,7 +28,7 @@ impl Handler for TmpReceiver { fn module() -> Module { Module::new() .register(TmpReceiver) - .subscribe_sync::(8, Default::default()) + .subscribe_sync::(8, Default::default()) .done() } diff --git a/examples/demo_req_resp.rs b/examples/demo_req_resp.rs index f74037e..1d2b26b 100644 --- a/examples/demo_req_resp.rs +++ b/examples/demo_req_resp.rs @@ -2,12 +2,13 @@ use core::f32; use async_trait::async_trait; use messagebus::{ + derive::Message, error::{self, StdSyncSendError}, AsyncHandler, Bus, Message, }; use thiserror::Error; -#[derive(Debug, Error)] +#[derive(Debug, Error, messagebus::derive::Error)] enum Error { #[error("Error({0})")] Error(anyhow::Error), @@ -19,19 +20,55 @@ impl From> for Error { } } +#[derive(Debug, Clone, Message)] +#[message(clone)] +struct MsgF64(pub f64); + +#[derive(Debug, Clone, Message)] +#[message(clone)] +struct MsgF32(pub f32); + +#[derive(Debug, Clone, Message)] +#[message(clone)] +struct MsgI32(pub i32); + +#[derive(Debug, Clone, Message)] +#[message(clone)] +struct MsgU32(pub u32); + +#[derive(Debug, Clone, Message)] +#[message(clone)] +struct MsgU16(pub u16); + +#[derive(Debug, Clone, Message)] +#[message(clone)] +struct MsgI16(pub i16); + +#[derive(Debug, Clone, Message)] +#[message(clone)] +struct MsgU8(pub u8); + +#[derive(Debug, Clone, Message)] +#[message(clone)] +struct MsgI8(pub i8); + struct TmpReceiver1; struct TmpReceiver2; #[async_trait] -impl AsyncHandler for TmpReceiver1 { +impl AsyncHandler for TmpReceiver1 { type Error = Error; - type Response = f32; + type Response = MsgF32; - async fn handle(&self, msg: i32, bus: &Bus) -> Result { - let resp1 = bus.request::<_, f32>(10i16, Default::default()).await?; - let resp2 = bus.request::<_, f32>(20u16, Default::default()).await?; + async fn handle(&self, msg: MsgI32, bus: &Bus) -> Result { + let resp1 = bus + .request::<_, MsgF32>(MsgI16(10i16), Default::default()) + .await?; + let resp2 = bus + .request::<_, MsgF32>(MsgU16(20u16), Default::default()) + .await?; - Ok(msg as f32 + resp1 + resp2) + Ok(MsgF32(msg.0 as f32 + resp1.0 + resp2.0)) } async fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> { @@ -42,12 +79,12 @@ impl AsyncHandler for TmpReceiver1 { } #[async_trait] -impl AsyncHandler for TmpReceiver1 { +impl AsyncHandler for TmpReceiver1 { type Error = Error; - type Response = f32; + type Response = MsgF32; - async fn handle(&self, msg: u32, _bus: &Bus) -> Result { - Ok(msg as f32) + async fn handle(&self, msg: MsgU32, _bus: &Bus) -> Result { + Ok(MsgF32(msg.0 as _)) } async fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> { println!("TmpReceiver1 u32: sync"); @@ -57,15 +94,19 @@ impl AsyncHandler for TmpReceiver1 { } #[async_trait] -impl AsyncHandler for TmpReceiver1 { +impl AsyncHandler for TmpReceiver1 { type Error = Error; - type Response = f32; + type Response = MsgF32; - async fn handle(&self, msg: i16, bus: &Bus) -> Result { - let resp1 = bus.request::<_, f32>(1i8, Default::default()).await?; - let resp2 = bus.request::<_, f32>(2u8, Default::default()).await?; + async fn handle(&self, msg: MsgI16, bus: &Bus) -> Result { + let resp1 = bus + .request::<_, MsgF32>(MsgI8(1i8), Default::default()) + .await?; + let resp2 = bus + .request::<_, MsgF32>(MsgU8(2u8), Default::default()) + .await?; - Ok(msg as f32 + resp1 + resp2) + Ok(MsgF32(msg.0 as f32 + resp1.0 + resp2.0)) } async fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> { @@ -76,12 +117,12 @@ impl AsyncHandler for TmpReceiver1 { } #[async_trait] -impl AsyncHandler for TmpReceiver1 { +impl AsyncHandler for TmpReceiver1 { type Error = Error; - type Response = f32; + type Response = MsgF32; - async fn handle(&self, msg: u16, _bus: &Bus) -> Result { - Ok(msg as f32) + async fn handle(&self, msg: MsgU16, _bus: &Bus) -> Result { + Ok(MsgF32(msg.0 as _)) } async fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> { @@ -92,12 +133,12 @@ impl AsyncHandler for TmpReceiver1 { } #[async_trait] -impl AsyncHandler for TmpReceiver1 { +impl AsyncHandler for TmpReceiver1 { type Error = Error; - type Response = f32; + type Response = MsgF32; - async fn handle(&self, msg: i8, _bus: &Bus) -> Result { - Ok(msg as f32) + async fn handle(&self, msg: MsgI8, _bus: &Bus) -> Result { + Ok(MsgF32(msg.0 as _)) } async fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> { @@ -108,12 +149,12 @@ impl AsyncHandler for TmpReceiver1 { } #[async_trait] -impl AsyncHandler for TmpReceiver1 { +impl AsyncHandler for TmpReceiver1 { type Error = Error; - type Response = f32; + type Response = MsgF32; - async fn handle(&self, msg: u8, _bus: &Bus) -> Result { - Ok(msg as f32) + async fn handle(&self, msg: MsgU8, _bus: &Bus) -> Result { + Ok(MsgF32(msg.0 as _)) } async fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> { println!("TmpReceiver1 u8: sync"); @@ -123,16 +164,25 @@ impl AsyncHandler for TmpReceiver1 { } #[async_trait] -impl AsyncHandler for TmpReceiver2 { +impl AsyncHandler for TmpReceiver2 { type Error = Error; - type Response = f64; + type Response = MsgF64; - async fn handle(&self, msg: f64, bus: &Bus) -> Result { - let resp1 = bus.request::<_, f32>(100i32, Default::default()).await? as f64; - let resp2 = bus.request::<_, f32>(200u32, Default::default()).await? as f64; - let resp3 = bus.request::<_, f32>(300f32, Default::default()).await? as f64; + async fn handle(&self, msg: MsgF64, bus: &Bus) -> Result { + let resp1 = bus + .request::<_, MsgF32>(MsgI32(100i32), Default::default()) + .await? + .0 as f64; + let resp2 = bus + .request::<_, MsgF32>(MsgU32(200u32), Default::default()) + .await? + .0 as f64; + let resp3 = bus + .request::<_, MsgF32>(MsgF32(300f32), Default::default()) + .await? + .0 as f64; - Ok(msg + resp1 + resp2 + resp3) + Ok(MsgF64(msg.0 + resp1 + resp2 + resp3)) } async fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> { @@ -143,11 +193,11 @@ impl AsyncHandler for TmpReceiver2 { } #[async_trait] -impl AsyncHandler for TmpReceiver2 { +impl AsyncHandler for TmpReceiver2 { type Error = Error; - type Response = f32; + type Response = MsgF32; - async fn handle(&self, msg: f32, _bus: &Bus) -> Result { + async fn handle(&self, msg: MsgF32, _bus: &Bus) -> Result { Ok(msg) } async fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> { @@ -161,22 +211,28 @@ impl AsyncHandler for TmpReceiver2 { async fn main() { let (b, poller) = Bus::build() .register(TmpReceiver1) - .subscribe_async::(8, Default::default()) - .subscribe_async::(8, Default::default()) - .subscribe_async::(8, Default::default()) - .subscribe_async::(8, Default::default()) - .subscribe_async::(8, Default::default()) - .subscribe_async::(8, Default::default()) + .subscribe_async::(8, Default::default()) + .subscribe_async::(8, Default::default()) + .subscribe_async::(8, Default::default()) + .subscribe_async::(8, Default::default()) + .subscribe_async::(8, Default::default()) + .subscribe_async::(8, Default::default()) .done() .register(TmpReceiver2) - .subscribe_async::(8, Default::default()) - .subscribe_async::(8, Default::default()) + .subscribe_async::(8, Default::default()) + .subscribe_async::(8, Default::default()) .done() .build(); println!( - "{:?}", - b.request_local_we::<_, f64, Error>(1000f64, Default::default()) + "plain {:?}", + b.request_we::<_, MsgF64, Error>(MsgF64(1000f64), Default::default()) + .await + ); + + println!( + "boxed {:?}", + b.request_boxed(Box::new(MsgF64(1000.)), Default::default()) .await ); diff --git a/examples/demo_shared.rs b/examples/demo_shared.rs new file mode 100644 index 0000000..44cad9a --- /dev/null +++ b/examples/demo_shared.rs @@ -0,0 +1,53 @@ +use async_trait::async_trait; +use messagebus::{error, AsyncHandler, Bus, Message}; +use thiserror::Error; + +#[derive(Debug, Error)] +enum Error { + #[error("Error({0})")] + Error(anyhow::Error), +} + +impl From> for Error { + fn from(err: error::Error) -> Self { + Self::Error(err.into()) + } +} + +#[derive(Debug, Clone)] +struct Msg; + +struct TmpReceiver; + +#[async_trait] +impl AsyncHandler for TmpReceiver { + type Error = Error; + type Response = (); + + async fn handle(&self, msg: Msg, _bus: &Bus) -> Result { + println!("---> f32 {:?}", msg); + Ok(()) + } +} + +#[tokio::main] +async fn main() { + let (b, poller) = Bus::build() + .register(TmpReceiver) + .subscribe_local_async::(8, Default::default()) + .done() + .build(); + + b.send_local_one(Msg).await.unwrap(); + + println!("flushing"); + b.flush().await; + + println!("closing"); + b.close().await; + + println!("closed"); + poller.await; + + println!("[done]"); +} diff --git a/examples/demo_slow.rs b/examples/demo_slow.rs index 83d665b..973db87 100644 --- a/examples/demo_slow.rs +++ b/examples/demo_slow.rs @@ -1,7 +1,10 @@ -use messagebus::{error, Bus, Handler, Message, Module}; +use messagebus::{ + derive::{Error as MbError, Message}, + error, Bus, Handler, Message, Module, +}; use thiserror::Error; -#[derive(Debug, Error)] +#[derive(Debug, Error, MbError)] enum Error { #[error("Error({0})")] Error(anyhow::Error), @@ -13,14 +16,23 @@ impl From> for Error { } } +#[derive(Debug, Clone, Message)] +struct MsgF32(pub f32); + +#[derive(Debug, Clone, Message)] +struct MsgU32(pub u32); + +#[derive(Debug, Clone, Message)] +struct MsgU16(pub u16); + struct TmpReceiver; -impl Handler for TmpReceiver { +impl Handler for TmpReceiver { type Error = Error; type Response = (); - fn handle(&self, msg: f32, _bus: &Bus) -> Result { - println!("---> f32 {}", msg); + fn handle(&self, msg: MsgF32, _bus: &Bus) -> Result { + println!("---> f32 {:?}", msg); std::thread::sleep(std::time::Duration::from_secs(5)); @@ -30,22 +42,22 @@ impl Handler for TmpReceiver { } } -impl Handler for TmpReceiver { +impl Handler for TmpReceiver { type Error = Error; type Response = (); - fn handle(&self, msg: u16, _bus: &Bus) -> Result { - println!("---> u16 {}", msg); + fn handle(&self, msg: MsgU16, _bus: &Bus) -> Result { + println!("---> u16 {:?}", msg); Ok(()) } } -impl Handler for TmpReceiver { +impl Handler for TmpReceiver { type Error = Error; type Response = (); - fn handle(&self, msg: u32, _bus: &Bus) -> Result { - println!("---> u32 {}", msg); + fn handle(&self, msg: MsgU32, _bus: &Bus) -> Result { + println!("---> u32 {:?}", msg); Ok(()) } } @@ -53,9 +65,9 @@ impl Handler for TmpReceiver { fn module() -> Module { Module::new() .register(TmpReceiver) - .subscribe_sync::(8, Default::default()) - .subscribe_sync::(8, Default::default()) - .subscribe_sync::(8, Default::default()) + .subscribe_sync::(8, Default::default()) + .subscribe_sync::(8, Default::default()) + .subscribe_sync::(8, Default::default()) .done() } @@ -63,9 +75,9 @@ fn module() -> Module { async fn main() { let (b, poller) = Bus::build().add_module(module()).build(); - b.send(32f32).await.unwrap(); - b.send(11u16).await.unwrap(); - b.send(32u32).await.unwrap(); + b.send(MsgF32(32f32)).await.unwrap(); + b.send(MsgU16(11u16)).await.unwrap(); + b.send(MsgU32(32u32)).await.unwrap(); println!("flush"); b.flush().await; diff --git a/examples/demo_sync_batch.rs b/examples/demo_sync_batch.rs index 8c57999..841e37b 100644 --- a/examples/demo_sync_batch.rs +++ b/examples/demo_sync_batch.rs @@ -2,11 +2,12 @@ use std::sync::Arc; use async_trait::async_trait; use messagebus::{ + derive::{Error as MbError, Message}, error, AsyncBatchSynchronizedHandler, BatchSynchronizedHandler, Bus, Message, }; use thiserror::Error; -#[derive(Debug, Error, Clone)] +#[derive(Debug, Error, Clone, MbError)] enum Error { #[error("Error({0})")] Error(Arc), @@ -18,19 +19,26 @@ impl From> for Error { } } +#[derive(Debug, Clone, Message)] +#[message(clone)] +struct MsgI32(i32); + +#[derive(Debug, Clone, Message)] +#[message(clone)] +struct MsgI16(i16); + struct TmpReceiver; #[async_trait] -impl AsyncBatchSynchronizedHandler for TmpReceiver { +impl AsyncBatchSynchronizedHandler for TmpReceiver { type Error = Error; type Response = (); - type InBatch = Vec; + type InBatch = Vec; type OutBatch = Vec<()>; - async fn handle( &mut self, - msg: Vec, + msg: Vec, _bus: &Bus, ) -> Result, Self::Error> { println!("---> [i32; {}] {:?}", msg.len(), msg); @@ -39,13 +47,13 @@ impl AsyncBatchSynchronizedHandler for TmpReceiver { } } -impl BatchSynchronizedHandler for TmpReceiver { +impl BatchSynchronizedHandler for TmpReceiver { type Error = Error; type Response = (); - type InBatch = Vec; + type InBatch = Vec; type OutBatch = Vec<()>; - fn handle(&mut self, msg: Vec, _bus: &Bus) -> Result, Self::Error> { + fn handle(&mut self, msg: Vec, _bus: &Bus) -> Result, Self::Error> { println!("---> [i16; {}] {:?}", msg.len(), msg); Ok(vec![]) } @@ -55,22 +63,22 @@ impl BatchSynchronizedHandler for TmpReceiver { async fn main() { let (b, poller) = Bus::build() .register_unsync(TmpReceiver) - .subscribe_batch_async::(16, Default::default()) - .subscribe_batch_sync::(16, Default::default()) + .subscribe_batch_async::(16, Default::default()) + .subscribe_batch_sync::(16, Default::default()) .done() .build(); for i in 1..100i32 { - b.send(i).await.unwrap(); + b.send(MsgI32(i)).await.unwrap(); } - b.send(1i16).await.unwrap(); - b.send(2i16).await.unwrap(); - b.send(3i16).await.unwrap(); - b.send(4i16).await.unwrap(); - b.send(5i16).await.unwrap(); - b.send(6i16).await.unwrap(); - b.send(7i16).await.unwrap(); + b.send(MsgI16(1i16)).await.unwrap(); + b.send(MsgI16(2i16)).await.unwrap(); + b.send(MsgI16(3i16)).await.unwrap(); + b.send(MsgI16(4i16)).await.unwrap(); + b.send(MsgI16(5i16)).await.unwrap(); + b.send(MsgI16(6i16)).await.unwrap(); + b.send(MsgI16(7i16)).await.unwrap(); println!("flush"); b.flush().await; diff --git a/examples/non_sync.rs b/examples/non_sync.rs index 552c644..83fe489 100644 --- a/examples/non_sync.rs +++ b/examples/non_sync.rs @@ -1,8 +1,11 @@ use async_trait::async_trait; -use messagebus::{error, receivers, AsyncSynchronizedHandler, Bus, Message, SynchronizedHandler}; +use messagebus::{ + derive::{Error as MbError, Message}, + error, AsyncSynchronizedHandler, Bus, Message, SynchronizedHandler, +}; use thiserror::Error; -#[derive(Debug, Error)] +#[derive(Debug, Error, MbError)] enum Error { #[error("Error({0})")] Error(anyhow::Error), @@ -14,15 +17,21 @@ impl From> for Error { } } +#[derive(Debug, Clone, Message)] +struct MsgF32(pub f32); + +#[derive(Debug, Clone, Message)] +struct MsgI16(pub i16); + struct TmpReceiver; -impl SynchronizedHandler for TmpReceiver { +impl SynchronizedHandler for TmpReceiver { type Error = Error; type Response = (); - fn handle(&mut self, msg: f32, _bus: &Bus) -> Result { + fn handle(&mut self, msg: MsgF32, _bus: &Bus) -> Result { // std::thread::sleep(std::time::Duration::from_millis(100)); - println!("---> f32 {}", msg); + println!("---> f32 {:?}", msg); println!("done"); Ok(()) @@ -30,13 +39,13 @@ impl SynchronizedHandler for TmpReceiver { } #[async_trait] -impl AsyncSynchronizedHandler for TmpReceiver { +impl AsyncSynchronizedHandler for TmpReceiver { type Error = Error; type Response = (); - async fn handle(&mut self, msg: i16, _bus: &Bus) -> Result { + async fn handle(&mut self, msg: MsgI16, _bus: &Bus) -> Result { std::thread::sleep(std::time::Duration::from_millis(100)); - println!("---> i16 {}", msg); + println!("---> i16 {:?}", msg); println!("done"); Ok(()) @@ -47,30 +56,30 @@ impl AsyncSynchronizedHandler for TmpReceiver { async fn main() { let (b, poller) = Bus::build() .register_unsync(TmpReceiver) - .subscribe::, _, _>(8, Default::default()) - .subscribe::, _, _>(8, Default::default()) + .subscribe_sync::(8, Default::default()) + .subscribe_async::(8, Default::default()) .done() .build(); - b.send(12.0f32).await.unwrap(); - b.send(1i16).await.unwrap(); - b.send(12.0f32).await.unwrap(); - b.send(1i16).await.unwrap(); - b.send(12.0f32).await.unwrap(); - b.send(1i16).await.unwrap(); - b.send(12.0f32).await.unwrap(); - b.send(1i16).await.unwrap(); - b.send(12.0f32).await.unwrap(); - b.send(1i16).await.unwrap(); - b.send(12.0f32).await.unwrap(); - b.send(1i16).await.unwrap(); - b.send(12.0f32).await.unwrap(); - b.send(1i16).await.unwrap(); - b.send(12.0f32).await.unwrap(); - b.send(1i16).await.unwrap(); + b.send(MsgF32(12.0f32)).await.unwrap(); + b.send(MsgI16(1i16)).await.unwrap(); + b.send(MsgF32(12.0f32)).await.unwrap(); + b.send(MsgI16(1i16)).await.unwrap(); + b.send(MsgF32(12.0f32)).await.unwrap(); + b.send(MsgI16(1i16)).await.unwrap(); + b.send(MsgF32(12.0f32)).await.unwrap(); + b.send(MsgI16(1i16)).await.unwrap(); + b.send(MsgF32(12.0f32)).await.unwrap(); + b.send(MsgI16(1i16)).await.unwrap(); + b.send(MsgF32(12.0f32)).await.unwrap(); + b.send(MsgI16(1i16)).await.unwrap(); + b.send(MsgF32(12.0f32)).await.unwrap(); + b.send(MsgI16(1i16)).await.unwrap(); + b.send(MsgF32(12.0f32)).await.unwrap(); + b.send(MsgI16(1i16)).await.unwrap(); - b.send(12.0f32).await.unwrap(); - b.send(1i16).await.unwrap(); + b.send(MsgF32(12.0f32)).await.unwrap(); + b.send(MsgI16(1i16)).await.unwrap(); println!("flush"); diff --git a/src/builder.rs b/src/builder.rs index 7efac06..f19a716 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -1,9 +1,16 @@ -use std::{any::TypeId, collections::HashMap, marker::PhantomData, pin::Pin, sync::Arc}; +use std::{collections::HashMap, marker::PhantomData, pin::Pin, sync::Arc}; use futures::{Future, FutureExt}; use tokio::sync::Mutex; -use crate::{AsyncBatchHandler, AsyncBatchSynchronizedHandler, AsyncHandler, AsyncSynchronizedHandler, BatchHandler, BatchSynchronizedHandler, Bus, BusInner, Handler, Message, SynchronizedHandler, Untyped, error::StdSyncSendError, receiver::{Receiver, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver}, receivers}; +use crate::{ + envelop::TypeTag, + error::StdSyncSendError, + receiver::{Receiver, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver}, + receivers, AsyncBatchHandler, AsyncBatchSynchronizedHandler, AsyncHandler, + AsyncSynchronizedHandler, BatchHandler, BatchSynchronizedHandler, Bus, BusInner, Handler, + Message, SynchronizedHandler, Untyped, +}; pub trait ReceiverSubscriberBuilder: SendUntypedReceiver + SendTypedReceiver + ReciveTypedReceiver @@ -36,7 +43,7 @@ pub struct RegisterEntry { payload: B, builder: F, receivers: HashMap< - TypeId, + TypeTag, Vec<( Receiver, Box< @@ -55,7 +62,7 @@ impl RegisterEntry where F: FnMut( &mut B, - (TypeId, Receiver), + (TypeTag, Receiver), Box Pin + Send>>>, Box Pin + Send>>>, ), @@ -65,7 +72,7 @@ where for (r, poller, poller2) in v { let poller = poller(self.item.clone()); - (self.builder)(&mut self.payload, (tid, r), poller, poller2); + (self.builder)(&mut self.payload, (tid.clone(), r), poller, poller2); } } @@ -87,7 +94,7 @@ impl RegisterEntry { let receiver = Receiver::new::(queue, inner); let poller2 = receiver.start_polling_events::(); self.receivers - .entry(TypeId::of::()) + .entry(M::type_tag_()) .or_insert_with(Vec::new) .push((receiver, poller, poller2)); @@ -99,6 +106,7 @@ impl RegisterEntry { where T: SynchronizedHandler + Send + 'static, M: Message, + T::Response: Message, { self.subscribe::, T::Response, T::Error>(queue, cfg) } @@ -108,24 +116,35 @@ impl RegisterEntry { where T: AsyncSynchronizedHandler + Send + 'static, M: Message, + T::Response: Message, { self.subscribe::, T::Response, T::Error>(queue, cfg) } #[inline] - pub fn subscribe_batch_sync(self, queue: u64, cfg: receivers::SynchronizedBatchedConfig) -> Self + pub fn subscribe_batch_sync( + self, + queue: u64, + cfg: receivers::SynchronizedBatchedConfig, + ) -> Self where T: BatchSynchronizedHandler + Send + 'static, M: Message, + T::Response: Message, { self.subscribe::, T::Response, T::Error>(queue, cfg) } #[inline] - pub fn subscribe_batch_async(self, queue: u64, cfg: receivers::SynchronizedBatchedConfig) -> Self + pub fn subscribe_batch_async( + self, + queue: u64, + cfg: receivers::SynchronizedBatchedConfig, + ) -> Self where T: AsyncBatchSynchronizedHandler + Send + 'static, M: Message, + T::Response: Message, { self.subscribe::, T::Response, T::Error>(queue, cfg) } @@ -145,7 +164,7 @@ impl RegisterEntry { let receiver = Receiver::new::(queue, inner); let poller2 = receiver.start_polling_events::(); self.receivers - .entry(TypeId::of::()) + .entry(M::type_tag_()) .or_insert_with(Vec::new) .push((receiver, poller, poller2)); @@ -157,6 +176,7 @@ impl RegisterEntry { where T: Handler + Send + Sync + 'static, M: Message, + T::Response: Message, { self.subscribe::, T::Response, T::Error>(queue, cfg) } @@ -173,26 +193,36 @@ impl RegisterEntry { } #[inline] - pub fn subscribe_batch_sync(self, queue: u64, cfg: receivers::BufferUnorderedBatchedConfig) -> Self + pub fn subscribe_batch_sync( + self, + queue: u64, + cfg: receivers::BufferUnorderedBatchedConfig, + ) -> Self where T: BatchHandler + Send + 'static, M: Message, + T::Response: Message, { self.subscribe::, T::Response, T::Error>(queue, cfg) } #[inline] - pub fn subscribe_batch_async(self, queue: u64, cfg: receivers::BufferUnorderedBatchedConfig) -> Self + pub fn subscribe_batch_async( + self, + queue: u64, + cfg: receivers::BufferUnorderedBatchedConfig, + ) -> Self where T: AsyncBatchHandler + Send + 'static, M: Message, + T::Response: Message, { self.subscribe::, T::Response, T::Error>(queue, cfg) } } pub struct Module { - receivers: Vec<(TypeId, Receiver)>, + receivers: Vec<(TypeTag, Receiver)>, pollings: Vec Pin + Send>>>>, } @@ -212,7 +242,7 @@ impl Module { T, impl FnMut( &mut Self, - (TypeId, Receiver), + (TypeTag, Receiver), Box Pin + Send>>>, Box Pin + Send>>>, ), @@ -239,7 +269,7 @@ impl Module { T, impl FnMut( &mut Self, - (TypeId, Receiver), + (TypeTag, Receiver), Box Pin + Send>>>, Box Pin + Send>>>, ), @@ -285,7 +315,7 @@ impl BusBuilder { T, impl FnMut( &mut Self, - (TypeId, Receiver), + (TypeTag, Receiver), Box Pin + Send>>>, Box Pin + Send>>>, ), @@ -312,7 +342,7 @@ impl BusBuilder { T, impl FnMut( &mut Self, - (TypeId, Receiver), + (TypeTag, Receiver), Box Pin + Send>>>, Box Pin + Send>>>, ), diff --git a/src/envelop.rs b/src/envelop.rs index bcc1b89..22a66b3 100644 --- a/src/envelop.rs +++ b/src/envelop.rs @@ -1,45 +1,338 @@ -use core::any::{Any, type_name}; +use core::any::Any; use core::fmt; -use serde::{de::DeserializeOwned, Serialize}; +use std::any::type_name; +use std::borrow::Cow; +use std::sync::Arc; -pub trait Message: - fmt::Debug + Unpin + Send + Sync + 'static -{ - fn type_name(&self) -> &str; +#[derive(Debug, Copy, Clone)] +#[repr(C)] +pub struct TraitObject { + pub data: *mut (), + pub vtable: *mut (), } -impl Message for T { - fn type_name(&self) -> &str { - type_name::() +pub type TypeTag = Cow<'static, str>; + +pub trait TypeTagged { + fn type_tag_() -> TypeTag + where + Self: Sized; + + fn type_tag(&self) -> TypeTag; + fn type_name(&self) -> Cow; +} + +pub trait Message: TypeTagged + fmt::Debug + Unpin + Send + Sync + 'static { + fn as_any_ref(&self) -> &dyn Any; + fn as_any_mut(&mut self) -> &mut dyn Any; + fn as_any_boxed(self: Box) -> Box; + fn as_any_arc(self: Arc) -> Arc; + + fn as_shared_ref(&self) -> Option<&dyn SharedMessage>; + fn as_shared_mut(&mut self) -> Option<&mut dyn SharedMessage>; + fn as_shared_boxed(self: Box) -> Option>; + fn as_shared_arc(self: Arc) -> Option>; + + fn try_clone_into(&self, into: &mut dyn Any) -> bool; + fn try_clone_boxed(&self) -> Option>; +} + +impl TypeTagged for () { + fn type_tag_() -> TypeTag { + type_name::().into() + } + fn type_tag(&self) -> TypeTag { + type_name::().into() + } + fn type_name(&self) -> Cow { + type_name::().into() } } -pub trait TransferableMessage: Message + Serialize + DeserializeOwned -{ - fn into_boxed(self) -> BoxedMessage; -} -impl TransferableMessage for T { - fn into_boxed(self) -> BoxedMessage { - BoxedMessage(Box::new(self) as _) +impl Message for () { + fn as_any_ref(&self) -> &dyn Any { + self + } + fn as_any_mut(&mut self) -> &mut dyn Any { + self + } + fn as_any_boxed(self: Box) -> Box { + self + } + fn as_any_arc(self: Arc) -> Arc { + self + } + + fn as_shared_ref(&self) -> Option<&dyn SharedMessage> { + Some(self) + } + fn as_shared_mut(&mut self) -> Option<&mut dyn SharedMessage> { + Some(self) + } + fn as_shared_boxed(self: Box) -> Option> { + Some(self) + } + fn as_shared_arc(self: Arc) -> Option> { + Some(self) + } + fn try_clone_into(&self, into: &mut dyn Any) -> bool { + let into = if let Some(inner) = into.downcast_mut::>() { + inner + } else { + return false; + }; + + into.replace(self.clone()); + true + } + fn try_clone_boxed(&self) -> Option> { + Some(Box::new(self.clone())) } } -pub trait SafeMessage: - Any + fmt::Debug + erased_serde::Serialize + Unpin + Send + Sync + 'static -{ - fn type_name(&self) -> &str; +pub trait IntoBoxedMessage { + fn into_boxed(self) -> Box; } -impl SafeMessage for T { - fn type_name(&self) -> &str { - type_name::() + +impl IntoBoxedMessage for T { + fn into_boxed(self) -> Box { + Box::new(self) } } -#[derive(Debug)] -pub struct BoxedMessage(Box); +pub trait SharedMessage: Message + erased_serde::Serialize {} +impl SharedMessage for T {} -impl From for BoxedMessage { - fn from(m: M) -> Self { - BoxedMessage(Box::new(m)) +// pub trait IntoTakeable { +// fn into_takeable(&mut self) -> Takeable<'_>; +// } + +// impl IntoTakeable for Option { +// fn into_takeable(&mut self) -> Takeable<'_> { +// Takeable { +// inner_ref: self +// } +// } +// } + +// pub struct Takeable<'a> { +// inner_ref: &'a mut dyn Any, +// } + +// impl Takeable<'_> { +// pub fn take(&mut self) -> Option { +// let m = self.inner_ref.downcast_mut::>()?; +// m.take() +// } +// } + +#[cfg(test)] +mod tests { + use super::*; + use erased_serde::Serializer; + use std::{any::type_name, borrow::Cow}; + + #[derive(Debug, Clone)] + struct Msg0; + + impl TypeTagged for Msg0 { + fn type_tag_() -> TypeTag { + type_name::().into() + } + fn type_tag(&self) -> TypeTag { + type_name::().into() + } + fn type_name(&self) -> Cow { + type_name::().into() + } + } + + impl Message for Msg0 { + fn as_any_ref(&self) -> &dyn Any { + self + } + fn as_any_mut(&mut self) -> &mut dyn Any { + self + } + fn as_any_boxed(self: Box) -> Box { + self + } + fn as_any_arc(self: Arc) -> Arc { + self + } + + fn as_shared_ref(&self) -> Option<&dyn SharedMessage> { + None + } + fn as_shared_mut(&mut self) -> Option<&mut dyn SharedMessage> { + None + } + fn as_shared_boxed(self: Box) -> Option> { + None + } + fn as_shared_arc(self: Arc) -> Option> { + None + } + fn try_clone_into(&self, _: &mut dyn Any) -> bool { + false + } + fn try_clone_boxed(&self) -> Option> { + None + } + } + + #[derive(Debug, Clone)] + struct Msg1; + + impl TypeTagged for Msg1 { + fn type_tag_() -> TypeTag { + type_name::().into() + } + fn type_tag(&self) -> TypeTag { + type_name::().into() + } + fn type_name(&self) -> Cow { + type_name::().into() + } + } + + impl Message for Msg1 { + fn as_any_ref(&self) -> &dyn Any { + self + } + fn as_any_mut(&mut self) -> &mut dyn Any { + self + } + fn as_any_boxed(self: Box) -> Box { + self + } + fn as_any_arc(self: Arc) -> Arc { + self + } + + fn as_shared_ref(&self) -> Option<&dyn SharedMessage> { + None + } + fn as_shared_mut(&mut self) -> Option<&mut dyn SharedMessage> { + None + } + fn as_shared_boxed(self: Box) -> Option> { + None + } + fn as_shared_arc(self: Arc) -> Option> { + None + } + fn try_clone_into(&self, into: &mut dyn Any) -> bool { + let into = if let Some(inner) = into.downcast_mut::>() { + inner + } else { + return false; + }; + + into.replace(self.clone()); + true + } + fn try_clone_boxed(&self) -> Option> { + Some(Box::new(self.clone())) + } + } + + #[derive(Debug, Clone, serde_derive::Serialize, serde_derive::Deserialize)] + struct Msg2 { + inner: [i32; 2], + } + + impl TypeTagged for Msg2 { + fn type_tag_() -> TypeTag { + type_name::().into() + } + fn type_tag(&self) -> TypeTag { + type_name::().into() + } + fn type_name(&self) -> Cow { + type_name::().into() + } + } + + impl Message for Msg2 { + fn as_any_ref(&self) -> &dyn Any { + self + } + fn as_any_mut(&mut self) -> &mut dyn Any { + self + } + fn as_any_boxed(self: Box) -> Box { + self + } + fn as_any_arc(self: Arc) -> Arc { + self + } + + fn as_shared_ref(&self) -> Option<&dyn SharedMessage> { + Some(self) + } + fn as_shared_mut(&mut self) -> Option<&mut dyn SharedMessage> { + Some(self) + } + fn as_shared_boxed(self: Box) -> Option> { + Some(self) + } + fn as_shared_arc(self: Arc) -> Option> { + Some(self) + } + fn try_clone_into(&self, into: &mut dyn Any) -> bool { + let into = if let Some(inner) = into.downcast_mut::>() { + inner + } else { + return false; + }; + + into.replace(self.clone()); + true + } + fn try_clone_boxed(&self) -> Option> { + Some(Box::new(self.clone())) + } + } + + #[test] + fn test_static_upcast() { + let mut buff: Vec = Vec::new(); + let json = &mut serde_json::Serializer::new(&mut buff); + let mut json = ::erase(json); + + let x = Msg1; + let y = Msg2 { inner: [12, 13] }; + + assert!(x.as_shared_ref().is_none()); + assert!(y.as_shared_ref().is_some()); + assert!(y + .as_shared_ref() + .unwrap() + .erased_serialize(&mut json) + .is_ok()); + assert_eq!(buff.as_slice(), b"{\"inner\":[12,13]}"); + } + + #[test] + fn test_dyn_upcast() { + let mut buff: Vec = Vec::new(); + let json = &mut serde_json::Serializer::new(&mut buff); + let mut json = ::erase(json); + + let x = Msg1; + let y = Msg2 { inner: [12, 13] }; + + let x_dyn: &dyn Message = &x; + let y_dyn: &dyn Message = &y; + + assert!(x_dyn.as_shared_ref().is_none()); + assert!(y_dyn.as_shared_ref().is_some()); + assert!(y_dyn + .as_shared_ref() + .unwrap() + .erased_serialize(&mut json) + .is_ok()); + assert_eq!(buff.as_slice(), b"{\"inner\":[12,13]}"); } } diff --git a/src/error.rs b/src/error.rs index a117e60..f94ff32 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,16 +1,37 @@ use core::fmt; +use std::{any::type_name, borrow::Cow}; use thiserror::Error; use tokio::sync::oneshot; -use crate::{Message, envelop::{BoxedMessage, TransferableMessage}}; +use crate::{ + envelop::{IntoBoxedMessage, TypeTag, TypeTagged}, + Message, +}; -pub trait StdSyncSendError: std::error::Error + Send + Sync + Unpin + 'static {} -impl StdSyncSendError for T {} +pub trait StdSyncSendError: std::error::Error + TypeTagged + Send + Sync + Unpin + 'static {} +impl StdSyncSendError for T {} #[derive(Debug, Error)] pub enum VoidError {} +impl TypeTagged for VoidError { + fn type_name(&self) -> Cow { + type_name::().into() + } + + fn type_tag(&self) -> TypeTag { + type_name::().into() + } + + fn type_tag_() -> TypeTag + where + Self: Sized, + { + type_name::().into() + } +} + #[derive(Debug, Error)] pub enum SendError { #[error("Closed")] @@ -20,11 +41,11 @@ pub enum SendError { Full(M), } -impl SendError { - pub fn into_boxed(self) -> SendError { +impl SendError { + pub fn into_boxed(self) -> SendError> { match self { - SendError::Closed(m) => SendError::Closed(BoxedMessage::from(m)), - SendError::Full(m) => SendError::Closed(BoxedMessage::from(m)), + SendError::Closed(m) => SendError::Closed(m.into_boxed()), + SendError::Full(m) => SendError::Closed(m.into_boxed()), } } } @@ -40,6 +61,12 @@ pub enum Error { #[error("NoReceivers")] NoReceivers, + #[error("AddListenerError")] + AddListenerError, + + #[error("MessageCastError")] + MessageCastError, + #[error("Other({0})")] Other(E), @@ -48,6 +75,9 @@ pub enum Error { #[error("Other({0})")] OtherBoxed(Box), + + #[error("WrongMessageType()")] + WrongMessageType(M), } impl Error { @@ -59,6 +89,9 @@ impl Error { Error::Serialization(s) => Error::Serialization(s), Error::Other(inner) => Error::OtherBoxed(Box::new(inner) as _), Error::OtherBoxed(inner) => Error::OtherBoxed(inner), + Error::WrongMessageType(inner) => Error::WrongMessageType(inner), + Error::AddListenerError => Error::AddListenerError, + Error::MessageCastError => Error::MessageCastError, } } @@ -70,25 +103,52 @@ impl Error { Error::Serialization(s) => Error::Serialization(s), Error::Other(_) => panic!("expected boxed error!"), Error::OtherBoxed(inner) => Error::Other(inner.into()), + Error::WrongMessageType(inner) => Error::WrongMessageType(inner), + Error::AddListenerError => Error::AddListenerError, + Error::MessageCastError => Error::MessageCastError, } } } impl Error<(), E> { - pub fn specify(self) -> Error { + pub fn specify(self) -> Error { match self { Error::SendError(_) => panic!("cannot specify type on typed error"), + Error::WrongMessageType(_) => panic!("cannot specify type on typed error"), Error::NoResponse => Error::NoReceivers, Error::NoReceivers => Error::NoReceivers, Error::Serialization(s) => Error::Serialization(s), Error::Other(inner) => Error::Other(inner), Error::OtherBoxed(inner) => Error::OtherBoxed(inner), + Error::AddListenerError => Error::AddListenerError, + Error::MessageCastError => Error::MessageCastError, } } } -impl From for Error { +impl From for Error { fn from(_: oneshot::error::RecvError) -> Self { Error::NoResponse } } + +impl Error> { + pub fn from_typed(err: Error) -> Self { + match err { + Error::SendError(SendError::Closed(m)) => { + Error::SendError(SendError::Closed(m.into_boxed())) + } + Error::SendError(SendError::Full(m)) => { + Error::SendError(SendError::Full(m.into_boxed())) + } + Error::WrongMessageType(m) => Error::WrongMessageType(m.into_boxed()), + Error::NoResponse => Error::NoReceivers, + Error::NoReceivers => Error::NoReceivers, + Error::Serialization(s) => Error::Serialization(s), + Error::Other(inner) => Error::Other(inner), + Error::OtherBoxed(inner) => Error::OtherBoxed(inner), + Error::AddListenerError => Error::AddListenerError, + Error::MessageCastError => Error::MessageCastError, + } + } +} diff --git a/src/handler.rs b/src/handler.rs index 8a7e77d..c4a0f10 100644 --- a/src/handler.rs +++ b/src/handler.rs @@ -89,7 +89,11 @@ pub trait AsyncBatchSynchronizedHandler: Send { type InBatch: FromIterator + Send; type OutBatch: IntoIterator + Send; - async fn handle(&mut self, msg: Self::InBatch, bus: &Bus) -> Result; + async fn handle( + &mut self, + msg: Self::InBatch, + bus: &Bus, + ) -> Result; async fn sync(&mut self, _bus: &Bus) -> Result<(), Self::Error> { Ok(()) } @@ -135,7 +139,11 @@ pub trait LocalAsyncBatchHandler { type InBatch: FromIterator + Send; type OutBatch: IntoIterator + Send; - async fn handle(&mut self, msg: Self::InBatch, bus: &Bus) -> Result; + async fn handle( + &mut self, + msg: Self::InBatch, + bus: &Bus, + ) -> Result; async fn sync(&mut self, _bus: &Bus) -> Result<(), Self::Error> { Ok(()) } diff --git a/src/lib.rs b/src/lib.rs index 7f6d6b2..594b849 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,21 +4,25 @@ pub mod error; mod handler; mod receiver; pub mod receivers; -mod trait_object; pub mod relay; +mod trait_object; #[macro_use] extern crate log; +pub mod derive { + pub use messagebus_derive::*; +} + use crate::receiver::Permit; use builder::BusBuilder; pub use builder::Module; -pub use relay::RelayTrait; -use core::any::{Any, TypeId}; -pub use envelop::{BoxedMessage, TransferableMessage, Message}; +use core::any::Any; +pub use envelop::{Message, SharedMessage, TypeTag, TypeTagged}; use error::{Error, SendError, StdSyncSendError}; pub use handler::*; use receiver::Receiver; +pub use relay::RelayTrait; use smallvec::SmallVec; use std::{ collections::HashMap, @@ -48,12 +52,12 @@ impl Default for SendOptions { } pub struct BusInner { - receivers: HashMap>, + receivers: HashMap>, closed: AtomicBool, } impl BusInner { - pub(crate) fn new(input: Vec<(TypeId, Receiver)>) -> Self { + pub(crate) fn new(input: Vec<(TypeTag, Receiver)>) -> Self { let mut receivers = HashMap::new(); for (key, value) in input { @@ -136,11 +140,11 @@ impl BusInner { } #[inline] - pub fn try_send(&self, msg: M) -> Result<(), Error> { + pub fn try_send(&self, msg: M) -> Result<(), Error> { self.try_send_ext(msg, SendOptions::Broadcast) } - pub fn try_send_ext( + pub fn try_send_ext( &self, msg: M, _options: SendOptions, @@ -149,10 +153,10 @@ impl BusInner { return Err(SendError::Closed(msg).into()); } + let tt = msg.type_tag(); let mid = ID_COUNTER.fetch_add(1, Ordering::Relaxed); - let tid = TypeId::of::(); - if let Some(rs) = self.receivers.get(&tid) { + if let Some(rs) = self.receivers.get(&tt) { let permits = if let Some(x) = self.try_reserve(rs) { x } else { @@ -185,12 +189,12 @@ impl BusInner { } #[inline] - pub fn send_blocking(&self, msg: M) -> Result<(), Error> { + pub fn send_blocking(&self, msg: M) -> Result<(), Error> { self.send_blocking_ext(msg, SendOptions::Broadcast) } #[inline] - pub fn send_blocking_ext( + pub fn send_blocking_ext( &self, msg: M, options: SendOptions, @@ -199,11 +203,11 @@ impl BusInner { } #[inline] - pub async fn send(&self, msg: M) -> core::result::Result<(), Error> { + pub async fn send(&self, msg: M) -> core::result::Result<(), Error> { Ok(self.send_ext(msg, SendOptions::Broadcast).await?) } - pub async fn send_ext( + pub async fn send_ext( &self, msg: M, _options: SendOptions, @@ -212,10 +216,10 @@ impl BusInner { return Err(SendError::Closed(msg).into()); } + let tt = msg.type_tag(); let mid = ID_COUNTER.fetch_add(1, Ordering::Relaxed); - let tid = TypeId::of::(); - if let Some(rs) = self.receivers.get(&tid) { + if let Some(rs) = self.receivers.get(&tt) { if let Some((last, head)) = rs.split_last() { for r in head { let _ = r.send(mid, msg.clone(), r.reserve().await); @@ -236,11 +240,11 @@ impl BusInner { } #[inline] - pub fn force_send(&self, msg: M) -> Result<(), Error> { + pub fn force_send(&self, msg: M) -> Result<(), Error> { self.force_send_ext(msg, SendOptions::Broadcast) } - pub fn force_send_ext( + pub fn force_send_ext( &self, msg: M, _options: SendOptions, @@ -249,10 +253,10 @@ impl BusInner { return Err(SendError::Closed(msg).into()); } + let tt = msg.type_tag(); let mid = ID_COUNTER.fetch_add(1, Ordering::Relaxed); - let tid = TypeId::of::(); - if let Some(rs) = self.receivers.get(&tid) { + if let Some(rs) = self.receivers.get(&tt) { if let Some((last, head)) = rs.split_last() { for r in head { let _ = r.force_send(mid, msg.clone()); @@ -273,15 +277,15 @@ impl BusInner { } #[inline] - pub fn try_send_one(&self, msg: M) -> Result<(), Error> { + pub fn try_send_one(&self, msg: M) -> Result<(), Error> { if self.closed.load(Ordering::SeqCst) { return Err(SendError::Closed(msg).into()); } + let tt = msg.type_tag(); let mid = ID_COUNTER.fetch_add(1, Ordering::Relaxed); - let tid = TypeId::of::(); - if let Some(rs) = self.receivers.get(&tid).and_then(|rs| rs.first()) { + if let Some(rs) = self.receivers.get(&tt).and_then(|rs| rs.first()) { let permits = if let Some(x) = rs.try_reserve() { x } else { @@ -294,30 +298,15 @@ impl BusInner { } } - pub async fn send_one(&self, msg: M) -> Result<(), Error> { + pub async fn send_one(&self, msg: M) -> Result<(), Error> { if self.closed.load(Ordering::SeqCst) { return Err(SendError::Closed(msg).into()); } + let tt = msg.type_tag(); let mid = ID_COUNTER.fetch_add(1, Ordering::Relaxed); - let tid = TypeId::of::(); - if let Some(rs) = self.receivers.get(&tid).and_then(|rs| rs.first()) { - Ok(rs.send(mid, msg, rs.reserve().await)?) - } else { - Err(Error::NoReceivers) - } - } - - pub async fn send_local_one(&self, msg: M) -> Result<(), Error> { - if self.closed.load(Ordering::SeqCst) { - return Err(SendError::Closed(msg).into()); - } - - let mid = ID_COUNTER.fetch_add(1, Ordering::Relaxed); - let tid = TypeId::of::(); - - if let Some(rs) = self.receivers.get(&tid).and_then(|rs| rs.first()) { + if let Some(rs) = self.receivers.get(&tt).and_then(|rs| rs.first()) { Ok(rs.send(mid, msg, rs.reserve().await)?) } else { Err(Error::NoReceivers) @@ -325,19 +314,19 @@ impl BusInner { } #[inline] - pub fn send_one_blocking(&self, msg: M) -> Result<(), Error> { + pub fn send_one_blocking(&self, msg: M) -> Result<(), Error> { futures::executor::block_on(self.send_one(msg)) } - pub async fn request( + pub async fn request( &self, req: M, options: SendOptions, ) -> Result> { - let tid = TypeId::of::(); - let rid = TypeId::of::(); + let tid = req.type_tag(); + let rid = R::type_tag_(); - let mut iter = self.select_receivers(tid, options, Some(rid), None); + let mut iter = self.select_receivers(&tid, options, Some(&rid), None); if let Some(rc) = iter.next() { let (tx, rx) = oneshot::channel(); let mid = (rc.add_response_waiter(tx).unwrap() | 1 << (usize::BITS - 1)) as u64; @@ -349,17 +338,17 @@ impl BusInner { } } - pub async fn request_local_we(&self, req: M, options: SendOptions) -> Result> + pub async fn request_we(&self, req: M, options: SendOptions) -> Result> where M: Message, R: Message, E: StdSyncSendError, { - let tid = TypeId::of::(); - let rid = TypeId::of::(); - let eid = TypeId::of::(); + let tid = M::type_tag_(); + let rid = R::type_tag_(); + let eid = E::type_tag_(); - let mut iter = self.select_receivers(tid, options, Some(rid), Some(eid)); + let mut iter = self.select_receivers(&tid, options, Some(&rid), Some(&eid)); if let Some(rc) = iter.next() { let (tx, rx) = oneshot::channel(); let mid = (rc.add_response_waiter_we(tx).unwrap() | 1 << (usize::BITS - 1)) as u64; @@ -371,23 +360,94 @@ impl BusInner { } } - #[inline] - fn select_receivers( + pub async fn send_boxed( &self, - tid: TypeId, + msg: Box, _options: SendOptions, - rid: Option, - eid: Option, - ) -> impl Iterator + '_ { + ) -> Result<(), Error>> { + if self.closed.load(Ordering::SeqCst) { + return Err(SendError::Closed(msg).into()); + } + + let tt = msg.type_tag(); + let mid = ID_COUNTER.fetch_add(1, Ordering::Relaxed); + + if let Some(rs) = self.receivers.get(&tt) { + if let Some((last, head)) = rs.split_last() { + for r in head { + let _ = r.send_boxed(mid, msg.try_clone_boxed().unwrap(), r.reserve().await); + } + + let _ = last.send_boxed(mid, msg, last.reserve().await); + + return Ok(()); + } + } + + warn!("Unhandled message: no receivers"); + + Ok(()) + } + + pub async fn send_boxed_one( + &self, + msg: Box, + _options: SendOptions, + ) -> Result<(), Error>> { + if self.closed.load(Ordering::SeqCst) { + return Err(SendError::Closed(msg).into()); + } + + let tt = msg.type_tag(); + let mid = ID_COUNTER.fetch_add(1, Ordering::Relaxed); + + if let Some(rs) = self.receivers.get(&tt).and_then(|rs| rs.first()) { + Ok(rs.send_boxed(mid, msg, rs.reserve().await)?) + } else { + Err(Error::NoReceivers) + } + } + + pub async fn request_boxed( + &self, + req: Box, + options: SendOptions, + ) -> Result, Error>> { + if self.closed.load(Ordering::SeqCst) { + return Err(SendError::Closed(req).into()); + } + + let tt = req.type_tag(); + + let mut iter = self.select_receivers(&tt, options, None, None); + if let Some(rc) = iter.next() { + let (tx, rx) = oneshot::channel(); + let mid = (rc.add_response_waiter_boxed(tx).unwrap() | 1 << (usize::BITS - 1)) as u64; + rc.send_boxed(mid, req, rc.reserve().await)?; + + rx.await?.map_err(|x| x.specify::>()) + } else { + Err(Error::NoReceivers) + } + } + + #[inline] + fn select_receivers<'a, 'b: 'a, 'c: 'a, 'd: 'a>( + &'a self, + tid: &'b TypeTag, + _options: SendOptions, + rid: Option<&'c TypeTag>, + eid: Option<&'d TypeTag>, + ) -> impl Iterator + 'a { self.receivers - .get(&tid) + .get(tid) .into_iter() .map(|item| item.iter()) .flatten() .filter(move |x| match (rid, eid) { - (Some(r), Some(e)) => x.resp_type_id() == r && x.err_type_id() == e, - (Some(r), None) => x.resp_type_id() == r, - (None, Some(e)) => x.err_type_id() == e, + (Some(r), Some(e)) => x.resp_type_tag() == r && x.err_type_tag() == e, + (Some(r), None) => x.resp_type_tag() == r, + (None, Some(e)) => x.err_type_tag() == e, (None, None) => true, }) } diff --git a/src/receiver.rs b/src/receiver.rs index a641405..7ccf619 100644 --- a/src/receiver.rs +++ b/src/receiver.rs @@ -1,4 +1,9 @@ -use crate::{Bus, Error, Message, envelop::{BoxedMessage, TransferableMessage}, error::{SendError, StdSyncSendError}, trait_object::TraitObject}; +use crate::{ + envelop::{IntoBoxedMessage, TypeTag}, + error::{SendError, StdSyncSendError}, + trait_object::TraitObject, + Bus, Error, Message, +}; use core::{ any::TypeId, fmt, @@ -7,7 +12,6 @@ use core::{ pin::Pin, task::{Context, Poll}, }; -use erased_serde::Deserializer; use futures::future::poll_fn; use futures::Future; use std::{ @@ -43,20 +47,36 @@ where fn poll_events(&self, ctx: &mut Context<'_>) -> Poll>; } +pub trait WrapperReturnTypeOnly: Send + Sync { + fn add_response_listener( + &self, + listener: oneshot::Sender>, + ) -> Result; +} + +pub trait WrapperReturnTypeAndError: Send + Sync { + fn add_response_listener( + &self, + listener: oneshot::Sender>>, + ) -> Result; + fn response(&self, mid: u64, resp: Result>) -> Result<(), Error>; +} + pub trait ReceiverTrait: Send + Sync { fn typed(&self) -> AnyReceiver<'_>; - fn poller(&self) -> AnyPoller<'_>; + fn wrapper(&self) -> AnyWrapper<'_>; fn name(&self) -> &str; + fn send_boxed(&self, mid: u64, msg: Box) -> Result<(), Error>>; + fn add_response_listener( + &self, + listener: oneshot::Sender, Error>>, + ) -> Result; fn stats(&self) -> Result<(), Error>; fn close(&self) -> Result<(), Error>; fn sync(&self) -> Result<(), Error>; fn flush(&self) -> Result<(), Error>; } -pub trait TransferableReceiverTrait: Send + Sync { - fn send(&self, mid: u64, de: &mut dyn Deserializer) -> Result<(), Error>; -} - pub trait ReceiverPollerBuilder { fn build(bus: Bus) -> Box>; } @@ -107,9 +127,62 @@ where S: 'static, { inner: S, + waiters: Slab>, _m: PhantomData<(M, R, E)>, } +impl WrapperReturnTypeAndError for ReceiverWrapper +where + M: Message, + R: Message, + E: StdSyncSendError, + S: Send + Sync + 'static, +{ + fn add_response_listener( + &self, + listener: oneshot::Sender>>, + ) -> Result { + Ok(self + .waiters + .insert(Waiter::WithErrorType(listener)) + .ok_or_else(|| Error::AddListenerError)?) + } + + fn response(&self, mid: u64, resp: Result>) -> Result<(), Error> { + if let Some(waiter) = self.waiters.take(mid as _) { + match waiter { + Waiter::WithErrorType(sender) => sender.send(resp).unwrap(), + Waiter::WithoutErrorType(sender) => { + sender.send(resp.map_err(|e| e.into_dyn())).unwrap() + } + Waiter::Boxed(sender) => sender + .send(resp.map_err(|e| e.into_dyn()).map(|x| x.into_boxed())) + .unwrap(), + } + } + + Ok(()) + } +} + +impl WrapperReturnTypeOnly for ReceiverWrapper +where + M: Message, + R: Message, + E: StdSyncSendError, + S: Send + Sync + 'static, +{ + fn add_response_listener( + &self, + listener: oneshot::Sender>, + ) -> Result { + Ok(self + .waiters + .insert(Waiter::WithoutErrorType(listener)) + .ok_or_else(|| Error::AddListenerError)?) + } +} + impl ReceiverTrait for ReceiverWrapper where M: Message, @@ -125,8 +198,22 @@ where AnyReceiver::new(&self.inner) } - fn poller(&self) -> AnyPoller<'_> { - AnyPoller::new(&self.inner) + fn wrapper(&self) -> AnyWrapper<'_> { + AnyWrapper::new(self) + } + + fn send_boxed( + &self, + mid: u64, + boxed_msg: Box, + ) -> Result<(), Error>> { + let boxed = boxed_msg + .as_any_boxed() + .downcast::() + .map_err(|_| Error::MessageCastError)?; + + Ok(SendTypedReceiver::send(&self.inner, mid, *boxed) + .map_err(|err| Error::from(err.into_boxed()))?) } fn stats(&self) -> Result<(), Error> { @@ -144,21 +231,18 @@ where fn flush(&self) -> Result<(), Error> { Ok(SendUntypedReceiver::send(&self.inner, Action::Flush)?) } -} -impl TransferableReceiverTrait for ReceiverWrapper -where - M: TransferableMessage, - R: TransferableMessage, - E: StdSyncSendError, - S: SendUntypedReceiver + SendTypedReceiver + ReciveTypedReceiver + 'static, -{ - fn send(&self, mid: u64, de: &mut dyn Deserializer) -> Result<(), Error> { - unimplemented!() + fn add_response_listener( + &self, + listener: oneshot::Sender, Error>>, + ) -> Result { + Ok(self + .waiters + .insert(Waiter::Boxed(listener)) + .ok_or_else(|| Error::AddListenerError)?) } } - pub struct Permit { pub(crate) fuse: bool, pub(crate) inner: Arc, @@ -173,64 +257,134 @@ impl Drop for Permit { } pub struct AnyReceiver<'a> { - dyn_typed_receiver_trait_object: TraitObject, - type_id: TypeId, + data: *mut (), + typed: (TypeId, *mut ()), + poller: (TypeId, *mut ()), _m: PhantomData<&'a usize>, } +impl<'a> AnyReceiver<'a> { + pub fn new(rcvr: &'a S) -> Self + where + M: Message, + R: Message, + E: StdSyncSendError, + S: SendTypedReceiver + ReciveTypedReceiver + 'static, + { + let send_typed_receiver = rcvr as &(dyn SendTypedReceiver); + let recive_typed_receiver = rcvr as &(dyn ReciveTypedReceiver); + + let send_typed_receiver: TraitObject = unsafe { mem::transmute(send_typed_receiver) }; + let recive_typed_receiver: TraitObject = unsafe { mem::transmute(recive_typed_receiver) }; + + Self { + data: send_typed_receiver.data, + typed: ( + TypeId::of::>(), + send_typed_receiver.vtable, + ), + poller: ( + TypeId::of::>(), + recive_typed_receiver.vtable, + ), + _m: Default::default(), + } + } + + pub fn cast_send_typed(&'a self) -> &'a dyn SendTypedReceiver { + assert_eq!(self.typed.0, TypeId::of::>()); + + unsafe { + mem::transmute(TraitObject { + data: self.data, + vtable: self.typed.1, + }) + } + } + + pub fn cast_recive_typed( + &'a self, + ) -> &'a dyn ReciveTypedReceiver { + assert_eq!(self.poller.0, TypeId::of::>()); + + unsafe { + mem::transmute(TraitObject { + data: self.data, + vtable: self.poller.1, + }) + } + } +} + unsafe impl Send for AnyReceiver<'_> {} -impl<'a> AnyReceiver<'a> { - pub fn new + 'static>(rcvr: &'a R) -> Self { - let trcvr = rcvr as &(dyn SendTypedReceiver); - - Self { - dyn_typed_receiver_trait_object: unsafe { mem::transmute(trcvr) }, - type_id: TypeId::of::>(), - _m: Default::default(), - } - } - - pub fn dyn_typed_receiver(&'a self) -> &'a dyn SendTypedReceiver { - assert_eq!(self.type_id, TypeId::of::>()); - - unsafe { mem::transmute(self.dyn_typed_receiver_trait_object) } - } -} - -pub struct AnyPoller<'a> { - dyn_typed_receiver_trait_object: TraitObject, - type_id: TypeId, +pub struct AnyWrapper<'a> { + data: *mut (), + wrapper_r: (TypeId, *mut ()), + wrapper_re: (TypeId, *mut ()), _m: PhantomData<&'a usize>, } -unsafe impl Send for AnyPoller<'_> {} - -impl<'a> AnyPoller<'a> { - pub fn new(rcvr: &'a R) -> Self +impl<'a> AnyWrapper<'a> { + pub fn new(rcvr: &'a S) -> Self where - M: Message, + R: Message, E: StdSyncSendError, - R: ReciveTypedReceiver + 'static, + S: WrapperReturnTypeOnly + WrapperReturnTypeAndError + 'static, { - let trcvr = rcvr as &(dyn ReciveTypedReceiver); + let wrapper_r = rcvr as &(dyn WrapperReturnTypeOnly); + let wrapper_re = rcvr as &(dyn WrapperReturnTypeAndError); + + let wrapper_r: TraitObject = unsafe { mem::transmute(wrapper_r) }; + let wrapper_re: TraitObject = unsafe { mem::transmute(wrapper_re) }; Self { - dyn_typed_receiver_trait_object: unsafe { mem::transmute(trcvr) }, - type_id: TypeId::of::>(), + data: wrapper_r.data, + wrapper_r: ( + TypeId::of::>(), + wrapper_r.vtable, + ), + wrapper_re: ( + TypeId::of::>(), + wrapper_re.vtable, + ), _m: Default::default(), } } - pub fn dyn_typed_receiver( - &'a self, - ) -> &'a dyn ReciveTypedReceiver { - assert_eq!(self.type_id, TypeId::of::>()); + pub fn cast_ret_only(&'a self) -> &'a dyn WrapperReturnTypeOnly { + assert_eq!( + self.wrapper_r.0, + TypeId::of::>() + ); - unsafe { mem::transmute(self.dyn_typed_receiver_trait_object) } + unsafe { + mem::transmute(TraitObject { + data: self.data, + vtable: self.wrapper_r.1, + }) + } + } + + pub fn cast_ret_and_error( + &'a self, + ) -> &'a dyn WrapperReturnTypeAndError { + assert_eq!( + self.wrapper_re.0, + TypeId::of::>() + ); + + unsafe { + mem::transmute(TraitObject { + data: self.data, + vtable: self.wrapper_re.1, + }) + } } } +unsafe impl Send for AnyWrapper<'_> {} + #[derive(Debug, Clone)] pub struct ReceiverStats { pub name: Cow<'static, str>, @@ -255,8 +409,8 @@ impl fmt::Display for ReceiverStats { } struct ReceiverContext { - resp_type_id: TypeId, - err_type_id: TypeId, + resp_type_tag: TypeTag, + err_type_tag: TypeTag, limit: u64, processing: AtomicU64, need_flush: AtomicBool, @@ -272,11 +426,15 @@ impl PermitDrop for ReceiverContext { } } +enum Waiter { + WithErrorType(oneshot::Sender>>), + WithoutErrorType(oneshot::Sender>), + Boxed(oneshot::Sender, Error>>), +} + pub struct Receiver { inner: Arc, context: Arc, - waiters: Arc, - waiters_void: Arc, } impl fmt::Debug for Receiver { @@ -305,8 +463,8 @@ impl Receiver { { Self { context: Arc::new(ReceiverContext { - resp_type_id: TypeId::of::(), - err_type_id: TypeId::of::(), + resp_type_tag: R::type_tag_(), + err_type_tag: E::type_tag_(), limit, processing: AtomicU64::new(0), need_flush: AtomicBool::new(false), @@ -317,29 +475,20 @@ impl Receiver { }), inner: Arc::new(ReceiverWrapper { inner, + waiters: sharded_slab::Slab::>::new_with_config::(), _m: Default::default(), }), - waiters: Arc::new( - sharded_slab::Slab::>>>::new_with_config::< - SlabCfg, - >(), - ), - waiters_void: Arc::new( - sharded_slab::Slab::>>>::new_with_config::< - SlabCfg, - >(), - ), } } #[inline] - pub fn resp_type_id(&self) -> TypeId { - self.context.resp_type_id + pub fn resp_type_tag(&self) -> &TypeTag { + &self.context.resp_type_tag } #[inline] - pub fn err_type_id(&self) -> TypeId { - self.context.err_type_id + pub fn err_type_tag(&self) -> &TypeTag { + &self.context.err_type_tag } #[inline] @@ -404,7 +553,7 @@ impl Receiver { mut permit: Permit, ) -> Result<(), SendError> { let any_receiver = self.inner.typed(); - let receiver = any_receiver.dyn_typed_receiver::(); + let receiver = any_receiver.cast_send_typed::(); let res = receiver.send(mid, msg); permit.fuse = true; @@ -418,7 +567,7 @@ impl Receiver { #[inline] pub fn force_send(&self, mid: u64, msg: M) -> Result<(), SendError> { let any_receiver = self.inner.typed(); - let receiver = any_receiver.dyn_typed_receiver::(); + let receiver = any_receiver.cast_send_typed::(); let res = receiver.send(mid, msg); self.context.processing.fetch_add(1, Ordering::SeqCst); @@ -429,6 +578,23 @@ impl Receiver { res } + pub fn send_boxed( + &self, + mid: u64, + msg: Box, + mut permit: Permit, + ) -> Result<(), Error>> { + self.context.processing.fetch_add(1, Ordering::SeqCst); + let res = self.inner.send_boxed(mid, msg); + permit.fuse = true; + + if !res.is_err() { + self.context.need_flush.store(true, Ordering::SeqCst); + } + + Ok(()) + } + pub fn start_polling_events( &self, ) -> Box Pin + Send>>> @@ -439,22 +605,13 @@ impl Receiver { let ctx_clone = self.context.clone(); let inner_clone = self.inner.clone(); - let waiters = self - .waiters - .clone() - .downcast::>>>>() - .unwrap(); - - let waiters_void = self - .waiters_void - .clone() - .downcast::>>>>() - .unwrap(); - Box::new(move |_| { Box::pin(async move { - let any_receiver = inner_clone.poller(); - let receiver = any_receiver.dyn_typed_receiver::(); + let any_receiver = inner_clone.typed(); + let receiver = any_receiver.cast_recive_typed::(); + + let any_wrapper = inner_clone.wrapper(); + let wrapper = any_wrapper.cast_ret_and_error::(); loop { let event = poll_fn(move |ctx| receiver.poll_events(ctx)).await; @@ -469,16 +626,8 @@ impl Receiver { ctx_clone.processing.fetch_sub(1, Ordering::SeqCst); ctx_clone.response.notify_one(); - if let Some(waiter) = waiters.take(mid as usize) { - if waiter.send(resp).is_err() { - error!("Response cannot be processed!"); - } - } else if let Some(waiter) = waiters_void.take(mid as usize) { - if waiter.send(resp.map_err(|x| x.into_dyn())).is_err() { - error!("Response cannot be processed!"); - } - } else if TypeId::of::() != TypeId::of::<()>() { - warn!("Non-void response has no waiters!"); + if let Err(err) = wrapper.response(mid, resp) { + error!("Response error: {}", err); } } @@ -489,48 +638,36 @@ impl Receiver { }) } + #[inline] + pub(crate) fn add_response_waiter_boxed( + &self, + waiter: oneshot::Sender, Error>>, + ) -> Result { + self.inner.add_response_listener(waiter) + } + #[inline] pub(crate) fn add_response_waiter( &self, - waiter: oneshot::Sender>>, - ) -> Option { - let idx = self - .waiters_void - .downcast_ref::>>>>() - .unwrap() - .insert(waiter)?; + waiter: oneshot::Sender>, + ) -> Result { + let any_receiver = self.inner.wrapper(); + let receiver = any_receiver.cast_ret_only::(); - Some(idx) + receiver.add_response_listener(waiter) } #[inline] pub(crate) fn add_response_waiter_we( &self, waiter: oneshot::Sender>>, - ) -> Option { - let idx = self - .waiters - .downcast_ref::>>>>() - .unwrap() - .insert(waiter)?; + ) -> Result { + let any_receiver = self.inner.wrapper(); + let receiver = any_receiver.cast_ret_and_error::(); - Some(idx) + receiver.add_response_listener(waiter) } - // #[inline] - // pub(crate) fn add_response_waiter_dyn( - // &self, - // waiter: oneshot::Sender>>, - // ) -> Option { - // let idx = self - // .waiters - // .downcast_ref::>>>>() - // .unwrap() - // .insert(waiter)?; - - // Some(idx) - // } - #[inline] pub async fn close(&self) { let notified = self.context.closed.notified(); diff --git a/src/receivers/buffer_unordered/mod.rs b/src/receivers/buffer_unordered/mod.rs index 50f5bef..bd7ac88 100644 --- a/src/receivers/buffer_unordered/mod.rs +++ b/src/receivers/buffer_unordered/mod.rs @@ -4,8 +4,8 @@ mod sync; use std::sync::atomic::AtomicU64; pub use r#async::BufferUnorderedAsync; +use serde_derive::{Deserialize, Serialize}; pub use sync::BufferUnorderedSync; -use serde_derive::{Serialize, Deserialize}; #[derive(Debug)] pub struct BufferUnorderedStats { diff --git a/src/receivers/buffer_unordered_batched/mod.rs b/src/receivers/buffer_unordered_batched/mod.rs index bcec33c..bab0091 100644 --- a/src/receivers/buffer_unordered_batched/mod.rs +++ b/src/receivers/buffer_unordered_batched/mod.rs @@ -4,8 +4,8 @@ mod sync; use std::sync::atomic::AtomicU64; pub use r#async::BufferUnorderedBatchedAsync; +use serde_derive::{Deserialize, Serialize}; pub use sync::BufferUnorderedBatchedSync; -use serde_derive::{Serialize, Deserialize}; #[derive(Debug)] pub struct BufferUnorderedBatchedStats { diff --git a/src/receivers/mod.rs b/src/receivers/mod.rs index 8304744..772fc05 100644 --- a/src/receivers/mod.rs +++ b/src/receivers/mod.rs @@ -29,7 +29,9 @@ where } #[inline(always)] -pub(crate) fn fix_into_iter + Send>(x: T) -> impl IntoIterator + Send { +pub(crate) fn fix_into_iter + Send>( + x: T, +) -> impl IntoIterator + Send { x } diff --git a/src/receivers/synchronize_batched/async.rs b/src/receivers/synchronize_batched/async.rs index 2138e83..135cd20 100644 --- a/src/receivers/synchronize_batched/async.rs +++ b/src/receivers/synchronize_batched/async.rs @@ -9,7 +9,7 @@ use crate::{ builder::ReceiverSubscriberBuilder, error::{Error, SendError, StdSyncSendError}, receiver::{Action, Event, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver}, - receivers::{fix_type, fix_into_iter, Request}, + receivers::{fix_into_iter, fix_type, Request}, AsyncBatchSynchronizedHandler, Bus, Message, Untyped, }; diff --git a/src/receivers/synchronize_batched/mod.rs b/src/receivers/synchronize_batched/mod.rs index 7bd35a4..c14cfc4 100644 --- a/src/receivers/synchronize_batched/mod.rs +++ b/src/receivers/synchronize_batched/mod.rs @@ -4,9 +4,8 @@ mod sync; use std::sync::atomic::AtomicU64; pub use r#async::SynchronizedBatchedAsync; +use serde_derive::{Deserialize, Serialize}; pub use sync::SynchronizedBatchedSync; -use serde_derive::{Serialize, Deserialize}; - #[derive(Debug)] pub struct SynchronizedBatchedStats { diff --git a/src/receivers/synchronize_batched/sync.rs b/src/receivers/synchronize_batched/sync.rs index fd8c7dd..0489a98 100644 --- a/src/receivers/synchronize_batched/sync.rs +++ b/src/receivers/synchronize_batched/sync.rs @@ -9,7 +9,7 @@ use crate::{ builder::ReceiverSubscriberBuilder, error::{Error, SendError, StdSyncSendError}, receiver::{Action, Event, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver}, - receivers::{fix_type, fix_into_iter, Request}, + receivers::{fix_into_iter, fix_type, Request}, BatchSynchronizedHandler, Bus, Message, Untyped, }; diff --git a/src/receivers/synchronized/mod.rs b/src/receivers/synchronized/mod.rs index 8212368..786e060 100644 --- a/src/receivers/synchronized/mod.rs +++ b/src/receivers/synchronized/mod.rs @@ -4,8 +4,8 @@ mod sync; use std::sync::atomic::AtomicU64; pub use r#async::SynchronizedAsync; +use serde_derive::{Deserialize, Serialize}; pub use sync::SynchronizedSync; -use serde_derive::{Serialize, Deserialize}; #[derive(Debug)] pub struct SynchronizedStats { diff --git a/src/relay.rs b/src/relay.rs index 1e28821..0242911 100644 --- a/src/relay.rs +++ b/src/relay.rs @@ -1,14 +1,25 @@ -use std::{any::TypeId, borrow::Cow, collections::HashMap, sync::atomic::{AtomicU64, Ordering}}; +use std::{ + any::TypeId, + borrow::Cow, + collections::HashMap, + sync::atomic::{AtomicU64, Ordering}, +}; -use tokio::sync::oneshot::Sender; use sharded_slab::Slab; +use tokio::sync::oneshot::Sender; -use crate::{Bus, Message, envelop::SafeMessage, error::{Error, SendError}, receiver::Permit}; +use crate::{ + error::{Error, SendError}, + receiver::Permit, + Bus, Message, +}; pub trait RelayTrait { - // fn handle_message(&self, mid: u64, msg: &dyn SafeMessage, tx: Option>, bus: &Bus); - fn start_relay(&self, bus: &Bus) -> Result<(), Error> ; - fn stop_relay(&self); + type Context; + // fn handle_message(&self, mid: u64, msg: &dyn SafeMessage, ctx: Self::Context, bus: &Bus); + // fn handle_request(&self, mid: u64, msg: &dyn SafeMessage, ctx: Self::Context, bus: &Bus); + fn start_relay(&self, bus: &Bus) -> Result; + fn stop_relay(&self, ctx: Self::Context); } pub struct Relay { @@ -119,4 +130,4 @@ impl Relay { // warn!("flush failed!"); // } // } -} \ No newline at end of file +}