diff --git a/CHANGELOG.md b/CHANGELOG.md index d3ff9af..39f3539 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,19 @@ -## MewssageBus changelog +## MewssageBus pending changes: + +### 0.8.0 +#### new features: +* Generator Handlers +* Message Masking + +### 0.7.0 +#### new features: +* Bus scopes (`enter` and `leave` methods) instead clone +* Bus relays. Connect other message bus by IP address + +#### breaking changes: +* Batched handlers now require `InBatch` and `OutBatch` types + +## MewssageBus changelog: ### 0.6.5 #### new features: diff --git a/Cargo.toml b/Cargo.toml index 127abf9..04e5b8d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,9 @@ smallvec = "1.6.1" log = "0.4.14" sharded-slab = "0.1.1" thiserror = "1.0.25" +erased-serde = "0.3.16" +serde = "1.0.126" +serde_derive = "1.0.126" [dev-dependencies] anyhow = "1.0.41" diff --git a/README.md b/README.md index 9d4a934..202b3e4 100644 --- a/README.md +++ b/README.md @@ -11,7 +11,7 @@ Inspired by Actix ### Basics 1. Can deliver messages between actors using receivers (usually a queue implementations) 2. Messages distincts and delivers by TypeId -3. Messages delivers in a broadcast fashion to many receivers (Cloned) +3. Messages delivers ether in a broadcast fashion to many receivers (Cloned) or addressed by recevier id, balanced (depends on queue load) or random 4. There are different kind of receivers implemented: - BufferUnordered Receiver (sync and async) - Synchronized (sync and async) @@ -21,9 +21,8 @@ Inspired by Actix Here are the list of implmented handler kinds: ```rust - pub trait Handler: Send + Sync { - type Error: crate::Error; + type Error: StdSyncSendError; type Response: Message; fn handle(&self, msg: M, bus: &Bus) -> Result; @@ -34,7 +33,7 @@ pub trait Handler: Send + Sync { #[async_trait] pub trait AsyncHandler: Send + Sync { - type Error: crate::Error; + type Error: StdSyncSendError; type Response: Message; async fn handle(&self, msg: M, bus: &Bus) -> Result; @@ -44,7 +43,7 @@ pub trait AsyncHandler: Send + Sync { } pub trait SynchronizedHandler: Send { - type Error: crate::Error; + type Error: StdSyncSendError; type Response: Message; fn handle(&mut self, msg: M, bus: &Bus) -> Result; @@ -55,7 +54,7 @@ pub trait SynchronizedHandler: Send { #[async_trait] pub trait AsyncSynchronizedHandler: Send { - type Error: crate::Error; + type Error: StdSyncSendError; type Response: Message; async fn handle(&mut self, msg: M, bus: &Bus) -> Result; @@ -65,10 +64,12 @@ pub trait AsyncSynchronizedHandler: Send { } pub trait BatchHandler: Send + Sync { - type Error: crate::Error; + type Error: StdSyncSendError + Clone; type Response: Message; + type InBatch: FromIterator + Send; + type OutBatch: IntoIterator + Send; - fn handle(&self, msg: Vec, bus: &Bus) -> Result, Self::Error>; + fn handle(&self, msg: Self::InBatch, bus: &Bus) -> Result; fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> { Ok(()) } @@ -76,20 +77,24 @@ pub trait BatchHandler: Send + Sync { #[async_trait] pub trait AsyncBatchHandler: Send + Sync { - type Error: crate::Error; + type Error: StdSyncSendError + Clone; type Response: Message; + type InBatch: FromIterator + Send; + type OutBatch: IntoIterator + Send; - async fn handle(&self, msg: Vec, bus: &Bus) -> Result, Self::Error>; + async fn handle(&self, msg: Self::InBatch, bus: &Bus) -> Result; async fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> { Ok(()) } } pub trait BatchSynchronizedHandler: Send { - type Error: crate::Error; + type Error: StdSyncSendError + Clone; type Response: Message; + type InBatch: FromIterator + Send; + type OutBatch: IntoIterator + Send; - fn handle(&mut self, msg: Vec, bus: &Bus) -> Result, Self::Error>; + fn handle(&mut self, msg: Self::InBatch, bus: &Bus) -> Result; fn sync(&mut self, _bus: &Bus) -> Result<(), Self::Error> { Ok(()) } @@ -97,10 +102,12 @@ pub trait BatchSynchronizedHandler: Send { #[async_trait] pub trait AsyncBatchSynchronizedHandler: Send { - type Error: crate::Error; + type Error: StdSyncSendError + Clone; type Response: Message; + type InBatch: FromIterator + Send; + type OutBatch: IntoIterator + Send; - async fn handle(&mut self, msg: Vec, bus: &Bus) -> Result, Self::Error>; + async fn handle(&mut self, msg: Self::InBatch, bus: &Bus) -> Result; async fn sync(&mut self, _bus: &Bus) -> Result<(), Self::Error> { Ok(()) } diff --git a/examples/demo_async.rs b/examples/demo_async.rs index 8a5cd00..4a6d96b 100644 --- a/examples/demo_async.rs +++ b/examples/demo_async.rs @@ -1,11 +1,11 @@ use async_trait::async_trait; -use messagebus::{AsyncHandler, Bus, Handler, Message, error, receivers}; +use messagebus::{error, AsyncHandler, Bus, Handler, Message}; use thiserror::Error; #[derive(Debug, Error)] enum Error { #[error("Error({0})")] - Error(anyhow::Error) + Error(anyhow::Error), } impl From> for Error { @@ -152,15 +152,15 @@ async fn main() { let (b, poller) = Bus::build() .register(TmpReceiver) - .subscribe::, _, _>(8, Default::default()) - .subscribe::, _, _>(8, Default::default()) - .subscribe::, _, _>(8, Default::default()) - .subscribe::, _, _>(8, Default::default()) - .subscribe::, _, _>(8, Default::default()) + .subscribe_async::(8, Default::default()) + .subscribe_async::(8, Default::default()) + .subscribe_async::(8, Default::default()) + .subscribe_async::(8, Default::default()) + .subscribe_async::(8, Default::default()) .done() .register(TmpReceiver2) - .subscribe::, _, _>(8, Default::default()) - .subscribe::, _, _>(8, Default::default()) + .subscribe_async::(8, Default::default()) + .subscribe_sync::(8, Default::default()) .done() .build(); diff --git a/examples/demo_backpressure.rs b/examples/demo_backpressure.rs index 4e2e4c5..fe32649 100644 --- a/examples/demo_backpressure.rs +++ b/examples/demo_backpressure.rs @@ -1,11 +1,11 @@ use async_trait::async_trait; -use messagebus::{receivers, AsyncHandler, Message, error, Bus}; +use messagebus::{error, receivers, AsyncHandler, Bus, Message}; use thiserror::Error; #[derive(Debug, Error)] enum Error { #[error("Error({0})")] - Error(anyhow::Error) + Error(anyhow::Error), } impl From> for Error { @@ -35,13 +35,7 @@ impl AsyncHandler for TmpReceiver { async fn main() { let (b, poller) = Bus::build() .register(TmpReceiver) - .subscribe::, _, _>( - 1, - receivers::BufferUnorderedConfig { - buffer_size: 1, - max_parallel: 1, - }, - ) + .subscribe_async::(1, receivers::BufferUnorderedConfig { buffer_size: 1, max_parallel: 1 }) .done() .build(); diff --git a/examples/demo_batch.rs b/examples/demo_batch.rs index ecf6286..438bcbb 100644 --- a/examples/demo_batch.rs +++ b/examples/demo_batch.rs @@ -1,13 +1,13 @@ use std::sync::Arc; use async_trait::async_trait; -use messagebus::{receivers, AsyncBatchHandler, BatchHandler, Message, error, Bus}; +use messagebus::{error, AsyncBatchHandler, BatchHandler, Bus, Message}; use thiserror::Error; #[derive(Debug, Error, Clone)] enum Error { #[error("Error({0})")] - Error(Arc) + Error(Arc), } impl From> for Error { @@ -21,6 +21,8 @@ struct TmpReceiver; impl AsyncBatchHandler for TmpReceiver { type Error = Error; type Response = (); + type InBatch = Vec; + type OutBatch = Vec<()>; async fn handle(&self, msg: Vec, _bus: &Bus) -> Result, Self::Error> { println!("---> [i32; {}] {:?}", msg.len(), msg); @@ -32,6 +34,8 @@ impl AsyncBatchHandler for TmpReceiver { impl BatchHandler for TmpReceiver { type Error = Error; type Response = (); + type InBatch = Vec; + type OutBatch = Vec<()>; fn handle(&self, msg: Vec, _bus: &Bus) -> Result, Self::Error> { println!("---> [i16; {}] {:?}", msg.len(), msg); @@ -43,8 +47,8 @@ impl BatchHandler for TmpReceiver { async fn main() { let (b, poller) = Bus::build() .register(TmpReceiver) - .subscribe::, _, _>(16, Default::default()) - .subscribe::, _, _>(16, Default::default()) + .subscribe_batch_async::(16, Default::default()) + .subscribe_batch_sync::(16, Default::default()) .done() .build(); diff --git a/examples/demo_relay.rs b/examples/demo_relay.rs new file mode 100644 index 0000000..fcb767f --- /dev/null +++ b/examples/demo_relay.rs @@ -0,0 +1,51 @@ +use messagebus::{error, Bus, Handler, Message, Module}; +use thiserror::Error; + +#[derive(Debug, Error)] +enum Error { + #[error("Error({0})")] + Error(anyhow::Error), +} + +impl From> for Error { + fn from(err: error::Error) -> Self { + Self::Error(err.into()) + } +} + +struct TmpReceiver; + +impl Handler for TmpReceiver { + type Error = Error; + type Response = (); + + fn handle(&self, msg: u32, _bus: &Bus) -> Result { + println!("---> u32 {}", msg); + Ok(()) + } +} + +fn module() -> Module { + Module::new() + .register(TmpReceiver) + .subscribe_sync::(8, Default::default()) + .done() +} + +#[tokio::main] +async fn main() { + let (b, poller) = Bus::build().add_module(module()).build(); + + // b. + + println!("flush"); + b.flush().await; + + println!("close"); + b.close().await; + + println!("closed"); + + poller.await; + println!("[done]"); +} diff --git a/examples/demo_req_resp.rs b/examples/demo_req_resp.rs index 32901d5..f74037e 100644 --- a/examples/demo_req_resp.rs +++ b/examples/demo_req_resp.rs @@ -1,13 +1,16 @@ use core::f32; use async_trait::async_trait; -use messagebus::{AsyncHandler, Bus, Message, error::{self, StdSyncSendError}, receivers}; +use messagebus::{ + error::{self, StdSyncSendError}, + AsyncHandler, Bus, Message, +}; use thiserror::Error; #[derive(Debug, Error)] enum Error { #[error("Error({0})")] - Error(anyhow::Error) + Error(anyhow::Error), } impl From> for Error { @@ -16,7 +19,6 @@ impl From> for Error { } } - struct TmpReceiver1; struct TmpReceiver2; @@ -159,22 +161,23 @@ impl AsyncHandler for TmpReceiver2 { async fn main() { let (b, poller) = Bus::build() .register(TmpReceiver1) - .subscribe::, _, _>(8, Default::default()) - .subscribe::, _, _>(8, Default::default()) - .subscribe::, _, _>(8, Default::default()) - .subscribe::, _, _>(8, Default::default()) - .subscribe::, _, _>(8, Default::default()) - .subscribe::, _, _>(8, Default::default()) + .subscribe_async::(8, Default::default()) + .subscribe_async::(8, Default::default()) + .subscribe_async::(8, Default::default()) + .subscribe_async::(8, Default::default()) + .subscribe_async::(8, Default::default()) + .subscribe_async::(8, Default::default()) .done() .register(TmpReceiver2) - .subscribe::, _, _>(8, Default::default()) - .subscribe::, _, _>(8, Default::default()) + .subscribe_async::(8, Default::default()) + .subscribe_async::(8, Default::default()) .done() .build(); println!( "{:?}", - b.request_we::<_, f64, Error>(1000f64, Default::default()).await + b.request_local_we::<_, f64, Error>(1000f64, Default::default()) + .await ); println!("flush"); diff --git a/examples/demo_slow.rs b/examples/demo_slow.rs index 1f01ca7..83d665b 100644 --- a/examples/demo_slow.rs +++ b/examples/demo_slow.rs @@ -1,10 +1,10 @@ -use messagebus::{Bus, Handler, Message, Module, error, receivers}; +use messagebus::{error, Bus, Handler, Message, Module}; use thiserror::Error; #[derive(Debug, Error)] enum Error { #[error("Error({0})")] - Error(anyhow::Error) + Error(anyhow::Error), } impl From> for Error { @@ -52,18 +52,16 @@ impl Handler for TmpReceiver { fn module() -> Module { Module::new() - .register(TmpReceiver) - .subscribe::, _, _>(8, Default::default()) - .subscribe::, _, _>(8, Default::default()) - .subscribe::, _, _>(8, Default::default()) - .done() + .register(TmpReceiver) + .subscribe_sync::(8, Default::default()) + .subscribe_sync::(8, Default::default()) + .subscribe_sync::(8, Default::default()) + .done() } #[tokio::main] async fn main() { - let (b, poller) = Bus::build() - .add_module(module()) - .build(); + let (b, poller) = Bus::build().add_module(module()).build(); b.send(32f32).await.unwrap(); b.send(11u16).await.unwrap(); diff --git a/examples/demo_sync_batch.rs b/examples/demo_sync_batch.rs index 21b7884..8c57999 100644 --- a/examples/demo_sync_batch.rs +++ b/examples/demo_sync_batch.rs @@ -1,13 +1,15 @@ use std::sync::Arc; use async_trait::async_trait; -use messagebus::{receivers, AsyncBatchSynchronizedHandler, BatchSynchronizedHandler, Message, error, Bus}; +use messagebus::{ + error, AsyncBatchSynchronizedHandler, BatchSynchronizedHandler, Bus, Message, +}; use thiserror::Error; #[derive(Debug, Error, Clone)] enum Error { #[error("Error({0})")] - Error(Arc) + Error(Arc), } impl From> for Error { @@ -22,8 +24,15 @@ struct TmpReceiver; impl AsyncBatchSynchronizedHandler for TmpReceiver { type Error = Error; type Response = (); + type InBatch = Vec; + type OutBatch = Vec<()>; - async fn handle(&mut self, msg: Vec, _bus: &Bus) -> Result, Self::Error> { + + async fn handle( + &mut self, + msg: Vec, + _bus: &Bus, + ) -> Result, Self::Error> { println!("---> [i32; {}] {:?}", msg.len(), msg); Ok(vec![]) @@ -33,6 +42,8 @@ impl AsyncBatchSynchronizedHandler for TmpReceiver { impl BatchSynchronizedHandler for TmpReceiver { type Error = Error; type Response = (); + type InBatch = Vec; + type OutBatch = Vec<()>; fn handle(&mut self, msg: Vec, _bus: &Bus) -> Result, Self::Error> { println!("---> [i16; {}] {:?}", msg.len(), msg); @@ -44,8 +55,8 @@ impl BatchSynchronizedHandler for TmpReceiver { async fn main() { let (b, poller) = Bus::build() .register_unsync(TmpReceiver) - .subscribe::, _, _>(16, Default::default()) - .subscribe::, _, _>(16, Default::default()) + .subscribe_batch_async::(16, Default::default()) + .subscribe_batch_sync::(16, Default::default()) .done() .build(); diff --git a/examples/non_sync.rs b/examples/non_sync.rs index 78dfa62..552c644 100644 --- a/examples/non_sync.rs +++ b/examples/non_sync.rs @@ -1,11 +1,11 @@ use async_trait::async_trait; -use messagebus::{receivers, AsyncSynchronizedHandler, Bus, Message, error, SynchronizedHandler}; +use messagebus::{error, receivers, AsyncSynchronizedHandler, Bus, Message, SynchronizedHandler}; use thiserror::Error; #[derive(Debug, Error)] enum Error { #[error("Error({0})")] - Error(anyhow::Error) + Error(anyhow::Error), } impl From> for Error { diff --git a/src/builder.rs b/src/builder.rs index 0c3c70c..7efac06 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -3,7 +3,7 @@ use std::{any::TypeId, collections::HashMap, marker::PhantomData, pin::Pin, sync use futures::{Future, FutureExt}; use tokio::sync::Mutex; -use crate::{Bus, BusInner, Message, Untyped, error::StdSyncSendError, receiver::{Receiver, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver}}; +use crate::{AsyncBatchHandler, AsyncBatchSynchronizedHandler, AsyncHandler, AsyncSynchronizedHandler, BatchHandler, BatchSynchronizedHandler, Bus, BusInner, Handler, Message, SynchronizedHandler, Untyped, error::StdSyncSendError, receiver::{Receiver, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver}, receivers}; pub trait ReceiverSubscriberBuilder: SendUntypedReceiver + SendTypedReceiver + ReciveTypedReceiver @@ -51,8 +51,14 @@ pub struct RegisterEntry { _m: PhantomData<(K, T)>, } -impl RegisterEntry - where F: FnMut(&mut B, (TypeId, Receiver), Box Pin + Send>>>, Box Pin + Send>>>), +impl RegisterEntry +where + F: FnMut( + &mut B, + (TypeId, Receiver), + Box Pin + Send>>>, + Box Pin + Send>>>, + ), { pub fn done(mut self) -> B { for (tid, v) in self.receivers { @@ -87,6 +93,42 @@ impl RegisterEntry { self } + + #[inline] + pub fn subscribe_sync(self, queue: u64, cfg: receivers::SynchronizedConfig) -> Self + where + T: SynchronizedHandler + Send + 'static, + M: Message, + { + self.subscribe::, T::Response, T::Error>(queue, cfg) + } + + #[inline] + pub fn subscribe_async(self, queue: u64, cfg: receivers::SynchronizedConfig) -> Self + where + T: AsyncSynchronizedHandler + Send + 'static, + M: Message, + { + self.subscribe::, T::Response, T::Error>(queue, cfg) + } + + #[inline] + pub fn subscribe_batch_sync(self, queue: u64, cfg: receivers::SynchronizedBatchedConfig) -> Self + where + T: BatchSynchronizedHandler + Send + 'static, + M: Message, + { + self.subscribe::, T::Response, T::Error>(queue, cfg) + } + + #[inline] + pub fn subscribe_batch_async(self, queue: u64, cfg: receivers::SynchronizedBatchedConfig) -> Self + where + T: AsyncBatchSynchronizedHandler + Send + 'static, + M: Message, + { + self.subscribe::, T::Response, T::Error>(queue, cfg) + } } impl RegisterEntry { @@ -109,8 +151,45 @@ impl RegisterEntry { self } -} + #[inline] + pub fn subscribe_sync(self, queue: u64, cfg: receivers::BufferUnorderedConfig) -> Self + where + T: Handler + Send + Sync + 'static, + M: Message, + { + self.subscribe::, T::Response, T::Error>(queue, cfg) + } + + #[inline] + pub fn subscribe_async(self, queue: u64, cfg: receivers::BufferUnorderedConfig) -> Self + where + T: AsyncHandler + Send + Sync + 'static, + M: Message, + T::Response: Message, + T::Error: StdSyncSendError, + { + self.subscribe::, T::Response, T::Error>(queue, cfg) + } + + #[inline] + pub fn subscribe_batch_sync(self, queue: u64, cfg: receivers::BufferUnorderedBatchedConfig) -> Self + where + T: BatchHandler + Send + 'static, + M: Message, + { + self.subscribe::, T::Response, T::Error>(queue, cfg) + } + + #[inline] + pub fn subscribe_batch_async(self, queue: u64, cfg: receivers::BufferUnorderedBatchedConfig) -> Self + where + T: AsyncBatchHandler + Send + 'static, + M: Message, + { + self.subscribe::, T::Response, T::Error>(queue, cfg) + } +} pub struct Module { receivers: Vec<(TypeId, Receiver)>, @@ -125,7 +204,20 @@ impl Module { } } - pub fn register(self, item: T) -> RegisterEntry Pin + Send>>>, Box Pin + Send>>>), Self> { + pub fn register( + self, + item: T, + ) -> RegisterEntry< + SyncEntry, + T, + impl FnMut( + &mut Self, + (TypeId, Receiver), + Box Pin + Send>>>, + Box Pin + Send>>>, + ), + Self, + > { RegisterEntry { item: Arc::new(item) as Untyped, payload: self, @@ -139,7 +231,20 @@ impl Module { } } - pub fn register_unsync(self, item: T) -> RegisterEntry Pin + Send>>>, Box Pin + Send>>>), Self> { + pub fn register_unsync( + self, + item: T, + ) -> RegisterEntry< + UnsyncEntry, + T, + impl FnMut( + &mut Self, + (TypeId, Receiver), + Box Pin + Send>>>, + Box Pin + Send>>>, + ), + Self, + > { RegisterEntry { item: Arc::new(Mutex::new(item)) as Untyped, payload: self, @@ -153,9 +258,11 @@ impl Module { } } - fn extend(&mut self, other: Module) { - self.receivers.extend(other.receivers.into_iter()); - self.pollings.extend(other.pollings.into_iter()); + pub fn add_module(mut self, module: Module) -> Self { + self.receivers.extend(module.receivers); + self.pollings.extend(module.pollings); + + self } } @@ -170,7 +277,20 @@ impl BusBuilder { } } - pub fn register(self, item: T) -> RegisterEntry Pin + Send>>>, Box Pin + Send>>>), Self> { + pub fn register( + self, + item: T, + ) -> RegisterEntry< + SyncEntry, + T, + impl FnMut( + &mut Self, + (TypeId, Receiver), + Box Pin + Send>>>, + Box Pin + Send>>>, + ), + Self, + > { RegisterEntry { item: Arc::new(item) as Untyped, payload: self, @@ -184,7 +304,20 @@ impl BusBuilder { } } - pub fn register_unsync(self, item: T) -> RegisterEntry Pin + Send>>>, Box Pin + Send>>>), Self> { + pub fn register_unsync( + self, + item: T, + ) -> RegisterEntry< + UnsyncEntry, + T, + impl FnMut( + &mut Self, + (TypeId, Receiver), + Box Pin + Send>>>, + Box Pin + Send>>>, + ), + Self, + > { RegisterEntry { item: Arc::new(Mutex::new(item)) as Untyped, payload: self, @@ -199,7 +332,7 @@ impl BusBuilder { } pub fn add_module(mut self, module: Module) -> Self { - self.inner.extend(module); + self.inner = self.inner.add_module(module); self } diff --git a/src/envelop.rs b/src/envelop.rs index 6edcf0c..bcc1b89 100644 --- a/src/envelop.rs +++ b/src/envelop.rs @@ -1,6 +1,45 @@ -use core::any::Any; +use core::any::{Any, type_name}; use core::fmt; -// use erased_serde::{Deserializer, Serialize}; +use serde::{de::DeserializeOwned, Serialize}; -pub trait Message: Any + fmt::Debug + Unpin + Send + Sync + 'static {} -impl Message for T {} +pub trait Message: + fmt::Debug + Unpin + Send + Sync + 'static +{ + fn type_name(&self) -> &str; +} + +impl Message for T { + fn type_name(&self) -> &str { + type_name::() + } +} + +pub trait TransferableMessage: Message + Serialize + DeserializeOwned +{ + fn into_boxed(self) -> BoxedMessage; +} +impl TransferableMessage for T { + fn into_boxed(self) -> BoxedMessage { + BoxedMessage(Box::new(self) as _) + } +} + +pub trait SafeMessage: + Any + fmt::Debug + erased_serde::Serialize + Unpin + Send + Sync + 'static +{ + fn type_name(&self) -> &str; +} +impl SafeMessage for T { + fn type_name(&self) -> &str { + type_name::() + } +} + +#[derive(Debug)] +pub struct BoxedMessage(Box); + +impl From for BoxedMessage { + fn from(m: M) -> Self { + BoxedMessage(Box::new(m)) + } +} diff --git a/src/error.rs b/src/error.rs index 07054ab..a117e60 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,19 +1,18 @@ -use core::panic; +use core::fmt; use thiserror::Error; use tokio::sync::oneshot; -use crate::Message; +use crate::{Message, envelop::{BoxedMessage, TransferableMessage}}; pub trait StdSyncSendError: std::error::Error + Send + Sync + Unpin + 'static {} impl StdSyncSendError for T {} - #[derive(Debug, Error)] pub enum VoidError {} #[derive(Debug, Error)] -pub enum SendError { +pub enum SendError { #[error("Closed")] Closed(M), @@ -21,8 +20,17 @@ pub enum SendError { Full(M), } +impl SendError { + pub fn into_boxed(self) -> SendError { + match self { + SendError::Closed(m) => SendError::Closed(BoxedMessage::from(m)), + SendError::Full(m) => SendError::Closed(BoxedMessage::from(m)), + } + } +} + #[derive(Debug, Error)] -pub enum Error { +pub enum Error { #[error("Message Send Error: {0}")] SendError(#[from] SendError), @@ -35,6 +43,9 @@ pub enum Error { #[error("Other({0})")] Other(E), + #[error("Serialization({0})")] + Serialization(#[from] erased_serde::Error), + #[error("Other({0})")] OtherBoxed(Box), } @@ -45,28 +56,31 @@ impl Error { Error::SendError(inner) => Error::SendError(inner), Error::NoResponse => Error::NoReceivers, Error::NoReceivers => Error::NoReceivers, + Error::Serialization(s) => Error::Serialization(s), Error::Other(inner) => Error::OtherBoxed(Box::new(inner) as _), Error::OtherBoxed(inner) => Error::OtherBoxed(inner), } - } - + } + pub fn map> + StdSyncSendError>(self) -> Error { match self { Error::SendError(inner) => Error::SendError(inner), Error::NoResponse => Error::NoReceivers, Error::NoReceivers => Error::NoReceivers, + Error::Serialization(s) => Error::Serialization(s), Error::Other(_) => panic!("expected boxed error!"), Error::OtherBoxed(inner) => Error::Other(inner.into()), } } } -impl Error<(), E> { +impl Error<(), E> { pub fn specify(self) -> Error { match self { Error::SendError(_) => panic!("cannot specify type on typed error"), Error::NoResponse => Error::NoReceivers, Error::NoReceivers => Error::NoReceivers, + Error::Serialization(s) => Error::Serialization(s), Error::Other(inner) => Error::Other(inner), Error::OtherBoxed(inner) => Error::OtherBoxed(inner), } @@ -77,4 +91,4 @@ impl From for Error< fn from(_: oneshot::error::RecvError) -> Self { Error::NoResponse } -} \ No newline at end of file +} diff --git a/src/handler.rs b/src/handler.rs index 0a1a945..8a7e77d 100644 --- a/src/handler.rs +++ b/src/handler.rs @@ -1,4 +1,6 @@ -use crate::{Bus, Message, error::StdSyncSendError}; +use std::iter::FromIterator; + +use crate::{error::StdSyncSendError, Bus, Message}; use async_trait::async_trait; pub trait Handler: Send + Sync { @@ -46,8 +48,10 @@ pub trait AsyncSynchronizedHandler: Send { pub trait BatchHandler: Send + Sync { type Error: StdSyncSendError + Clone; type Response: Message; + type InBatch: FromIterator + Send; + type OutBatch: IntoIterator + Send; - fn handle(&self, msg: Vec, bus: &Bus) -> Result, Self::Error>; + fn handle(&self, msg: Self::InBatch, bus: &Bus) -> Result; fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> { Ok(()) } @@ -57,8 +61,10 @@ pub trait BatchHandler: Send + Sync { pub trait AsyncBatchHandler: Send + Sync { type Error: StdSyncSendError + Clone; type Response: Message; + type InBatch: FromIterator + Send; + type OutBatch: IntoIterator + Send; - async fn handle(&self, msg: Vec, bus: &Bus) -> Result, Self::Error>; + async fn handle(&self, msg: Self::InBatch, bus: &Bus) -> Result; async fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> { Ok(()) } @@ -67,8 +73,10 @@ pub trait AsyncBatchHandler: Send + Sync { pub trait BatchSynchronizedHandler: Send { type Error: StdSyncSendError + Clone; type Response: Message; + type InBatch: FromIterator + Send; + type OutBatch: IntoIterator + Send; - fn handle(&mut self, msg: Vec, bus: &Bus) -> Result, Self::Error>; + fn handle(&mut self, msg: Self::InBatch, bus: &Bus) -> Result; fn sync(&mut self, _bus: &Bus) -> Result<(), Self::Error> { Ok(()) } @@ -78,8 +86,10 @@ pub trait BatchSynchronizedHandler: Send { pub trait AsyncBatchSynchronizedHandler: Send { type Error: StdSyncSendError + Clone; type Response: Message; + type InBatch: FromIterator + Send; + type OutBatch: IntoIterator + Send; - async fn handle(&mut self, msg: Vec, bus: &Bus) -> Result, Self::Error>; + async fn handle(&mut self, msg: Self::InBatch, bus: &Bus) -> Result; async fn sync(&mut self, _bus: &Bus) -> Result<(), Self::Error> { Ok(()) } @@ -109,8 +119,10 @@ pub trait LocalAsyncHandler { pub trait LocalBatchHandler { type Error: StdSyncSendError + Clone; type Response: Message; + type InBatch: FromIterator + Send; + type OutBatch: IntoIterator + Send; - fn handle(&mut self, msg: Vec, bus: &Bus) -> Result, Self::Error>; + fn handle(&mut self, msg: Self::InBatch, bus: &Bus) -> Result; fn sync(&mut self, _bus: &Bus) -> Result<(), Self::Error> { Ok(()) } @@ -120,8 +132,10 @@ pub trait LocalBatchHandler { pub trait LocalAsyncBatchHandler { type Error: StdSyncSendError + Clone; type Response: Message; + type InBatch: FromIterator + Send; + type OutBatch: IntoIterator + Send; - async fn handle(&mut self, msg: Vec, bus: &Bus) -> Result, Self::Error>; + async fn handle(&mut self, msg: Self::InBatch, bus: &Bus) -> Result; async fn sync(&mut self, _bus: &Bus) -> Result<(), Self::Error> { Ok(()) } diff --git a/src/lib.rs b/src/lib.rs index 823fcb0..7f6d6b2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,15 +5,18 @@ mod handler; mod receiver; pub mod receivers; mod trait_object; +pub mod relay; #[macro_use] extern crate log; use crate::receiver::Permit; use builder::BusBuilder; -use core::any::{Any, TypeId}; pub use builder::Module; -pub use envelop::Message; +pub use relay::RelayTrait; +use core::any::{Any, TypeId}; +pub use envelop::{BoxedMessage, TransferableMessage, Message}; +use error::{Error, SendError, StdSyncSendError}; pub use handler::*; use receiver::Receiver; use smallvec::SmallVec; @@ -25,7 +28,6 @@ use std::{ }, }; use tokio::sync::oneshot; -use error::{Error, SendError, StdSyncSendError}; pub type Untyped = Arc; @@ -134,11 +136,11 @@ impl BusInner { } #[inline] - pub fn try_send(&self, msg: M) -> Result<(), Error> { + pub fn try_send(&self, msg: M) -> Result<(), Error> { self.try_send_ext(msg, SendOptions::Broadcast) } - pub fn try_send_ext( + pub fn try_send_ext( &self, msg: M, _options: SendOptions, @@ -163,13 +165,13 @@ impl BusInner { while counter < total { let (p, r) = iter.next().unwrap(); - let _ = r.send(mid, p, msg.clone()); + let _ = r.send(mid, msg.clone(), p); counter += 1; } if let Some((p, r)) = iter.next() { - let _ = r.send(mid, p, msg); + let _ = r.send(mid, msg, p); return Ok(()); } } @@ -183,12 +185,12 @@ impl BusInner { } #[inline] - pub fn send_blocking(&self, msg: M) -> Result<(), Error> { + pub fn send_blocking(&self, msg: M) -> Result<(), Error> { self.send_blocking_ext(msg, SendOptions::Broadcast) } #[inline] - pub fn send_blocking_ext( + pub fn send_blocking_ext( &self, msg: M, options: SendOptions, @@ -197,11 +199,11 @@ impl BusInner { } #[inline] - pub async fn send(&self, msg: M) -> core::result::Result<(), Error> { + pub async fn send(&self, msg: M) -> core::result::Result<(), Error> { Ok(self.send_ext(msg, SendOptions::Broadcast).await?) } - pub async fn send_ext( + pub async fn send_ext( &self, msg: M, _options: SendOptions, @@ -216,10 +218,10 @@ impl BusInner { if let Some(rs) = self.receivers.get(&tid) { if let Some((last, head)) = rs.split_last() { for r in head { - let _ = r.send(mid, r.reserve().await, msg.clone()); + let _ = r.send(mid, msg.clone(), r.reserve().await); } - let _ = last.send(mid, last.reserve().await, msg); + let _ = last.send(mid, msg, last.reserve().await); return Ok(()); } @@ -234,11 +236,11 @@ impl BusInner { } #[inline] - pub fn force_send(&self, msg: M) -> Result<(), Error> { + pub fn force_send(&self, msg: M) -> Result<(), Error> { self.force_send_ext(msg, SendOptions::Broadcast) } - pub fn force_send_ext( + pub fn force_send_ext( &self, msg: M, _options: SendOptions, @@ -271,7 +273,7 @@ impl BusInner { } #[inline] - pub fn try_send_one(&self, msg: M) -> Result<(), Error> { + pub fn try_send_one(&self, msg: M) -> Result<(), Error> { if self.closed.load(Ordering::SeqCst) { return Err(SendError::Closed(msg).into()); } @@ -279,20 +281,20 @@ impl BusInner { let mid = ID_COUNTER.fetch_add(1, Ordering::Relaxed); let tid = TypeId::of::(); - if let Some(rs) = self.receivers.get(&tid).and_then(|rs|rs.first()) { + if let Some(rs) = self.receivers.get(&tid).and_then(|rs| rs.first()) { let permits = if let Some(x) = rs.try_reserve() { x } else { return Err(SendError::Full(msg).into()); }; - Ok(rs.send(mid, permits, msg)?) + Ok(rs.send(mid, msg, permits)?) } else { Err(Error::NoReceivers) } } - pub async fn send_one(&self, msg: M) -> Result<(), Error> { + pub async fn send_one(&self, msg: M) -> Result<(), Error> { if self.closed.load(Ordering::SeqCst) { return Err(SendError::Closed(msg).into()); } @@ -300,19 +302,34 @@ impl BusInner { let mid = ID_COUNTER.fetch_add(1, Ordering::Relaxed); let tid = TypeId::of::(); - if let Some(rs) = self.receivers.get(&tid).and_then(|rs|rs.first()) { - Ok(rs.send(mid, rs.reserve().await, msg)?) + if let Some(rs) = self.receivers.get(&tid).and_then(|rs| rs.first()) { + Ok(rs.send(mid, msg, rs.reserve().await)?) + } else { + Err(Error::NoReceivers) + } + } + + pub async fn send_local_one(&self, msg: M) -> Result<(), Error> { + if self.closed.load(Ordering::SeqCst) { + return Err(SendError::Closed(msg).into()); + } + + let mid = ID_COUNTER.fetch_add(1, Ordering::Relaxed); + let tid = TypeId::of::(); + + if let Some(rs) = self.receivers.get(&tid).and_then(|rs| rs.first()) { + Ok(rs.send(mid, msg, rs.reserve().await)?) } else { Err(Error::NoReceivers) } } #[inline] - pub fn send_one_blocking(&self, msg: M) -> Result<(), Error> { + pub fn send_one_blocking(&self, msg: M) -> Result<(), Error> { futures::executor::block_on(self.send_one(msg)) } - pub async fn request( + pub async fn request( &self, req: M, options: SendOptions, @@ -324,23 +341,19 @@ impl BusInner { if let Some(rc) = iter.next() { let (tx, rx) = oneshot::channel(); let mid = (rc.add_response_waiter(tx).unwrap() | 1 << (usize::BITS - 1)) as u64; - rc.send(mid, rc.reserve().await, req)?; + rc.send(mid, req, rc.reserve().await)?; - rx.await?.map_err(|x|x.specify::()) + rx.await?.map_err(|x| x.specify::()) } else { Err(Error::NoReceivers) } } - pub async fn request_we( - &self, - req: M, - options: SendOptions, - ) -> Result> - where - M: Message, - R: Message, - E: StdSyncSendError + pub async fn request_local_we(&self, req: M, options: SendOptions) -> Result> + where + M: Message, + R: Message, + E: StdSyncSendError, { let tid = TypeId::of::(); let rid = TypeId::of::(); @@ -350,10 +363,9 @@ impl BusInner { if let Some(rc) = iter.next() { let (tx, rx) = oneshot::channel(); let mid = (rc.add_response_waiter_we(tx).unwrap() | 1 << (usize::BITS - 1)) as u64; - rc.send(mid, rc.reserve().await, req)?; + rc.send(mid, req, rc.reserve().await)?; - rx.await? - .map_err(|x|x.specify::()) + rx.await?.map_err(|x| x.specify::()) } else { Err(Error::NoReceivers) } diff --git a/src/receiver.rs b/src/receiver.rs index e8ee431..a641405 100644 --- a/src/receiver.rs +++ b/src/receiver.rs @@ -1,4 +1,4 @@ -use crate::{Bus, Error, Message, error::{SendError, StdSyncSendError}, trait_object::TraitObject}; +use crate::{Bus, Error, Message, envelop::{BoxedMessage, TransferableMessage}, error::{SendError, StdSyncSendError}, trait_object::TraitObject}; use core::{ any::TypeId, fmt, @@ -7,6 +7,7 @@ use core::{ pin::Pin, task::{Context, Poll}, }; +use erased_serde::Deserializer; use futures::future::poll_fn; use futures::Future; use std::{ @@ -52,6 +53,10 @@ pub trait ReceiverTrait: Send + Sync { fn flush(&self) -> Result<(), Error>; } +pub trait TransferableReceiverTrait: Send + Sync { + fn send(&self, mid: u64, de: &mut dyn Deserializer) -> Result<(), Error>; +} + pub trait ReceiverPollerBuilder { fn build(bus: Bus) -> Box>; } @@ -141,6 +146,19 @@ where } } +impl TransferableReceiverTrait for ReceiverWrapper +where + M: TransferableMessage, + R: TransferableMessage, + E: StdSyncSendError, + S: SendUntypedReceiver + SendTypedReceiver + ReciveTypedReceiver + 'static, +{ + fn send(&self, mid: u64, de: &mut dyn Deserializer) -> Result<(), Error> { + unimplemented!() + } +} + + pub struct Permit { pub(crate) fuse: bool, pub(crate) inner: Arc, @@ -301,8 +319,16 @@ impl Receiver { inner, _m: Default::default(), }), - waiters: Arc::new(sharded_slab::Slab::>>>::new_with_config::()), - waiters_void: Arc::new(sharded_slab::Slab::>>>::new_with_config::()), + waiters: Arc::new( + sharded_slab::Slab::>>>::new_with_config::< + SlabCfg, + >(), + ), + waiters_void: Arc::new( + sharded_slab::Slab::>>>::new_with_config::< + SlabCfg, + >(), + ), } } @@ -374,8 +400,8 @@ impl Receiver { pub fn send( &self, mid: u64, - mut permit: Permit, msg: M, + mut permit: Permit, ) -> Result<(), SendError> { let any_receiver = self.inner.typed(); let receiver = any_receiver.dyn_typed_receiver::(); @@ -416,13 +442,13 @@ impl Receiver { let waiters = self .waiters .clone() - .downcast::>>>>() + .downcast::>>>>() .unwrap(); let waiters_void = self .waiters_void .clone() - .downcast::>>>>() + .downcast::>>>>() .unwrap(); Box::new(move |_| { @@ -448,13 +474,13 @@ impl Receiver { error!("Response cannot be processed!"); } } else if let Some(waiter) = waiters_void.take(mid as usize) { - if waiter.send(resp.map_err(|x|x.into_dyn())).is_err() { + if waiter.send(resp.map_err(|x| x.into_dyn())).is_err() { error!("Response cannot be processed!"); } } else if TypeId::of::() != TypeId::of::<()>() { warn!("Non-void response has no waiters!"); } - }, + } _ => unimplemented!(), } @@ -491,6 +517,19 @@ impl Receiver { Some(idx) } + // #[inline] + // pub(crate) fn add_response_waiter_dyn( + // &self, + // waiter: oneshot::Sender>>, + // ) -> Option { + // let idx = self + // .waiters + // .downcast_ref::>>>>() + // .unwrap() + // .insert(waiter)?; + + // Some(idx) + // } #[inline] pub async fn close(&self) { diff --git a/src/receivers/buffer_unordered/async.rs b/src/receivers/buffer_unordered/async.rs index 1dbed45..c68b70a 100644 --- a/src/receivers/buffer_unordered/async.rs +++ b/src/receivers/buffer_unordered/async.rs @@ -8,10 +8,10 @@ use std::{ }; use crate::{ - buffer_unordered_poller_macro, + buffer_unordered_poller_macro, builder::ReceiverSubscriberBuilder, - error::{Error, StdSyncSendError, SendError}, - receiver::{Action, Event, ReciveTypedReceiver, SendUntypedReceiver, SendTypedReceiver}, + error::{Error, SendError, StdSyncSendError}, + receiver::{Action, Event, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver}, receivers::{fix_type, Request}, AsyncHandler, Bus, Message, Untyped, }; diff --git a/src/receivers/buffer_unordered/mod.rs b/src/receivers/buffer_unordered/mod.rs index 4ac994b..50f5bef 100644 --- a/src/receivers/buffer_unordered/mod.rs +++ b/src/receivers/buffer_unordered/mod.rs @@ -5,6 +5,7 @@ use std::sync::atomic::AtomicU64; pub use r#async::BufferUnorderedAsync; pub use sync::BufferUnorderedSync; +use serde_derive::{Serialize, Deserialize}; #[derive(Debug)] pub struct BufferUnorderedStats { @@ -14,7 +15,7 @@ pub struct BufferUnorderedStats { pub parallel_total: AtomicU64, } -#[derive(Copy, Clone, Debug)] +#[derive(Copy, Clone, Debug, Serialize, Deserialize)] pub struct BufferUnorderedConfig { pub buffer_size: usize, pub max_parallel: usize, @@ -96,7 +97,8 @@ macro_rules! buffer_unordered_poller_macro { Poll::Pending => return Poll::Pending, Poll::Ready(Some((mid, resp))) => { let resp: Result<_, $t::Error> = resp; - stx.send(Event::Response(mid, resp.map_err(Error::Other))).ok(); + stx.send(Event::Response(mid, resp.map_err(Error::Other))) + .ok(); } Poll::Ready(None) => break, } @@ -114,7 +116,8 @@ macro_rules! buffer_unordered_poller_macro { Poll::Pending => return Poll::Pending, Poll::Ready(resp) => { let resp: Result<_, E> = resp; - stx.send(Event::Synchronized(resp.map_err(Error::Other))).ok(); + stx.send(Event::Synchronized(resp.map_err(Error::Other))) + .ok(); } } need_sync = false; diff --git a/src/receivers/buffer_unordered/sync.rs b/src/receivers/buffer_unordered/sync.rs index d395245..49b887e 100644 --- a/src/receivers/buffer_unordered/sync.rs +++ b/src/receivers/buffer_unordered/sync.rs @@ -7,15 +7,15 @@ use std::{ task::{Context, Poll}, }; +use super::{BufferUnorderedConfig, BufferUnorderedStats}; use crate::{ - buffer_unordered_poller_macro, + buffer_unordered_poller_macro, builder::ReceiverSubscriberBuilder, - error::{Error, StdSyncSendError, SendError}, - receiver::{Action, Event, ReciveTypedReceiver, SendUntypedReceiver, SendTypedReceiver}, + error::{Error, SendError, StdSyncSendError}, + receiver::{Action, Event, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver}, receivers::{fix_type, Request}, Bus, Handler, Message, Untyped, }; -use super::{BufferUnorderedConfig, BufferUnorderedStats}; use futures::{stream::FuturesUnordered, Future, StreamExt}; use parking_lot::Mutex; diff --git a/src/receivers/buffer_unordered_batched/async.rs b/src/receivers/buffer_unordered_batched/async.rs index 3cc6a00..872d06f 100644 --- a/src/receivers/buffer_unordered_batched/async.rs +++ b/src/receivers/buffer_unordered_batched/async.rs @@ -7,7 +7,14 @@ use std::{ task::{Context, Poll}, }; -use crate::{AsyncBatchHandler, Bus, Message, Untyped, buffer_unordered_batch_poller_macro, builder::ReceiverSubscriberBuilder, error::{Error, SendError, StdSyncSendError}, receiver::{Action, Event, ReciveTypedReceiver, SendUntypedReceiver, SendTypedReceiver}, receivers::{fix_type, Request}}; +use crate::{ + buffer_unordered_batch_poller_macro, + builder::ReceiverSubscriberBuilder, + error::{Error, SendError, StdSyncSendError}, + receiver::{Action, Event, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver}, + receivers::{fix_type, Request}, + AsyncBatchHandler, Bus, Message, Untyped, +}; use super::{BufferUnorderedBatchedConfig, BufferUnorderedBatchedStats}; use futures::{stream::FuturesUnordered, Future, StreamExt}; @@ -39,7 +46,8 @@ where srx: Mutex>>, } -impl ReceiverSubscriberBuilder for BufferUnorderedBatchedAsync +impl ReceiverSubscriberBuilder + for BufferUnorderedBatchedAsync where T: AsyncBatchHandler + 'static, T::Error: StdSyncSendError + Clone, diff --git a/src/receivers/buffer_unordered_batched/mod.rs b/src/receivers/buffer_unordered_batched/mod.rs index 904071d..bcec33c 100644 --- a/src/receivers/buffer_unordered_batched/mod.rs +++ b/src/receivers/buffer_unordered_batched/mod.rs @@ -5,6 +5,7 @@ use std::sync::atomic::AtomicU64; pub use r#async::BufferUnorderedBatchedAsync; pub use sync::BufferUnorderedBatchedSync; +use serde_derive::{Serialize, Deserialize}; #[derive(Debug)] pub struct BufferUnorderedBatchedStats { @@ -16,7 +17,7 @@ pub struct BufferUnorderedBatchedStats { pub batch_size: AtomicU64, } -#[derive(Copy, Clone, Debug)] +#[derive(Copy, Clone, Debug, Serialize, Deserialize)] pub struct BufferUnorderedBatchedConfig { pub buffer_size: usize, pub max_parallel: usize, @@ -145,7 +146,11 @@ macro_rules! buffer_unordered_batch_poller_macro { } Err(er) => { for mid in mids { - stx.send(Event::Response(mid, Err(Error::Other(er.clone())))).ok(); + stx.send(Event::Response( + mid, + Err(Error::Other(er.clone())), + )) + .ok(); } } }, @@ -165,7 +170,8 @@ macro_rules! buffer_unordered_batch_poller_macro { Poll::Pending => return Poll::Pending, Poll::Ready(resp) => { let resp: Result<_, $t::Error> = resp; - stx.send(Event::Synchronized(resp.map_err(Error::Other))).ok(); + stx.send(Event::Synchronized(resp.map_err(Error::Other))) + .ok(); } } need_sync = false; diff --git a/src/receivers/buffer_unordered_batched/sync.rs b/src/receivers/buffer_unordered_batched/sync.rs index d6d1049..ca57b20 100644 --- a/src/receivers/buffer_unordered_batched/sync.rs +++ b/src/receivers/buffer_unordered_batched/sync.rs @@ -7,8 +7,15 @@ use std::{ task::{Context, Poll}, }; -use crate::{BatchHandler, Bus, Message, Untyped, buffer_unordered_batch_poller_macro, builder::ReceiverSubscriberBuilder, error::{Error, SendError, StdSyncSendError}, receiver::{Action, Event, ReciveTypedReceiver, SendUntypedReceiver, SendTypedReceiver}, receivers::{fix_type, Request}}; use super::{BufferUnorderedBatchedConfig, BufferUnorderedBatchedStats}; +use crate::{ + buffer_unordered_batch_poller_macro, + builder::ReceiverSubscriberBuilder, + error::{Error, SendError, StdSyncSendError}, + receiver::{Action, Event, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver}, + receivers::{fix_type, Request}, + BatchHandler, Bus, Message, Untyped, +}; use futures::{stream::FuturesUnordered, Future, StreamExt}; use parking_lot::Mutex; @@ -48,7 +55,8 @@ where srx: Mutex>>, } -impl ReceiverSubscriberBuilder for BufferUnorderedBatchedSync +impl ReceiverSubscriberBuilder + for BufferUnorderedBatchedSync where T: BatchHandler + 'static, T::Error: StdSyncSendError, diff --git a/src/receivers/mod.rs b/src/receivers/mod.rs index ea33707..8304744 100644 --- a/src/receivers/mod.rs +++ b/src/receivers/mod.rs @@ -28,6 +28,11 @@ where Pin::new_unchecked(x) } +#[inline(always)] +pub(crate) fn fix_into_iter + Send>(x: T) -> impl IntoIterator + Send { + x +} + pub(crate) enum Request { Action(Action), Request(u64, M), diff --git a/src/receivers/synchronize_batched/async.rs b/src/receivers/synchronize_batched/async.rs index 5c314e2..2138e83 100644 --- a/src/receivers/synchronize_batched/async.rs +++ b/src/receivers/synchronize_batched/async.rs @@ -6,10 +6,10 @@ use std::{ use crate::{ batch_synchronized_poller_macro, - error::{Error, SendError, StdSyncSendError}, builder::ReceiverSubscriberBuilder, - receiver::{Action, Event, ReciveTypedReceiver, SendUntypedReceiver, SendTypedReceiver}, - receivers::{fix_type, Request}, + error::{Error, SendError, StdSyncSendError}, + receiver::{Action, Event, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver}, + receivers::{fix_type, fix_into_iter, Request}, AsyncBatchSynchronizedHandler, Bus, Message, Untyped, }; @@ -40,7 +40,8 @@ where srx: parking_lot::Mutex>>, } -impl ReceiverSubscriberBuilder for SynchronizedBatchedAsync +impl ReceiverSubscriberBuilder + for SynchronizedBatchedAsync where T: AsyncBatchSynchronizedHandler + 'static, T::Error: StdSyncSendError + Clone, @@ -62,9 +63,8 @@ where let poller = Box::new(move |ut| { Box::new(move |bus| { - Box::pin(batch_synchronized_poller::( - rx, bus, ut, cfg, stx, - )) as Pin + Send>> + Box::pin(batch_synchronized_poller::(rx, bus, ut, cfg, stx)) + as Pin + Send>> }) as Box Pin + Send>>> }); diff --git a/src/receivers/synchronize_batched/mod.rs b/src/receivers/synchronize_batched/mod.rs index 8608307..7bd35a4 100644 --- a/src/receivers/synchronize_batched/mod.rs +++ b/src/receivers/synchronize_batched/mod.rs @@ -5,6 +5,8 @@ use std::sync::atomic::AtomicU64; pub use r#async::SynchronizedBatchedAsync; pub use sync::SynchronizedBatchedSync; +use serde_derive::{Serialize, Deserialize}; + #[derive(Debug)] pub struct SynchronizedBatchedStats { @@ -14,7 +16,7 @@ pub struct SynchronizedBatchedStats { pub batch_size: AtomicU64, } -#[derive(Copy, Clone, Debug)] +#[derive(Copy, Clone, Debug, Serialize, Deserialize)] pub struct SynchronizedBatchedConfig { pub buffer_size: usize, pub batch_size: usize, @@ -65,7 +67,7 @@ macro_rules! batch_synchronized_poller_macro { Poll::Ready((mids, res)) => match res { Ok(re) => { let mids: Vec = mids; - let re: Vec = re; + let re = fix_into_iter::(re); let mut mids = mids.into_iter(); let mut re = re.into_iter(); @@ -74,11 +76,7 @@ macro_rules! batch_synchronized_poller_macro { if let Some(r) = re.next() { stx.send(Event::Response(mid, Ok(r))).ok(); } else { - stx.send(Event::Response( - mid, - Err(Error::NoResponse), - )) - .ok(); + stx.send(Event::Response(mid, Err(Error::NoResponse))).ok(); } } } @@ -86,10 +84,11 @@ macro_rules! batch_synchronized_poller_macro { Err(er) => { let er: $t::Error = er; for mid in mids { - stx.send(Event::Response(mid, Err(Error::Other(er.clone())))).ok(); + stx.send(Event::Response(mid, Err(Error::Other(er.clone())))) + .ok(); } } - } + }, } } handle_future = None; @@ -159,7 +158,8 @@ macro_rules! batch_synchronized_poller_macro { Poll::Ready(resp) => { need_sync = false; let resp: Result<_, $t::Error> = resp; - stx.send(Event::Synchronized(resp.map_err(Error::Other))).ok(); + stx.send(Event::Synchronized(resp.map_err(Error::Other))) + .ok(); } } sync_future = None; diff --git a/src/receivers/synchronize_batched/sync.rs b/src/receivers/synchronize_batched/sync.rs index 8e06607..fd8c7dd 100644 --- a/src/receivers/synchronize_batched/sync.rs +++ b/src/receivers/synchronize_batched/sync.rs @@ -5,11 +5,11 @@ use std::{ }; use crate::{ - batch_synchronized_poller_macro, + batch_synchronized_poller_macro, builder::ReceiverSubscriberBuilder, - error::{Error, StdSyncSendError, SendError}, - receiver::{Action, Event, ReciveTypedReceiver, SendUntypedReceiver, SendTypedReceiver}, - receivers::{fix_type, Request}, + error::{Error, SendError, StdSyncSendError}, + receiver::{Action, Event, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver}, + receivers::{fix_type, fix_into_iter, Request}, BatchSynchronizedHandler, Bus, Message, Untyped, }; @@ -44,7 +44,8 @@ where srx: parking_lot::Mutex>>, } -impl ReceiverSubscriberBuilder for SynchronizedBatchedSync +impl ReceiverSubscriberBuilder + for SynchronizedBatchedSync where T: BatchSynchronizedHandler + 'static, T::Error: StdSyncSendError, @@ -66,9 +67,8 @@ where let poller = Box::new(move |ut| { Box::new(move |bus| { - Box::pin(batch_synchronized_poller::( - rx, bus, ut, cfg, stx, - )) as Pin + Send>> + Box::pin(batch_synchronized_poller::(rx, bus, ut, cfg, stx)) + as Pin + Send>> }) as Box Pin + Send>>> }); diff --git a/src/receivers/synchronized/async.rs b/src/receivers/synchronized/async.rs index 586799c..73f6600 100644 --- a/src/receivers/synchronized/async.rs +++ b/src/receivers/synchronized/async.rs @@ -9,9 +9,9 @@ use futures::Future; use super::SynchronizedConfig; use crate::{ - error::{Error, StdSyncSendError, SendError}, builder::ReceiverSubscriberBuilder, - receiver::{SendTypedReceiver, Action, Event, ReciveTypedReceiver, SendUntypedReceiver}, + error::{Error, SendError, StdSyncSendError}, + receiver::{Action, Event, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver}, receivers::{fix_type, Request}, AsyncSynchronizedHandler, Bus, Message, Untyped, }; diff --git a/src/receivers/synchronized/mod.rs b/src/receivers/synchronized/mod.rs index a3039dc..8212368 100644 --- a/src/receivers/synchronized/mod.rs +++ b/src/receivers/synchronized/mod.rs @@ -5,6 +5,7 @@ use std::sync::atomic::AtomicU64; pub use r#async::SynchronizedAsync; pub use sync::SynchronizedSync; +use serde_derive::{Serialize, Deserialize}; #[derive(Debug)] pub struct SynchronizedStats { @@ -12,7 +13,7 @@ pub struct SynchronizedStats { pub buffer_total: AtomicU64, } -#[derive(Copy, Clone, Debug)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct SynchronizedConfig { pub buffer_size: usize, } @@ -51,7 +52,8 @@ macro_rules! synchronized_poller_macro { Poll::Pending => return Poll::Pending, Poll::Ready((mid, resp)) => { let resp: Result<_, $t::Error> = resp; - stx.send(Event::Response(mid, resp.map_err(Error::Other))).ok(); + stx.send(Event::Response(mid, resp.map_err(Error::Other))) + .ok(); } } } @@ -91,7 +93,8 @@ macro_rules! synchronized_poller_macro { Poll::Ready(resp) => { need_sync = false; let resp: Result<_, $t::Error> = resp; - stx.send(Event::Synchronized(resp.map_err(Error::Other))).ok(); + stx.send(Event::Synchronized(resp.map_err(Error::Other))) + .ok(); } } sync_future = None; diff --git a/src/receivers/synchronized/sync.rs b/src/receivers/synchronized/sync.rs index 676b5e8..8c0510b 100644 --- a/src/receivers/synchronized/sync.rs +++ b/src/receivers/synchronized/sync.rs @@ -5,15 +5,15 @@ use std::{ }; use crate::synchronized_poller_macro; -use futures::{Future, executor::block_on}; +use futures::{executor::block_on, Future}; use super::SynchronizedConfig; use crate::{ - error::{Error, StdSyncSendError, SendError}, builder::ReceiverSubscriberBuilder, - receiver::{SendTypedReceiver, Action, Event, ReciveTypedReceiver, SendUntypedReceiver}, + error::{Error, SendError, StdSyncSendError}, + receiver::{Action, Event, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver}, receivers::{fix_type, Request}, - SynchronizedHandler, Bus, Message, Untyped, + Bus, Message, SynchronizedHandler, Untyped, }; use tokio::sync::{mpsc, Mutex}; diff --git a/src/relay.rs b/src/relay.rs new file mode 100644 index 0000000..1e28821 --- /dev/null +++ b/src/relay.rs @@ -0,0 +1,122 @@ +use std::{any::TypeId, borrow::Cow, collections::HashMap, sync::atomic::{AtomicU64, Ordering}}; + +use tokio::sync::oneshot::Sender; +use sharded_slab::Slab; + +use crate::{Bus, Message, envelop::SafeMessage, error::{Error, SendError}, receiver::Permit}; + +pub trait RelayTrait { + // fn handle_message(&self, mid: u64, msg: &dyn SafeMessage, tx: Option>, bus: &Bus); + fn start_relay(&self, bus: &Bus) -> Result<(), Error> ; + fn stop_relay(&self); +} + +pub struct Relay { + in_map: HashMap, TypeId>, + out_map: HashMap>, + // waiters: Slab>>>, + queue_size: AtomicU64, + limit: u64, +} + +impl Relay { + // pub async fn reserve(&self) -> Permit { + // loop { + // let count = self.queue_size.load(Ordering::Relaxed); + // if count < self.limit { + // let res = self.processing.compare_exchange( + // count, + // count + 1, + // Ordering::SeqCst, + // Ordering::SeqCst, + // ); + // if res.is_ok() { + // break Permit { + // fuse: false, + // inner: self.context.clone(), + // }; + // } + + // // continue + // } else { + // self.response.notified().await + // } + // } + // } + + // pub fn try_reserve(&self) -> Option { + // loop { + // let count = self.processing.load(Ordering::Relaxed); + + // if count < self.limit { + // let res = self.processing.compare_exchange( + // count, + // count + 1, + // Ordering::SeqCst, + // Ordering::SeqCst, + // ); + // if res.is_ok() { + // break Some(Permit { + // fuse: false, + // inner: self.context.clone(), + // }); + // } + + // // continue + // } else { + // break None; + // } + // } + // } + + // #[inline] + // pub fn send( + // &self, + // mid: u64, + // msg: M, + // mut permit: Permit, + // ) -> Result<(), SendError> { + // unimplemented!() + // } + + // #[inline] + // pub fn force_send(&self, mid: u64, msg: M) -> Result<(), SendError> { + // unimplemented!() + // } + + // #[inline] + // pub fn need_flush(&self) -> bool { + // self.context.need_flush.load(Ordering::SeqCst) + // } + + // #[inline] + // pub async fn close(&self) { + // let notified = self.context.closed.notified(); + // if self.inner.close().is_ok() { + // notified.await; + // } else { + // warn!("close failed!"); + // } + // } + + // #[inline] + // pub async fn sync(&self) { + // let notified = self.context.synchronized.notified(); + // if self.inner.sync().is_ok() { + // notified.await + // } else { + // warn!("sync failed!"); + // } + // } + + // #[inline] + // pub async fn flush(&self) { + // let notified = self.context.flushed.notified(); + // if self.inner.flush().is_ok() { + // notified.await; + // self.context.need_flush.store(false, Ordering::SeqCst); + // } else { + // warn!("flush failed!"); + // } + // } +} \ No newline at end of file