Add send_one, send_one_blocked, try_send_one

This commit is contained in:
Andrey Tkachenko 2021-06-29 13:25:03 +04:00
parent 48756264b8
commit 908059d8c9
4 changed files with 56 additions and 110 deletions

View File

@ -1,6 +1,6 @@
[package] [package]
name = "messagebus" name = "messagebus"
version = "0.6.4" version = "0.6.5"
authors = ["Andrey Tkachenko <andrey@aidev.ru>"] authors = ["Andrey Tkachenko <andrey@aidev.ru>"]
repository = "https://github.com/andreytkachenko/messagebus.git" repository = "https://github.com/andreytkachenko/messagebus.git"
keywords = ["futures", "async", "tokio", "message", "bus"] keywords = ["futures", "async", "tokio", "message", "bus"]

View File

@ -1,102 +1,6 @@
use core::any::{self, Any}; use core::any::Any;
use core::fmt; use core::fmt;
// use erased_serde::{Deserializer, Serialize}; // use erased_serde::{Deserializer, Serialize};
pub trait Message: Any + fmt::Debug/*Serialize + for<'a> Deserializer<'a> + */ + Unpin + Clone + Send + Sync + 'static {} pub trait Message: Any + fmt::Debug + Unpin + Send + Sync + 'static {}
impl<T: Any + fmt::Debug + Unpin + Clone + Send + Sync> Message for T {} impl<T: Any + fmt::Debug + Unpin + Send + Sync> Message for T {}
trait SafeMessage: Any + fmt::Debug/*+ Serialize + for<'a> Deserializer<'a>*/ + Unpin + Send + Sync + 'static {
fn type_name(&self) -> &'static str;
fn clone_boxed(&self) -> Box<dyn SafeMessage>;
}
impl<T: Message> SafeMessage for T {
fn type_name(&self) -> &'static str {
any::type_name::<T>()
}
fn clone_boxed(&self) -> Box<dyn SafeMessage> {
Box::new(self.clone())
}
}
// pub struct BoxedEnvelop {
// inner: Box<dyn SafeMessage>,
// }
// impl BoxedEnvelop {
// pub fn from_message<M: Message>(m: M) -> Self {
// Self {
// inner: Box::new(m)
// }
// }
// pub fn as_ref(&self) -> Envelop<'_> {
// Envelop { inner: &*self.inner }
// }
// pub fn downcast<T: 'static>(self) -> Option<Box<T>> {
// if (*self.inner).type_id() == TypeId::of::<T>() {
// unsafe {
// let raw: *mut dyn SafeMessage = Box::into_raw(self.inner);
// Some(Box::from_raw(raw as *mut T))
// }
// } else {
// None
// }
// }
// }
#[derive(Copy, Clone)]
pub struct Envelop<'inner> {
inner: &'inner dyn SafeMessage,
}
impl<'inner> fmt::Debug for Envelop<'inner> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Envelop(")?;
self.inner.fmt(f)?;
write!(f, ")")?;
Ok(())
}
}
impl<'inner> Envelop<'inner> {
// pub fn new<T: Message>(inner: &'inner T) -> Self {
// Self { inner }
// }
// #[inline]
// pub fn downcast_to<T: 'static>(&self) -> Option<&T> {
// if self.inner.type_id() == TypeId::of::<T>() {
// unsafe { Some(&*(self.inner as *const dyn SafeMessage as *const T)) }
// } else {
// None
// }
// }
// #[inline]
// pub fn type_id(&self) -> TypeId {
// self.inner.type_id()
// }
// #[inline]
// pub fn type_name(&self) -> &'static str {
// self.inner.type_name()
// }
// #[inline]
// pub fn clone_boxed(&self) -> BoxedEnvelop {
// BoxedEnvelop {
// inner: self.inner.clone_boxed(),
// }
// }
}
// impl<'inner> serde::Serialize for Envelop<'inner> {
// fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
// erased_serde::serialize(self.inner, serializer)
// }
// }

View File

