Update error handling for request/response

This commit is contained in:
Andrey Tkachenko 2021-06-28 17:41:00 +04:00
parent 2f4a267d09
commit 3e0f21153d
26 changed files with 486 additions and 380 deletions

View File

@ -1,6 +1,6 @@
[package] [package]
name = "messagebus" name = "messagebus"
version = "0.6.0" version = "0.6.2"
authors = ["Andrey Tkachenko <andrey@aidev.ru>"] authors = ["Andrey Tkachenko <andrey@aidev.ru>"]
repository = "https://github.com/andreytkachenko/messagebus.git" repository = "https://github.com/andreytkachenko/messagebus.git"
keywords = ["futures", "async", "tokio", "message", "bus"] keywords = ["futures", "async", "tokio", "message", "bus"]
@ -15,13 +15,14 @@ tokio = { version = "1", features = ["parking_lot", "rt-multi-thread", "sync"] }
parking_lot = "0.11.1" parking_lot = "0.11.1"
async-trait = "0.1.42" async-trait = "0.1.42"
futures = "0.3.8" futures = "0.3.8"
anyhow = "1.0.34"
tokio-util = "0.6.7" tokio-util = "0.6.7"
async-stream = "0.3.2" async-stream = "0.3.2"
smallvec = "1.6.1" smallvec = "1.6.1"
log = "0.4.14" log = "0.4.14"
sharded-slab = "0.1.1" sharded-slab = "0.1.1"
thiserror = "1.0.25"
[dev-dependencies] [dev-dependencies]
anyhow = "1.0.41"
env_logger = "0.8.4" env_logger = "0.8.4"
tokio = { version = "1", features = ["macros", "parking_lot", "rt-multi-thread", "io-util", "sync"] } tokio = { version = "1", features = ["macros", "parking_lot", "rt-multi-thread", "io-util", "sync"] }

View File

