From 2ae4e8bfed2bc61714cb3ff1deafa0417162aae1 Mon Sep 17 00:00:00 2001 From: Andrey Tkachenko Date: Wed, 25 Oct 2023 21:43:39 +0400 Subject: [PATCH] Next --- Cargo.toml | 3 + examples/demo.rs | 40 +- src/builder.rs | 102 +++++ src/chan.rs | 113 +++++ src/error.rs | 25 ++ src/handler.rs | 110 +++++ src/lib.rs | 966 +++++++++++++++---------------------------- src/message.rs | 28 ++ src/producer.rs | 116 ++++++ src/rand.rs | 75 ++++ src/reorder_queue.rs | 68 +++ src/task.rs | 110 +++++ 12 files changed, 1112 insertions(+), 644 deletions(-) create mode 100644 src/builder.rs create mode 100644 src/chan.rs create mode 100644 src/error.rs create mode 100644 src/handler.rs create mode 100644 src/message.rs create mode 100644 src/producer.rs create mode 100644 src/rand.rs create mode 100644 src/reorder_queue.rs create mode 100644 src/task.rs diff --git a/Cargo.toml b/Cargo.toml index 0d8556d..46a98c0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,6 +11,9 @@ dashmap = "5.5.0" futures = "0.3.28" kanal = "0.1.0-pre8" log = "0.4.20" +priority-queue = "1.3.2" +rand = { version = "0.8.5", default-features = false, features = ["std_rng", "std"] } +rand_xorshift = "0.3.0" tokio = { version = "1.32.0", features = ["sync", "rt", "macros"] } [dev-dependencies] diff --git a/examples/demo.rs b/examples/demo.rs index 96d82f9..aba7c95 100644 --- a/examples/demo.rs +++ b/examples/demo.rs @@ -1,6 +1,8 @@ +#![feature(impl_trait_in_assoc_type)] + use std::sync::Arc; -use messagebus::{Bus, Error, Message}; +use messagebus::{Builder, Bus, Error, Handler, IntoMessage, Message}; #[derive(Debug, Clone)] pub struct Msg(pub i32); @@ -10,6 +12,35 @@ pub struct Processor { state: i32, } +impl Handler for Processor { + type Result = (); + type IntoMessage = impl IntoMessage; + type HandleFut<'a> = impl futures::Future> + 'a; + type FinalizeFut<'a> = impl futures::Future> + 'a; + + fn handle(&mut self, msg: Msg, _stream_id: u32, _task_id: u32) -> Self::HandleFut<'_> { + async move { Ok(()) } + } + + fn finalize<'a>(self) -> Self::FinalizeFut<'a> { + async move { Ok(()) } + } +} + +struct ProcSpawner; +impl Builder for ProcSpawner { + type Context = Processor; + type BuildFut<'a> = impl futures::Future> + 'a; + + fn build(&self, stream_id: u32, _task_id: u32) -> Self::BuildFut<'_> { + async move { + Ok(Processor { + state: stream_id as _, + }) + } + } +} + impl Processor { pub async fn spawn(sid: u32) -> Result<(usize, Self), Error> { Ok((4, Self { state: 0 })) @@ -26,12 +57,7 @@ impl Processor { async fn run() { let bus = Bus::new(); - bus.register( - 4, - Processor::spawn, - Processor::handler_msg, - Processor::finalize_msg_handler, - ); + bus.register(ProcSpawner).await; } #[tokio::main] diff --git a/src/builder.rs b/src/builder.rs new file mode 100644 index 0000000..a8b79aa --- /dev/null +++ b/src/builder.rs @@ -0,0 +1,102 @@ +use std::{marker::PhantomData, sync::Arc}; + +use futures::Future; + +use crate::{Error, Message}; + +pub trait Builder: Send + Sync + 'static { + type Context: 'static; + type BuildFut<'a>: Future> + Send + 'a + where + Self: 'a; + + fn parallel(&self, _stream_id: u32) -> (u32, bool) { + (1, false) + } + + fn queue_size(&self, _stream_id: u32, _task_id: u32) -> usize { + 4 + } + + fn build(&self, stream_id: u32, _task_id: u32) -> Self::BuildFut<'_>; +} + +pub struct DefaultBuilder(usize, PhantomData<(M, H)>); + +impl DefaultBuilder { + pub fn new(queue_size: usize) -> Self { + Self(queue_size, Default::default()) + } +} + +impl Builder for DefaultBuilder { + type Context = H; + type BuildFut<'a> = impl Future> + Send + 'a; + + fn build(&self, _stream_id: u32, _task_id: u32) -> Self::BuildFut<'_> { + async move { Ok(::default()) } + } + + fn queue_size(&self, _stream_id: u32, _task_id: u32) -> usize { + self.0 + } +} + +pub struct SharedBuilder { + queue_size: usize, + parallel: u32, + stream_handlers: dashmap::DashMap>, + callback: C, + ordered: bool, + _m: PhantomData<(M, F)>, +} + +impl SharedBuilder +where + M: Message, + H: Sync + Send + 'static, + F: Sync + Send + Future> + 'static, + C: Sync + Send + Fn(u32, u32) -> F + 'static, +{ + pub fn new(queue_size: usize, parallel: u32, ordered: bool, callback: C) -> Self { + Self { + queue_size, + parallel, + stream_handlers: Default::default(), + callback, + ordered, + _m: PhantomData, + } + } +} + +impl Builder for SharedBuilder +where + M: Message, + H: Sync + Send + 'static, + F: Sync + Send + Future> + 'static, + C: Sync + Send + Fn(u32, u32) -> F + 'static, +{ + type Context = Arc; + type BuildFut<'a> = impl Future> + Send + 'a; + + fn build(&self, stream_id: u32, task_id: u32) -> Self::BuildFut<'_> { + async move { + if self.stream_handlers.contains_key(&stream_id) { + return Ok(self.stream_handlers.get(&stream_id).unwrap().clone()); + } + + let val = Arc::new((self.callback)(stream_id, task_id).await?); + self.stream_handlers.insert(stream_id, val.clone()); + Ok(val.clone()) + } + } + + fn queue_size(&self, _stream_id: u32, _task_id: u32) -> usize { + self.queue_size + } + + fn parallel(&self, _stream_id: u32) -> (u32, bool) { + (self.parallel, self.ordered) + } +} diff --git a/src/chan.rs b/src/chan.rs new file mode 100644 index 0000000..a29eaaf --- /dev/null +++ b/src/chan.rs @@ -0,0 +1,113 @@ +use std::{any::Any, pin::Pin}; + +use futures::Future; + +use crate::{message::Msg, Error, Message}; + +enum ChannelItem { + Value(T), + Close, +} + +pub(crate) trait BusSenderClose: Any + Send + Sync { + fn upcast(&self) -> &(dyn Any + Send + Sync); + fn is_producer(&self) -> bool; + fn load(&self) -> (usize, usize); + fn stop(&self) -> Pin> + '_>>; + fn terminate(&self) -> Result<(), Error>; +} + +pub(crate) struct BusSender { + is_producer: bool, + tx: Sender>, +} + +impl BusSenderClose for BusSender { + fn upcast(&self) -> &(dyn Any + Send + Sync) { + self + } + + fn stop(&self) -> Pin> + '_>> { + Box::pin(async move { + self.tx.stop().await?; + Ok(()) + }) + } + + fn terminate(&self) -> Result<(), Error> { + self.tx.close() + } + + fn is_producer(&self) -> bool { + self.is_producer + } + + fn load(&self) -> (usize, usize) { + self.tx.load() + } +} + +impl BusSender { + pub async fn send(&self, m: Msg) -> Result<(), Error> { + self.tx.send(m).await + } + + #[inline] + pub(crate) fn new(is_producer: bool, tx: Sender>) -> BusSender { + Self { is_producer, tx } + } +} + +pub(crate) struct Sender { + inner: kanal::AsyncSender>, +} + +impl Sender { + pub async fn send(&self, msg: T) -> Result<(), Error> { + self.inner.send(ChannelItem::Value(msg)).await?; + Ok(()) + } + + pub async fn stop(&self) -> Result<(), Error> { + self.inner.send(ChannelItem::Close).await?; + Ok(()) + } + + pub fn close(&self) -> Result<(), Error> { + self.inner.close(); + Ok(()) + } + + pub fn load(&self) -> (usize, usize) { + (self.inner.len(), self.inner.capacity()) + } +} + +pub(crate) struct Receiver { + inner: kanal::AsyncReceiver>, +} + +impl Receiver { + #[inline] + pub async fn recv(&self) -> Option { + let Ok(item) = self.inner.recv().await else { + return None; + }; + + match item { + ChannelItem::Value(val) => Some(val), + ChannelItem::Close => None, + } + } + + #[inline] + pub fn is_empty(&self) -> bool { + self.inner.is_empty() + } +} + +pub(crate) fn channel(cap: usize) -> (Sender, Receiver) { + let (tx, rx) = kanal::bounded_async(cap); + + (Sender { inner: tx }, Receiver { inner: rx }) +} diff --git a/src/error.rs b/src/error.rs new file mode 100644 index 0000000..f68c1f9 --- /dev/null +++ b/src/error.rs @@ -0,0 +1,25 @@ +#[derive(Debug)] +pub enum Error { + HandlerIsNotRegistered, + Aborted, + SendError(String), + ReceiveError(kanal::ReceiveError), +} + +impl From for Error { + fn from(value: kanal::SendError) -> Self { + Self::SendError(format!("{}", value)) + } +} + +impl From for Error { + fn from(value: kanal::ReceiveError) -> Self { + Self::ReceiveError(value) + } +} + +// impl From> for Error { +// fn from(value: mpsc::error::SendError) -> Self { +// Self::SendError(format!("{}", value)) +// } +// } diff --git a/src/handler.rs b/src/handler.rs new file mode 100644 index 0000000..ab22546 --- /dev/null +++ b/src/handler.rs @@ -0,0 +1,110 @@ +use std::{ + any::{Any, TypeId}, + marker::PhantomData, + sync::Arc, +}; + +use futures::Future; +use tokio::sync::Notify; + +use crate::{ + builder::Builder, + chan::{channel, Sender}, + message::Msg, + task::{TaskCounter, TaskSpawner}, + BusInner, Error, IntoMessage, Message, +}; + +pub trait Handler: Send + Sync + 'static { + type Result: Message; + type IntoMessage: IntoMessage; + type HandleFut<'a>: Future> + Send + 'a + where + Self: 'a; + + type FinalizeFut<'a>: Future> + Send + 'a + where + Self: 'a; + + fn handle(&mut self, msg: M, stream_id: u32, task_id: u32) -> Self::HandleFut<'_>; + fn finalize<'a>(self) -> Self::FinalizeFut<'a>; +} + +pub(crate) struct HandlerSpawner { + pub(crate) builder: B, + _m: PhantomData, +} + +impl HandlerSpawner { + pub(crate) fn new(builder: B) -> Self { + Self { + builder, + _m: PhantomData, + } + } +} + +impl> TaskSpawner for HandlerSpawner +where + B::Context: Any + Handler, +{ + fn spawn_task( + &self, + stream_id: u32, + task_id: u32, + _abort: Arc, + task_counter: Arc, + bus: Arc, + ) -> Box>, Error>> + Send + '_> { + Box::new(async move { + let bus = bus.clone(); + let (tx, rx) = channel::>(self.builder.queue_size(stream_id, task_id)); + let mut ctx = self.builder.build(stream_id, task_id).await?; + + let _handle = tokio::spawn(async move { + while let Some(msg) = rx.recv().await { + task_counter.inc_running(); + + let res_msg = match ctx.handle(msg.inner, stream_id, task_id).await { + Ok(res) => res.into_message(), + Err(err) => { + println!("TASK HANDLE ERROR: {:?}", err); + continue; + } + }; + + if let Some(inner) = res_msg { + if inner.type_id() != TypeId::of::<()>() { + if let Err(err) = bus + .send(Msg { + inner, + index: msg.index, + stream_id, + }) + .await + { + println!("BUS SEND ERROR: {:?}", err); + continue; + } + } + } + + task_counter.dec_running(rx.is_empty()); + } + + if let Err(err) = ctx.finalize().await { + println!("TASK FINALIZE ERROR: {:?}", err); + } + }); + Ok(tx) + }) + } + + fn is_producer(&self) -> bool { + false + } + + fn parallel(&self, stream_id: u32) -> (u32, bool) { + self.builder.parallel(stream_id) + } +} diff --git a/src/lib.rs b/src/lib.rs index 0734343..22801f8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,617 +1,210 @@ #![feature(impl_trait_in_assoc_type)] +mod builder; +mod chan; +mod error; +mod handler; +mod message; +mod producer; +mod rand; +mod reorder_queue; +mod task; + use std::{ any::{Any, TypeId}, - fmt, - pin::{pin, Pin}, + collections::HashMap, sync::{ - atomic::{AtomicI64, Ordering}, + atomic::{AtomicU64, Ordering}, Arc, }, }; +use chan::{BusSender, BusSenderClose}; use dashmap::DashMap; -use futures::{future::Either, Future, Stream, StreamExt}; -use tokio::sync::{mpsc, Notify}; +use futures::Future; +use message::Msg; +use rand::RndGen; +use task::{TaskCounter, TaskSpawnerWrapper}; +use tokio::sync::{Notify, RwLock}; -trait AsyncReceiver { - type Item: Send + 'static; - type Error: std::error::Error + Send + Sync + 'static; - type Fut<'a>: Future> + Send + 'a - where - Self: 'a; +pub use builder::{Builder, DefaultBuilder, SharedBuilder}; +pub use error::Error; +pub use handler::Handler; +pub use message::{IntoMessage, Message}; +pub use producer::Producer; - fn recv(&mut self) -> Self::Fut<'_>; -} - -trait AsyncSender: Clone { - type Item: Send + 'static; - type Error: std::error::Error + Send + Sync + 'static; - type Fut<'a>: Future> + Send + 'a - where - Self: 'a; - - fn send(&self, item: Self::Item) -> Self::Fut<'_>; - fn try_send(&self, item: Self::Item) -> Result, Self::Error>; -} - -impl AsyncReceiver for kanal::AsyncReceiver { - type Item = I; - type Error = kanal::ReceiveError; - type Fut<'a> = kanal::ReceiveFuture<'a, I>; - - fn recv(&mut self) -> Self::Fut<'_> { - kanal::AsyncReceiver::recv(self) - } -} - -impl AsyncSender for kanal::AsyncSender { - type Item = I; - type Error = kanal::SendError; - type Fut<'a> = kanal::SendFuture<'a, Self::Item>; - - fn send(&self, item: Self::Item) -> Self::Fut<'_> { - kanal::AsyncSender::send(self, item) - } - - fn try_send(&self, item: Self::Item) -> Result, Self::Error> { - let mut item = Some(item); - kanal::AsyncSender::try_send_option(self, &mut item)?; - Ok(item) - } -} - -// type Sender = kanal::AsyncSender; -// type Receiver = kanal::AsyncReceiver; - -// fn channel(cap: usize) -> (Sender, Receiver) { -// kanal::bounded_async(cap) -// } - -type Sender = mpsc::Sender; -type Receiver = mpsc::Receiver; - -fn channel(cap: usize) -> (Sender, Receiver) { - mpsc::channel(cap) -} - -const DISPATCHER_STREAM_ID: u32 = u32::MAX; -const DEFAUL_STREAM_ID: u32 = u32::MAX - 1; - -const DISPATCHER_TASK_ID: u32 = u32::MAX; -const DEFAUL_TASK_ID: u32 = 0; - -#[derive(Debug)] -pub enum Error { - HandlerIsNotRegistered, - Aborted, - SendError(String), - // SendError(kanal::SendError), -} - -impl From for Error { - fn from(value: kanal::SendError) -> Self { - Self::SendError(format!("{}", value)) - } -} - -impl From> for Error { - fn from(value: mpsc::error::SendError) -> Self { - Self::SendError(format!("{}", value)) - } -} - -pub trait Message: fmt::Debug + Clone + Send + Sync + 'static {} - -impl Message for u64 {} -impl Message for u32 {} -impl Message for () {} - -#[derive(Debug, Clone)] -pub struct Msg { - stream_id: u32, - task_id: u32, - inner: M, -} - -impl Msg { - pub fn stream_id(&self) -> u32 { - self.stream_id - } - - pub fn task_id(&self) -> u32 { - self.task_id - } - - pub fn into_inner(self) -> M { - self.inner - } -} - -#[derive(Default)] -struct TaskCounter { - counter: AtomicI64, - notify: Notify, -} - -impl TaskCounter { - #[inline] - pub async fn wait_last_one(&self) { - self.notify.notified().await - } -} - -struct BusSenders { - senders: boxcar::Vec>>, -} - -impl BusSenders { - pub fn new(sender: Sender>) -> Self { - Self { - senders: boxcar::vec![sender], - } - } - - async fn send(&self, msg: Msg) -> Result<(), Error> { - match self.senders.count() { - 0 => (), - 1 => self.senders[0].send(msg).await?, - 2 => { - let (r1, r2) = futures::future::join( - self.senders[0].send(msg.clone()), - self.senders[1].send(msg), - ) - .await; - - r1.or(r2)? - } - 3 => { - let (r1, r2, r3) = futures::future::join3( - self.senders[0].send(msg.clone()), - self.senders[1].send(msg.clone()), - self.senders[2].send(msg), - ) - .await; - - r1.or(r2).or(r3)? - } - 4 => { - let (r1, r2, r3, r4) = futures::future::join4( - self.senders[0].send(msg.clone()), - self.senders[1].send(msg.clone()), - self.senders[2].send(msg.clone()), - self.senders[3].send(msg), - ) - .await; - - r1.or(r2).or(r3).or(r4)? - } - _ => { - let vec = futures::future::join_all( - self.senders.iter().map(|(_, s)| s.send(msg.clone())), - ) - .await; - - vec.into_iter().find(Result::is_err).unwrap_or(Ok(()))? - } - }; - - Ok(()) - } -} - -pub trait IntoMessage: Send { - fn into_message(self) -> Option; -} - -impl IntoMessage for Option { - fn into_message(self) -> Option { - self - } -} - -impl IntoMessage for M { - fn into_message(self) -> Option { - Some(self) - } -} -pub trait ProducerBuilder { - type Context: Producer + 'static; - type BuildFut<'a>: Future> + Send + 'a - where - Self: 'a; - - fn build<'a>(stream_id: u32) -> Self::BuildFut<'a>; -} - -pub trait HandlerBuilder { - type Context: Handler + 'static; - type BuildFut<'a>: Future> + Send + 'a - where - Self: 'a; - - fn build<'a>(stream_id: u32) -> Self::BuildFut<'a>; -} - -pub trait Handler: Send + Sync + 'static { - type Result: Message; - type IntoMessage: IntoMessage; - type HandleFut<'a>: Future> + Send + 'a - where - Self: 'a; - - type FinalizeFut<'a>: Future> + Send + 'a - where - Self: 'a; - - fn handle<'a>(&self, msg: M, stream_id: u32, task_id: u32) -> Self::HandleFut<'a>; - fn finalize<'a>(self, stream_id: u32) -> Self::FinalizeFut<'a>; -} - -pub trait Producer: Send + 'static { - type Item: Message; - type IntoMessage: IntoMessage; - type Stream<'a>: Stream> + Send + Sync + 'a - where - Self: 'a; - - type FinalizeFut<'a>: Future> + Send + 'a - where - Self: 'a; - - fn stream(&mut self, msg: M, stream_id: u32) -> Self::Stream<'_>; - fn finalize<'a>(self, stream_id: u32) -> Self::FinalizeFut<'a>; -} +pub const DEFAUL_STREAM_ID: u32 = u32::MAX; +pub const DEFAUL_TASK_ID: u32 = 0; #[derive(Default)] struct BusInner { - senders: DashMap<(u32, u32, TypeId), Arc>, - contexts: DashMap<(u32, TypeId), (usize, Arc)>, - producers_tasks: TaskCounter, - handlers_tasks: TaskCounter, - producers_stop_notify: Notify, - producers_abort_notify: Notify, - abort_notify: Notify, + senders: DashMap<(u32, u32, TypeId), Arc>, + spawners: RwLock>>, + counters: DashMap, + abort_notify: Arc, + task_counter: Arc, + rng: RndGen, } impl BusInner { - async fn task_dispatcher< - M: Message, - R: Message, - C: Send + Sync + 'static, - F: Future> + Send + 'static, - H: Future, Error>> + Send + 'static, - T: Future> + Send + 'static, - >( - self: Arc, - mut rx: Receiver>, - builder: impl Fn(u32) -> F + Send + 'static, - handler: impl Fn(Arc, u32, u32, M) -> H + Clone + Send + 'static, - finalize: impl Fn(Arc, u32) -> T + Clone + Send + 'static, - ) -> Result<(), Error> { - let tid = TypeId::of::(); - let mut notified = pin!(self.abort_notify.notified()); - - while let Either::Left((Some(msg), _)) = - futures::future::select(pin!(rx.recv()), notified.as_mut()).await - { - if msg.stream_id == DISPATCHER_STREAM_ID { - log::warn!("ERROR: GOT GATEWAY IN STREAM_ID!!!"); - continue; - } - - if !self - .senders - .contains_key(&(msg.stream_id, msg.task_id, tid)) - { - let (queue, ctx) = if let Some(rec) = self.contexts.get(&(msg.stream_id, tid)) { - let (queue, ctx) = rec.value(); - - (*queue, ctx.clone().downcast::().unwrap()) - } else { - let fut = (builder)(msg.stream_id); - - let (queue, ctx) = match fut.await { - Ok(tpl) => tpl, - Err(err) => { - println!("BUILDER ERROR: {:?}", err); - continue; - } - }; - let ctx = Arc::new(ctx); - - self.contexts - .insert((msg.stream_id, tid), (queue, ctx.clone())); - - (queue, ctx) - }; - - let (tx, rx) = channel::>(queue); - - let stream_id = msg.stream_id; - let task_id = msg.task_id; - - let handler = handler.clone(); - let finalize = finalize.clone(); - let self_clone = self.clone(); - - tokio::spawn(async move { - let tsk_cnt = &self_clone.handlers_tasks; - tsk_cnt.counter.fetch_add(1, Ordering::Relaxed); - - if let Err(err) = self_clone - .clone() - .task_handler(rx, ctx, handler, finalize) - .await - { - println!("error: {:?}", err); - } - - if tsk_cnt.counter.fetch_sub(1, Ordering::Relaxed) == 1 { - tsk_cnt.notify.notify_one(); - } - }); - - tx.send(msg).await.unwrap(); - - self.senders.insert( - (stream_id, task_id, tid), - Arc::new(BusSenders::new(tx)) as _, - ); - } else { - let senders = self - .senders - .get(&(msg.stream_id, msg.task_id, tid)) - .unwrap() - .clone(); - - senders - .downcast_ref::>() - .unwrap() - .send(msg) - .await - .unwrap(); - } + fn get_task_id(&self, stream_id: u32, task_count: u32) -> u32 { + if task_count == 1 { + return DEFAUL_TASK_ID; } - println!("DISPATCHER ENDED {}", std::any::type_name::()); + let type_id = TypeId::of::(); + let (id1, id2) = self.rng.next_u32_pair(task_count); - Ok(()) - } - - async fn task_handler< - M: Message, - R: Message, - C: Send + Sync + 'static, - H: Future, Error>> + Send + 'static, - T: Future> + Send + 'static, - >( - self: Arc, - mut rx: Receiver>, - ctx: Arc, - handler: impl Fn(Arc, u32, u32, M) -> H + Clone + Send + 'static, - finalize: impl Fn(Arc, u32) -> T + Clone + Send + 'static, - ) -> Result<(), Error> { - let mut notified = pin!(self.abort_notify.notified()); - - while let Either::Left((Some(msg), _)) = - futures::future::select(pin!(rx.recv()), notified.as_mut()).await - { - let stream_id = msg.stream_id; - let fut = (handler)(ctx.clone(), stream_id, msg.task_id, msg.into_inner()); - let res = fut.await.unwrap(); - - if let Some(m) = res.into_message() { - if m.type_id() != TypeId::of::<()>() { - if let Err(err) = self.send(stream_id, m).await { - println!("BUS SEND ERROR: {:?}", err); - continue; - } - } - } - } - - println!("HANDLER ENDED {}", std::any::type_name::()); - - let fut = finalize(ctx, 0); - fut.await - } - - async fn producer_task< - M: Message, - R: Message, - S: Stream> + Send + 'static, - F: Future> + Send + 'static, - T: Future> + Send + 'static, - >( - self: Arc, - mut rx: Receiver>, - builder: impl Fn(u32, M) -> F + Send + 'static, - finalize: impl Fn(u32, Pin<&mut S>) -> T + Clone + Send + 'static, - ) -> Result<(), Error> { - let mut notified = pin!(self.abort_notify.notified()); - - while let Either::Left((Some(msg), _)) = - futures::future::select(pin!(rx.recv()), notified.as_mut()).await - { - let stream_id = msg.stream_id; - let fut = (builder)(stream_id, msg.into_inner()); - let mut stream = pin!(fut.await.unwrap()); - - let streaming_task = async { - loop { - match stream.next().await { - Some(Ok(msg)) => { - if let Err(err) = self.send(stream_id, msg).await { - println!("BUS SEND ERROR: {:?}", err); - continue; - } - } - Some(Err(err)) => { - println!("PRODUCER ERROR: {:?}", err); - continue; - } - None => { - println!( - "PRODUCER DRAINED THE STREAM {} of type {}", - stream_id, - std::any::type_name::() - ); - break; - } - } - } - }; - - let aborted = tokio::select! { - _ = streaming_task => false, - _ = self.producers_abort_notify.notified() => { - println!("ABORTED: {}", stream_id); - true - } - }; - - let fut = finalize(stream_id, stream); - if let Err(err) = fut.await { - println!("PRODUCER FINALIZE ERROR: {:?}", err); - } - - if aborted { - break; - } - } - - println!( - "PRODUCER TASK ENDED FOR TYPE {}", - std::any::type_name::() - ); - - Ok(()) - } - - pub async fn send(&self, stream_id: u32, msg: M) -> Result<(), Error> { - let task_id = DEFAUL_TASK_ID; - - let sender_ref = self + let Some(l1) = self .senders - .get(&(stream_id, task_id, TypeId::of::())) - .or_else(|| { - self.senders - .get(&(DISPATCHER_STREAM_ID, DISPATCHER_TASK_ID, TypeId::of::())) - }); - - let Some(sender_ref) = sender_ref else { - return Err(Error::HandlerIsNotRegistered); + .get(&(stream_id, id1, type_id)) + .as_deref() + .map(|x| x.load()) + else { + return id1; }; - let sender = sender_ref.clone(); - drop(sender_ref); + let Some(l2) = self + .senders + .get(&(stream_id, id2, type_id)) + .as_deref() + .map(|x| x.load()) + else { + return id2; + }; - sender - .downcast_ref::>() - .unwrap() - .send(Msg { - stream_id, - task_id, - inner: msg, - }) - .await?; + if l1.0 < l2.0 { + id1 + } else { + id2 + } + } + + async fn send_inner( + self: &Arc, + msg: Msg, + task_count: u32, + ) -> Result<(), Error> { + let type_id = TypeId::of::(); + let stream_id = msg.stream_id; + let task_id = self.get_task_id::(stream_id, task_count); + + if !self.senders.contains_key(&(stream_id, task_id, type_id)) { + let spawner = if let Some(spawner) = self.spawners.read().await.get(&type_id) { + spawner + .downcast_ref::>() + .unwrap() + .clone() + } else { + return Err(Error::HandlerIsNotRegistered); + }; + + let tx = spawner + .spawn_task( + stream_id, + task_id, + self.abort_notify.clone(), + self.task_counter.clone(), + self.clone(), + ) + .await?; + + tx.send(msg).await.unwrap(); + + self.senders + .insert((stream_id, task_id, type_id), Arc::new(tx) as _); + } else { + let senders = self + .senders + .get(&(stream_id, task_id, type_id)) + .unwrap() + .clone(); + + senders + .upcast() + .downcast_ref::>() + .unwrap() + .send(msg) + .await + .unwrap(); + } Ok(()) } - pub fn register_dispatcher< - C: Send + Sync + 'static, - M: Message, - R: Message, - F: Future> + Send + 'static, - H: Future, Error>> + Send + 'static, - T: Future> + Send + 'static, - >( - &self, - bus: &Bus, - queue_size: usize, - builder: impl Fn(u32) -> F + Send + 'static, - handler: impl Fn(Arc, u32, u32, M) -> H + Clone + Send + 'static, - finalize: impl Fn(Arc, u32) -> T + Clone + Send + 'static, - ) { - let (tx, rx) = channel::>(queue_size); + pub async fn send(self: &Arc, msg: Msg) -> Result<(), Error> { + let type_id = TypeId::of::(); + let stream_id = msg.stream_id; - self.senders - .entry((DISPATCHER_STREAM_ID, DISPATCHER_TASK_ID, TypeId::of::())) - .or_insert_with(|| Arc::new(BusSenders::new(tx))); + let (task_count, ordered) = if let Some(spawner) = self + .spawners + .read() + .await + .get(&type_id) + .and_then(|x| x.downcast_ref::>()) + { + spawner.parallel(stream_id) + } else { + (1, false) + }; - let bus = bus.clone(); - tokio::spawn(async move { - let tsk_cnt = &bus.inner.handlers_tasks; - tsk_cnt.counter.fetch_add(1, Ordering::Relaxed); + if ordered { + // let queue = self + // .reordering_queue + // .get_or_insert(&(stream_id, type_id), task_count); - if let Err(err) = bus - .inner - .clone() - .task_dispatcher(rx, builder, handler, finalize) - .await - { - println!("error: {:?}", err); - } + // queue.push(msg); + // while let Some(msg) = queue.pop() { + // self.send_inner(msg, task_count).await?; + // } - if tsk_cnt.counter.fetch_sub(1, Ordering::Relaxed) == 1 { - tsk_cnt.notify.notify_one(); - } - }); + // Ok(()) + self.send_inner(msg, task_count).await + } else { + self.send_inner(msg, task_count).await + } } - pub fn register_producer< - M: Message, - I: Message, - S: Stream> + Send + 'static, - F: Future> + Send + 'static, - T: Future> + Send + 'static, - >( - &self, - bus: &Bus, - queue_size: usize, - builder: impl Fn(u32, M) -> F + Send + 'static, - finalize: impl Fn(u32, Pin<&mut S>) -> T + Clone + Send + 'static, - ) { - let (tx, rx) = channel::>(queue_size); - - self.senders - .entry((DISPATCHER_STREAM_ID, DISPATCHER_TASK_ID, TypeId::of::())) - .or_insert_with(|| Arc::new(BusSenders::new(tx))); - - let bus = bus.clone(); - tokio::spawn(async move { - let tsk_cnt = &bus.inner.producers_tasks; - - tsk_cnt.counter.fetch_add(1, Ordering::Relaxed); - - if let Err(err) = bus.inner.clone().producer_task(rx, builder, finalize).await { - println!("error: {:?}", err); - } - - if tsk_cnt.counter.fetch_sub(1, Ordering::Relaxed) == 1 { - tsk_cnt.notify.notify_one(); - } - }); + pub async fn register_dispatcher>(self: Arc, builder: B) + where + B::Context: Handler, + { + let type_id = TypeId::of::(); + self.counters.insert(type_id, AtomicU64::new(0)); + self.spawners.write().await.insert( + type_id, + Box::new(TaskSpawnerWrapper::from_handler(builder)) as _, + ); } + pub async fn register_producer>(self: Arc, builder: B) + where + B::Context: Producer, + { + self.spawners.write().await.insert( + TypeId::of::(), + Box::new(TaskSpawnerWrapper::from_producer(builder)) as _, + ); + } + + #[inline] pub async fn close(&self, force: bool) { - self.producers_abort_notify.notify_waiters(); - if force { self.abort_notify.notify_waiters(); } + + for item in self.senders.iter() { + if force { + let _ = item.value().terminate(); + } else if item.is_producer() { + let _ = item.value().stop().await; + } + } } + #[inline] pub async fn wait(&self) { - self.producers_stop_notify.notify_waiters(); - self.producers_tasks.wait_last_one().await; - println!("producers done"); - - self.abort_notify.notify_waiters(); - self.handlers_tasks.wait_last_one().await; - println!("handlers done"); + self.task_counter.wait().await; } } @@ -628,52 +221,61 @@ impl Bus { } #[inline] - pub fn register< - C: Send + Sync + 'static, - M: Message, - R: Message, - F: Future> + Send + 'static, - H: Future, Error>> + Send + 'static, - T: Future> + Send + 'static, - >( - &self, - queue: usize, - builder: impl Fn(u32) -> F + Send + 'static, - handler: impl Fn(Arc, u32, u32, M) -> H + Clone + Send + 'static, - finalize: impl Fn(Arc, u32) -> T + Clone + Send + 'static, - ) -> &Self { - self.inner - .register_dispatcher(self, queue, builder, handler, finalize); - + pub async fn register>(&self, builder: B) -> &Self + where + B::Context: Handler, + { + self.inner.clone().register_dispatcher(builder).await; self } #[inline] - pub fn register_producer< - M: Message, - I: Message, - S: Stream> + Send + 'static, - F: Future> + Send + 'static, - T: Future> + Send + 'static, - >( - &self, - queue_size: usize, - builder: impl Fn(u32, M) -> F + Send + 'static, - finalize: impl Fn(u32, Pin<&mut S>) -> T + Clone + Send + 'static, - ) -> &Self { - self.inner - .register_producer(self, queue_size, builder, finalize); + pub async fn register_producer>(&self, builder: B) -> &Self + where + B::Context: Producer, + { + self.inner.clone().register_producer(builder).await; self } #[inline] - pub async fn send(&self, msg: M) -> Result<(), Error> { - self.inner.send(DEFAUL_STREAM_ID, msg).await + pub async fn send(&self, inner: M) -> Result<(), Error> { + let index = self + .inner + .counters + .get(&TypeId::of::()) + .map(|x| x.fetch_add(1, Ordering::Relaxed)) + .unwrap_or(0); + + self.inner + .send(Msg { + inner, + index, + stream_id: DEFAUL_STREAM_ID, + }) + .await } #[inline] - pub async fn send_with_stream(&self, stream_id: u32, msg: M) -> Result<(), Error> { - self.inner.send(stream_id, msg).await + pub async fn send_with_stream( + &self, + stream_id: u32, + inner: M, + ) -> Result<(), Error> { + let index = self + .inner + .counters + .get(&TypeId::of::()) + .map(|x| x.fetch_add(1, Ordering::Relaxed)) + .unwrap_or(0); + + self.inner + .send(Msg { + inner, + index, + stream_id, + }) + .await } /// @@ -684,7 +286,7 @@ impl Bus { } /// - /// Closing providers and waiting when queues were drained + /// Closing providers and waiting when handler queues were drained #[inline] pub async fn close(&self) { self.inner.close(false).await; @@ -700,46 +302,136 @@ impl Bus { #[cfg(test)] mod tests { - use crate::Bus; - use async_stream::stream; + use std::{sync::Arc, time::Duration}; - #[tokio::test] - async fn test() { + use async_stream::stream; + use futures::{Future, Stream}; + use rand::RngCore; + + use crate::{ + handler::Handler, producer::Producer, Bus, DefaultBuilder, Error, IntoMessage, Message, + SharedBuilder, + }; + impl Message for u64 {} + impl Message for u32 {} + + #[derive(Default)] + struct TestProducer; + impl Producer for TestProducer { + type Item = u64; + type IntoMessage = impl IntoMessage; + + type Stream<'a> = impl Stream> + Send + 'a; + type FinalizeFut<'a> = impl Future> + Send + 'a; + + fn stream(&mut self, _msg: u32, _stream_id: u32, _task_id: u32) -> Self::Stream<'_> { + stream! { + for i in 0u64..10 { + yield Ok(i) + } + } + } + + fn finalize<'a>(self) -> Self::FinalizeFut<'a> { + async move { + println!("producer finalized"); + Ok(()) + } + } + } + + struct TestConsumer(u32); + impl Default for TestConsumer { + fn default() -> Self { + Self(rand::thread_rng().next_u32()) + } + } + + impl Handler for Arc { + type Result = (); + type IntoMessage = impl IntoMessage; + type HandleFut<'a> = impl Future> + Send + 'a; + type FinalizeFut<'a> = impl Future> + Send + 'a; + + fn handle(&mut self, msg: u64, stream_id: u32, task_id: u32) -> Self::HandleFut<'_> { + async move { + tokio::time::sleep(Duration::from_millis(1000)).await; + println!( + "[{}] shared consumer handle {}u64 ({}:{})", + self.0, msg, stream_id, task_id + ); + Ok(()) + } + } + + fn finalize<'a>(self) -> Self::FinalizeFut<'a> { + async move { + println!("[{}] shared consumer finalized", self.0); + Ok(()) + } + } + } + + impl Handler for TestConsumer { + type Result = (); + type IntoMessage = impl IntoMessage; + type HandleFut<'a> = impl Future> + Send + 'a; + type FinalizeFut<'a> = impl Future> + Send + 'a; + + fn handle(&mut self, msg: u64, stream_id: u32, task_id: u32) -> Self::HandleFut<'_> { + async move { + tokio::time::sleep(Duration::from_millis(1000)).await; + println!( + "[{}] consumer handle {}u64 ({}:{})", + self.0, msg, stream_id, task_id + ); + Ok(()) + } + } + + fn finalize<'a>(self) -> Self::FinalizeFut<'a> { + async move { + println!("[{}] consumer finalized", self.0); + Ok(()) + } + } + } + + // #[tokio::test] + async fn test_streams() { let bus = Bus::default(); - bus.register( - 1, - move |_| async move { Ok((1, ())) }, - move |_, sid, _, msg: u64| async move { - println!("MSG: {} {}", sid, msg); - Ok(()) - }, - |_, s| async move { - println!("handler {} finalized", s); + bus.register(DefaultBuilder::::new(2)) + .await; - Ok(()) - }, - ); - - bus.register_producer( - 1, - move |_, _msg: u32| async move { - Ok(stream! { - for i in 0u64..10 { - yield Ok( i) - } - }) - }, - |s, _| async move { - println!("producer {} finalized", s); - Ok(()) - }, - ); + bus.register_producer(DefaultBuilder::::new(2)) + .await; for start in 0u32..10 { bus.send_with_stream(start, start).await.unwrap(); } + bus.close().await; + bus.wait().await; + } + + #[tokio::test] + async fn test_tasks_shared() { + let bus = Bus::default(); + + bus.register(SharedBuilder::new(2, 5, false, |sid, _tid| async move { + Ok(TestConsumer(sid)) + })) + .await; + + bus.register_producer(DefaultBuilder::::new(2)) + .await; + + for start in 0u32..10 { + bus.send_with_stream(start, start).await.unwrap(); + } + + bus.close().await; bus.wait().await; } } diff --git a/src/message.rs b/src/message.rs new file mode 100644 index 0000000..18d0716 --- /dev/null +++ b/src/message.rs @@ -0,0 +1,28 @@ +use core::fmt; + +#[derive(Debug, Clone)] +pub(crate) struct Msg { + pub(crate) inner: M, + pub(crate) index: u64, + pub(crate) stream_id: u32, +} + +pub trait Message: fmt::Debug + Clone + Send + Sync + 'static {} + +impl Message for () {} + +pub trait IntoMessage: Send { + fn into_message(self) -> Option; +} + +impl IntoMessage for Option { + fn into_message(self) -> Option { + self + } +} + +impl IntoMessage for M { + fn into_message(self) -> Option { + Some(self) + } +} diff --git a/src/producer.rs b/src/producer.rs new file mode 100644 index 0000000..51cc0a6 --- /dev/null +++ b/src/producer.rs @@ -0,0 +1,116 @@ +use std::{marker::PhantomData, pin::pin, sync::Arc}; + +use futures::{Future, Stream, StreamExt}; +use tokio::sync::Notify; + +use crate::{ + builder::Builder, + chan::{channel, Sender}, + message::Msg, + task::{TaskCounter, TaskSpawner}, + BusInner, Error, IntoMessage, Message, +}; + +pub trait Producer: Send + Sync + 'static { + type Item: Message; + type IntoMessage: IntoMessage; + type Stream<'a>: Stream> + Send + 'a + where + Self: 'a; + + type FinalizeFut<'a>: Future> + Send + 'a + where + Self: 'a; + + fn stream(&mut self, msg: M, stream_id: u32, task_id: u32) -> Self::Stream<'_>; + fn finalize<'a>(self) -> Self::FinalizeFut<'a>; +} + +pub(crate) struct ProducerSpawner { + pub(crate) builder: B, + _m: PhantomData, +} + +impl ProducerSpawner { + pub(crate) fn new(builder: B) -> Self { + Self { + builder, + _m: PhantomData, + } + } +} + +impl> TaskSpawner for ProducerSpawner +where + B::Context: Producer, +{ + fn spawn_task( + &self, + stream_id: u32, + task_id: u32, + abort: Arc, + task_counter: Arc, + bus: Arc, + ) -> Box>, Error>> + Send + '_> { + Box::new(async move { + let (tx, rx) = channel::>(self.builder.queue_size(stream_id, task_id)); + let mut ctx = self.builder.build(stream_id, task_id).await?; + + let _handle = tokio::spawn(async move { + while let Some(recv_msg) = rx.recv().await { + task_counter.inc_running(); + + let mut stream = pin!(ctx + .stream(recv_msg.inner, stream_id, task_id) + .take_until(abort.notified())); + + let mut index = 0; + + loop { + index += 1; + + match stream.next().await { + Some(Ok(msg)) => { + if let Some(inner) = msg.into_message() { + if let Err(err) = bus + .send(Msg { + inner, + index: index - 1, + stream_id, + }) + .await + { + println!("BUS SEND ERROR: {:?}", err); + continue; + } + } + } + Some(Err(err)) => { + println!("PRODUCER ERROR: {:?}", err); + continue; + } + + None => break, + } + } + + task_counter.dec_running(rx.is_empty()); + } + + if let Err(err) = ctx.finalize().await { + println!("TASK FINALIZE ERROR: {:?}", err); + } + }); + + Ok(tx) + }) + } + + fn is_producer(&self) -> bool { + true + } + + fn parallel(&self, stream_id: u32) -> (u32, bool) { + self.builder.parallel(stream_id) + } +} diff --git a/src/rand.rs b/src/rand.rs new file mode 100644 index 0000000..e976b49 --- /dev/null +++ b/src/rand.rs @@ -0,0 +1,75 @@ +use std::sync::Mutex; + +use rand::{RngCore, SeedableRng}; +use rand_xorshift::XorShiftRng; + +static RNG_SEEDS: [u8; 256] = [ + 7, 86, 77, 188, 83, 136, 60, 245, 248, 212, 156, 114, 143, 47, 160, 72, 190, 243, 158, 20, 240, + 198, 25, 8, 27, 229, 179, 165, 186, 148, 239, 118, 187, 24, 152, 154, 102, 45, 101, 159, 8, 33, + 158, 161, 42, 183, 189, 173, 58, 200, 121, 45, 11, 168, 245, 161, 186, 43, 244, 251, 244, 246, + 137, 244, 112, 157, 102, 234, 65, 138, 23, 105, 46, 192, 114, 52, 233, 28, 93, 207, 186, 94, + 55, 24, 182, 170, 61, 90, 180, 120, 32, 229, 219, 144, 227, 255, 18, 148, 69, 118, 164, 66, 5, + 243, 18, 190, 21, 224, 151, 225, 229, 136, 112, 1, 181, 246, 64, 53, 249, 166, 22, 104, 255, + 239, 127, 125, 29, 174, 115, 212, 213, 211, 230, 111, 194, 16, 20, 115, 192, 109, 254, 157, + 175, 228, 10, 173, 10, 208, 214, 129, 57, 120, 53, 188, 24, 147, 223, 108, 77, 151, 1, 245, + 151, 38, 186, 95, 28, 242, 0, 2, 161, 86, 154, 73, 138, 225, 40, 235, 26, 195, 42, 229, 15, + 149, 41, 53, 230, 175, 36, 88, 205, 61, 113, 204, 253, 150, 193, 47, 231, 102, 23, 182, 225, + 197, 29, 193, 120, 171, 198, 164, 148, 206, 57, 197, 193, 201, 102, 147, 249, 4, 230, 22, 75, + 40, 145, 144, 113, 152, 115, 170, 41, 29, 154, 166, 74, 71, 23, 148, 139, 244, 114, 139, 66, + 73, 231, 167, 59, 201, 33, 58, 188, 120, 70, 142, 251, 95, +]; + +struct RngGenInner { + rng: XorShiftRng, + counter: u32, +} + +impl RngGenInner { + #[inline] + fn next_u32_pair(&mut self, count: u32) -> (u32, u32) { + if self.counter >= 16 { + self.reseed_next(); + self.counter = 0; + } + + let r1 = self.rng.next_u32() % count; + let r2 = loop { + let val = self.rng.next_u32() % count; + if r1 != val { + break val; + } + }; + + self.counter += 1; + + (r1, r2) + } + + fn reseed_next(&mut self) { + let offset = (self.rng.next_u32() % 240) as usize; + let seed: [u8; 16] = RNG_SEEDS[offset..offset + 16].try_into().unwrap(); + self.rng = XorShiftRng::from_seed(seed); + } +} + +pub(crate) struct RndGen { + inner: Mutex, +} + +impl Default for RndGen { + fn default() -> Self { + Self { + inner: Mutex::new(RngGenInner { + rng: XorShiftRng::from_seed(RNG_SEEDS[0..16].try_into().unwrap()), + counter: 0, + }), + } + } +} + +impl RndGen { + #[inline] + pub fn next_u32_pair(&self, count: u32) -> (u32, u32) { + self.inner.lock().unwrap().next_u32_pair(count) + } +} diff --git a/src/reorder_queue.rs b/src/reorder_queue.rs new file mode 100644 index 0000000..b971b17 --- /dev/null +++ b/src/reorder_queue.rs @@ -0,0 +1,68 @@ +use std::{cmp::Ordering, collections::BinaryHeap}; + +use crate::{message::Msg, Message}; + +struct Entry(Msg); + +impl PartialOrd for Entry { + fn partial_cmp(&self, other: &Self) -> Option { + Some(other.0.index.cmp(&self.0.index)) + } +} + +impl Ord for Entry { + fn cmp(&self, other: &Self) -> Ordering { + other.0.index.cmp(&self.0.index) + } +} + +impl PartialEq for Entry { + fn eq(&self, other: &Self) -> bool { + other.0.index.eq(&self.0.index) + } +} + +impl Eq for Entry {} + +pub(crate) struct ReorderQueue { + cap: usize, + recent_index: Option, + heap: BinaryHeap>, +} + +impl ReorderQueue { + pub fn new(cap: usize) -> Self { + Self { + cap, + recent_index: None, + heap: BinaryHeap::with_capacity(cap + 1), + } + } + + pub fn push(&mut self, msg: Msg) { + self.heap.push(Entry(msg)); + + if self.heap.len() == self.cap { + self.recent_index = None; + } + } + + pub fn pop(&mut self) -> Option> { + match self.recent_index { + None => { + let e = self.heap.pop()?; + self.recent_index = Some(e.0.index); + Some(e.0) + } + Some(ri) => { + let e = self.heap.peek()?; + if e.0.index == ri + 1 { + self.recent_index = Some(e.0.index); + Some(self.heap.pop()?.0) + } else { + None + } + } + } + } +} diff --git a/src/task.rs b/src/task.rs new file mode 100644 index 0000000..a6040c0 --- /dev/null +++ b/src/task.rs @@ -0,0 +1,110 @@ +use std::sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, +}; + +use futures::Future; +use tokio::sync::Notify; + +use crate::{ + chan::{BusSender, Sender}, + handler::HandlerSpawner, + message::Msg, + producer::ProducerSpawner, + Builder, BusInner, Error, Handler, Message, Producer, +}; + +#[derive(Default)] +pub(crate) struct TaskCounter { + running: AtomicUsize, + notify: Notify, +} + +impl TaskCounter { + #[inline] + pub fn inc_running(&self) { + self.running.fetch_add(1, Ordering::Relaxed); + } + + #[inline] + pub fn dec_running(&self, notify: bool) { + let prev = self.running.fetch_sub(1, Ordering::Relaxed); + if notify && prev == 1 { + self.notify.notify_waiters(); + } + } + + #[inline] + pub async fn wait(&self) { + self.notify.notified().await + } +} + +pub(crate) trait TaskSpawner: Send + Sync { + fn parallel(&self, stream_id: u32) -> (u32, bool); + fn is_producer(&self) -> bool; + fn spawn_task( + &self, + stream_id: u32, + task_id: u32, + abort: Arc, + task_counter: Arc, + bus: Arc, + ) -> Box>, Error>> + Send + '_>; +} + +pub(crate) struct TaskSpawnerWrapper { + inner: Arc>, +} + +impl Clone for TaskSpawnerWrapper { + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + } + } +} + +impl TaskSpawnerWrapper { + pub fn from_handler + 'static>(builder: B) -> Self + where + B::Context: Handler, + { + Self { + inner: Arc::new(HandlerSpawner::new(builder)) as _, + } + } + + pub fn from_producer + 'static>(builder: B) -> Self + where + B::Context: Producer, + { + Self { + inner: Arc::new(ProducerSpawner::new(builder)) as _, + } + } + + #[inline] + pub async fn spawn_task( + &self, + stream_id: u32, + task_id: u32, + abort: Arc, + task_counter: Arc, + bus: Arc, + ) -> Result, Error> { + Ok(BusSender::new( + self.inner.is_producer(), + Box::into_pin( + self.inner + .spawn_task(stream_id, task_id, abort, task_counter, bus), + ) + .await?, + )) + } + + #[inline] + pub(crate) fn parallel(&self, stream_id: u32) -> (u32, bool) { + self.inner.parallel(stream_id) + } +}