try_clone

This commit is contained in:
Andrey Tkachenko 2021-09-28 18:06:57 +04:00
parent 7034e8b87b
commit 1bff5cc70a
11 changed files with 63 additions and 30 deletions

View File

@ -1,6 +1,6 @@
[package]
name = "messagebus"
version = "0.9.8"
version = "0.9.9"
authors = ["Andrey Tkachenko <andrey@aidev.ru>"]
repository = "https://github.com/andreytkachenko/messagebus.git"
keywords = ["futures", "async", "tokio", "message", "bus"]
@ -17,7 +17,7 @@ members = [
]
[dependencies]
messagebus_derive = "0.2"
messagebus_derive = "0.2.4"
tokio = { version = "1", features = ["parking_lot", "rt-multi-thread", "sync", "time"] }
parking_lot = "0.11"
async-trait = "0.1"

View File

@ -1,6 +1,6 @@
[package]
name = "messagebus_derive"
version = "0.2.3"
version = "0.2.4"
authors = ["Andrey Tkachenko <andrey@aidev.ru>"]
repository = "https://github.com/andreytkachenko/messagebus.git"
keywords = ["futures", "async", "tokio", "message", "bus"]

View File

@ -46,11 +46,15 @@ fn clone_part(ast: &syn::DeriveInput, has_clone: bool) -> proc_macro2::TokenStre
fn try_clone_boxed(&self) -> std::option::Option<std::boxed::Box<dyn messagebus::Message>>{
Some(Box::new(core::clone::Clone::clone(self)))
}
fn try_clone(&self) -> Option<Self> {
Some(core::clone::Clone::clone(self))
}
}
} else {
quote! {
fn try_clone_into(&self, into: &mut dyn core::any::Any) -> bool {false}
fn try_clone_boxed(&self) -> std::option::Option<std::boxed::Box<dyn messagebus::Message>>{ None }
fn try_clone(&self) -> Option<Self> { None }
}
}
}

View File

@ -1,8 +1,7 @@
use messagebus::Bus;
use messagebus_remote::relays::QuicClientRelay;
use serde_derive::{Serialize, Deserialize};
use messagebus::{Message, derive::Message};
use messagebus::derive::Message;
#[derive(Serialize, Deserialize, Debug, Clone, Message)]
#[namespace("example")]
@ -20,17 +19,16 @@ pub struct Resp {
text: String
}
#[tokio::main]
async fn main() {
let relay = QuicClientRelay::new(
"./examples/cert.der",
"127.0.0.1:8083".parse().unwrap(),
"localhost".into(),
vec![
("example::Req".into(), "example::Resp".into(), "GenericError".into())
]
(vec![
("example::Req".into(), "example::Resp".into(), "GenericError".into())
],
vec![])
).unwrap();
let (b, poller) = Bus::build()

View File

@ -63,9 +63,10 @@ async fn main() {
"./examples/cert.key",
"./examples/cert.der",
"0.0.0.0:8083".parse().unwrap(),
(vec![],
vec![
("example::Req".into(), "example::Resp".into(), "GenericError".into())
]
])
).unwrap();
let (b, poller) = Bus::build()

View File

@ -417,7 +417,7 @@ mod tests {
BodyType, ProtocolHeader, ProtocolHeaderActionKind, ProtocolHeaderFlags, ProtocolPacket,
};
use messagebus::Event;
use messagebus::{derive::Message, Bus, Message, TypeTagged};
use messagebus::{derive::Message, Bus, TypeTagged};
use serde_derive::{Deserialize, Serialize};
use std::borrow::Cow;

View File

