diff --git a/examples/producer.rs b/examples/producer.rs index 53fdbf0..7912af6 100644 --- a/examples/producer.rs +++ b/examples/producer.rs @@ -13,7 +13,7 @@ use std::{ use futures::Future; use messagebus::{ bus::{Bus, MaskMatch}, - cell::MsgCell, + cell::{MessageCell, MsgCell}, error::Error, handler::{Handler, MessageProducer}, message::{Message, SharedMessage}, @@ -72,7 +72,7 @@ impl Message for Msg { None } - fn try_clone_into(&self, _into: &mut dyn Message) -> bool { + fn try_clone_into(&self, _into: &mut dyn MessageCell) -> bool { false } @@ -142,7 +142,7 @@ impl Message for StartMsg { None } - fn try_clone_into(&self, _into: &mut dyn Message) -> bool { + fn try_clone_into(&self, _into: &mut dyn MessageCell) -> bool { false } @@ -170,6 +170,7 @@ impl MessageProducer for Test { type Message = Msg; type NextFuture<'a> = impl Future> + 'a; type StartFuture<'a> = impl Future> + 'a; + type CloseFuture<'a> = impl Future> + 'a; fn start(&self, _msg: &mut MsgCell, _: &Bus) -> Self::StartFuture<'_> { async move { @@ -190,14 +191,19 @@ impl MessageProducer for Test { Ok(msg) } } + + fn close(&mut self) -> Self::CloseFuture<'_> { + async move { Ok(()) } + } } impl Handler for Test { type Response = Msg; type HandleFuture<'a> = impl Future> + 'a; type FlushFuture<'a> = impl Future> + 'a; + type CloseFuture<'a> = impl Future> + 'a; - fn handle(&self, msg: &mut MsgCell, bus: &Bus) -> Self::HandleFuture<'_> { + fn handle(&self, msg: &mut MsgCell, _bus: &Bus) -> Self::HandleFuture<'_> { let msg = msg.get(); async move { @@ -207,7 +213,11 @@ impl Handler for Test { } } - fn flush(&mut self, bus: &Bus) -> Self::FlushFuture<'_> { + fn flush(&mut self, _bus: &Bus) -> Self::FlushFuture<'_> { + async move { Ok(()) } + } + + fn close(&mut self) -> Self::CloseFuture<'_> { async move { Ok(()) } } } diff --git a/examples/run.rs b/examples/run.rs index c826cc5..0a2a7a0 100644 --- a/examples/run.rs +++ b/examples/run.rs @@ -1,88 +1,20 @@ #![feature(type_alias_impl_trait)] -use std::{alloc::Layout, any::Any, sync::Arc}; +use std::sync::Arc; use futures::Future; use messagebus::{ bus::{Bus, MaskMatch}, cell::MsgCell, + derive_message_clone, error::Error, handler::Handler, - message::{Message, SharedMessage}, receivers::wrapper::HandlerWrapper, - type_tag::{TypeTag, TypeTagInfo}, }; #[derive(Debug, Clone)] struct Msg(pub u32); - -impl Message for Msg { - fn TYPE_TAG() -> TypeTag - where - Self: Sized, - { - TypeTagInfo::parse("demo::Msg").unwrap().into() - } - - fn type_tag(&self) -> TypeTag { - Msg::TYPE_TAG() - } - - fn type_layout(&self) -> Layout { - Layout::for_value(self) - } - - fn as_any_ref(&self) -> &dyn Any { - self - } - - fn as_any_mut(&mut self) -> &mut dyn Any { - self - } - - fn as_any_boxed(self: Box) -> Box { - self as _ - } - - fn as_any_arc(self: Arc) -> Arc { - self as _ - } - - fn as_shared_ref(&self) -> Option<&dyn SharedMessage> { - None - } - - fn as_shared_mut(&mut self) -> Option<&mut dyn SharedMessage> { - None - } - - fn as_shared_boxed(self: Box) -> Result, Box> { - Err(self) - } - - fn as_shared_arc(self: Arc) -> Option> { - None - } - - fn try_clone_into(&self, into: &mut dyn Message) -> bool { - false - } - - fn try_clone_boxed(&self) -> Option> { - None - } - - fn try_clone(&self) -> Option - where - Self: Sized, - { - Some(Self(self.0)) - } - - fn is_cloneable(&self) -> bool { - false - } -} +derive_message_clone!(EXAMPLE_MSG, Msg, "example::Msg"); struct Test { inner: u32, @@ -92,6 +24,7 @@ impl Handler for Test { type Response = Msg; type HandleFuture<'a> = impl Future> + 'a; type FlushFuture<'a> = impl Future> + 'a; + type CloseFuture<'a> = impl Future> + 'a; fn handle(&self, msg: &mut MsgCell, _bus: &Bus) -> Self::HandleFuture<'_> { let msg = msg.get(); @@ -106,6 +39,10 @@ impl Handler for Test { fn flush(&mut self, _bus: &Bus) -> Self::FlushFuture<'_> { async move { Ok(()) } } + + fn close(&mut self) -> Self::CloseFuture<'_> { + async move { Ok(()) } + } } async fn run() -> Result<(), Error> { diff --git a/src/handler.rs b/src/handler.rs index 4f638f0..ce8bfc9 100644 --- a/src/handler.rs +++ b/src/handler.rs @@ -12,8 +12,13 @@ pub trait Handler: Send + Sync { where Self: 'a; + type CloseFuture<'a>: Future> + Send + 'a + where + Self: 'a; + fn handle(&self, msg: &mut MsgCell, bus: &Bus) -> Self::HandleFuture<'_>; fn flush(&mut self, bus: &Bus) -> Self::FlushFuture<'_>; + fn close(&mut self) -> Self::CloseFuture<'_>; } pub trait MessageProducer: Send + Sync { @@ -27,6 +32,11 @@ pub trait MessageProducer: Send + Sync { where Self: 'a; + type CloseFuture<'a>: Future> + Send + 'a + where + Self: 'a; + fn start(&self, msg: &mut MsgCell, bus: &Bus) -> Self::StartFuture<'_>; fn next(&self, bus: &Bus) -> Self::NextFuture<'_>; + fn close(&mut self) -> Self::CloseFuture<'_>; } diff --git a/src/receiver.rs b/src/receiver.rs index ee0be93..1bdb800 100644 --- a/src/receiver.rs +++ b/src/receiver.rs @@ -30,6 +30,9 @@ pub trait Receiver: Send + Sync { cx: &mut Context<'_>, bus: &Bus, ) -> Poll>; + + fn poll_flush(&self, cx: &mut Context<'_>, bus: &Bus) -> Poll>; + fn poll_close(&self, cx: &mut Context<'_>) -> Poll>; } pub trait ReceiverEx: Receiver { @@ -43,6 +46,12 @@ pub trait ReceiverEx: Receiver { where Self: 'a; type ProcessFut<'a>: Future> + Send + 'a + where + Self: 'a; + type FlushFut<'a>: Future> + Send + 'a + where + Self: 'a; + type CloseFut<'a>: Future> + Send + 'a where Self: 'a; @@ -51,6 +60,8 @@ pub trait ReceiverEx: Receiver { fn request(&self, msg: MsgCell, bus: Bus) -> Self::RequestFut<'_>; fn process(&self, task: TaskHandler, bus: Bus) -> Self::ProcessFut<'_>; fn result(&self, task: TaskHandler, bus: Bus) -> Self::ResultFut<'_>; + fn flush(&self, bus: Bus) -> Self::FlushFut<'_>; + fn close(&self) -> Self::CloseFut<'_>; } impl + Send + Sync + 'static> ReceiverEx for H { @@ -58,6 +69,8 @@ impl + Send + Sync + 'static> Receiver type RequestFut<'a> = impl Future> + Send + 'a; type ResultFut<'a> = impl Future> + Send + 'a; type ProcessFut<'a> = impl Future> + Send + 'a; + type FlushFut<'a> = impl Future> + Send + 'a; + type CloseFut<'a> = impl Future> + Send + 'a; fn try_send(&self, cell: &mut MsgCell, bus: &Bus) -> Result { match self.poll_send(cell, None, bus) { @@ -88,6 +101,14 @@ impl + Send + Sync + 'static> Receiver self.result(task, bus).await } } + + fn flush(&self, bus: Bus) -> Self::FlushFut<'_> { + poll_fn(move |cx| self.poll_flush(cx, &bus)) + } + + fn close(&self) -> Self::CloseFut<'_> { + poll_fn(move |cx| self.poll_close(cx)) + } } pub trait AbstractReceiver: Send + Sync + 'static { @@ -107,6 +128,9 @@ pub trait AbstractReceiver: Send + Sync + 'static { cx: &mut Context<'_>, bus: &Bus, ) -> Poll>; + + fn poll_flush(&self, cx: &mut Context<'_>, bus: &Bus) -> Poll>; + fn poll_close(&self, cx: &mut Context<'_>) -> Poll>; } pub trait IntoAbstractReceiver { @@ -167,6 +191,14 @@ impl + Send + Sync + 'static> Abstract self.inner.poll_result(task, res, cx, bus) } + + fn poll_flush(&self, cx: &mut Context<'_>, bus: &Bus) -> Poll> { + self.inner.poll_flush(cx, bus) + } + + fn poll_close(&self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_close(cx) + } } impl dyn AbstractReceiver { @@ -230,4 +262,14 @@ impl dyn AbstractReceiver { let task = self.send(&mut cell, bus.clone()).await?; self.result(task, bus).await } + + #[inline] + pub async fn flush(&self, bus: Bus) -> Result<(), Error> { + poll_fn(move |cx| self.poll_flush(cx, &bus)).await + } + + #[inline] + pub async fn close(&self) -> Result<(), Error> { + poll_fn(move |cx| self.poll_close(cx)).await + } } diff --git a/src/receivers/producer.rs b/src/receivers/producer.rs index ae7de71..09fda92 100644 --- a/src/receivers/producer.rs +++ b/src/receivers/producer.rs @@ -194,6 +194,14 @@ impl + 'static> Receiver } } } + + fn poll_flush(&self, cx: &mut Context<'_>, bus: &Bus) -> Poll> { + Poll::Ready(Ok(())) + } + + fn poll_close(&self, cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } } lazy_static::lazy_static! { @@ -245,7 +253,7 @@ mod tests { use crate::{ bus::Bus, - cell::MsgCell, + cell::{MessageCell, MsgCell}, error::Error, handler::Handler, message::{Message, SharedMessage}, @@ -306,7 +314,7 @@ mod tests { None } - fn try_clone_into(&self, _into: &mut dyn Message) -> bool { + fn try_clone_into(&self, _into: &mut dyn MessageCell) -> bool { false } @@ -334,6 +342,7 @@ mod tests { type Response = Msg; type HandleFuture<'a> = impl Future> + 'a; type FlushFuture<'a> = std::future::Ready>; + type CloseFuture<'a> = std::future::Ready>; fn handle(&self, msg: &mut MsgCell, _: &Bus) -> Self::HandleFuture<'_> { let val = msg.peek().0; @@ -347,6 +356,10 @@ mod tests { fn flush(&mut self, _: &Bus) -> Self::FlushFuture<'_> { std::future::ready(Ok(())) } + + fn close(&mut self) -> Self::CloseFuture<'_> { + std::future::ready(Ok(())) + } } struct SleepTest { @@ -357,6 +370,7 @@ mod tests { type Response = Msg; type HandleFuture<'a> = impl Future> + 'a; type FlushFuture<'a> = std::future::Ready>; + type CloseFuture<'a> = std::future::Ready>; fn handle(&self, msg: &mut MsgCell, _: &Bus) -> Self::HandleFuture<'_> { let val = msg.peek().0; @@ -366,9 +380,14 @@ mod tests { Ok(Msg(x + val)) } } + fn flush(&mut self, _: &Bus) -> Self::FlushFuture<'_> { std::future::ready(Ok(())) } + + fn close(&mut self) -> Self::CloseFuture<'_> { + std::future::ready(Ok(())) + } } #[tokio::test] diff --git a/src/receivers/queue.rs b/src/receivers/queue.rs index dbd06f6..4ce7518 100644 --- a/src/receivers/queue.rs +++ b/src/receivers/queue.rs @@ -1,11 +1,10 @@ use std::{ - marker::PhantomData, pin::Pin, sync::{ - atomic::{AtomicU64, AtomicUsize, Ordering}, + atomic::{AtomicU64, Ordering}, Arc, }, - task::{ready, Context, Poll, Waker}, + task::{ready, Context, Poll}, }; use crossbeam::queue::ArrayQueue; @@ -158,95 +157,35 @@ impl<'a, M: Message, R: Message, T: Receiver + 'static> Receiver for Poll::Pending } + + fn poll_flush(&self, cx: &mut Context<'_>, bus: &Bus) -> Poll> { + Poll::Ready(Ok(())) + } + + fn poll_close(&self, cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } } #[cfg(test)] mod tests { use futures::Future; - use std::{any::Any, sync::Arc}; + use std::sync::Arc; use crate::{ bus::Bus, cell::MsgCell, + derive_message_clone, error::Error, handler::Handler, - message::{Message, SharedMessage}, receiver::IntoAbstractReceiver, receivers::{queue::Queue, wrapper::HandlerWrapper}, - type_tag::{TypeTag, TypeTagInfo}, }; #[derive(Debug, Clone, PartialEq)] struct Msg(pub u32); - impl Message for Msg { - #[allow(non_snake_case)] - fn TYPE_TAG() -> TypeTag - where - Self: Sized, - { - TypeTagInfo::parse("demo::Msg").unwrap().into() - } - - fn type_tag(&self) -> TypeTag { - Msg::TYPE_TAG() - } - - fn type_layout(&self) -> std::alloc::Layout { - std::alloc::Layout::for_value(self) - } - - fn as_any_ref(&self) -> &dyn Any { - self - } - - fn as_any_mut(&mut self) -> &mut dyn Any { - self - } - - fn as_any_boxed(self: Box) -> Box { - self as _ - } - - fn as_any_arc(self: Arc) -> Arc { - self as _ - } - - fn as_shared_ref(&self) -> Option<&dyn SharedMessage> { - None - } - - fn as_shared_mut(&mut self) -> Option<&mut dyn SharedMessage> { - None - } - - fn as_shared_boxed(self: Box) -> Result, Box> { - Err(self) - } - - fn as_shared_arc(self: Arc) -> Option> { - None - } - - fn try_clone_into(&self, _into: &mut dyn Message) -> bool { - false - } - - fn try_clone_boxed(&self) -> Option> { - None - } - - fn try_clone(&self) -> Option - where - Self: Sized, - { - Some(Self(self.0)) - } - - fn is_cloneable(&self) -> bool { - false - } - } + derive_message_clone!(MSG1, Msg, "test::Msg"); struct Test { inner: u32, @@ -255,9 +194,8 @@ mod tests { impl Handler for Test { type Response = Msg; type HandleFuture<'a> = impl Future> + 'a; - type FlushFuture<'a> = impl Future> + 'a - where - Self: 'a; + type FlushFuture<'a> = impl Future> + 'a; + type CloseFuture<'a> = impl Future> + 'a; fn handle(&self, msg: &mut MsgCell, _bus: &Bus) -> Self::HandleFuture<'_> { let msg = msg.peek().0; @@ -272,6 +210,10 @@ mod tests { fn flush(&mut self, _bus: &Bus) -> Self::FlushFuture<'_> { async move { Ok(()) } } + + fn close(&mut self) -> Self::CloseFuture<'_> { + async move { Ok(()) } + } } #[tokio::test] diff --git a/src/receivers/spawner.rs b/src/receivers/spawner.rs index 24034e7..990a40b 100644 --- a/src/receivers/spawner.rs +++ b/src/receivers/spawner.rs @@ -1,5 +1,4 @@ use std::{ - future::poll_fn, marker::PhantomData, sync::Arc, task::{ready, Context, Poll, Waker}, @@ -8,7 +7,7 @@ use std::{ use crossbeam::queue::ArrayQueue; use futures::task::AtomicWaker; use parking_lot::Mutex; -use tokio::{sync::mpsc, task::JoinHandle}; +use tokio::sync::mpsc; use crate::{ bus::{Bus, TaskHandler}, @@ -205,6 +204,14 @@ impl + Send + Sync + 'static> Receiver // .poll(resp, cx) Poll::Pending } + + fn poll_flush(&self, cx: &mut Context<'_>, bus: &Bus) -> Poll> { + Poll::Ready(Ok(())) + } + + fn poll_close(&self, cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } } #[cfg(test)] diff --git a/src/receivers/wrapper.rs b/src/receivers/wrapper.rs index 139661b..3fb2332 100644 --- a/src/receivers/wrapper.rs +++ b/src/receivers/wrapper.rs @@ -131,6 +131,14 @@ impl + 'static> Receiver for HandlerWr Poll::Pending } + + fn poll_flush(&self, cx: &mut Context<'_>, bus: &Bus) -> Poll> { + Poll::Ready(Ok(())) + } + + fn poll_close(&self, cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } } pub(crate) struct HandlerWrapperHelper>(PhantomData<(M, T)>); @@ -150,7 +158,6 @@ impl + 'static> HandlerWrapperHelper { #[cfg(test)] mod tests { use std::{ - any::Any, future::{poll_fn, Future}, sync::{ atomic::{AtomicBool, Ordering}, @@ -160,87 +167,14 @@ mod tests { }; use crate::{ - bus::Bus, - cell::MsgCell, - error::Error, - handler::Handler, - message::{Message, SharedMessage}, - receiver::IntoAbstractReceiver, - receivers::wrapper::HandlerWrapper, - type_tag::{TypeTag, TypeTagInfo}, + bus::Bus, cell::MsgCell, derive_message_clone, error::Error, handler::Handler, + receiver::IntoAbstractReceiver, receivers::wrapper::HandlerWrapper, }; #[derive(Debug, Clone, PartialEq)] struct Msg(pub u32); - impl Message for Msg { - #[allow(non_snake_case)] - fn TYPE_TAG() -> TypeTag - where - Self: Sized, - { - TypeTagInfo::parse("demo::Msg").unwrap().into() - } - - fn type_tag(&self) -> TypeTag { - Msg::TYPE_TAG() - } - - fn type_layout(&self) -> std::alloc::Layout { - std::alloc::Layout::for_value(self) - } - - fn as_any_ref(&self) -> &dyn Any { - self - } - - fn as_any_mut(&mut self) -> &mut dyn Any { - self - } - - fn as_any_boxed(self: Box) -> Box { - self as _ - } - - fn as_any_arc(self: Arc) -> Arc { - self as _ - } - - fn as_shared_ref(&self) -> Option<&dyn SharedMessage> { - None - } - - fn as_shared_mut(&mut self) -> Option<&mut dyn SharedMessage> { - None - } - - fn as_shared_boxed(self: Box) -> Result, Box> { - Err(self) - } - - fn as_shared_arc(self: Arc) -> Option> { - None - } - - fn try_clone_into(&self, _into: &mut dyn Message) -> bool { - false - } - - fn try_clone_boxed(&self) -> Option> { - None - } - - fn is_cloneable(&self) -> bool { - false - } - - fn try_clone(&self) -> Option - where - Self: Sized, - { - Some(Self(self.0)) - } - } + derive_message_clone!(TEST_MSG, Msg, "test::Msg"); struct Test { inner: u32, @@ -250,6 +184,7 @@ mod tests { type Response = Msg; type HandleFuture<'a> = impl Future> + 'a; type FlushFuture<'a> = std::future::Ready>; + type CloseFuture<'a> = std::future::Ready>; fn handle(&self, msg: &mut MsgCell, _: &Bus) -> Self::HandleFuture<'_> { let val = msg.peek().0; @@ -263,6 +198,10 @@ mod tests { fn flush(&mut self, _: &Bus) -> Self::FlushFuture<'_> { std::future::ready(Ok(())) } + + fn close(&mut self) -> Self::CloseFuture<'_> { + std::future::ready(Ok(())) + } } struct SleepTest { @@ -273,6 +212,7 @@ mod tests { type Response = Msg; type HandleFuture<'a> = impl Future> + 'a; type FlushFuture<'a> = std::future::Ready>; + type CloseFuture<'a> = std::future::Ready>; fn handle(&self, msg: &mut MsgCell, _: &Bus) -> Self::HandleFuture<'_> { let val = msg.peek().0; @@ -285,6 +225,10 @@ mod tests { fn flush(&mut self, _: &Bus) -> Self::FlushFuture<'_> { std::future::ready(Ok(())) } + + fn close(&mut self) -> Self::CloseFuture<'_> { + std::future::ready(Ok(())) + } } #[tokio::test] diff --git a/src/type_tag/info.rs b/src/type_tag/info.rs index 7f64378..9e82a63 100644 --- a/src/type_tag/info.rs +++ b/src/type_tag/info.rs @@ -9,7 +9,7 @@ use regex::Regex; lazy_static! { static ref INFO_RE: Regex = Regex::new( - r"^((?:\s*[a-zA-Z_][a-zA-Z0-9_]*::)*)\s*([a-zA-Z_][a-zA-Z0-9_]*)(?:<(.*)>)?\s*(.*)" + r"^((?:\s*[a-zA-Z_][a-zA-Z0-9_-]*::)*)\s*([a-zA-Z_][a-zA-Z0-9_]*)(?:<(.*)>)?\s*(.*)" ) .unwrap(); }