make clippy happy

This commit is contained in:
Andrey Tkachenko 2021-09-21 14:47:21 +04:00
parent 9d014d38a1
commit 22d1d4a569
21 changed files with 107 additions and 144 deletions

View File

@ -141,15 +141,11 @@ struct TypeTag {
impl Parse for TypeTag { impl Parse for TypeTag {
fn parse(input: ParseStream) -> Result<Self> { fn parse(input: ParseStream) -> Result<Self> {
let mut inner = None;
let content; let content;
parenthesized!(content in input); parenthesized!(content in input);
let punctuated = Punctuated::<syn::LitStr, Comma>::parse_terminated(&content)?; let punctuated = Punctuated::<syn::LitStr, Comma>::parse_terminated(&content)?;
for pair in punctuated.pairs() { let inner = punctuated.pairs().map(|x| x.into_value()).next();
inner = Some(pair.into_value());
break;
}
Ok(TypeTag { Ok(TypeTag {
inner: inner.unwrap().to_owned(), inner: inner.unwrap().to_owned(),

View File

@ -9,12 +9,21 @@ use tokio::sync::Mutex;
use crate::{ use crate::{
envelop::TypeTag, envelop::TypeTag,
error::{Error, StdSyncSendError}, error::{Error, StdSyncSendError},
receiver::{Receiver, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver}, receiver::{
BusPollerCallback, Receiver, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver,
UntypedPollerCallback,
},
receivers, AsyncBatchHandler, AsyncBatchSynchronizedHandler, AsyncHandler, receivers, AsyncBatchHandler, AsyncBatchSynchronizedHandler, AsyncHandler,
AsyncSynchronizedHandler, BatchHandler, BatchSynchronizedHandler, Bus, BusInner, Handler, AsyncSynchronizedHandler, BatchHandler, BatchSynchronizedHandler, Bus, BusInner, Handler,
IntoBoxedMessage, Message, Relay, SynchronizedHandler, Untyped, IntoBoxedMessage, Message, Relay, SynchronizedHandler, Untyped,
}; };
type MessageDeserializerCallback = Box<
dyn Fn(&mut dyn erased_serde::Deserializer<'_>) -> Result<Box<dyn Message>, Error>
+ Send
+ Sync,
>;
pub trait ReceiverSubscriberBuilder<T, M, R, E>: pub trait ReceiverSubscriberBuilder<T, M, R, E>:
SendUntypedReceiver + SendTypedReceiver<M> + ReciveTypedReceiver<R, E> SendUntypedReceiver + SendTypedReceiver<M> + ReciveTypedReceiver<R, E>
where where
@ -25,14 +34,7 @@ where
{ {
type Config: Default; type Config: Default;
fn build( fn build(cfg: Self::Config) -> (Self, UntypedPollerCallback)
cfg: Self::Config,
) -> (
Self,
Box<
dyn FnOnce(Untyped) -> Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>,
>,
)
where where
Self: Sized; Self: Sized;
} }
@ -47,14 +49,14 @@ pub struct RegisterEntry<K, T, F, P, B> {
builder: F, builder: F,
poller: P, poller: P,
receivers: HashMap<TypeTag, Receiver>, receivers: HashMap<TypeTag, Receiver>,
pollers: Vec<Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>>, pollers: Vec<BusPollerCallback>,
_m: PhantomData<(K, T)>, _m: PhantomData<(K, T)>,
} }
impl<K, T: 'static, F, P, B> RegisterEntry<K, T, F, P, B> impl<K, T: 'static, F, P, B> RegisterEntry<K, T, F, P, B>
where where
F: FnMut(&mut B, TypeTag, Receiver), F: FnMut(&mut B, TypeTag, Receiver),
P: FnMut(&mut B, Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>), P: FnMut(&mut B, BusPollerCallback),
{ {
pub fn done(mut self) -> B { pub fn done(mut self) -> B {
for (tid, v) in self.receivers { for (tid, v) in self.receivers {
@ -209,11 +211,7 @@ impl<T, F, P, B> RegisterEntry<SyncEntry, T, F, P, B> {
} }
pub struct MessageTypeDescriptor { pub struct MessageTypeDescriptor {
de: Box< de: MessageDeserializerCallback,
dyn Fn(&mut dyn erased_serde::Deserializer<'_>) -> Result<Box<dyn Message>, Error>
+ Send
+ Sync,
>,
} }
impl MessageTypeDescriptor { impl MessageTypeDescriptor {
@ -226,10 +224,11 @@ impl MessageTypeDescriptor {
} }
} }
#[derive(Default)]
pub struct Module { pub struct Module {
message_types: HashMap<TypeTag, MessageTypeDescriptor>, message_types: HashMap<TypeTag, MessageTypeDescriptor>,
receivers: HashMap<TypeTag, SmallVec<[Receiver; 4]>>, receivers: HashMap<TypeTag, SmallVec<[Receiver; 4]>>,
pollings: Vec<Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>>, pollings: Vec<BusPollerCallback>,
} }
impl Module { impl Module {

View File

@ -136,11 +136,11 @@ impl Message for () {
return false; return false;
}; };
into.replace(self.clone()); into.replace(());
true true
} }
fn try_clone_boxed(&self) -> Option<Box<dyn Message>> { fn try_clone_boxed(&self) -> Option<Box<dyn Message>> {
Some(Box::new(self.clone())) Some(Box::new(()))
} }
} }

View File

@ -95,7 +95,7 @@ impl Bus {
} }
pub(crate) fn init(&self) { pub(crate) fn init(&self) {
for (_, rs) in &self.inner.receivers { for rs in self.inner.receivers.values() {
for r in rs { for r in rs {
r.init(self).unwrap(); r.init(self).unwrap();
} }
@ -103,7 +103,7 @@ impl Bus {
} }
pub async fn ready(&self) { pub async fn ready(&self) {
for (_, rs) in &self.inner.receivers { for rs in self.inner.receivers.values() {
for r in rs { for r in rs {
r.ready().await; r.ready().await;
} }
@ -114,7 +114,7 @@ impl Bus {
let _handle = self.inner.maintain.lock().await; let _handle = self.inner.maintain.lock().await;
self.inner.closed.store(true, Ordering::SeqCst); self.inner.closed.store(true, Ordering::SeqCst);
for (_, rs) in &self.inner.receivers { for rs in self.inner.receivers.values() {
for r in rs { for r in rs {
let err = tokio::time::timeout(Duration::from_secs(20), r.close(self)).await; let err = tokio::time::timeout(Duration::from_secs(20), r.close(self)).await;
@ -133,7 +133,7 @@ impl Bus {
for _ in 0..fuse_count { for _ in 0..fuse_count {
iters += 1; iters += 1;
let mut flushed = false; let mut flushed = false;
for (_, rs) in &self.inner.receivers { for rs in self.inner.receivers.values() {
for r in rs { for r in rs {
if r.need_flush() { if r.need_flush() {
flushed = true; flushed = true;
@ -163,7 +163,7 @@ impl Bus {
self.flush().await; self.flush().await;
let _handle = self.inner.maintain.lock().await; let _handle = self.inner.maintain.lock().await;
for (_, rs) in &self.inner.receivers { for rs in self.inner.receivers.values() {
for r in rs { for r in rs {
r.sync(self).await; r.sync(self).await;
} }
@ -598,7 +598,7 @@ impl Bus {
.inner .inner
.message_types .message_types
.get(&tt) .get(&tt)
.ok_or_else(|| Error::TypeTagNotRegistered(tt))?; .ok_or(Error::TypeTagNotRegistered(tt))?;
md.deserialize_boxed(de) md.deserialize_boxed(de)
.map_err(|err| err.specify::<Box<dyn Message>>()) .map_err(|err| err.specify::<Box<dyn Message>>())

View File

@ -1,5 +1,6 @@
use crate::relay::RelayWrapper; use crate::relay::RelayWrapper;
use crate::stats::Stats; use crate::stats::Stats;
use crate::Untyped;
use crate::{ use crate::{
envelop::{IntoBoxedMessage, TypeTag}, envelop::{IntoBoxedMessage, TypeTag},
error::{GenericError, SendError, StdSyncSendError}, error::{GenericError, SendError, StdSyncSendError},
@ -19,6 +20,9 @@ use futures::{Future, FutureExt, StreamExt};
use std::{borrow::Cow, sync::Arc}; use std::{borrow::Cow, sync::Arc};
use tokio::sync::{oneshot, Notify}; use tokio::sync::{oneshot, Notify};
pub type BusPollerCallback = Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>;
pub type UntypedPollerCallback = Box<dyn FnOnce(Untyped) -> BusPollerCallback>;
struct SlabCfg; struct SlabCfg;
impl sharded_slab::Config for SlabCfg { impl sharded_slab::Config for SlabCfg {
const RESERVED_BITS: usize = 1; const RESERVED_BITS: usize = 1;
@ -74,9 +78,7 @@ pub trait WrapperErrorTypeOnly<E: StdSyncSendError>: Send + Sync {
} }
pub trait WrapperReturnTypeAndError<R: Message, E: StdSyncSendError>: Send + Sync { pub trait WrapperReturnTypeAndError<R: Message, E: StdSyncSendError>: Send + Sync {
fn start_polling_events( fn start_polling_events(self: Arc<Self>) -> BusPollerCallback;
self: Arc<Self>,
) -> Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>;
fn add_response_listener( fn add_response_listener(
&self, &self,
listener: oneshot::Sender<Result<R, Error<(), E>>>, listener: oneshot::Sender<Result<R, Error<(), E>>>,
@ -123,9 +125,7 @@ pub trait ReceiverTrait: TypeTagAccept + Send + Sync {
fn reserve_notify(&self, tt: &TypeTag) -> Arc<Notify>; fn reserve_notify(&self, tt: &TypeTag) -> Arc<Notify>;
fn increment_processing(&self, tt: &TypeTag); fn increment_processing(&self, tt: &TypeTag);
fn start_polling( fn start_polling(self: Arc<Self>) -> BusPollerCallback;
self: Arc<Self>,
) -> Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>;
} }
pub trait ReceiverPollerBuilder { pub trait ReceiverPollerBuilder {
@ -181,9 +181,7 @@ where
E: StdSyncSendError, E: StdSyncSendError,
S: SendUntypedReceiver + ReciveTypedReceiver<R, E> + Send + Sync + 'static, S: SendUntypedReceiver + ReciveTypedReceiver<R, E> + Send + Sync + 'static,
{ {
fn start_polling_events( fn start_polling_events(self: Arc<Self>) -> BusPollerCallback {
self: Arc<Self>,
) -> Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>> {
Box::new(move |_| { Box::new(move |_| {
Box::pin(async move { Box::pin(async move {
let this = self.clone(); let this = self.clone();
@ -253,7 +251,7 @@ where
Ok(self Ok(self
.waiters .waiters
.insert(Waiter::WithErrorType(listener)) .insert(Waiter::WithErrorType(listener))
.ok_or_else(|| Error::AddListenerError)? as _) .ok_or(Error::AddListenerError)? as _)
} }
fn response(&self, mid: u64, resp: Result<R, Error<(), E>>) -> Result<(), Error> { fn response(&self, mid: u64, resp: Result<R, Error<(), E>>) -> Result<(), Error> {
@ -291,7 +289,7 @@ where
Ok(self Ok(self
.waiters .waiters
.insert(Waiter::WithoutErrorType(listener)) .insert(Waiter::WithoutErrorType(listener))
.ok_or_else(|| Error::AddListenerError)? as _) .ok_or(Error::AddListenerError)? as _)
} }
} }
@ -309,7 +307,7 @@ where
Ok(self Ok(self
.waiters .waiters
.insert(Waiter::BoxedWithError(listener)) .insert(Waiter::BoxedWithError(listener))
.ok_or_else(|| Error::AddListenerError)? as _) .ok_or(Error::AddListenerError)? as _)
} }
} }
@ -372,8 +370,8 @@ where
.downcast::<M>() .downcast::<M>()
.map_err(|_| Error::MessageCastError)?; .map_err(|_| Error::MessageCastError)?;
Ok(SendTypedReceiver::send(&self.inner, mid, *boxed, req, bus) SendTypedReceiver::send(&self.inner, mid, *boxed, req, bus)
.map_err(|err| Error::from(err.into_boxed()))?) .map_err(|err| Error::from(err.into_boxed()))
} }
fn stats(&self) -> Stats { fn stats(&self) -> Stats {
@ -417,7 +415,7 @@ where
Ok(self Ok(self
.waiters .waiters
.insert(Waiter::Boxed(listener)) .insert(Waiter::Boxed(listener))
.ok_or_else(|| Error::AddListenerError)? as _) .ok_or(Error::AddListenerError)? as _)
} }
fn ready_notify(&self) -> &Notify { fn ready_notify(&self) -> &Notify {
@ -465,9 +463,7 @@ where
self.context.response.clone() self.context.response.clone()
} }
fn start_polling( fn start_polling(self: Arc<Self>) -> BusPollerCallback {
self: Arc<Self>,
) -> Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>> {
self.start_polling_events() self.start_polling_events()
} }
@ -840,9 +836,7 @@ impl Receiver {
} }
#[inline] #[inline]
pub fn start_polling( pub fn start_polling(&self) -> BusPollerCallback {
&self,
) -> Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>> {
self.inner.clone().start_polling() self.inner.clone().start_polling()
} }

View File

@ -10,7 +10,10 @@ use crate::{
buffer_unordered_poller_macro, buffer_unordered_poller_macro,
builder::ReceiverSubscriberBuilder, builder::ReceiverSubscriberBuilder,
error::{Error, SendError, StdSyncSendError}, error::{Error, SendError, StdSyncSendError},
receiver::{Action, Event, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver}, receiver::{
Action, Event, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver,
UntypedPollerCallback,
},
receivers::Request, receivers::Request,
AsyncHandler, Bus, Message, Untyped, AsyncHandler, Bus, Message, Untyped,
}; };
@ -56,14 +59,7 @@ where
{ {
type Config = BufferUnorderedConfig; type Config = BufferUnorderedConfig;
fn build( fn build(cfg: Self::Config) -> (Self, UntypedPollerCallback) {
cfg: Self::Config,
) -> (
Self,
Box<
dyn FnOnce(Untyped) -> Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>,
>,
) {
let stats = Arc::new(BufferUnorderedStats { let stats = Arc::new(BufferUnorderedStats {
buffer: AtomicU64::new(0), buffer: AtomicU64::new(0),
buffer_total: AtomicU64::new(cfg.buffer_size as _), buffer_total: AtomicU64::new(cfg.buffer_size as _),

View File

@ -52,6 +52,7 @@ macro_rules! buffer_unordered_poller_macro {
while let Some(msg) = rx.recv().await { while let Some(msg) = rx.recv().await {
match msg { match msg {
Request::Request(mid, msg, _req) => { Request::Request(mid, msg, _req) => {
#[allow(clippy::redundant_closure_call)]
let _ = ($st1)( let _ = ($st1)(
mid, mid,
msg, msg,
@ -74,6 +75,8 @@ macro_rules! buffer_unordered_poller_macro {
Request::Action(Action::Sync) => { Request::Action(Action::Sync) => {
let lock = semaphore.acquire_many(cfg.max_parallel as _).await; let lock = semaphore.acquire_many(cfg.max_parallel as _).await;
#[allow(clippy::redundant_closure_call)]
let resp = ($st2)(bus.clone(), ut.clone()).await; let resp = ($st2)(bus.clone(), ut.clone()).await;
drop(lock); drop(lock);

View File

@ -11,7 +11,10 @@ use crate::{
buffer_unordered_poller_macro, buffer_unordered_poller_macro,
builder::ReceiverSubscriberBuilder, builder::ReceiverSubscriberBuilder,
error::{Error, SendError, StdSyncSendError}, error::{Error, SendError, StdSyncSendError},
receiver::{Action, Event, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver}, receiver::{
Action, Event, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver,
UntypedPollerCallback,
},
receivers::Request, receivers::Request,
Bus, Handler, Message, Untyped, Bus, Handler, Message, Untyped,
}; };
@ -59,14 +62,7 @@ where
{ {
type Config = BufferUnorderedConfig; type Config = BufferUnorderedConfig;
fn build( fn build(cfg: Self::Config) -> (Self, UntypedPollerCallback) {
cfg: Self::Config,
) -> (
Self,
Box<
dyn FnOnce(Untyped) -> Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>,
>,
) {
let stats = Arc::new(BufferUnorderedStats { let stats = Arc::new(BufferUnorderedStats {
buffer: AtomicU64::new(0), buffer: AtomicU64::new(0),
buffer_total: AtomicU64::new(cfg.buffer_size as _), buffer_total: AtomicU64::new(cfg.buffer_size as _),

View File

@ -10,7 +10,10 @@ use crate::{
buffer_unordered_batch_poller_macro, buffer_unordered_batch_poller_macro,
builder::ReceiverSubscriberBuilder, builder::ReceiverSubscriberBuilder,
error::{Error, SendError, StdSyncSendError}, error::{Error, SendError, StdSyncSendError},
receiver::{Action, Event, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver}, receiver::{
Action, Event, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver,
UntypedPollerCallback,
},
receivers::Request, receivers::Request,
AsyncBatchHandler, Bus, Message, Untyped, AsyncBatchHandler, Bus, Message, Untyped,
}; };
@ -55,14 +58,7 @@ where
{ {
type Config = BufferUnorderedBatchedConfig; type Config = BufferUnorderedBatchedConfig;
fn build( fn build(cfg: Self::Config) -> (Self, UntypedPollerCallback) {
cfg: Self::Config,
) -> (
Self,
Box<
dyn FnOnce(Untyped) -> Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>,
>,
) {
let stats = Arc::new(BufferUnorderedBatchedStats { let stats = Arc::new(BufferUnorderedBatchedStats {
buffer: AtomicU64::new(0), buffer: AtomicU64::new(0),
buffer_total: AtomicU64::new(cfg.buffer_size as _), buffer_total: AtomicU64::new(cfg.buffer_size as _),

View File

@ -74,6 +74,7 @@ macro_rules! buffer_unordered_batch_poller_macro {
let buffer_mid_clone = buffer_mid.drain(..).collect::<Vec<_>>(); let buffer_mid_clone = buffer_mid.drain(..).collect::<Vec<_>>();
let buffer_clone = buffer.drain(..).collect(); let buffer_clone = buffer.drain(..).collect();
#[allow(clippy::redundant_closure_call)]
let _ = let _ =
($st1)(buffer_mid_clone, buffer_clone, bus, ut, task_permit, stx); ($st1)(buffer_mid_clone, buffer_clone, bus, ut, task_permit, stx);
} }
@ -92,6 +93,7 @@ macro_rules! buffer_unordered_batch_poller_macro {
let buffer_clone = buffer.drain(..).collect(); let buffer_clone = buffer.drain(..).collect();
let task_permit = semaphore.clone().acquire_owned().await; let task_permit = semaphore.clone().acquire_owned().await;
#[allow(clippy::redundant_closure_call)]
let _ = let _ =
($st1)(buffer_mid_clone, buffer_clone, bus, ut, task_permit, stx); ($st1)(buffer_mid_clone, buffer_clone, bus, ut, task_permit, stx);
} }
@ -102,6 +104,8 @@ macro_rules! buffer_unordered_batch_poller_macro {
Request::Action(Action::Sync) => { Request::Action(Action::Sync) => {
let lock = semaphore.acquire_many(cfg.max_parallel as _).await; let lock = semaphore.acquire_many(cfg.max_parallel as _).await;
#[allow(clippy::redundant_closure_call)]
let resp = ($st2)(bus.clone(), ut.clone()).await; let resp = ($st2)(bus.clone(), ut.clone()).await;
drop(lock); drop(lock);

View File

@ -11,7 +11,10 @@ use crate::{
buffer_unordered_batch_poller_macro, buffer_unordered_batch_poller_macro,
builder::ReceiverSubscriberBuilder, builder::ReceiverSubscriberBuilder,
error::{Error, SendError, StdSyncSendError}, error::{Error, SendError, StdSyncSendError},
receiver::{Action, Event, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver}, receiver::{
Action, Event, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver,
UntypedPollerCallback,
},
receivers::Request, receivers::Request,
BatchHandler, Bus, Message, Untyped, BatchHandler, Bus, Message, Untyped,
}; };
@ -61,14 +64,7 @@ where
{ {
type Config = BufferUnorderedBatchedConfig; type Config = BufferUnorderedBatchedConfig;
fn build( fn build(cfg: Self::Config) -> (Self, UntypedPollerCallback) {
cfg: Self::Config,
) -> (
Self,
Box<
dyn FnOnce(Untyped) -> Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>,
>,
) {
let stats = Arc::new(BufferUnorderedBatchedStats { let stats = Arc::new(BufferUnorderedBatchedStats {
buffer: AtomicU64::new(0), buffer: AtomicU64::new(0),
buffer_total: AtomicU64::new(cfg.buffer_size as _), buffer_total: AtomicU64::new(cfg.buffer_size as _),

View File

@ -5,7 +5,10 @@ use crate::{
batch_synchronized_poller_macro, batch_synchronized_poller_macro,
builder::ReceiverSubscriberBuilder, builder::ReceiverSubscriberBuilder,
error::{Error, SendError, StdSyncSendError}, error::{Error, SendError, StdSyncSendError},
receiver::{Action, Event, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver}, receiver::{
Action, Event, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver,
UntypedPollerCallback,
},
receivers::Request, receivers::Request,
AsyncBatchSynchronizedHandler, Bus, Message, Untyped, AsyncBatchSynchronizedHandler, Bus, Message, Untyped,
}; };
@ -49,14 +52,7 @@ where
{ {
type Config = SynchronizedBatchedConfig; type Config = SynchronizedBatchedConfig;
fn build( fn build(cfg: Self::Config) -> (Self, UntypedPollerCallback) {
cfg: Self::Config,
) -> (
Self,
Box<
dyn FnOnce(Untyped) -> Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>,
>,
) {
let (stx, srx) = mpsc::unbounded_channel(); let (stx, srx) = mpsc::unbounded_channel();
let (tx, rx) = mpsc::unbounded_channel(); let (tx, rx) = mpsc::unbounded_channel();

View File

@ -67,6 +67,7 @@ macro_rules! batch_synchronized_poller_macro {
let buffer_mid_clone = buffer_mid.drain(..).collect::<Vec<_>>(); let buffer_mid_clone = buffer_mid.drain(..).collect::<Vec<_>>();
let buffer_clone = buffer.drain(..).collect(); let buffer_clone = buffer.drain(..).collect();
#[allow(clippy::redundant_closure_call)]
let _ = ($st1)(buffer_mid_clone, buffer_clone, bus, ut, stx); let _ = ($st1)(buffer_mid_clone, buffer_clone, bus, ut, stx);
} }
} }
@ -83,6 +84,7 @@ macro_rules! batch_synchronized_poller_macro {
let buffer_mid_clone = buffer_mid.drain(..).collect::<Vec<_>>(); let buffer_mid_clone = buffer_mid.drain(..).collect::<Vec<_>>();
let buffer_clone = buffer.drain(..).collect(); let buffer_clone = buffer.drain(..).collect();
#[allow(clippy::redundant_closure_call)]
let _ = ($st1)(buffer_mid_clone, buffer_clone, bus, ut, stx); let _ = ($st1)(buffer_mid_clone, buffer_clone, bus, ut, stx);
} }
@ -90,6 +92,7 @@ macro_rules! batch_synchronized_poller_macro {
} }
Request::Action(Action::Sync) => { Request::Action(Action::Sync) => {
#[allow(clippy::redundant_closure_call)]
let resp = ($st2)(bus.clone(), ut.clone()).await; let resp = ($st2)(bus.clone(), ut.clone()).await;
stx.send(Event::Synchronized(resp.map_err(Error::Other))) stx.send(Event::Synchronized(resp.map_err(Error::Other)))
.unwrap(); .unwrap();

View File

@ -4,7 +4,10 @@ use crate::{
batch_synchronized_poller_macro, batch_synchronized_poller_macro,
builder::ReceiverSubscriberBuilder, builder::ReceiverSubscriberBuilder,
error::{Error, SendError, StdSyncSendError}, error::{Error, SendError, StdSyncSendError},
receiver::{Action, Event, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver}, receiver::{
Action, Event, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver,
UntypedPollerCallback,
},
receivers::Request, receivers::Request,
BatchSynchronizedHandler, Bus, Message, Untyped, BatchSynchronizedHandler, Bus, Message, Untyped,
}; };
@ -53,14 +56,7 @@ where
{ {
type Config = SynchronizedBatchedConfig; type Config = SynchronizedBatchedConfig;
fn build( fn build(cfg: Self::Config) -> (Self, UntypedPollerCallback) {
cfg: Self::Config,
) -> (
Self,
Box<
dyn FnOnce(Untyped) -> Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>,
>,
) {
let (stx, srx) = mpsc::unbounded_channel(); let (stx, srx) = mpsc::unbounded_channel();
let (tx, rx) = mpsc::unbounded_channel(); let (tx, rx) = mpsc::unbounded_channel();

View File

@ -1,6 +1,6 @@
use std::{pin::Pin, sync::Arc}; use std::{pin::Pin, sync::Arc};
use crate::synchronized_poller_macro; use crate::{receiver::UntypedPollerCallback, synchronized_poller_macro};
use futures::{Future, Stream}; use futures::{Future, Stream};
use super::SynchronizedConfig; use super::SynchronizedConfig;
@ -52,14 +52,7 @@ where
{ {
type Config = SynchronizedConfig; type Config = SynchronizedConfig;
fn build( fn build(_cfg: Self::Config) -> (Self, UntypedPollerCallback) {
_cfg: Self::Config,
) -> (
Self,
Box<
dyn FnOnce(Untyped) -> Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>,
>,
) {
let (stx, srx) = mpsc::unbounded_channel(); let (stx, srx) = mpsc::unbounded_channel();
let (tx, rx) = mpsc::unbounded_channel(); let (tx, rx) = mpsc::unbounded_channel();

View File

@ -42,7 +42,9 @@ macro_rules! synchronized_poller_macro {
while let Some(msg) = rx.recv().await { while let Some(msg) = rx.recv().await {
match msg { match msg {
Request::Request(mid, msg, _req) => { Request::Request(mid, msg, _req) =>
{
#[allow(clippy::redundant_closure_call)]
($st1)(mid, msg, bus.clone(), ut.clone(), stx.clone()) ($st1)(mid, msg, bus.clone(), ut.clone(), stx.clone())
.await .await
.unwrap() .unwrap()
@ -57,6 +59,7 @@ macro_rules! synchronized_poller_macro {
stx.send(Event::Flushed).unwrap(); stx.send(Event::Flushed).unwrap();
} }
Request::Action(Action::Sync) => { Request::Action(Action::Sync) => {
#[allow(clippy::redundant_closure_call)]
let resp = ($st2)(bus.clone(), ut.clone()).await; let resp = ($st2)(bus.clone(), ut.clone()).await;
stx.send(Event::Synchronized(resp.map_err(Error::Other))) stx.send(Event::Synchronized(resp.map_err(Error::Other)))
.unwrap(); .unwrap();

View File

@ -1,6 +1,6 @@
use std::{pin::Pin, sync::Arc}; use std::{pin::Pin, sync::Arc};
use crate::synchronized_poller_macro; use crate::{receiver::UntypedPollerCallback, synchronized_poller_macro};
use futures::{executor::block_on, Future, Stream}; use futures::{executor::block_on, Future, Stream};
use super::SynchronizedConfig; use super::SynchronizedConfig;
@ -53,14 +53,7 @@ where
{ {
type Config = SynchronizedConfig; type Config = SynchronizedConfig;
fn build( fn build(_cfg: Self::Config) -> (Self, UntypedPollerCallback) {
_cfg: Self::Config,
) -> (
Self,
Box<
dyn FnOnce(Untyped) -> Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>,
>,
) {
let (stx, srx) = mpsc::unbounded_channel(); let (stx, srx) = mpsc::unbounded_channel();
let (tx, rx) = mpsc::unbounded_channel(); let (tx, rx) = mpsc::unbounded_channel();

View File

@ -1,18 +1,15 @@
use crate::{ use crate::{
error::Error, error::Error,
receiver::{ receiver::{
Action, AnyReceiver, AnyWrapperRef, PermitDrop, ReceiverTrait, SendUntypedReceiver, Action, AnyReceiver, AnyWrapperRef, BusPollerCallback, PermitDrop, ReceiverTrait,
TypeTagAccept, SendUntypedReceiver, TypeTagAccept,
}, },
stats::Stats, stats::Stats,
Bus, Event, Message, Permit, ReciveUntypedReceiver, TypeTag, Bus, Event, Message, Permit, ReciveUntypedReceiver, TypeTag,
}; };
use core::{ use core::sync::atomic::{AtomicBool, AtomicU64, Ordering};
pin::Pin,
sync::atomic::{AtomicBool, AtomicU64, Ordering},
};
use dashmap::DashMap; use dashmap::DashMap;
use futures::{pin_mut, Future, StreamExt}; use futures::{pin_mut, StreamExt};
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::{oneshot, Notify}; use tokio::sync::{oneshot, Notify};
@ -159,7 +156,7 @@ where
Ok(self Ok(self
.waiters .waiters
.insert(listener) .insert(listener)
.ok_or_else(|| Error::AddListenerError)? as _) .ok_or(Error::AddListenerError)? as _)
} }
fn try_reserve(&self, tt: &TypeTag) -> Option<Permit> { fn try_reserve(&self, tt: &TypeTag) -> Option<Permit> {
@ -223,9 +220,7 @@ where
.map(|r| r.processing.fetch_add(1, Ordering::SeqCst)); .map(|r| r.processing.fetch_add(1, Ordering::SeqCst));
} }
fn start_polling( fn start_polling(self: Arc<Self>) -> BusPollerCallback {
self: Arc<Self>,
) -> Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>> {
Box::new(move |_| { Box::new(move |_| {
Box::pin(async move { Box::pin(async move {
let this = self.clone(); let this = self.clone();

View File

@ -45,9 +45,14 @@ impl AsyncHandler<Msg<i32>> for TmpReceiver {
} }
} }
pub type TestRelayRxChannelCell =
Mutex<Option<mpsc::UnboundedReceiver<Event<Box<dyn Message>, GenericError>>>>;
pub type TestRelayRxStream =
Pin<Box<dyn Stream<Item = Event<Box<dyn Message>, error::GenericError>> + Send>>;
pub struct TestRelay { pub struct TestRelay {
stx: mpsc::UnboundedSender<Event<Box<dyn Message>, GenericError>>, stx: mpsc::UnboundedSender<Event<Box<dyn Message>, GenericError>>,
srx: Mutex<Option<mpsc::UnboundedReceiver<Event<Box<dyn Message>, GenericError>>>>, srx: TestRelayRxChannelCell,
} }
impl TypeTagAccept for TestRelay { impl TypeTagAccept for TestRelay {
@ -104,9 +109,7 @@ impl TypeTagAccept for TestRelay {
&Msg::<i32>::type_tag_(), &Msg::<i32>::type_tag_(),
&Msg::<i64>::type_tag_(), &Msg::<i64>::type_tag_(),
&Error::type_tag_(), &Error::type_tag_(),
) { ) {}
return;
}
} }
} }
@ -155,7 +158,7 @@ impl SendUntypedReceiver for TestRelay {
} }
impl ReciveUntypedReceiver for TestRelay { impl ReciveUntypedReceiver for TestRelay {
type Stream = Pin<Box<dyn Stream<Item = Event<Box<dyn Message>, error::GenericError>> + Send>>; type Stream = TestRelayRxStream;
fn event_stream(&self) -> Self::Stream { fn event_stream(&self) -> Self::Stream {
let mut rx = self.srx.lock().take().unwrap(); let mut rx = self.srx.lock().take().unwrap();

View File

@ -229,7 +229,7 @@ async fn test() {
.await .await
.unwrap(); .unwrap();
assert_eq!(we_res.0, 1633.0f64); assert!((we_res.0 - 1633.0f64).abs() < f64::EPSILON);
let boxed_res = b let boxed_res = b
.request_boxed(Box::new(MsgF64(1000.)), Default::default()) .request_boxed(Box::new(MsgF64(1000.)), Default::default())
@ -237,7 +237,8 @@ async fn test() {
.unwrap(); .unwrap();
let val = boxed_res.as_any_ref().downcast_ref::<MsgF64>().unwrap().0; let val = boxed_res.as_any_ref().downcast_ref::<MsgF64>().unwrap().0;
assert_eq!(val, 1633.0);
assert!((val - 1633.0f64).abs() < f64::EPSILON);
b.flush().await; b.flush().await;
b.close().await; b.close().await;

View File

@ -87,6 +87,6 @@ async fn test_shared() {
b.close().await; b.close().await;
poller.await; poller.await;
assert_eq!(ctx.sync1.load(Ordering::Relaxed), true); assert!(ctx.sync1.load(Ordering::Relaxed));
assert_eq!(ctx.sync2.load(Ordering::Relaxed), false); assert!(!ctx.sync2.load(Ordering::Relaxed));
} }