Fix deadlock bug with flush

This commit is contained in:
Andrey Tkachenko 2021-12-03 15:30:13 +04:00
parent c9f03f5b49
commit b06eac6504
35 changed files with 902 additions and 398 deletions

View File

@ -1,6 +1,6 @@
[package]
name = "messagebus"
version = "0.9.11"
version = "0.9.12"
authors = ["Andrey Tkachenko <andrey@aidev.ru>"]
repository = "https://github.com/andreytkachenko/messagebus.git"
keywords = ["futures", "async", "tokio", "message", "bus"]

View File

@ -4,7 +4,7 @@ extern crate proc_macro;
// use proc_macro::{TokenStream};
use proc_macro2::{Ident, Span, TokenStream};
use quote::{ToTokens, quote};
use quote::{quote, ToTokens};
use std::collections::hash_map;
use std::fmt::Write;
use std::hash::Hasher;
@ -244,19 +244,18 @@ pub fn derive_message(input: proc_macro::TokenStream) -> proc_macro::TokenStream
if tags.has_shared {
let bound: syn::TypeParamBound =
syn::parse_str("messagebus::__reexport::serde::Serialize").unwrap();
syn::parse_str("messagebus::__reexport::serde::Serialize").unwrap();
params.bounds.push(bound);
let bound: syn::TypeParamBound =
syn::parse_str("messagebus::__reexport::serde::Deserialize<'de>").unwrap();
syn::parse_str("messagebus::__reexport::serde::Deserialize<'de>").unwrap();
params.bounds.push(bound);
}
if tags.has_clone {
let bound: syn::TypeParamBound =
syn::parse_str("core::clone::Clone").unwrap();
let bound: syn::TypeParamBound = syn::parse_str("core::clone::Clone").unwrap();
params.bounds.push(bound);
}
@ -269,7 +268,10 @@ pub fn derive_message(input: proc_macro::TokenStream) -> proc_macro::TokenStream
let shared_part = shared_part(&ast, tags.has_shared);
let clone_part = clone_part(&ast, tags.has_clone);
let init = Ident::new(&format!("__init_{}", hash(ast.clone().into_token_stream())), Span::call_site());
let init = Ident::new(
&format!("__init_{}", hash(ast.clone().into_token_stream())),
Span::call_site(),
);
let init_impl = if tags.has_shared && impl_generics.params.is_empty() {
quote! {
#[allow(non_upper_case_globals)]
@ -279,11 +281,15 @@ pub fn derive_message(input: proc_macro::TokenStream) -> proc_macro::TokenStream
}
}
} else {
quote!{}
quote! {}
};
if !impl_generics.params.is_empty() && tags.has_shared {
impl_generics.params.push(syn::GenericParam::Lifetime(syn::LifetimeDef::new(syn::Lifetime::new("'de", Span::call_site()))))
impl_generics
.params
.push(syn::GenericParam::Lifetime(syn::LifetimeDef::new(
syn::Lifetime::new("'de", Span::call_site()),
)))
}
let tokens = quote! {
@ -341,4 +347,4 @@ fn hash(input: TokenStream) -> u64 {
let mut hasher = hash_map::DefaultHasher::new();
hasher.write(input.to_string().as_bytes());
hasher.finish()
}
}

View File

@ -1,10 +1,13 @@
use messagebus::{AsyncHandler, Bus, Message, TypeTagged, derive::{Message, Error as MbError}, error::{self, GenericError}};
use messagebus_remote::relays::{QuicServerRelay};
use serde_derive::{Serialize, Deserialize};
use async_trait::async_trait;
use messagebus::{
derive::{Error as MbError, Message},
error::{self, GenericError},
AsyncHandler, Bus, Message, TypeTagged,
};
use messagebus_remote::relays::QuicServerRelay;
use serde_derive::{Deserialize, Serialize};
use thiserror::Error;
#[derive(Debug, Error, MbError)]
enum Error {
#[error("Error({0})")]
@ -17,13 +20,12 @@ impl<M: Message> From<error::Error<M>> for Error {
}
}
#[derive(Serialize, Deserialize, Debug, Clone, Message)]
#[namespace("example")]
#[message(shared, clone)]
pub struct Req {
data: i32,
text: String
text: String,
}
#[derive(Serialize, Deserialize, Debug, Clone, Message)]
@ -31,10 +33,9 @@ pub struct Req {
#[message(shared, clone)]
pub struct Resp {
data: i32,
text: String
text: String,
}
struct TmpReceiver;
#[async_trait]
@ -56,26 +57,29 @@ impl AsyncHandler<Req> for TmpReceiver {
}
}
#[tokio::main]
async fn main() {
let relay = QuicServerRelay::new(
"./examples/cert.key",
"./examples/cert.der",
"0.0.0.0:8083".parse().unwrap(),
(vec![],
vec![
(Req::type_tag_(), Some((Resp::type_tag_(), GenericError::type_tag_())))
])
).unwrap();
"./examples/cert.der",
"0.0.0.0:8083".parse().unwrap(),
(
vec![],
vec![(
Req::type_tag_(),
Some((Resp::type_tag_(), GenericError::type_tag_())),
)],
),
)
.unwrap();
let (b, poller) = Bus::build()
.register_relay(relay)
.register(TmpReceiver)
.subscribe_async::<Req>(8, Default::default())
.subscribe_async::<Req>(8, Default::default())
.done()
.build();
b.ready().await;
println!("ready");

View File

