diff --git a/crates/derive/src/lib.rs b/crates/derive/src/lib.rs index c7efe25..772c6f4 100644 --- a/crates/derive/src/lib.rs +++ b/crates/derive/src/lib.rs @@ -141,15 +141,11 @@ struct TypeTag { impl Parse for TypeTag { fn parse(input: ParseStream) -> Result { - let mut inner = None; let content; parenthesized!(content in input); let punctuated = Punctuated::::parse_terminated(&content)?; - for pair in punctuated.pairs() { - inner = Some(pair.into_value()); - break; - } + let inner = punctuated.pairs().map(|x| x.into_value()).next(); Ok(TypeTag { inner: inner.unwrap().to_owned(), diff --git a/src/builder.rs b/src/builder.rs index 5d6740f..553fc94 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -9,12 +9,21 @@ use tokio::sync::Mutex; use crate::{ envelop::TypeTag, error::{Error, StdSyncSendError}, - receiver::{Receiver, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver}, + receiver::{ + BusPollerCallback, Receiver, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver, + UntypedPollerCallback, + }, receivers, AsyncBatchHandler, AsyncBatchSynchronizedHandler, AsyncHandler, AsyncSynchronizedHandler, BatchHandler, BatchSynchronizedHandler, Bus, BusInner, Handler, IntoBoxedMessage, Message, Relay, SynchronizedHandler, Untyped, }; +type MessageDeserializerCallback = Box< + dyn Fn(&mut dyn erased_serde::Deserializer<'_>) -> Result, Error> + + Send + + Sync, +>; + pub trait ReceiverSubscriberBuilder: SendUntypedReceiver + SendTypedReceiver + ReciveTypedReceiver where @@ -25,14 +34,7 @@ where { type Config: Default; - fn build( - cfg: Self::Config, - ) -> ( - Self, - Box< - dyn FnOnce(Untyped) -> Box Pin + Send>>>, - >, - ) + fn build(cfg: Self::Config) -> (Self, UntypedPollerCallback) where Self: Sized; } @@ -47,14 +49,14 @@ pub struct RegisterEntry { builder: F, poller: P, receivers: HashMap, - pollers: Vec Pin + Send>>>>, + pollers: Vec, _m: PhantomData<(K, T)>, } impl RegisterEntry where F: FnMut(&mut B, TypeTag, Receiver), - P: FnMut(&mut B, Box Pin + Send>>>), + P: FnMut(&mut B, BusPollerCallback), { pub fn done(mut self) -> B { for (tid, v) in self.receivers { @@ -209,11 +211,7 @@ impl RegisterEntry { } pub struct MessageTypeDescriptor { - de: Box< - dyn Fn(&mut dyn erased_serde::Deserializer<'_>) -> Result, Error> - + Send - + Sync, - >, + de: MessageDeserializerCallback, } impl MessageTypeDescriptor { @@ -226,10 +224,11 @@ impl MessageTypeDescriptor { } } +#[derive(Default)] pub struct Module { message_types: HashMap, receivers: HashMap>, - pollings: Vec Pin + Send>>>>, + pollings: Vec, } impl Module { diff --git a/src/envelop.rs b/src/envelop.rs index 6408b2e..cc6c7da 100644 --- a/src/envelop.rs +++ b/src/envelop.rs @@ -136,11 +136,11 @@ impl Message for () { return false; }; - into.replace(self.clone()); + into.replace(()); true } fn try_clone_boxed(&self) -> Option> { - Some(Box::new(self.clone())) + Some(Box::new(())) } } diff --git a/src/lib.rs b/src/lib.rs index ccdf763..11a3a1a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -95,7 +95,7 @@ impl Bus { } pub(crate) fn init(&self) { - for (_, rs) in &self.inner.receivers { + for rs in self.inner.receivers.values() { for r in rs { r.init(self).unwrap(); } @@ -103,7 +103,7 @@ impl Bus { } pub async fn ready(&self) { - for (_, rs) in &self.inner.receivers { + for rs in self.inner.receivers.values() { for r in rs { r.ready().await; } @@ -114,7 +114,7 @@ impl Bus { let _handle = self.inner.maintain.lock().await; self.inner.closed.store(true, Ordering::SeqCst); - for (_, rs) in &self.inner.receivers { + for rs in self.inner.receivers.values() { for r in rs { let err = tokio::time::timeout(Duration::from_secs(20), r.close(self)).await; @@ -133,7 +133,7 @@ impl Bus { for _ in 0..fuse_count { iters += 1; let mut flushed = false; - for (_, rs) in &self.inner.receivers { + for rs in self.inner.receivers.values() { for r in rs { if r.need_flush() { flushed = true; @@ -163,7 +163,7 @@ impl Bus { self.flush().await; let _handle = self.inner.maintain.lock().await; - for (_, rs) in &self.inner.receivers { + for rs in self.inner.receivers.values() { for r in rs { r.sync(self).await; } @@ -598,7 +598,7 @@ impl Bus { .inner .message_types .get(&tt) - .ok_or_else(|| Error::TypeTagNotRegistered(tt))?; + .ok_or(Error::TypeTagNotRegistered(tt))?; md.deserialize_boxed(de) .map_err(|err| err.specify::>()) diff --git a/src/receiver.rs b/src/receiver.rs index 8d5db58..1ee1b6e 100644 --- a/src/receiver.rs +++ b/src/receiver.rs @@ -1,5 +1,6 @@ use crate::relay::RelayWrapper; use crate::stats::Stats; +use crate::Untyped; use crate::{ envelop::{IntoBoxedMessage, TypeTag}, error::{GenericError, SendError, StdSyncSendError}, @@ -19,6 +20,9 @@ use futures::{Future, FutureExt, StreamExt}; use std::{borrow::Cow, sync::Arc}; use tokio::sync::{oneshot, Notify}; +pub type BusPollerCallback = Box Pin + Send>>>; +pub type UntypedPollerCallback = Box BusPollerCallback>; + struct SlabCfg; impl sharded_slab::Config for SlabCfg { const RESERVED_BITS: usize = 1; @@ -74,9 +78,7 @@ pub trait WrapperErrorTypeOnly: Send + Sync { } pub trait WrapperReturnTypeAndError: Send + Sync { - fn start_polling_events( - self: Arc, - ) -> Box Pin + Send>>>; + fn start_polling_events(self: Arc) -> BusPollerCallback; fn add_response_listener( &self, listener: oneshot::Sender>>, @@ -123,9 +125,7 @@ pub trait ReceiverTrait: TypeTagAccept + Send + Sync { fn reserve_notify(&self, tt: &TypeTag) -> Arc; fn increment_processing(&self, tt: &TypeTag); - fn start_polling( - self: Arc, - ) -> Box Pin + Send>>>; + fn start_polling(self: Arc) -> BusPollerCallback; } pub trait ReceiverPollerBuilder { @@ -181,9 +181,7 @@ where E: StdSyncSendError, S: SendUntypedReceiver + ReciveTypedReceiver + Send + Sync + 'static, { - fn start_polling_events( - self: Arc, - ) -> Box Pin + Send>>> { + fn start_polling_events(self: Arc) -> BusPollerCallback { Box::new(move |_| { Box::pin(async move { let this = self.clone(); @@ -253,7 +251,7 @@ where Ok(self .waiters .insert(Waiter::WithErrorType(listener)) - .ok_or_else(|| Error::AddListenerError)? as _) + .ok_or(Error::AddListenerError)? as _) } fn response(&self, mid: u64, resp: Result>) -> Result<(), Error> { @@ -291,7 +289,7 @@ where Ok(self .waiters .insert(Waiter::WithoutErrorType(listener)) - .ok_or_else(|| Error::AddListenerError)? as _) + .ok_or(Error::AddListenerError)? as _) } } @@ -309,7 +307,7 @@ where Ok(self .waiters .insert(Waiter::BoxedWithError(listener)) - .ok_or_else(|| Error::AddListenerError)? as _) + .ok_or(Error::AddListenerError)? as _) } } @@ -372,8 +370,8 @@ where .downcast::() .map_err(|_| Error::MessageCastError)?; - Ok(SendTypedReceiver::send(&self.inner, mid, *boxed, req, bus) - .map_err(|err| Error::from(err.into_boxed()))?) + SendTypedReceiver::send(&self.inner, mid, *boxed, req, bus) + .map_err(|err| Error::from(err.into_boxed())) } fn stats(&self) -> Stats { @@ -417,7 +415,7 @@ where Ok(self .waiters .insert(Waiter::Boxed(listener)) - .ok_or_else(|| Error::AddListenerError)? as _) + .ok_or(Error::AddListenerError)? as _) } fn ready_notify(&self) -> &Notify { @@ -465,9 +463,7 @@ where self.context.response.clone() } - fn start_polling( - self: Arc, - ) -> Box Pin + Send>>> { + fn start_polling(self: Arc) -> BusPollerCallback { self.start_polling_events() } @@ -840,9 +836,7 @@ impl Receiver { } #[inline] - pub fn start_polling( - &self, - ) -> Box Pin + Send>>> { + pub fn start_polling(&self) -> BusPollerCallback { self.inner.clone().start_polling() } diff --git a/src/receivers/buffer_unordered/async.rs b/src/receivers/buffer_unordered/async.rs index 1e55b50..0c66ef5 100644 --- a/src/receivers/buffer_unordered/async.rs +++ b/src/receivers/buffer_unordered/async.rs @@ -10,7 +10,10 @@ use crate::{ buffer_unordered_poller_macro, builder::ReceiverSubscriberBuilder, error::{Error, SendError, StdSyncSendError}, - receiver::{Action, Event, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver}, + receiver::{ + Action, Event, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver, + UntypedPollerCallback, + }, receivers::Request, AsyncHandler, Bus, Message, Untyped, }; @@ -56,14 +59,7 @@ where { type Config = BufferUnorderedConfig; - fn build( - cfg: Self::Config, - ) -> ( - Self, - Box< - dyn FnOnce(Untyped) -> Box Pin + Send>>>, - >, - ) { + fn build(cfg: Self::Config) -> (Self, UntypedPollerCallback) { let stats = Arc::new(BufferUnorderedStats { buffer: AtomicU64::new(0), buffer_total: AtomicU64::new(cfg.buffer_size as _), diff --git a/src/receivers/buffer_unordered/mod.rs b/src/receivers/buffer_unordered/mod.rs index 2fc0f77..9b35051 100644 --- a/src/receivers/buffer_unordered/mod.rs +++ b/src/receivers/buffer_unordered/mod.rs @@ -52,6 +52,7 @@ macro_rules! buffer_unordered_poller_macro { while let Some(msg) = rx.recv().await { match msg { Request::Request(mid, msg, _req) => { + #[allow(clippy::redundant_closure_call)] let _ = ($st1)( mid, msg, @@ -74,6 +75,8 @@ macro_rules! buffer_unordered_poller_macro { Request::Action(Action::Sync) => { let lock = semaphore.acquire_many(cfg.max_parallel as _).await; + + #[allow(clippy::redundant_closure_call)] let resp = ($st2)(bus.clone(), ut.clone()).await; drop(lock); diff --git a/src/receivers/buffer_unordered/sync.rs b/src/receivers/buffer_unordered/sync.rs index fbce53c..03bef8c 100644 --- a/src/receivers/buffer_unordered/sync.rs +++ b/src/receivers/buffer_unordered/sync.rs @@ -11,7 +11,10 @@ use crate::{ buffer_unordered_poller_macro, builder::ReceiverSubscriberBuilder, error::{Error, SendError, StdSyncSendError}, - receiver::{Action, Event, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver}, + receiver::{ + Action, Event, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver, + UntypedPollerCallback, + }, receivers::Request, Bus, Handler, Message, Untyped, }; @@ -59,14 +62,7 @@ where { type Config = BufferUnorderedConfig; - fn build( - cfg: Self::Config, - ) -> ( - Self, - Box< - dyn FnOnce(Untyped) -> Box Pin + Send>>>, - >, - ) { + fn build(cfg: Self::Config) -> (Self, UntypedPollerCallback) { let stats = Arc::new(BufferUnorderedStats { buffer: AtomicU64::new(0), buffer_total: AtomicU64::new(cfg.buffer_size as _), diff --git a/src/receivers/buffer_unordered_batched/async.rs b/src/receivers/buffer_unordered_batched/async.rs index 742eb71..b4c6dd8 100644 --- a/src/receivers/buffer_unordered_batched/async.rs +++ b/src/receivers/buffer_unordered_batched/async.rs @@ -10,7 +10,10 @@ use crate::{ buffer_unordered_batch_poller_macro, builder::ReceiverSubscriberBuilder, error::{Error, SendError, StdSyncSendError}, - receiver::{Action, Event, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver}, + receiver::{ + Action, Event, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver, + UntypedPollerCallback, + }, receivers::Request, AsyncBatchHandler, Bus, Message, Untyped, }; @@ -55,14 +58,7 @@ where { type Config = BufferUnorderedBatchedConfig; - fn build( - cfg: Self::Config, - ) -> ( - Self, - Box< - dyn FnOnce(Untyped) -> Box Pin + Send>>>, - >, - ) { + fn build(cfg: Self::Config) -> (Self, UntypedPollerCallback) { let stats = Arc::new(BufferUnorderedBatchedStats { buffer: AtomicU64::new(0), buffer_total: AtomicU64::new(cfg.buffer_size as _), diff --git a/src/receivers/buffer_unordered_batched/mod.rs b/src/receivers/buffer_unordered_batched/mod.rs index 3c2e76e..bbe24f9 100644 --- a/src/receivers/buffer_unordered_batched/mod.rs +++ b/src/receivers/buffer_unordered_batched/mod.rs @@ -74,6 +74,7 @@ macro_rules! buffer_unordered_batch_poller_macro { let buffer_mid_clone = buffer_mid.drain(..).collect::>(); let buffer_clone = buffer.drain(..).collect(); + #[allow(clippy::redundant_closure_call)] let _ = ($st1)(buffer_mid_clone, buffer_clone, bus, ut, task_permit, stx); } @@ -92,6 +93,7 @@ macro_rules! buffer_unordered_batch_poller_macro { let buffer_clone = buffer.drain(..).collect(); let task_permit = semaphore.clone().acquire_owned().await; + #[allow(clippy::redundant_closure_call)] let _ = ($st1)(buffer_mid_clone, buffer_clone, bus, ut, task_permit, stx); } @@ -102,6 +104,8 @@ macro_rules! buffer_unordered_batch_poller_macro { Request::Action(Action::Sync) => { let lock = semaphore.acquire_many(cfg.max_parallel as _).await; + + #[allow(clippy::redundant_closure_call)] let resp = ($st2)(bus.clone(), ut.clone()).await; drop(lock); diff --git a/src/receivers/buffer_unordered_batched/sync.rs b/src/receivers/buffer_unordered_batched/sync.rs index 0fdd262..3b62f26 100644 --- a/src/receivers/buffer_unordered_batched/sync.rs +++ b/src/receivers/buffer_unordered_batched/sync.rs @@ -11,7 +11,10 @@ use crate::{ buffer_unordered_batch_poller_macro, builder::ReceiverSubscriberBuilder, error::{Error, SendError, StdSyncSendError}, - receiver::{Action, Event, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver}, + receiver::{ + Action, Event, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver, + UntypedPollerCallback, + }, receivers::Request, BatchHandler, Bus, Message, Untyped, }; @@ -61,14 +64,7 @@ where { type Config = BufferUnorderedBatchedConfig; - fn build( - cfg: Self::Config, - ) -> ( - Self, - Box< - dyn FnOnce(Untyped) -> Box Pin + Send>>>, - >, - ) { + fn build(cfg: Self::Config) -> (Self, UntypedPollerCallback) { let stats = Arc::new(BufferUnorderedBatchedStats { buffer: AtomicU64::new(0), buffer_total: AtomicU64::new(cfg.buffer_size as _), diff --git a/src/receivers/synchronize_batched/async.rs b/src/receivers/synchronize_batched/async.rs index 520a00f..8e9c053 100644 --- a/src/receivers/synchronize_batched/async.rs +++ b/src/receivers/synchronize_batched/async.rs @@ -5,7 +5,10 @@ use crate::{ batch_synchronized_poller_macro, builder::ReceiverSubscriberBuilder, error::{Error, SendError, StdSyncSendError}, - receiver::{Action, Event, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver}, + receiver::{ + Action, Event, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver, + UntypedPollerCallback, + }, receivers::Request, AsyncBatchSynchronizedHandler, Bus, Message, Untyped, }; @@ -49,14 +52,7 @@ where { type Config = SynchronizedBatchedConfig; - fn build( - cfg: Self::Config, - ) -> ( - Self, - Box< - dyn FnOnce(Untyped) -> Box Pin + Send>>>, - >, - ) { + fn build(cfg: Self::Config) -> (Self, UntypedPollerCallback) { let (stx, srx) = mpsc::unbounded_channel(); let (tx, rx) = mpsc::unbounded_channel(); diff --git a/src/receivers/synchronize_batched/mod.rs b/src/receivers/synchronize_batched/mod.rs index d84fc58..7ce55b8 100644 --- a/src/receivers/synchronize_batched/mod.rs +++ b/src/receivers/synchronize_batched/mod.rs @@ -67,6 +67,7 @@ macro_rules! batch_synchronized_poller_macro { let buffer_mid_clone = buffer_mid.drain(..).collect::>(); let buffer_clone = buffer.drain(..).collect(); + #[allow(clippy::redundant_closure_call)] let _ = ($st1)(buffer_mid_clone, buffer_clone, bus, ut, stx); } } @@ -83,6 +84,7 @@ macro_rules! batch_synchronized_poller_macro { let buffer_mid_clone = buffer_mid.drain(..).collect::>(); let buffer_clone = buffer.drain(..).collect(); + #[allow(clippy::redundant_closure_call)] let _ = ($st1)(buffer_mid_clone, buffer_clone, bus, ut, stx); } @@ -90,6 +92,7 @@ macro_rules! batch_synchronized_poller_macro { } Request::Action(Action::Sync) => { + #[allow(clippy::redundant_closure_call)] let resp = ($st2)(bus.clone(), ut.clone()).await; stx.send(Event::Synchronized(resp.map_err(Error::Other))) .unwrap(); diff --git a/src/receivers/synchronize_batched/sync.rs b/src/receivers/synchronize_batched/sync.rs index 8bcce54..9042268 100644 --- a/src/receivers/synchronize_batched/sync.rs +++ b/src/receivers/synchronize_batched/sync.rs @@ -4,7 +4,10 @@ use crate::{ batch_synchronized_poller_macro, builder::ReceiverSubscriberBuilder, error::{Error, SendError, StdSyncSendError}, - receiver::{Action, Event, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver}, + receiver::{ + Action, Event, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver, + UntypedPollerCallback, + }, receivers::Request, BatchSynchronizedHandler, Bus, Message, Untyped, }; @@ -53,14 +56,7 @@ where { type Config = SynchronizedBatchedConfig; - fn build( - cfg: Self::Config, - ) -> ( - Self, - Box< - dyn FnOnce(Untyped) -> Box Pin + Send>>>, - >, - ) { + fn build(cfg: Self::Config) -> (Self, UntypedPollerCallback) { let (stx, srx) = mpsc::unbounded_channel(); let (tx, rx) = mpsc::unbounded_channel(); diff --git a/src/receivers/synchronized/async.rs b/src/receivers/synchronized/async.rs index 65c5d69..cb86ea3 100644 --- a/src/receivers/synchronized/async.rs +++ b/src/receivers/synchronized/async.rs @@ -1,6 +1,6 @@ use std::{pin::Pin, sync::Arc}; -use crate::synchronized_poller_macro; +use crate::{receiver::UntypedPollerCallback, synchronized_poller_macro}; use futures::{Future, Stream}; use super::SynchronizedConfig; @@ -52,14 +52,7 @@ where { type Config = SynchronizedConfig; - fn build( - _cfg: Self::Config, - ) -> ( - Self, - Box< - dyn FnOnce(Untyped) -> Box Pin + Send>>>, - >, - ) { + fn build(_cfg: Self::Config) -> (Self, UntypedPollerCallback) { let (stx, srx) = mpsc::unbounded_channel(); let (tx, rx) = mpsc::unbounded_channel(); diff --git a/src/receivers/synchronized/mod.rs b/src/receivers/synchronized/mod.rs index a098530..f2ec28a 100644 --- a/src/receivers/synchronized/mod.rs +++ b/src/receivers/synchronized/mod.rs @@ -42,7 +42,9 @@ macro_rules! synchronized_poller_macro { while let Some(msg) = rx.recv().await { match msg { - Request::Request(mid, msg, _req) => { + Request::Request(mid, msg, _req) => + { + #[allow(clippy::redundant_closure_call)] ($st1)(mid, msg, bus.clone(), ut.clone(), stx.clone()) .await .unwrap() @@ -57,6 +59,7 @@ macro_rules! synchronized_poller_macro { stx.send(Event::Flushed).unwrap(); } Request::Action(Action::Sync) => { + #[allow(clippy::redundant_closure_call)] let resp = ($st2)(bus.clone(), ut.clone()).await; stx.send(Event::Synchronized(resp.map_err(Error::Other))) .unwrap(); diff --git a/src/receivers/synchronized/sync.rs b/src/receivers/synchronized/sync.rs index e59b340..b44e788 100644 --- a/src/receivers/synchronized/sync.rs +++ b/src/receivers/synchronized/sync.rs @@ -1,6 +1,6 @@ use std::{pin::Pin, sync::Arc}; -use crate::synchronized_poller_macro; +use crate::{receiver::UntypedPollerCallback, synchronized_poller_macro}; use futures::{executor::block_on, Future, Stream}; use super::SynchronizedConfig; @@ -53,14 +53,7 @@ where { type Config = SynchronizedConfig; - fn build( - _cfg: Self::Config, - ) -> ( - Self, - Box< - dyn FnOnce(Untyped) -> Box Pin + Send>>>, - >, - ) { + fn build(_cfg: Self::Config) -> (Self, UntypedPollerCallback) { let (stx, srx) = mpsc::unbounded_channel(); let (tx, rx) = mpsc::unbounded_channel(); diff --git a/src/relay.rs b/src/relay.rs index b102251..d4a6aba 100644 --- a/src/relay.rs +++ b/src/relay.rs @@ -1,18 +1,15 @@ use crate::{ error::Error, receiver::{ - Action, AnyReceiver, AnyWrapperRef, PermitDrop, ReceiverTrait, SendUntypedReceiver, - TypeTagAccept, + Action, AnyReceiver, AnyWrapperRef, BusPollerCallback, PermitDrop, ReceiverTrait, + SendUntypedReceiver, TypeTagAccept, }, stats::Stats, Bus, Event, Message, Permit, ReciveUntypedReceiver, TypeTag, }; -use core::{ - pin::Pin, - sync::atomic::{AtomicBool, AtomicU64, Ordering}, -}; +use core::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use dashmap::DashMap; -use futures::{pin_mut, Future, StreamExt}; +use futures::{pin_mut, StreamExt}; use std::sync::Arc; use tokio::sync::{oneshot, Notify}; @@ -159,7 +156,7 @@ where Ok(self .waiters .insert(listener) - .ok_or_else(|| Error::AddListenerError)? as _) + .ok_or(Error::AddListenerError)? as _) } fn try_reserve(&self, tt: &TypeTag) -> Option { @@ -223,9 +220,7 @@ where .map(|r| r.processing.fetch_add(1, Ordering::SeqCst)); } - fn start_polling( - self: Arc, - ) -> Box Pin + Send>>> { + fn start_polling(self: Arc) -> BusPollerCallback { Box::new(move |_| { Box::pin(async move { let this = self.clone(); diff --git a/tests/test_relay.rs b/tests/test_relay.rs index 17c92d6..c3bf667 100644 --- a/tests/test_relay.rs +++ b/tests/test_relay.rs @@ -45,9 +45,14 @@ impl AsyncHandler> for TmpReceiver { } } +pub type TestRelayRxChannelCell = + Mutex, GenericError>>>>; +pub type TestRelayRxStream = + Pin, error::GenericError>> + Send>>; + pub struct TestRelay { stx: mpsc::UnboundedSender, GenericError>>, - srx: Mutex, GenericError>>>>, + srx: TestRelayRxChannelCell, } impl TypeTagAccept for TestRelay { @@ -104,9 +109,7 @@ impl TypeTagAccept for TestRelay { &Msg::::type_tag_(), &Msg::::type_tag_(), &Error::type_tag_(), - ) { - return; - } + ) {} } } @@ -155,7 +158,7 @@ impl SendUntypedReceiver for TestRelay { } impl ReciveUntypedReceiver for TestRelay { - type Stream = Pin, error::GenericError>> + Send>>; + type Stream = TestRelayRxStream; fn event_stream(&self) -> Self::Stream { let mut rx = self.srx.lock().take().unwrap(); diff --git a/tests/test_req_resp.rs b/tests/test_req_resp.rs index 538b7f3..f916e3c 100644 --- a/tests/test_req_resp.rs +++ b/tests/test_req_resp.rs @@ -229,7 +229,7 @@ async fn test() { .await .unwrap(); - assert_eq!(we_res.0, 1633.0f64); + assert!((we_res.0 - 1633.0f64).abs() < f64::EPSILON); let boxed_res = b .request_boxed(Box::new(MsgF64(1000.)), Default::default()) @@ -237,7 +237,8 @@ async fn test() { .unwrap(); let val = boxed_res.as_any_ref().downcast_ref::().unwrap().0; - assert_eq!(val, 1633.0); + + assert!((val - 1633.0f64).abs() < f64::EPSILON); b.flush().await; b.close().await; diff --git a/tests/test_shared.rs b/tests/test_shared.rs index 773e011..c4808ac 100644 --- a/tests/test_shared.rs +++ b/tests/test_shared.rs @@ -87,6 +87,6 @@ async fn test_shared() { b.close().await; poller.await; - assert_eq!(ctx.sync1.load(Ordering::Relaxed), true); - assert_eq!(ctx.sync2.load(Ordering::Relaxed), false); + assert!(ctx.sync1.load(Ordering::Relaxed)); + assert!(!ctx.sync2.load(Ordering::Relaxed)); }