From bed35670c6de7e5e4e86961cc8377287cfc747e9 Mon Sep 17 00:00:00 2001 From: Andrey Tkachenko Date: Wed, 22 Sep 2021 17:07:21 +0400 Subject: [PATCH] refactor senderror --- crates/remote/src/relays/quic/client.rs | 14 ++++++++------ src/error.rs | 8 ++++++++ src/receiver.rs | 12 ++++++------ src/receivers/buffer_unordered/async.rs | 10 +++++----- src/receivers/buffer_unordered/sync.rs | 10 +++++----- src/receivers/buffer_unordered_batched/async.rs | 10 +++++----- src/receivers/buffer_unordered_batched/sync.rs | 10 +++++----- src/receivers/synchronize_batched/async.rs | 10 +++++----- src/receivers/synchronize_batched/sync.rs | 10 +++++----- src/receivers/synchronized/async.rs | 10 +++++----- src/receivers/synchronized/sync.rs | 10 +++++----- src/relay.rs | 4 ++-- tests/test_relay.rs | 4 ++-- 13 files changed, 66 insertions(+), 56 deletions(-) diff --git a/crates/remote/src/relays/quic/client.rs b/crates/remote/src/relays/quic/client.rs index 8372dce..474b143 100644 --- a/crates/remote/src/relays/quic/client.rs +++ b/crates/remote/src/relays/quic/client.rs @@ -305,7 +305,7 @@ impl TypeTagAccept for QuicClientRelay { } impl SendUntypedReceiver for QuicClientRelay { - fn send(&self, msg: Action, _bus: &Bus) -> Result<(), SendError> { + fn send(&self, msg: Action, _bus: &Bus) -> Result<(), messagebus::error::Error> { match msg { Action::Init => { let (sender, mut rx) = self.receiver_send.lock().take().unwrap(); @@ -335,11 +335,13 @@ impl SendUntypedReceiver for QuicClientRelay { msg: Box, req: bool, _bus: &Bus, - ) -> Result<(), SendError>> { - msg.as_shared_boxed() - self.sender.send((req, mid, msg).into()).unwrap(); - - Ok(()) + ) -> Result<(), messagebus::error::Error>> { + if let Ok(val) = msg.as_shared_boxed() { + self.sender.send((req, mid, msg).into()).unwrap(); + Ok(()) + } else { + Err(SendError:) + } } } diff --git a/src/error.rs b/src/error.rs index f0175e0..0e91e8e 100644 --- a/src/error.rs +++ b/src/error.rs @@ -130,6 +130,14 @@ pub enum Error } impl Error { + pub fn send_closed(m: M) -> Self { + Error::SendError(SendError::Closed(m)) + } + + pub fn send_full(m: M) -> Self { + Error::SendError(SendError::Full(m)) + } + pub fn map_msg UM>(self, f: F) -> Error { match self { Error::SendError(inner) => Error::SendError(inner.map_msg(f)), diff --git a/src/receiver.rs b/src/receiver.rs index 265aaa2..a765cd9 100644 --- a/src/receiver.rs +++ b/src/receiver.rs @@ -3,7 +3,7 @@ use crate::stats::Stats; use crate::Untyped; use crate::{ envelop::{IntoBoxedMessage, TypeTag}, - error::{GenericError, SendError, StdSyncSendError}, + error::{GenericError, StdSyncSendError}, trait_object::TraitObject, Bus, Error, Message, Relay, }; @@ -31,20 +31,20 @@ impl sharded_slab::Config for SlabCfg { type Slab = sharded_slab::Slab; pub trait SendUntypedReceiver: Send + Sync { - fn send(&self, msg: Action, bus: &Bus) -> Result<(), SendError>; + fn send(&self, msg: Action, bus: &Bus) -> Result<(), Error>; fn send_msg( &self, _mid: u64, _msg: Box, _req: bool, _bus: &Bus, - ) -> Result<(), SendError>> { + ) -> Result<(), Error>> { unimplemented!() } } pub trait SendTypedReceiver: Sync { - fn send(&self, mid: u64, msg: M, req: bool, bus: &Bus) -> Result<(), SendError>; + fn send(&self, mid: u64, msg: M, req: bool, bus: &Bus) -> Result<(), Error>; } pub trait ReciveTypedReceiver: Sync @@ -371,7 +371,7 @@ where .map_err(|_| Error::MessageCastError)?; SendTypedReceiver::send(&self.inner, mid, *boxed, req, bus) - .map_err(|err| Error::from(err.into_boxed())) + .map_err(|err| err.map_msg(|m|m.into_boxed())) } fn stats(&self) -> Stats { @@ -389,7 +389,7 @@ where } fn send_action(&self, bus: &Bus, action: Action) -> Result<(), Error> { - Ok(SendUntypedReceiver::send(&self.inner, action, bus)?) + SendUntypedReceiver::send(&self.inner, action, bus) } fn set_need_flush(&self) { diff --git a/src/receivers/buffer_unordered/async.rs b/src/receivers/buffer_unordered/async.rs index cefac00..ff825d8 100644 --- a/src/receivers/buffer_unordered/async.rs +++ b/src/receivers/buffer_unordered/async.rs @@ -9,7 +9,7 @@ use std::{ use crate::{ buffer_unordered_poller_macro, builder::ReceiverSubscriberBuilder, - error::{Error, SendError, StdSyncSendError}, + error::{Error, StdSyncSendError}, receiver::{ Action, Event, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver, UntypedPollerCallback, @@ -101,10 +101,10 @@ where R: Message, E: StdSyncSendError, { - fn send(&self, m: Action, _bus: &Bus) -> Result<(), SendError> { + fn send(&self, m: Action, _bus: &Bus) -> Result<(), Error> { match self.tx.send(Request::Action(m)) { Ok(_) => Ok(()), - Err(mpsc::error::SendError(Request::Action(msg))) => Err(SendError::Closed(msg)), + Err(mpsc::error::SendError(Request::Action(msg))) => Err(Error::send_closed(msg)), _ => unimplemented!(), } } @@ -116,14 +116,14 @@ where R: Message, E: StdSyncSendError, { - fn send(&self, mid: u64, m: M, req: bool, _bus: &Bus) -> Result<(), SendError> { + fn send(&self, mid: u64, m: M, req: bool, _bus: &Bus) -> Result<(), Error> { match self.tx.send(Request::Request(mid, m, req)) { Ok(_) => { self.stats.buffer.fetch_add(1, Ordering::Relaxed); Ok(()) } - Err(mpsc::error::SendError(Request::Request(_, msg, _))) => Err(SendError::Closed(msg)), + Err(mpsc::error::SendError(Request::Request(_, msg, _))) => Err(Error::send_closed(msg)), _ => unimplemented!(), } } diff --git a/src/receivers/buffer_unordered/sync.rs b/src/receivers/buffer_unordered/sync.rs index 4f089da..b8ab1cd 100644 --- a/src/receivers/buffer_unordered/sync.rs +++ b/src/receivers/buffer_unordered/sync.rs @@ -10,7 +10,7 @@ use super::{BufferUnorderedConfig, BufferUnorderedStats}; use crate::{ buffer_unordered_poller_macro, builder::ReceiverSubscriberBuilder, - error::{Error, SendError, StdSyncSendError}, + error::{Error, StdSyncSendError}, receiver::{ Action, Event, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver, UntypedPollerCallback, @@ -104,10 +104,10 @@ where R: Message, E: StdSyncSendError, { - fn send(&self, msg: Action, _bus: &Bus) -> Result<(), SendError> { + fn send(&self, msg: Action, _bus: &Bus) -> Result<(), Error> { match self.tx.send(Request::Action(msg)) { Ok(_) => Ok(()), - Err(mpsc::error::SendError(Request::Action(msg))) => Err(SendError::Closed(msg)), + Err(mpsc::error::SendError(Request::Action(msg))) => Err(Error::send_closed(msg)), _ => unimplemented!(), } } @@ -119,14 +119,14 @@ where R: Message, E: StdSyncSendError, { - fn send(&self, mid: u64, m: M, req: bool, _bus: &Bus) -> Result<(), SendError> { + fn send(&self, mid: u64, m: M, req: bool, _bus: &Bus) -> Result<(), Error> { match self.tx.send(Request::Request(mid, m, req)) { Ok(_) => { self.stats.buffer.fetch_add(1, Ordering::Relaxed); Ok(()) } - Err(mpsc::error::SendError(Request::Request(_, msg, _))) => Err(SendError::Closed(msg)), + Err(mpsc::error::SendError(Request::Request(_, msg, _))) => Err(Error::send_closed(msg)), _ => unimplemented!(), } } diff --git a/src/receivers/buffer_unordered_batched/async.rs b/src/receivers/buffer_unordered_batched/async.rs index 168230d..d0827df 100644 --- a/src/receivers/buffer_unordered_batched/async.rs +++ b/src/receivers/buffer_unordered_batched/async.rs @@ -9,7 +9,7 @@ use std::{ use crate::{ buffer_unordered_batch_poller_macro, builder::ReceiverSubscriberBuilder, - error::{Error, SendError, StdSyncSendError}, + error::{Error, StdSyncSendError}, receiver::{ Action, Event, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver, UntypedPollerCallback, @@ -102,10 +102,10 @@ where R: Message, E: StdSyncSendError, { - fn send(&self, m: Action, _bus: &Bus) -> Result<(), SendError> { + fn send(&self, m: Action, _bus: &Bus) -> Result<(), Error> { match self.tx.send(Request::Action(m)) { Ok(_) => Ok(()), - Err(mpsc::error::SendError(Request::Action(msg))) => Err(SendError::Closed(msg)), + Err(mpsc::error::SendError(Request::Action(msg))) => Err(Error::send_closed(msg)), _ => unimplemented!(), } } @@ -117,14 +117,14 @@ where R: Message, E: StdSyncSendError, { - fn send(&self, mid: u64, m: M, req: bool, _bus: &Bus) -> Result<(), SendError> { + fn send(&self, mid: u64, m: M, req: bool, _bus: &Bus) -> Result<(), Error> { match self.tx.send(Request::Request(mid, m, req)) { Ok(_) => { self.stats.buffer.fetch_add(1, Ordering::Relaxed); Ok(()) } - Err(mpsc::error::SendError(Request::Request(_, msg, _))) => Err(SendError::Closed(msg)), + Err(mpsc::error::SendError(Request::Request(_, msg, _))) => Err(Error::send_closed(msg)), _ => unimplemented!(), } } diff --git a/src/receivers/buffer_unordered_batched/sync.rs b/src/receivers/buffer_unordered_batched/sync.rs index a4fd363..32d43c7 100644 --- a/src/receivers/buffer_unordered_batched/sync.rs +++ b/src/receivers/buffer_unordered_batched/sync.rs @@ -10,7 +10,7 @@ use super::{BufferUnorderedBatchedConfig, BufferUnorderedBatchedStats}; use crate::{ buffer_unordered_batch_poller_macro, builder::ReceiverSubscriberBuilder, - error::{Error, SendError, StdSyncSendError}, + error::{Error, StdSyncSendError}, receiver::{ Action, Event, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver, UntypedPollerCallback, @@ -108,10 +108,10 @@ where R: Message, E: StdSyncSendError, { - fn send(&self, msg: Action, _bus: &Bus) -> Result<(), SendError> { + fn send(&self, msg: Action, _bus: &Bus) -> Result<(), Error> { match self.tx.send(Request::Action(msg)) { Ok(_) => Ok(()), - Err(mpsc::error::SendError(Request::Action(msg))) => Err(SendError::Closed(msg)), + Err(mpsc::error::SendError(Request::Action(msg))) => Err(Error::send_closed(msg)), _ => unimplemented!(), } } @@ -123,14 +123,14 @@ where R: Message, E: StdSyncSendError, { - fn send(&self, mid: u64, m: M, req: bool, _bus: &Bus) -> Result<(), SendError> { + fn send(&self, mid: u64, m: M, req: bool, _bus: &Bus) -> Result<(), Error> { match self.tx.send(Request::Request(mid, m, req)) { Ok(_) => { self.stats.buffer.fetch_add(1, Ordering::Relaxed); Ok(()) } - Err(mpsc::error::SendError(Request::Request(_, msg, _))) => Err(SendError::Closed(msg)), + Err(mpsc::error::SendError(Request::Request(_, msg, _))) => Err(Error::send_closed(msg)), _ => unimplemented!(), } } diff --git a/src/receivers/synchronize_batched/async.rs b/src/receivers/synchronize_batched/async.rs index 53ebd5d..16616ca 100644 --- a/src/receivers/synchronize_batched/async.rs +++ b/src/receivers/synchronize_batched/async.rs @@ -4,7 +4,7 @@ use super::SynchronizedBatchedConfig; use crate::{ batch_synchronized_poller_macro, builder::ReceiverSubscriberBuilder, - error::{Error, SendError, StdSyncSendError}, + error::{Error, StdSyncSendError}, receiver::{ Action, Event, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver, UntypedPollerCallback, @@ -79,10 +79,10 @@ where R: Message, E: StdSyncSendError, { - fn send(&self, m: Action, _bus: &Bus) -> Result<(), SendError> { + fn send(&self, m: Action, _bus: &Bus) -> Result<(), Error> { match self.tx.send(Request::Action(m)) { Ok(_) => Ok(()), - Err(mpsc::error::SendError(Request::Action(msg))) => Err(SendError::Closed(msg)), + Err(mpsc::error::SendError(Request::Action(msg))) => Err(Error::send_closed(msg)), _ => unimplemented!(), } } @@ -94,10 +94,10 @@ where R: Message, E: StdSyncSendError, { - fn send(&self, mid: u64, m: M, req: bool, _bus: &Bus) -> Result<(), SendError> { + fn send(&self, mid: u64, m: M, req: bool, _bus: &Bus) -> Result<(), Error> { match self.tx.send(Request::Request(mid, m, req)) { Ok(_) => Ok(()), - Err(mpsc::error::SendError(Request::Request(_, msg, _))) => Err(SendError::Closed(msg)), + Err(mpsc::error::SendError(Request::Request(_, msg, _))) => Err(Error::send_closed(msg)), _ => unimplemented!(), } } diff --git a/src/receivers/synchronize_batched/sync.rs b/src/receivers/synchronize_batched/sync.rs index 34b01de..51e897a 100644 --- a/src/receivers/synchronize_batched/sync.rs +++ b/src/receivers/synchronize_batched/sync.rs @@ -3,7 +3,7 @@ use std::{pin::Pin, sync::Arc}; use crate::{ batch_synchronized_poller_macro, builder::ReceiverSubscriberBuilder, - error::{Error, SendError, StdSyncSendError}, + error::{Error, StdSyncSendError}, receiver::{ Action, Event, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver, UntypedPollerCallback, @@ -83,10 +83,10 @@ where R: Message, E: StdSyncSendError, { - fn send(&self, msg: Action, _bus: &Bus) -> Result<(), SendError> { + fn send(&self, msg: Action, _bus: &Bus) -> Result<(), Error> { match self.tx.send(Request::Action(msg)) { Ok(_) => Ok(()), - Err(mpsc::error::SendError(Request::Action(msg))) => Err(SendError::Closed(msg)), + Err(mpsc::error::SendError(Request::Action(msg))) => Err(Error::send_closed(msg)), _ => unimplemented!(), } } @@ -98,10 +98,10 @@ where R: Message, E: StdSyncSendError, { - fn send(&self, mid: u64, m: M, req: bool, _bus: &Bus) -> Result<(), SendError> { + fn send(&self, mid: u64, m: M, req: bool, _bus: &Bus) -> Result<(), Error> { match self.tx.send(Request::Request(mid, m, req)) { Ok(_) => Ok(()), - Err(mpsc::error::SendError(Request::Request(_, msg, _))) => Err(SendError::Closed(msg)), + Err(mpsc::error::SendError(Request::Request(_, msg, _))) => Err(Error::send_closed(msg)), _ => unimplemented!(), } } diff --git a/src/receivers/synchronized/async.rs b/src/receivers/synchronized/async.rs index 3e22a8f..7fb578d 100644 --- a/src/receivers/synchronized/async.rs +++ b/src/receivers/synchronized/async.rs @@ -6,7 +6,7 @@ use futures::{Future, Stream}; use super::SynchronizedConfig; use crate::{ builder::ReceiverSubscriberBuilder, - error::{Error, SendError, StdSyncSendError}, + error::{Error, StdSyncSendError}, receiver::{Action, Event, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver}, receivers::Request, AsyncSynchronizedHandler, Bus, Message, Untyped, @@ -79,10 +79,10 @@ where R: Message, E: StdSyncSendError, { - fn send(&self, m: Action, _bus: &Bus) -> Result<(), SendError> { + fn send(&self, m: Action, _bus: &Bus) -> Result<(), Error> { match self.tx.send(Request::Action(m)) { Ok(_) => Ok(()), - Err(mpsc::error::SendError(Request::Action(msg))) => Err(SendError::Closed(msg)), + Err(mpsc::error::SendError(Request::Action(msg))) => Err(Error::send_closed(msg)), _ => unimplemented!(), } } @@ -94,10 +94,10 @@ where R: Message, E: StdSyncSendError, { - fn send(&self, mid: u64, m: M, req: bool, _bus: &Bus) -> Result<(), SendError> { + fn send(&self, mid: u64, m: M, req: bool, _bus: &Bus) -> Result<(), Error> { match self.tx.send(Request::Request(mid, m, req)) { Ok(_) => Ok(()), - Err(mpsc::error::SendError(Request::Request(_, msg, _))) => Err(SendError::Closed(msg)), + Err(mpsc::error::SendError(Request::Request(_, msg, _))) => Err(Error::send_closed(msg)), _ => unimplemented!(), } } diff --git a/src/receivers/synchronized/sync.rs b/src/receivers/synchronized/sync.rs index 18ec341..b749592 100644 --- a/src/receivers/synchronized/sync.rs +++ b/src/receivers/synchronized/sync.rs @@ -6,7 +6,7 @@ use futures::{executor::block_on, Future, Stream}; use super::SynchronizedConfig; use crate::{ builder::ReceiverSubscriberBuilder, - error::{Error, SendError, StdSyncSendError}, + error::{Error, StdSyncSendError}, receiver::{Action, Event, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver}, receivers::Request, Bus, Message, SynchronizedHandler, Untyped, @@ -80,10 +80,10 @@ where R: Message, E: StdSyncSendError, { - fn send(&self, msg: Action, _bus: &Bus) -> Result<(), SendError> { + fn send(&self, msg: Action, _bus: &Bus) -> Result<(), Error> { match self.tx.send(Request::Action(msg)) { Ok(_) => Ok(()), - Err(mpsc::error::SendError(Request::Action(msg))) => Err(SendError::Closed(msg)), + Err(mpsc::error::SendError(Request::Action(msg))) => Err(Error::send_closed(msg)), _ => unimplemented!(), } } @@ -95,10 +95,10 @@ where R: Message, E: StdSyncSendError, { - fn send(&self, mid: u64, m: M, req: bool, _bus: &Bus) -> Result<(), SendError> { + fn send(&self, mid: u64, m: M, req: bool, _bus: &Bus) -> Result<(), Error> { match self.tx.send(Request::Request(mid, m, req)) { Ok(_) => Ok(()), - Err(mpsc::error::SendError(Request::Request(_, msg, _))) => Err(SendError::Closed(msg)), + Err(mpsc::error::SendError(Request::Request(_, msg, _))) => Err(Error::send_closed(msg)), _ => unimplemented!(), } } diff --git a/src/relay.rs b/src/relay.rs index 7fbedf8..85edba4 100644 --- a/src/relay.rs +++ b/src/relay.rs @@ -118,7 +118,7 @@ where req: bool, bus: &Bus, ) -> Result<(), Error>> { - Ok(self.inner.send_msg(mid, boxed_msg, req, bus)?) + self.inner.send_msg(mid, boxed_msg, req, bus) } fn need_flush(&self) -> bool { @@ -134,7 +134,7 @@ where } fn send_action(&self, bus: &Bus, action: Action) -> Result<(), Error> { - Ok(SendUntypedReceiver::send(&self.inner, action, bus)?) + SendUntypedReceiver::send(&self.inner, action, bus) } fn close_notify(&self) -> &Notify { diff --git a/tests/test_relay.rs b/tests/test_relay.rs index a8f1f4b..ad819b9 100644 --- a/tests/test_relay.rs +++ b/tests/test_relay.rs @@ -114,7 +114,7 @@ impl TypeTagAccept for TestRelay { } impl SendUntypedReceiver for TestRelay { - fn send(&self, msg: Action, _bus: &Bus) -> Result<(), error::SendError> { + fn send(&self, msg: Action, _bus: &Bus) -> Result<(), error::Error> { match msg { Action::Init => { self.stx.send(Event::Ready).unwrap(); @@ -141,7 +141,7 @@ impl SendUntypedReceiver for TestRelay { msg: Box, _req: bool, _bus: &Bus, - ) -> Result<(), error::SendError>> { + ) -> Result<(), error::Error>> { println!("TestRelay::send_msg [{}] {:?}", mid, msg); if msg.type_tag().as_ref() == Msg::::type_tag_().as_ref() { self.stx