@ -23,7 +23,7 @@ pub type QuicClientRelay = QuicRelay<QuicClientEndpoint>;
pub type QuicServerRelay = QuicRelay<QuicServerEndpoint>;
use super::{GenericEventStream, MessageTable};
pub type MessageList = Vec<(TypeTag, TypeTag, TypeTag)>;
pub trait WaitIdle<'a>: Sync {
type Fut: Future + Send + 'a;
@ -42,7 +42,7 @@ pub struct QuicRelay<B> {
base: Mutex<Option<B>>,
self_id: Arc<AtomicU64>,
in_table: MessageTable,
out_table: MessageTable,
_out_table: MessageTable,
item_sender: UnboundedSender<Option<ProtocolItem>>,
item_receiver: Mutex<Option<UnboundedReceiver<Option<ProtocolItem>>>>,
event_sender: UnboundedSender<RecvDo>,
@ -50,7 +50,7 @@ pub struct QuicRelay<B> {
}
impl QuicRelay<QuicClientEndpoint> {
pub fn new(cert: &str, addr: SocketAddr, host: String, table: (Vec<(TypeTag, TypeTag, TypeTag)>, Vec<(TypeTag, TypeTag, TypeTag)>)) -> Result<Self, crate::error::Error> {
pub fn new(cert: &str, addr: SocketAddr, host: String, table: (MessageList, MessageList)) -> Result<Self, crate::error::Error> {
let (item_sender, item_receiver) = mpsc::unbounded_channel();
let (event_sender, event_receiver) = mpsc::unbounded_channel();
@ -58,7 +58,7 @@ impl QuicRelay<QuicClientEndpoint> {
base: Mutex::new(Some(QuicClientEndpoint::new(cert, addr, host)?)),
self_id: Arc::new(AtomicU64::new(0)),
in_table: MessageTable::from(table.0),
out_table: MessageTable::from(table.1),
_out_table: MessageTable::from(table.1),
item_sender,
item_receiver: Mutex::new(Some(item_receiver)),
event_sender,
@ -68,7 +68,7 @@ impl QuicRelay<QuicClientEndpoint> {
}
impl QuicRelay<QuicServerEndpoint> {
pub fn new(key_path: &str, cert_path: &str, addr: SocketAddr, table: (Vec<(TypeTag, TypeTag, TypeTag)>, Vec<(TypeTag, TypeTag, TypeTag)>)) -> Result<Self, crate::error::Error> {
pub fn new(key_path: &str, cert_path: &str, addr: SocketAddr, table: (MessageList, MessageList)) -> Result<Self, crate::error::Error> {
let (item_sender, item_receiver) = mpsc::unbounded_channel();
let (event_sender, event_receiver) = mpsc::unbounded_channel();
@ -76,7 +76,7 @@ impl QuicRelay<QuicServerEndpoint> {
base: Mutex::new(Some(QuicServerEndpoint::new(key_path, cert_path, &addr )?)),
self_id: Arc::new(AtomicU64::new(0)),
in_table: MessageTable::from(table.0),
out_table: MessageTable::from(table.1),
_out_table: MessageTable::from(table.1),
item_sender,
item_receiver: Mutex::new(Some(item_receiver)),
event_sender,

View File

@ -77,7 +77,7 @@ impl<T, F, P, B> RegisterEntry<UnsyncEntry, T, F, P, B> {
{
let (inner, poller) = S::build(cfg);
let receiver = Receiver::new::<M, R, E, S>(RECEVIER_ID_SEQ.fetch_add(1, Ordering::Relaxed), queue, inner);
let receiver = Receiver::new::<M, R, E, S>(RECEVIER_ID_SEQ.fetch_add(1, Ordering::Relaxed), queue, true, inner);
let poller2 = receiver.start_polling();
self.receivers.insert(M::type_tag_(), receiver);
self.pollers.push(poller(self.item.clone()));
@ -146,7 +146,7 @@ impl<T, F, P, B> RegisterEntry<SyncEntry, T, F, P, B> {
{
let (inner, poller) = S::build(cfg);
let receiver = Receiver::new::<M, R, E, S>(RECEVIER_ID_SEQ.fetch_add(1, Ordering::Relaxed), queue, inner);
let receiver = Receiver::new::<M, R, E, S>(RECEVIER_ID_SEQ.fetch_add(1, Ordering::Relaxed), queue, true, inner);
let poller2 = receiver.start_polling();
self.receivers.insert(M::type_tag_(), receiver);
self.pollers.push(poller(self.item.clone()));

View File

@ -32,6 +32,8 @@ pub trait Message: MessageBounds {
fn try_clone_into(&self, into: &mut dyn Any) -> bool;
fn try_clone_boxed(&self) -> Option<Box<dyn Message>>;
fn try_clone(&self) -> Option<Self> where Self: Sized;
}
macro_rules! gen_impls {
@ -142,6 +144,10 @@ impl Message for () {
fn try_clone_boxed(&self) -> Option<Box<dyn Message>> {
Some(Box::new(()))
}
fn try_clone(&self) -> Option<Self> {
Some(())
}
}
pub trait IntoBoxedMessage {
@ -254,6 +260,10 @@ mod tests {
fn try_clone_boxed(&self) -> Option<Box<dyn Message>> {
None
}
fn try_clone(&self) -> Option<Self> {
None
}
}
#[derive(Debug, Clone)]
@ -310,6 +320,10 @@ mod tests {
fn try_clone_boxed(&self) -> Option<Box<dyn Message>> {
Some(Box::new(self.clone()))
}
fn try_clone(&self) -> Option<Self> {
Some(self.clone())
}
}
#[derive(Debug, Clone, serde_derive::Serialize, serde_derive::Deserialize)]
@ -368,6 +382,10 @@ mod tests {
fn try_clone_boxed(&self) -> Option<Box<dyn Message>> {
Some(Box::new(self.clone()))
}
fn try_clone(&self) -> Option<Self> {
Some(self.clone())
}
}
#[test]

View File

@ -35,7 +35,7 @@ pub use builder::Module;
pub use envelop::{IntoBoxedMessage, Message, MessageBounds, SharedMessage, TypeTag, TypeTagged};
pub use handler::*;
pub use receiver::{
Action, Event, ReciveTypedReceiver, ReciveUntypedReceiver, SendTypedReceiver,
Action, Event, EventBoxed, ReciveTypedReceiver, ReciveUntypedReceiver, SendTypedReceiver,
SendUntypedReceiver, TypeTagAccept,
};
pub use relay::Relay;

View File

@ -83,7 +83,7 @@ pub trait WrapperReturnTypeAndError<R: Message, E: StdSyncSendError>: Send + Syn
&self,
listener: oneshot::Sender<Result<R, Error<(), E>>>,
) -> Result<u64, Error>;
fn response(&self, mid: u64, resp: Result<R, Error<(), E>>) -> Result<(), Error>;
fn response(&self, mid: u64, resp: Result<R, Error<(), E>>) -> Result<Option<R>, Error>;
}
pub trait TypeTagAccept {
@ -148,6 +148,8 @@ pub enum Action {
Stats,
}
pub type EventBoxed<E> = Event<Box<dyn Message>, E>;
#[non_exhaustive]
#[derive(Debug)]
pub enum Event<M, E: StdSyncSendError> {
@ -242,8 +244,15 @@ where
self.context.processing.fetch_sub(1, Ordering::SeqCst);
self.context.response.notify_one();
if let Err(err) = self.response(mid, resp) {
error!("Response Error: {}", err);
match self.response(mid, resp) {
Ok(Some(_resp)) => {
if self.context.resend_unused_resp {
// TODO
}
},
Ok(None) => (),
Err(err) => error!("Response Error: {}", err),
}
}
@ -274,8 +283,8 @@ where
.ok_or(Error::AddListenerError)? as _)
}
fn response(&self, mid: u64, resp: Result<R, Error<(), E>>) -> Result<(), Error> {
if let Some(waiter) = self.waiters.take(mid as _) {
fn response(&self, mid: u64, resp: Result<R, Error<(), E>>) -> Result<Option<R>, Error> {
Ok(if let Some(waiter) = self.waiters.take(mid as _) {
match waiter {
Waiter::WithErrorType(sender) => sender.send(resp).unwrap(),
Waiter::WithoutErrorType(sender) => {
@ -289,9 +298,10 @@ where
sender.send(resp.map(|x| x.into_boxed())).unwrap()
}
}
}
Ok(())
None
} else {
resp.ok()
})
}
}
@ -678,6 +688,7 @@ struct ReceiverContext {
ready: Notify,
response: Arc<Notify>,
init_sent: AtomicBool,
resend_unused_resp: bool,
}
impl PermitDrop for ReceiverContext {
@ -715,7 +726,7 @@ impl core::cmp::Eq for Receiver {}
impl Receiver {
#[inline]
pub(crate) fn new<M, R, E, S>(id: u64, limit: u64, inner: S) -> Self
pub(crate) fn new<M, R, E, S>(id: u64, limit: u64, resend: bool, inner: S) -> Self
where
M: Message,
R: Message,
@ -738,6 +749,7 @@ impl Receiver {
closed: Notify::new(),
ready: Notify::new(),
response: Arc::new(Notify::new()),
resend_unused_resp: resend,
}),
_m: Default::default(),
}),