@ -134,11 +134,11 @@ impl BusInner {
} }
#[inline] #[inline]
pub fn try_send<M: Message>(&self, msg: M) -> core::result::Result<(), Error<M>> { pub fn try_send<M: Message + Clone>(&self, msg: M) -> Result<(), Error<M>> {
self.try_send_ext(msg, SendOptions::Broadcast) self.try_send_ext(msg, SendOptions::Broadcast)
} }
pub fn try_send_ext<M: Message>( pub fn try_send_ext<M: Message + Clone>(
&self, &self,
msg: M, msg: M,
_options: SendOptions, _options: SendOptions,
@ -183,12 +183,12 @@ impl BusInner {
} }
#[inline] #[inline]
pub fn send_blocking<M: Message>(&self, msg: M) -> core::result::Result<(), Error<M>> { pub fn send_blocking<M: Message + Clone>(&self, msg: M) -> Result<(), Error<M>> {
self.send_blocking_ext(msg, SendOptions::Broadcast) self.send_blocking_ext(msg, SendOptions::Broadcast)
} }
#[inline] #[inline]
pub fn send_blocking_ext<M: Message>( pub fn send_blocking_ext<M: Message + Clone>(
&self, &self,
msg: M, msg: M,
options: SendOptions, options: SendOptions,
@ -197,11 +197,11 @@ impl BusInner {
} }
#[inline] #[inline]
pub async fn send<M: Message>(&self, msg: M) -> core::result::Result<(), Error<M>> { pub async fn send<M: Message + Clone>(&self, msg: M) -> core::result::Result<(), Error<M>> {
Ok(self.send_ext(msg, SendOptions::Broadcast).await?) Ok(self.send_ext(msg, SendOptions::Broadcast).await?)
} }
pub async fn send_ext<M: Message>( pub async fn send_ext<M: Message + Clone>(
&self, &self,
msg: M, msg: M,
_options: SendOptions, _options: SendOptions,
@ -234,11 +234,11 @@ impl BusInner {
} }
#[inline] #[inline]
pub fn force_send<M: Message>(&self, msg: M) -> core::result::Result<(), Error<M>> { pub fn force_send<M: Message + Clone>(&self, msg: M) -> Result<(), Error<M>> {
self.force_send_ext(msg, SendOptions::Broadcast) self.force_send_ext(msg, SendOptions::Broadcast)
} }
pub fn force_send_ext<M: Message>( pub fn force_send_ext<M: Message + Clone>(
&self, &self,
msg: M, msg: M,
_options: SendOptions, _options: SendOptions,
@ -270,6 +270,48 @@ impl BusInner {
Ok(()) Ok(())
} }
#[inline]
pub fn try_send_one<M: Message>(&self, msg: M) -> Result<(), Error<M>> {
if self.closed.load(Ordering::SeqCst) {
return Err(SendError::Closed(msg).into());
}
let mid = ID_COUNTER.fetch_add(1, Ordering::Relaxed);
let tid = TypeId::of::<M>();
if let Some(rs) = self.receivers.get(&tid).and_then(|rs|rs.first()) {
let permits = if let Some(x) = rs.try_reserve() {
x
} else {
return Err(SendError::Full(msg).into());
};
Ok(rs.send(mid, permits, msg)?)
} else {
Err(Error::NoReceivers)
}
}
pub async fn send_one<M: Message>(&self, msg: M) -> Result<(), Error<M>> {
if self.closed.load(Ordering::SeqCst) {
return Err(SendError::Closed(msg).into());
}
let mid = ID_COUNTER.fetch_add(1, Ordering::Relaxed);
let tid = TypeId::of::<M>();
if let Some(rs) = self.receivers.get(&tid).and_then(|rs|rs.first()) {
Ok(rs.send(mid, rs.reserve().await, msg)?)
} else {
Err(Error::NoReceivers)
}
}
#[inline]
pub fn send_one_blocking<M: Message>(&self, msg: M) -> Result<(), Error<M>> {
futures::executor::block_on(self.send_one(msg))
}
pub async fn request<M: Message, R: Message>( pub async fn request<M: Message, R: Message>(
&self, &self,
req: M, req: M,

View File

@ -390,7 +390,7 @@ impl Receiver {
} }
#[inline] #[inline]
pub fn force_send<M: Message>(&self, mid: u64, msg: M) -> Result<(), SendError<M>> { pub fn force_send<M: Message + Clone>(&self, mid: u64, msg: M) -> Result<(), SendError<M>> {
let any_receiver = self.inner.typed(); let any_receiver = self.inner.typed();
let receiver = any_receiver.dyn_typed_receiver::<M>(); let receiver = any_receiver.dyn_typed_receiver::<M>();
let res = receiver.send(mid, msg); let res = receiver.send(mid, msg);
@ -452,7 +452,7 @@ impl Receiver {
error!("Response cannot be processed!"); error!("Response cannot be processed!");
} }
} else if TypeId::of::<R>() != TypeId::of::<()>() { } else if TypeId::of::<R>() != TypeId::of::<()>() {
warn!("Non-void response has no listeners!"); warn!("Non-void response has no waiters!");
} }
}, },