AsynIterator

This commit is contained in:
Andrey Tkachenko 2023-11-17 16:54:15 +04:00
parent 4d1cced70e
commit 33db3cb15d
9 changed files with 369 additions and 129 deletions

View File

@ -6,6 +6,7 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
anyhow = "1.0.75"
boxcar = "0.2.3" boxcar = "0.2.3"
dashmap = "5.5.0" dashmap = "5.5.0"
futures = "0.3.28" futures = "0.3.28"

View File

@ -3,7 +3,8 @@
use std::sync::Arc; use std::sync::Arc;
use messagebus::{Builder, Bus, Error, Handler, IntoMessageStream, Message}; use anyhow::Error;
use messagebus::{Builder, Bus, Handler, IntoMessages, Message};
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct Msg(pub i32); pub struct Msg(pub i32);
@ -15,34 +16,42 @@ pub struct Processor {
impl Handler<Msg> for Processor { impl Handler<Msg> for Processor {
type Result = (); type Result = ();
type Error = Error;
async fn handle( async fn handle(
&mut self, &mut self,
_msg: Msg, _msg: Msg,
_stream_id: u32, _stream_id: u32,
_task_id: u32, _task_id: u32,
) -> Result<impl IntoMessageStream<Self::Result>, Error> { _bus: Bus,
Ok(()) ) -> Result<impl IntoMessages<Self::Result, Self::Error>, Self::Error> {
}
async fn finalize(self) -> Result<(), Error> {
Ok(()) Ok(())
} }
async fn handle_error( async fn handle_error(
&mut self, &mut self,
_err: Error, _err: messagebus::Error,
_stream_id: u32, _stream_id: u32,
_task_id: u32, _task_id: u32,
) -> Result<impl IntoMessageStream<Self::Result>, Error> { _bus: messagebus::Bus,
) -> Result<impl IntoMessages<Self::Result, Self::Error> + Send + '_, Self::Error> {
Ok(None) Ok(None)
} }
async fn finalize(self, _bus: Bus) -> Result<(), Self::Error> {
Ok(())
}
} }
struct ProcSpawner; struct ProcSpawner;
impl Builder<Msg> for ProcSpawner { impl Builder<Msg> for ProcSpawner {
type Context = Processor; type Context = Processor;
async fn build(&self, stream_id: u32, _task_id: u32) -> Result<Self::Context, Error> { async fn build(
&self,
stream_id: u32,
_task_id: u32,
) -> Result<Self::Context, messagebus::Error> {
Ok(Processor { Ok(Processor {
_state: stream_id as _, _state: stream_id as _,
}) })

141
src/async_iter.rs Normal file
View File

@ -0,0 +1,141 @@
use std::{marker::PhantomData, pin::Pin};
use futures::{Stream, StreamExt};
pub trait AsyncIterator: Send {
type Item: Send;
fn next(self: Pin<&mut Self>) -> impl futures::Future<Output = Option<Self::Item>> + Send + '_;
fn size_hint(&self) -> (usize, Option<usize>) {
(0, None)
}
fn map<U: Send, C: Send + FnMut(Self::Item) -> U>(self, cb: C) -> impl AsyncIterator<Item = U>
where
Self: Sized,
{
Map { inner: self, cb }
}
}
pin_project_lite::pin_project! {
pub struct Map<U, I: AsyncIterator, F: FnMut(I::Item) ->U> {
#[pin]
inner: I,
cb: F,
}
}
impl<U: Send, I: AsyncIterator, F: Send + FnMut(I::Item) -> U> AsyncIterator for Map<U, I, F> {
type Item = U;
async fn next(self: Pin<&mut Self>) -> Option<U> {
let this = self.project();
Some((this.cb)(this.inner.next().await?))
}
}
pub struct Iter<I: Iterator> {
inner: I,
}
impl<I: Send + Iterator + Unpin> AsyncIterator for Iter<I>
where
I::Item: Send,
{
type Item = I::Item;
#[inline]
async fn next(self: Pin<&mut Self>) -> Option<Self::Item> {
self.get_mut().inner.next()
}
fn size_hint(&self) -> (usize, Option<usize>) {
self.inner.size_hint()
}
}
pub fn iter<I: IntoIterator + Send>(inner: I) -> Iter<I::IntoIter>
where
I::IntoIter: Send + Unpin,
I::Item: Send,
{
Iter {
inner: inner.into_iter(),
}
}
pin_project_lite::pin_project! {
pub struct StreamIter<S: Stream> {
#[pin]
inner: S,
}
}
impl<S: Send + Stream> AsyncIterator for StreamIter<S>
where
S::Item: Send,
{
type Item = S::Item;
#[inline]
async fn next(self: Pin<&mut Self>) -> Option<Self::Item> {
self.project().inner.next().await
}
fn size_hint(&self) -> (usize, Option<usize>) {
self.inner.size_hint()
}
}
pub fn stream<S: Send + Stream>(inner: S) -> StreamIter<S>
where
S::Item: Send,
{
StreamIter { inner }
}
pub struct Once<I>(Option<I>);
impl<I: Send + Unpin> AsyncIterator for Once<I> {
type Item = I;
async fn next(self: Pin<&mut Self>) -> Option<Self::Item> {
self.get_mut().0.take()
}
}
pub fn once<I: Send + Unpin>(item: I) -> Once<I> {
Once(Some(item))
}
pub struct Empty<I>(PhantomData<I>);
impl<I: Send> AsyncIterator for Empty<I> {
type Item = I;
async fn next(self: Pin<&mut Self>) -> Option<Self::Item> {
None
}
}
pub fn empty<I: Send + Unpin>() -> Empty<I> {
Empty(Default::default())
}
impl<I: Send> AsyncIterator for tokio::sync::mpsc::Receiver<I> {
type Item = I;
async fn next(self: Pin<&mut Self>) -> Option<Self::Item> {
self.get_mut().recv().await
}
}
impl<I: Send> AsyncIterator for tokio::sync::oneshot::Receiver<I> {
type Item = I;
async fn next(self: Pin<&mut Self>) -> Option<Self::Item> {
match self.await {
Ok(item) => Some(item),
Err(_) => None,
}
}
}

View File

@ -149,9 +149,13 @@ where
return Ok(self.stream_handlers.get(&stream_id).unwrap().clone()); return Ok(self.stream_handlers.get(&stream_id).unwrap().clone());
} }
let val = Arc::new((self.callback)(stream_id, task_id).await?); let val = match (self.callback)(stream_id, task_id).await {
Ok(val) => Arc::new(val),
Err(err) => return Err(err),
};
self.stream_handlers.insert(stream_id, val.clone()); self.stream_handlers.insert(stream_id, val.clone());
Ok(val.clone()) Ok(val)
} }
fn config(&self, _stream_id: u32) -> Config { fn config(&self, _stream_id: u32) -> Config {

View File

@ -88,13 +88,17 @@ pub(crate) struct Sender<T> {
impl<T> Sender<T> { impl<T> Sender<T> {
pub async fn send(&self, msg: T) -> Result<(), Error> { pub async fn send(&self, msg: T) -> Result<(), Error> {
self.inner.send(ChannelItem::Value(msg)).await?; self.inner
Ok(()) .send(ChannelItem::Value(msg))
.await
.map_err(Error::SendError)
} }
pub async fn stop(&self) -> Result<(), Error> { pub async fn stop(&self) -> Result<(), Error> {
self.inner.send(ChannelItem::Close).await?; self.inner
Ok(()) .send(ChannelItem::Close)
.await
.map_err(Error::SendError)
} }
pub fn close(&self) -> Result<(), Error> { pub fn close(&self) -> Result<(), Error> {

View File

@ -1,19 +1,43 @@
use std::fmt; use std::{fmt, sync::Arc};
use kanal::ReceiveError; use kanal::{ReceiveError, SendError};
#[derive(Debug, PartialEq, Eq)] use crate::message::ErrorMessage;
#[derive(Debug)]
pub enum Error { pub enum Error {
HandlerIsNotRegistered, HandlerIsNotRegistered,
Aborted, Aborted,
SendError(String), SendError(SendError),
ReceiveError(kanal::ReceiveError), ReceiveError(kanal::ReceiveError),
ReorderingDropMessage(u64), ReorderingDropMessage(u64),
HandlerError(Arc<dyn ErrorMessage>),
}
impl std::error::Error for Error {}
impl Eq for Error {}
impl PartialEq for Error {
fn eq(&self, other: &Self) -> bool {
match (self, other) {
(Error::HandlerIsNotRegistered, Error::HandlerIsNotRegistered) => true,
(Error::Aborted, Error::Aborted) => true,
(Error::SendError(err1), Error::SendError(err2)) => err1.eq(err2),
(Error::ReceiveError(err1), Error::ReceiveError(err2)) => err1.eq(err2),
(Error::ReorderingDropMessage(idx1), Error::ReorderingDropMessage(idx2)) => {
idx1.eq(idx2)
}
(Error::HandlerError(err1), Error::HandlerError(err2)) => Arc::ptr_eq(err1, err2),
_ => false,
}
}
} }
impl fmt::Display for Error { impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self { match self {
Error::HandlerError(err) => writeln!(f, "Handler Error: {}", err)?,
Error::HandlerIsNotRegistered => writeln!(f, "Handle is not registered!")?, Error::HandlerIsNotRegistered => writeln!(f, "Handle is not registered!")?,
Error::Aborted => writeln!(f, "Operation Aborted!")?, Error::Aborted => writeln!(f, "Operation Aborted!")?,
Error::SendError(reason) => writeln!(f, "Channel send error; reason {}", reason)?, Error::SendError(reason) => writeln!(f, "Channel send error; reason {}", reason)?,
@ -29,12 +53,14 @@ impl fmt::Display for Error {
} }
} }
impl std::error::Error for Error {}
impl Clone for Error { impl Clone for Error {
fn clone(&self) -> Self { fn clone(&self) -> Self {
match self { match self {
Error::SendError(err) => Error::SendError(err.clone()), Error::HandlerError(err) => Error::HandlerError(err.clone()),
Error::SendError(err) => match err {
SendError::Closed => Error::SendError(SendError::Closed),
SendError::ReceiveClosed => Error::SendError(SendError::ReceiveClosed),
},
Error::ReceiveError(err) => match err { Error::ReceiveError(err) => match err {
ReceiveError::Closed => Error::ReceiveError(ReceiveError::Closed), ReceiveError::Closed => Error::ReceiveError(ReceiveError::Closed),
ReceiveError::SendClosed => Error::ReceiveError(ReceiveError::SendClosed), ReceiveError::SendClosed => Error::ReceiveError(ReceiveError::SendClosed),
@ -46,20 +72,11 @@ impl Clone for Error {
} }
} }
impl From<kanal::SendError> for Error { impl<E> From<E> for Error
fn from(value: kanal::SendError) -> Self { where
Self::SendError(format!("{}", value)) E: ErrorMessage,
{
fn from(error: E) -> Self {
Self::HandlerError(Arc::new(error))
} }
} }
impl From<kanal::ReceiveError> for Error {
fn from(value: kanal::ReceiveError) -> Self {
Self::ReceiveError(value)
}
}
// impl<M> From<mpsc::error::SendError<M>> for Error {
// fn from(value: mpsc::error::SendError<M>) -> Self {
// Self::SendError(format!("{}", value))
// }
// }

View File

@ -8,38 +8,44 @@ use std::{
}, },
}; };
use futures::{future, Future, Stream, StreamExt}; use futures::Future;
use tokio::sync::Notify; use tokio::sync::Notify;
use crate::{ use crate::{
builder::Builder, builder::Builder,
chan::Receiver, chan::Receiver,
message::Msg, message::{IntoMessages, Msg},
task::{TaskCounter, TaskSpawner}, task::{TaskCounter, TaskSpawner},
BusInner, Error, IntoMessageStream, Message, AsyncIterator, Bus, BusInner, Error, ErrorMessage, Message,
}; };
pub trait Handler<M: Message>: Send + Sync + 'static { pub trait Handler<M: Message>: Send + Sync + 'static {
type Result: Message; type Result: Message + Unpin;
type Error: ErrorMessage + Unpin;
fn handle( fn handle(
&mut self, &mut self,
msg: M, msg: M,
stream_id: u32, stream_id: u32,
task_id: u32, task_id: u32,
) -> impl Future<Output = Result<impl IntoMessageStream<Self::Result> + '_, Error>> + Send + '_; bus: crate::Bus,
) -> impl Future<
Output = Result<impl IntoMessages<Self::Result, Self::Error> + Send + '_, Self::Error>,
> + Send
+ '_;
fn handle_error( fn handle_error(
&mut self, &mut self,
_err: Error, _err: Error,
_stream_id: u32, _stream_id: u32,
_task_id: u32, _task_id: u32,
) -> impl Future<Output = Result<impl IntoMessageStream<Self::Result> + '_, Error>> + Send + '_ _bus: crate::Bus,
{ ) -> impl Future<
future::ready(Ok(None)) Output = Result<impl IntoMessages<Self::Result, Self::Error> + Send + '_, Self::Error>,
} > + Send
+ '_;
fn finalize(self) -> impl Future<Output = Result<(), Error>> + Send; fn finalize(self, bus: crate::Bus) -> impl Future<Output = Result<(), Self::Error>> + Send;
} }
pub(crate) struct HandlerSpawner<M, B> { pub(crate) struct HandlerSpawner<M, B> {
@ -72,7 +78,8 @@ where
bus: Arc<BusInner>, bus: Arc<BusInner>,
) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + '_>> { ) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + '_>> {
Box::pin(async move { Box::pin(async move {
let bus = bus.clone(); let bus = Bus { inner: bus.clone() };
let config = self.builder.config(stream_id); let config = self.builder.config(stream_id);
let mut ctx = self.builder.build(stream_id, task_id).await?; let mut ctx = self.builder.build(stream_id, task_id).await?;
@ -85,42 +92,45 @@ where
let res = match msg.inner { let res = match msg.inner {
Some(Ok(m)) => { Some(Ok(m)) => {
send_result( send_result(
&bus, &bus.inner,
&index_counter, &index_counter,
msg.index, msg.index,
stream_id, stream_id,
&config, &config,
Some( Some(
ctx.handle(m, stream_id, task_id) ctx.handle(m, stream_id, task_id, bus.clone())
.await .await
.map(IntoMessageStream::into_message_stream), .map(IntoMessages::into_messages),
), ),
) )
.await .await
} }
Some(Err(err)) => { Some(Err(err)) => {
send_result( send_result(
&bus, &bus.inner,
&index_counter, &index_counter,
msg.index, msg.index,
stream_id, stream_id,
&config, &config,
Some( Some(
ctx.handle_error(err, stream_id, task_id) ctx.handle_error(err, stream_id, task_id, bus.clone())
.await .await
.map(IntoMessageStream::into_message_stream), .map(IntoMessages::into_messages),
), ),
) )
.await .await
} }
None => { None => {
send_result::<<B::Context as Handler<M>>::Result>( send_result::<
&bus, <B::Context as Handler<M>>::Result,
<B::Context as Handler<M>>::Error,
>(
&bus.inner,
&index_counter, &index_counter,
msg.index, msg.index,
stream_id, stream_id,
&config, &config,
None::<Result<futures::stream::Empty<_>, _>>, None::<Result<crate::Empty<_>, _>>,
) )
.await .await
} }
@ -143,7 +153,7 @@ where
std::any::type_name::<B>() std::any::type_name::<B>()
); );
if let Err(err) = ctx.finalize().await { if let Err(err) = ctx.finalize(bus.clone()).await {
println!("TASK FINALIZE ERROR: {:?}", err); println!("TASK FINALIZE ERROR: {:?}", err);
} }
}); });
@ -160,13 +170,13 @@ where
} }
} }
async fn send_result<'a, M: Message>( async fn send_result<'a, M: Message, E: ErrorMessage>(
bus: &Arc<BusInner>, bus: &Arc<BusInner>,
index_counter: &AtomicU64, index_counter: &AtomicU64,
index: u64, index: u64,
stream_id: u32, stream_id: u32,
config: &crate::builder::Config, config: &crate::builder::Config,
res: Option<Result<impl Stream<Item = Result<M, Error>> + Send + 'a, Error>>, res: Option<Result<impl AsyncIterator<Item = Result<M, E>> + Send + 'a, E>>,
) -> Result<(), Error> { ) -> Result<(), Error> {
let reorder_buff = if config.ordered && config.task_count > 1 { let reorder_buff = if config.ordered && config.task_count > 1 {
config.task_count config.task_count
@ -175,20 +185,21 @@ async fn send_result<'a, M: Message>(
}; };
let one = match res { let one = match res {
Some(Ok(stream)) => { Some(Ok(iter)) => {
let hint = stream.size_hint(); let hint = iter.size_hint();
let mut iter = pin!(iter);
match hint { match hint {
(0, Some(0)) => None, (_, Some(0)) => None,
(1, Some(1)) => { (_, Some(1)) => iter.next().await,
let mut stream = pin!(stream);
stream.next().await
}
_ => { _ => {
let mut stream = pin!(stream); while let Some(item) = iter.as_mut().next().await {
while let Some(item) = stream.next().await {
let index = index_counter.fetch_add(1, Ordering::Relaxed); let index = index_counter.fetch_add(1, Ordering::Relaxed);
bus.send::<M>(Some(item), index, stream_id, reorder_buff) bus.send::<M>(
Some(item.map_err(Into::into)),
index,
stream_id,
reorder_buff,
)
.await?; .await?;
} }
return Ok(()); return Ok(());
@ -199,7 +210,13 @@ async fn send_result<'a, M: Message>(
None => None, None => None,
}; };
bus.send(one, index, stream_id, reorder_buff).await?; bus.send(
one.map(|x| x.map_err(Into::into)),
index,
stream_id,
reorder_buff,
)
.await?;
Ok(()) Ok(())
} }

View File

@ -1,6 +1,7 @@
#![feature(return_position_impl_trait_in_trait)] #![feature(return_position_impl_trait_in_trait)]
#![feature(async_fn_in_trait)] #![feature(async_fn_in_trait)]
mod async_iter;
mod builder; mod builder;
mod chan; mod chan;
mod error; mod error;
@ -29,10 +30,11 @@ use reorder_queue::ReorderQueueInner;
use task::{TaskCounter, TaskSpawnerWrapper}; use task::{TaskCounter, TaskSpawnerWrapper};
use tokio::sync::{Notify, RwLock}; use tokio::sync::{Notify, RwLock};
pub use async_iter::*;
pub use builder::{Builder, DefaultBuilder, SharedBuilder}; pub use builder::{Builder, DefaultBuilder, SharedBuilder};
pub use error::Error; pub use error::Error;
pub use handler::Handler; pub use handler::Handler;
pub use message::{IntoMessageStream, Message, MessageIterator, MessageStream, MessageTryIterator}; pub use message::{ErrorMessage, IntoMessages, Message};
pub const DEFAUL_STREAM_ID: u32 = u32::MAX; pub const DEFAUL_STREAM_ID: u32 = u32::MAX;
pub const DEFAUL_TASK_ID: u32 = 0; pub const DEFAUL_TASK_ID: u32 = 0;
@ -283,9 +285,8 @@ impl BusInner {
index, index,
stream_id, stream_id,
}) })
.await?; .await
.map_err(Error::SendError)
Ok(())
} }
pub async fn register<M: Message, B: Builder<M>>(self: Arc<Self>, builder: B) pub async fn register<M: Message, B: Builder<M>>(self: Arc<Self>, builder: B)
@ -424,8 +425,9 @@ mod tests {
use rand::RngCore; use rand::RngCore;
use crate::{ use crate::{
handler::Handler, Bus, DefaultBuilder, Error, IntoMessageStream, Message, SharedBuilder, stream, Bus, DefaultBuilder, Error, Handler, IntoMessages, Message, SharedBuilder,
}; };
impl Message for u64 {} impl Message for u64 {}
impl Message for u32 {} impl Message for u32 {}
impl Message for i16 {} impl Message for i16 {}
@ -435,21 +437,33 @@ mod tests {
struct TestProducer; struct TestProducer;
impl Handler<u32> for TestProducer { impl Handler<u32> for TestProducer {
type Result = u64; type Result = u64;
type Error = anyhow::Error;
async fn handle( async fn handle(
&mut self, &mut self,
_msg: u32, _msg: u32,
_stream_id: u32, _stream_id: u32,
_task_id: u32, _task_id: u32,
) -> Result<impl IntoMessageStream<Self::Result>, Error> { _bus: Bus,
Ok(crate::message::MessageStream(stream! { ) -> Result<impl IntoMessages<Self::Result, Self::Error>, Self::Error> {
Ok(stream(stream! {
for i in 0u64..10 { for i in 0u64..10 {
yield Ok(i) yield Ok(i)
} }
})) }))
} }
async fn finalize(self) -> Result<(), Error> { async fn handle_error(
&mut self,
_err: Error,
_stream_id: u32,
_task_id: u32,
_bus: Bus,
) -> Result<impl IntoMessages<Self::Result, Self::Error>, Self::Error> {
Ok(None)
}
async fn finalize(self, _bus: Bus) -> Result<(), Self::Error> {
println!("producer finalized"); println!("producer finalized");
Ok(()) Ok(())
} }
@ -464,22 +478,35 @@ mod tests {
impl Handler<u64> for Arc<TestConsumer> { impl Handler<u64> for Arc<TestConsumer> {
type Result = (); type Result = ();
type Error = anyhow::Error;
async fn handle( async fn handle(
&mut self, &mut self,
msg: u64, msg: u64,
stream_id: u32, stream_id: u32,
task_id: u32, task_id: u32,
) -> Result<impl IntoMessageStream<Self::Result>, Error> { _bus: Bus,
) -> Result<impl IntoMessages<Self::Result, Self::Error>, Self::Error> {
tokio::time::sleep(Duration::from_millis(1000)).await; tokio::time::sleep(Duration::from_millis(1000)).await;
println!( println!(
"[{}] shared consumer handle {}u64 ({}:{})", "[{}] shared consumer handle {}u64 ({}:{})",
self.0, msg, stream_id, task_id self.0, msg, stream_id, task_id
); );
Ok(()) Ok(())
} }
async fn finalize(self) -> Result<(), Error> { async fn handle_error(
&mut self,
_err: Error,
_stream_id: u32,
_task_id: u32,
_bus: Bus,
) -> Result<impl IntoMessages<Self::Result, Self::Error>, Self::Error> {
Ok(None)
}
async fn finalize(self, _bus: Bus) -> Result<(), Self::Error> {
println!("[{}] shared consumer finalized", self.0); println!("[{}] shared consumer finalized", self.0);
Ok(()) Ok(())
} }
@ -487,13 +514,15 @@ mod tests {
impl Handler<u64> for TestConsumer { impl Handler<u64> for TestConsumer {
type Result = (); type Result = ();
type Error = anyhow::Error;
async fn handle( async fn handle(
&mut self, &mut self,
msg: u64, msg: u64,
stream_id: u32, stream_id: u32,
task_id: u32, task_id: u32,
) -> Result<impl IntoMessageStream<Self::Result>, Error> { _bus: Bus,
) -> Result<impl IntoMessages<Self::Result, Self::Error>, Self::Error> {
tokio::time::sleep(Duration::from_millis(100)).await; tokio::time::sleep(Duration::from_millis(100)).await;
println!( println!(
"[{}] consumer handle {}u64 ({}:{})", "[{}] consumer handle {}u64 ({}:{})",
@ -502,7 +531,17 @@ mod tests {
Ok(()) Ok(())
} }
async fn finalize(self) -> Result<(), Error> { async fn handle_error(
&mut self,
_err: Error,
_stream_id: u32,
_task_id: u32,
_bus: Bus,
) -> Result<impl IntoMessages<Self::Result, Self::Error>, Self::Error> {
Ok(None)
}
async fn finalize(self, _bus: Bus) -> Result<(), Self::Error> {
println!("[{}] consumer finalized", self.0); println!("[{}] consumer finalized", self.0);
Ok(()) Ok(())
} }
@ -512,13 +551,15 @@ mod tests {
impl Handler<i16> for Arc<TestHandler> { impl Handler<i16> for Arc<TestHandler> {
type Result = u16; type Result = u16;
type Error = anyhow::Error;
async fn handle( async fn handle(
&mut self, &mut self,
msg: i16, msg: i16,
_stream_id: u32, _stream_id: u32,
task_id: u32, task_id: u32,
) -> Result<impl IntoMessageStream<Self::Result>, Error> { _bus: Bus,
) -> Result<impl IntoMessages<Self::Result, Self::Error>, Self::Error> {
if task_id % 2 == 0 { if task_id % 2 == 0 {
tokio::time::sleep(Duration::from_millis(13)).await; tokio::time::sleep(Duration::from_millis(13)).await;
} else { } else {
@ -526,10 +567,20 @@ mod tests {
} }
println!("handle {}", msg); println!("handle {}", msg);
Ok(msg as u16) Ok([msg as u16])
} }
async fn finalize(self) -> Result<(), Error> { async fn handle_error(
&mut self,
_err: Error,
_stream_id: u32,
_task_id: u32,
_bus: Bus,
) -> Result<impl IntoMessages<Self::Result, Self::Error>, Self::Error> {
Ok(None)
}
async fn finalize(self, _bus: Bus) -> Result<(), Self::Error> {
Ok(()) Ok(())
} }
} }
@ -541,20 +592,22 @@ mod tests {
impl Handler<u16> for TestCollector { impl Handler<u16> for TestCollector {
type Result = (); type Result = ();
type Error = anyhow::Error;
async fn handle( async fn handle(
&mut self, &mut self,
msg: u16, msg: u16,
_stream_id: u32, _stream_id: u32,
_task_id: u32, _task_id: u32,
) -> Result<impl IntoMessageStream<Self::Result>, Error> { _bus: Bus,
) -> Result<impl IntoMessages<Self::Result, Self::Error>, Self::Error> {
println!("{}", msg); println!("{}", msg);
self.inner.push(msg); self.inner.push(msg);
Ok(None) Ok(None)
} }
async fn finalize(self) -> Result<(), Error> { async fn finalize(self, _bus: Bus) -> Result<(), Self::Error> {
println!("Checking"); println!("Checking");
assert_eq!(self.inner, (0u16..1024).collect::<Vec<_>>()); assert_eq!(self.inner, (0u16..1024).collect::<Vec<_>>());
Ok(()) Ok(())
@ -565,7 +618,8 @@ mod tests {
err: Error, err: Error,
_stream_id: u32, _stream_id: u32,
_task_id: u32, _task_id: u32,
) -> Result<impl IntoMessageStream<Self::Result>, Error> { _bus: Bus,
) -> Result<impl IntoMessages<Self::Result, Self::Error>, Self::Error> {
println!("{:?}", err); println!("{:?}", err);
Ok(None) Ok(None)

View File

@ -3,7 +3,16 @@ use std::any::Any;
use futures::Stream; use futures::Stream;
use crate::Error; use crate::{async_iter::AsyncIterator, Error, Iter, StreamIter};
pub trait Message: Any + fmt::Debug + Clone + Send + Sync + Unpin {}
pub trait ErrorMessage: Any + fmt::Debug + fmt::Display + Send + Sync + Unpin {}
pub trait IntoMessages<M: Message, E: ErrorMessage> {
fn into_messages(self) -> impl AsyncIterator<Item = Result<M, E>>;
}
impl Message for () {}
impl ErrorMessage for anyhow::Error {}
#[derive(Debug, Clone, PartialEq, Eq)] #[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct Msg<M: Message> { pub(crate) struct Msg<M: Message> {
@ -12,59 +21,43 @@ pub(crate) struct Msg<M: Message> {
pub(crate) stream_id: u32, pub(crate) stream_id: u32,
} }
pub struct MessageStream<S>(pub S); impl<M: Message, E: ErrorMessage, S: Stream<Item = Result<M, E>> + Send> IntoMessages<M, E>
pub struct MessageIterator<I>(pub I); for StreamIter<S>
pub struct MessageTryIterator<I>(pub I);
pub trait Message: Any + fmt::Debug + Clone + Send + Sync + 'static {}
impl Message for () {}
pub trait IntoMessageStream<M: Message>: Send {
fn into_message_stream(self) -> impl Stream<Item = Result<M, Error>> + Send;
}
impl<M: Message, S: Stream<Item = Result<M, Error>> + Send> IntoMessageStream<M>
for MessageStream<S>
{ {
fn into_message_stream(self) -> impl Stream<Item = Result<M, Error>> + Send { fn into_messages(self) -> impl AsyncIterator<Item = Result<M, E>> {
self.0 self
} }
} }
impl<M: Message, I: Iterator<Item = M> + Send> IntoMessageStream<M> for MessageIterator<I> { impl<E: ErrorMessage, I: Iterator + Send + Unpin> IntoMessages<I::Item, E> for Iter<I>
fn into_message_stream(self) -> impl Stream<Item = Result<M, Error>> { where
futures::stream::iter(self.0.map(Ok)) I::Item: Message,
}
}
impl<M: Message, I: Iterator<Item = Result<M, Error>> + Send> IntoMessageStream<M>
for MessageTryIterator<I>
{ {
fn into_message_stream(self) -> impl Stream<Item = Result<M, Error>> + Send { fn into_messages(self) -> impl AsyncIterator<Item = Result<I::Item, E>> {
futures::stream::iter(self.0) self.map(Ok)
} }
} }
impl<M: Message> IntoMessageStream<M> for Option<M> { impl<E: ErrorMessage> IntoMessages<(), E> for () {
fn into_message_stream(self) -> impl Stream<Item = Result<M, Error>> + Send { fn into_messages(self) -> impl AsyncIterator<Item = Result<(), E>> {
futures::stream::iter(self.into_iter().map(Ok)) crate::empty()
} }
} }
impl<M: Message> IntoMessageStream<M> for Vec<M> { impl<const N: usize, M: Message, E: ErrorMessage> IntoMessages<M, E> for [M; N] {
fn into_message_stream(self) -> impl Stream<Item = Result<M, Error>> + Send { fn into_messages(self) -> impl AsyncIterator<Item = Result<M, E>> {
futures::stream::iter(self.into_iter().map(Ok)) crate::iter(self.into_iter().map(Ok))
} }
} }
impl<const N: usize, M: Message> IntoMessageStream<M> for [M; N] { impl<M: Message, E: ErrorMessage> IntoMessages<M, E> for Vec<M> {
fn into_message_stream(self) -> impl Stream<Item = Result<M, Error>> + Send { fn into_messages(self) -> impl AsyncIterator<Item = Result<M, E>> {
futures::stream::iter(self.map(Ok)) crate::iter(self.into_iter().map(Ok))
} }
} }
impl<M: Message> IntoMessageStream<M> for M { impl<M: Message, E: ErrorMessage> IntoMessages<M, E> for Option<M> {
fn into_message_stream(self) -> impl Stream<Item = Result<M, Error>> + Send { fn into_messages(self) -> impl AsyncIterator<Item = Result<M, E>> {
futures::stream::iter([Ok(self)]) crate::iter(self.into_iter().map(Ok))
} }
} }