2021-09-20 19:31:06 +04:00
|
|
|
use std::pin::Pin;
|
2021-07-30 16:58:30 +04:00
|
|
|
|
|
|
|
use async_trait::async_trait;
|
2021-09-20 19:31:06 +04:00
|
|
|
use futures::Stream;
|
2021-11-15 20:01:28 +04:00
|
|
|
use messagebus::{
|
|
|
|
derive::{Error as MbError, Message},
|
|
|
|
error::{self, GenericError},
|
|
|
|
receivers, Action, AsyncHandler, Bus, Event, Message, MessageBounds, ReciveUntypedReceiver,
|
2021-12-03 15:30:13 +04:00
|
|
|
SendUntypedReceiver, TypeTagAccept, TypeTagAcceptItem, TypeTagged,
|
2021-11-15 20:01:28 +04:00
|
|
|
};
|
2021-07-30 16:58:30 +04:00
|
|
|
use parking_lot::Mutex;
|
|
|
|
use thiserror::Error;
|
|
|
|
use tokio::sync::mpsc;
|
|
|
|
|
|
|
|
#[derive(Debug, Error, MbError)]
|
|
|
|
enum Error {
|
|
|
|
#[error("Error({0})")]
|
|
|
|
Error(anyhow::Error),
|
|
|
|
}
|
|
|
|
|
|
|
|
impl<M: Message> From<error::Error<M>> for Error {
|
|
|
|
fn from(err: error::Error<M>) -> Self {
|
|
|
|
Self::Error(err.into())
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
#[derive(Debug, Clone, Message)]
|
|
|
|
pub struct Msg<F: MessageBounds + Clone>(pub F);
|
|
|
|
|
|
|
|
struct TmpReceiver;
|
|
|
|
|
|
|
|
#[async_trait]
|
|
|
|
impl AsyncHandler<Msg<i32>> for TmpReceiver {
|
|
|
|
type Error = Error;
|
|
|
|
type Response = ();
|
|
|
|
|
|
|
|
async fn handle(&self, msg: Msg<i32>, _bus: &Bus) -> Result<Self::Response, Self::Error> {
|
|
|
|
println!("TmpReceiver::handle {:?}", msg);
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
async fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> {
|
|
|
|
println!("TmpReceiver::sync");
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-09-21 14:47:21 +04:00
|
|
|
pub type TestRelayRxChannelCell =
|
|
|
|
Mutex<Option<mpsc::UnboundedReceiver<Event<Box<dyn Message>, GenericError>>>>;
|
|
|
|
pub type TestRelayRxStream =
|
|
|
|
Pin<Box<dyn Stream<Item = Event<Box<dyn Message>, error::GenericError>> + Send>>;
|
|
|
|
|
2021-07-30 16:58:30 +04:00
|
|
|
pub struct TestRelay {
|
|
|
|
stx: mpsc::UnboundedSender<Event<Box<dyn Message>, GenericError>>,
|
2021-09-21 14:47:21 +04:00
|
|
|
srx: TestRelayRxChannelCell,
|
2021-07-30 16:58:30 +04:00
|
|
|
}
|
|
|
|
|
|
|
|
impl TypeTagAccept for TestRelay {
|
2021-10-08 13:03:24 +04:00
|
|
|
fn accept_req(
|
2021-07-30 16:58:30 +04:00
|
|
|
&self,
|
|
|
|
msg: &messagebus::TypeTag,
|
|
|
|
resp: Option<&messagebus::TypeTag>,
|
2021-08-02 18:08:41 +04:00
|
|
|
_err: Option<&messagebus::TypeTag>,
|
2021-07-30 16:58:30 +04:00
|
|
|
) -> bool {
|
|
|
|
if msg.as_ref() == Msg::<i16>::type_tag_().as_ref() {
|
|
|
|
if let Some(resp) = resp {
|
2021-10-08 13:03:24 +04:00
|
|
|
if resp.as_ref() == Msg::<u8>::type_tag_().as_ref() {
|
2021-11-15 20:01:28 +04:00
|
|
|
return true;
|
2021-07-30 16:58:30 +04:00
|
|
|
}
|
2021-10-08 13:03:24 +04:00
|
|
|
} else {
|
2021-11-15 20:01:28 +04:00
|
|
|
return true;
|
|
|
|
}
|
2021-07-30 16:58:30 +04:00
|
|
|
}
|
|
|
|
|
|
|
|
if msg.as_ref() == Msg::<i32>::type_tag_().as_ref() {
|
|
|
|
if let Some(resp) = resp {
|
2021-10-08 13:03:24 +04:00
|
|
|
if resp.as_ref() == Msg::<u64>::type_tag_().as_ref() {
|
2021-11-15 20:01:28 +04:00
|
|
|
return true;
|
2021-07-30 16:58:30 +04:00
|
|
|
}
|
2021-10-08 13:03:24 +04:00
|
|
|
} else {
|
2021-11-15 20:01:28 +04:00
|
|
|
return true;
|
|
|
|
}
|
2021-07-30 16:58:30 +04:00
|
|
|
}
|
|
|
|
|
2021-10-08 13:03:24 +04:00
|
|
|
false
|
2021-07-30 16:58:30 +04:00
|
|
|
}
|
|
|
|
|
2021-11-15 20:01:28 +04:00
|
|
|
fn accept_msg(&self, msg: &messagebus::TypeTag) -> bool {
|
2021-10-08 13:03:24 +04:00
|
|
|
if msg.as_ref() == Msg::<i32>::type_tag_().as_ref() {
|
2021-11-15 20:01:28 +04:00
|
|
|
return true;
|
2021-07-30 16:58:30 +04:00
|
|
|
}
|
2021-10-08 13:03:24 +04:00
|
|
|
|
|
|
|
false
|
|
|
|
}
|
|
|
|
|
2021-12-03 15:30:13 +04:00
|
|
|
fn iter_types(&self) -> Box<dyn Iterator<Item = TypeTagAcceptItem>> {
|
2021-10-08 13:03:24 +04:00
|
|
|
Box::new(
|
|
|
|
std::iter::once((Msg::<i32>::type_tag_(), None))
|
2021-11-15 20:01:28 +04:00
|
|
|
.chain(std::iter::once((
|
|
|
|
Msg::<i32>::type_tag_(),
|
|
|
|
Some((Msg::<u64>::type_tag_(), GenericError::type_tag_())),
|
|
|
|
)))
|
|
|
|
.chain(std::iter::once((
|
|
|
|
Msg::<i16>::type_tag_(),
|
|
|
|
Some((Msg::<u8>::type_tag_(), GenericError::type_tag_())),
|
|
|
|
))),
|
2021-10-08 13:03:24 +04:00
|
|
|
)
|
2021-07-30 16:58:30 +04:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl SendUntypedReceiver for TestRelay {
|
2021-09-22 17:07:21 +04:00
|
|
|
fn send(&self, msg: Action, _bus: &Bus) -> Result<(), error::Error<Action>> {
|
2021-07-30 16:58:30 +04:00
|
|
|
match msg {
|
2021-09-24 19:46:23 +04:00
|
|
|
Action::Init(..) => {
|
2021-08-04 19:14:56 +04:00
|
|
|
self.stx.send(Event::Ready).unwrap();
|
|
|
|
}
|
2021-07-30 16:58:30 +04:00
|
|
|
Action::Close => {
|
|
|
|
self.stx.send(Event::Exited).unwrap();
|
|
|
|
}
|
|
|
|
Action::Flush => {
|
|
|
|
self.stx.send(Event::Flushed).unwrap();
|
|
|
|
}
|
|
|
|
Action::Sync => {
|
|
|
|
self.stx.send(Event::Synchronized(Ok(()))).unwrap();
|
|
|
|
}
|
|
|
|
_ => unimplemented!(),
|
|
|
|
}
|
|
|
|
|
|
|
|
println!("TestRelay::send {:?}", msg);
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
fn send_msg(
|
|
|
|
&self,
|
|
|
|
mid: u64,
|
2021-08-09 18:27:25 +04:00
|
|
|
msg: Box<dyn Message>,
|
2021-10-08 13:03:24 +04:00
|
|
|
req: bool,
|
2021-08-04 19:14:56 +04:00
|
|
|
_bus: &Bus,
|
2021-09-22 17:07:21 +04:00
|
|
|
) -> Result<(), error::Error<Box<dyn Message>>> {
|
2021-07-30 16:58:30 +04:00
|
|
|
println!("TestRelay::send_msg [{}] {:?}", mid, msg);
|
|
|
|
if msg.type_tag().as_ref() == Msg::<i16>::type_tag_().as_ref() {
|
|
|
|
self.stx
|
|
|
|
.send(Event::Response(mid, Ok(Box::new(Msg(9u8)))))
|
|
|
|
.unwrap();
|
2021-10-08 13:03:24 +04:00
|
|
|
} else if msg.type_tag().as_ref() == Msg::<i32>::type_tag_().as_ref() {
|
|
|
|
if req {
|
|
|
|
self.stx
|
|
|
|
.send(Event::Response(mid, Ok(Box::new(Msg(22u64)))))
|
|
|
|
.unwrap();
|
|
|
|
} else {
|
|
|
|
self.stx
|
|
|
|
.send(Event::Response(mid, Ok(Box::new(()))))
|
|
|
|
.unwrap();
|
|
|
|
}
|
2021-07-30 16:58:30 +04:00
|
|
|
} else {
|
2021-10-08 13:03:24 +04:00
|
|
|
panic!("unsupported message type {}", msg.type_tag());
|
2021-07-30 16:58:30 +04:00
|
|
|
}
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-09-16 16:37:45 +04:00
|
|
|
impl ReciveUntypedReceiver for TestRelay {
|
2021-09-21 14:47:21 +04:00
|
|
|
type Stream = TestRelayRxStream;
|
2021-09-20 19:31:06 +04:00
|
|
|
|
2021-09-22 16:53:21 +04:00
|
|
|
fn event_stream(&self, _: Bus) -> Self::Stream {
|
2021-09-20 19:31:06 +04:00
|
|
|
let mut rx = self.srx.lock().take().unwrap();
|
|
|
|
|
|
|
|
Box::pin(futures::stream::poll_fn(move |cx| rx.poll_recv(cx)))
|
2021-07-30 16:58:30 +04:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
#[tokio::test]
|
|
|
|
async fn test_relay() {
|
|
|
|
let (stx, srx) = mpsc::unbounded_channel();
|
|
|
|
let relay = TestRelay {
|
|
|
|
stx,
|
2021-09-20 19:31:06 +04:00
|
|
|
srx: Mutex::new(Some(srx)),
|
2021-07-30 16:58:30 +04:00
|
|
|
};
|
|
|
|
|
|
|
|
let (b, poller) = Bus::build()
|
2021-08-02 18:08:41 +04:00
|
|
|
.register_relay(relay)
|
2021-07-30 16:58:30 +04:00
|
|
|
.register(TmpReceiver)
|
|
|
|
.subscribe_async::<Msg<i32>>(
|
|
|
|
1,
|
|
|
|
receivers::BufferUnorderedConfig {
|
|
|
|
buffer_size: 1,
|
|
|
|
max_parallel: 1,
|
|
|
|
},
|
|
|
|
)
|
|
|
|
.done()
|
|
|
|
.build();
|
|
|
|
|
|
|
|
b.send(Msg(32i32)).await.unwrap();
|
2021-10-08 13:03:24 +04:00
|
|
|
let res1: Msg<u8> = b.request(Msg(12i16), Default::default()).await.unwrap();
|
|
|
|
let res2: Msg<u64> = b.request(Msg(12i32), Default::default()).await.unwrap();
|
2021-07-30 16:58:30 +04:00
|
|
|
|
2021-10-08 13:03:24 +04:00
|
|
|
assert_eq!(res1.0, 9u8);
|
|
|
|
assert_eq!(res2.0, 22u64);
|
2021-07-30 16:58:30 +04:00
|
|
|
|
2021-11-15 20:01:28 +04:00
|
|
|
b.flush_all().await;
|
2021-07-30 16:58:30 +04:00
|
|
|
b.close().await;
|
|
|
|
poller.await;
|
|
|
|
}
|