@ -1,5 +1,18 @@
use async_trait::async_trait; use async_trait::async_trait;
use messagebus::{error::Error, receivers, AsyncHandler, Bus, Handler}; use messagebus::{AsyncHandler, Bus, Handler, Message, error, receivers};
use thiserror::Error;
#[derive(Debug, Error)]
enum Error {
#[error("Error({0})")]
Error(anyhow::Error)
}
impl<M: Message> From<error::Error<M>> for Error {
fn from(err: error::Error<M>) -> Self {
Self::Error(err.into())
}
}
struct TmpReceiver; struct TmpReceiver;
struct TmpReceiver2; struct TmpReceiver2;
@ -139,15 +152,15 @@ async fn main() {
let (b, poller) = Bus::build() let (b, poller) = Bus::build()
.register(TmpReceiver) .register(TmpReceiver)
.subscribe::<f32, receivers::BufferUnorderedAsync<_>, _, _>(8, Default::default()) .subscribe::<f32, receivers::BufferUnorderedAsync<_, _, _>, _, _>(8, Default::default())
.subscribe::<u16, receivers::BufferUnorderedAsync<_>, _, _>(8, Default::default()) .subscribe::<u16, receivers::BufferUnorderedAsync<_, _, _>, _, _>(8, Default::default())
.subscribe::<u32, receivers::BufferUnorderedAsync<_>, _, _>(8, Default::default()) .subscribe::<u32, receivers::BufferUnorderedAsync<_, _, _>, _, _>(8, Default::default())
.subscribe::<i32, receivers::BufferUnorderedAsync<_>, _, _>(8, Default::default()) .subscribe::<i32, receivers::BufferUnorderedAsync<_, _, _>, _, _>(8, Default::default())
.subscribe::<i16, receivers::BufferUnorderedAsync<_>, _, _>(8, Default::default()) .subscribe::<i16, receivers::BufferUnorderedAsync<_, _, _>, _, _>(8, Default::default())
.done() .done()
.register(TmpReceiver2) .register(TmpReceiver2)
.subscribe::<i32, receivers::BufferUnorderedAsync<_>, _, _>(8, Default::default()) .subscribe::<i32, receivers::BufferUnorderedAsync<_, _, _>, _, _>(8, Default::default())
.subscribe::<i16, receivers::BufferUnorderedSync<_>, _, _>(8, Default::default()) .subscribe::<i16, receivers::BufferUnorderedSync<_, _, _>, _, _>(8, Default::default())
.done() .done()
.build(); .build();

View File

@ -1,5 +1,18 @@
use async_trait::async_trait; use async_trait::async_trait;
use messagebus::{error::Error, receivers, AsyncHandler, Bus}; use messagebus::{receivers, AsyncHandler, Message, error, Bus};
use thiserror::Error;
#[derive(Debug, Error)]
enum Error {
#[error("Error({0})")]
Error(anyhow::Error)
}
impl<M: Message> From<error::Error<M>> for Error {
fn from(err: error::Error<M>) -> Self {
Self::Error(err.into())
}
}
struct TmpReceiver; struct TmpReceiver;
@ -22,7 +35,7 @@ impl AsyncHandler<f32> for TmpReceiver {
async fn main() { async fn main() {
let (b, poller) = Bus::build() let (b, poller) = Bus::build()
.register(TmpReceiver) .register(TmpReceiver)
.subscribe::<f32, receivers::BufferUnorderedAsync<_>, _, _>( .subscribe::<f32, receivers::BufferUnorderedAsync<_, _, _>, _, _>(
1, 1,
receivers::BufferUnorderedConfig { receivers::BufferUnorderedConfig {
buffer_size: 1, buffer_size: 1,

View File

@ -1,6 +1,20 @@
use async_trait::async_trait; use std::sync::Arc;
use messagebus::{error::Error, receivers, AsyncBatchHandler, BatchHandler, Bus};
use async_trait::async_trait;
use messagebus::{receivers, AsyncBatchHandler, BatchHandler, Message, error, Bus};
use thiserror::Error;
#[derive(Debug, Error, Clone)]
enum Error {
#[error("Error({0})")]
Error(Arc<anyhow::Error>)
}
impl<M: Message> From<error::Error<M>> for Error {
fn from(err: error::Error<M>) -> Self {
Self::Error(Arc::new(err.into()))
}
}
struct TmpReceiver; struct TmpReceiver;
#[async_trait] #[async_trait]
@ -29,12 +43,12 @@ impl BatchHandler<i16> for TmpReceiver {
async fn main() { async fn main() {
let (b, poller) = Bus::build() let (b, poller) = Bus::build()
.register(TmpReceiver) .register(TmpReceiver)
.subscribe::<i32, receivers::BufferUnorderedBatchedAsync<_>, _, _>(16, Default::default()) .subscribe::<i32, receivers::BufferUnorderedBatchedAsync<_, _, _>, _, _>(16, Default::default())
.subscribe::<i16, receivers::BufferUnorderedBatchedSync<_>, _, _>(16, Default::default()) .subscribe::<i16, receivers::BufferUnorderedBatchedSync<_, _, _>, _, _>(16, Default::default())
.done() .done()
.build(); .build();
for i in 1..100 { for i in 1..100i32 {
b.send(i).await.unwrap(); b.send(i).await.unwrap();
} }

View File

@ -1,5 +1,21 @@
use core::f32;
use async_trait::async_trait; use async_trait::async_trait;
use messagebus::{error::Error, receivers, AsyncHandler, Bus}; use messagebus::{AsyncHandler, Bus, Message, error::{self, StdSyncSendError}, receivers};
use thiserror::Error;
#[derive(Debug, Error)]
enum Error {
#[error("Error({0})")]
Error(anyhow::Error)
}
impl<M: Message, E: StdSyncSendError> From<error::Error<M, E>> for Error {
fn from(err: error::Error<M, E>) -> Self {
Self::Error(err.into())
}
}
struct TmpReceiver1; struct TmpReceiver1;
struct TmpReceiver2; struct TmpReceiver2;
@ -143,22 +159,22 @@ impl AsyncHandler<f32> for TmpReceiver2 {
async fn main() { async fn main() {
let (b, poller) = Bus::build() let (b, poller) = Bus::build()
.register(TmpReceiver1) .register(TmpReceiver1)
.subscribe::<i32, receivers::BufferUnorderedAsync<_, f32>, _, _>(8, Default::default()) .subscribe::<i32, receivers::BufferUnorderedAsync<_, _, _>, _, _>(8, Default::default())
.subscribe::<u32, receivers::BufferUnorderedAsync<_, f32>, _, _>(8, Default::default()) .subscribe::<u32, receivers::BufferUnorderedAsync<_, _, _>, _, _>(8, Default::default())
.subscribe::<i16, receivers::BufferUnorderedAsync<_, f32>, _, _>(8, Default::default()) .subscribe::<i16, receivers::BufferUnorderedAsync<_, _, _>, _, _>(8, Default::default())
.subscribe::<u16, receivers::BufferUnorderedAsync<_, f32>, _, _>(8, Default::default()) .subscribe::<u16, receivers::BufferUnorderedAsync<_, _, _>, _, _>(8, Default::default())
.subscribe::<i8, receivers::BufferUnorderedAsync<_, f32>, _, _>(8, Default::default()) .subscribe::<i8, receivers::BufferUnorderedAsync<_, _, _>, _, _>(8, Default::default())
.subscribe::<u8, receivers::BufferUnorderedAsync<_, f32>, _, _>(8, Default::default()) .subscribe::<u8, receivers::BufferUnorderedAsync<_, _, _>, _, _>(8, Default::default())
.done() .done()
.register(TmpReceiver2) .register(TmpReceiver2)
.subscribe::<f32, receivers::BufferUnorderedAsync<_, f32>, _, _>(8, Default::default()) .subscribe::<f32, receivers::BufferUnorderedAsync<_, _, _>, _, _>(8, Default::default())
.subscribe::<f64, receivers::BufferUnorderedAsync<_, f64>, _, _>(8, Default::default()) .subscribe::<f64, receivers::BufferUnorderedAsync<_, _, _>, _, _>(8, Default::default())
.done() .done()
.build(); .build();
println!( println!(
"{:?}", "{:?}",
b.request::<_, f64>(1000f64, Default::default()).await b.request_we::<_, f64, Error>(1000f64, Default::default()).await
); );
println!("flush"); println!("flush");

View File

@ -1,4 +1,17 @@
use messagebus::{error::Error, receivers, Bus, Handler}; use messagebus::{receivers, Bus, Message, error, Handler};
use thiserror::Error;
#[derive(Debug, Error)]
enum Error {
#[error("Error({0})")]
Error(anyhow::Error)
}
impl<M: Message> From<error::Error<M>> for Error {
fn from(err: error::Error<M>) -> Self {
Self::Error(err.into())
}
}
struct TmpReceiver; struct TmpReceiver;
@ -41,9 +54,9 @@ impl Handler<u32> for TmpReceiver {
async fn main() { async fn main() {
let (b, poller) = Bus::build() let (b, poller) = Bus::build()
.register(TmpReceiver) .register(TmpReceiver)
.subscribe::<f32, receivers::BufferUnorderedSync<_>, _, _>(8, Default::default()) .subscribe::<f32, receivers::BufferUnorderedSync<_, _, _>, _, _>(8, Default::default())
.subscribe::<u16, receivers::BufferUnorderedSync<_>, _, _>(8, Default::default()) .subscribe::<u16, receivers::BufferUnorderedSync<_, _, _>, _, _>(8, Default::default())
.subscribe::<u32, receivers::BufferUnorderedSync<_>, _, _>(8, Default::default()) .subscribe::<u32, receivers::BufferUnorderedSync<_, _, _>, _, _>(8, Default::default())
.done() .done()
.build(); .build();

View File

@ -1,5 +1,20 @@
use std::sync::Arc;
use async_trait::async_trait; use async_trait::async_trait;
use messagebus::{error::Error, receivers, AsyncBatchSynchronizedHandler, BatchSynchronizedHandler, Bus}; use messagebus::{receivers, AsyncBatchSynchronizedHandler, BatchSynchronizedHandler, Message, error, Bus};
use thiserror::Error;
#[derive(Debug, Error, Clone)]
enum Error {
#[error("Error({0})")]
Error(Arc<anyhow::Error>)
}
impl<M: Message> From<error::Error<M>> for Error {
fn from(err: error::Error<M>) -> Self {
Self::Error(Arc::new(err.into()))
}
}
struct TmpReceiver; struct TmpReceiver;
@ -29,12 +44,12 @@ impl BatchSynchronizedHandler<i16> for TmpReceiver {
async fn main() { async fn main() {
let (b, poller) = Bus::build() let (b, poller) = Bus::build()
.register_unsync(TmpReceiver) .register_unsync(TmpReceiver)
.subscribe::<i32, receivers::SynchronizedBatchedAsync<_>, _, _>(16, Default::default()) .subscribe::<i32, receivers::SynchronizedBatchedAsync<_, _, _>, _, _>(16, Default::default())
.subscribe::<i16, receivers::SynchronizedBatchedSync<_>, _, _>(16, Default::default()) .subscribe::<i16, receivers::SynchronizedBatchedSync<_, _, _>, _, _>(16, Default::default())
.done() .done()
.build(); .build();
for i in 1..100 { for i in 1..100i32 {
b.send(i).await.unwrap(); b.send(i).await.unwrap();
} }

View File

@ -1,5 +1,18 @@
use async_trait::async_trait; use async_trait::async_trait;
use messagebus::{error::Error, receivers, AsyncSynchronizedHandler, Bus, SynchronizedHandler}; use messagebus::{receivers, AsyncSynchronizedHandler, Bus, Message, error, SynchronizedHandler};
use thiserror::Error;
#[derive(Debug, Error)]
enum Error {
#[error("Error({0})")]
Error(anyhow::Error)
}
impl<M: Message> From<error::Error<M>> for Error {
fn from(err: error::Error<M>) -> Self {
Self::Error(err.into())
}
}
struct TmpReceiver; struct TmpReceiver;
@ -34,8 +47,8 @@ impl AsyncSynchronizedHandler<i16> for TmpReceiver {
async fn main() { async fn main() {
let (b, poller) = Bus::build() let (b, poller) = Bus::build()
.register_unsync(TmpReceiver) .register_unsync(TmpReceiver)
.subscribe::<f32, receivers::SynchronizedSync<_>, _, _>(8, Default::default()) .subscribe::<f32, receivers::SynchronizedSync<_, _, _>, _, _>(8, Default::default())
.subscribe::<i16, receivers::SynchronizedAsync<_>, _, _>(8, Default::default()) .subscribe::<i16, receivers::SynchronizedAsync<_, _, _>, _, _>(8, Default::default())
.done() .done()
.build(); .build();

View File

@ -3,10 +3,7 @@ use std::{any::TypeId, collections::HashMap, marker::PhantomData, pin::Pin, sync
use futures::{Future, FutureExt}; use futures::{Future, FutureExt};
use tokio::sync::Mutex; use tokio::sync::Mutex;
use crate::{ use crate::{Bus, BusInner, Message, Untyped, error::StdSyncSendError, receiver::{Receiver, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver}};
receiver::{Receiver, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver},
Bus, BusInner, Message, Untyped,
};
pub trait ReceiverSubscriberBuilder<T, M, R, E>: pub trait ReceiverSubscriberBuilder<T, M, R, E>:
SendUntypedReceiver + SendTypedReceiver<M> + ReciveTypedReceiver<R, E> SendUntypedReceiver + SendTypedReceiver<M> + ReciveTypedReceiver<R, E>
@ -14,7 +11,7 @@ where
T: 'static, T: 'static,
M: Message, M: Message,
R: Message, R: Message,
E: crate::Error, E: StdSyncSendError,
{ {
type Config: Default; type Config: Default;
@ -75,7 +72,7 @@ impl<T> RegisterEntry<UnsyncEntry, T> {
T: Send + 'static, T: Send + 'static,
M: Message, M: Message,
R: Message, R: Message,
E: crate::Error, E: StdSyncSendError,
S: ReceiverSubscriberBuilder<T, M, R, E> + 'static, S: ReceiverSubscriberBuilder<T, M, R, E> + 'static,
{ {
let (inner, poller) = S::build(cfg); let (inner, poller) = S::build(cfg);
@ -97,7 +94,7 @@ impl<T> RegisterEntry<SyncEntry, T> {
T: Send + Sync + 'static, T: Send + Sync + 'static,
M: Message, M: Message,
R: Message, R: Message,
E: crate::Error, E: StdSyncSendError,
S: ReceiverSubscriberBuilder<T, M, R, E> + 'static, S: ReceiverSubscriberBuilder<T, M, R, E> + 'static,
{ {
let (inner, poller) = S::build(cfg); let (inner, poller) = S::build(cfg);

View File

@ -1,43 +1,86 @@
use core::fmt; use core::panic;
use std::{any::type_name, sync::Arc};
use anyhow::anyhow; use thiserror::Error;
use tokio::sync::oneshot;
use crate::{Message, SendError}; use crate::Message;
#[derive(Clone)] pub trait StdSyncSendError: std::error::Error + Send + Sync + Unpin + 'static {}
pub struct Error { impl<T: std::error::Error + Send + Sync + Unpin + 'static> StdSyncSendError for T {}
inner: Arc<anyhow::Error>,
#[derive(Debug, Error)]
pub enum VoidError {}
#[derive(Debug, Error)]
pub enum SendError<M: Message> {
#[error("Closed")]
Closed(M),
#[error("Full")]
Full(M),
} }
impl fmt::Debug for Error { #[derive(Debug, Error)]
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { pub enum Error<M: Message = (), E: StdSyncSendError = VoidError> {
fmt::Debug::fmt(&self.inner, f) #[error("Message Send Error: {0}")]
} SendError(#[from] SendError<M>),
#[error("NoResponse")]
NoResponse,
#[error("NoReceivers")]
NoReceivers,
#[error("Other({0})")]
Other(E),
#[error("Other({0})")]
OtherBoxed(Box<dyn StdSyncSendError>),
} }
impl fmt::Display for Error { impl<M: Message, E: StdSyncSendError> Error<M, E> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { pub fn into_dyn(self) -> Error<M> {
fmt::Display::fmt(&self.inner, f) match self {
} Error::SendError(inner) => Error::SendError(inner),
} Error::NoResponse => Error::NoReceivers,
Error::NoReceivers => Error::NoReceivers,
impl std::error::Error for Error {} Error::Other(inner) => Error::OtherBoxed(Box::new(inner) as _),
Error::OtherBoxed(inner) => Error::OtherBoxed(inner),
impl From<anyhow::Error> for Error { }
fn from(t: anyhow::Error) -> Self { }
Self { inner: Arc::new(t) }
} pub fn map<U: From<Box<dyn StdSyncSendError>> + StdSyncSendError>(self) -> Error<M, U> {
} match self {
Error::SendError(inner) => Error::SendError(inner),
impl<T: Message> From<SendError<T>> for Error { Error::NoResponse => Error::NoReceivers,
fn from(t: SendError<T>) -> Self { Error::NoReceivers => Error::NoReceivers,
Self { Error::Other(_) => panic!("expected boxed error!"),
inner: Arc::new(anyhow!( Error::OtherBoxed(inner) => Error::Other(inner.into()),
"Message <{}> Sending Error: {:?}",
type_name::<T>(),
t
)),
} }
} }
} }
impl <E: StdSyncSendError> Error<(), E> {
pub fn specify<M: Message>(self) -> Error<M, E> {
match self {
Error::SendError(_) => panic!("cannot specify type on typed error"),
Error::NoResponse => Error::NoReceivers,
Error::NoReceivers => Error::NoReceivers,
Error::Other(inner) => Error::Other(inner),
Error::OtherBoxed(inner) => Error::OtherBoxed(inner),
}
}
}
impl<M: Message, E: StdSyncSendError> From<oneshot::error::RecvError> for Error<M, E> {
fn from(_: oneshot::error::RecvError) -> Self {
Error::NoResponse
}
}
// impl<M: Message, E: StdSyncSendError, U: StdSyncSendError> From<oneshot::error::RecvError> for Error<M, E> {
// fn from(_: oneshot::error::RecvError) -> Self {
// Error::NoResponse
// }
// }

View File

@ -1,8 +1,8 @@
use crate::{Bus, Message}; use crate::{Bus, Message, error::StdSyncSendError};
use async_trait::async_trait; use async_trait::async_trait;
pub trait Handler<M: Message>: Send + Sync { pub trait Handler<M: Message>: Send + Sync {
type Error: crate::Error; type Error: StdSyncSendError;
type Response: Message; type Response: Message;
fn handle(&self, msg: M, bus: &Bus) -> Result<Self::Response, Self::Error>; fn handle(&self, msg: M, bus: &Bus) -> Result<Self::Response, Self::Error>;
@ -13,7 +13,7 @@ pub trait Handler<M: Message>: Send + Sync {
#[async_trait] #[async_trait]
pub trait AsyncHandler<M: Message>: Send + Sync { pub trait AsyncHandler<M: Message>: Send + Sync {
type Error: crate::Error; type Error: StdSyncSendError;
type Response: Message; type Response: Message;
async fn handle(&self, msg: M, bus: &Bus) -> Result<Self::Response, Self::Error>; async fn handle(&self, msg: M, bus: &Bus) -> Result<Self::Response, Self::Error>;
@ -23,7 +23,7 @@ pub trait AsyncHandler<M: Message>: Send + Sync {
} }
pub trait SynchronizedHandler<M: Message>: Send { pub trait SynchronizedHandler<M: Message>: Send {
type Error: crate::Error; type Error: StdSyncSendError;
type Response: Message; type Response: Message;
fn handle(&mut self, msg: M, bus: &Bus) -> Result<Self::Response, Self::Error>; fn handle(&mut self, msg: M, bus: &Bus) -> Result<Self::Response, Self::Error>;
@ -34,7 +34,7 @@ pub trait SynchronizedHandler<M: Message>: Send {
#[async_trait] #[async_trait]
pub trait AsyncSynchronizedHandler<M: Message>: Send { pub trait AsyncSynchronizedHandler<M: Message>: Send {
type Error: crate::Error; type Error: StdSyncSendError;
type Response: Message; type Response: Message;
async fn handle(&mut self, msg: M, bus: &Bus) -> Result<Self::Response, Self::Error>; async fn handle(&mut self, msg: M, bus: &Bus) -> Result<Self::Response, Self::Error>;
@ -44,7 +44,7 @@ pub trait AsyncSynchronizedHandler<M: Message>: Send {
} }
pub trait BatchHandler<M: Message>: Send + Sync { pub trait BatchHandler<M: Message>: Send + Sync {
type Error: crate::Error; type Error: StdSyncSendError + Clone;
type Response: Message; type Response: Message;
fn handle(&self, msg: Vec<M>, bus: &Bus) -> Result<Vec<Self::Response>, Self::Error>; fn handle(&self, msg: Vec<M>, bus: &Bus) -> Result<Vec<Self::Response>, Self::Error>;
@ -55,7 +55,7 @@ pub trait BatchHandler<M: Message>: Send + Sync {
#[async_trait] #[async_trait]
pub trait AsyncBatchHandler<M: Message>: Send + Sync { pub trait AsyncBatchHandler<M: Message>: Send + Sync {
type Error: crate::Error; type Error: StdSyncSendError + Clone;
type Response: Message; type Response: Message;
async fn handle(&self, msg: Vec<M>, bus: &Bus) -> Result<Vec<Self::Response>, Self::Error>; async fn handle(&self, msg: Vec<M>, bus: &Bus) -> Result<Vec<Self::Response>, Self::Error>;
@ -65,7 +65,7 @@ pub trait AsyncBatchHandler<M: Message>: Send + Sync {
} }
pub trait BatchSynchronizedHandler<M: Message>: Send { pub trait BatchSynchronizedHandler<M: Message>: Send {
type Error: crate::Error; type Error: StdSyncSendError + Clone;
type Response: Message; type Response: Message;
fn handle(&mut self, msg: Vec<M>, bus: &Bus) -> Result<Vec<Self::Response>, Self::Error>; fn handle(&mut self, msg: Vec<M>, bus: &Bus) -> Result<Vec<Self::Response>, Self::Error>;
@ -76,7 +76,7 @@ pub trait BatchSynchronizedHandler<M: Message>: Send {
#[async_trait] #[async_trait]
pub trait AsyncBatchSynchronizedHandler<M: Message>: Send { pub trait AsyncBatchSynchronizedHandler<M: Message>: Send {
type Error: crate::Error; type Error: StdSyncSendError + Clone;
type Response: Message; type Response: Message;
async fn handle(&mut self, msg: Vec<M>, bus: &Bus) -> Result<Vec<Self::Response>, Self::Error>; async fn handle(&mut self, msg: Vec<M>, bus: &Bus) -> Result<Vec<Self::Response>, Self::Error>;
@ -86,7 +86,7 @@ pub trait AsyncBatchSynchronizedHandler<M: Message>: Send {
} }
pub trait LocalHandler<M: Message> { pub trait LocalHandler<M: Message> {
type Error: crate::Error; type Error: StdSyncSendError;
type Response: Message; type Response: Message;
fn handle(&mut self, msg: Vec<M>, bus: &Bus) -> Result<Self::Response, Self::Error>; fn handle(&mut self, msg: Vec<M>, bus: &Bus) -> Result<Self::Response, Self::Error>;
@ -97,7 +97,7 @@ pub trait LocalHandler<M: Message> {
#[async_trait] #[async_trait]
pub trait LocalAsyncHandler<M: Message> { pub trait LocalAsyncHandler<M: Message> {
type Error: crate::Error; type Error: StdSyncSendError;
type Response: Message; type Response: Message;
async fn handle(&mut self, msg: Vec<M>, bus: &Bus) -> Result<Self::Response, Self::Error>; async fn handle(&mut self, msg: Vec<M>, bus: &Bus) -> Result<Self::Response, Self::Error>;
@ -107,7 +107,7 @@ pub trait LocalAsyncHandler<M: Message> {
} }
pub trait LocalBatchHandler<M: Message> { pub trait LocalBatchHandler<M: Message> {
type Error: crate::Error; type Error: StdSyncSendError + Clone;
type Response: Message; type Response: Message;
fn handle(&mut self, msg: Vec<M>, bus: &Bus) -> Result<Vec<Self::Response>, Self::Error>; fn handle(&mut self, msg: Vec<M>, bus: &Bus) -> Result<Vec<Self::Response>, Self::Error>;
@ -118,7 +118,7 @@ pub trait LocalBatchHandler<M: Message> {
#[async_trait] #[async_trait]
pub trait LocalAsyncBatchHandler<M: Message> { pub trait LocalAsyncBatchHandler<M: Message> {
type Error: crate::Error; type Error: StdSyncSendError + Clone;
type Response: Message; type Response: Message;
async fn handle(&mut self, msg: Vec<M>, bus: &Bus) -> Result<Vec<Self::Response>, Self::Error>; async fn handle(&mut self, msg: Vec<M>, bus: &Bus) -> Result<Vec<Self::Response>, Self::Error>;

View File

@ -2,7 +2,6 @@ mod builder;
mod envelop; mod envelop;
pub mod error; pub mod error;
mod handler; mod handler;
pub mod msgs;
mod receiver; mod receiver;
pub mod receivers; pub mod receivers;
mod trait_object; mod trait_object;
@ -11,13 +10,11 @@ mod trait_object;
extern crate log; extern crate log;
use crate::receiver::Permit; use crate::receiver::Permit;
use anyhow::bail; pub use builder::BusBuilder;
use builder::BusBuilder;
use core::any::{Any, TypeId}; use core::any::{Any, TypeId};
pub use envelop::Message; pub use envelop::Message;
pub use handler::*; pub use handler::*;
use receiver::Receiver; use receiver::Receiver;
pub use receiver::SendError;
use smallvec::SmallVec; use smallvec::SmallVec;
use std::{ use std::{
collections::HashMap, collections::HashMap,
@ -27,13 +24,10 @@ use std::{
}, },
}; };
use tokio::sync::oneshot; use tokio::sync::oneshot;
use error::{Error, SendError, StdSyncSendError};
pub type Untyped = Arc<dyn Any + Send + Sync>; pub type Untyped = Arc<dyn Any + Send + Sync>;
// pub trait ErrorTrait: std::error::Error + Send + Sync + 'static {}
pub trait Error: From<anyhow::Error> + std::error::Error + Clone + Send + Sync + 'static {}
impl<T: From<anyhow::Error> + std::error::Error + Clone + Send + Sync + 'static> Error for T {}
static ID_COUNTER: AtomicU64 = AtomicU64::new(1); static ID_COUNTER: AtomicU64 = AtomicU64::new(1);
#[derive(Debug, Clone, Copy, PartialEq)] #[derive(Debug, Clone, Copy, PartialEq)]
@ -114,7 +108,7 @@ impl BusInner {
} }
} }
pub async fn flash_and_sync(&self) { pub async fn flush_and_sync(&self) {
self.flush().await; self.flush().await;
for (_, rs) in &self.receivers { for (_, rs) in &self.receivers {
@ -124,13 +118,6 @@ impl BusInner {
} }
} }
// pub fn stats(&self) -> impl Iterator<Item = ReceiverStats> + '_ {
// self.receivers.iter()
// .map(|(_, i)|i.iter())
// .flatten()
// .map(|r| r.stats())
// }
fn try_reserve(&self, rs: &[Receiver]) -> Option<SmallVec<[Permit; 32]>> { fn try_reserve(&self, rs: &[Receiver]) -> Option<SmallVec<[Permit; 32]>> {
let mut permits = SmallVec::<[Permit; 32]>::new(); let mut permits = SmallVec::<[Permit; 32]>::new();
@ -210,8 +197,8 @@ impl BusInner {
} }
#[inline] #[inline]
pub async fn send<M: Message>(&self, msg: M) -> core::result::Result<(), SendError<M>> { pub async fn send<M: Message>(&self, msg: M) -> core::result::Result<(), Error<M>> {
self.send_ext(msg, SendOptions::Broadcast).await Ok(self.send_ext(msg, SendOptions::Broadcast).await?)
} }
pub async fn send_ext<M: Message>( pub async fn send_ext<M: Message>(
@ -287,19 +274,46 @@ impl BusInner {
&self, &self,
req: M, req: M,
options: SendOptions, options: SendOptions,
) -> anyhow::Result<R> { ) -> Result<R, Error<M>> {
let tid = TypeId::of::<M>(); let tid = TypeId::of::<M>();
let rid = TypeId::of::<R>(); let rid = TypeId::of::<R>();
let mut iter = self.select_receivers(tid, options, Some(rid)); let mut iter = self.select_receivers(tid, options, Some(rid), None);
if let Some(rc) = iter.next() { if let Some(rc) = iter.next() {
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
let mid = (rc.add_response_waiter(tx).unwrap() | 1 << (usize::BITS - 1)) as u64; let mid = (rc.add_response_waiter(tx).unwrap() | 1 << (usize::BITS - 1)) as u64;
rc.send(mid, rc.reserve().await, req)?; rc.send(mid, rc.reserve().await, req)?;
Ok(rx.await?) rx.await?.map_err(|x|x.specify::<M>())
} else { } else {
bail!("No Receivers!"); Err(Error::NoReceivers)
}
}
pub async fn request_we<M, R, E>(
&self,
req: M,
options: SendOptions,
) -> Result<R, Error<M, E>>
where
M: Message,
R: Message,
E: StdSyncSendError
{
let tid = TypeId::of::<M>();
let rid = TypeId::of::<R>();
let eid = TypeId::of::<E>();
let mut iter = self.select_receivers(tid, options, Some(rid), Some(eid));
if let Some(rc) = iter.next() {
let (tx, rx) = oneshot::channel();
let mid = (rc.add_response_waiter_we(tx).unwrap() | 1 << (usize::BITS - 1)) as u64;
rc.send(mid, rc.reserve().await, req)?;
rx.await?
.map_err(|x|x.specify::<M>())
} else {
Err(Error::NoReceivers)
} }
} }
@ -309,20 +323,18 @@ impl BusInner {
tid: TypeId, tid: TypeId,
_options: SendOptions, _options: SendOptions,
rid: Option<TypeId>, rid: Option<TypeId>,
eid: Option<TypeId>,
) -> impl Iterator<Item = &Receiver> + '_ { ) -> impl Iterator<Item = &Receiver> + '_ {
self.receivers self.receivers
.get(&tid) .get(&tid)
.into_iter() .into_iter()
.map(|item| item.iter()) .map(|item| item.iter())
.flatten() .flatten()
.filter(move |x| { .filter(move |x| match (rid, eid) {
let ret_ty = if let Some(rid) = rid { (Some(r), Some(e)) => x.resp_type_id() == r && x.err_type_id() == e,
x.resp_type_id() == rid (Some(r), None) => x.resp_type_id() == r,
} else { (None, Some(e)) => x.err_type_id() == e,
true (None, None) => true,
};
ret_ty
}) })
} }
} }

View File

@ -1,10 +0,0 @@
use std::sync::Arc;
#[derive(Clone, Debug)]
pub struct Error(pub Arc<anyhow::Error>);
impl<T: Into<anyhow::Error>> From<T> for Error {
fn from(e: T) -> Self {
Self(Arc::new(e.into()))
}
}

View File

@ -1,4 +1,4 @@
use crate::{msgs, trait_object::TraitObject, Bus, Error, Message}; use crate::{Bus, Error, Message, error::{SendError, StdSyncSendError}, trait_object::TraitObject};
use core::{ use core::{
any::TypeId, any::TypeId,
fmt, fmt,
@ -37,7 +37,7 @@ pub trait SendTypedReceiver<M: Message>: Sync {
pub trait ReciveTypedReceiver<M, E>: Sync pub trait ReciveTypedReceiver<M, E>: Sync
where where
M: Message, M: Message,
E: crate::Error, E: StdSyncSendError,
{ {
fn poll_events(&self, ctx: &mut Context<'_>) -> Poll<Event<M, E>>; fn poll_events(&self, ctx: &mut Context<'_>) -> Poll<Event<M, E>>;
} }
@ -46,10 +46,10 @@ pub trait ReceiverTrait: Send + Sync {
fn typed(&self) -> AnyReceiver<'_>; fn typed(&self) -> AnyReceiver<'_>;
fn poller(&self) -> AnyPoller<'_>; fn poller(&self) -> AnyPoller<'_>;
fn name(&self) -> &str; fn name(&self) -> &str;
fn stats(&self) -> Result<(), SendError<()>>; fn stats(&self) -> Result<(), Error<Action>>;
fn close(&self) -> Result<(), SendError<()>>; fn close(&self) -> Result<(), Error<Action>>;
fn sync(&self) -> Result<(), SendError<()>>; fn sync(&self) -> Result<(), Error<Action>>;
fn flush(&self) -> Result<(), SendError<()>>; fn flush(&self) -> Result<(), Error<Action>>;
} }
pub trait ReceiverPollerBuilder { pub trait ReceiverPollerBuilder {
@ -76,7 +76,7 @@ pub struct Stats {
} }
#[non_exhaustive] #[non_exhaustive]
#[derive(Debug)] #[derive(Debug, Clone)]
pub enum Action { pub enum Action {
Flush, Flush,
Sync, Sync,
@ -85,10 +85,10 @@ pub enum Action {
} }
#[non_exhaustive] #[non_exhaustive]
#[derive(Debug, Clone)] #[derive(Debug)]
pub enum Event<M, E> { pub enum Event<M, E: StdSyncSendError> {
Response(u64, Result<M, E>), Response(u64, Result<M, Error<(), E>>),
Synchronized(Result<(), E>), Synchronized(Result<(), Error<(), E>>),
Stats(Stats), Stats(Stats),
Flushed, Flushed,
Exited, Exited,
@ -98,7 +98,7 @@ struct ReceiverWrapper<M, R, E, S>
where where
M: Message, M: Message,
R: Message, R: Message,
E: Error, E: StdSyncSendError,
S: 'static, S: 'static,
{ {
inner: S, inner: S,
@ -109,7 +109,7 @@ impl<M, R, E, S> ReceiverTrait for ReceiverWrapper<M, R, E, S>
where where
M: Message, M: Message,
R: Message, R: Message,
E: Error, E: StdSyncSendError,
S: SendUntypedReceiver + SendTypedReceiver<M> + ReciveTypedReceiver<R, E> + 'static, S: SendUntypedReceiver + SendTypedReceiver<M> + ReciveTypedReceiver<R, E> + 'static,
{ {
fn name(&self) -> &str { fn name(&self) -> &str {
@ -124,20 +124,20 @@ where
AnyPoller::new(&self.inner) AnyPoller::new(&self.inner)
} }
fn stats(&self) -> Result<(), SendError<()>> { fn stats(&self) -> Result<(), Error<Action>> {
SendUntypedReceiver::send(&self.inner, Action::Stats).map_err(|_| SendError::Closed(())) Ok(SendUntypedReceiver::send(&self.inner, Action::Stats)?)
} }
fn close(&self) -> Result<(), SendError<()>> { fn close(&self) -> Result<(), Error<Action>> {
SendUntypedReceiver::send(&self.inner, Action::Close).map_err(|_| SendError::Closed(())) Ok(SendUntypedReceiver::send(&self.inner, Action::Close)?)
} }
fn sync(&self) -> Result<(), SendError<()>> { fn sync(&self) -> Result<(), Error<Action>> {
SendUntypedReceiver::send(&self.inner, Action::Sync).map_err(|_| SendError::Closed(())) Ok(SendUntypedReceiver::send(&self.inner, Action::Sync)?)
} }
fn flush(&self) -> Result<(), SendError<()>> { fn flush(&self) -> Result<(), Error<Action>> {
SendUntypedReceiver::send(&self.inner, Action::Flush).map_err(|_| SendError::Closed(())) Ok(SendUntypedReceiver::send(&self.inner, Action::Flush)?)
} }
} }
@ -192,7 +192,7 @@ impl<'a> AnyPoller<'a> {
pub fn new<M, E, R>(rcvr: &'a R) -> Self pub fn new<M, E, R>(rcvr: &'a R) -> Self
where where
M: Message, M: Message,
E: crate::Error, E: StdSyncSendError,
R: ReciveTypedReceiver<M, E> + 'static, R: ReciveTypedReceiver<M, E> + 'static,
{ {
let trcvr = rcvr as &(dyn ReciveTypedReceiver<M, E>); let trcvr = rcvr as &(dyn ReciveTypedReceiver<M, E>);
@ -204,7 +204,7 @@ impl<'a> AnyPoller<'a> {
} }
} }
pub fn dyn_typed_receiver<M: Message, E: crate::Error>( pub fn dyn_typed_receiver<M: Message, E: StdSyncSendError>(
&'a self, &'a self,
) -> &'a dyn ReciveTypedReceiver<M, E> { ) -> &'a dyn ReciveTypedReceiver<M, E> {
assert_eq!(self.type_id, TypeId::of::<dyn ReciveTypedReceiver<M, E>>()); assert_eq!(self.type_id, TypeId::of::<dyn ReciveTypedReceiver<M, E>>());
@ -213,35 +213,6 @@ impl<'a> AnyPoller<'a> {
} }
} }
pub enum SendError<M> {
Full(M),
Closed(M),
}
impl<M: fmt::Debug> fmt::Debug for SendError<M> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
SendError::Full(m) => write!(f, "SendError::Full({:?})", m)?,
SendError::Closed(m) => write!(f, "SendError::Closed({:?})", m)?,
}
Ok(())
}
}
impl<M: fmt::Debug> fmt::Display for SendError<M> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
SendError::Full(m) => write!(f, "SendError::Full({:?})", m)?,
SendError::Closed(m) => write!(f, "SendError::Closed({:?})", m)?,
}
Ok(())
}
}
impl<M: fmt::Debug> std::error::Error for SendError<M> {}
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct ReceiverStats { pub struct ReceiverStats {
pub name: Cow<'static, str>, pub name: Cow<'static, str>,
@ -267,6 +238,7 @@ impl fmt::Display for ReceiverStats {
struct ReceiverContext { struct ReceiverContext {
resp_type_id: TypeId, resp_type_id: TypeId,
err_type_id: TypeId,
limit: u64, limit: u64,
processing: AtomicU64, processing: AtomicU64,
need_flush: AtomicBool, need_flush: AtomicBool,
@ -286,6 +258,7 @@ pub struct Receiver {
inner: Arc<dyn ReceiverTrait>, inner: Arc<dyn ReceiverTrait>,
context: Arc<ReceiverContext>, context: Arc<ReceiverContext>,
waiters: Arc<dyn Any + Send + Sync>, waiters: Arc<dyn Any + Send + Sync>,
waiters_void: Arc<dyn Any + Send + Sync>,
} }
impl fmt::Debug for Receiver { impl fmt::Debug for Receiver {
@ -309,12 +282,13 @@ impl Receiver {
where where
M: Message, M: Message,
R: Message, R: Message,
E: Error, E: StdSyncSendError,
S: SendUntypedReceiver + SendTypedReceiver<M> + ReciveTypedReceiver<R, E> + 'static, S: SendUntypedReceiver + SendTypedReceiver<M> + ReciveTypedReceiver<R, E> + 'static,
{ {
Self { Self {
context: Arc::new(ReceiverContext { context: Arc::new(ReceiverContext {
resp_type_id: TypeId::of::<R>(), resp_type_id: TypeId::of::<R>(),
err_type_id: TypeId::of::<E>(),
limit, limit,
processing: AtomicU64::new(0), processing: AtomicU64::new(0),
need_flush: AtomicBool::new(false), need_flush: AtomicBool::new(false),
@ -327,9 +301,8 @@ impl Receiver {
inner, inner,
_m: Default::default(), _m: Default::default(),
}), }),
waiters: Arc::new(sharded_slab::Slab::<oneshot::Sender<R>>::new_with_config::< waiters: Arc::new(sharded_slab::Slab::<oneshot::Sender<Result<R, Error<(), E>>>>::new_with_config::<SlabCfg>()),
SlabCfg, waiters_void: Arc::new(sharded_slab::Slab::<oneshot::Sender<Result<R, Error<()>>>>::new_with_config::<SlabCfg>()),
>()),
} }
} }
@ -338,6 +311,11 @@ impl Receiver {
self.context.resp_type_id self.context.resp_type_id
} }
#[inline]
pub fn err_type_id(&self) -> TypeId {
self.context.err_type_id
}
#[inline] #[inline]
pub fn need_flush(&self) -> bool { pub fn need_flush(&self) -> bool {
self.context.need_flush.load(Ordering::SeqCst) self.context.need_flush.load(Ordering::SeqCst)
@ -430,17 +408,24 @@ impl Receiver {
) -> Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>> ) -> Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>
where where
R: Message, R: Message,
E: crate::Error, E: StdSyncSendError,
{ {
let ctx_clone = self.context.clone(); let ctx_clone = self.context.clone();
let inner_clone = self.inner.clone(); let inner_clone = self.inner.clone();
let waiters = self let waiters = self
.waiters .waiters
.clone() .clone()
.downcast::<Slab<oneshot::Sender<R>>>() .downcast::<Slab::<oneshot::Sender<Result<R, Error<(), E>>>>>()
.unwrap(); .unwrap();
Box::new(move |bus| { let waiters_void = self
.waiters_void
.clone()
.downcast::<Slab::<oneshot::Sender<Result<R, Error<()>>>>>()
.unwrap();
Box::new(move |_| {
Box::pin(async move { Box::pin(async move {
let any_receiver = inner_clone.poller(); let any_receiver = inner_clone.poller();
let receiver = any_receiver.dyn_typed_receiver::<R, E>(); let receiver = any_receiver.dyn_typed_receiver::<R, E>();
@ -458,21 +443,19 @@ impl Receiver {
ctx_clone.processing.fetch_sub(1, Ordering::SeqCst); ctx_clone.processing.fetch_sub(1, Ordering::SeqCst);
ctx_clone.response.notify_one(); ctx_clone.response.notify_one();
match resp { if let Some(waiter) = waiters.take(mid as usize) {
Ok(msg) => { if waiter.send(resp).is_err() {
if let Some(waiter) = waiters.take(mid as usize) { error!("Response cannot be processed!");
if let Err(_msg) = waiter.send(msg) {
error!("Response cannot be processed!");
}
} else if TypeId::of::<R>() != TypeId::of::<()>() {
warn!("Non-void response has no listeners!");
}
} }
Err(err) => { } else if let Some(waiter) = waiters_void.take(mid as usize) {
bus.try_send(msgs::Error(Arc::new(err.into()))).ok(); if waiter.send(resp.map_err(|x|x.into_dyn())).is_err() {
error!("Response cannot be processed!");
} }
} else if TypeId::of::<R>() != TypeId::of::<()>() {
warn!("Non-void response has no listeners!");
} }
} },
_ => unimplemented!(), _ => unimplemented!(),
} }
} }
@ -483,17 +466,32 @@ impl Receiver {
#[inline] #[inline]
pub(crate) fn add_response_waiter<R: Message>( pub(crate) fn add_response_waiter<R: Message>(
&self, &self,
waiter: oneshot::Sender<R>, waiter: oneshot::Sender<Result<R, Error<()>>>,
) -> Option<usize> { ) -> Option<usize> {
let idx = self let idx = self
.waiters .waiters_void
.downcast_ref::<Slab<oneshot::Sender<R>>>() .downcast_ref::<Slab<oneshot::Sender<Result<R, Error<()>>>>>()
.unwrap() .unwrap()
.insert(waiter)?; .insert(waiter)?;
Some(idx) Some(idx)
} }
#[inline]
pub(crate) fn add_response_waiter_we<R: Message, E: StdSyncSendError>(
&self,
waiter: oneshot::Sender<Result<R, Error<(), E>>>,
) -> Option<usize> {
let idx = self
.waiters
.downcast_ref::<Slab<oneshot::Sender<Result<R, Error<(), E>>>>>()
.unwrap()
.insert(waiter)?;
Some(idx)
}
#[inline] #[inline]
pub async fn close(&self) { pub async fn close(&self) {
let notified = self.context.closed.notified(); let notified = self.context.closed.notified();

View File

@ -8,19 +8,17 @@ use std::{
}; };
use crate::{ use crate::{
buffer_unordered_poller_macro, buffer_unordered_poller_macro,
receiver::{Action, Event, ReciveTypedReceiver, SendUntypedReceiver},
receivers::{fix_type, Request},
};
use anyhow::Result;
use futures::{stream::FuturesUnordered, Future, StreamExt};
use super::{BufferUnorderedConfig, BufferUnorderedStats};
use crate::{
builder::ReceiverSubscriberBuilder, builder::ReceiverSubscriberBuilder,
receiver::{SendError, SendTypedReceiver}, error::{Error, StdSyncSendError, SendError},
receiver::{Action, Event, ReciveTypedReceiver, SendUntypedReceiver, SendTypedReceiver},
receivers::{fix_type, Request},
AsyncHandler, Bus, Message, Untyped, AsyncHandler, Bus, Message, Untyped,
}; };
use super::{BufferUnorderedConfig, BufferUnorderedStats};
use futures::{stream::FuturesUnordered, Future, StreamExt};
use parking_lot::Mutex; use parking_lot::Mutex;
use tokio::sync::mpsc; use tokio::sync::mpsc;
@ -33,11 +31,11 @@ buffer_unordered_poller_macro!(
|bus, ut: Arc<T>| { async move { ut.sync(&bus).await } } |bus, ut: Arc<T>| { async move { ut.sync(&bus).await } }
); );
pub struct BufferUnorderedAsync<M, R = (), E = crate::error::Error> pub struct BufferUnorderedAsync<M, R, E>
where where
M: Message, M: Message,
R: Message, R: Message,
E: crate::Error, E: StdSyncSendError,
{ {
tx: mpsc::UnboundedSender<Request<M>>, tx: mpsc::UnboundedSender<Request<M>>,
stats: Arc<BufferUnorderedStats>, stats: Arc<BufferUnorderedStats>,
@ -49,7 +47,7 @@ where
T: AsyncHandler<M, Response = R, Error = E> + 'static, T: AsyncHandler<M, Response = R, Error = E> + 'static,
R: Message, R: Message,
M: Message, M: Message,
E: crate::Error, E: StdSyncSendError,
{ {
type Config = BufferUnorderedConfig; type Config = BufferUnorderedConfig;
@ -100,7 +98,7 @@ impl<M, R, E> SendUntypedReceiver for BufferUnorderedAsync<M, R, E>
where where
M: Message, M: Message,
R: Message, R: Message,
E: crate::Error, E: StdSyncSendError,
{ {
fn send(&self, m: Action) -> Result<(), SendError<Action>> { fn send(&self, m: Action) -> Result<(), SendError<Action>> {
match self.tx.send(Request::Action(m)) { match self.tx.send(Request::Action(m)) {
@ -115,7 +113,7 @@ impl<M, R, E> SendTypedReceiver<M> for BufferUnorderedAsync<M, R, E>
where where
M: Message, M: Message,
R: Message, R: Message,
E: crate::Error, E: StdSyncSendError,
{ {
fn send(&self, mid: u64, m: M) -> Result<(), SendError<M>> { fn send(&self, mid: u64, m: M) -> Result<(), SendError<M>> {
match self.tx.send(Request::Request(mid, m)) { match self.tx.send(Request::Request(mid, m)) {
@ -134,7 +132,7 @@ impl<M, R, E> ReciveTypedReceiver<R, E> for BufferUnorderedAsync<M, R, E>
where where
M: Message, M: Message,
R: Message, R: Message,
E: crate::Error, E: StdSyncSendError,
{ {
fn poll_events(&self, ctx: &mut Context<'_>) -> Poll<Event<R, E>> { fn poll_events(&self, ctx: &mut Context<'_>) -> Poll<Event<R, E>> {
let poll = self.srx.lock().poll_recv(ctx); let poll = self.srx.lock().poll_recv(ctx);

View File

@ -44,7 +44,7 @@ macro_rules! buffer_unordered_poller_macro {
$t: $h<M, Response = R, Error = E> + 'static, $t: $h<M, Response = R, Error = E> + 'static,
M: Message, M: Message,
R: Message, R: Message,
E: crate::Error, E: StdSyncSendError,
{ {
let ut = ut.downcast::<$t>().unwrap(); let ut = ut.downcast::<$t>().unwrap();
let mut queue = FuturesUnordered::new(); let mut queue = FuturesUnordered::new();
@ -94,8 +94,9 @@ macro_rules! buffer_unordered_poller_macro {
loop { loop {
match queue.poll_next_unpin(cx) { match queue.poll_next_unpin(cx) {
Poll::Pending => return Poll::Pending, Poll::Pending => return Poll::Pending,
Poll::Ready(Some((mid, res))) => { Poll::Ready(Some((mid, resp))) => {
stx.send(Event::Response(mid, res)).ok(); let resp: Result<_, $t::Error> = resp;
stx.send(Event::Response(mid, resp.map_err(Error::Other))).ok();
} }
Poll::Ready(None) => break, Poll::Ready(None) => break,
} }
@ -111,8 +112,9 @@ macro_rules! buffer_unordered_poller_macro {
if let Some(fut) = sync_future.as_mut() { if let Some(fut) = sync_future.as_mut() {
match unsafe { fix_type(fut) }.poll(cx) { match unsafe { fix_type(fut) }.poll(cx) {
Poll::Pending => return Poll::Pending, Poll::Pending => return Poll::Pending,
Poll::Ready(res) => { Poll::Ready(resp) => {
stx.send(Event::Synchronized(res)).ok(); let resp: Result<_, E> = resp;
stx.send(Event::Synchronized(resp.map_err(Error::Other))).ok();
} }
} }
need_sync = false; need_sync = false;

View File

@ -8,19 +8,16 @@ use std::{
}; };
use crate::{ use crate::{
buffer_unordered_poller_macro, buffer_unordered_poller_macro,
receiver::{Action, Event, ReciveTypedReceiver, SendUntypedReceiver},
receivers::{fix_type, Request},
};
use anyhow::Result;
use futures::{stream::FuturesUnordered, Future, StreamExt};
use super::{BufferUnorderedConfig, BufferUnorderedStats};
use crate::{
builder::ReceiverSubscriberBuilder, builder::ReceiverSubscriberBuilder,
receiver::{SendError, SendTypedReceiver}, error::{Error, StdSyncSendError, SendError},
receiver::{Action, Event, ReciveTypedReceiver, SendUntypedReceiver, SendTypedReceiver},
receivers::{fix_type, Request},
Bus, Handler, Message, Untyped, Bus, Handler, Message, Untyped,
}; };
use super::{BufferUnorderedConfig, BufferUnorderedStats};
use futures::{stream::FuturesUnordered, Future, StreamExt};
use parking_lot::Mutex; use parking_lot::Mutex;
use tokio::sync::mpsc; use tokio::sync::mpsc;
@ -39,11 +36,11 @@ buffer_unordered_poller_macro!(
} }
); );
pub struct BufferUnorderedSync<M, R = (), E = crate::error::Error> pub struct BufferUnorderedSync<M, R, E>
where where
M: Message, M: Message,
R: Message, R: Message,
E: crate::Error, E: StdSyncSendError,
{ {
tx: mpsc::UnboundedSender<Request<M>>, tx: mpsc::UnboundedSender<Request<M>>,
stats: Arc<BufferUnorderedStats>, stats: Arc<BufferUnorderedStats>,
@ -55,7 +52,7 @@ where
T: Handler<M, Response = R, Error = E> + 'static, T: Handler<M, Response = R, Error = E> + 'static,
R: Message, R: Message,
M: Message, M: Message,
E: crate::Error, E: StdSyncSendError,
{ {
type Config = BufferUnorderedConfig; type Config = BufferUnorderedConfig;
@ -106,7 +103,7 @@ impl<M, R, E> SendUntypedReceiver for BufferUnorderedSync<M, R, E>
where where
M: Message, M: Message,
R: Message, R: Message,
E: crate::Error, E: StdSyncSendError,
{ {
fn send(&self, msg: Action) -> Result<(), SendError<Action>> { fn send(&self, msg: Action) -> Result<(), SendError<Action>> {
match self.tx.send(Request::Action(msg)) { match self.tx.send(Request::Action(msg)) {
@ -121,7 +118,7 @@ impl<M, R, E> SendTypedReceiver<M> for BufferUnorderedSync<M, R, E>
where where
M: Message, M: Message,
R: Message, R: Message,
E: crate::Error, E: StdSyncSendError,
{ {
fn send(&self, mid: u64, m: M) -> Result<(), SendError<M>> { fn send(&self, mid: u64, m: M) -> Result<(), SendError<M>> {
match self.tx.send(Request::Request(mid, m)) { match self.tx.send(Request::Request(mid, m)) {
@ -140,7 +137,7 @@ impl<M, R, E> ReciveTypedReceiver<R, E> for BufferUnorderedSync<M, R, E>
where where
M: Message, M: Message,
R: Message, R: Message,
E: crate::Error, E: StdSyncSendError,
{ {
fn poll_events(&self, ctx: &mut Context<'_>) -> Poll<Event<R, E>> { fn poll_events(&self, ctx: &mut Context<'_>) -> Poll<Event<R, E>> {
let poll = self.srx.lock().poll_recv(ctx); let poll = self.srx.lock().poll_recv(ctx);

View File

@ -7,19 +7,10 @@ use std::{
task::{Context, Poll}, task::{Context, Poll},
}; };
use crate::{ use crate::{AsyncBatchHandler, Bus, Message, Untyped, buffer_unordered_batch_poller_macro, builder::ReceiverSubscriberBuilder, error::{Error, SendError, StdSyncSendError}, receiver::{Action, Event, ReciveTypedReceiver, SendUntypedReceiver, SendTypedReceiver}, receivers::{fix_type, Request}};
buffer_unordered_batch_poller_macro,
receiver::{Action, Event, ReciveTypedReceiver, SendUntypedReceiver},
receivers::{fix_type, Request},
};
use futures::{stream::FuturesUnordered, Future, StreamExt};
use super::{BufferUnorderedBatchedConfig, BufferUnorderedBatchedStats}; use super::{BufferUnorderedBatchedConfig, BufferUnorderedBatchedStats};
use crate::{ use futures::{stream::FuturesUnordered, Future, StreamExt};
builder::ReceiverSubscriberBuilder,
receiver::{SendError, SendTypedReceiver},
AsyncBatchHandler, Bus, Message, Untyped,
};
use parking_lot::Mutex; use parking_lot::Mutex;
use tokio::sync::mpsc; use tokio::sync::mpsc;
@ -37,23 +28,23 @@ buffer_unordered_batch_poller_macro!(
|bus, ut: Arc<T>| { async move { ut.sync(&bus).await } } |bus, ut: Arc<T>| { async move { ut.sync(&bus).await } }
); );
pub struct BufferUnorderedBatchedAsync<M, R = (), E = crate::error::Error> pub struct BufferUnorderedBatchedAsync<M, R, E>
where where
M: Message, M: Message,
R: Message, R: Message,
E: crate::Error, E: StdSyncSendError,
{ {
tx: mpsc::UnboundedSender<Request<M>>, tx: mpsc::UnboundedSender<Request<M>>,
stats: Arc<BufferUnorderedBatchedStats>, stats: Arc<BufferUnorderedBatchedStats>,
srx: Mutex<mpsc::UnboundedReceiver<Event<R, E>>>, srx: Mutex<mpsc::UnboundedReceiver<Event<R, E>>>,
} }
impl<T, M, R, E> ReceiverSubscriberBuilder<T, M, R, E> for BufferUnorderedBatchedAsync<M, R, E> impl<T, M, R> ReceiverSubscriberBuilder<T, M, R, T::Error> for BufferUnorderedBatchedAsync<M, R, T::Error>
where where
T: AsyncBatchHandler<M, Response = R, Error = E> + 'static, T: AsyncBatchHandler<M, Response = R> + 'static,
T::Error: StdSyncSendError + Clone,
R: Message, R: Message,
M: Message, M: Message,
E: crate::Error,
{ {
type Config = BufferUnorderedBatchedConfig; type Config = BufferUnorderedBatchedConfig;
@ -80,7 +71,7 @@ where
let poller = Box::new(move |ut| { let poller = Box::new(move |ut| {
Box::new(move |bus| { Box::new(move |bus| {
Box::pin(buffer_unordered_batch_poller::<T, M, R, E>( Box::pin(buffer_unordered_batch_poller::<T, M, R>(
rx, rx,
bus, bus,
ut, ut,
@ -92,7 +83,7 @@ where
}); });
( (
BufferUnorderedBatchedAsync::<M, R, E> { BufferUnorderedBatchedAsync::<M, R, T::Error> {
tx, tx,
stats, stats,
srx: Mutex::new(srx), srx: Mutex::new(srx),
@ -106,7 +97,7 @@ impl<M, R, E> SendUntypedReceiver for BufferUnorderedBatchedAsync<M, R, E>
where where
M: Message, M: Message,
R: Message, R: Message,
E: crate::Error, E: StdSyncSendError,
{ {
fn send(&self, m: Action) -> Result<(), SendError<Action>> { fn send(&self, m: Action) -> Result<(), SendError<Action>> {
match self.tx.send(Request::Action(m)) { match self.tx.send(Request::Action(m)) {
@ -121,7 +112,7 @@ impl<M, R, E> SendTypedReceiver<M> for BufferUnorderedBatchedAsync<M, R, E>
where where
M: Message, M: Message,
R: Message, R: Message,
E: crate::Error, E: StdSyncSendError,
{ {
fn send(&self, mid: u64, m: M) -> Result<(), SendError<M>> { fn send(&self, mid: u64, m: M) -> Result<(), SendError<M>> {
match self.tx.send(Request::Request(mid, m)) { match self.tx.send(Request::Request(mid, m)) {
@ -140,7 +131,7 @@ impl<M, R, E> ReciveTypedReceiver<R, E> for BufferUnorderedBatchedAsync<M, R, E>
where where
M: Message, M: Message,
R: Message, R: Message,
E: crate::Error, E: StdSyncSendError,
{ {
fn poll_events(&self, ctx: &mut Context<'_>) -> Poll<Event<R, E>> { fn poll_events(&self, ctx: &mut Context<'_>) -> Poll<Event<R, E>> {
let poll = self.srx.lock().poll_recv(ctx); let poll = self.srx.lock().poll_recv(ctx);

View File

@ -38,19 +38,18 @@ impl Default for BufferUnorderedBatchedConfig {
#[macro_export] #[macro_export]
macro_rules! buffer_unordered_batch_poller_macro { macro_rules! buffer_unordered_batch_poller_macro {
($t: tt, $h: tt, $st1: expr, $st2: expr) => { ($t: tt, $h: tt, $st1: expr, $st2: expr) => {
fn buffer_unordered_batch_poller<$t, M, R, E>( fn buffer_unordered_batch_poller<$t, M, R>(
mut rx: mpsc::UnboundedReceiver<Request<M>>, mut rx: mpsc::UnboundedReceiver<Request<M>>,
bus: Bus, bus: Bus,
ut: Untyped, ut: Untyped,
stats: Arc<BufferUnorderedBatchedStats>, stats: Arc<BufferUnorderedBatchedStats>,
cfg: BufferUnorderedBatchedConfig, cfg: BufferUnorderedBatchedConfig,
stx: mpsc::UnboundedSender<Event<R, E>>, stx: mpsc::UnboundedSender<Event<R, $t::Error>>,
) -> impl Future<Output = ()> ) -> impl Future<Output = ()>
where where
$t: $h<M, Response = R, Error = E> + 'static, $t: $h<M, Response = R> + 'static,
M: Message, M: Message,
R: Message, R: Message,
E: crate::Error,
{ {
let ut = ut.downcast::<$t>().unwrap(); let ut = ut.downcast::<$t>().unwrap();
let mut buffer_mid = Vec::with_capacity(cfg.batch_size); let mut buffer_mid = Vec::with_capacity(cfg.batch_size);
@ -138,8 +137,7 @@ macro_rules! buffer_unordered_batch_poller_macro {
} else { } else {
stx.send(Event::Response( stx.send(Event::Response(
mid, mid,
Err(anyhow::anyhow!("no response from batch!") Err(Error::NoResponse),
.into()),
)) ))
.ok(); .ok();
} }
@ -147,7 +145,7 @@ macro_rules! buffer_unordered_batch_poller_macro {
} }
Err(er) => { Err(er) => {
for mid in mids { for mid in mids {
stx.send(Event::Response(mid, Err(er.clone()))).ok(); stx.send(Event::Response(mid, Err(Error::Other(er.clone())))).ok();
} }
} }
}, },
@ -165,8 +163,9 @@ macro_rules! buffer_unordered_batch_poller_macro {
if let Some(fut) = sync_future.as_mut() { if let Some(fut) = sync_future.as_mut() {
match unsafe { fix_type(fut) }.poll(cx) { match unsafe { fix_type(fut) }.poll(cx) {
Poll::Pending => return Poll::Pending, Poll::Pending => return Poll::Pending,
Poll::Ready(res) => { Poll::Ready(resp) => {
stx.send(Event::Synchronized(res)).ok(); let resp: Result<_, $t::Error> = resp;
stx.send(Event::Synchronized(resp.map_err(Error::Other))).ok();
} }
} }
need_sync = false; need_sync = false;

View File

@ -7,21 +7,10 @@ use std::{
task::{Context, Poll}, task::{Context, Poll},
}; };
use crate::{ use crate::{BatchHandler, Bus, Message, Untyped, buffer_unordered_batch_poller_macro, builder::ReceiverSubscriberBuilder, error::{Error, SendError, StdSyncSendError}, receiver::{Action, Event, ReciveTypedReceiver, SendUntypedReceiver, SendTypedReceiver}, receivers::{fix_type, Request}};
buffer_unordered_batch_poller_macro,
receiver::{Action, Event, ReciveTypedReceiver, SendUntypedReceiver},
receivers::{fix_type, Request},
BatchHandler,
};
use anyhow::Result;
use futures::{stream::FuturesUnordered, Future, StreamExt};
use super::{BufferUnorderedBatchedConfig, BufferUnorderedBatchedStats}; use super::{BufferUnorderedBatchedConfig, BufferUnorderedBatchedStats};
use crate::{
builder::ReceiverSubscriberBuilder, use futures::{stream::FuturesUnordered, Future, StreamExt};
receiver::{SendError, SendTypedReceiver},
Bus, Message, Untyped,
};
use parking_lot::Mutex; use parking_lot::Mutex;
use tokio::sync::mpsc; use tokio::sync::mpsc;
@ -48,23 +37,23 @@ buffer_unordered_batch_poller_macro!(
} }
); );
pub struct BufferUnorderedBatchedSync<M, R = (), E = crate::error::Error> pub struct BufferUnorderedBatchedSync<M, R, E>
where where
M: Message, M: Message,
R: Message, R: Message,
E: crate::Error, E: StdSyncSendError,
{ {
tx: mpsc::UnboundedSender<Request<M>>, tx: mpsc::UnboundedSender<Request<M>>,
stats: Arc<BufferUnorderedBatchedStats>, stats: Arc<BufferUnorderedBatchedStats>,
srx: Mutex<mpsc::UnboundedReceiver<Event<R, E>>>, srx: Mutex<mpsc::UnboundedReceiver<Event<R, E>>>,
} }
impl<T, M, R, E> ReceiverSubscriberBuilder<T, M, R, E> for BufferUnorderedBatchedSync<M, R, E> impl<T, M, R> ReceiverSubscriberBuilder<T, M, R, T::Error> for BufferUnorderedBatchedSync<M, R, T::Error>
where where
T: BatchHandler<M, Response = R, Error = E> + 'static, T: BatchHandler<M, Response = R> + 'static,
T::Error: StdSyncSendError,
R: Message, R: Message,
M: Message, M: Message,
E: crate::Error,
{ {
type Config = BufferUnorderedBatchedConfig; type Config = BufferUnorderedBatchedConfig;
@ -91,7 +80,7 @@ where
let poller = Box::new(move |ut| { let poller = Box::new(move |ut| {
Box::new(move |bus| { Box::new(move |bus| {
Box::pin(buffer_unordered_batch_poller::<T, M, R, E>( Box::pin(buffer_unordered_batch_poller::<T, M, R>(
rx, rx,
bus, bus,
ut, ut,
@ -103,7 +92,7 @@ where
}); });
( (
BufferUnorderedBatchedSync::<M, R, E> { BufferUnorderedBatchedSync::<M, R, T::Error> {
tx, tx,
stats, stats,
srx: Mutex::new(srx), srx: Mutex::new(srx),
@ -117,7 +106,7 @@ impl<M, R, E> SendUntypedReceiver for BufferUnorderedBatchedSync<M, R, E>
where where
M: Message, M: Message,
R: Message, R: Message,
E: crate::Error, E: StdSyncSendError,
{ {
fn send(&self, msg: Action) -> Result<(), SendError<Action>> { fn send(&self, msg: Action) -> Result<(), SendError<Action>> {
match self.tx.send(Request::Action(msg)) { match self.tx.send(Request::Action(msg)) {
@ -132,7 +121,7 @@ impl<M, R, E> SendTypedReceiver<M> for BufferUnorderedBatchedSync<M, R, E>
where where
M: Message, M: Message,
R: Message, R: Message,
E: crate::Error, E: StdSyncSendError,
{ {
fn send(&self, mid: u64, m: M) -> Result<(), SendError<M>> { fn send(&self, mid: u64, m: M) -> Result<(), SendError<M>> {
match self.tx.send(Request::Request(mid, m)) { match self.tx.send(Request::Request(mid, m)) {
@ -151,7 +140,7 @@ impl<M, R, E> ReciveTypedReceiver<R, E> for BufferUnorderedBatchedSync<M, R, E>
where where
M: Message, M: Message,
R: Message, R: Message,
E: crate::Error, E: StdSyncSendError,
{ {
fn poll_events(&self, ctx: &mut Context<'_>) -> Poll<Event<R, E>> { fn poll_events(&self, ctx: &mut Context<'_>) -> Poll<Event<R, E>> {
let poll = self.srx.lock().poll_recv(ctx); let poll = self.srx.lock().poll_recv(ctx);

View File

@ -6,18 +6,17 @@ use std::{
use crate::{ use crate::{
batch_synchronized_poller_macro, batch_synchronized_poller_macro,
receiver::{Action, Event, ReciveTypedReceiver, SendUntypedReceiver}, error::{Error, SendError, StdSyncSendError},
builder::ReceiverSubscriberBuilder,
receiver::{Action, Event, ReciveTypedReceiver, SendUntypedReceiver, SendTypedReceiver},
receivers::{fix_type, Request}, receivers::{fix_type, Request},
AsyncBatchSynchronizedHandler, Bus, Message, Untyped,
}; };
use anyhow::Result;
use futures::Future; use futures::Future;
use super::SynchronizedBatchedConfig; use super::SynchronizedBatchedConfig;
use crate::{
builder::ReceiverSubscriberBuilder,
receiver::{SendError, SendTypedReceiver},
AsyncBatchSynchronizedHandler, Bus, Message, Untyped,
};
use tokio::sync::{mpsc, Mutex}; use tokio::sync::{mpsc, Mutex};
batch_synchronized_poller_macro! { batch_synchronized_poller_macro! {
@ -31,22 +30,22 @@ batch_synchronized_poller_macro! {
|bus, ut: Arc<Mutex<T>>| { async move { ut.lock().await.sync(&bus).await } } |bus, ut: Arc<Mutex<T>>| { async move { ut.lock().await.sync(&bus).await } }
} }
pub struct SynchronizedBatchedAsync<M, R = (), E = crate::error::Error> pub struct SynchronizedBatchedAsync<M, R, E>
where where
M: Message, M: Message,
R: Message, R: Message,
E: crate::Error, E: StdSyncSendError,
{ {
tx: mpsc::UnboundedSender<Request<M>>, tx: mpsc::UnboundedSender<Request<M>>,
srx: parking_lot::Mutex<mpsc::UnboundedReceiver<Event<R, E>>>, srx: parking_lot::Mutex<mpsc::UnboundedReceiver<Event<R, E>>>,
} }
impl<T, M, R, E> ReceiverSubscriberBuilder<T, M, R, E> for SynchronizedBatchedAsync<M, R, E> impl<T, M, R> ReceiverSubscriberBuilder<T, M, R, T::Error> for SynchronizedBatchedAsync<M, R, T::Error>
where where
T: AsyncBatchSynchronizedHandler<M, Response = R, Error = E> + 'static, T: AsyncBatchSynchronizedHandler<M, Response = R> + 'static,
T::Error: StdSyncSendError + Clone,
R: Message, R: Message,
M: Message, M: Message,
E: crate::Error,
{ {
type Config = SynchronizedBatchedConfig; type Config = SynchronizedBatchedConfig;
@ -63,14 +62,14 @@ where
let poller = Box::new(move |ut| { let poller = Box::new(move |ut| {
Box::new(move |bus| { Box::new(move |bus| {
Box::pin(batch_synchronized_poller::<T, M, R, E>( Box::pin(batch_synchronized_poller::<T, M, R>(
rx, bus, ut, cfg, stx, rx, bus, ut, cfg, stx,
)) as Pin<Box<dyn Future<Output = ()> + Send>> )) as Pin<Box<dyn Future<Output = ()> + Send>>
}) as Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>> }) as Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>
}); });
( (
SynchronizedBatchedAsync::<M, R, E> { SynchronizedBatchedAsync::<M, R, T::Error> {
tx, tx,
srx: parking_lot::Mutex::new(srx), srx: parking_lot::Mutex::new(srx),
}, },
@ -83,7 +82,7 @@ impl<M, R, E> SendUntypedReceiver for SynchronizedBatchedAsync<M, R, E>
where where
M: Message, M: Message,
R: Message, R: Message,
E: crate::Error, E: StdSyncSendError,
{ {
fn send(&self, m: Action) -> Result<(), SendError<Action>> { fn send(&self, m: Action) -> Result<(), SendError<Action>> {
match self.tx.send(Request::Action(m)) { match self.tx.send(Request::Action(m)) {
@ -98,7 +97,7 @@ impl<M, R, E> SendTypedReceiver<M> for SynchronizedBatchedAsync<M, R, E>
where where
M: Message, M: Message,
R: Message, R: Message,
E: crate::Error, E: StdSyncSendError,
{ {
fn send(&self, mid: u64, m: M) -> Result<(), SendError<M>> { fn send(&self, mid: u64, m: M) -> Result<(), SendError<M>> {
match self.tx.send(Request::Request(mid, m)) { match self.tx.send(Request::Request(mid, m)) {
@ -113,7 +112,7 @@ impl<M, R, E> ReciveTypedReceiver<R, E> for SynchronizedBatchedAsync<M, R, E>
where where
M: Message, M: Message,
R: Message, R: Message,
E: crate::Error, E: StdSyncSendError,
{ {
fn poll_events(&self, ctx: &mut Context<'_>) -> Poll<Event<R, E>> { fn poll_events(&self, ctx: &mut Context<'_>) -> Poll<Event<R, E>> {
let poll = self.srx.lock().poll_recv(ctx); let poll = self.srx.lock().poll_recv(ctx);

View File

@ -34,19 +34,19 @@ impl Default for SynchronizedBatchedConfig {
#[macro_export] #[macro_export]
macro_rules! batch_synchronized_poller_macro { macro_rules! batch_synchronized_poller_macro {
($t: tt, $h: tt, $st1: expr, $st2: expr) => { ($t: tt, $h: tt, $st1: expr, $st2: expr) => {
fn batch_synchronized_poller<$t, M, R, E>( fn batch_synchronized_poller<$t, M, R>(
mut rx: mpsc::UnboundedReceiver<Request<M>>, mut rx: mpsc::UnboundedReceiver<Request<M>>,
bus: Bus, bus: Bus,
ut: Untyped, ut: Untyped,
// stats: Arc<SynchronizedBatchedStats>, // stats: Arc<SynchronizedBatchedStats>,
cfg: SynchronizedBatchedConfig, cfg: SynchronizedBatchedConfig,
stx: mpsc::UnboundedSender<Event<R, E>>, stx: mpsc::UnboundedSender<Event<R, $t::Error>>,
) -> impl Future<Output = ()> ) -> impl Future<Output = ()>
where where
$t: $h<M, Response = R, Error = E> + 'static, $t: $h<M, Response = R> + 'static,
$t::Error: StdSyncSendError + Clone,
M: Message, M: Message,
R: Message, R: Message,
E: crate::Error,
{ {
let ut = ut.downcast::<Mutex<T>>().unwrap(); let ut = ut.downcast::<Mutex<T>>().unwrap();
let mut buffer_mid = Vec::with_capacity(cfg.batch_size); let mut buffer_mid = Vec::with_capacity(cfg.batch_size);
@ -76,8 +76,7 @@ macro_rules! batch_synchronized_poller_macro {
} else { } else {
stx.send(Event::Response( stx.send(Event::Response(
mid, mid,
Err(anyhow::anyhow!("no response from batch!") Err(Error::NoResponse),
.into()),
)) ))
.ok(); .ok();
} }
@ -85,9 +84,9 @@ macro_rules! batch_synchronized_poller_macro {
} }
Err(er) => { Err(er) => {
let er: E = er; let er: $t::Error = er;
for mid in mids { for mid in mids {
stx.send(Event::Response(mid, Err(er.clone()))).ok(); stx.send(Event::Response(mid, Err(Error::Other(er.clone())))).ok();
} }
} }
} }
@ -157,9 +156,10 @@ macro_rules! batch_synchronized_poller_macro {
// SAFETY: safe bacause pinnet to async generator `stack` which should be pinned // SAFETY: safe bacause pinnet to async generator `stack` which should be pinned
match unsafe { fix_type(fut) }.poll(cx) { match unsafe { fix_type(fut) }.poll(cx) {
Poll::Pending => return Poll::Pending, Poll::Pending => return Poll::Pending,
Poll::Ready(res) => { Poll::Ready(resp) => {
need_sync = false; need_sync = false;
stx.send(Event::Synchronized(res)).ok(); let resp: Result<_, $t::Error> = resp;
stx.send(Event::Synchronized(resp.map_err(Error::Other))).ok();
} }
} }
sync_future = None; sync_future = None;

View File

@ -5,19 +5,16 @@ use std::{
}; };
use crate::{ use crate::{
batch_synchronized_poller_macro, batch_synchronized_poller_macro,
receiver::{Action, Event, ReciveTypedReceiver, SendUntypedReceiver},
receivers::{fix_type, Request},
};
use anyhow::Result;
use futures::{executor::block_on, Future};
use super::SynchronizedBatchedConfig;
use crate::{
builder::ReceiverSubscriberBuilder, builder::ReceiverSubscriberBuilder,
receiver::{SendError, SendTypedReceiver}, error::{Error, StdSyncSendError, SendError},
receiver::{Action, Event, ReciveTypedReceiver, SendUntypedReceiver, SendTypedReceiver},
receivers::{fix_type, Request},
BatchSynchronizedHandler, Bus, Message, Untyped, BatchSynchronizedHandler, Bus, Message, Untyped,
}; };
use super::SynchronizedBatchedConfig;
use futures::{executor::block_on, Future};
use tokio::sync::{mpsc, Mutex}; use tokio::sync::{mpsc, Mutex};
batch_synchronized_poller_macro! { batch_synchronized_poller_macro! {
@ -37,22 +34,22 @@ batch_synchronized_poller_macro! {
} }
} }
pub struct SynchronizedBatchedSync<M, R = (), E = crate::error::Error> pub struct SynchronizedBatchedSync<M, R, E>
where where
M: Message, M: Message,
R: Message, R: Message,
E: crate::Error, E: StdSyncSendError,
{ {
tx: mpsc::UnboundedSender<Request<M>>, tx: mpsc::UnboundedSender<Request<M>>,
srx: parking_lot::Mutex<mpsc::UnboundedReceiver<Event<R, E>>>, srx: parking_lot::Mutex<mpsc::UnboundedReceiver<Event<R, E>>>,
} }
impl<T, M, R, E> ReceiverSubscriberBuilder<T, M, R, E> for SynchronizedBatchedSync<M, R, E> impl<T, M, R> ReceiverSubscriberBuilder<T, M, R, T::Error> for SynchronizedBatchedSync<M, R, T::Error>
where where
T: BatchSynchronizedHandler<M, Response = R, Error = E> + 'static, T: BatchSynchronizedHandler<M, Response = R> + 'static,
T::Error: StdSyncSendError,
R: Message, R: Message,
M: Message, M: Message,
E: crate::Error,
{ {
type Config = SynchronizedBatchedConfig; type Config = SynchronizedBatchedConfig;
@ -69,14 +66,14 @@ where
let poller = Box::new(move |ut| { let poller = Box::new(move |ut| {
Box::new(move |bus| { Box::new(move |bus| {
Box::pin(batch_synchronized_poller::<T, M, R, E>( Box::pin(batch_synchronized_poller::<T, M, R>(
rx, bus, ut, cfg, stx, rx, bus, ut, cfg, stx,
)) as Pin<Box<dyn Future<Output = ()> + Send>> )) as Pin<Box<dyn Future<Output = ()> + Send>>
}) as Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>> }) as Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>
}); });
( (
SynchronizedBatchedSync::<M, R, E> { SynchronizedBatchedSync::<M, R, T::Error> {
tx, tx,
srx: parking_lot::Mutex::new(srx), srx: parking_lot::Mutex::new(srx),
}, },
@ -89,7 +86,7 @@ impl<M, R, E> SendUntypedReceiver for SynchronizedBatchedSync<M, R, E>
where where
M: Message, M: Message,
R: Message, R: Message,
E: crate::Error, E: StdSyncSendError,
{ {
fn send(&self, msg: Action) -> Result<(), SendError<Action>> { fn send(&self, msg: Action) -> Result<(), SendError<Action>> {
match self.tx.send(Request::Action(msg)) { match self.tx.send(Request::Action(msg)) {
@ -104,7 +101,7 @@ impl<M, R, E> SendTypedReceiver<M> for SynchronizedBatchedSync<M, R, E>
where where
M: Message, M: Message,
R: Message, R: Message,
E: crate::Error, E: StdSyncSendError,
{ {
fn send(&self, mid: u64, m: M) -> Result<(), SendError<M>> { fn send(&self, mid: u64, m: M) -> Result<(), SendError<M>> {
match self.tx.send(Request::Request(mid, m)) { match self.tx.send(Request::Request(mid, m)) {
@ -119,7 +116,7 @@ impl<M, R, E> ReciveTypedReceiver<R, E> for SynchronizedBatchedSync<M, R, E>
where where
M: Message, M: Message,
R: Message, R: Message,
E: crate::Error, E: StdSyncSendError,
{ {
fn poll_events(&self, ctx: &mut Context<'_>) -> Poll<Event<R, E>> { fn poll_events(&self, ctx: &mut Context<'_>) -> Poll<Event<R, E>> {
let poll = self.srx.lock().poll_recv(ctx); let poll = self.srx.lock().poll_recv(ctx);

View File

@ -4,18 +4,15 @@ use std::{
task::{Context, Poll}, task::{Context, Poll},
}; };
use crate::{ use crate::synchronized_poller_macro;
receiver::{Action, Event, ReciveTypedReceiver, SendUntypedReceiver},
receivers::{fix_type, Request},
synchronized_poller_macro,
};
use anyhow::Result;
use futures::Future; use futures::Future;
use super::SynchronizedConfig; use super::SynchronizedConfig;
use crate::{ use crate::{
error::{Error, StdSyncSendError, SendError},
builder::ReceiverSubscriberBuilder, builder::ReceiverSubscriberBuilder,
receiver::{SendError, SendTypedReceiver}, receiver::{SendTypedReceiver, Action, Event, ReciveTypedReceiver, SendUntypedReceiver},
receivers::{fix_type, Request},
AsyncSynchronizedHandler, Bus, Message, Untyped, AsyncSynchronizedHandler, Bus, Message, Untyped,
}; };
use tokio::sync::{mpsc, Mutex}; use tokio::sync::{mpsc, Mutex};
@ -31,11 +28,11 @@ synchronized_poller_macro! {
} }
} }
pub struct SynchronizedAsync<M, R = (), E = crate::error::Error> pub struct SynchronizedAsync<M, R, E>
where where
M: Message, M: Message,
R: Message, R: Message,
E: crate::Error, E: StdSyncSendError,
{ {
tx: mpsc::UnboundedSender<Request<M>>, tx: mpsc::UnboundedSender<Request<M>>,
srx: parking_lot::Mutex<mpsc::UnboundedReceiver<Event<R, E>>>, srx: parking_lot::Mutex<mpsc::UnboundedReceiver<Event<R, E>>>,
@ -46,7 +43,7 @@ where
T: AsyncSynchronizedHandler<M, Response = R, Error = E> + 'static, T: AsyncSynchronizedHandler<M, Response = R, Error = E> + 'static,
R: Message, R: Message,
M: Message, M: Message,
E: crate::Error, E: StdSyncSendError,
{ {
type Config = SynchronizedConfig; type Config = SynchronizedConfig;
@ -63,7 +60,7 @@ where
let poller = Box::new(move |ut| { let poller = Box::new(move |ut| {
Box::new(move |bus| { Box::new(move |bus| {
Box::pin(synchronized_poller::<T, M, R, E>(rx, bus, ut, stx)) Box::pin(synchronized_poller::<T, M, R>(rx, bus, ut, stx))
as Pin<Box<dyn Future<Output = ()> + Send>> as Pin<Box<dyn Future<Output = ()> + Send>>
}) as Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>> }) as Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>
}); });
@ -82,7 +79,7 @@ impl<M, R, E> SendUntypedReceiver for SynchronizedAsync<M, R, E>
where where
M: Message, M: Message,
R: Message, R: Message,
E: crate::Error, E: StdSyncSendError,
{ {
fn send(&self, m: Action) -> Result<(), SendError<Action>> { fn send(&self, m: Action) -> Result<(), SendError<Action>> {
match self.tx.send(Request::Action(m)) { match self.tx.send(Request::Action(m)) {
@ -97,7 +94,7 @@ impl<M, R, E> SendTypedReceiver<M> for SynchronizedAsync<M, R, E>
where where
M: Message, M: Message,
R: Message, R: Message,
E: crate::Error, E: StdSyncSendError,
{ {
fn send(&self, mid: u64, m: M) -> Result<(), SendError<M>> { fn send(&self, mid: u64, m: M) -> Result<(), SendError<M>> {
match self.tx.send(Request::Request(mid, m)) { match self.tx.send(Request::Request(mid, m)) {
@ -112,7 +109,7 @@ impl<M, R, E> ReciveTypedReceiver<R, E> for SynchronizedAsync<M, R, E>
where where
M: Message, M: Message,
R: Message, R: Message,
E: crate::Error, E: StdSyncSendError,
{ {
fn poll_events(&self, ctx: &mut Context<'_>) -> Poll<Event<R, E>> { fn poll_events(&self, ctx: &mut Context<'_>) -> Poll<Event<R, E>> {
let poll = self.srx.lock().poll_recv(ctx); let poll = self.srx.lock().poll_recv(ctx);

View File

@ -26,17 +26,17 @@ impl Default for SynchronizedConfig {
#[macro_export] #[macro_export]
macro_rules! synchronized_poller_macro { macro_rules! synchronized_poller_macro {
($t: tt, $h: tt, $st1: expr, $st2: expr) => { ($t: tt, $h: tt, $st1: expr, $st2: expr) => {
fn synchronized_poller<$t, M, R, E>( fn synchronized_poller<$t, M, R>(
mut rx: mpsc::UnboundedReceiver<Request<M>>, mut rx: mpsc::UnboundedReceiver<Request<M>>,
bus: Bus, bus: Bus,
ut: Untyped, ut: Untyped,
stx: mpsc::UnboundedSender<Event<R, E>>, stx: mpsc::UnboundedSender<Event<R, $t::Error>>,
) -> impl Future<Output = ()> ) -> impl Future<Output = ()>
where where
$t: $h<M, Response = R, Error = E> + 'static, $t: $h<M, Response = R> + 'static,
$t::Error: StdSyncSendError,
M: Message, M: Message,
R: Message, R: Message,
E: crate::Error,
{ {
let ut = ut.downcast::<Mutex<T>>().unwrap(); let ut = ut.downcast::<Mutex<T>>().unwrap();
let mut handle_future = None; let mut handle_future = None;
@ -50,7 +50,8 @@ macro_rules! synchronized_poller_macro {
match unsafe { fix_type(fut) }.poll(cx) { match unsafe { fix_type(fut) }.poll(cx) {
Poll::Pending => return Poll::Pending, Poll::Pending => return Poll::Pending,
Poll::Ready((mid, resp)) => { Poll::Ready((mid, resp)) => {
stx.send(Event::Response(mid, resp)).ok(); let resp: Result<_, $t::Error> = resp;
stx.send(Event::Response(mid, resp.map_err(Error::Other))).ok();
} }
} }
} }
@ -87,9 +88,10 @@ macro_rules! synchronized_poller_macro {
// SAFETY: safe bacause pinnet to async generator `stack` which should be pinned // SAFETY: safe bacause pinnet to async generator `stack` which should be pinned
match unsafe { fix_type(fut) }.poll(cx) { match unsafe { fix_type(fut) }.poll(cx) {
Poll::Pending => return Poll::Pending, Poll::Pending => return Poll::Pending,
Poll::Ready(res) => { Poll::Ready(resp) => {
need_sync = false; need_sync = false;
stx.send(Event::Synchronized(res)).ok(); let resp: Result<_, $t::Error> = resp;
stx.send(Event::Synchronized(resp.map_err(Error::Other))).ok();
} }
} }
sync_future = None; sync_future = None;

View File

@ -4,19 +4,16 @@ use std::{
task::{Context, Poll}, task::{Context, Poll},
}; };
use crate::{ use crate::synchronized_poller_macro;
receiver::{Action, Event, ReciveTypedReceiver, SendUntypedReceiver}, use futures::{Future, executor::block_on};
receivers::{fix_type, Request},
synchronized_poller_macro,
};
use anyhow::Result;
use futures::{executor::block_on, Future};
use super::SynchronizedConfig; use super::SynchronizedConfig;
use crate::{ use crate::{
error::{Error, StdSyncSendError, SendError},
builder::ReceiverSubscriberBuilder, builder::ReceiverSubscriberBuilder,
receiver::{SendError, SendTypedReceiver}, receiver::{SendTypedReceiver, Action, Event, ReciveTypedReceiver, SendUntypedReceiver},
Bus, Message, SynchronizedHandler, Untyped, receivers::{fix_type, Request},
SynchronizedHandler, Bus, Message, Untyped,
}; };
use tokio::sync::{mpsc, Mutex}; use tokio::sync::{mpsc, Mutex};
@ -37,11 +34,11 @@ synchronized_poller_macro! {
} }
} }
pub struct SynchronizedSync<M, R = (), E = crate::error::Error> pub struct SynchronizedSync<M, R, E>
where where
M: Message, M: Message,
R: Message, R: Message,
E: crate::Error, E: StdSyncSendError,
{ {
tx: mpsc::UnboundedSender<Request<M>>, tx: mpsc::UnboundedSender<Request<M>>,
srx: parking_lot::Mutex<mpsc::UnboundedReceiver<Event<R, E>>>, srx: parking_lot::Mutex<mpsc::UnboundedReceiver<Event<R, E>>>,
@ -52,7 +49,7 @@ where
T: SynchronizedHandler<M, Response = R, Error = E> + 'static, T: SynchronizedHandler<M, Response = R, Error = E> + 'static,
R: Message, R: Message,
M: Message, M: Message,
E: crate::Error, E: StdSyncSendError,
{ {
type Config = SynchronizedConfig; type Config = SynchronizedConfig;
@ -69,7 +66,7 @@ where
let poller = Box::new(move |ut| { let poller = Box::new(move |ut| {
Box::new(move |bus| { Box::new(move |bus| {
Box::pin(synchronized_poller::<T, M, R, E>(rx, bus, ut, stx)) Box::pin(synchronized_poller::<T, M, R>(rx, bus, ut, stx))
as Pin<Box<dyn Future<Output = ()> + Send>> as Pin<Box<dyn Future<Output = ()> + Send>>
}) as Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>> }) as Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>
}); });
@ -88,7 +85,7 @@ impl<M, R, E> SendUntypedReceiver for SynchronizedSync<M, R, E>
where where
M: Message, M: Message,
R: Message, R: Message,
E: crate::Error, E: StdSyncSendError,
{ {
fn send(&self, msg: Action) -> Result<(), SendError<Action>> { fn send(&self, msg: Action) -> Result<(), SendError<Action>> {
match self.tx.send(Request::Action(msg)) { match self.tx.send(Request::Action(msg)) {
@ -103,7 +100,7 @@ impl<M, R, E> SendTypedReceiver<M> for SynchronizedSync<M, R, E>
where where
M: Message, M: Message,
R: Message, R: Message,
E: crate::Error, E: StdSyncSendError,
{ {
fn send(&self, mid: u64, m: M) -> Result<(), SendError<M>> { fn send(&self, mid: u64, m: M) -> Result<(), SendError<M>> {
match self.tx.send(Request::Request(mid, m)) { match self.tx.send(Request::Request(mid, m)) {
@ -118,7 +115,7 @@ impl<M, R, E> ReciveTypedReceiver<R, E> for SynchronizedSync<M, R, E>
where where
M: Message, M: Message,
R: Message, R: Message,
E: crate::Error, E: StdSyncSendError,
{ {
fn poll_events(&self, ctx: &mut Context<'_>) -> Poll<Event<R, E>> { fn poll_events(&self, ctx: &mut Context<'_>) -> Poll<Event<R, E>> {
let poll = self.srx.lock().poll_recv(ctx); let poll = self.srx.lock().poll_recv(ctx);