AsyncProducer Handler update + version bump

This commit is contained in:
Andrey Tkachenko 2021-11-25 16:48:41 +04:00
parent c927dad70e
commit c9f03f5b49
3 changed files with 32 additions and 17 deletions

View File

@ -1,6 +1,6 @@
[package] [package]
name = "messagebus" name = "messagebus"
version = "0.9.10" version = "0.9.11"
authors = ["Andrey Tkachenko <andrey@aidev.ru>"] authors = ["Andrey Tkachenko <andrey@aidev.ru>"]
repository = "https://github.com/andreytkachenko/messagebus.git" repository = "https://github.com/andreytkachenko/messagebus.git"
keywords = ["futures", "async", "tokio", "message", "bus"] keywords = ["futures", "async", "tokio", "message", "bus"]
@ -11,10 +11,7 @@ exclude = [".gitignore", ".cargo/config", ".github/**", "codecov.yml"]
edition = "2018" edition = "2018"
[workspace] [workspace]
members = [ members = ["crates/remote", "crates/derive"]
"crates/remote",
"crates/derive",
]
[dependencies] [dependencies]
messagebus_derive = "0.2.5" messagebus_derive = "0.2.5"
@ -37,4 +34,10 @@ ctor = "0.1.21"
anyhow = "1.0" anyhow = "1.0"
env_logger = "0.9" env_logger = "0.9"
serde_json = "1.0" serde_json = "1.0"
tokio = { version = "1", features = ["macros", "parking_lot", "rt-multi-thread", "io-util", "sync"] } tokio = { version = "1", features = [
"macros",
"parking_lot",
"rt-multi-thread",
"io-util",
"sync",
] }

View File

@ -1,20 +1,30 @@
use core::iter::FromIterator; use core::iter::FromIterator;
use std::ops::ControlFlow; use std::pin::Pin;
use crate::{error::StdSyncSendError, Bus, Message}; use crate::{error::StdSyncSendError, Bus, Message};
use async_trait::async_trait; use async_trait::async_trait;
use futures::Stream;
#[async_trait] #[derive(Debug, Clone, Copy)]
pub trait AsyncProducer<'a, M: Message>: Send { pub struct ProducerStats {
type Error: StdSyncSendError; pub completed: usize,
type Context: Send + 'a; pub failed: usize,
type Response: Message;
async fn start(&'a self, msg: M, bus: &Bus) -> Result<Self::Context, Self::Error>;
async fn next(&'a self, ctx: &mut Self::Context, bus: &Bus) -> Result<ControlFlow<Self::Response>, Self::Error>;
async fn finish(&'a self, _ctx: Self::Context, _bus: &Bus) -> Result<(), Self::Error>;
} }
#[async_trait]
pub trait AsyncProducer<M: Message>: Send + Sync {
type Item: Message;
type Response: Message;
type Error: StdSyncSendError;
async fn producer(
&self,
msg: M,
bus: &Bus,
) -> Result<Pin<Box<dyn Stream<Item = Result<Self::Item, Self::Error>> + Send + '_>>, Self::Error>;
async fn finish(&self, stats: ProducerStats, bus: &Bus) -> Result<Self::Response, Self::Error>;
}
pub trait Handler<M: Message>: Send + Sync { pub trait Handler<M: Message>: Send + Sync {
type Error: StdSyncSendError; type Error: StdSyncSendError;

View File

@ -97,7 +97,9 @@ where
fn send(&self, mid: u64, m: M, req: bool, _bus: &Bus) -> Result<(), Error<M>> { fn send(&self, mid: u64, m: M, req: bool, _bus: &Bus) -> Result<(), Error<M>> {
match self.tx.send(Request::Request(mid, m, req)) { match self.tx.send(Request::Request(mid, m, req)) {
Ok(_) => Ok(()), Ok(_) => Ok(()),
Err(mpsc::error::SendError(Request::Request(_, msg, _))) => Err(Error::send_closed(msg)), Err(mpsc::error::SendError(Request::Request(_, msg, _))) => {
Err(Error::send_closed(msg))
}
_ => unimplemented!(), _ => unimplemented!(),
} }
} }