@ -1,15 +1,15 @@
use messagebus::derive::Message;
use messagebus::error::GenericError;
use messagebus::{Bus, TypeTagged};
use messagebus_remote::relays::TcpRelay;
use serde_derive::{Serialize, Deserialize};
use messagebus::derive::Message;
use serde_derive::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Debug, Clone, Message)]
#[namespace("example")]
#[message(shared, clone)]
pub struct Req {
data: i32,
text: String
text: String,
}
#[derive(Serialize, Deserialize, Debug, Clone, Message)]
@ -17,32 +17,38 @@ pub struct Req {
#[message(shared, clone)]
pub struct Resp {
data: i32,
text: String
text: String,
}
#[tokio::main]
async fn main() {
let relay = TcpRelay::new(false, "0.0.0.0:8083".parse().unwrap(),
(vec![
(Req::type_tag_(), Some((Resp::type_tag_(), GenericError::type_tag_())))
],
vec![])
let relay = TcpRelay::new(
false,
"0.0.0.0:8083".parse().unwrap(),
(
vec![(
Req::type_tag_(),
Some((Resp::type_tag_(), GenericError::type_tag_())),
)],
vec![],
),
);
let (b, poller) = Bus::build()
.register_relay(relay)
.build();
let (b, poller) = Bus::build().register_relay(relay).build();
b.ready().await;
println!("ready");
let resp: Resp = b.request(Req {
data: 12,
text: String::from("test")
}, Default::default())
.await
.unwrap();
let resp: Resp = b
.request(
Req {
data: 12,
text: String::from("test"),
},
Default::default(),
)
.await
.unwrap();
println!("resp {:?}", resp);

View File

@ -1,10 +1,13 @@
use messagebus::{AsyncHandler, Bus, Message, TypeTagged, derive::{Message, Error as MbError}, error::{self, GenericError}};
use messagebus_remote::relays::TcpRelay;
use serde_derive::{Serialize, Deserialize};
use async_trait::async_trait;
use messagebus::{
derive::{Error as MbError, Message},
error::{self, GenericError},
AsyncHandler, Bus, Message, TypeTagged,
};
use messagebus_remote::relays::TcpRelay;
use serde_derive::{Deserialize, Serialize};
use thiserror::Error;
#[derive(Debug, Error, MbError)]
enum Error {
#[error("Error({0})")]
@ -17,13 +20,12 @@ impl<M: Message> From<error::Error<M>> for Error {
}
}
#[derive(Serialize, Deserialize, Debug, Clone, Message)]
#[namespace("example")]
#[message(shared, clone)]
pub struct Req {
data: i32,
text: String
text: String,
}
#[derive(Serialize, Deserialize, Debug, Clone, Message)]
@ -31,10 +33,9 @@ pub struct Req {
#[message(shared, clone)]
pub struct Resp {
data: i32,
text: String
text: String,
}
struct TmpReceiver;
#[async_trait]
@ -56,23 +57,27 @@ impl AsyncHandler<Req> for TmpReceiver {
}
}
#[tokio::main]
async fn main() {
let relay = TcpRelay::new(true, "0.0.0.0:8083".parse().unwrap(),
(vec![],
vec![
(Req::type_tag_(), Some((Resp::type_tag_(), GenericError::type_tag_())))
])
let relay = TcpRelay::new(
true,
"0.0.0.0:8083".parse().unwrap(),
(
vec![],
vec![(
Req::type_tag_(),
Some((Resp::type_tag_(), GenericError::type_tag_())),
)],
),
);
let (b, poller) = Bus::build()
.register_relay(relay)
.register(TmpReceiver)
.subscribe_async::<Req>(8, Default::default())
.subscribe_async::<Req>(8, Default::default())
.done()
.build();
b.ready().await;
println!("ready");

View File

@ -36,7 +36,7 @@ pub enum Error {
#[error("ProtocolParseError {0}")]
ProtocolParseError(String),
#[error("UnknownCodec")]
UnknownCodec,

View File

@ -129,7 +129,7 @@ pub enum ProtocolItem {
Nop,
Event(Event<Box<dyn SharedMessage>, messagebus::error::GenericError>),
Action(Action),
Send(u64, Box<dyn SharedMessage>, bool)
Send(u64, Box<dyn SharedMessage>, bool),
}
impl From<Action> for ProtocolItem {
@ -154,16 +154,20 @@ impl ProtocolItem {
pub fn unwrap_send(self) -> Result<(u64, Box<dyn SharedMessage>, bool), ProtocolItem> {
match self {
ProtocolItem::Send(a, b, c) => Ok((a, b, c)),
other => Err(other)
other => Err(other),
}
}
pub fn serialize<'a>(&self, mut body_type: BodyType, body_buff: &'a mut Vec<u8>) -> Result<ProtocolPacket<'a>, crate::error::Error> {
pub fn serialize<'a>(
&self,
mut body_type: BodyType,
body_buff: &'a mut Vec<u8>,
) -> Result<ProtocolPacket<'a>, crate::error::Error> {
let mut argument = 0;
let mut type_tag = None;
let mut body = None;
let mut flags = ProtocolHeaderFlags::empty();
let kind = match self {
ProtocolItem::Nop => ProtocolHeaderActionKind::Nop,
ProtocolItem::Action(action) => match action {
@ -172,9 +176,10 @@ impl ProtocolItem {
Action::Init(..) => ProtocolHeaderActionKind::Initialize,
Action::Sync => ProtocolHeaderActionKind::Synchronize,
_ => unimplemented!(),
}
},
ProtocolItem::Send(mid, msg, req) => {
let msg = msg.as_shared_ref()
let msg = msg
.as_shared_ref()
.ok_or(crate::error::Error::UnknownCodec)?;
argument = *mid;
@ -185,7 +190,7 @@ impl ProtocolItem {
body = Some(generic_serialize(body_type, &*msg, body_buff)?);
ProtocolHeaderActionKind::Send
},
}
ProtocolItem::Event(ev) => match ev {
Event::Response(mid, res) => {
argument = *mid;
@ -195,16 +200,17 @@ impl ProtocolItem {
match res {
Ok(msg) => {
let msg = msg.as_shared_ref()
let msg = msg
.as_shared_ref()
.ok_or(crate::error::Error::UnknownCodec)?;
type_tag = Some(msg.type_tag());
body = Some(generic_serialize(body_type, &*msg, body_buff)?);
}
Err(err) => {
flags.set(ProtocolHeaderFlags::ERROR, true);
type_tag = Some("GenericError".into());
body_type = BodyType::Utf8;
body = Some(format!("{}", err).into_bytes().into());
@ -212,7 +218,7 @@ impl ProtocolItem {
}
ProtocolHeaderActionKind::Response
},
}
Event::Error(err) => {
flags.set(ProtocolHeaderFlags::ERROR, true);
flags.set(ProtocolHeaderFlags::BODY, true);
@ -230,7 +236,7 @@ impl ProtocolItem {
flags.set(ProtocolHeaderFlags::TYPE_TAG, true);
flags.set(ProtocolHeaderFlags::ARGUMENT, true);
ProtocolHeaderActionKind::BatchComplete
},
}
Event::Synchronized(res) => {
match res {
Ok(_) => {}
@ -238,31 +244,31 @@ impl ProtocolItem {
flags.set(ProtocolHeaderFlags::BODY, true);
flags.set(ProtocolHeaderFlags::ERROR, true);
flags.set(ProtocolHeaderFlags::TYPE_TAG, true);
type_tag = Some("GenericError".into());
body_type = BodyType::Utf8;
body = Some(format!("{}", err).into_bytes().into());
}
}
ProtocolHeaderActionKind::Synchronized
ProtocolHeaderActionKind::Synchronized
}
Event::InitFailed(err) => {
flags.set(ProtocolHeaderFlags::BODY, true);
flags.set(ProtocolHeaderFlags::ERROR, true);
flags.set(ProtocolHeaderFlags::TYPE_TAG, true);
type_tag = Some("GenericError".into());
body_type = BodyType::Utf8;
body = Some(format!("{}", err).into_bytes().into());
ProtocolHeaderActionKind::Ready
},
}
Event::Ready => ProtocolHeaderActionKind::Ready,
Event::Pause => ProtocolHeaderActionKind::Pause,
Event::Exited => ProtocolHeaderActionKind::Exited,
Event::Flushed => ProtocolHeaderActionKind::Flushed,
_ => unimplemented!()
}
_ => unimplemented!(),
},
};
Ok(ProtocolPacket {
@ -273,12 +279,11 @@ impl ProtocolItem {
body_type,
argument,
},
body
body,
})
}
}
#[derive(Debug, Deserialize, Serialize)]
pub struct ProtocolPacket<'a> {
pub header: ProtocolHeader<'a>,
@ -286,12 +291,9 @@ pub struct ProtocolPacket<'a> {
}
impl<'a> ProtocolPacket<'a> {
pub fn deserialize(
self,
_bus: &Bus,
) -> Result<ProtocolItem, crate::error::Error>
{
let type_tag: Option<TypeTag> = if self.header.flags.contains(ProtocolHeaderFlags::TYPE_TAG) {
pub fn deserialize(self, _bus: &Bus) -> Result<ProtocolItem, crate::error::Error> {
let type_tag: Option<TypeTag> = if self.header.flags.contains(ProtocolHeaderFlags::TYPE_TAG)
{
self.header
.type_tag
.map(|x| String::from_utf8_lossy(x.as_ref()).to_string().into())
@ -302,12 +304,17 @@ impl<'a> ProtocolPacket<'a> {
let (body, error) = if self.header.flags.contains(ProtocolHeaderFlags::ERROR) {
let error = messagebus::error::GenericError {
type_tag: type_tag.clone().unwrap(),
description: self.body.map(|x|String::from_utf8_lossy(x.as_ref()).to_string()).unwrap_or_default(),
description: self
.body
.map(|x| String::from_utf8_lossy(x.as_ref()).to_string())
.unwrap_or_default(),
};
(None, Some(messagebus::error::Error::Other(error)))
} else if self.header.flags.contains(ProtocolHeaderFlags::TT_AND_BODY) {
let body = self.body.ok_or_else(|| crate::error::Error::ProtocolParseError("No body".to_string()))?;
let body = self
.body
.ok_or_else(|| crate::error::Error::ProtocolParseError("No body".to_string()))?;
let res = generic_deserialize(self.header.body_type, body.as_ref(), |de| {
messagebus::deserialize_shared_message(type_tag.clone().unwrap(), de)
.map_err(|x| x.map_msg(|_| ()))
@ -329,43 +336,52 @@ impl<'a> ProtocolPacket<'a> {
Ok(ProtocolItem::Event(match self.header.kind {
ProtocolHeaderActionKind::Response => Event::Response(
argument
.ok_or_else(|| crate::error::Error::ProtocolParseError("Event::Response expected argument".into()))?,
error
.map(Err)
.or_else(|| body.map(Ok))
.ok_or_else(|| crate::error::Error::ProtocolParseError("Event::Response expected body".into()))?,
argument.ok_or_else(|| {
crate::error::Error::ProtocolParseError(
"Event::Response expected argument".into(),
)
})?,
error.map(Err).or_else(|| body.map(Ok)).ok_or_else(|| {
crate::error::Error::ProtocolParseError("Event::Response expected body".into())
})?,
),
ProtocolHeaderActionKind::Synchronized => {
Event::Synchronized(error.map(Err).unwrap_or(Ok(())))
}
ProtocolHeaderActionKind::Error => {
Event::Error(error.ok_or_else(|| crate::error::Error::ProtocolParseError("Event::Error expected body".into()))?)
ProtocolHeaderActionKind::Error => Event::Error(error.ok_or_else(|| {
crate::error::Error::ProtocolParseError("Event::Error expected body".into())
})?),
ProtocolHeaderActionKind::BatchComplete => {
Event::BatchComplete(type_tag.unwrap(), self.header.argument)
}
ProtocolHeaderActionKind::BatchComplete => Event::BatchComplete(type_tag.unwrap(), self.header.argument),
ProtocolHeaderActionKind::Flushed => Event::Flushed,
ProtocolHeaderActionKind::Exited => Event::Exited,
ProtocolHeaderActionKind::Ready => Event::Ready,
ProtocolHeaderActionKind::Pause => Event::Pause,
other => return Ok(ProtocolItem::Action(match other {
ProtocolHeaderActionKind::Initialize => Action::Init(self.header.argument),
ProtocolHeaderActionKind::Close => Action::Close,
ProtocolHeaderActionKind::Flush => Action::Flush,
ProtocolHeaderActionKind::Synchronize => Action::Sync,
ProtocolHeaderActionKind::Send => {
let req = argument.is_some();
let mid = self.header.argument;
let body = body.ok_or_else(|| crate::error::Error::ProtocolParseError(
format!("Action::Send[{:?}] expected body", type_tag)
))?;
other => {
return Ok(ProtocolItem::Action(match other {
ProtocolHeaderActionKind::Initialize => Action::Init(self.header.argument),
ProtocolHeaderActionKind::Close => Action::Close,
ProtocolHeaderActionKind::Flush => Action::Flush,
ProtocolHeaderActionKind::Synchronize => Action::Sync,
ProtocolHeaderActionKind::Send => {
let req = argument.is_some();
let mid = self.header.argument;
let body = body.ok_or_else(|| {
crate::error::Error::ProtocolParseError(format!(
"Action::Send[{:?}] expected body",
type_tag
))
})?;
return Ok(ProtocolItem::Send(mid, body, req));
},
ProtocolHeaderActionKind::Nop => return Ok(ProtocolItem::Nop),
return Ok(ProtocolItem::Send(mid, body, req));
}
ProtocolHeaderActionKind::Nop => return Ok(ProtocolItem::Nop),
_ => unreachable!()
})),
_ => unreachable!(),
}))
}
}))
}
}
@ -393,7 +409,11 @@ where
}
}
fn generic_serialize<'a>(kind: BodyType, msg: &dyn SharedMessage, buffer: &'a mut Vec<u8>) -> Result<Cow<'a, [u8]>, crate::error::Error> {
fn generic_serialize<'a>(
kind: BodyType,
msg: &dyn SharedMessage,
buffer: &'a mut Vec<u8>,
) -> Result<Cow<'a, [u8]>, crate::error::Error> {
match kind {
BodyType::Cbor => {
let mut cbor_se = serde_cbor::Serializer::new(&mut *buffer);
@ -435,8 +455,7 @@ mod tests {
#[test]
fn test_proto_pack_event() {
let (bus, _) = Bus::build()
.build();
let (bus, _) = Bus::build().build();
let pkt = ProtocolPacket {
header: ProtocolHeader {
@ -451,9 +470,9 @@ mod tests {
let event = match pkt.deserialize(&bus).unwrap() {
ProtocolItem::Event(ev) => ev,
_ => unreachable!()
_ => unreachable!(),
};
assert!(matches!(event, Event::Response(..)));
match event {
@ -474,8 +493,7 @@ mod tests {
#[test]
fn test_proto_pack_event_error() {
let (bus, _) = Bus::build()
.build();
let (bus, _) = Bus::build().build();
let pkt = ProtocolPacket {
header: ProtocolHeader {
@ -490,9 +508,9 @@ mod tests {
let event = match pkt.deserialize(&bus).unwrap() {
ProtocolItem::Event(ev) => ev,
_ => unreachable!()
_ => unreachable!(),
};
assert!(matches!(event, Event::Response(..)));
#[allow(clippy::unit_cmp)]
@ -500,7 +518,7 @@ mod tests {
Event::Response(mid, msg) => {
assert_eq!(mid, 222);
let msg = msg.unwrap_err();
assert!(matches!(msg, messagebus::error::Error::Other(val) if (
assert_eq!(val.type_tag, TestSharedMessage::type_tag_()) == () &&
assert_eq!(val.description, "error description") == ()

View File

@ -1,6 +1,6 @@
// #[cfg(feature = "quic")]
mod quic;
mod redis;
// mod redis;
mod tcp;
use futures::Stream;
@ -11,7 +11,6 @@ use std::{collections::HashMap, pin::Pin};
pub use quic::*;
pub use tcp::*;
pub(crate) type GenericEventStream =
Pin<Box<dyn Stream<Item = Event<Box<dyn Message>, GenericError>> + Send>>;
@ -40,8 +39,7 @@ impl MessageTable {
}
pub fn iter_keys(&self) -> impl Iterator<Item = &str> + '_ {
self.table.keys()
.map(|k|k.as_ref())
self.table.keys().map(|k| k.as_ref())
}
#[inline]
@ -53,26 +51,26 @@ impl MessageTable {
pub fn accept_message(&self, msg: &TypeTag) -> bool {
self.table
.get(msg)
.map_or(false, |v| {
v
.iter()
.any(Option::is_none)
})
.map_or(false, |v| v.iter().any(Option::is_none))
}
pub fn accept_request(&self, msg: &TypeTag, resp: Option<&TypeTag>, err: Option<&TypeTag>) -> bool {
pub fn accept_request(
&self,
msg: &TypeTag,
resp: Option<&TypeTag>,
err: Option<&TypeTag>,
) -> bool {
self.table.get(msg).map_or(false, |v| {
v
.iter()
.filter_map(Option::as_ref)
.any(|(r, e)| {
resp.map_or(true, |resp| resp.as_ref() == r.as_ref())
&& err.map_or(true, |err| err.as_ref() == e.as_ref())
})
v.iter().filter_map(Option::as_ref).any(|(r, e)| {
resp.map_or(true, |resp| resp.as_ref() == r.as_ref())
&& err.map_or(true, |err| err.as_ref() == e.as_ref())
})
})
}
pub fn iter_types(&self) -> impl Iterator<Item = (&'_ TypeTag, Option<&'_ (TypeTag, TypeTag)>)> + '_ {
pub fn iter_types(
&self,
) -> impl Iterator<Item = (&'_ TypeTag, Option<&'_ (TypeTag, TypeTag)>)> + '_ {
self.table
.iter()
.map(|(k, v)| v.iter().map(move |resp| (k, resp.as_ref())))

View File

@ -1,7 +1,11 @@
use crate::{error::Error};
use crate::error::Error;
use futures::{Future, Stream};
use quinn::{Connecting};
use std::{net::SocketAddr, pin::Pin, task::{Context, Poll}};
use quinn::Connecting;
use std::{
net::SocketAddr,
pin::Pin,
task::{Context, Poll},
};
use super::WaitIdle;
@ -27,9 +31,10 @@ impl QuicClientEndpoint {
let (endpoint, _) = endpoint.bind(&"0.0.0.0:0".parse().unwrap())?;
Ok(Self {
addr, host,
endpoint
Ok(Self {
addr,
host,
endpoint,
})
}
}
@ -40,12 +45,12 @@ impl Stream for QuicClientEndpoint {
fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
Poll::Ready(this.endpoint.connect(&this.addr, &this.host).ok())
}
}
}
impl<'a> WaitIdle<'a> for QuicClientEndpoint {
type Fut = Pin<Box<dyn Future<Output = ()> + Send + 'a>> ;
type Fut = Pin<Box<dyn Future<Output = ()> + Send + 'a>>;
fn wait_idle(&'a self) -> Self::Fut {
Box::pin(self.endpoint.wait_idle())
}
}
}

View File

@ -11,7 +11,7 @@ use futures::{pin_mut, Future, Stream, StreamExt};
use messagebus::error::GenericError;
use messagebus::{
Action, Bus, Event, EventBoxed, Message, ReciveUntypedReceiver, SendOptions,
SendUntypedReceiver, TypeTag, TypeTagAccept,
SendUntypedReceiver, TypeTag, TypeTagAccept, TypeTagAcceptItem,
};
use parking_lot::Mutex;
use quinn::{Connecting, IncomingBiStreams};
@ -105,7 +105,7 @@ impl<B> TypeTagAccept for QuicRelay<B>
where
B: Stream<Item = Connecting> + Send + 'static,
{
fn iter_types(&self) -> Box<dyn Iterator<Item = (TypeTag, Option<(TypeTag, TypeTag)>)> + '_> {
fn iter_types(&self) -> Box<dyn Iterator<Item = TypeTagAcceptItem> + '_> {
let iter = self.in_table.iter_types();
Box::new(iter.map(|(x, y)| (x.clone(), y.cloned())))
}
@ -352,7 +352,7 @@ where
}
let mut reader = &buff[..];
let version = reader.get_u16();
let _version = reader.get_u16();
let content_type = reader.get_u16();
let body_size = reader.get_u64();
@ -487,4 +487,3 @@ where
)
}
}

View File

@ -1,7 +1,12 @@
use crate::error::Error;
use futures::{Future, Stream};
use quinn::Connecting;
use std::{net::SocketAddr, pin::Pin, sync::Arc, task::{Context, Poll}};
use std::{
net::SocketAddr,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use super::WaitIdle;
@ -17,20 +22,20 @@ impl QuicServerEndpoint {
let mut server_config = quinn::ServerConfig::default();
server_config.transport = Arc::new(transport_config);
let mut server_config = quinn::ServerConfigBuilder::new(server_config);
server_config.protocols(super::ALPN_QUIC_HTTP);
server_config.enable_keylog();
let key = std::fs::read(key_path)?;
let cert_der = std::fs::read(cert_path)?;
let key = quinn::PrivateKey::from_der(&key)?;
let cert_chain = quinn::Certificate::from_der(&cert_der)?;
let cert = quinn::CertificateChain::from_certs([cert_chain]);
server_config.certificate(cert, key)?;
let mut endpoint = quinn::Endpoint::builder();
@ -38,10 +43,7 @@ impl QuicServerEndpoint {
let (endpoint, incoming) = endpoint.bind(addr)?;
Ok(Self {
endpoint,
incoming
})
Ok(Self { endpoint, incoming })
}
}
@ -51,12 +53,12 @@ impl Stream for QuicServerEndpoint {
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
unsafe { Pin::new_unchecked(&mut this.incoming) }.poll_next(cx)
}
}
}
impl<'a> WaitIdle<'a> for QuicServerEndpoint {
type Fut = Pin<Box<dyn Future<Output = ()> + Send + 'a>> ;
type Fut = Pin<Box<dyn Future<Output = ()> + Send + 'a>>;
fn wait_idle(&'a self) -> Self::Fut {
Box::pin(self.endpoint.wait_idle())
}
}
}

View File

@ -1,25 +1,22 @@
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use messagebus::{Action, Bus, Event, Message, ReciveUntypedReceiver, SendUntypedReceiver, TypeTag, TypeTagAccept};
use futures::StreamExt;
use messagebus::{
Action, Bus, Event, Message, ReciveUntypedReceiver, SendUntypedReceiver, TypeTag, TypeTagAccept,
};
use parking_lot::Mutex;
use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
use futures::StreamExt;
use crate::proto::{BodyType, ProtocolItem};
use super::{GenericEventStream, MessageTable};
use crate::proto::{BodyType, ProtocolItem};
use redis::AsyncCommands;
#[derive(Debug)]
enum RecvDo {
Pause,
Ready,
Closed,
}
pub struct RedisRelay {
client: Arc<redis::Client>,
self_id: Arc<AtomicU64>,
@ -31,7 +28,10 @@ pub struct RedisRelay {
}
impl RedisRelay {
pub fn new(path: &str, table: Vec<(TypeTag, Option<(TypeTag, TypeTag)>)>) -> Result<Self, crate::error::Error> {
pub fn new(
path: &str,
table: Vec<(TypeTag, Option<(TypeTag, TypeTag)>)>,
) -> Result<Self, crate::error::Error> {
let client = redis::Client::open(path)?;
let (item_sender, item_receiver) = mpsc::unbounded_channel();
@ -81,7 +81,7 @@ impl SendUntypedReceiver for RedisRelay {
// let mut item = None;
event_sender.send(RecvDo::Ready).unwrap();
while let Some(Some(item)) = rx.recv().await {
header_buff.clear();
body_buff.clear();
@ -92,14 +92,15 @@ impl SendUntypedReceiver for RedisRelay {
let channel = match &item {
ProtocolItem::Action(_) => "mbus_action".into(),
ProtocolItem::Send(_, msg, _) => format!("mbus_request::{}", msg.type_tag()),
ProtocolItem::Event(ev) => "mbus_response::".into(),
_ => unreachable!()
ProtocolItem::Send(_, msg, _) => {
format!("mbus_request::{}", msg.type_tag())
}
ProtocolItem::Event(_ev) => "mbus_response::".into(),
_ => unreachable!(),
};
let () = connection.publish(channel, &header_buff).await.unwrap();
}
});
}
@ -124,12 +125,14 @@ impl SendUntypedReceiver for RedisRelay {
match msg.as_shared_boxed() {
Ok(msg) => {
if let Err(err) = self.item_sender.send(Some((mid, msg, req).into())) {
Err(messagebus::error::Error::TryAgain(err.0.unwrap().unwrap_send().unwrap().1.upcast_box()))
Err(messagebus::error::Error::TryAgain(
err.0.unwrap().unwrap_send().unwrap().1.upcast_box(),
))
} else {
Ok(())
}
}
Err(msg) => Err(messagebus::error::Error::TryAgain(msg)),
}
}
@ -144,112 +147,106 @@ impl ReciveUntypedReceiver for RedisRelay {
let mut recv = self.event_receiver.lock().take().unwrap();
Box::pin(
futures::stream::poll_fn(move |cx|recv.poll_recv(cx))
futures::stream::poll_fn(move |cx| recv.poll_recv(cx))
.map(move |recv_do| {
let self_id = self_id.clone();
let bus = bus.clone();
let sender = sender.clone();
match recv_do {
// RecvDo::Incoming(incoming) => {
// futures::stream::unfold((incoming, bus, sender, self_id), |(mut incoming, bus, sender, self_id)| async move {
// loop {
// let (_, recv) = match incoming.next().await? {
// Ok(recv) => recv,
// Err(err) => {
// println!("error: {}", err);
// return None;
// }
// };
// let buff = recv
// .read_to_end(usize::max_value())
// .await
// .unwrap();
// // assert_eq!(&buff[0..4], b"MBUS");
// RecvDo::Incoming(incoming) => {
// futures::stream::unfold((incoming, bus, sender, self_id), |(mut incoming, bus, sender, self_id)| async move {
// loop {
// let (_, recv) = match incoming.next().await? {
// Ok(recv) => recv,
// Err(err) => {
// println!("error: {}", err);
// return None;
// }
// };
// if buff.is_empty() {
// println!("PONG");
// continue;
// }
// let mut reader = &buff[4..];
// let version = reader.get_u16();
// let content_type = reader.get_u16();
// let body_size = reader.get_u64();
// println!("inbound packet {}: v: {}; ct: {}; bs: {}", String::from_utf8_lossy(&buff[0..4]), version, content_type, body_size);
// let event = match content_type {
// 0 => { // CBOR
// let proto: ProtocolPacket = serde_cbor::from_slice(&buff[16..]).unwrap();
// match proto.deserialize(&bus).unwrap() {
// ProtocolItem::Event(ev) => ev.map_msg(|msg|msg.upcast_box()),
// ProtocolItem::Action(action) => {
// match action {
// Action::Close => {
// println!("warning: Close recevied - ignoring!");
// sender.send(Some(ProtocolItem::Event(Event::Exited))).unwrap();
// },
// Action::Flush => {
// bus.flush().await;
// sender.send(Some(ProtocolItem::Event(Event::Flushed))).unwrap();
// },
// Action::Sync => {
// bus.sync().await;
// sender.send(Some(ProtocolItem::Event(Event::Synchronized(Ok(()))))).unwrap();
// },
// Action::Init(..) => (),
// Action::Stats => (),
// _ => (),
// }
// continue;
// }
// ProtocolItem::Send(mid, msg, req) => {
// if req {
// let res = bus.request_boxed(
// msg.upcast_box(),
// SendOptions::Except(self_id.load(Ordering::SeqCst))
// )
// .await
// .map(|x|x.as_shared_boxed().unwrap())
// .map_err(|x|x.map_msg(|_|()));
// sender.send(Some(ProtocolItem::Event(Event::Response(mid, res)))).unwrap();
// } else {
// let _ = bus.send_boxed(msg.upcast_box(), Default::default())
// .await;
// }
// continue;
// }
// _ => unimplemented!()
// }
// },
// _ => unimplemented!()
// };
// let buff = recv
// .read_to_end(usize::max_value())
// .await
// .unwrap();
// return Some((event, (incoming, bus, sender, self_id)));
// }
// }).right_stream()
// }
// // assert_eq!(&buff[0..4], b"MBUS");
other => futures::stream::once(async move {
// if buff.is_empty() {
// println!("PONG");
// continue;
// }
// let mut reader = &buff[4..];
// let version = reader.get_u16();
// let content_type = reader.get_u16();
// let body_size = reader.get_u64();
// println!("inbound packet {}: v: {}; ct: {}; bs: {}", String::from_utf8_lossy(&buff[0..4]), version, content_type, body_size);
// let event = match content_type {
// 0 => { // CBOR
// let proto: ProtocolPacket = serde_cbor::from_slice(&buff[16..]).unwrap();
// match proto.deserialize(&bus).unwrap() {
// ProtocolItem::Event(ev) => ev.map_msg(|msg|msg.upcast_box()),
// ProtocolItem::Action(action) => {
// match action {
// Action::Close => {
// println!("warning: Close recevied - ignoring!");
// sender.send(Some(ProtocolItem::Event(Event::Exited))).unwrap();
// },
// Action::Flush => {
// bus.flush().await;
// sender.send(Some(ProtocolItem::Event(Event::Flushed))).unwrap();
// },
// Action::Sync => {
// bus.sync().await;
// sender.send(Some(ProtocolItem::Event(Event::Synchronized(Ok(()))))).unwrap();
// },
// Action::Init(..) => (),
// Action::Stats => (),
// _ => (),
// }
// continue;
// }
// ProtocolItem::Send(mid, msg, req) => {
// if req {
// let res = bus.request_boxed(
// msg.upcast_box(),
// SendOptions::Except(self_id.load(Ordering::SeqCst))
// )
// .await
// .map(|x|x.as_shared_boxed().unwrap())
// .map_err(|x|x.map_msg(|_|()));
// sender.send(Some(ProtocolItem::Event(Event::Response(mid, res)))).unwrap();
// } else {
// let _ = bus.send_boxed(msg.upcast_box(), Default::default())
// .await;
// }
// continue;
// }
// _ => unimplemented!()
// }
// },
// _ => unimplemented!()
// };
// return Some((event, (incoming, bus, sender, self_id)));
// }
// }).right_stream()
// }
other => futures::stream::once(async move {
match other {
RecvDo::Pause => Event::Pause,
RecvDo::Ready => Event::Ready,
RecvDo::Closed => Event::Exited,
_ => unreachable!()
}
})
// .left_stream()
_ => unreachable!(),
}
}), // .left_stream()
}
})
.flatten()
.flatten(),
)
}
}
}

View File

@ -8,7 +8,7 @@ use futures::{pin_mut, Stream, StreamExt};
use messagebus::error::GenericError;
use messagebus::{
Action, Bus, Event, EventBoxed, Message, ReciveUntypedReceiver, SendOptions,
SendUntypedReceiver, TypeTag, TypeTagAccept,
SendUntypedReceiver, TypeTag, TypeTagAccept, TypeTagAcceptItem,
};
use parking_lot::Mutex;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
@ -121,7 +121,7 @@ impl From<TcpStream> for TcpRelayConnection {
}
impl TypeTagAccept for TcpRelay {
fn iter_types(&self) -> Box<dyn Iterator<Item = (TypeTag, Option<(TypeTag, TypeTag)>)> + '_> {
fn iter_types(&self) -> Box<dyn Iterator<Item = TypeTagAcceptItem> + '_> {
let iter = self.in_table.iter_types();
Box::new(iter.map(|(x, y)| (x.clone(), y.cloned())))
}
@ -294,7 +294,7 @@ impl ReciveUntypedReceiver for TcpRelay {
}
let mut reader = &buff[..];
let version = reader.get_u16();
let _version = reader.get_u16();
let content_type = reader.get_u16();
let body_size = reader.get_u64();
@ -427,4 +427,3 @@ impl ReciveUntypedReceiver for TcpRelay {
)
}
}

View File

@ -1,14 +1,26 @@
use core::{marker::PhantomData, pin::Pin};
use std::{collections::HashSet, sync::{Arc, atomic::{AtomicU64, Ordering}}};
use std::{
collections::HashSet,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
};
use futures::{Future, FutureExt};
use tokio::sync::Mutex;
use crate::{AsyncBatchHandler, AsyncBatchSynchronizedHandler, AsyncHandler, AsyncSynchronizedHandler, BatchHandler, BatchSynchronizedHandler, Bus, BusInner, Handler, Message, Relay, SynchronizedHandler, Untyped, error::StdSyncSendError, receiver::{
use crate::{
error::StdSyncSendError,
receiver::{
BusPollerCallback, Receiver, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver,
UntypedPollerCallback,
}, receivers};
},
receivers, AsyncBatchHandler, AsyncBatchSynchronizedHandler, AsyncHandler,
AsyncSynchronizedHandler, BatchHandler, BatchSynchronizedHandler, Bus, BusInner, Handler,
Message, Relay, SynchronizedHandler, Untyped,
};
static RECEVIER_ID_SEQ: AtomicU64 = AtomicU64::new(1);
@ -70,7 +82,12 @@ impl<T, F, P, B> RegisterEntry<UnsyncEntry, T, F, P, B> {
{
let (inner, poller) = S::build(cfg);
let receiver = Receiver::new::<M, R, E, S>(RECEVIER_ID_SEQ.fetch_add(1, Ordering::Relaxed), queue, true, inner);
let receiver = Receiver::new::<M, R, E, S>(
RECEVIER_ID_SEQ.fetch_add(1, Ordering::Relaxed),
queue,
true,
inner,
);
let poller2 = receiver.start_polling();
self.receivers.insert(receiver);
self.pollers.push(poller(self.item.clone()));
@ -139,7 +156,12 @@ impl<T, F, P, B> RegisterEntry<SyncEntry, T, F, P, B> {
{
let (inner, poller) = S::build(cfg);
let receiver = Receiver::new::<M, R, E, S>(RECEVIER_ID_SEQ.fetch_add(1, Ordering::Relaxed), queue, true, inner);
let receiver = Receiver::new::<M, R, E, S>(
RECEVIER_ID_SEQ.fetch_add(1, Ordering::Relaxed),
queue,
true,
inner,
);
let poller2 = receiver.start_polling();
self.receivers.insert(receiver);
self.pollers.push(poller(self.item.clone()));
@ -213,7 +235,8 @@ impl Module {
}
pub fn register_relay<S: Relay + Send + Sync + 'static>(mut self, inner: S) -> Self {
let receiver = Receiver::new_relay::<S>(RECEVIER_ID_SEQ.fetch_add(1, Ordering::Relaxed), inner);
let receiver =
Receiver::new_relay::<S>(RECEVIER_ID_SEQ.fetch_add(1, Ordering::Relaxed), inner);
self.pollings.push(receiver.start_polling());
self.receivers.insert(receiver);
@ -233,7 +256,9 @@ impl Module {
RegisterEntry {
item: Arc::new(item) as Untyped,
payload: self,
builder: |p: &mut Self, r| { p.receivers.insert(r); },
builder: |p: &mut Self, r| {
p.receivers.insert(r);
},
poller: |p: &mut Self, poller| p.pollings.push(poller),
receivers: HashSet::new(),
pollers: Vec::new(),
@ -256,7 +281,9 @@ impl Module {
RegisterEntry {
item,
payload: self,
builder: |p: &mut Self, r| { p.receivers.insert(r); },
builder: |p: &mut Self, r| {
p.receivers.insert(r);
},
poller: |p: &mut Self, poller| p.pollings.push(poller),
receivers: HashSet::new(),
pollers: Vec::new(),
@ -302,7 +329,9 @@ impl BusBuilder {
RegisterEntry {
item: Arc::new(item) as Untyped,
payload: self,
builder: |p: &mut Self, r| { p.inner.receivers.insert(r); },
builder: |p: &mut Self, r| {
p.inner.receivers.insert(r);
},
poller: |p: &mut Self, poller| p.inner.pollings.push(poller),
receivers: HashSet::new(),
pollers: Vec::new(),
@ -323,7 +352,9 @@ impl BusBuilder {
RegisterEntry {
item: Arc::new(Mutex::new(item)) as Untyped,
payload: self,
builder: |p: &mut Self, r| { p.inner.receivers.insert(r); },
builder: |p: &mut Self, r| {
p.inner.receivers.insert(r);
},
poller: |p: &mut Self, poller| p.inner.pollings.push(poller),
receivers: HashSet::new(),
pollers: Vec::new(),

View File

@ -34,7 +34,9 @@ pub trait Message: MessageBounds {
fn try_clone_into(&self, into: &mut dyn Any) -> bool;
fn try_clone_boxed(&self) -> Option<Box<dyn Message>>;
fn try_clone(&self) -> Option<Self> where Self: Sized;
fn try_clone(&self) -> Option<Self>
where
Self: Sized;
}
macro_rules! gen_impls {
@ -183,7 +185,6 @@ impl<T: Message + serde::Serialize> IntoSharedMessage for T {
}
}
pub trait SharedMessage: Message + erased_serde::Serialize {
fn upcast_arc(self: Arc<Self>) -> Arc<dyn Message>;
fn upcast_box(self: Box<Self>) -> Box<dyn Message>;
@ -191,10 +192,18 @@ pub trait SharedMessage: Message + erased_serde::Serialize {
fn upcast_mut(&mut self) -> &mut dyn Message;
}
impl<T: Message + erased_serde::Serialize> SharedMessage for T {
fn upcast_arc(self: Arc<Self>) -> Arc<dyn Message> { self }
fn upcast_box(self: Box<Self>) -> Box<dyn Message> { self }
fn upcast_ref(&self) -> &dyn Message { self }
fn upcast_mut(&mut self) -> &mut dyn Message { self }
fn upcast_arc(self: Arc<Self>) -> Arc<dyn Message> {
self
}
fn upcast_box(self: Box<Self>) -> Box<dyn Message> {
self
}
fn upcast_ref(&self) -> &dyn Message {
self
}
fn upcast_mut(&mut self) -> &mut dyn Message {
self
}
}
// pub trait IntoTakeable {

View File

@ -58,8 +58,8 @@ impl TypeTagged for GenericError {
fn type_name(&self) -> TypeTag {
type_name::<GenericError>().into()
}
fn type_layout(&self) -> std::alloc::Layout {
std::alloc::Layout::for_value(self)
fn type_layout(&self) -> std::alloc::Layout {
std::alloc::Layout::for_value(self)
}
}

View File

@ -46,7 +46,7 @@ pub use envelop::{IntoBoxedMessage, Message, MessageBounds, SharedMessage, TypeT
pub use handler::*;
pub use receiver::{
Action, Event, EventBoxed, ReciveTypedReceiver, ReciveUntypedReceiver, SendTypedReceiver,
SendUntypedReceiver, TypeTagAccept,
SendUntypedReceiver, TypeTagAccept, TypeTagAcceptItem,
};
pub use relay::Relay;
pub use type_tag::{deserialize_shared_message, register_shared_message};
@ -162,7 +162,6 @@ impl Bus {
}
pub async fn flush_all(&self) {
let _handle = self.inner.maintain.lock().await;
let fuse_count = 32i32;
let mut breaked = false;
let mut iters = 0usize;
@ -194,7 +193,6 @@ impl Bus {
}
pub async fn flush<M: Message>(&self) {
let _handle = self.inner.maintain.lock().await;
let fuse_count = 32i32;
let mut breaked = false;
let mut iters = 0usize;
@ -229,7 +227,6 @@ impl Bus {
}
pub async fn flush2<M1: Message, M2: Message>(&self) {
let _handle = self.inner.maintain.lock().await;
let fuse_count = 32i32;
let mut breaked = false;
let mut iters = 0usize;

View File

@ -87,8 +87,9 @@ pub trait WrapperReturnTypeAndError<R: Message, E: StdSyncSendError>: Send + Syn
fn response(&self, mid: u64, resp: Result<R, Error<(), E>>) -> Result<Option<R>, Error>;
}
pub type TypeTagAcceptItem = (TypeTag, Option<(TypeTag, TypeTag)>);
pub trait TypeTagAccept {
fn iter_types(&self) -> Box<dyn Iterator<Item = (TypeTag, Option<(TypeTag, TypeTag)>)> + '_>;
fn iter_types(&self) -> Box<dyn Iterator<Item = TypeTagAcceptItem> + '_>;
fn accept_msg(&self, msg: &TypeTag) -> bool;
fn accept_req(&self, req: &TypeTag, resp: Option<&TypeTag>, err: Option<&TypeTag>) -> bool;
}
@ -119,9 +120,11 @@ pub trait ReceiverTrait: TypeTagAccept + Send + Sync {
fn sync_notify(&self) -> &Notify;
fn flush_notify(&self) -> &Notify;
fn ready_notify(&self) -> &Notify;
fn idle_notify(&self) -> &Notify;
fn is_init_sent(&self) -> bool;
fn is_ready(&self) -> bool;
fn is_idling(&self) -> bool;
fn need_flush(&self) -> bool;
fn set_need_flush(&self);
@ -165,6 +168,8 @@ pub enum Event<M, E: StdSyncSendError> {
Exited,
Ready,
Pause,
IdleBegin,
IdleEnd,
}
impl<M, E: StdSyncSendError> Event<M, E> {
@ -180,6 +185,8 @@ impl<M, E: StdSyncSendError> Event<M, E> {
Event::Exited => Event::Exited,
Event::Ready => Event::Ready,
Event::Pause => Event::Pause,
Event::IdleBegin => Event::IdleBegin,
Event::IdleEnd => Event::IdleEnd,
}
}
}
@ -239,7 +246,7 @@ where
}
Event::Flushed => {
self.context.need_flush.store(false, Ordering::SeqCst);
self.context.flushed.notify_waiters();
self.context.flushed.notify_one();
}
Event::Synchronized(_res) => self.context.synchronized.notify_waiters(),
Event::Response(mid, resp) => {
@ -268,6 +275,15 @@ where
}
}
Event::IdleBegin => {
self.context.idling_flag.store(true, Ordering::SeqCst);
self.context.idle.notify_waiters();
}
Event::IdleEnd => {
self.context.idling_flag.store(false, Ordering::SeqCst);
}
_ => unimplemented!(),
}
}
@ -350,7 +366,7 @@ where
E: StdSyncSendError,
S: ReciveTypedReceiver<R, E> + Send + Sync + 'static,
{
fn iter_types(&self) -> Box<dyn Iterator<Item = (TypeTag, Option<(TypeTag, TypeTag)>)> + '_> {
fn iter_types(&self) -> Box<dyn Iterator<Item = TypeTagAcceptItem> + '_> {
Box::new(std::iter::once((
M::type_tag_(),
Some((R::type_tag_(), E::type_tag_())),
@ -385,10 +401,6 @@ where
E: StdSyncSendError,
S: SendUntypedReceiver + SendTypedReceiver<M> + ReciveTypedReceiver<R, E> + 'static,
{
fn id(&self) -> u64 {
self.id
}
fn name(&self) -> &str {
std::any::type_name::<S>()
}
@ -401,6 +413,10 @@ where
Some(AnyWrapperRef::new(self))
}
fn id(&self) -> u64 {
self.id
}
fn send_boxed(
&self,
mid: u64,
@ -417,6 +433,16 @@ where
.map_err(|err| err.map_msg(|m| m.into_boxed()))
}
fn add_response_listener(
&self,
listener: oneshot::Sender<Result<Box<dyn Message>, Error>>,
) -> Result<u64, Error> {
Ok(self
.waiters
.insert(Waiter::Boxed(listener))
.ok_or(Error::AddListenerError)? as _)
}
fn stats(&self) -> Stats {
Stats {
msg_type_tag: M::type_tag_(),
@ -435,10 +461,6 @@ where
SendUntypedReceiver::send(&self.inner, action, bus)
}
fn set_need_flush(&self) {
self.context.need_flush.store(true, Ordering::SeqCst);
}
fn close_notify(&self) -> &Notify {
&self.context.closed
}
@ -451,32 +473,34 @@ where
&self.context.flushed
}
fn add_response_listener(
&self,
listener: oneshot::Sender<Result<Box<dyn Message>, Error>>,
) -> Result<u64, Error> {
Ok(self
.waiters
.insert(Waiter::Boxed(listener))
.ok_or(Error::AddListenerError)? as _)
}
fn ready_notify(&self) -> &Notify {
&self.context.ready
}
fn is_ready(&self) -> bool {
self.context.ready_flag.load(Ordering::SeqCst)
fn idle_notify(&self) -> &Notify {
&self.context.idle
}
fn is_init_sent(&self) -> bool {
self.context.init_sent.load(Ordering::SeqCst)
}
fn is_ready(&self) -> bool {
self.context.ready_flag.load(Ordering::SeqCst)
}
fn is_idling(&self) -> bool {
self.context.idling_flag.load(Ordering::SeqCst)
}
fn need_flush(&self) -> bool {
self.context.need_flush.load(Ordering::SeqCst)
}
fn set_need_flush(&self) {
self.context.need_flush.store(true, Ordering::SeqCst);
}
fn try_reserve(&self, _: &TypeTag) -> Option<Permit> {
loop {
let count = self.context.processing.load(Ordering::Relaxed);
@ -506,13 +530,13 @@ where
self.context.response.clone()
}
fn start_polling(self: Arc<Self>) -> BusPollerCallback {
self.start_polling_events()
}
fn increment_processing(&self, _tt: &TypeTag) {
self.context.processing.fetch_add(1, Ordering::SeqCst);
}
fn start_polling(self: Arc<Self>) -> BusPollerCallback {
self.start_polling_events()
}
}
pub struct Permit {
@ -691,10 +715,12 @@ struct ReceiverContext {
processing: AtomicI64,
need_flush: AtomicBool,
ready_flag: AtomicBool,
idling_flag: AtomicBool,
flushed: Notify,
synchronized: Notify,
closed: Notify,
ready: Notify,
idle: Notify,
response: Arc<Notify>,
init_sent: AtomicBool,
resend_unused_resp: bool,
@ -758,11 +784,13 @@ impl Receiver {
processing: AtomicI64::new(0),
need_flush: AtomicBool::new(false),
ready_flag: AtomicBool::new(false),
idling_flag: AtomicBool::new(true),
init_sent: AtomicBool::new(false),
flushed: Notify::new(),
synchronized: Notify::new(),
closed: Notify::new(),
ready: Notify::new(),
idle: Notify::new(),
response: Arc::new(Notify::new()),
resend_unused_resp: resend,
}),
@ -1020,6 +1048,13 @@ impl Receiver {
}
}
#[inline]
pub async fn idle(&self) {
if !self.inner.is_idling() {
self.inner.idle_notify().notified().await;
}
}
#[inline]
pub async fn ready(&self) {
let notify = self.inner.ready_notify().notified();
@ -1052,6 +1087,8 @@ impl Receiver {
#[inline]
pub async fn flush(&self, bus: &Bus) {
self.idle().await;
let notify = self.inner.flush_notify().notified();
if self.inner.send_action(bus, Action::Flush).is_ok() {

View File

@ -123,7 +123,9 @@ where
Ok(())
}
Err(mpsc::error::SendError(Request::Request(_, msg, _))) => Err(Error::send_closed(msg)),
Err(mpsc::error::SendError(Request::Request(_, msg, _))) => {
Err(Error::send_closed(msg))
}
_ => unimplemented!(),
}
}

View File

@ -46,10 +46,45 @@ macro_rules! buffer_unordered_poller_macro {
R: Message,
E: StdSyncSendError,
{
use futures::{future, pin_mut, select, FutureExt};
let ut = ut.downcast::<$t>().unwrap();
let semaphore = Arc::new(tokio::sync::Semaphore::new(cfg.max_parallel));
while let Some(msg) = rx.recv().await {
let mut idling = true;
loop {
let wait_fut = async move {
if idling {
let () = future::pending().await;
} else {
let _ = tokio::time::sleep(std::time::Duration::from_millis(100))
.fuse()
.await;
}
};
pin_mut!(wait_fut);
let msg = select! {
m = rx.recv().fuse() => if let Some(msg) = m {
msg
} else {
break;
},
_ = wait_fut.fuse() => {
idling = true;
stx.send(Event::IdleBegin).unwrap();
continue;
}
};
if idling {
stx.send(Event::IdleEnd).unwrap();
}
idling = false;
match msg {
Request::Request(mid, msg, _req) => {
#[allow(clippy::redundant_closure_call)]
@ -62,12 +97,10 @@ macro_rules! buffer_unordered_poller_macro {
semaphore.clone().acquire_owned().await,
);
}
Request::Action(Action::Init(..)) => {
stx.send(Event::Ready).unwrap();
}
Request::Action(Action::Close) => {
rx.close();
}
Request::Action(Action::Init(..)) => stx.send(Event::Ready).unwrap(),
Request::Action(Action::Close) => rx.close(),
Request::Action(Action::Flush) => {
let _ = semaphore.acquire_many(cfg.max_parallel as _).await;
stx.send(Event::Flushed).unwrap();

View File

@ -126,7 +126,9 @@ where
Ok(())
}
Err(mpsc::error::SendError(Request::Request(_, msg, _))) => Err(Error::send_closed(msg)),
Err(mpsc::error::SendError(Request::Request(_, msg, _))) => {
Err(Error::send_closed(msg))
}
_ => unimplemented!(),
}
}

View File

@ -124,7 +124,9 @@ where
Ok(())
}
Err(mpsc::error::SendError(Request::Request(_, msg, _))) => Err(Error::send_closed(msg)),
Err(mpsc::error::SendError(Request::Request(_, msg, _))) => {
Err(Error::send_closed(msg))
}
_ => unimplemented!(),
}
}

View File

@ -51,13 +51,47 @@ macro_rules! buffer_unordered_batch_poller_macro {
M: Message,
R: Message,
{
use futures::{future, pin_mut, select, FutureExt};
let ut = ut.downcast::<$t>().unwrap();
let semaphore = Arc::new(tokio::sync::Semaphore::new(cfg.max_parallel));
let mut buffer_mid = Vec::with_capacity(cfg.batch_size);
let mut buffer = Vec::with_capacity(cfg.batch_size);
let mut idling = true;
loop {
let wait_fut = async move {
if idling {
let () = future::pending().await;
} else {
let _ = tokio::time::sleep(std::time::Duration::from_millis(100))
.fuse()
.await;
}
};
pin_mut!(wait_fut);
let msg = select! {
m = rx.recv().fuse() => if let Some(msg) = m {
msg
} else {
break;
},
_ = wait_fut.fuse() => {
idling = true;
stx.send(Event::IdleBegin).unwrap();
continue;
}
};
if idling {
stx.send(Event::IdleEnd).unwrap();
}
idling = false;
while let Some(msg) = rx.recv().await {
let bus = bus.clone();
let ut = ut.clone();
let semaphore = semaphore.clone();

View File

@ -130,7 +130,9 @@ where
Ok(())
}
Err(mpsc::error::SendError(Request::Request(_, msg, _))) => Err(Error::send_closed(msg)),
Err(mpsc::error::SendError(Request::Request(_, msg, _))) => {
Err(Error::send_closed(msg))
}
_ => unimplemented!(),
}
}

View File

@ -1,5 +1,6 @@
mod buffer_unordered;
mod buffer_unordered_batched;
// mod producer;
mod synchronize_batched;
mod synchronized;
@ -13,6 +14,8 @@ pub use synchronize_batched::{
SynchronizedBatchedAsync, SynchronizedBatchedConfig, SynchronizedBatchedSync,
};
// pub use producer::{AsyncProducer, AsyncProducerConfig};
use crate::receiver::Action;
#[macro_export]

View File

@ -0,0 +1,147 @@
use std::pin::Pin;
use futures::{pin_mut, Future, Stream};
use tokio::sync::{mpsc, Mutex};
use crate::builder::ReceiverSubscriberBuilder;
use crate::error::{Error, StdSyncSendError};
use crate::handler::AsyncProducer as AsyncProducerHandler;
use crate::receiver::UntypedPollerCallback;
use crate::receivers::Request;
use crate::{
Action, Bus, Event, Message, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver,
Untyped,
};
#[derive(Default)]
pub struct AsyncProducerConfig {}
async fn producer_poller<T, M>(
mut rx: mpsc::UnboundedReceiver<Request<M>>,
bus: Bus,
ut: Untyped,
stx: mpsc::UnboundedSender<Event<T::Response, T::Error>>,
) where
T: AsyncProducerHandler<M> + 'static,
T::Error: StdSyncSendError,
T::Item: Message,
T::Response: Message,
M: Message,
{
let ut = ut.downcast::<Mutex<T>>().unwrap();
let stream = Option<Pin<Box<dyn Stream<Item = Result<Self::Item, Self::Error>> + Send + '_>>, Self::Error>;
while let Some(msg) = rx.recv().await {
match msg {
Request::Request(mid, msg, _req) => {
let lock = ut.lock().await;
let stream = lock.producer(msg, &bus).await.unwrap();
pin_mut!(stream);
stx.send(Event::BatchComplete(M::type_tag_(), 1)).unwrap();
}
Request::Action(Action::Init(..)) => {
stx.send(Event::Ready).unwrap();
}
Request::Action(Action::Close) => {
rx.close();
}
Request::Action(Action::Flush) => {
stx.send(Event::Flushed).unwrap();
}
Request::Action(Action::Sync) => {}
_ => unimplemented!(),
}
}
}
#[derive(Debug)]
pub struct AsyncProducer<M, R, E>
where
M: Message,
R: Message,
E: StdSyncSendError,
{
tx: mpsc::UnboundedSender<Request<M>>,
srx: parking_lot::Mutex<Option<mpsc::UnboundedReceiver<Event<R, E>>>>,
}
impl<T, M> ReceiverSubscriberBuilder<T, M, T::Response, T::Error>
for AsyncProducer<M, T::Response, T::Error>
where
T: AsyncProducerHandler<M> + 'static,
T::Item: Message,
T::Response: Message,
T::Error: StdSyncSendError,
M: Message,
{
type Config = AsyncProducerConfig;
fn build(_cfg: Self::Config) -> (Self, UntypedPollerCallback) {
let (stx, srx) = mpsc::unbounded_channel();
let (tx, rx) = mpsc::unbounded_channel();
let poller = Box::new(move |ut| {
Box::new(move |bus| {
Box::pin(producer_poller::<T, M>(rx, bus, ut, stx))
as Pin<Box<dyn Future<Output = ()> + Send>>
}) as Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>
});
(
AsyncProducer::<M, T::Response, T::Error> {
tx,
srx: parking_lot::Mutex::new(Some(srx)),
},
poller,
)
}
}
impl<M, R, E> SendUntypedReceiver for AsyncProducer<M, R, E>
where
M: Message,
R: Message,
E: StdSyncSendError,
{
fn send(&self, m: Action, _bus: &Bus) -> Result<(), Error<Action>> {
match self.tx.send(Request::Action(m)) {
Ok(_) => Ok(()),
Err(mpsc::error::SendError(Request::Action(msg))) => Err(Error::send_closed(msg)),
_ => unimplemented!(),
}
}
}
impl<M, R, E> SendTypedReceiver<M> for AsyncProducer<M, R, E>
where
M: Message,
R: Message,
E: StdSyncSendError,
{
fn send(&self, mid: u64, m: M, req: bool, _bus: &Bus) -> Result<(), Error<M>> {
match self.tx.send(Request::Request(mid, m, req)) {
Ok(_) => Ok(()),
Err(mpsc::error::SendError(Request::Request(_, msg, _))) => {
Err(Error::send_closed(msg))
}
_ => unimplemented!(),
}
}
}
impl<M, R, E> ReciveTypedReceiver<R, E> for AsyncProducer<M, R, E>
where
M: Message,
R: Message,
E: StdSyncSendError,
{
type Stream = Pin<Box<dyn Stream<Item = Event<R, E>> + Send>>;
fn event_stream(&self, _: Bus) -> Self::Stream {
let mut rx = self.srx.lock().take().unwrap();
Box::pin(futures::stream::poll_fn(move |cx| rx.poll_recv(cx)))
}
}

View File

@ -97,7 +97,9 @@ where
fn send(&self, mid: u64, m: M, req: bool, _bus: &Bus) -> Result<(), Error<M>> {
match self.tx.send(Request::Request(mid, m, req)) {
Ok(_) => Ok(()),
Err(mpsc::error::SendError(Request::Request(_, msg, _))) => Err(Error::send_closed(msg)),
Err(mpsc::error::SendError(Request::Request(_, msg, _))) => {
Err(Error::send_closed(msg))
}
_ => unimplemented!(),
}
}

View File

@ -48,12 +48,46 @@ macro_rules! batch_synchronized_poller_macro {
M: Message,
R: Message,
{
use futures::{future, pin_mut, select, FutureExt};
let ut = ut.downcast::<Mutex<T>>().unwrap();
let mut buffer_mid = Vec::with_capacity(cfg.batch_size);
let mut buffer = Vec::with_capacity(cfg.batch_size);
let mut idling = true;
loop {
let wait_fut = async move {
if idling {
let () = future::pending().await;
} else {
let _ = tokio::time::sleep(std::time::Duration::from_millis(100))
.fuse()
.await;
}
};
pin_mut!(wait_fut);
let msg = select! {
m = rx.recv().fuse() => if let Some(msg) = m {
msg
} else {
break;
},
_ = wait_fut.fuse() => {
idling = true;
stx.send(Event::IdleBegin).unwrap();
continue;
}
};
if idling {
stx.send(Event::IdleEnd).unwrap();
}
idling = false;
while let Some(msg) = rx.recv().await {
let bus = bus.clone();
let ut = ut.clone();
let stx = stx.clone();

View File

@ -101,7 +101,9 @@ where
fn send(&self, mid: u64, m: M, req: bool, _bus: &Bus) -> Result<(), Error<M>> {
match self.tx.send(Request::Request(mid, m, req)) {
Ok(_) => Ok(()),
Err(mpsc::error::SendError(Request::Request(_, msg, _))) => Err(Error::send_closed(msg)),
Err(mpsc::error::SendError(Request::Request(_, msg, _))) => {
Err(Error::send_closed(msg))
}
_ => unimplemented!(),
}
}

View File

@ -38,9 +38,42 @@ macro_rules! synchronized_poller_macro {
M: Message,
R: Message,
{
use futures::{future, pin_mut, select, FutureExt};
let ut = ut.downcast::<Mutex<T>>().unwrap();
let mut idling = true;
loop {
let wait_fut = async move {
if idling {
let () = future::pending().await;
} else {
let _ = tokio::time::sleep(std::time::Duration::from_millis(100))
.fuse()
.await;
}
};
pin_mut!(wait_fut);
let msg = select! {
m = rx.recv().fuse() => if let Some(msg) = m {
msg
} else {
break;
},
_ = wait_fut.fuse() => {
idling = true;
stx.send(Event::IdleBegin).unwrap();
continue;
}
};
if idling {
stx.send(Event::IdleEnd).unwrap();
}
idling = false;
while let Some(msg) = rx.recv().await {
match msg {
Request::Request(mid, msg, _req) =>
{

View File

@ -98,7 +98,9 @@ where
fn send(&self, mid: u64, m: M, req: bool, _bus: &Bus) -> Result<(), Error<M>> {
match self.tx.send(Request::Request(mid, m, req)) {
Ok(_) => Ok(()),
Err(mpsc::error::SendError(Request::Request(_, msg, _))) => Err(Error::send_closed(msg)),
Err(mpsc::error::SendError(Request::Request(_, msg, _))) => {
Err(Error::send_closed(msg))
}
_ => unimplemented!(),
}
}

View File

@ -5,7 +5,7 @@ use crate::{
SendUntypedReceiver, TypeTagAccept,
},
stats::Stats,
Bus, Event, Message, Permit, ReciveUntypedReceiver, TypeTag,
Bus, Event, Message, Permit, ReciveUntypedReceiver, TypeTag, TypeTagAcceptItem,
};
use core::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use dashmap::DashMap;
@ -27,11 +27,13 @@ pub(crate) struct RelayContext {
receivers: DashMap<TypeTag, Arc<RelayReceiverContext>>,
need_flush: AtomicBool,
ready_flag: AtomicBool,
idling_flag: AtomicBool,
init_sent: AtomicBool,
flushed: Notify,
synchronized: Notify,
closed: Notify,
ready: Notify,
idle: Notify,
}
pub struct RelayReceiverContext {
@ -74,11 +76,13 @@ impl<S> RelayWrapper<S> {
receivers: DashMap::new(),
need_flush: AtomicBool::new(false),
ready_flag: AtomicBool::new(false),
idling_flag: AtomicBool::new(true),
init_sent: AtomicBool::new(false),
flushed: Notify::new(),
synchronized: Notify::new(),
closed: Notify::new(),
ready: Notify::new(),
idle: Notify::new(),
}),
waiters: sharded_slab::Slab::new_with_config::<SlabCfg>(),
}
@ -89,7 +93,7 @@ impl<S> TypeTagAccept for RelayWrapper<S>
where
S: Relay + Send + Sync + 'static,
{
fn iter_types(&self) -> Box<dyn Iterator<Item = (TypeTag, Option<(TypeTag, TypeTag)>)> + '_> {
fn iter_types(&self) -> Box<dyn Iterator<Item = TypeTagAcceptItem> + '_> {
self.inner.iter_types()
}
@ -106,10 +110,6 @@ impl<S> ReceiverTrait for RelayWrapper<S>
where
S: Relay + Send + Sync + 'static,
{
fn id(&self) -> u64 {
self.id
}
fn name(&self) -> &str {
std::any::type_name::<Self>()
}
@ -117,9 +117,13 @@ where
fn typed(&self) -> Option<AnyReceiver<'_>> {
None
}
fn wrapper(&self) -> Option<AnyWrapperRef<'_>> {
None
}
fn id(&self) -> u64 {
self.id
}
fn send_boxed(
&self,
@ -131,12 +135,14 @@ where
self.inner.send_msg(mid, boxed_msg, req, bus)
}
fn need_flush(&self) -> bool {
self.context.need_flush.load(Ordering::SeqCst)
}
fn set_need_flush(&self) {
self.context.need_flush.store(true, Ordering::SeqCst);
fn add_response_listener(
&self,
listener: oneshot::Sender<Result<Box<dyn Message>, Error>>,
) -> Result<u64, Error> {
Ok(self
.waiters
.insert(listener)
.ok_or(Error::AddListenerError)? as _)
}
fn stats(&self) -> Stats {
@ -159,14 +165,32 @@ where
&self.context.flushed
}
fn add_response_listener(
&self,
listener: oneshot::Sender<Result<Box<dyn Message>, Error>>,
) -> Result<u64, Error> {
Ok(self
.waiters
.insert(listener)
.ok_or(Error::AddListenerError)? as _)
fn ready_notify(&self) -> &Notify {
&self.context.ready
}
fn idle_notify(&self) -> &Notify {
&self.context.idle
}
fn is_init_sent(&self) -> bool {
self.context.init_sent.load(Ordering::SeqCst)
}
fn is_ready(&self) -> bool {
self.context.ready_flag.load(Ordering::SeqCst)
}
fn is_idling(&self) -> bool {
self.context.idling_flag.load(Ordering::SeqCst)
}
fn need_flush(&self) -> bool {
self.context.need_flush.load(Ordering::SeqCst)
}
fn set_need_flush(&self) {
self.context.need_flush.store(true, Ordering::SeqCst);
}
fn try_reserve(&self, tt: &TypeTag) -> Option<Permit> {
@ -211,18 +235,6 @@ where
self.context.receivers.get(tt).unwrap().response.clone()
}
fn ready_notify(&self) -> &Notify {
&self.context.ready
}
fn is_ready(&self) -> bool {
self.context.ready_flag.load(Ordering::SeqCst)
}
fn is_init_sent(&self) -> bool {
self.context.init_sent.load(Ordering::SeqCst)
}
fn increment_processing(&self, tt: &TypeTag) {
self.context
.receivers
@ -264,7 +276,7 @@ where
}
Event::Flushed => {
self.context.need_flush.store(false, Ordering::SeqCst);
self.context.flushed.notify_waiters()
self.context.flushed.notify_one()
}
Event::Synchronized(_res) => self.context.synchronized.notify_waiters(),
Event::Response(mid, resp) => {
@ -293,13 +305,21 @@ where
Event::BatchComplete(tt, n) => {
if let Some(ctx) = self.context.receivers.get(&tt) {
ctx.processing.fetch_sub(n, Ordering::SeqCst);
for _ in 0..n {
ctx.response.notify_one();
}
}
}
Event::IdleBegin => {
self.context.idling_flag.store(true, Ordering::SeqCst);
self.context.idle.notify_waiters();
}
Event::IdleEnd => {
self.context.idling_flag.store(false, Ordering::SeqCst);
}
_ => unimplemented!(),
}
}

View File

@ -1,5 +1,5 @@
use std::collections::HashMap;
use parking_lot::RwLock;
use std::collections::HashMap;
use crate::envelop::IntoSharedMessage;
use crate::error::Error;
@ -35,18 +35,16 @@ pub struct TypeRegistry {
impl TypeRegistry {
pub const fn new() -> Self {
Self {
message_types: parking_lot::const_rwlock(None)
message_types: parking_lot::const_rwlock(None),
}
}
}
pub fn deserialize(
&self,
tt: TypeTag,
de: &mut dyn erased_serde::Deserializer<'_>,
) -> Result<Box<dyn SharedMessage>, Error<Box<dyn Message>>> {
let guard = self
.message_types
.read();
let guard = self.message_types.read();
let md = guard
.as_ref()
.ok_or_else(|| Error::TypeTagNotRegistered(tt.clone()))?
@ -59,8 +57,9 @@ impl TypeRegistry {
pub fn register<M: Message + serde::Serialize + serde::de::DeserializeOwned>(&self) {
println!("insert {}", M::type_tag_());
self.message_types.write()
self.message_types
.write()
.get_or_insert_with(HashMap::new)
.insert(
M::type_tag_(),
@ -72,11 +71,14 @@ impl TypeRegistry {
}
#[inline]
pub fn deserialize_shared_message(tt: TypeTag, de: &mut dyn erased_serde::Deserializer<'_>) -> Result<Box<dyn SharedMessage>, Error<Box<dyn Message>>> {
pub fn deserialize_shared_message(
tt: TypeTag,
de: &mut dyn erased_serde::Deserializer<'_>,
) -> Result<Box<dyn SharedMessage>, Error<Box<dyn Message>>> {
TYPE_REGISTRY.deserialize(tt, de)
}
#[inline]
pub fn register_shared_message<M: Message + serde::Serialize + serde::de::DeserializeOwned>() {
TYPE_REGISTRY.register::<M>();
}
}

71
tests/test_idle.rs Normal file
View File

@ -0,0 +1,71 @@
use async_trait::async_trait;
use messagebus::{
derive::{Error as MbError, Message},
error, receivers, AsyncHandler, Bus, Message,
};
use thiserror::Error;
#[derive(Debug, Error, MbError)]
enum Error {
#[error("Error({0})")]
Error(anyhow::Error),
}
impl<M: Message> From<error::Error<M>> for Error {
fn from(err: error::Error<M>) -> Self {
Self::Error(err.into())
}
}
#[derive(Debug, Clone, Message)]
struct MsgF32(pub f32);
#[derive(Debug, Clone, Message)]
struct MsgF64(pub f64);
struct TmpReceiver;
#[async_trait]
impl AsyncHandler<MsgF64> for TmpReceiver {
type Error = Error;
type Response = ();
async fn handle(&self, _msg: MsgF64, _bus: &Bus) -> Result<Self::Response, Self::Error> {
std::thread::sleep(std::time::Duration::from_millis(100));
Ok(())
}
}
#[async_trait]
impl AsyncHandler<MsgF32> for TmpReceiver {
type Error = Error;
type Response = ();
async fn handle(&self, _msg: MsgF32, bus: &Bus) -> Result<Self::Response, Self::Error> {
bus.send(MsgF64(12.0)).await.unwrap();
bus.flush::<MsgF64>().await;
Ok(())
}
}
#[tokio::test]
async fn test_backpressure() {
let (b, poller) = Bus::build()
.register(TmpReceiver)
.subscribe_async::<MsgF32>(
1,
receivers::BufferUnorderedConfig {
buffer_size: 1,
max_parallel: 1,
},
)
.done()
.build();
b.send(MsgF32(10.0)).await.unwrap();
// b.idle_all().await;
b.flush_all().await;
b.close().await;
poller.await;
}

View File

@ -6,7 +6,7 @@ use messagebus::{
derive::{Error as MbError, Message},
error::{self, GenericError},
receivers, Action, AsyncHandler, Bus, Event, Message, MessageBounds, ReciveUntypedReceiver,
SendUntypedReceiver, TypeTag, TypeTagAccept, TypeTagged,
SendUntypedReceiver, TypeTagAccept, TypeTagAcceptItem, TypeTagged,
};
use parking_lot::Mutex;
use thiserror::Error;
@ -93,7 +93,7 @@ impl TypeTagAccept for TestRelay {
false
}
fn iter_types(&self) -> Box<dyn Iterator<Item = (TypeTag, Option<(TypeTag, TypeTag)>)>> {
fn iter_types(&self) -> Box<dyn Iterator<Item = TypeTagAcceptItem>> {
Box::new(
std::iter::once((Msg::<i32>::type_tag_(), None))
.chain(std::iter::once((