refactor senderror

This commit is contained in:
Andrey Tkachenko 2021-09-22 17:07:21 +04:00
parent 5ba6f1139d
commit bed35670c6
13 changed files with 66 additions and 56 deletions

View File

@ -305,7 +305,7 @@ impl TypeTagAccept for QuicClientRelay {
}
impl SendUntypedReceiver for QuicClientRelay {
fn send(&self, msg: Action, _bus: &Bus) -> Result<(), SendError<Action>> {
fn send(&self, msg: Action, _bus: &Bus) -> Result<(), messagebus::error::Error<Action>> {
match msg {
Action::Init => {
let (sender, mut rx) = self.receiver_send.lock().take().unwrap();
@ -335,11 +335,13 @@ impl SendUntypedReceiver for QuicClientRelay {
msg: Box<dyn Message>,
req: bool,
_bus: &Bus,
) -> Result<(), SendError<Box<dyn Message>>> {
msg.as_shared_boxed()
self.sender.send((req, mid, msg).into()).unwrap();
Ok(())
) -> Result<(), messagebus::error::Error<Box<dyn Message>>> {
if let Ok(val) = msg.as_shared_boxed() {
self.sender.send((req, mid, msg).into()).unwrap();
Ok(())
} else {
Err(SendError:)
}
}
}

View File

@ -130,6 +130,14 @@ pub enum Error<M: fmt::Debug + 'static = (), E: StdSyncSendError = GenericError>
}
impl<M: fmt::Debug + 'static, E: StdSyncSendError> Error<M, E> {
pub fn send_closed(m: M) -> Self {
Error::SendError(SendError::Closed(m))
}
pub fn send_full(m: M) -> Self {
Error::SendError(SendError::Full(m))
}
pub fn map_msg<UM: fmt::Debug + 'static, F: FnOnce(M) -> UM>(self, f: F) -> Error<UM, E> {
match self {
Error::SendError(inner) => Error::SendError(inner.map_msg(f)),

View File

@ -3,7 +3,7 @@ use crate::stats::Stats;
use crate::Untyped;
use crate::{
envelop::{IntoBoxedMessage, TypeTag},
error::{GenericError, SendError, StdSyncSendError},
error::{GenericError, StdSyncSendError},
trait_object::TraitObject,
Bus, Error, Message, Relay,
};
@ -31,20 +31,20 @@ impl sharded_slab::Config for SlabCfg {
type Slab<T> = sharded_slab::Slab<T, SlabCfg>;
pub trait SendUntypedReceiver: Send + Sync {
fn send(&self, msg: Action, bus: &Bus) -> Result<(), SendError<Action>>;
fn send(&self, msg: Action, bus: &Bus) -> Result<(), Error<Action>>;
fn send_msg(
&self,
_mid: u64,
_msg: Box<dyn Message>,
_req: bool,
_bus: &Bus,
) -> Result<(), SendError<Box<dyn Message>>> {
) -> Result<(), Error<Box<dyn Message>>> {
unimplemented!()
}
}
pub trait SendTypedReceiver<M: Message>: Sync {
fn send(&self, mid: u64, msg: M, req: bool, bus: &Bus) -> Result<(), SendError<M>>;
fn send(&self, mid: u64, msg: M, req: bool, bus: &Bus) -> Result<(), Error<M>>;
}
pub trait ReciveTypedReceiver<M, E>: Sync
@ -371,7 +371,7 @@ where
.map_err(|_| Error::MessageCastError)?;
SendTypedReceiver::send(&self.inner, mid, *boxed, req, bus)
.map_err(|err| Error::from(err.into_boxed()))
.map_err(|err| err.map_msg(|m|m.into_boxed()))
}
fn stats(&self) -> Stats {
@ -389,7 +389,7 @@ where
}
fn send_action(&self, bus: &Bus, action: Action) -> Result<(), Error<Action>> {
Ok(SendUntypedReceiver::send(&self.inner, action, bus)?)
SendUntypedReceiver::send(&self.inner, action, bus)
}
fn set_need_flush(&self) {

View File

@ -9,7 +9,7 @@ use std::{
use crate::{
buffer_unordered_poller_macro,
builder::ReceiverSubscriberBuilder,
error::{Error, SendError, StdSyncSendError},
error::{Error, StdSyncSendError},
receiver::{
Action, Event, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver,
UntypedPollerCallback,
@ -101,10 +101,10 @@ where
R: Message,
E: StdSyncSendError,
{
fn send(&self, m: Action, _bus: &Bus) -> Result<(), SendError<Action>> {
fn send(&self, m: Action, _bus: &Bus) -> Result<(), Error<Action>> {
match self.tx.send(Request::Action(m)) {
Ok(_) => Ok(()),
Err(mpsc::error::SendError(Request::Action(msg))) => Err(SendError::Closed(msg)),
Err(mpsc::error::SendError(Request::Action(msg))) => Err(Error::send_closed(msg)),
_ => unimplemented!(),
}
}
@ -116,14 +116,14 @@ where
R: Message,
E: StdSyncSendError,
{
fn send(&self, mid: u64, m: M, req: bool, _bus: &Bus) -> Result<(), SendError<M>> {
fn send(&self, mid: u64, m: M, req: bool, _bus: &Bus) -> Result<(), Error<M>> {
match self.tx.send(Request::Request(mid, m, req)) {
Ok(_) => {
self.stats.buffer.fetch_add(1, Ordering::Relaxed);
Ok(())
}
Err(mpsc::error::SendError(Request::Request(_, msg, _))) => Err(SendError::Closed(msg)),
Err(mpsc::error::SendError(Request::Request(_, msg, _))) => Err(Error::send_closed(msg)),
_ => unimplemented!(),
}
}

View File

@ -10,7 +10,7 @@ use super::{BufferUnorderedConfig, BufferUnorderedStats};
use crate::{
buffer_unordered_poller_macro,
builder::ReceiverSubscriberBuilder,
error::{Error, SendError, StdSyncSendError},
error::{Error, StdSyncSendError},
receiver::{
Action, Event, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver,
UntypedPollerCallback,
@ -104,10 +104,10 @@ where
R: Message,
E: StdSyncSendError,
{
fn send(&self, msg: Action, _bus: &Bus) -> Result<(), SendError<Action>> {
fn send(&self, msg: Action, _bus: &Bus) -> Result<(), Error<Action>> {
match self.tx.send(Request::Action(msg)) {
Ok(_) => Ok(()),
Err(mpsc::error::SendError(Request::Action(msg))) => Err(SendError::Closed(msg)),
Err(mpsc::error::SendError(Request::Action(msg))) => Err(Error::send_closed(msg)),
_ => unimplemented!(),
}
}
@ -119,14 +119,14 @@ where
R: Message,
E: StdSyncSendError,
{
fn send(&self, mid: u64, m: M, req: bool, _bus: &Bus) -> Result<(), SendError<M>> {
fn send(&self, mid: u64, m: M, req: bool, _bus: &Bus) -> Result<(), Error<M>> {
match self.tx.send(Request::Request(mid, m, req)) {
Ok(_) => {
self.stats.buffer.fetch_add(1, Ordering::Relaxed);
Ok(())
}
Err(mpsc::error::SendError(Request::Request(_, msg, _))) => Err(SendError::Closed(msg)),
Err(mpsc::error::SendError(Request::Request(_, msg, _))) => Err(Error::send_closed(msg)),
_ => unimplemented!(),
}
}

View File

@ -9,7 +9,7 @@ use std::{
use crate::{
buffer_unordered_batch_poller_macro,
builder::ReceiverSubscriberBuilder,
error::{Error, SendError, StdSyncSendError},
error::{Error, StdSyncSendError},
receiver::{
Action, Event, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver,
UntypedPollerCallback,
@ -102,10 +102,10 @@ where
R: Message,
E: StdSyncSendError,
{
fn send(&self, m: Action, _bus: &Bus) -> Result<(), SendError<Action>> {
fn send(&self, m: Action, _bus: &Bus) -> Result<(), Error<Action>> {
match self.tx.send(Request::Action(m)) {
Ok(_) => Ok(()),
Err(mpsc::error::SendError(Request::Action(msg))) => Err(SendError::Closed(msg)),
Err(mpsc::error::SendError(Request::Action(msg))) => Err(Error::send_closed(msg)),
_ => unimplemented!(),
}
}
@ -117,14 +117,14 @@ where
R: Message,
E: StdSyncSendError,
{
fn send(&self, mid: u64, m: M, req: bool, _bus: &Bus) -> Result<(), SendError<M>> {
fn send(&self, mid: u64, m: M, req: bool, _bus: &Bus) -> Result<(), Error<M>> {
match self.tx.send(Request::Request(mid, m, req)) {
Ok(_) => {
self.stats.buffer.fetch_add(1, Ordering::Relaxed);
Ok(())
}
Err(mpsc::error::SendError(Request::Request(_, msg, _))) => Err(SendError::Closed(msg)),
Err(mpsc::error::SendError(Request::Request(_, msg, _))) => Err(Error::send_closed(msg)),
_ => unimplemented!(),
}
}

View File

@ -10,7 +10,7 @@ use super::{BufferUnorderedBatchedConfig, BufferUnorderedBatchedStats};
use crate::{
buffer_unordered_batch_poller_macro,
builder::ReceiverSubscriberBuilder,
error::{Error, SendError, StdSyncSendError},
error::{Error, StdSyncSendError},
receiver::{
Action, Event, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver,
UntypedPollerCallback,
@ -108,10 +108,10 @@ where
R: Message,
E: StdSyncSendError,
{
fn send(&self, msg: Action, _bus: &Bus) -> Result<(), SendError<Action>> {
fn send(&self, msg: Action, _bus: &Bus) -> Result<(), Error<Action>> {
match self.tx.send(Request::Action(msg)) {
Ok(_) => Ok(()),
Err(mpsc::error::SendError(Request::Action(msg))) => Err(SendError::Closed(msg)),
Err(mpsc::error::SendError(Request::Action(msg))) => Err(Error::send_closed(msg)),
_ => unimplemented!(),
}
}
@ -123,14 +123,14 @@ where
R: Message,
E: StdSyncSendError,
{
fn send(&self, mid: u64, m: M, req: bool, _bus: &Bus) -> Result<(), SendError<M>> {
fn send(&self, mid: u64, m: M, req: bool, _bus: &Bus) -> Result<(), Error<M>> {
match self.tx.send(Request::Request(mid, m, req)) {
Ok(_) => {
self.stats.buffer.fetch_add(1, Ordering::Relaxed);
Ok(())
}
Err(mpsc::error::SendError(Request::Request(_, msg, _))) => Err(SendError::Closed(msg)),
Err(mpsc::error::SendError(Request::Request(_, msg, _))) => Err(Error::send_closed(msg)),
_ => unimplemented!(),
}
}

View File

@ -4,7 +4,7 @@ use super::SynchronizedBatchedConfig;
use crate::{
batch_synchronized_poller_macro,
builder::ReceiverSubscriberBuilder,
error::{Error, SendError, StdSyncSendError},
error::{Error, StdSyncSendError},
receiver::{
Action, Event, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver,
UntypedPollerCallback,
@ -79,10 +79,10 @@ where
R: Message,
E: StdSyncSendError,
{
fn send(&self, m: Action, _bus: &Bus) -> Result<(), SendError<Action>> {
fn send(&self, m: Action, _bus: &Bus) -> Result<(), Error<Action>> {
match self.tx.send(Request::Action(m)) {
Ok(_) => Ok(()),
Err(mpsc::error::SendError(Request::Action(msg))) => Err(SendError::Closed(msg)),
Err(mpsc::error::SendError(Request::Action(msg))) => Err(Error::send_closed(msg)),
_ => unimplemented!(),
}
}
@ -94,10 +94,10 @@ where
R: Message,
E: StdSyncSendError,
{
fn send(&self, mid: u64, m: M, req: bool, _bus: &Bus) -> Result<(), SendError<M>> {
fn send(&self, mid: u64, m: M, req: bool, _bus: &Bus) -> Result<(), Error<M>> {
match self.tx.send(Request::Request(mid, m, req)) {
Ok(_) => Ok(()),
Err(mpsc::error::SendError(Request::Request(_, msg, _))) => Err(SendError::Closed(msg)),
Err(mpsc::error::SendError(Request::Request(_, msg, _))) => Err(Error::send_closed(msg)),
_ => unimplemented!(),
}
}

View File

@ -3,7 +3,7 @@ use std::{pin::Pin, sync::Arc};
use crate::{
batch_synchronized_poller_macro,
builder::ReceiverSubscriberBuilder,
error::{Error, SendError, StdSyncSendError},
error::{Error, StdSyncSendError},
receiver::{
Action, Event, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver,
UntypedPollerCallback,
@ -83,10 +83,10 @@ where
R: Message,
E: StdSyncSendError,
{
fn send(&self, msg: Action, _bus: &Bus) -> Result<(), SendError<Action>> {
fn send(&self, msg: Action, _bus: &Bus) -> Result<(), Error<Action>> {
match self.tx.send(Request::Action(msg)) {
Ok(_) => Ok(()),
Err(mpsc::error::SendError(Request::Action(msg))) => Err(SendError::Closed(msg)),
Err(mpsc::error::SendError(Request::Action(msg))) => Err(Error::send_closed(msg)),
_ => unimplemented!(),
}
}
@ -98,10 +98,10 @@ where
R: Message,
E: StdSyncSendError,
{
fn send(&self, mid: u64, m: M, req: bool, _bus: &Bus) -> Result<(), SendError<M>> {
fn send(&self, mid: u64, m: M, req: bool, _bus: &Bus) -> Result<(), Error<M>> {
match self.tx.send(Request::Request(mid, m, req)) {
Ok(_) => Ok(()),
Err(mpsc::error::SendError(Request::Request(_, msg, _))) => Err(SendError::Closed(msg)),
Err(mpsc::error::SendError(Request::Request(_, msg, _))) => Err(Error::send_closed(msg)),
_ => unimplemented!(),
}
}

View File

@ -6,7 +6,7 @@ use futures::{Future, Stream};
use super::SynchronizedConfig;
use crate::{
builder::ReceiverSubscriberBuilder,
error::{Error, SendError, StdSyncSendError},
error::{Error, StdSyncSendError},
receiver::{Action, Event, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver},
receivers::Request,
AsyncSynchronizedHandler, Bus, Message, Untyped,
@ -79,10 +79,10 @@ where
R: Message,
E: StdSyncSendError,
{
fn send(&self, m: Action, _bus: &Bus) -> Result<(), SendError<Action>> {
fn send(&self, m: Action, _bus: &Bus) -> Result<(), Error<Action>> {
match self.tx.send(Request::Action(m)) {
Ok(_) => Ok(()),
Err(mpsc::error::SendError(Request::Action(msg))) => Err(SendError::Closed(msg)),
Err(mpsc::error::SendError(Request::Action(msg))) => Err(Error::send_closed(msg)),
_ => unimplemented!(),
}
}
@ -94,10 +94,10 @@ where
R: Message,
E: StdSyncSendError,
{
fn send(&self, mid: u64, m: M, req: bool, _bus: &Bus) -> Result<(), SendError<M>> {
fn send(&self, mid: u64, m: M, req: bool, _bus: &Bus) -> Result<(), Error<M>> {
match self.tx.send(Request::Request(mid, m, req)) {
Ok(_) => Ok(()),
Err(mpsc::error::SendError(Request::Request(_, msg, _))) => Err(SendError::Closed(msg)),
Err(mpsc::error::SendError(Request::Request(_, msg, _))) => Err(Error::send_closed(msg)),
_ => unimplemented!(),
}
}

View File

@ -6,7 +6,7 @@ use futures::{executor::block_on, Future, Stream};
use super::SynchronizedConfig;
use crate::{
builder::ReceiverSubscriberBuilder,
error::{Error, SendError, StdSyncSendError},
error::{Error, StdSyncSendError},
receiver::{Action, Event, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver},
receivers::Request,
Bus, Message, SynchronizedHandler, Untyped,
@ -80,10 +80,10 @@ where
R: Message,
E: StdSyncSendError,
{
fn send(&self, msg: Action, _bus: &Bus) -> Result<(), SendError<Action>> {
fn send(&self, msg: Action, _bus: &Bus) -> Result<(), Error<Action>> {
match self.tx.send(Request::Action(msg)) {
Ok(_) => Ok(()),
Err(mpsc::error::SendError(Request::Action(msg))) => Err(SendError::Closed(msg)),
Err(mpsc::error::SendError(Request::Action(msg))) => Err(Error::send_closed(msg)),
_ => unimplemented!(),
}
}
@ -95,10 +95,10 @@ where
R: Message,
E: StdSyncSendError,
{
fn send(&self, mid: u64, m: M, req: bool, _bus: &Bus) -> Result<(), SendError<M>> {
fn send(&self, mid: u64, m: M, req: bool, _bus: &Bus) -> Result<(), Error<M>> {
match self.tx.send(Request::Request(mid, m, req)) {
Ok(_) => Ok(()),
Err(mpsc::error::SendError(Request::Request(_, msg, _))) => Err(SendError::Closed(msg)),
Err(mpsc::error::SendError(Request::Request(_, msg, _))) => Err(Error::send_closed(msg)),
_ => unimplemented!(),
}
}

View File

@ -118,7 +118,7 @@ where
req: bool,
bus: &Bus,
) -> Result<(), Error<Box<dyn Message>>> {
Ok(self.inner.send_msg(mid, boxed_msg, req, bus)?)
self.inner.send_msg(mid, boxed_msg, req, bus)
}
fn need_flush(&self) -> bool {
@ -134,7 +134,7 @@ where
}
fn send_action(&self, bus: &Bus, action: Action) -> Result<(), Error<Action>> {
Ok(SendUntypedReceiver::send(&self.inner, action, bus)?)
SendUntypedReceiver::send(&self.inner, action, bus)
}
fn close_notify(&self) -> &Notify {

View File

@ -114,7 +114,7 @@ impl TypeTagAccept for TestRelay {
}
impl SendUntypedReceiver for TestRelay {
fn send(&self, msg: Action, _bus: &Bus) -> Result<(), error::SendError<Action>> {
fn send(&self, msg: Action, _bus: &Bus) -> Result<(), error::Error<Action>> {
match msg {
Action::Init => {
self.stx.send(Event::Ready).unwrap();
@ -141,7 +141,7 @@ impl SendUntypedReceiver for TestRelay {
msg: Box<dyn Message>,
_req: bool,
_bus: &Bus,
) -> Result<(), error::SendError<Box<dyn Message>>> {
) -> Result<(), error::Error<Box<dyn Message>>> {
println!("TestRelay::send_msg [{}] {:?}", mid, msg);
if msg.type_tag().as_ref() == Msg::<i16>::type_tag_().as_ref() {
self.stx