Generalize over batch type; subscribe_{sync|async} methods; relays begin

This commit is contained in:
Andrey Tkachenko 2021-07-02 20:21:53 +04:00
parent 308939801b
commit 76cc57e7ae
31 changed files with 686 additions and 194 deletions

View File

@ -1,4 +1,19 @@
## MewssageBus changelog ## MewssageBus pending changes:
### 0.8.0
#### new features:
* Generator Handlers
* Message Masking
### 0.7.0
#### new features:
* Bus scopes (`enter` and `leave` methods) instead clone
* Bus relays. Connect other message bus by IP address
#### breaking changes:
* Batched handlers now require `InBatch` and `OutBatch` types
## MewssageBus changelog:
### 0.6.5 ### 0.6.5
#### new features: #### new features:

View File

@ -19,6 +19,9 @@ smallvec = "1.6.1"
log = "0.4.14" log = "0.4.14"
sharded-slab = "0.1.1" sharded-slab = "0.1.1"
thiserror = "1.0.25" thiserror = "1.0.25"
erased-serde = "0.3.16"
serde = "1.0.126"
serde_derive = "1.0.126"
[dev-dependencies] [dev-dependencies]
anyhow = "1.0.41" anyhow = "1.0.41"

View File

@ -11,7 +11,7 @@ Inspired by Actix
### Basics ### Basics
1. Can deliver messages between actors using receivers (usually a queue implementations) 1. Can deliver messages between actors using receivers (usually a queue implementations)
2. Messages distincts and delivers by TypeId 2. Messages distincts and delivers by TypeId
3. Messages delivers in a broadcast fashion to many receivers (Cloned) 3. Messages delivers ether in a broadcast fashion to many receivers (Cloned) or addressed by recevier id, balanced (depends on queue load) or random
4. There are different kind of receivers implemented: 4. There are different kind of receivers implemented:
- BufferUnordered Receiver (sync and async) - BufferUnordered Receiver (sync and async)
- Synchronized (sync and async) - Synchronized (sync and async)
@ -21,9 +21,8 @@ Inspired by Actix
Here are the list of implmented handler kinds: Here are the list of implmented handler kinds:
```rust ```rust
pub trait Handler<M: Message>: Send + Sync { pub trait Handler<M: Message>: Send + Sync {
type Error: crate::Error; type Error: StdSyncSendError;
type Response: Message; type Response: Message;
fn handle(&self, msg: M, bus: &Bus) -> Result<Self::Response, Self::Error>; fn handle(&self, msg: M, bus: &Bus) -> Result<Self::Response, Self::Error>;
@ -34,7 +33,7 @@ pub trait Handler<M: Message>: Send + Sync {
#[async_trait] #[async_trait]
pub trait AsyncHandler<M: Message>: Send + Sync { pub trait AsyncHandler<M: Message>: Send + Sync {
type Error: crate::Error; type Error: StdSyncSendError;
type Response: Message; type Response: Message;
async fn handle(&self, msg: M, bus: &Bus) -> Result<Self::Response, Self::Error>; async fn handle(&self, msg: M, bus: &Bus) -> Result<Self::Response, Self::Error>;
@ -44,7 +43,7 @@ pub trait AsyncHandler<M: Message>: Send + Sync {
} }
pub trait SynchronizedHandler<M: Message>: Send { pub trait SynchronizedHandler<M: Message>: Send {
type Error: crate::Error; type Error: StdSyncSendError;
type Response: Message; type Response: Message;
fn handle(&mut self, msg: M, bus: &Bus) -> Result<Self::Response, Self::Error>; fn handle(&mut self, msg: M, bus: &Bus) -> Result<Self::Response, Self::Error>;
@ -55,7 +54,7 @@ pub trait SynchronizedHandler<M: Message>: Send {
#[async_trait] #[async_trait]
pub trait AsyncSynchronizedHandler<M: Message>: Send { pub trait AsyncSynchronizedHandler<M: Message>: Send {
type Error: crate::Error; type Error: StdSyncSendError;
type Response: Message; type Response: Message;
async fn handle(&mut self, msg: M, bus: &Bus) -> Result<Self::Response, Self::Error>; async fn handle(&mut self, msg: M, bus: &Bus) -> Result<Self::Response, Self::Error>;
@ -65,10 +64,12 @@ pub trait AsyncSynchronizedHandler<M: Message>: Send {
} }
pub trait BatchHandler<M: Message>: Send + Sync { pub trait BatchHandler<M: Message>: Send + Sync {
type Error: crate::Error; type Error: StdSyncSendError + Clone;
type Response: Message; type Response: Message;
type InBatch: FromIterator<M> + Send;
type OutBatch: IntoIterator<Item = Self::Response> + Send;
fn handle(&self, msg: Vec<M>, bus: &Bus) -> Result<Vec<Self::Response>, Self::Error>; fn handle(&self, msg: Self::InBatch, bus: &Bus) -> Result<Self::OutBatch, Self::Error>;
fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> { fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> {
Ok(()) Ok(())
} }
@ -76,20 +77,24 @@ pub trait BatchHandler<M: Message>: Send + Sync {
#[async_trait] #[async_trait]
pub trait AsyncBatchHandler<M: Message>: Send + Sync { pub trait AsyncBatchHandler<M: Message>: Send + Sync {
type Error: crate::Error; type Error: StdSyncSendError + Clone;
type Response: Message; type Response: Message;
type InBatch: FromIterator<M> + Send;
type OutBatch: IntoIterator<Item = Self::Response> + Send;
async fn handle(&self, msg: Vec<M>, bus: &Bus) -> Result<Vec<Self::Response>, Self::Error>; async fn handle(&self, msg: Self::InBatch, bus: &Bus) -> Result<Self::OutBatch, Self::Error>;
async fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> { async fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> {
Ok(()) Ok(())
} }
} }
pub trait BatchSynchronizedHandler<M: Message>: Send { pub trait BatchSynchronizedHandler<M: Message>: Send {
type Error: crate::Error; type Error: StdSyncSendError + Clone;
type Response: Message; type Response: Message;
type InBatch: FromIterator<M> + Send;
type OutBatch: IntoIterator<Item = Self::Response> + Send;
fn handle(&mut self, msg: Vec<M>, bus: &Bus) -> Result<Vec<Self::Response>, Self::Error>; fn handle(&mut self, msg: Self::InBatch, bus: &Bus) -> Result<Self::OutBatch, Self::Error>;
fn sync(&mut self, _bus: &Bus) -> Result<(), Self::Error> { fn sync(&mut self, _bus: &Bus) -> Result<(), Self::Error> {
Ok(()) Ok(())
} }
@ -97,10 +102,12 @@ pub trait BatchSynchronizedHandler<M: Message>: Send {
#[async_trait] #[async_trait]
pub trait AsyncBatchSynchronizedHandler<M: Message>: Send { pub trait AsyncBatchSynchronizedHandler<M: Message>: Send {
type Error: crate::Error; type Error: StdSyncSendError + Clone;
type Response: Message; type Response: Message;
type InBatch: FromIterator<M> + Send;
type OutBatch: IntoIterator<Item = Self::Response> + Send;
async fn handle(&mut self, msg: Vec<M>, bus: &Bus) -> Result<Vec<Self::Response>, Self::Error>; async fn handle(&mut self, msg: Self::InBatch, bus: &Bus) -> Result<Self::OutBatch, Self::Error>;
async fn sync(&mut self, _bus: &Bus) -> Result<(), Self::Error> { async fn sync(&mut self, _bus: &Bus) -> Result<(), Self::Error> {
Ok(()) Ok(())
} }

View File

@ -1,11 +1,11 @@
use async_trait::async_trait; use async_trait::async_trait;
use messagebus::{AsyncHandler, Bus, Handler, Message, error, receivers}; use messagebus::{error, AsyncHandler, Bus, Handler, Message};
use thiserror::Error; use thiserror::Error;
#[derive(Debug, Error)] #[derive(Debug, Error)]
enum Error { enum Error {
#[error("Error({0})")] #[error("Error({0})")]
Error(anyhow::Error) Error(anyhow::Error),
} }
impl<M: Message> From<error::Error<M>> for Error { impl<M: Message> From<error::Error<M>> for Error {
@ -152,15 +152,15 @@ async fn main() {
let (b, poller) = Bus::build() let (b, poller) = Bus::build()
.register(TmpReceiver) .register(TmpReceiver)
.subscribe::<f32, receivers::BufferUnorderedAsync<_, _, _>, _, _>(8, Default::default()) .subscribe_async::<f32>(8, Default::default())
.subscribe::<u16, receivers::BufferUnorderedAsync<_, _, _>, _, _>(8, Default::default()) .subscribe_async::<u16>(8, Default::default())
.subscribe::<u32, receivers::BufferUnorderedAsync<_, _, _>, _, _>(8, Default::default()) .subscribe_async::<u32>(8, Default::default())
.subscribe::<i32, receivers::BufferUnorderedAsync<_, _, _>, _, _>(8, Default::default()) .subscribe_async::<i32>(8, Default::default())
.subscribe::<i16, receivers::BufferUnorderedAsync<_, _, _>, _, _>(8, Default::default()) .subscribe_async::<i16>(8, Default::default())
.done() .done()
.register(TmpReceiver2) .register(TmpReceiver2)
.subscribe::<i32, receivers::BufferUnorderedAsync<_, _, _>, _, _>(8, Default::default()) .subscribe_async::<i32>(8, Default::default())
.subscribe::<i16, receivers::BufferUnorderedSync<_, _, _>, _, _>(8, Default::default()) .subscribe_sync::<i16>(8, Default::default())
.done() .done()
.build(); .build();

View File

@ -1,11 +1,11 @@
use async_trait::async_trait; use async_trait::async_trait;
use messagebus::{receivers, AsyncHandler, Message, error, Bus}; use messagebus::{error, receivers, AsyncHandler, Bus, Message};
use thiserror::Error; use thiserror::Error;
#[derive(Debug, Error)] #[derive(Debug, Error)]
enum Error { enum Error {
#[error("Error({0})")] #[error("Error({0})")]
Error(anyhow::Error) Error(anyhow::Error),
} }
impl<M: Message> From<error::Error<M>> for Error { impl<M: Message> From<error::Error<M>> for Error {
@ -35,13 +35,7 @@ impl AsyncHandler<f32> for TmpReceiver {
async fn main() { async fn main() {
let (b, poller) = Bus::build() let (b, poller) = Bus::build()
.register(TmpReceiver) .register(TmpReceiver)
.subscribe::<f32, receivers::BufferUnorderedAsync<_, _, _>, _, _>( .subscribe_async::<f32>(1, receivers::BufferUnorderedConfig { buffer_size: 1, max_parallel: 1 })
1,
receivers::BufferUnorderedConfig {
buffer_size: 1,
max_parallel: 1,
},
)
.done() .done()
.build(); .build();

View File

@ -1,13 +1,13 @@
use std::sync::Arc; use std::sync::Arc;
use async_trait::async_trait; use async_trait::async_trait;
use messagebus::{receivers, AsyncBatchHandler, BatchHandler, Message, error, Bus}; use messagebus::{error, AsyncBatchHandler, BatchHandler, Bus, Message};
use thiserror::Error; use thiserror::Error;
#[derive(Debug, Error, Clone)] #[derive(Debug, Error, Clone)]
enum Error { enum Error {
#[error("Error({0})")] #[error("Error({0})")]
Error(Arc<anyhow::Error>) Error(Arc<anyhow::Error>),
} }
impl<M: Message> From<error::Error<M>> for Error { impl<M: Message> From<error::Error<M>> for Error {
@ -21,6 +21,8 @@ struct TmpReceiver;
impl AsyncBatchHandler<i32> for TmpReceiver { impl AsyncBatchHandler<i32> for TmpReceiver {
type Error = Error; type Error = Error;
type Response = (); type Response = ();
type InBatch = Vec<i32>;
type OutBatch = Vec<()>;
async fn handle(&self, msg: Vec<i32>, _bus: &Bus) -> Result<Vec<Self::Response>, Self::Error> { async fn handle(&self, msg: Vec<i32>, _bus: &Bus) -> Result<Vec<Self::Response>, Self::Error> {
println!("---> [i32; {}] {:?}", msg.len(), msg); println!("---> [i32; {}] {:?}", msg.len(), msg);
@ -32,6 +34,8 @@ impl AsyncBatchHandler<i32> for TmpReceiver {
impl BatchHandler<i16> for TmpReceiver { impl BatchHandler<i16> for TmpReceiver {
type Error = Error; type Error = Error;
type Response = (); type Response = ();
type InBatch = Vec<i16>;
type OutBatch = Vec<()>;
fn handle(&self, msg: Vec<i16>, _bus: &Bus) -> Result<Vec<Self::Response>, Self::Error> { fn handle(&self, msg: Vec<i16>, _bus: &Bus) -> Result<Vec<Self::Response>, Self::Error> {
println!("---> [i16; {}] {:?}", msg.len(), msg); println!("---> [i16; {}] {:?}", msg.len(), msg);
@ -43,8 +47,8 @@ impl BatchHandler<i16> for TmpReceiver {
async fn main() { async fn main() {
let (b, poller) = Bus::build() let (b, poller) = Bus::build()
.register(TmpReceiver) .register(TmpReceiver)
.subscribe::<i32, receivers::BufferUnorderedBatchedAsync<_, _, _>, _, _>(16, Default::default()) .subscribe_batch_async::<i32>(16, Default::default())
.subscribe::<i16, receivers::BufferUnorderedBatchedSync<_, _, _>, _, _>(16, Default::default()) .subscribe_batch_sync::<i16>(16, Default::default())
.done() .done()
.build(); .build();

51
examples/demo_relay.rs Normal file
View File

@ -0,0 +1,51 @@
use messagebus::{error, Bus, Handler, Message, Module};
use thiserror::Error;
#[derive(Debug, Error)]
enum Error {
#[error("Error({0})")]
Error(anyhow::Error),
}
impl<M: Message> From<error::Error<M>> for Error {
fn from(err: error::Error<M>) -> Self {
Self::Error(err.into())
}
}
struct TmpReceiver;
impl Handler<u32> for TmpReceiver {
type Error = Error;
type Response = ();
fn handle(&self, msg: u32, _bus: &Bus) -> Result<Self::Response, Self::Error> {
println!("---> u32 {}", msg);
Ok(())
}
}
fn module() -> Module {
Module::new()
.register(TmpReceiver)
.subscribe_sync::<u32>(8, Default::default())
.done()
}
#[tokio::main]
async fn main() {
let (b, poller) = Bus::build().add_module(module()).build();
// b.
println!("flush");
b.flush().await;
println!("close");
b.close().await;
println!("closed");
poller.await;
println!("[done]");
}

View File

@ -1,13 +1,16 @@
use core::f32; use core::f32;
use async_trait::async_trait; use async_trait::async_trait;
use messagebus::{AsyncHandler, Bus, Message, error::{self, StdSyncSendError}, receivers}; use messagebus::{
error::{self, StdSyncSendError},
AsyncHandler, Bus, Message,
};
use thiserror::Error; use thiserror::Error;
#[derive(Debug, Error)] #[derive(Debug, Error)]
enum Error { enum Error {
#[error("Error({0})")] #[error("Error({0})")]
Error(anyhow::Error) Error(anyhow::Error),
} }
impl<M: Message, E: StdSyncSendError> From<error::Error<M, E>> for Error { impl<M: Message, E: StdSyncSendError> From<error::Error<M, E>> for Error {
@ -16,7 +19,6 @@ impl<M: Message, E: StdSyncSendError> From<error::Error<M, E>> for Error {
} }
} }
struct TmpReceiver1; struct TmpReceiver1;
struct TmpReceiver2; struct TmpReceiver2;
@ -159,22 +161,23 @@ impl AsyncHandler<f32> for TmpReceiver2 {
async fn main() { async fn main() {
let (b, poller) = Bus::build() let (b, poller) = Bus::build()
.register(TmpReceiver1) .register(TmpReceiver1)
.subscribe::<i32, receivers::BufferUnorderedAsync<_, _, _>, _, _>(8, Default::default()) .subscribe_async::<i32>(8, Default::default())
.subscribe::<u32, receivers::BufferUnorderedAsync<_, _, _>, _, _>(8, Default::default()) .subscribe_async::<u32>(8, Default::default())
.subscribe::<i16, receivers::BufferUnorderedAsync<_, _, _>, _, _>(8, Default::default()) .subscribe_async::<i16>(8, Default::default())
.subscribe::<u16, receivers::BufferUnorderedAsync<_, _, _>, _, _>(8, Default::default()) .subscribe_async::<u16>(8, Default::default())
.subscribe::<i8, receivers::BufferUnorderedAsync<_, _, _>, _, _>(8, Default::default()) .subscribe_async::<i8>(8, Default::default())
.subscribe::<u8, receivers::BufferUnorderedAsync<_, _, _>, _, _>(8, Default::default()) .subscribe_async::<u8>(8, Default::default())
.done() .done()
.register(TmpReceiver2) .register(TmpReceiver2)
.subscribe::<f32, receivers::BufferUnorderedAsync<_, _, _>, _, _>(8, Default::default()) .subscribe_async::<f32>(8, Default::default())
.subscribe::<f64, receivers::BufferUnorderedAsync<_, _, _>, _, _>(8, Default::default()) .subscribe_async::<f64>(8, Default::default())
.done() .done()
.build(); .build();
println!( println!(
"{:?}", "{:?}",
b.request_we::<_, f64, Error>(1000f64, Default::default()).await b.request_local_we::<_, f64, Error>(1000f64, Default::default())
.await
); );
println!("flush"); println!("flush");

View File

@ -1,10 +1,10 @@
use messagebus::{Bus, Handler, Message, Module, error, receivers}; use messagebus::{error, Bus, Handler, Message, Module};
use thiserror::Error; use thiserror::Error;
#[derive(Debug, Error)] #[derive(Debug, Error)]
enum Error { enum Error {
#[error("Error({0})")] #[error("Error({0})")]
Error(anyhow::Error) Error(anyhow::Error),
} }
impl<M: Message> From<error::Error<M>> for Error { impl<M: Message> From<error::Error<M>> for Error {
@ -52,18 +52,16 @@ impl Handler<u32> for TmpReceiver {
fn module() -> Module { fn module() -> Module {
Module::new() Module::new()
.register(TmpReceiver) .register(TmpReceiver)
.subscribe::<f32, receivers::BufferUnorderedSync<_, _, _>, _, _>(8, Default::default()) .subscribe_sync::<f32>(8, Default::default())
.subscribe::<u16, receivers::BufferUnorderedSync<_, _, _>, _, _>(8, Default::default()) .subscribe_sync::<u16>(8, Default::default())
.subscribe::<u32, receivers::BufferUnorderedSync<_, _, _>, _, _>(8, Default::default()) .subscribe_sync::<u32>(8, Default::default())
.done() .done()
} }
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
let (b, poller) = Bus::build() let (b, poller) = Bus::build().add_module(module()).build();
.add_module(module())
.build();
b.send(32f32).await.unwrap(); b.send(32f32).await.unwrap();
b.send(11u16).await.unwrap(); b.send(11u16).await.unwrap();

View File

@ -1,13 +1,15 @@
use std::sync::Arc; use std::sync::Arc;
use async_trait::async_trait; use async_trait::async_trait;
use messagebus::{receivers, AsyncBatchSynchronizedHandler, BatchSynchronizedHandler, Message, error, Bus}; use messagebus::{
error, AsyncBatchSynchronizedHandler, BatchSynchronizedHandler, Bus, Message,
};
use thiserror::Error; use thiserror::Error;
#[derive(Debug, Error, Clone)] #[derive(Debug, Error, Clone)]
enum Error { enum Error {
#[error("Error({0})")] #[error("Error({0})")]
Error(Arc<anyhow::Error>) Error(Arc<anyhow::Error>),
} }
impl<M: Message> From<error::Error<M>> for Error { impl<M: Message> From<error::Error<M>> for Error {
@ -22,8 +24,15 @@ struct TmpReceiver;
impl AsyncBatchSynchronizedHandler<i32> for TmpReceiver { impl AsyncBatchSynchronizedHandler<i32> for TmpReceiver {
type Error = Error; type Error = Error;
type Response = (); type Response = ();
type InBatch = Vec<i32>;
type OutBatch = Vec<()>;
async fn handle(&mut self, msg: Vec<i32>, _bus: &Bus) -> Result<Vec<Self::Response>, Self::Error> {
async fn handle(
&mut self,
msg: Vec<i32>,
_bus: &Bus,
) -> Result<Vec<Self::Response>, Self::Error> {
println!("---> [i32; {}] {:?}", msg.len(), msg); println!("---> [i32; {}] {:?}", msg.len(), msg);
Ok(vec![]) Ok(vec![])
@ -33,6 +42,8 @@ impl AsyncBatchSynchronizedHandler<i32> for TmpReceiver {
impl BatchSynchronizedHandler<i16> for TmpReceiver { impl BatchSynchronizedHandler<i16> for TmpReceiver {
type Error = Error; type Error = Error;
type Response = (); type Response = ();
type InBatch = Vec<i16>;
type OutBatch = Vec<()>;
fn handle(&mut self, msg: Vec<i16>, _bus: &Bus) -> Result<Vec<Self::Response>, Self::Error> { fn handle(&mut self, msg: Vec<i16>, _bus: &Bus) -> Result<Vec<Self::Response>, Self::Error> {
println!("---> [i16; {}] {:?}", msg.len(), msg); println!("---> [i16; {}] {:?}", msg.len(), msg);
@ -44,8 +55,8 @@ impl BatchSynchronizedHandler<i16> for TmpReceiver {
async fn main() { async fn main() {
let (b, poller) = Bus::build() let (b, poller) = Bus::build()
.register_unsync(TmpReceiver) .register_unsync(TmpReceiver)
.subscribe::<i32, receivers::SynchronizedBatchedAsync<_, _, _>, _, _>(16, Default::default()) .subscribe_batch_async::<i32>(16, Default::default())
.subscribe::<i16, receivers::SynchronizedBatchedSync<_, _, _>, _, _>(16, Default::default()) .subscribe_batch_sync::<i16>(16, Default::default())
.done() .done()
.build(); .build();

View File

@ -1,11 +1,11 @@
use async_trait::async_trait; use async_trait::async_trait;
use messagebus::{receivers, AsyncSynchronizedHandler, Bus, Message, error, SynchronizedHandler}; use messagebus::{error, receivers, AsyncSynchronizedHandler, Bus, Message, SynchronizedHandler};
use thiserror::Error; use thiserror::Error;
#[derive(Debug, Error)] #[derive(Debug, Error)]
enum Error { enum Error {
#[error("Error({0})")] #[error("Error({0})")]
Error(anyhow::Error) Error(anyhow::Error),
} }
impl<M: Message> From<error::Error<M>> for Error { impl<M: Message> From<error::Error<M>> for Error {

View File

@ -3,7 +3,7 @@ use std::{any::TypeId, collections::HashMap, marker::PhantomData, pin::Pin, sync
use futures::{Future, FutureExt}; use futures::{Future, FutureExt};
use tokio::sync::Mutex; use tokio::sync::Mutex;
use crate::{Bus, BusInner, Message, Untyped, error::StdSyncSendError, receiver::{Receiver, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver}}; use crate::{AsyncBatchHandler, AsyncBatchSynchronizedHandler, AsyncHandler, AsyncSynchronizedHandler, BatchHandler, BatchSynchronizedHandler, Bus, BusInner, Handler, Message, SynchronizedHandler, Untyped, error::StdSyncSendError, receiver::{Receiver, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver}, receivers};
pub trait ReceiverSubscriberBuilder<T, M, R, E>: pub trait ReceiverSubscriberBuilder<T, M, R, E>:
SendUntypedReceiver + SendTypedReceiver<M> + ReciveTypedReceiver<R, E> SendUntypedReceiver + SendTypedReceiver<M> + ReciveTypedReceiver<R, E>
@ -52,7 +52,13 @@ pub struct RegisterEntry<K, T, F, B> {
} }
impl<K, T: 'static, F, B> RegisterEntry<K, T, F, B> impl<K, T: 'static, F, B> RegisterEntry<K, T, F, B>
where F: FnMut(&mut B, (TypeId, Receiver), Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>, Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>), where
F: FnMut(
&mut B,
(TypeId, Receiver),
Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>,
Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>,
),
{ {
pub fn done(mut self) -> B { pub fn done(mut self) -> B {
for (tid, v) in self.receivers { for (tid, v) in self.receivers {
@ -87,6 +93,42 @@ impl<T, F, B> RegisterEntry<UnsyncEntry, T, F, B> {
self self
} }
#[inline]
pub fn subscribe_sync<M>(self, queue: u64, cfg: receivers::SynchronizedConfig) -> Self
where
T: SynchronizedHandler<M> + Send + 'static,
M: Message,
{
self.subscribe::<M, receivers::SynchronizedSync<M, T::Response, T::Error>, T::Response, T::Error>(queue, cfg)
}
#[inline]
pub fn subscribe_async<M>(self, queue: u64, cfg: receivers::SynchronizedConfig) -> Self
where
T: AsyncSynchronizedHandler<M> + Send + 'static,
M: Message,
{
self.subscribe::<M, receivers::SynchronizedAsync<M, T::Response, T::Error>, T::Response, T::Error>(queue, cfg)
}
#[inline]
pub fn subscribe_batch_sync<M>(self, queue: u64, cfg: receivers::SynchronizedBatchedConfig) -> Self
where
T: BatchSynchronizedHandler<M> + Send + 'static,
M: Message,
{
self.subscribe::<M, receivers::SynchronizedBatchedSync<M, T::Response, T::Error>, T::Response, T::Error>(queue, cfg)
}
#[inline]
pub fn subscribe_batch_async<M>(self, queue: u64, cfg: receivers::SynchronizedBatchedConfig) -> Self
where
T: AsyncBatchSynchronizedHandler<M> + Send + 'static,
M: Message,
{
self.subscribe::<M, receivers::SynchronizedBatchedAsync<M, T::Response, T::Error>, T::Response, T::Error>(queue, cfg)
}
} }
impl<T, F, B> RegisterEntry<SyncEntry, T, F, B> { impl<T, F, B> RegisterEntry<SyncEntry, T, F, B> {
@ -109,8 +151,45 @@ impl<T, F, B> RegisterEntry<SyncEntry, T, F, B> {
self self
} }
}
#[inline]
pub fn subscribe_sync<M>(self, queue: u64, cfg: receivers::BufferUnorderedConfig) -> Self
where
T: Handler<M> + Send + Sync + 'static,
M: Message,
{
self.subscribe::<M, receivers::BufferUnorderedSync<M, T::Response, T::Error>, T::Response, T::Error>(queue, cfg)
}
#[inline]
pub fn subscribe_async<M>(self, queue: u64, cfg: receivers::BufferUnorderedConfig) -> Self
where
T: AsyncHandler<M> + Send + Sync + 'static,
M: Message,
T::Response: Message,
T::Error: StdSyncSendError,
{
self.subscribe::<M, receivers::BufferUnorderedAsync<M, T::Response, T::Error>, T::Response, T::Error>(queue, cfg)
}
#[inline]
pub fn subscribe_batch_sync<M>(self, queue: u64, cfg: receivers::BufferUnorderedBatchedConfig) -> Self
where
T: BatchHandler<M> + Send + 'static,
M: Message,
{
self.subscribe::<M, receivers::BufferUnorderedBatchedSync<M, T::Response, T::Error>, T::Response, T::Error>(queue, cfg)
}
#[inline]
pub fn subscribe_batch_async<M>(self, queue: u64, cfg: receivers::BufferUnorderedBatchedConfig) -> Self
where
T: AsyncBatchHandler<M> + Send + 'static,
M: Message,
{
self.subscribe::<M, receivers::BufferUnorderedBatchedAsync<M, T::Response, T::Error>, T::Response, T::Error>(queue, cfg)
}
}
pub struct Module { pub struct Module {
receivers: Vec<(TypeId, Receiver)>, receivers: Vec<(TypeId, Receiver)>,
@ -125,7 +204,20 @@ impl Module {
} }
} }
pub fn register<T: Send + Sync + 'static>(self, item: T) -> RegisterEntry<SyncEntry, T, impl FnMut(&mut Self, (TypeId, Receiver), Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>, Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>), Self> { pub fn register<T: Send + Sync + 'static>(
self,
item: T,
) -> RegisterEntry<
SyncEntry,
T,
impl FnMut(
&mut Self,
(TypeId, Receiver),
Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>,
Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>,
),
Self,
> {
RegisterEntry { RegisterEntry {
item: Arc::new(item) as Untyped, item: Arc::new(item) as Untyped,
payload: self, payload: self,
@ -139,7 +231,20 @@ impl Module {
} }
} }
pub fn register_unsync<T: Send + 'static>(self, item: T) -> RegisterEntry<UnsyncEntry, T, impl FnMut(&mut Self, (TypeId, Receiver), Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>, Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>), Self> { pub fn register_unsync<T: Send + 'static>(
self,
item: T,
) -> RegisterEntry<
UnsyncEntry,
T,
impl FnMut(
&mut Self,
(TypeId, Receiver),
Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>,
Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>,
),
Self,
> {
RegisterEntry { RegisterEntry {
item: Arc::new(Mutex::new(item)) as Untyped, item: Arc::new(Mutex::new(item)) as Untyped,
payload: self, payload: self,
@ -153,9 +258,11 @@ impl Module {
} }
} }
fn extend(&mut self, other: Module) { pub fn add_module(mut self, module: Module) -> Self {
self.receivers.extend(other.receivers.into_iter()); self.receivers.extend(module.receivers);
self.pollings.extend(other.pollings.into_iter()); self.pollings.extend(module.pollings);
self
} }
} }
@ -170,7 +277,20 @@ impl BusBuilder {
} }
} }
pub fn register<T: Send + Sync + 'static>(self, item: T) -> RegisterEntry<SyncEntry, T, impl FnMut(&mut Self, (TypeId, Receiver), Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>, Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>), Self> { pub fn register<T: Send + Sync + 'static>(
self,
item: T,
) -> RegisterEntry<
SyncEntry,
T,
impl FnMut(
&mut Self,
(TypeId, Receiver),
Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>,
Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>,
),
Self,
> {
RegisterEntry { RegisterEntry {
item: Arc::new(item) as Untyped, item: Arc::new(item) as Untyped,
payload: self, payload: self,
@ -184,7 +304,20 @@ impl BusBuilder {
} }
} }
pub fn register_unsync<T: Send + 'static>(self, item: T) -> RegisterEntry<UnsyncEntry, T, impl FnMut(&mut Self, (TypeId, Receiver), Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>, Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>), Self> { pub fn register_unsync<T: Send + 'static>(
self,
item: T,
) -> RegisterEntry<
UnsyncEntry,
T,
impl FnMut(
&mut Self,
(TypeId, Receiver),
Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>,
Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>,
),
Self,
> {
RegisterEntry { RegisterEntry {
item: Arc::new(Mutex::new(item)) as Untyped, item: Arc::new(Mutex::new(item)) as Untyped,
payload: self, payload: self,
@ -199,7 +332,7 @@ impl BusBuilder {
} }
pub fn add_module(mut self, module: Module) -> Self { pub fn add_module(mut self, module: Module) -> Self {
self.inner.extend(module); self.inner = self.inner.add_module(module);
self self
} }

View File

@ -1,6 +1,45 @@
use core::any::Any; use core::any::{Any, type_name};
use core::fmt; use core::fmt;
// use erased_serde::{Deserializer, Serialize}; use serde::{de::DeserializeOwned, Serialize};
pub trait Message: Any + fmt::Debug + Unpin + Send + Sync + 'static {} pub trait Message:
impl<T: Any + fmt::Debug + Unpin + Send + Sync> Message for T {} fmt::Debug + Unpin + Send + Sync + 'static
{
fn type_name(&self) -> &str;
}
impl<T: fmt::Debug + Unpin + Send + Sync + 'static> Message for T {
fn type_name(&self) -> &str {
type_name::<T>()
}
}
pub trait TransferableMessage: Message + Serialize + DeserializeOwned
{
fn into_boxed(self) -> BoxedMessage;
}
impl<T: Message + Serialize + DeserializeOwned> TransferableMessage for T {
fn into_boxed(self) -> BoxedMessage {
BoxedMessage(Box::new(self) as _)
}
}
pub trait SafeMessage:
Any + fmt::Debug + erased_serde::Serialize + Unpin + Send + Sync + 'static
{
fn type_name(&self) -> &str;
}
impl<T: Any + fmt::Debug + erased_serde::Serialize + Unpin + Send + Sync> SafeMessage for T {
fn type_name(&self) -> &str {
type_name::<T>()
}
}
#[derive(Debug)]
pub struct BoxedMessage(Box<dyn SafeMessage>);
impl<M: TransferableMessage> From<M> for BoxedMessage {
fn from(m: M) -> Self {
BoxedMessage(Box::new(m))
}
}

View File

@ -1,19 +1,18 @@
use core::panic; use core::fmt;
use thiserror::Error; use thiserror::Error;
use tokio::sync::oneshot; use tokio::sync::oneshot;
use crate::Message; use crate::{Message, envelop::{BoxedMessage, TransferableMessage}};
pub trait StdSyncSendError: std::error::Error + Send + Sync + Unpin + 'static {} pub trait StdSyncSendError: std::error::Error + Send + Sync + Unpin + 'static {}
impl<T: std::error::Error + Send + Sync + Unpin + 'static> StdSyncSendError for T {} impl<T: std::error::Error + Send + Sync + Unpin + 'static> StdSyncSendError for T {}
#[derive(Debug, Error)] #[derive(Debug, Error)]
pub enum VoidError {} pub enum VoidError {}
#[derive(Debug, Error)] #[derive(Debug, Error)]
pub enum SendError<M: Message> { pub enum SendError<M: fmt::Debug> {
#[error("Closed")] #[error("Closed")]
Closed(M), Closed(M),
@ -21,8 +20,17 @@ pub enum SendError<M: Message> {
Full(M), Full(M),
} }
impl<M: TransferableMessage> SendError<M> {
pub fn into_boxed(self) -> SendError<BoxedMessage> {
match self {
SendError::Closed(m) => SendError::Closed(BoxedMessage::from(m)),
SendError::Full(m) => SendError::Closed(BoxedMessage::from(m)),
}
}
}
#[derive(Debug, Error)] #[derive(Debug, Error)]
pub enum Error<M: Message = (), E: StdSyncSendError = VoidError> { pub enum Error<M: fmt::Debug + 'static = (), E: StdSyncSendError = VoidError> {
#[error("Message Send Error: {0}")] #[error("Message Send Error: {0}")]
SendError(#[from] SendError<M>), SendError(#[from] SendError<M>),
@ -35,6 +43,9 @@ pub enum Error<M: Message = (), E: StdSyncSendError = VoidError> {
#[error("Other({0})")] #[error("Other({0})")]
Other(E), Other(E),
#[error("Serialization({0})")]
Serialization(#[from] erased_serde::Error),
#[error("Other({0})")] #[error("Other({0})")]
OtherBoxed(Box<dyn StdSyncSendError>), OtherBoxed(Box<dyn StdSyncSendError>),
} }
@ -45,6 +56,7 @@ impl<M: Message, E: StdSyncSendError> Error<M, E> {
Error::SendError(inner) => Error::SendError(inner), Error::SendError(inner) => Error::SendError(inner),
Error::NoResponse => Error::NoReceivers, Error::NoResponse => Error::NoReceivers,
Error::NoReceivers => Error::NoReceivers, Error::NoReceivers => Error::NoReceivers,
Error::Serialization(s) => Error::Serialization(s),
Error::Other(inner) => Error::OtherBoxed(Box::new(inner) as _), Error::Other(inner) => Error::OtherBoxed(Box::new(inner) as _),
Error::OtherBoxed(inner) => Error::OtherBoxed(inner), Error::OtherBoxed(inner) => Error::OtherBoxed(inner),
} }
@ -55,18 +67,20 @@ impl<M: Message, E: StdSyncSendError> Error<M, E> {
Error::SendError(inner) => Error::SendError(inner), Error::SendError(inner) => Error::SendError(inner),
Error::NoResponse => Error::NoReceivers, Error::NoResponse => Error::NoReceivers,
Error::NoReceivers => Error::NoReceivers, Error::NoReceivers => Error::NoReceivers,
Error::Serialization(s) => Error::Serialization(s),
Error::Other(_) => panic!("expected boxed error!"), Error::Other(_) => panic!("expected boxed error!"),
Error::OtherBoxed(inner) => Error::Other(inner.into()), Error::OtherBoxed(inner) => Error::Other(inner.into()),
} }
} }
} }
impl <E: StdSyncSendError> Error<(), E> { impl<E: StdSyncSendError> Error<(), E> {
pub fn specify<M: Message>(self) -> Error<M, E> { pub fn specify<M: Message>(self) -> Error<M, E> {
match self { match self {
Error::SendError(_) => panic!("cannot specify type on typed error"), Error::SendError(_) => panic!("cannot specify type on typed error"),
Error::NoResponse => Error::NoReceivers, Error::NoResponse => Error::NoReceivers,
Error::NoReceivers => Error::NoReceivers, Error::NoReceivers => Error::NoReceivers,
Error::Serialization(s) => Error::Serialization(s),
Error::Other(inner) => Error::Other(inner), Error::Other(inner) => Error::Other(inner),
Error::OtherBoxed(inner) => Error::OtherBoxed(inner), Error::OtherBoxed(inner) => Error::OtherBoxed(inner),
} }

View File

@ -1,4 +1,6 @@
use crate::{Bus, Message, error::StdSyncSendError}; use std::iter::FromIterator;
use crate::{error::StdSyncSendError, Bus, Message};
use async_trait::async_trait; use async_trait::async_trait;
pub trait Handler<M: Message>: Send + Sync { pub trait Handler<M: Message>: Send + Sync {
@ -46,8 +48,10 @@ pub trait AsyncSynchronizedHandler<M: Message>: Send {
pub trait BatchHandler<M: Message>: Send + Sync { pub trait BatchHandler<M: Message>: Send + Sync {
type Error: StdSyncSendError + Clone; type Error: StdSyncSendError + Clone;
type Response: Message; type Response: Message;
type InBatch: FromIterator<M> + Send;
type OutBatch: IntoIterator<Item = Self::Response> + Send;
fn handle(&self, msg: Vec<M>, bus: &Bus) -> Result<Vec<Self::Response>, Self::Error>; fn handle(&self, msg: Self::InBatch, bus: &Bus) -> Result<Self::OutBatch, Self::Error>;
fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> { fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> {
Ok(()) Ok(())
} }
@ -57,8 +61,10 @@ pub trait BatchHandler<M: Message>: Send + Sync {
pub trait AsyncBatchHandler<M: Message>: Send + Sync { pub trait AsyncBatchHandler<M: Message>: Send + Sync {
type Error: StdSyncSendError + Clone; type Error: StdSyncSendError + Clone;
type Response: Message; type Response: Message;
type InBatch: FromIterator<M> + Send;
type OutBatch: IntoIterator<Item = Self::Response> + Send;
async fn handle(&self, msg: Vec<M>, bus: &Bus) -> Result<Vec<Self::Response>, Self::Error>; async fn handle(&self, msg: Self::InBatch, bus: &Bus) -> Result<Self::OutBatch, Self::Error>;
async fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> { async fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> {
Ok(()) Ok(())
} }
@ -67,8 +73,10 @@ pub trait AsyncBatchHandler<M: Message>: Send + Sync {
pub trait BatchSynchronizedHandler<M: Message>: Send { pub trait BatchSynchronizedHandler<M: Message>: Send {
type Error: StdSyncSendError + Clone; type Error: StdSyncSendError + Clone;
type Response: Message; type Response: Message;
type InBatch: FromIterator<M> + Send;
type OutBatch: IntoIterator<Item = Self::Response> + Send;
fn handle(&mut self, msg: Vec<M>, bus: &Bus) -> Result<Vec<Self::Response>, Self::Error>; fn handle(&mut self, msg: Self::InBatch, bus: &Bus) -> Result<Self::OutBatch, Self::Error>;
fn sync(&mut self, _bus: &Bus) -> Result<(), Self::Error> { fn sync(&mut self, _bus: &Bus) -> Result<(), Self::Error> {
Ok(()) Ok(())
} }
@ -78,8 +86,10 @@ pub trait BatchSynchronizedHandler<M: Message>: Send {
pub trait AsyncBatchSynchronizedHandler<M: Message>: Send { pub trait AsyncBatchSynchronizedHandler<M: Message>: Send {
type Error: StdSyncSendError + Clone; type Error: StdSyncSendError + Clone;
type Response: Message; type Response: Message;
type InBatch: FromIterator<M> + Send;
type OutBatch: IntoIterator<Item = Self::Response> + Send;
async fn handle(&mut self, msg: Vec<M>, bus: &Bus) -> Result<Vec<Self::Response>, Self::Error>; async fn handle(&mut self, msg: Self::InBatch, bus: &Bus) -> Result<Self::OutBatch, Self::Error>;
async fn sync(&mut self, _bus: &Bus) -> Result<(), Self::Error> { async fn sync(&mut self, _bus: &Bus) -> Result<(), Self::Error> {
Ok(()) Ok(())
} }
@ -109,8 +119,10 @@ pub trait LocalAsyncHandler<M: Message> {
pub trait LocalBatchHandler<M: Message> { pub trait LocalBatchHandler<M: Message> {
type Error: StdSyncSendError + Clone; type Error: StdSyncSendError + Clone;
type Response: Message; type Response: Message;
type InBatch: FromIterator<M> + Send;
type OutBatch: IntoIterator<Item = Self::Response> + Send;
fn handle(&mut self, msg: Vec<M>, bus: &Bus) -> Result<Vec<Self::Response>, Self::Error>; fn handle(&mut self, msg: Self::InBatch, bus: &Bus) -> Result<Self::OutBatch, Self::Error>;
fn sync(&mut self, _bus: &Bus) -> Result<(), Self::Error> { fn sync(&mut self, _bus: &Bus) -> Result<(), Self::Error> {
Ok(()) Ok(())
} }
@ -120,8 +132,10 @@ pub trait LocalBatchHandler<M: Message> {
pub trait LocalAsyncBatchHandler<M: Message> { pub trait LocalAsyncBatchHandler<M: Message> {
type Error: StdSyncSendError + Clone; type Error: StdSyncSendError + Clone;
type Response: Message; type Response: Message;
type InBatch: FromIterator<M> + Send;
type OutBatch: IntoIterator<Item = Self::Response> + Send;
async fn handle(&mut self, msg: Vec<M>, bus: &Bus) -> Result<Vec<Self::Response>, Self::Error>; async fn handle(&mut self, msg: Self::InBatch, bus: &Bus) -> Result<Self::OutBatch, Self::Error>;
async fn sync(&mut self, _bus: &Bus) -> Result<(), Self::Error> { async fn sync(&mut self, _bus: &Bus) -> Result<(), Self::Error> {
Ok(()) Ok(())
} }

View File

@ -5,15 +5,18 @@ mod handler;
mod receiver; mod receiver;
pub mod receivers; pub mod receivers;
mod trait_object; mod trait_object;
pub mod relay;
#[macro_use] #[macro_use]
extern crate log; extern crate log;
use crate::receiver::Permit; use crate::receiver::Permit;
use builder::BusBuilder; use builder::BusBuilder;
use core::any::{Any, TypeId};
pub use builder::Module; pub use builder::Module;
pub use envelop::Message; pub use relay::RelayTrait;
use core::any::{Any, TypeId};
pub use envelop::{BoxedMessage, TransferableMessage, Message};
use error::{Error, SendError, StdSyncSendError};
pub use handler::*; pub use handler::*;
use receiver::Receiver; use receiver::Receiver;
use smallvec::SmallVec; use smallvec::SmallVec;
@ -25,7 +28,6 @@ use std::{
}, },
}; };
use tokio::sync::oneshot; use tokio::sync::oneshot;
use error::{Error, SendError, StdSyncSendError};
pub type Untyped = Arc<dyn Any + Send + Sync>; pub type Untyped = Arc<dyn Any + Send + Sync>;
@ -134,11 +136,11 @@ impl BusInner {
} }
#[inline] #[inline]
pub fn try_send<M: Message + Clone>(&self, msg: M) -> Result<(), Error<M>> { pub fn try_send<M: TransferableMessage + Clone>(&self, msg: M) -> Result<(), Error<M>> {
self.try_send_ext(msg, SendOptions::Broadcast) self.try_send_ext(msg, SendOptions::Broadcast)
} }
pub fn try_send_ext<M: Message + Clone>( pub fn try_send_ext<M: TransferableMessage + Clone>(
&self, &self,
msg: M, msg: M,
_options: SendOptions, _options: SendOptions,
@ -163,13 +165,13 @@ impl BusInner {
while counter < total { while counter < total {
let (p, r) = iter.next().unwrap(); let (p, r) = iter.next().unwrap();
let _ = r.send(mid, p, msg.clone()); let _ = r.send(mid, msg.clone(), p);
counter += 1; counter += 1;
} }
if let Some((p, r)) = iter.next() { if let Some((p, r)) = iter.next() {
let _ = r.send(mid, p, msg); let _ = r.send(mid, msg, p);
return Ok(()); return Ok(());
} }
} }
@ -183,12 +185,12 @@ impl BusInner {
} }
#[inline] #[inline]
pub fn send_blocking<M: Message + Clone>(&self, msg: M) -> Result<(), Error<M>> { pub fn send_blocking<M: TransferableMessage + Clone>(&self, msg: M) -> Result<(), Error<M>> {
self.send_blocking_ext(msg, SendOptions::Broadcast) self.send_blocking_ext(msg, SendOptions::Broadcast)
} }
#[inline] #[inline]
pub fn send_blocking_ext<M: Message + Clone>( pub fn send_blocking_ext<M: TransferableMessage + Clone>(
&self, &self,
msg: M, msg: M,
options: SendOptions, options: SendOptions,
@ -197,11 +199,11 @@ impl BusInner {
} }
#[inline] #[inline]
pub async fn send<M: Message + Clone>(&self, msg: M) -> core::result::Result<(), Error<M>> { pub async fn send<M: TransferableMessage + Clone>(&self, msg: M) -> core::result::Result<(), Error<M>> {
Ok(self.send_ext(msg, SendOptions::Broadcast).await?) Ok(self.send_ext(msg, SendOptions::Broadcast).await?)
} }
pub async fn send_ext<M: Message + Clone>( pub async fn send_ext<M: TransferableMessage + Clone>(
&self, &self,
msg: M, msg: M,
_options: SendOptions, _options: SendOptions,
@ -216,10 +218,10 @@ impl BusInner {
if let Some(rs) = self.receivers.get(&tid) { if let Some(rs) = self.receivers.get(&tid) {
if let Some((last, head)) = rs.split_last() { if let Some((last, head)) = rs.split_last() {
for r in head { for r in head {
let _ = r.send(mid, r.reserve().await, msg.clone()); let _ = r.send(mid, msg.clone(), r.reserve().await);
} }
let _ = last.send(mid, last.reserve().await, msg); let _ = last.send(mid, msg, last.reserve().await);
return Ok(()); return Ok(());
} }
@ -234,11 +236,11 @@ impl BusInner {
} }
#[inline] #[inline]
pub fn force_send<M: Message + Clone>(&self, msg: M) -> Result<(), Error<M>> { pub fn force_send<M: TransferableMessage + Clone>(&self, msg: M) -> Result<(), Error<M>> {
self.force_send_ext(msg, SendOptions::Broadcast) self.force_send_ext(msg, SendOptions::Broadcast)
} }
pub fn force_send_ext<M: Message + Clone>( pub fn force_send_ext<M: TransferableMessage + Clone>(
&self, &self,
msg: M, msg: M,
_options: SendOptions, _options: SendOptions,
@ -271,7 +273,7 @@ impl BusInner {
} }
#[inline] #[inline]
pub fn try_send_one<M: Message>(&self, msg: M) -> Result<(), Error<M>> { pub fn try_send_one<M: TransferableMessage>(&self, msg: M) -> Result<(), Error<M>> {
if self.closed.load(Ordering::SeqCst) { if self.closed.load(Ordering::SeqCst) {
return Err(SendError::Closed(msg).into()); return Err(SendError::Closed(msg).into());
} }
@ -279,20 +281,20 @@ impl BusInner {
let mid = ID_COUNTER.fetch_add(1, Ordering::Relaxed); let mid = ID_COUNTER.fetch_add(1, Ordering::Relaxed);
let tid = TypeId::of::<M>(); let tid = TypeId::of::<M>();
if let Some(rs) = self.receivers.get(&tid).and_then(|rs|rs.first()) { if let Some(rs) = self.receivers.get(&tid).and_then(|rs| rs.first()) {
let permits = if let Some(x) = rs.try_reserve() { let permits = if let Some(x) = rs.try_reserve() {
x x
} else { } else {
return Err(SendError::Full(msg).into()); return Err(SendError::Full(msg).into());
}; };
Ok(rs.send(mid, permits, msg)?) Ok(rs.send(mid, msg, permits)?)
} else { } else {
Err(Error::NoReceivers) Err(Error::NoReceivers)
} }
} }
pub async fn send_one<M: Message>(&self, msg: M) -> Result<(), Error<M>> { pub async fn send_one<M: TransferableMessage>(&self, msg: M) -> Result<(), Error<M>> {
if self.closed.load(Ordering::SeqCst) { if self.closed.load(Ordering::SeqCst) {
return Err(SendError::Closed(msg).into()); return Err(SendError::Closed(msg).into());
} }
@ -300,19 +302,34 @@ impl BusInner {
let mid = ID_COUNTER.fetch_add(1, Ordering::Relaxed); let mid = ID_COUNTER.fetch_add(1, Ordering::Relaxed);
let tid = TypeId::of::<M>(); let tid = TypeId::of::<M>();
if let Some(rs) = self.receivers.get(&tid).and_then(|rs|rs.first()) { if let Some(rs) = self.receivers.get(&tid).and_then(|rs| rs.first()) {
Ok(rs.send(mid, rs.reserve().await, msg)?) Ok(rs.send(mid, msg, rs.reserve().await)?)
} else {
Err(Error::NoReceivers)
}
}
pub async fn send_local_one<M: Message>(&self, msg: M) -> Result<(), Error<M>> {
if self.closed.load(Ordering::SeqCst) {
return Err(SendError::Closed(msg).into());
}
let mid = ID_COUNTER.fetch_add(1, Ordering::Relaxed);
let tid = TypeId::of::<M>();
if let Some(rs) = self.receivers.get(&tid).and_then(|rs| rs.first()) {
Ok(rs.send(mid, msg, rs.reserve().await)?)
} else { } else {
Err(Error::NoReceivers) Err(Error::NoReceivers)
} }
} }
#[inline] #[inline]
pub fn send_one_blocking<M: Message>(&self, msg: M) -> Result<(), Error<M>> { pub fn send_one_blocking<M: TransferableMessage>(&self, msg: M) -> Result<(), Error<M>> {
futures::executor::block_on(self.send_one(msg)) futures::executor::block_on(self.send_one(msg))
} }
pub async fn request<M: Message, R: Message>( pub async fn request<M: TransferableMessage, R: TransferableMessage>(
&self, &self,
req: M, req: M,
options: SendOptions, options: SendOptions,
@ -324,23 +341,19 @@ impl BusInner {
if let Some(rc) = iter.next() { if let Some(rc) = iter.next() {
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
let mid = (rc.add_response_waiter(tx).unwrap() | 1 << (usize::BITS - 1)) as u64; let mid = (rc.add_response_waiter(tx).unwrap() | 1 << (usize::BITS - 1)) as u64;
rc.send(mid, rc.reserve().await, req)?; rc.send(mid, req, rc.reserve().await)?;
rx.await?.map_err(|x|x.specify::<M>()) rx.await?.map_err(|x| x.specify::<M>())
} else { } else {
Err(Error::NoReceivers) Err(Error::NoReceivers)
} }
} }
pub async fn request_we<M, R, E>( pub async fn request_local_we<M, R, E>(&self, req: M, options: SendOptions) -> Result<R, Error<M, E>>
&self, where
req: M, M: Message,
options: SendOptions, R: Message,
) -> Result<R, Error<M, E>> E: StdSyncSendError,
where
M: Message,
R: Message,
E: StdSyncSendError
{ {
let tid = TypeId::of::<M>(); let tid = TypeId::of::<M>();
let rid = TypeId::of::<R>(); let rid = TypeId::of::<R>();
@ -350,10 +363,9 @@ impl BusInner {
if let Some(rc) = iter.next() { if let Some(rc) = iter.next() {
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
let mid = (rc.add_response_waiter_we(tx).unwrap() | 1 << (usize::BITS - 1)) as u64; let mid = (rc.add_response_waiter_we(tx).unwrap() | 1 << (usize::BITS - 1)) as u64;
rc.send(mid, rc.reserve().await, req)?; rc.send(mid, req, rc.reserve().await)?;
rx.await? rx.await?.map_err(|x| x.specify::<M>())
.map_err(|x|x.specify::<M>())
} else { } else {
Err(Error::NoReceivers) Err(Error::NoReceivers)
} }

View File

@ -1,4 +1,4 @@
use crate::{Bus, Error, Message, error::{SendError, StdSyncSendError}, trait_object::TraitObject}; use crate::{Bus, Error, Message, envelop::{BoxedMessage, TransferableMessage}, error::{SendError, StdSyncSendError}, trait_object::TraitObject};
use core::{ use core::{
any::TypeId, any::TypeId,
fmt, fmt,
@ -7,6 +7,7 @@ use core::{
pin::Pin, pin::Pin,
task::{Context, Poll}, task::{Context, Poll},
}; };
use erased_serde::Deserializer;
use futures::future::poll_fn; use futures::future::poll_fn;
use futures::Future; use futures::Future;
use std::{ use std::{
@ -52,6 +53,10 @@ pub trait ReceiverTrait: Send + Sync {
fn flush(&self) -> Result<(), Error<Action>>; fn flush(&self) -> Result<(), Error<Action>>;
} }
pub trait TransferableReceiverTrait: Send + Sync {
fn send(&self, mid: u64, de: &mut dyn Deserializer) -> Result<(), Error<BoxedMessage>>;
}
pub trait ReceiverPollerBuilder { pub trait ReceiverPollerBuilder {
fn build(bus: Bus) -> Box<dyn Future<Output = ()>>; fn build(bus: Bus) -> Box<dyn Future<Output = ()>>;
} }
@ -141,6 +146,19 @@ where
} }
} }
impl<M, R, E, S> TransferableReceiverTrait for ReceiverWrapper<M, R, E, S>
where
M: TransferableMessage,
R: TransferableMessage,
E: StdSyncSendError,
S: SendUntypedReceiver + SendTypedReceiver<M> + ReciveTypedReceiver<R, E> + 'static,
{
fn send(&self, mid: u64, de: &mut dyn Deserializer) -> Result<(), Error<BoxedMessage>> {
unimplemented!()
}
}
pub struct Permit { pub struct Permit {
pub(crate) fuse: bool, pub(crate) fuse: bool,
pub(crate) inner: Arc<dyn PermitDrop>, pub(crate) inner: Arc<dyn PermitDrop>,
@ -301,8 +319,16 @@ impl Receiver {
inner, inner,
_m: Default::default(), _m: Default::default(),
}), }),
waiters: Arc::new(sharded_slab::Slab::<oneshot::Sender<Result<R, Error<(), E>>>>::new_with_config::<SlabCfg>()), waiters: Arc::new(
waiters_void: Arc::new(sharded_slab::Slab::<oneshot::Sender<Result<R, Error<()>>>>::new_with_config::<SlabCfg>()), sharded_slab::Slab::<oneshot::Sender<Result<R, Error<(), E>>>>::new_with_config::<
SlabCfg,
>(),
),
waiters_void: Arc::new(
sharded_slab::Slab::<oneshot::Sender<Result<R, Error<()>>>>::new_with_config::<
SlabCfg,
>(),
),
} }
} }
@ -374,8 +400,8 @@ impl Receiver {
pub fn send<M: Message>( pub fn send<M: Message>(
&self, &self,
mid: u64, mid: u64,
mut permit: Permit,
msg: M, msg: M,
mut permit: Permit,
) -> Result<(), SendError<M>> { ) -> Result<(), SendError<M>> {
let any_receiver = self.inner.typed(); let any_receiver = self.inner.typed();
let receiver = any_receiver.dyn_typed_receiver::<M>(); let receiver = any_receiver.dyn_typed_receiver::<M>();
@ -416,13 +442,13 @@ impl Receiver {
let waiters = self let waiters = self
.waiters .waiters
.clone() .clone()
.downcast::<Slab::<oneshot::Sender<Result<R, Error<(), E>>>>>() .downcast::<Slab<oneshot::Sender<Result<R, Error<(), E>>>>>()
.unwrap(); .unwrap();
let waiters_void = self let waiters_void = self
.waiters_void .waiters_void
.clone() .clone()
.downcast::<Slab::<oneshot::Sender<Result<R, Error<()>>>>>() .downcast::<Slab<oneshot::Sender<Result<R, Error<()>>>>>()
.unwrap(); .unwrap();
Box::new(move |_| { Box::new(move |_| {
@ -448,13 +474,13 @@ impl Receiver {
error!("Response cannot be processed!"); error!("Response cannot be processed!");
} }
} else if let Some(waiter) = waiters_void.take(mid as usize) { } else if let Some(waiter) = waiters_void.take(mid as usize) {
if waiter.send(resp.map_err(|x|x.into_dyn())).is_err() { if waiter.send(resp.map_err(|x| x.into_dyn())).is_err() {
error!("Response cannot be processed!"); error!("Response cannot be processed!");
} }
} else if TypeId::of::<R>() != TypeId::of::<()>() { } else if TypeId::of::<R>() != TypeId::of::<()>() {
warn!("Non-void response has no waiters!"); warn!("Non-void response has no waiters!");
} }
}, }
_ => unimplemented!(), _ => unimplemented!(),
} }
@ -491,6 +517,19 @@ impl Receiver {
Some(idx) Some(idx)
} }
// #[inline]
// pub(crate) fn add_response_waiter_dyn(
// &self,
// waiter: oneshot::Sender<Result<R, Error<(), E>>>,
// ) -> Option<usize> {
// let idx = self
// .waiters
// .downcast_ref::<Slab<oneshot::Sender<Result<R, Error<(), E>>>>>()
// .unwrap()
// .insert(waiter)?;
// Some(idx)
// }
#[inline] #[inline]
pub async fn close(&self) { pub async fn close(&self) {

View File

@ -10,8 +10,8 @@ use std::{
use crate::{ use crate::{
buffer_unordered_poller_macro, buffer_unordered_poller_macro,
builder::ReceiverSubscriberBuilder, builder::ReceiverSubscriberBuilder,
error::{Error, StdSyncSendError, SendError}, error::{Error, SendError, StdSyncSendError},
receiver::{Action, Event, ReciveTypedReceiver, SendUntypedReceiver, SendTypedReceiver}, receiver::{Action, Event, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver},
receivers::{fix_type, Request}, receivers::{fix_type, Request},
AsyncHandler, Bus, Message, Untyped, AsyncHandler, Bus, Message, Untyped,
}; };

View File

@ -5,6 +5,7 @@ use std::sync::atomic::AtomicU64;
pub use r#async::BufferUnorderedAsync; pub use r#async::BufferUnorderedAsync;
pub use sync::BufferUnorderedSync; pub use sync::BufferUnorderedSync;
use serde_derive::{Serialize, Deserialize};
#[derive(Debug)] #[derive(Debug)]
pub struct BufferUnorderedStats { pub struct BufferUnorderedStats {
@ -14,7 +15,7 @@ pub struct BufferUnorderedStats {
pub parallel_total: AtomicU64, pub parallel_total: AtomicU64,
} }
#[derive(Copy, Clone, Debug)] #[derive(Copy, Clone, Debug, Serialize, Deserialize)]
pub struct BufferUnorderedConfig { pub struct BufferUnorderedConfig {
pub buffer_size: usize, pub buffer_size: usize,
pub max_parallel: usize, pub max_parallel: usize,
@ -96,7 +97,8 @@ macro_rules! buffer_unordered_poller_macro {
Poll::Pending => return Poll::Pending, Poll::Pending => return Poll::Pending,
Poll::Ready(Some((mid, resp))) => { Poll::Ready(Some((mid, resp))) => {
let resp: Result<_, $t::Error> = resp; let resp: Result<_, $t::Error> = resp;
stx.send(Event::Response(mid, resp.map_err(Error::Other))).ok(); stx.send(Event::Response(mid, resp.map_err(Error::Other)))
.ok();
} }
Poll::Ready(None) => break, Poll::Ready(None) => break,
} }
@ -114,7 +116,8 @@ macro_rules! buffer_unordered_poller_macro {
Poll::Pending => return Poll::Pending, Poll::Pending => return Poll::Pending,
Poll::Ready(resp) => { Poll::Ready(resp) => {
let resp: Result<_, E> = resp; let resp: Result<_, E> = resp;
stx.send(Event::Synchronized(resp.map_err(Error::Other))).ok(); stx.send(Event::Synchronized(resp.map_err(Error::Other)))
.ok();
} }
} }
need_sync = false; need_sync = false;

View File

@ -7,15 +7,15 @@ use std::{
task::{Context, Poll}, task::{Context, Poll},
}; };
use super::{BufferUnorderedConfig, BufferUnorderedStats};
use crate::{ use crate::{
buffer_unordered_poller_macro, buffer_unordered_poller_macro,
builder::ReceiverSubscriberBuilder, builder::ReceiverSubscriberBuilder,
error::{Error, StdSyncSendError, SendError}, error::{Error, SendError, StdSyncSendError},
receiver::{Action, Event, ReciveTypedReceiver, SendUntypedReceiver, SendTypedReceiver}, receiver::{Action, Event, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver},
receivers::{fix_type, Request}, receivers::{fix_type, Request},
Bus, Handler, Message, Untyped, Bus, Handler, Message, Untyped,
}; };
use super::{BufferUnorderedConfig, BufferUnorderedStats};
use futures::{stream::FuturesUnordered, Future, StreamExt}; use futures::{stream::FuturesUnordered, Future, StreamExt};
use parking_lot::Mutex; use parking_lot::Mutex;

View File

@ -7,7 +7,14 @@ use std::{
task::{Context, Poll}, task::{Context, Poll},
}; };
use crate::{AsyncBatchHandler, Bus, Message, Untyped, buffer_unordered_batch_poller_macro, builder::ReceiverSubscriberBuilder, error::{Error, SendError, StdSyncSendError}, receiver::{Action, Event, ReciveTypedReceiver, SendUntypedReceiver, SendTypedReceiver}, receivers::{fix_type, Request}}; use crate::{
buffer_unordered_batch_poller_macro,
builder::ReceiverSubscriberBuilder,
error::{Error, SendError, StdSyncSendError},
receiver::{Action, Event, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver},
receivers::{fix_type, Request},
AsyncBatchHandler, Bus, Message, Untyped,
};
use super::{BufferUnorderedBatchedConfig, BufferUnorderedBatchedStats}; use super::{BufferUnorderedBatchedConfig, BufferUnorderedBatchedStats};
use futures::{stream::FuturesUnordered, Future, StreamExt}; use futures::{stream::FuturesUnordered, Future, StreamExt};
@ -39,7 +46,8 @@ where
srx: Mutex<mpsc::UnboundedReceiver<Event<R, E>>>, srx: Mutex<mpsc::UnboundedReceiver<Event<R, E>>>,
} }
impl<T, M, R> ReceiverSubscriberBuilder<T, M, R, T::Error> for BufferUnorderedBatchedAsync<M, R, T::Error> impl<T, M, R> ReceiverSubscriberBuilder<T, M, R, T::Error>
for BufferUnorderedBatchedAsync<M, R, T::Error>
where where
T: AsyncBatchHandler<M, Response = R> + 'static, T: AsyncBatchHandler<M, Response = R> + 'static,
T::Error: StdSyncSendError + Clone, T::Error: StdSyncSendError + Clone,

View File

@ -5,6 +5,7 @@ use std::sync::atomic::AtomicU64;
pub use r#async::BufferUnorderedBatchedAsync; pub use r#async::BufferUnorderedBatchedAsync;
pub use sync::BufferUnorderedBatchedSync; pub use sync::BufferUnorderedBatchedSync;
use serde_derive::{Serialize, Deserialize};
#[derive(Debug)] #[derive(Debug)]
pub struct BufferUnorderedBatchedStats { pub struct BufferUnorderedBatchedStats {
@ -16,7 +17,7 @@ pub struct BufferUnorderedBatchedStats {
pub batch_size: AtomicU64, pub batch_size: AtomicU64,
} }
#[derive(Copy, Clone, Debug)] #[derive(Copy, Clone, Debug, Serialize, Deserialize)]
pub struct BufferUnorderedBatchedConfig { pub struct BufferUnorderedBatchedConfig {
pub buffer_size: usize, pub buffer_size: usize,
pub max_parallel: usize, pub max_parallel: usize,
@ -145,7 +146,11 @@ macro_rules! buffer_unordered_batch_poller_macro {
} }
Err(er) => { Err(er) => {
for mid in mids { for mid in mids {
stx.send(Event::Response(mid, Err(Error::Other(er.clone())))).ok(); stx.send(Event::Response(
mid,
Err(Error::Other(er.clone())),
))
.ok();
} }
} }
}, },
@ -165,7 +170,8 @@ macro_rules! buffer_unordered_batch_poller_macro {
Poll::Pending => return Poll::Pending, Poll::Pending => return Poll::Pending,
Poll::Ready(resp) => { Poll::Ready(resp) => {
let resp: Result<_, $t::Error> = resp; let resp: Result<_, $t::Error> = resp;
stx.send(Event::Synchronized(resp.map_err(Error::Other))).ok(); stx.send(Event::Synchronized(resp.map_err(Error::Other)))
.ok();
} }
} }
need_sync = false; need_sync = false;

View File

@ -7,8 +7,15 @@ use std::{
task::{Context, Poll}, task::{Context, Poll},
}; };
use crate::{BatchHandler, Bus, Message, Untyped, buffer_unordered_batch_poller_macro, builder::ReceiverSubscriberBuilder, error::{Error, SendError, StdSyncSendError}, receiver::{Action, Event, ReciveTypedReceiver, SendUntypedReceiver, SendTypedReceiver}, receivers::{fix_type, Request}};
use super::{BufferUnorderedBatchedConfig, BufferUnorderedBatchedStats}; use super::{BufferUnorderedBatchedConfig, BufferUnorderedBatchedStats};
use crate::{
buffer_unordered_batch_poller_macro,
builder::ReceiverSubscriberBuilder,
error::{Error, SendError, StdSyncSendError},
receiver::{Action, Event, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver},
receivers::{fix_type, Request},
BatchHandler, Bus, Message, Untyped,
};
use futures::{stream::FuturesUnordered, Future, StreamExt}; use futures::{stream::FuturesUnordered, Future, StreamExt};
use parking_lot::Mutex; use parking_lot::Mutex;
@ -48,7 +55,8 @@ where
srx: Mutex<mpsc::UnboundedReceiver<Event<R, E>>>, srx: Mutex<mpsc::UnboundedReceiver<Event<R, E>>>,
} }
impl<T, M, R> ReceiverSubscriberBuilder<T, M, R, T::Error> for BufferUnorderedBatchedSync<M, R, T::Error> impl<T, M, R> ReceiverSubscriberBuilder<T, M, R, T::Error>
for BufferUnorderedBatchedSync<M, R, T::Error>
where where
T: BatchHandler<M, Response = R> + 'static, T: BatchHandler<M, Response = R> + 'static,
T::Error: StdSyncSendError, T::Error: StdSyncSendError,

View File

@ -28,6 +28,11 @@ where
Pin::new_unchecked(x) Pin::new_unchecked(x)
} }
#[inline(always)]
pub(crate) fn fix_into_iter<I, T: IntoIterator<Item = I> + Send>(x: T) -> impl IntoIterator<Item = I> + Send {
x
}
pub(crate) enum Request<M> { pub(crate) enum Request<M> {
Action(Action), Action(Action),
Request(u64, M), Request(u64, M),

View File

@ -6,10 +6,10 @@ use std::{
use crate::{ use crate::{
batch_synchronized_poller_macro, batch_synchronized_poller_macro,
error::{Error, SendError, StdSyncSendError},
builder::ReceiverSubscriberBuilder, builder::ReceiverSubscriberBuilder,
receiver::{Action, Event, ReciveTypedReceiver, SendUntypedReceiver, SendTypedReceiver}, error::{Error, SendError, StdSyncSendError},
receivers::{fix_type, Request}, receiver::{Action, Event, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver},
receivers::{fix_type, fix_into_iter, Request},
AsyncBatchSynchronizedHandler, Bus, Message, Untyped, AsyncBatchSynchronizedHandler, Bus, Message, Untyped,
}; };
@ -40,7 +40,8 @@ where
srx: parking_lot::Mutex<mpsc::UnboundedReceiver<Event<R, E>>>, srx: parking_lot::Mutex<mpsc::UnboundedReceiver<Event<R, E>>>,
} }
impl<T, M, R> ReceiverSubscriberBuilder<T, M, R, T::Error> for SynchronizedBatchedAsync<M, R, T::Error> impl<T, M, R> ReceiverSubscriberBuilder<T, M, R, T::Error>
for SynchronizedBatchedAsync<M, R, T::Error>
where where
T: AsyncBatchSynchronizedHandler<M, Response = R> + 'static, T: AsyncBatchSynchronizedHandler<M, Response = R> + 'static,
T::Error: StdSyncSendError + Clone, T::Error: StdSyncSendError + Clone,
@ -62,9 +63,8 @@ where
let poller = Box::new(move |ut| { let poller = Box::new(move |ut| {
Box::new(move |bus| { Box::new(move |bus| {
Box::pin(batch_synchronized_poller::<T, M, R>( Box::pin(batch_synchronized_poller::<T, M, R>(rx, bus, ut, cfg, stx))
rx, bus, ut, cfg, stx, as Pin<Box<dyn Future<Output = ()> + Send>>
)) as Pin<Box<dyn Future<Output = ()> + Send>>
}) as Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>> }) as Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>
}); });

View File

@ -5,6 +5,8 @@ use std::sync::atomic::AtomicU64;
pub use r#async::SynchronizedBatchedAsync; pub use r#async::SynchronizedBatchedAsync;
pub use sync::SynchronizedBatchedSync; pub use sync::SynchronizedBatchedSync;
use serde_derive::{Serialize, Deserialize};
#[derive(Debug)] #[derive(Debug)]
pub struct SynchronizedBatchedStats { pub struct SynchronizedBatchedStats {
@ -14,7 +16,7 @@ pub struct SynchronizedBatchedStats {
pub batch_size: AtomicU64, pub batch_size: AtomicU64,
} }
#[derive(Copy, Clone, Debug)] #[derive(Copy, Clone, Debug, Serialize, Deserialize)]
pub struct SynchronizedBatchedConfig { pub struct SynchronizedBatchedConfig {
pub buffer_size: usize, pub buffer_size: usize,
pub batch_size: usize, pub batch_size: usize,
@ -65,7 +67,7 @@ macro_rules! batch_synchronized_poller_macro {
Poll::Ready((mids, res)) => match res { Poll::Ready((mids, res)) => match res {
Ok(re) => { Ok(re) => {
let mids: Vec<u64> = mids; let mids: Vec<u64> = mids;
let re: Vec<R> = re; let re = fix_into_iter::<R, _>(re);
let mut mids = mids.into_iter(); let mut mids = mids.into_iter();
let mut re = re.into_iter(); let mut re = re.into_iter();
@ -74,11 +76,7 @@ macro_rules! batch_synchronized_poller_macro {
if let Some(r) = re.next() { if let Some(r) = re.next() {
stx.send(Event::Response(mid, Ok(r))).ok(); stx.send(Event::Response(mid, Ok(r))).ok();
} else { } else {
stx.send(Event::Response( stx.send(Event::Response(mid, Err(Error::NoResponse))).ok();
mid,
Err(Error::NoResponse),
))
.ok();
} }
} }
} }
@ -86,10 +84,11 @@ macro_rules! batch_synchronized_poller_macro {
Err(er) => { Err(er) => {
let er: $t::Error = er; let er: $t::Error = er;
for mid in mids { for mid in mids {
stx.send(Event::Response(mid, Err(Error::Other(er.clone())))).ok(); stx.send(Event::Response(mid, Err(Error::Other(er.clone()))))
.ok();
} }
} }
} },
} }
} }
handle_future = None; handle_future = None;
@ -159,7 +158,8 @@ macro_rules! batch_synchronized_poller_macro {
Poll::Ready(resp) => { Poll::Ready(resp) => {
need_sync = false; need_sync = false;
let resp: Result<_, $t::Error> = resp; let resp: Result<_, $t::Error> = resp;
stx.send(Event::Synchronized(resp.map_err(Error::Other))).ok(); stx.send(Event::Synchronized(resp.map_err(Error::Other)))
.ok();
} }
} }
sync_future = None; sync_future = None;

View File

@ -7,9 +7,9 @@ use std::{
use crate::{ use crate::{
batch_synchronized_poller_macro, batch_synchronized_poller_macro,
builder::ReceiverSubscriberBuilder, builder::ReceiverSubscriberBuilder,
error::{Error, StdSyncSendError, SendError}, error::{Error, SendError, StdSyncSendError},
receiver::{Action, Event, ReciveTypedReceiver, SendUntypedReceiver, SendTypedReceiver}, receiver::{Action, Event, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver},
receivers::{fix_type, Request}, receivers::{fix_type, fix_into_iter, Request},
BatchSynchronizedHandler, Bus, Message, Untyped, BatchSynchronizedHandler, Bus, Message, Untyped,
}; };
@ -44,7 +44,8 @@ where
srx: parking_lot::Mutex<mpsc::UnboundedReceiver<Event<R, E>>>, srx: parking_lot::Mutex<mpsc::UnboundedReceiver<Event<R, E>>>,
} }
impl<T, M, R> ReceiverSubscriberBuilder<T, M, R, T::Error> for SynchronizedBatchedSync<M, R, T::Error> impl<T, M, R> ReceiverSubscriberBuilder<T, M, R, T::Error>
for SynchronizedBatchedSync<M, R, T::Error>
where where
T: BatchSynchronizedHandler<M, Response = R> + 'static, T: BatchSynchronizedHandler<M, Response = R> + 'static,
T::Error: StdSyncSendError, T::Error: StdSyncSendError,
@ -66,9 +67,8 @@ where
let poller = Box::new(move |ut| { let poller = Box::new(move |ut| {
Box::new(move |bus| { Box::new(move |bus| {
Box::pin(batch_synchronized_poller::<T, M, R>( Box::pin(batch_synchronized_poller::<T, M, R>(rx, bus, ut, cfg, stx))
rx, bus, ut, cfg, stx, as Pin<Box<dyn Future<Output = ()> + Send>>
)) as Pin<Box<dyn Future<Output = ()> + Send>>
}) as Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>> }) as Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>
}); });

View File

@ -9,9 +9,9 @@ use futures::Future;
use super::SynchronizedConfig; use super::SynchronizedConfig;
use crate::{ use crate::{
error::{Error, StdSyncSendError, SendError},
builder::ReceiverSubscriberBuilder, builder::ReceiverSubscriberBuilder,
receiver::{SendTypedReceiver, Action, Event, ReciveTypedReceiver, SendUntypedReceiver}, error::{Error, SendError, StdSyncSendError},
receiver::{Action, Event, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver},
receivers::{fix_type, Request}, receivers::{fix_type, Request},
AsyncSynchronizedHandler, Bus, Message, Untyped, AsyncSynchronizedHandler, Bus, Message, Untyped,
}; };

View File

@ -5,6 +5,7 @@ use std::sync::atomic::AtomicU64;
pub use r#async::SynchronizedAsync; pub use r#async::SynchronizedAsync;
pub use sync::SynchronizedSync; pub use sync::SynchronizedSync;
use serde_derive::{Serialize, Deserialize};
#[derive(Debug)] #[derive(Debug)]
pub struct SynchronizedStats { pub struct SynchronizedStats {
@ -12,7 +13,7 @@ pub struct SynchronizedStats {
pub buffer_total: AtomicU64, pub buffer_total: AtomicU64,
} }
#[derive(Copy, Clone, Debug)] #[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SynchronizedConfig { pub struct SynchronizedConfig {
pub buffer_size: usize, pub buffer_size: usize,
} }
@ -51,7 +52,8 @@ macro_rules! synchronized_poller_macro {
Poll::Pending => return Poll::Pending, Poll::Pending => return Poll::Pending,
Poll::Ready((mid, resp)) => { Poll::Ready((mid, resp)) => {
let resp: Result<_, $t::Error> = resp; let resp: Result<_, $t::Error> = resp;
stx.send(Event::Response(mid, resp.map_err(Error::Other))).ok(); stx.send(Event::Response(mid, resp.map_err(Error::Other)))
.ok();
} }
} }
} }
@ -91,7 +93,8 @@ macro_rules! synchronized_poller_macro {
Poll::Ready(resp) => { Poll::Ready(resp) => {
need_sync = false; need_sync = false;
let resp: Result<_, $t::Error> = resp; let resp: Result<_, $t::Error> = resp;
stx.send(Event::Synchronized(resp.map_err(Error::Other))).ok(); stx.send(Event::Synchronized(resp.map_err(Error::Other)))
.ok();
} }
} }
sync_future = None; sync_future = None;

View File

@ -5,15 +5,15 @@ use std::{
}; };
use crate::synchronized_poller_macro; use crate::synchronized_poller_macro;
use futures::{Future, executor::block_on}; use futures::{executor::block_on, Future};
use super::SynchronizedConfig; use super::SynchronizedConfig;
use crate::{ use crate::{
error::{Error, StdSyncSendError, SendError},
builder::ReceiverSubscriberBuilder, builder::ReceiverSubscriberBuilder,
receiver::{SendTypedReceiver, Action, Event, ReciveTypedReceiver, SendUntypedReceiver}, error::{Error, SendError, StdSyncSendError},
receiver::{Action, Event, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver},
receivers::{fix_type, Request}, receivers::{fix_type, Request},
SynchronizedHandler, Bus, Message, Untyped, Bus, Message, SynchronizedHandler, Untyped,
}; };
use tokio::sync::{mpsc, Mutex}; use tokio::sync::{mpsc, Mutex};

122
src/relay.rs Normal file
View File

@ -0,0 +1,122 @@
use std::{any::TypeId, borrow::Cow, collections::HashMap, sync::atomic::{AtomicU64, Ordering}};
use tokio::sync::oneshot::Sender;
use sharded_slab::Slab;
use crate::{Bus, Message, envelop::SafeMessage, error::{Error, SendError}, receiver::Permit};
pub trait RelayTrait {
// fn handle_message(&self, mid: u64, msg: &dyn SafeMessage, tx: Option<Sender<>>, bus: &Bus);
fn start_relay(&self, bus: &Bus) -> Result<(), Error> ;
fn stop_relay(&self);
}
pub struct Relay {
in_map: HashMap<Cow<'static, str>, TypeId>,
out_map: HashMap<TypeId, Cow<'static, str>>,
// waiters: Slab<Sender<Result<R, Error<(), E>>>>,
queue_size: AtomicU64,
limit: u64,
}
impl Relay {
// pub async fn reserve(&self) -> Permit {
// loop {
// let count = self.queue_size.load(Ordering::Relaxed);
// if count < self.limit {
// let res = self.processing.compare_exchange(
// count,
// count + 1,
// Ordering::SeqCst,
// Ordering::SeqCst,
// );
// if res.is_ok() {
// break Permit {
// fuse: false,
// inner: self.context.clone(),
// };
// }
// // continue
// } else {
// self.response.notified().await
// }
// }
// }
// pub fn try_reserve(&self) -> Option<Permit> {
// loop {
// let count = self.processing.load(Ordering::Relaxed);
// if count < self.limit {
// let res = self.processing.compare_exchange(
// count,
// count + 1,
// Ordering::SeqCst,
// Ordering::SeqCst,
// );
// if res.is_ok() {
// break Some(Permit {
// fuse: false,
// inner: self.context.clone(),
// });
// }
// // continue
// } else {
// break None;
// }
// }
// }
// #[inline]
// pub fn send<M: Message>(
// &self,
// mid: u64,
// msg: M,
// mut permit: Permit,
// ) -> Result<(), SendError<M>> {
// unimplemented!()
// }
// #[inline]
// pub fn force_send<M: Message + Clone>(&self, mid: u64, msg: M) -> Result<(), SendError<M>> {
// unimplemented!()
// }
// #[inline]
// pub fn need_flush(&self) -> bool {
// self.context.need_flush.load(Ordering::SeqCst)
// }
// #[inline]
// pub async fn close(&self) {
// let notified = self.context.closed.notified();
// if self.inner.close().is_ok() {
// notified.await;
// } else {
// warn!("close failed!");
// }
// }
// #[inline]
// pub async fn sync(&self) {
// let notified = self.context.synchronized.notified();
// if self.inner.sync().is_ok() {
// notified.await
// } else {
// warn!("sync failed!");
// }
// }
// #[inline]
// pub async fn flush(&self) {
// let notified = self.context.flushed.notified();
// if self.inner.flush().is_ok() {
// notified.await;
// self.context.need_flush.store(false, Ordering::SeqCst);
// } else {
// warn!("flush failed!");
// }
// }
}