Flush and Close

This commit is contained in:
Andrey Tkachenko 2023-03-07 15:18:55 +04:00
parent 800da160ac
commit 2b5c8a081f
9 changed files with 146 additions and 235 deletions

View File

@ -13,7 +13,7 @@ use std::{
use futures::Future;
use messagebus::{
bus::{Bus, MaskMatch},
cell::MsgCell,
cell::{MessageCell, MsgCell},
error::Error,
handler::{Handler, MessageProducer},
message::{Message, SharedMessage},
@ -72,7 +72,7 @@ impl Message for Msg {
None
}
fn try_clone_into(&self, _into: &mut dyn Message) -> bool {
fn try_clone_into(&self, _into: &mut dyn MessageCell) -> bool {
false
}
@ -142,7 +142,7 @@ impl Message for StartMsg {
None
}
fn try_clone_into(&self, _into: &mut dyn Message) -> bool {
fn try_clone_into(&self, _into: &mut dyn MessageCell) -> bool {
false
}
@ -170,6 +170,7 @@ impl MessageProducer<StartMsg> for Test {
type Message = Msg;
type NextFuture<'a> = impl Future<Output = Result<Self::Message, Error>> + 'a;
type StartFuture<'a> = impl Future<Output = Result<(), Error>> + 'a;
type CloseFuture<'a> = impl Future<Output = Result<(), Error>> + 'a;
fn start(&self, _msg: &mut MsgCell<StartMsg>, _: &Bus) -> Self::StartFuture<'_> {
async move {
@ -190,14 +191,19 @@ impl MessageProducer<StartMsg> for Test {
Ok(msg)
}
}
fn close(&mut self) -> Self::CloseFuture<'_> {
async move { Ok(()) }
}
}
impl Handler<Msg> for Test {
type Response = Msg;
type HandleFuture<'a> = impl Future<Output = Result<Self::Response, Error>> + 'a;
type FlushFuture<'a> = impl Future<Output = Result<(), Error>> + 'a;
type CloseFuture<'a> = impl Future<Output = Result<(), Error>> + 'a;
fn handle(&self, msg: &mut MsgCell<Msg>, bus: &Bus) -> Self::HandleFuture<'_> {
fn handle(&self, msg: &mut MsgCell<Msg>, _bus: &Bus) -> Self::HandleFuture<'_> {
let msg = msg.get();
async move {
@ -207,7 +213,11 @@ impl Handler<Msg> for Test {
}
}
fn flush(&mut self, bus: &Bus) -> Self::FlushFuture<'_> {
fn flush(&mut self, _bus: &Bus) -> Self::FlushFuture<'_> {
async move { Ok(()) }
}
fn close(&mut self) -> Self::CloseFuture<'_> {
async move { Ok(()) }
}
}

View File

@ -1,88 +1,20 @@
#![feature(type_alias_impl_trait)]
use std::{alloc::Layout, any::Any, sync::Arc};
use std::sync::Arc;
use futures::Future;
use messagebus::{
bus::{Bus, MaskMatch},
cell::MsgCell,
derive_message_clone,
error::Error,
handler::Handler,
message::{Message, SharedMessage},
receivers::wrapper::HandlerWrapper,
type_tag::{TypeTag, TypeTagInfo},
};
#[derive(Debug, Clone)]
struct Msg(pub u32);
impl Message for Msg {
fn TYPE_TAG() -> TypeTag
where
Self: Sized,
{
TypeTagInfo::parse("demo::Msg").unwrap().into()
}
fn type_tag(&self) -> TypeTag {
Msg::TYPE_TAG()
}
fn type_layout(&self) -> Layout {
Layout::for_value(self)
}
fn as_any_ref(&self) -> &dyn Any {
self
}
fn as_any_mut(&mut self) -> &mut dyn Any {
self
}
fn as_any_boxed(self: Box<Self>) -> Box<dyn Any> {
self as _
}
fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any> {
self as _
}
fn as_shared_ref(&self) -> Option<&dyn SharedMessage> {
None
}
fn as_shared_mut(&mut self) -> Option<&mut dyn SharedMessage> {
None
}
fn as_shared_boxed(self: Box<Self>) -> Result<Box<dyn SharedMessage>, Box<dyn Message>> {
Err(self)
}
fn as_shared_arc(self: Arc<Self>) -> Option<Arc<dyn SharedMessage>> {
None
}
fn try_clone_into(&self, into: &mut dyn Message) -> bool {
false
}
fn try_clone_boxed(&self) -> Option<Box<dyn Message>> {
None
}
fn try_clone(&self) -> Option<Self>
where
Self: Sized,
{
Some(Self(self.0))
}
fn is_cloneable(&self) -> bool {
false
}
}
derive_message_clone!(EXAMPLE_MSG, Msg, "example::Msg");
struct Test {
inner: u32,
@ -92,6 +24,7 @@ impl Handler<Msg> for Test {
type Response = Msg;
type HandleFuture<'a> = impl Future<Output = Result<Self::Response, Error>> + 'a;
type FlushFuture<'a> = impl Future<Output = Result<(), Error>> + 'a;
type CloseFuture<'a> = impl Future<Output = Result<(), Error>> + 'a;
fn handle(&self, msg: &mut MsgCell<Msg>, _bus: &Bus) -> Self::HandleFuture<'_> {
let msg = msg.get();
@ -106,6 +39,10 @@ impl Handler<Msg> for Test {
fn flush(&mut self, _bus: &Bus) -> Self::FlushFuture<'_> {
async move { Ok(()) }
}
fn close(&mut self) -> Self::CloseFuture<'_> {
async move { Ok(()) }
}
}
async fn run() -> Result<(), Error> {

View File

@ -12,8 +12,13 @@ pub trait Handler<M: Message>: Send + Sync {
where
Self: 'a;
type CloseFuture<'a>: Future<Output = Result<(), Error>> + Send + 'a
where
Self: 'a;
fn handle(&self, msg: &mut MsgCell<M>, bus: &Bus) -> Self::HandleFuture<'_>;
fn flush(&mut self, bus: &Bus) -> Self::FlushFuture<'_>;
fn close(&mut self) -> Self::CloseFuture<'_>;
}
pub trait MessageProducer<M: Message>: Send + Sync {
@ -27,6 +32,11 @@ pub trait MessageProducer<M: Message>: Send + Sync {
where
Self: 'a;
type CloseFuture<'a>: Future<Output = Result<(), Error>> + Send + 'a
where
Self: 'a;
fn start(&self, msg: &mut MsgCell<M>, bus: &Bus) -> Self::StartFuture<'_>;
fn next(&self, bus: &Bus) -> Self::NextFuture<'_>;
fn close(&mut self) -> Self::CloseFuture<'_>;
}

View File

@ -30,6 +30,9 @@ pub trait Receiver<M: Message, R: Message>: Send + Sync {
cx: &mut Context<'_>,
bus: &Bus,
) -> Poll<Result<(), Error>>;
fn poll_flush(&self, cx: &mut Context<'_>, bus: &Bus) -> Poll<Result<(), Error>>;
fn poll_close(&self, cx: &mut Context<'_>) -> Poll<Result<(), Error>>;
}
pub trait ReceiverEx<M: Message, R: Message>: Receiver<M, R> {
@ -43,6 +46,12 @@ pub trait ReceiverEx<M: Message, R: Message>: Receiver<M, R> {
where
Self: 'a;
type ProcessFut<'a>: Future<Output = Result<(), Error>> + Send + 'a
where
Self: 'a;
type FlushFut<'a>: Future<Output = Result<(), Error>> + Send + 'a
where
Self: 'a;
type CloseFut<'a>: Future<Output = Result<(), Error>> + Send + 'a
where
Self: 'a;
@ -51,6 +60,8 @@ pub trait ReceiverEx<M: Message, R: Message>: Receiver<M, R> {
fn request(&self, msg: MsgCell<M>, bus: Bus) -> Self::RequestFut<'_>;
fn process(&self, task: TaskHandler, bus: Bus) -> Self::ProcessFut<'_>;
fn result(&self, task: TaskHandler, bus: Bus) -> Self::ResultFut<'_>;
fn flush(&self, bus: Bus) -> Self::FlushFut<'_>;
fn close(&self) -> Self::CloseFut<'_>;
}
impl<M: Message, R: Message, H: Receiver<M, R> + Send + Sync + 'static> ReceiverEx<M, R> for H {
@ -58,6 +69,8 @@ impl<M: Message, R: Message, H: Receiver<M, R> + Send + Sync + 'static> Receiver
type RequestFut<'a> = impl Future<Output = Result<R, Error>> + Send + 'a;
type ResultFut<'a> = impl Future<Output = Result<R, Error>> + Send + 'a;
type ProcessFut<'a> = impl Future<Output = Result<(), Error>> + Send + 'a;
type FlushFut<'a> = impl Future<Output = Result<(), Error>> + Send + 'a;
type CloseFut<'a> = impl Future<Output = Result<(), Error>> + Send + 'a;
fn try_send(&self, cell: &mut MsgCell<M>, bus: &Bus) -> Result<TaskHandler, Error> {
match self.poll_send(cell, None, bus) {
@ -88,6 +101,14 @@ impl<M: Message, R: Message, H: Receiver<M, R> + Send + Sync + 'static> Receiver
self.result(task, bus).await
}
}
fn flush(&self, bus: Bus) -> Self::FlushFut<'_> {
poll_fn(move |cx| self.poll_flush(cx, &bus))
}
fn close(&self) -> Self::CloseFut<'_> {
poll_fn(move |cx| self.poll_close(cx))
}
}
pub trait AbstractReceiver: Send + Sync + 'static {
@ -107,6 +128,9 @@ pub trait AbstractReceiver: Send + Sync + 'static {
cx: &mut Context<'_>,
bus: &Bus,
) -> Poll<Result<(), Error>>;
fn poll_flush(&self, cx: &mut Context<'_>, bus: &Bus) -> Poll<Result<(), Error>>;
fn poll_close(&self, cx: &mut Context<'_>) -> Poll<Result<(), Error>>;
}
pub trait IntoAbstractReceiver<M: Message, R: Message> {
@ -167,6 +191,14 @@ impl<M: Message, R: Message, H: Receiver<M, R> + Send + Sync + 'static> Abstract
self.inner.poll_result(task, res, cx, bus)
}
fn poll_flush(&self, cx: &mut Context<'_>, bus: &Bus) -> Poll<Result<(), Error>> {
self.inner.poll_flush(cx, bus)
}
fn poll_close(&self, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
self.inner.poll_close(cx)
}
}
impl dyn AbstractReceiver {
@ -230,4 +262,14 @@ impl dyn AbstractReceiver {
let task = self.send(&mut cell, bus.clone()).await?;
self.result(task, bus).await
}
#[inline]
pub async fn flush(&self, bus: Bus) -> Result<(), Error> {
poll_fn(move |cx| self.poll_flush(cx, &bus)).await
}
#[inline]
pub async fn close(&self) -> Result<(), Error> {
poll_fn(move |cx| self.poll_close(cx)).await
}
}

View File

@ -194,6 +194,14 @@ impl<M: Message, T: MessageProducer<M> + 'static> Receiver<M, T::Message>
}
}
}
fn poll_flush(&self, cx: &mut Context<'_>, bus: &Bus) -> Poll<Result<(), Error>> {
Poll::Ready(Ok(()))
}
fn poll_close(&self, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
Poll::Ready(Ok(()))
}
}
lazy_static::lazy_static! {
@ -245,7 +253,7 @@ mod tests {
use crate::{
bus::Bus,
cell::MsgCell,
cell::{MessageCell, MsgCell},
error::Error,
handler::Handler,
message::{Message, SharedMessage},
@ -306,7 +314,7 @@ mod tests {
None
}
fn try_clone_into(&self, _into: &mut dyn Message) -> bool {
fn try_clone_into(&self, _into: &mut dyn MessageCell) -> bool {
false
}
@ -334,6 +342,7 @@ mod tests {
type Response = Msg;
type HandleFuture<'a> = impl Future<Output = Result<Self::Response, Error>> + 'a;
type FlushFuture<'a> = std::future::Ready<Result<(), Error>>;
type CloseFuture<'a> = std::future::Ready<Result<(), Error>>;
fn handle(&self, msg: &mut MsgCell<Msg>, _: &Bus) -> Self::HandleFuture<'_> {
let val = msg.peek().0;
@ -347,6 +356,10 @@ mod tests {
fn flush(&mut self, _: &Bus) -> Self::FlushFuture<'_> {
std::future::ready(Ok(()))
}
fn close(&mut self) -> Self::CloseFuture<'_> {
std::future::ready(Ok(()))
}
}
struct SleepTest {
@ -357,6 +370,7 @@ mod tests {
type Response = Msg;
type HandleFuture<'a> = impl Future<Output = Result<Self::Response, Error>> + 'a;
type FlushFuture<'a> = std::future::Ready<Result<(), Error>>;
type CloseFuture<'a> = std::future::Ready<Result<(), Error>>;
fn handle(&self, msg: &mut MsgCell<Msg>, _: &Bus) -> Self::HandleFuture<'_> {
let val = msg.peek().0;
@ -366,9 +380,14 @@ mod tests {
Ok(Msg(x + val))
}
}
fn flush(&mut self, _: &Bus) -> Self::FlushFuture<'_> {
std::future::ready(Ok(()))
}
fn close(&mut self) -> Self::CloseFuture<'_> {
std::future::ready(Ok(()))
}
}
#[tokio::test]

View File

@ -1,11 +1,10 @@
use std::{
marker::PhantomData,
pin::Pin,
sync::{
atomic::{AtomicU64, AtomicUsize, Ordering},
atomic::{AtomicU64, Ordering},
Arc,
},
task::{ready, Context, Poll, Waker},
task::{ready, Context, Poll},
};
use crossbeam::queue::ArrayQueue;
@ -158,95 +157,35 @@ impl<'a, M: Message, R: Message, T: Receiver<M, R> + 'static> Receiver<M, R> for
Poll::Pending
}
fn poll_flush(&self, cx: &mut Context<'_>, bus: &Bus) -> Poll<Result<(), Error>> {
Poll::Ready(Ok(()))
}
fn poll_close(&self, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
Poll::Ready(Ok(()))
}
}
#[cfg(test)]
mod tests {
use futures::Future;
use std::{any::Any, sync::Arc};
use std::sync::Arc;
use crate::{
bus::Bus,
cell::MsgCell,
derive_message_clone,
error::Error,
handler::Handler,
message::{Message, SharedMessage},
receiver::IntoAbstractReceiver,
receivers::{queue::Queue, wrapper::HandlerWrapper},
type_tag::{TypeTag, TypeTagInfo},
};
#[derive(Debug, Clone, PartialEq)]
struct Msg(pub u32);
impl Message for Msg {
#[allow(non_snake_case)]
fn TYPE_TAG() -> TypeTag
where
Self: Sized,
{
TypeTagInfo::parse("demo::Msg").unwrap().into()
}
fn type_tag(&self) -> TypeTag {
Msg::TYPE_TAG()
}
fn type_layout(&self) -> std::alloc::Layout {
std::alloc::Layout::for_value(self)
}
fn as_any_ref(&self) -> &dyn Any {
self
}
fn as_any_mut(&mut self) -> &mut dyn Any {
self
}
fn as_any_boxed(self: Box<Self>) -> Box<dyn Any> {
self as _
}
fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any> {
self as _
}
fn as_shared_ref(&self) -> Option<&dyn SharedMessage> {
None
}
fn as_shared_mut(&mut self) -> Option<&mut dyn SharedMessage> {
None
}
fn as_shared_boxed(self: Box<Self>) -> Result<Box<dyn SharedMessage>, Box<dyn Message>> {
Err(self)
}
fn as_shared_arc(self: Arc<Self>) -> Option<Arc<dyn SharedMessage>> {
None
}
fn try_clone_into(&self, _into: &mut dyn Message) -> bool {
false
}
fn try_clone_boxed(&self) -> Option<Box<dyn Message>> {
None
}
fn try_clone(&self) -> Option<Self>
where
Self: Sized,
{
Some(Self(self.0))
}
fn is_cloneable(&self) -> bool {
false
}
}
derive_message_clone!(MSG1, Msg, "test::Msg");
struct Test {
inner: u32,
@ -255,9 +194,8 @@ mod tests {
impl Handler<Msg> for Test {
type Response = Msg;
type HandleFuture<'a> = impl Future<Output = Result<Self::Response, Error>> + 'a;
type FlushFuture<'a> = impl Future<Output = Result<(), Error>> + 'a
where
Self: 'a;
type FlushFuture<'a> = impl Future<Output = Result<(), Error>> + 'a;
type CloseFuture<'a> = impl Future<Output = Result<(), Error>> + 'a;
fn handle(&self, msg: &mut MsgCell<Msg>, _bus: &Bus) -> Self::HandleFuture<'_> {
let msg = msg.peek().0;
@ -272,6 +210,10 @@ mod tests {
fn flush(&mut self, _bus: &Bus) -> Self::FlushFuture<'_> {
async move { Ok(()) }
}
fn close(&mut self) -> Self::CloseFuture<'_> {
async move { Ok(()) }
}
}
#[tokio::test]

View File

@ -1,5 +1,4 @@
use std::{
future::poll_fn,
marker::PhantomData,
sync::Arc,
task::{ready, Context, Poll, Waker},
@ -8,7 +7,7 @@ use std::{
use crossbeam::queue::ArrayQueue;
use futures::task::AtomicWaker;
use parking_lot::Mutex;
use tokio::{sync::mpsc, task::JoinHandle};
use tokio::sync::mpsc;
use crate::{
bus::{Bus, TaskHandler},
@ -205,6 +204,14 @@ impl<M: Message, R: Message, T: Receiver<M, R> + Send + Sync + 'static> Receiver
// .poll(resp, cx)
Poll::Pending
}
fn poll_flush(&self, cx: &mut Context<'_>, bus: &Bus) -> Poll<Result<(), Error>> {
Poll::Ready(Ok(()))
}
fn poll_close(&self, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
Poll::Ready(Ok(()))
}
}
#[cfg(test)]

View File

@ -131,6 +131,14 @@ impl<M: Message, T: Handler<M> + 'static> Receiver<M, T::Response> for HandlerWr
Poll::Pending
}
fn poll_flush(&self, cx: &mut Context<'_>, bus: &Bus) -> Poll<Result<(), Error>> {
Poll::Ready(Ok(()))
}
fn poll_close(&self, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
Poll::Ready(Ok(()))
}
}
pub(crate) struct HandlerWrapperHelper<M: Message, T: Handler<M>>(PhantomData<(M, T)>);
@ -150,7 +158,6 @@ impl<M: Message, T: Handler<M> + 'static> HandlerWrapperHelper<M, T> {
#[cfg(test)]
mod tests {
use std::{
any::Any,
future::{poll_fn, Future},
sync::{
atomic::{AtomicBool, Ordering},
@ -160,87 +167,14 @@ mod tests {
};
use crate::{
bus::Bus,
cell::MsgCell,
error::Error,
handler::Handler,
message::{Message, SharedMessage},
receiver::IntoAbstractReceiver,
receivers::wrapper::HandlerWrapper,
type_tag::{TypeTag, TypeTagInfo},
bus::Bus, cell::MsgCell, derive_message_clone, error::Error, handler::Handler,
receiver::IntoAbstractReceiver, receivers::wrapper::HandlerWrapper,
};
#[derive(Debug, Clone, PartialEq)]
struct Msg(pub u32);
impl Message for Msg {
#[allow(non_snake_case)]
fn TYPE_TAG() -> TypeTag
where
Self: Sized,
{
TypeTagInfo::parse("demo::Msg").unwrap().into()
}
fn type_tag(&self) -> TypeTag {
Msg::TYPE_TAG()
}
fn type_layout(&self) -> std::alloc::Layout {
std::alloc::Layout::for_value(self)
}
fn as_any_ref(&self) -> &dyn Any {
self
}
fn as_any_mut(&mut self) -> &mut dyn Any {
self
}
fn as_any_boxed(self: Box<Self>) -> Box<dyn Any> {
self as _
}
fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any> {
self as _
}
fn as_shared_ref(&self) -> Option<&dyn SharedMessage> {
None
}
fn as_shared_mut(&mut self) -> Option<&mut dyn SharedMessage> {
None
}
fn as_shared_boxed(self: Box<Self>) -> Result<Box<dyn SharedMessage>, Box<dyn Message>> {
Err(self)
}
fn as_shared_arc(self: Arc<Self>) -> Option<Arc<dyn SharedMessage>> {
None
}
fn try_clone_into(&self, _into: &mut dyn Message) -> bool {
false
}
fn try_clone_boxed(&self) -> Option<Box<dyn Message>> {
None
}
fn is_cloneable(&self) -> bool {
false
}
fn try_clone(&self) -> Option<Self>
where
Self: Sized,
{
Some(Self(self.0))
}
}
derive_message_clone!(TEST_MSG, Msg, "test::Msg");
struct Test {
inner: u32,
@ -250,6 +184,7 @@ mod tests {
type Response = Msg;
type HandleFuture<'a> = impl Future<Output = Result<Self::Response, Error>> + 'a;
type FlushFuture<'a> = std::future::Ready<Result<(), Error>>;
type CloseFuture<'a> = std::future::Ready<Result<(), Error>>;
fn handle(&self, msg: &mut MsgCell<Msg>, _: &Bus) -> Self::HandleFuture<'_> {
let val = msg.peek().0;
@ -263,6 +198,10 @@ mod tests {
fn flush(&mut self, _: &Bus) -> Self::FlushFuture<'_> {
std::future::ready(Ok(()))
}
fn close(&mut self) -> Self::CloseFuture<'_> {
std::future::ready(Ok(()))
}
}
struct SleepTest {
@ -273,6 +212,7 @@ mod tests {
type Response = Msg;
type HandleFuture<'a> = impl Future<Output = Result<Self::Response, Error>> + 'a;
type FlushFuture<'a> = std::future::Ready<Result<(), Error>>;
type CloseFuture<'a> = std::future::Ready<Result<(), Error>>;
fn handle(&self, msg: &mut MsgCell<Msg>, _: &Bus) -> Self::HandleFuture<'_> {
let val = msg.peek().0;
@ -285,6 +225,10 @@ mod tests {
fn flush(&mut self, _: &Bus) -> Self::FlushFuture<'_> {
std::future::ready(Ok(()))
}
fn close(&mut self) -> Self::CloseFuture<'_> {
std::future::ready(Ok(()))
}
}
#[tokio::test]

View File

@ -9,7 +9,7 @@ use regex::Regex;
lazy_static! {
static ref INFO_RE: Regex = Regex::new(
r"^((?:\s*[a-zA-Z_][a-zA-Z0-9_]*::)*)\s*([a-zA-Z_][a-zA-Z0-9_]*)(?:<(.*)>)?\s*(.*)"
r"^((?:\s*[a-zA-Z_][a-zA-Z0-9_-]*::)*)\s*([a-zA-Z_][a-zA-Z0-9_]*)(?:<(.*)>)?\s*(.*)"
)
.unwrap();
}