Compare commits
No commits in common. "next" and "master" have entirely different histories.
16
Cargo.toml
16
Cargo.toml
@ -1,26 +1,16 @@
|
||||
[package]
|
||||
name = "messagebus"
|
||||
version = "0.15.1"
|
||||
authors = ["Andrey Tkachenko <andrey@aidev.ru>"]
|
||||
repository = "https://github.com/andreytkachenko/messagebus.git"
|
||||
keywords = ["futures", "async", "tokio", "message", "bus"]
|
||||
categories = ["network-programming", "asynchronous"]
|
||||
description = "MessageBus allows intercommunicate with messages between modules"
|
||||
exclude = [".gitignore", ".cargo/config", ".github/**", ".drone.yml"]
|
||||
license = "MIT OR Apache-2.0"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
anyhow = "1.0.75"
|
||||
boxcar = "0.2.3"
|
||||
dashmap = "5.5.0"
|
||||
futures = "0.3.28"
|
||||
kanal = "0.1.0-pre8"
|
||||
log = "0.4.20"
|
||||
pin-project-lite = "0.2.13"
|
||||
priority-queue = "1.3.2"
|
||||
rand = { version = "0.8.5", default-features = false, features = ["std_rng", "std"] }
|
||||
rand_xorshift = "0.3.0"
|
||||
tokio = { version = "1.32.0", features = ["sync", "rt", "macros"] }
|
||||
|
||||
[dev-dependencies]
|
||||
|
@ -1,83 +1,37 @@
|
||||
#![feature(return_position_impl_trait_in_trait)]
|
||||
#![feature(async_fn_in_trait)]
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::Error;
|
||||
use messagebus::{Builder, Bus, Context, Handler, IntoMessages, Message};
|
||||
use messagebus::{Bus, Error, Message};
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Msg(pub i32);
|
||||
impl Message for Msg {}
|
||||
|
||||
pub struct Processor {
|
||||
_state: i32,
|
||||
}
|
||||
|
||||
impl Handler<Msg> for Processor {
|
||||
type Result = ();
|
||||
type Error = Error;
|
||||
|
||||
async fn handle(
|
||||
&mut self,
|
||||
_msg: Msg,
|
||||
_ctx: Context,
|
||||
_bus: Bus,
|
||||
) -> Result<impl IntoMessages<Self::Result, Self::Error>, Self::Error> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_error(
|
||||
&mut self,
|
||||
_err: messagebus::Error,
|
||||
_ctx: Context,
|
||||
_bus: messagebus::Bus,
|
||||
) -> Result<impl IntoMessages<Self::Result, Self::Error> + Send + '_, Self::Error> {
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
async fn finalize(self, _bus: Bus) -> Result<(), Self::Error> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
struct ProcSpawner;
|
||||
impl Builder<Msg> for ProcSpawner {
|
||||
type Context = Processor;
|
||||
|
||||
async fn build(
|
||||
&self,
|
||||
stream_id: u32,
|
||||
_task_id: u32,
|
||||
) -> Result<Self::Context, messagebus::Error> {
|
||||
Ok(Processor {
|
||||
_state: stream_id as _,
|
||||
})
|
||||
}
|
||||
state: i32,
|
||||
}
|
||||
|
||||
impl Processor {
|
||||
pub async fn spawn(_sid: u32) -> Result<(usize, Self), Error> {
|
||||
Ok((4, Self { _state: 0 }))
|
||||
pub async fn spawn(sid: u32) -> Result<(usize, Self), Error> {
|
||||
Ok((4, Self { state: 0 }))
|
||||
}
|
||||
|
||||
pub async fn handler_msg(
|
||||
self: Arc<Self>,
|
||||
_sid: u32,
|
||||
_tid: u32,
|
||||
_msg: Msg,
|
||||
) -> Result<(), Error> {
|
||||
pub async fn handler_msg(self: Arc<Self>, sid: u32, tid: u32, msg: Msg) -> Result<(), Error> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn finalize_msg_handler(self: Arc<Self>, _sid: u32) -> Result<(), Error> {
|
||||
pub async fn finalize_msg_handler(self: Arc<Self>, sid: u32) -> Result<(), Error> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
async fn run() {
|
||||
let bus = Bus::new();
|
||||
bus.register(ProcSpawner).await;
|
||||
bus.register(
|
||||
4,
|
||||
Processor::spawn,
|
||||
Processor::handler_msg,
|
||||
Processor::finalize_msg_handler,
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
|
@ -1,141 +0,0 @@
|
||||
use std::{marker::PhantomData, pin::Pin};
|
||||
|
||||
use futures::{Stream, StreamExt};
|
||||
|
||||
pub trait AsyncIterator: Send {
|
||||
type Item: Send;
|
||||
|
||||
fn next(self: Pin<&mut Self>) -> impl futures::Future<Output = Option<Self::Item>> + Send + '_;
|
||||
|
||||
fn size_hint(&self) -> (usize, Option<usize>) {
|
||||
(0, None)
|
||||
}
|
||||
|
||||
fn map<U: Send, C: Send + FnMut(Self::Item) -> U>(self, cb: C) -> impl AsyncIterator<Item = U>
|
||||
where
|
||||
Self: Sized,
|
||||
{
|
||||
Map { inner: self, cb }
|
||||
}
|
||||
}
|
||||
|
||||
pin_project_lite::pin_project! {
|
||||
pub struct Map<U, I: AsyncIterator, F: FnMut(I::Item) ->U> {
|
||||
#[pin]
|
||||
inner: I,
|
||||
cb: F,
|
||||
}
|
||||
}
|
||||
|
||||
impl<U: Send, I: AsyncIterator, F: Send + FnMut(I::Item) -> U> AsyncIterator for Map<U, I, F> {
|
||||
type Item = U;
|
||||
|
||||
async fn next(self: Pin<&mut Self>) -> Option<U> {
|
||||
let this = self.project();
|
||||
Some((this.cb)(this.inner.next().await?))
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Iter<I: Iterator> {
|
||||
inner: I,
|
||||
}
|
||||
|
||||
impl<I: Send + Iterator + Unpin> AsyncIterator for Iter<I>
|
||||
where
|
||||
I::Item: Send,
|
||||
{
|
||||
type Item = I::Item;
|
||||
|
||||
#[inline]
|
||||
async fn next(self: Pin<&mut Self>) -> Option<Self::Item> {
|
||||
self.get_mut().inner.next()
|
||||
}
|
||||
|
||||
fn size_hint(&self) -> (usize, Option<usize>) {
|
||||
self.inner.size_hint()
|
||||
}
|
||||
}
|
||||
|
||||
pub fn iter<I: IntoIterator + Send>(inner: I) -> Iter<I::IntoIter>
|
||||
where
|
||||
I::IntoIter: Send + Unpin,
|
||||
I::Item: Send,
|
||||
{
|
||||
Iter {
|
||||
inner: inner.into_iter(),
|
||||
}
|
||||
}
|
||||
pin_project_lite::pin_project! {
|
||||
pub struct StreamIter<S: Stream> {
|
||||
#[pin]
|
||||
inner: S,
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: Send + Stream> AsyncIterator for StreamIter<S>
|
||||
where
|
||||
S::Item: Send,
|
||||
{
|
||||
type Item = S::Item;
|
||||
|
||||
#[inline]
|
||||
async fn next(self: Pin<&mut Self>) -> Option<Self::Item> {
|
||||
self.project().inner.next().await
|
||||
}
|
||||
|
||||
fn size_hint(&self) -> (usize, Option<usize>) {
|
||||
self.inner.size_hint()
|
||||
}
|
||||
}
|
||||
|
||||
pub fn stream<S: Send + Stream>(inner: S) -> StreamIter<S>
|
||||
where
|
||||
S::Item: Send,
|
||||
{
|
||||
StreamIter { inner }
|
||||
}
|
||||
|
||||
pub struct Once<I>(Option<I>);
|
||||
impl<I: Send + Unpin> AsyncIterator for Once<I> {
|
||||
type Item = I;
|
||||
|
||||
async fn next(self: Pin<&mut Self>) -> Option<Self::Item> {
|
||||
self.get_mut().0.take()
|
||||
}
|
||||
}
|
||||
|
||||
pub fn once<I: Send + Unpin>(item: I) -> Once<I> {
|
||||
Once(Some(item))
|
||||
}
|
||||
|
||||
pub struct Empty<I>(PhantomData<I>);
|
||||
impl<I: Send> AsyncIterator for Empty<I> {
|
||||
type Item = I;
|
||||
|
||||
async fn next(self: Pin<&mut Self>) -> Option<Self::Item> {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
pub fn empty<I: Send + Unpin>() -> Empty<I> {
|
||||
Empty(Default::default())
|
||||
}
|
||||
|
||||
impl<I: Send> AsyncIterator for tokio::sync::mpsc::Receiver<I> {
|
||||
type Item = I;
|
||||
|
||||
async fn next(self: Pin<&mut Self>) -> Option<Self::Item> {
|
||||
self.get_mut().recv().await
|
||||
}
|
||||
}
|
||||
|
||||
impl<I: Send> AsyncIterator for tokio::sync::oneshot::Receiver<I> {
|
||||
type Item = I;
|
||||
|
||||
async fn next(self: Pin<&mut Self>) -> Option<Self::Item> {
|
||||
match self.await {
|
||||
Ok(item) => Some(item),
|
||||
Err(_) => None,
|
||||
}
|
||||
}
|
||||
}
|
227
src/builder.rs
227
src/builder.rs
@ -1,227 +0,0 @@
|
||||
use std::{marker::PhantomData, sync::Arc};
|
||||
|
||||
use futures::Future;
|
||||
|
||||
use crate::{Error, Message};
|
||||
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
pub struct Config {
|
||||
pub queue_size: usize,
|
||||
pub queue_per_task: bool,
|
||||
pub ordered: bool,
|
||||
pub task_count: u32,
|
||||
pub lazy_task_creation: bool,
|
||||
pub stream_per_message: bool,
|
||||
}
|
||||
|
||||
impl Default for Config {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
queue_size: 4,
|
||||
queue_per_task: false,
|
||||
ordered: false,
|
||||
task_count: 1,
|
||||
lazy_task_creation: true,
|
||||
stream_per_message: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub trait Builder<M: Message>: Send + Sync + 'static {
|
||||
type Context: 'static;
|
||||
|
||||
fn config(&self, _stream_id: u32) -> Config {
|
||||
Default::default()
|
||||
}
|
||||
|
||||
fn build(
|
||||
&self,
|
||||
stream_id: u32,
|
||||
_task_id: u32,
|
||||
) -> impl Future<Output = Result<Self::Context, Error>> + Send + '_;
|
||||
}
|
||||
|
||||
pub struct DefaultBuilder<M: Send + Sync, H: Send + Sync, C: Send + Sync, F: Send> {
|
||||
config: Config,
|
||||
callback: C,
|
||||
_m: PhantomData<(M, H, F)>,
|
||||
}
|
||||
|
||||
unsafe impl<M: Send + Sync, H: Send + Sync, C: Send + Sync, F: Send> Sync
|
||||
for DefaultBuilder<M, H, C, F>
|
||||
{
|
||||
}
|
||||
|
||||
impl<M, H, C, F> DefaultBuilder<M, H, C, F>
|
||||
where
|
||||
M: Message,
|
||||
H: Sync + Send + 'static,
|
||||
F: Send + Future<Output = Result<H, Error>> + 'static,
|
||||
C: Sync + Send + Fn(u32, u32) -> F + 'static,
|
||||
{
|
||||
pub fn new(queue_size: usize, callback: C) -> Self {
|
||||
Self {
|
||||
config: Config {
|
||||
queue_size,
|
||||
queue_per_task: false,
|
||||
ordered: false,
|
||||
task_count: 1,
|
||||
lazy_task_creation: true,
|
||||
stream_per_message: false,
|
||||
},
|
||||
callback,
|
||||
_m: PhantomData,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn ordered(self) -> Self {
|
||||
let mut config = self.config;
|
||||
config.ordered = true;
|
||||
|
||||
Self {
|
||||
config,
|
||||
callback: self.callback,
|
||||
_m: PhantomData,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn stream_per_message(self) -> Self {
|
||||
let mut config = self.config;
|
||||
config.stream_per_message = true;
|
||||
|
||||
Self {
|
||||
config,
|
||||
callback: self.callback,
|
||||
_m: PhantomData,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn tasks(self, tasks: u32) -> Self {
|
||||
let mut config = self.config;
|
||||
config.task_count = tasks;
|
||||
|
||||
Self {
|
||||
config,
|
||||
callback: self.callback,
|
||||
_m: PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<M, H, C, F> Builder<M> for DefaultBuilder<M, H, C, F>
|
||||
where
|
||||
M: Message,
|
||||
H: Sync + Send + 'static,
|
||||
F: Send + Future<Output = Result<H, Error>> + 'static,
|
||||
C: Sync + Send + Fn(u32, u32) -> F + 'static,
|
||||
{
|
||||
type Context = H;
|
||||
|
||||
async fn build(&self, stream_id: u32, task_id: u32) -> Result<Self::Context, Error> {
|
||||
(self.callback)(stream_id, task_id).await
|
||||
}
|
||||
|
||||
fn config(&self, _stream_id: u32) -> Config {
|
||||
self.config
|
||||
}
|
||||
}
|
||||
|
||||
pub struct SharedBuilder<M, H, C, F> {
|
||||
config: Config,
|
||||
stream_handlers: dashmap::DashMap<u32, Arc<H>>,
|
||||
callback: C,
|
||||
_m: PhantomData<(M, F)>,
|
||||
}
|
||||
|
||||
unsafe impl<M: Send + Sync, H: Send + Sync, C: Send + Sync, F: Send> Sync
|
||||
for SharedBuilder<M, H, C, F>
|
||||
{
|
||||
}
|
||||
|
||||
impl<M, H, C, F> SharedBuilder<M, H, C, F>
|
||||
where
|
||||
M: Message,
|
||||
H: Sync + Send + 'static,
|
||||
F: Send + Future<Output = Result<H, Error>> + 'static,
|
||||
C: Sync + Send + Fn(u32, u32) -> F + 'static,
|
||||
{
|
||||
pub fn new(queue_size: usize, task_count: u32, callback: C) -> Self {
|
||||
Self {
|
||||
config: Config {
|
||||
queue_size,
|
||||
queue_per_task: false,
|
||||
ordered: false,
|
||||
task_count,
|
||||
lazy_task_creation: true,
|
||||
stream_per_message: false,
|
||||
},
|
||||
stream_handlers: Default::default(),
|
||||
callback,
|
||||
_m: PhantomData,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn stream_per_message(self) -> Self {
|
||||
let mut config = self.config;
|
||||
config.stream_per_message = true;
|
||||
|
||||
Self {
|
||||
config,
|
||||
callback: self.callback,
|
||||
_m: PhantomData,
|
||||
stream_handlers: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn ordered(self) -> Self {
|
||||
let mut config = self.config;
|
||||
config.ordered = true;
|
||||
|
||||
Self {
|
||||
config,
|
||||
stream_handlers: self.stream_handlers,
|
||||
callback: self.callback,
|
||||
_m: PhantomData,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn queue_per_task(self) -> Self {
|
||||
let mut config = self.config;
|
||||
config.queue_per_task = true;
|
||||
|
||||
Self {
|
||||
config,
|
||||
stream_handlers: self.stream_handlers,
|
||||
callback: self.callback,
|
||||
_m: PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<M, H, C, F> Builder<M> for SharedBuilder<M, H, C, F>
|
||||
where
|
||||
M: Message,
|
||||
H: Sync + Send + 'static,
|
||||
F: Send + Future<Output = Result<H, Error>> + 'static,
|
||||
C: Sync + Send + Fn(u32, u32) -> F + 'static,
|
||||
{
|
||||
type Context = Arc<H>;
|
||||
|
||||
async fn build(&self, stream_id: u32, task_id: u32) -> Result<Self::Context, Error> {
|
||||
if self.stream_handlers.contains_key(&stream_id) {
|
||||
return Ok(self.stream_handlers.get(&stream_id).unwrap().clone());
|
||||
}
|
||||
|
||||
let val = match (self.callback)(stream_id, task_id).await {
|
||||
Ok(val) => Arc::new(val),
|
||||
Err(err) => return Err(err),
|
||||
};
|
||||
|
||||
self.stream_handlers.insert(stream_id, val.clone());
|
||||
Ok(val)
|
||||
}
|
||||
|
||||
fn config(&self, _stream_id: u32) -> Config {
|
||||
self.config
|
||||
}
|
||||
}
|
147
src/chan.rs
147
src/chan.rs
@ -1,147 +0,0 @@
|
||||
use std::{
|
||||
any::{Any, TypeId},
|
||||
pin::Pin,
|
||||
};
|
||||
|
||||
use futures::Future;
|
||||
|
||||
use crate::{message::Msg, Error, Message};
|
||||
|
||||
enum ChannelItem<T> {
|
||||
Value(T),
|
||||
Close,
|
||||
}
|
||||
|
||||
pub(crate) trait AbstractSender: Any + Send + Sync {
|
||||
fn upcast(&self) -> &(dyn Any + Send + Sync);
|
||||
fn close(&self);
|
||||
fn message_type_id(&self) -> TypeId;
|
||||
}
|
||||
|
||||
impl<M: Any + Send + Sync> AbstractSender for kanal::AsyncSender<M> {
|
||||
fn upcast(&self) -> &(dyn Any + Send + Sync) {
|
||||
self
|
||||
}
|
||||
|
||||
fn close(&self) {
|
||||
self.close();
|
||||
}
|
||||
|
||||
fn message_type_id(&self) -> TypeId {
|
||||
TypeId::of::<M>()
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) trait BusSenderClose: Any + Send + Sync {
|
||||
fn upcast(&self) -> &(dyn Any + Send + Sync);
|
||||
fn is_producer(&self) -> bool;
|
||||
fn load(&self) -> (usize, usize);
|
||||
fn stop(&self) -> Pin<Box<dyn Future<Output = Result<(), Error>> + '_>>;
|
||||
fn terminate(&self) -> Result<(), Error>;
|
||||
}
|
||||
|
||||
pub(crate) struct BusSender<M: Message> {
|
||||
is_producer: bool,
|
||||
tx: Sender<Msg<M>>,
|
||||
}
|
||||
|
||||
impl<M: Message> BusSenderClose for BusSender<M> {
|
||||
fn upcast(&self) -> &(dyn Any + Send + Sync) {
|
||||
self
|
||||
}
|
||||
|
||||
fn stop(&self) -> Pin<Box<dyn Future<Output = Result<(), Error>> + '_>> {
|
||||
Box::pin(async move {
|
||||
self.tx.stop().await?;
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
fn terminate(&self) -> Result<(), Error> {
|
||||
self.tx.close()
|
||||
}
|
||||
|
||||
fn is_producer(&self) -> bool {
|
||||
self.is_producer
|
||||
}
|
||||
|
||||
fn load(&self) -> (usize, usize) {
|
||||
self.tx.load()
|
||||
}
|
||||
}
|
||||
|
||||
impl<M: Message> BusSender<M> {
|
||||
pub async fn send(&self, m: Msg<M>) -> Result<(), Error> {
|
||||
self.tx.send(m).await
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub(crate) fn new(is_producer: bool, tx: Sender<Msg<M>>) -> BusSender<M> {
|
||||
Self { is_producer, tx }
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct Sender<T> {
|
||||
inner: kanal::AsyncSender<ChannelItem<T>>,
|
||||
}
|
||||
|
||||
impl<T> Sender<T> {
|
||||
pub async fn send(&self, msg: T) -> Result<(), Error> {
|
||||
self.inner
|
||||
.send(ChannelItem::Value(msg))
|
||||
.await
|
||||
.map_err(Error::SendError)
|
||||
}
|
||||
|
||||
pub async fn stop(&self) -> Result<(), Error> {
|
||||
self.inner
|
||||
.send(ChannelItem::Close)
|
||||
.await
|
||||
.map_err(Error::SendError)
|
||||
}
|
||||
|
||||
pub fn close(&self) -> Result<(), Error> {
|
||||
self.inner.close();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn load(&self) -> (usize, usize) {
|
||||
(self.inner.len(), self.inner.capacity())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct Receiver<T> {
|
||||
inner: kanal::AsyncReceiver<ChannelItem<T>>,
|
||||
}
|
||||
|
||||
impl<T> Receiver<T> {
|
||||
#[inline]
|
||||
pub async fn recv(&self) -> Option<T> {
|
||||
let Ok(item) = self.inner.recv().await else {
|
||||
return None;
|
||||
};
|
||||
|
||||
match item {
|
||||
ChannelItem::Value(val) => Some(val),
|
||||
ChannelItem::Close => None,
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn load(&self) -> (u32, u32) {
|
||||
(self.inner.len() as _, self.inner.capacity() as _)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.inner.is_empty()
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn channel<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
|
||||
let (tx, rx) = kanal::bounded_async(cap);
|
||||
|
||||
(Sender { inner: tx }, Receiver { inner: rx })
|
||||
}
|
91
src/error.rs
91
src/error.rs
@ -1,91 +0,0 @@
|
||||
use std::{fmt, sync::Arc};
|
||||
|
||||
use kanal::{ReceiveError, SendError};
|
||||
|
||||
use crate::message::ErrorMessage;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum Error {
|
||||
HandlerIsNotRegistered,
|
||||
Aborted,
|
||||
SendError(SendError),
|
||||
ReceiveError(kanal::ReceiveError),
|
||||
ReorderingDropMessage(u64),
|
||||
HandlerError(Arc<dyn ErrorMessage>),
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum VoidError {}
|
||||
impl std::fmt::Display for VoidError {
|
||||
fn fmt(&self, _f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl std::error::Error for Error {}
|
||||
impl ErrorMessage for VoidError {}
|
||||
|
||||
impl Eq for Error {}
|
||||
|
||||
impl PartialEq for Error {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
match (self, other) {
|
||||
(Error::HandlerIsNotRegistered, Error::HandlerIsNotRegistered) => true,
|
||||
(Error::Aborted, Error::Aborted) => true,
|
||||
(Error::SendError(err1), Error::SendError(err2)) => err1.eq(err2),
|
||||
(Error::ReceiveError(err1), Error::ReceiveError(err2)) => err1.eq(err2),
|
||||
(Error::ReorderingDropMessage(idx1), Error::ReorderingDropMessage(idx2)) => {
|
||||
idx1.eq(idx2)
|
||||
}
|
||||
(Error::HandlerError(err1), Error::HandlerError(err2)) => Arc::ptr_eq(err1, err2),
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for Error {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match self {
|
||||
Error::HandlerError(err) => writeln!(f, "Handler Error: {}", err)?,
|
||||
Error::HandlerIsNotRegistered => writeln!(f, "Handle is not registered!")?,
|
||||
Error::Aborted => writeln!(f, "Operation Aborted!")?,
|
||||
Error::SendError(reason) => writeln!(f, "Channel send error; reason {}", reason)?,
|
||||
Error::ReceiveError(reason) => writeln!(f, "Channel receive error; reason {}", reason)?,
|
||||
Error::ReorderingDropMessage(index) => writeln!(
|
||||
f,
|
||||
"Reordering drop message #{} (out of bound the queue)",
|
||||
index
|
||||
)?,
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl Clone for Error {
|
||||
fn clone(&self) -> Self {
|
||||
match self {
|
||||
Error::HandlerError(err) => Error::HandlerError(err.clone()),
|
||||
Error::SendError(err) => match err {
|
||||
SendError::Closed => Error::SendError(SendError::Closed),
|
||||
SendError::ReceiveClosed => Error::SendError(SendError::ReceiveClosed),
|
||||
},
|
||||
Error::ReceiveError(err) => match err {
|
||||
ReceiveError::Closed => Error::ReceiveError(ReceiveError::Closed),
|
||||
ReceiveError::SendClosed => Error::ReceiveError(ReceiveError::SendClosed),
|
||||
},
|
||||
Error::HandlerIsNotRegistered => Error::HandlerIsNotRegistered,
|
||||
Error::Aborted => Error::Aborted,
|
||||
Error::ReorderingDropMessage(idx) => Error::ReorderingDropMessage(*idx),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<E> From<E> for Error
|
||||
where
|
||||
E: ErrorMessage,
|
||||
{
|
||||
fn from(error: E) -> Self {
|
||||
Self::HandlerError(Arc::new(error))
|
||||
}
|
||||
}
|
243
src/handler.rs
243
src/handler.rs
@ -1,243 +0,0 @@
|
||||
use std::{
|
||||
any::{type_name, Any},
|
||||
marker::PhantomData,
|
||||
pin::{pin, Pin},
|
||||
sync::{
|
||||
atomic::{AtomicU64, Ordering},
|
||||
Arc,
|
||||
},
|
||||
};
|
||||
|
||||
use futures::Future;
|
||||
use tokio::sync::Notify;
|
||||
|
||||
use crate::{
|
||||
builder::Builder,
|
||||
chan::Receiver,
|
||||
message::{IntoMessages, Msg},
|
||||
task::{TaskCounter, TaskSpawner},
|
||||
AsyncIterator, Bus, BusInner, Error, ErrorMessage, Message,
|
||||
};
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
#[non_exhaustive]
|
||||
pub struct Context {
|
||||
pub stream_id: u32,
|
||||
pub task_id: u32,
|
||||
pub index: u64,
|
||||
pub load: (u32, u32),
|
||||
}
|
||||
|
||||
pub trait Handler<M: Message>: Send + 'static {
|
||||
type Result: Message + Unpin;
|
||||
type Error: ErrorMessage + Unpin;
|
||||
|
||||
fn handle(
|
||||
&mut self,
|
||||
msg: M,
|
||||
ctx: Context,
|
||||
bus: crate::Bus,
|
||||
) -> impl Future<
|
||||
Output = Result<impl IntoMessages<Self::Result, Self::Error> + Send + '_, Self::Error>,
|
||||
> + Send
|
||||
+ '_;
|
||||
|
||||
fn handle_error(
|
||||
&mut self,
|
||||
_err: Error,
|
||||
_ctx: Context,
|
||||
_bus: crate::Bus,
|
||||
) -> impl Future<
|
||||
Output = Result<impl IntoMessages<Self::Result, Self::Error> + Send + '_, Self::Error>,
|
||||
> + Send
|
||||
+ '_;
|
||||
|
||||
fn finalize(self, bus: crate::Bus) -> impl Future<Output = Result<(), Self::Error>> + Send;
|
||||
}
|
||||
|
||||
pub(crate) struct HandlerSpawner<M, B> {
|
||||
pub(crate) builder: B,
|
||||
_m: PhantomData<M>,
|
||||
}
|
||||
|
||||
impl<M, B> HandlerSpawner<M, B> {
|
||||
pub(crate) fn new(builder: B) -> Self {
|
||||
Self {
|
||||
builder,
|
||||
_m: PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<M: Message, B: Builder<M>> TaskSpawner<M> for HandlerSpawner<M, B>
|
||||
where
|
||||
B::Context: Any + Handler<M>,
|
||||
{
|
||||
fn spawn_task(
|
||||
&self,
|
||||
rx: Receiver<Msg<M>>,
|
||||
stream_id: u32,
|
||||
task_id: u32,
|
||||
_abort: Arc<Notify>,
|
||||
task_counter: Arc<TaskCounter>,
|
||||
spawn_counter: Arc<TaskCounter>,
|
||||
index_counter: Arc<AtomicU64>,
|
||||
bus: Arc<BusInner>,
|
||||
) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + '_>> {
|
||||
Box::pin(async move {
|
||||
let bus = Bus { inner: bus.clone() };
|
||||
|
||||
let config = self.builder.config(stream_id);
|
||||
let mut handler = self.builder.build(stream_id, task_id).await?;
|
||||
|
||||
let _handle = tokio::spawn(async move {
|
||||
let _test = spawn_counter.clone().lease_unit(|| true);
|
||||
|
||||
while let Some(msg) = rx.recv().await {
|
||||
let _test = task_counter.clone().lease_unit(|| rx.is_empty());
|
||||
let ctx = Context {
|
||||
stream_id,
|
||||
task_id,
|
||||
index: msg.index,
|
||||
load: rx.load(),
|
||||
};
|
||||
|
||||
let res = match msg.inner {
|
||||
Some(Ok(m)) => {
|
||||
send_result(
|
||||
&bus.inner,
|
||||
&index_counter,
|
||||
msg.index,
|
||||
stream_id,
|
||||
&config,
|
||||
Some(
|
||||
handler
|
||||
.handle(m, ctx, bus.clone())
|
||||
.await
|
||||
.map(IntoMessages::into_messages),
|
||||
),
|
||||
)
|
||||
.await
|
||||
}
|
||||
Some(Err(err)) => {
|
||||
send_result(
|
||||
&bus.inner,
|
||||
&index_counter,
|
||||
msg.index,
|
||||
stream_id,
|
||||
&config,
|
||||
Some(
|
||||
handler
|
||||
.handle_error(err, ctx, bus.clone())
|
||||
.await
|
||||
.map(IntoMessages::into_messages),
|
||||
),
|
||||
)
|
||||
.await
|
||||
}
|
||||
None => {
|
||||
send_result::<
|
||||
<B::Context as Handler<M>>::Result,
|
||||
<B::Context as Handler<M>>::Error,
|
||||
>(
|
||||
&bus.inner,
|
||||
&index_counter,
|
||||
msg.index,
|
||||
stream_id,
|
||||
&config,
|
||||
None::<Result<crate::Empty<_>, _>>,
|
||||
)
|
||||
.await
|
||||
}
|
||||
};
|
||||
|
||||
if let Err(err) = res {
|
||||
println!(
|
||||
"Messagebus Send Error: {}/{} {}: {}",
|
||||
stream_id,
|
||||
task_id,
|
||||
type_name::<<B::Context as Handler<M>>::Result>(),
|
||||
err,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
println!(
|
||||
"TASK #{} of type `{}` ENDED",
|
||||
task_id,
|
||||
std::any::type_name::<B>()
|
||||
);
|
||||
|
||||
if let Err(err) = handler.finalize(bus.clone()).await {
|
||||
println!("TASK FINALIZE ERROR: {:?}", err);
|
||||
}
|
||||
});
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
fn is_producer(&self) -> bool {
|
||||
false
|
||||
}
|
||||
|
||||
fn config(&self, stream_id: u32) -> crate::builder::Config {
|
||||
self.builder.config(stream_id)
|
||||
}
|
||||
}
|
||||
|
||||
async fn send_result<'a, M: Message, E: ErrorMessage>(
|
||||
bus: &Arc<BusInner>,
|
||||
index_counter: &AtomicU64,
|
||||
index: u64,
|
||||
stream_id: u32,
|
||||
config: &crate::builder::Config,
|
||||
res: Option<Result<impl AsyncIterator<Item = Result<M, E>> + Send + 'a, E>>,
|
||||
) -> Result<(), Error> {
|
||||
let reorder_buff = if config.ordered && config.task_count > 1 {
|
||||
config.task_count
|
||||
} else {
|
||||
0
|
||||
};
|
||||
|
||||
let one = match res {
|
||||
Some(Ok(iter)) => {
|
||||
let hint = iter.size_hint();
|
||||
let mut iter = pin!(iter);
|
||||
match hint {
|
||||
(_, Some(0)) => None,
|
||||
(_, Some(1)) => iter.next().await,
|
||||
_ => {
|
||||
while let Some(item) = iter.as_mut().next().await {
|
||||
let index = index_counter.fetch_add(1, Ordering::Relaxed);
|
||||
let stream_id = if config.stream_per_message {
|
||||
bus.next_stream_id()
|
||||
} else {
|
||||
stream_id
|
||||
};
|
||||
|
||||
bus.send::<M>(
|
||||
Some(item.map_err(Into::into)),
|
||||
index,
|
||||
stream_id,
|
||||
reorder_buff,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
}
|
||||
Some(Err(err)) => Some(Err(err)),
|
||||
None => None,
|
||||
};
|
||||
|
||||
bus.send(
|
||||
one.map(|x| x.map_err(Into::into)),
|
||||
index,
|
||||
stream_id,
|
||||
reorder_buff,
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
1245
src/lib.rs
1245
src/lib.rs
File diff suppressed because it is too large
Load Diff
@ -1,76 +0,0 @@
|
||||
use core::fmt;
|
||||
use std::any::Any;
|
||||
|
||||
use futures::Stream;
|
||||
|
||||
use crate::{async_iter::AsyncIterator, Error, Iter, StreamIter};
|
||||
|
||||
pub trait Message: Any + fmt::Debug + Clone + Send + Sync + Unpin {}
|
||||
pub trait ErrorMessage: Any + fmt::Debug + fmt::Display + Send + Sync + Unpin {}
|
||||
pub trait IntoMessages<M: Message, E: ErrorMessage> {
|
||||
fn into_messages(self) -> impl AsyncIterator<Item = Result<M, E>>;
|
||||
}
|
||||
|
||||
pub struct AsyncIter<I: AsyncIterator>(I);
|
||||
pub fn async_iter<I: AsyncIterator>(i: I) -> AsyncIter<I> {
|
||||
AsyncIter(i)
|
||||
}
|
||||
|
||||
impl Message for () {}
|
||||
impl ErrorMessage for anyhow::Error {}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub(crate) struct Msg<M: Message> {
|
||||
pub(crate) inner: Option<Result<M, Error>>,
|
||||
pub(crate) index: u64,
|
||||
pub(crate) stream_id: u32,
|
||||
}
|
||||
|
||||
impl<M: Message, E: ErrorMessage, S: Stream<Item = Result<M, E>> + Send> IntoMessages<M, E>
|
||||
for StreamIter<S>
|
||||
{
|
||||
fn into_messages(self) -> impl AsyncIterator<Item = Result<M, E>> {
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
impl<E: ErrorMessage, I: Iterator + Send + Unpin> IntoMessages<I::Item, E> for Iter<I>
|
||||
where
|
||||
I::Item: Message,
|
||||
{
|
||||
fn into_messages(self) -> impl AsyncIterator<Item = Result<I::Item, E>> {
|
||||
self.map(Ok)
|
||||
}
|
||||
}
|
||||
|
||||
impl<M: Message, E: ErrorMessage, I: AsyncIterator<Item = Result<M, E>>> IntoMessages<M, E>
|
||||
for AsyncIter<I>
|
||||
{
|
||||
fn into_messages(self) -> impl AsyncIterator<Item = Result<M, E>> {
|
||||
self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl<E: ErrorMessage> IntoMessages<(), E> for () {
|
||||
fn into_messages(self) -> impl AsyncIterator<Item = Result<(), E>> {
|
||||
crate::empty()
|
||||
}
|
||||
}
|
||||
|
||||
impl<const N: usize, M: Message, E: ErrorMessage> IntoMessages<M, E> for [M; N] {
|
||||
fn into_messages(self) -> impl AsyncIterator<Item = Result<M, E>> {
|
||||
crate::iter(self.into_iter().map(Ok))
|
||||
}
|
||||
}
|
||||
|
||||
impl<M: Message, E: ErrorMessage> IntoMessages<M, E> for Vec<M> {
|
||||
fn into_messages(self) -> impl AsyncIterator<Item = Result<M, E>> {
|
||||
crate::iter(self.into_iter().map(Ok))
|
||||
}
|
||||
}
|
||||
|
||||
impl<M: Message, E: ErrorMessage> IntoMessages<M, E> for Option<M> {
|
||||
fn into_messages(self) -> impl AsyncIterator<Item = Result<M, E>> {
|
||||
crate::iter(self.into_iter().map(Ok))
|
||||
}
|
||||
}
|
75
src/rand.rs
75
src/rand.rs
@ -1,75 +0,0 @@
|
||||
use std::sync::Mutex;
|
||||
|
||||
use rand::{RngCore, SeedableRng};
|
||||
use rand_xorshift::XorShiftRng;
|
||||
|
||||
static RNG_SEEDS: [u8; 256] = [
|
||||
7, 86, 77, 188, 83, 136, 60, 245, 248, 212, 156, 114, 143, 47, 160, 72, 190, 243, 158, 20, 240,
|
||||
198, 25, 8, 27, 229, 179, 165, 186, 148, 239, 118, 187, 24, 152, 154, 102, 45, 101, 159, 8, 33,
|
||||
158, 161, 42, 183, 189, 173, 58, 200, 121, 45, 11, 168, 245, 161, 186, 43, 244, 251, 244, 246,
|
||||
137, 244, 112, 157, 102, 234, 65, 138, 23, 105, 46, 192, 114, 52, 233, 28, 93, 207, 186, 94,
|
||||
55, 24, 182, 170, 61, 90, 180, 120, 32, 229, 219, 144, 227, 255, 18, 148, 69, 118, 164, 66, 5,
|
||||
243, 18, 190, 21, 224, 151, 225, 229, 136, 112, 1, 181, 246, 64, 53, 249, 166, 22, 104, 255,
|
||||
239, 127, 125, 29, 174, 115, 212, 213, 211, 230, 111, 194, 16, 20, 115, 192, 109, 254, 157,
|
||||
175, 228, 10, 173, 10, 208, 214, 129, 57, 120, 53, 188, 24, 147, 223, 108, 77, 151, 1, 245,
|
||||
151, 38, 186, 95, 28, 242, 0, 2, 161, 86, 154, 73, 138, 225, 40, 235, 26, 195, 42, 229, 15,
|
||||
149, 41, 53, 230, 175, 36, 88, 205, 61, 113, 204, 253, 150, 193, 47, 231, 102, 23, 182, 225,
|
||||
197, 29, 193, 120, 171, 198, 164, 148, 206, 57, 197, 193, 201, 102, 147, 249, 4, 230, 22, 75,
|
||||
40, 145, 144, 113, 152, 115, 170, 41, 29, 154, 166, 74, 71, 23, 148, 139, 244, 114, 139, 66,
|
||||
73, 231, 167, 59, 201, 33, 58, 188, 120, 70, 142, 251, 95,
|
||||
];
|
||||
|
||||
struct RngGenInner {
|
||||
rng: XorShiftRng,
|
||||
counter: u32,
|
||||
}
|
||||
|
||||
impl RngGenInner {
|
||||
#[inline]
|
||||
fn next_u32_pair(&mut self, count: u32) -> (u32, u32) {
|
||||
if self.counter >= 16 {
|
||||
self.reseed_next();
|
||||
self.counter = 0;
|
||||
}
|
||||
|
||||
let r1 = self.rng.next_u32() % count;
|
||||
let r2 = loop {
|
||||
let val = self.rng.next_u32() % count;
|
||||
if r1 != val {
|
||||
break val;
|
||||
}
|
||||
};
|
||||
|
||||
self.counter += 1;
|
||||
|
||||
(r1, r2)
|
||||
}
|
||||
|
||||
fn reseed_next(&mut self) {
|
||||
let offset = (self.rng.next_u32() % 240) as usize;
|
||||
let seed: [u8; 16] = RNG_SEEDS[offset..offset + 16].try_into().unwrap();
|
||||
self.rng = XorShiftRng::from_seed(seed);
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct RndGen {
|
||||
inner: Mutex<RngGenInner>,
|
||||
}
|
||||
|
||||
impl Default for RndGen {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
inner: Mutex::new(RngGenInner {
|
||||
rng: XorShiftRng::from_seed(RNG_SEEDS[0..16].try_into().unwrap()),
|
||||
counter: 0,
|
||||
}),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl RndGen {
|
||||
#[inline]
|
||||
pub fn next_u32_pair(&self, count: u32) -> (u32, u32) {
|
||||
self.inner.lock().unwrap().next_u32_pair(count)
|
||||
}
|
||||
}
|
@ -1,147 +0,0 @@
|
||||
use std::{cmp::Ordering, collections::BinaryHeap};
|
||||
|
||||
struct Entry<M> {
|
||||
inner: M,
|
||||
index: u64,
|
||||
}
|
||||
|
||||
impl<M> PartialOrd for Entry<M> {
|
||||
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
|
||||
Some(other.index.cmp(&self.index))
|
||||
}
|
||||
}
|
||||
|
||||
impl<M> Ord for Entry<M> {
|
||||
fn cmp(&self, other: &Self) -> Ordering {
|
||||
other.index.cmp(&self.index)
|
||||
}
|
||||
}
|
||||
|
||||
impl<M> PartialEq for Entry<M> {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
other.index.eq(&self.index)
|
||||
}
|
||||
}
|
||||
|
||||
impl<M> Eq for Entry<M> {}
|
||||
|
||||
pub(crate) struct ReorderQueueInner<M> {
|
||||
cap: usize,
|
||||
recent_index: Option<u64>,
|
||||
heap: BinaryHeap<Entry<M>>,
|
||||
}
|
||||
|
||||
impl<M> ReorderQueueInner<M> {
|
||||
pub fn new(cap: usize) -> Self {
|
||||
Self {
|
||||
cap,
|
||||
recent_index: None,
|
||||
heap: BinaryHeap::with_capacity(cap + 1),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn push(&mut self, index: u64, inner: M) -> Option<u64> {
|
||||
if let Some(ri) = self.recent_index {
|
||||
if index <= ri {
|
||||
return Some(index);
|
||||
}
|
||||
}
|
||||
|
||||
self.heap.push(Entry { inner, index });
|
||||
|
||||
if self.heap.len() > self.cap {
|
||||
let _ = self.heap.pop();
|
||||
|
||||
self.recent_index = self.recent_index.map(|x| x + 1);
|
||||
self.recent_index
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
pub fn pop(&mut self) -> Option<(u64, M)> {
|
||||
if let Some(ri) = self.recent_index {
|
||||
let e = self.heap.peek()?;
|
||||
if e.index == ri + 1 {
|
||||
self.recent_index = Some(e.index);
|
||||
Some((e.index, self.heap.pop()?.inner))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
} else {
|
||||
let e = self.heap.peek()?;
|
||||
|
||||
if e.index == 0 {
|
||||
let e = self.heap.pop()?;
|
||||
self.recent_index = Some(e.index);
|
||||
Some((e.index, e.inner))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn force_pop(&mut self) -> Option<(u64, M)> {
|
||||
let e = self.heap.pop()?;
|
||||
self.recent_index = Some(e.index);
|
||||
Some((e.index, e.inner))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::Message;
|
||||
|
||||
use super::ReorderQueueInner;
|
||||
|
||||
impl Message for i32 {}
|
||||
|
||||
#[test]
|
||||
fn test_reordering() {
|
||||
let mut queue = ReorderQueueInner::new(8);
|
||||
|
||||
assert_eq!(queue.push(0, 0), None);
|
||||
assert_eq!(queue.pop(), Some((0, 0)));
|
||||
assert_eq!(queue.pop(), None);
|
||||
|
||||
assert_eq!(queue.push(3, 3), None);
|
||||
assert_eq!(queue.pop(), None);
|
||||
|
||||
assert_eq!(queue.push(2, 2), None);
|
||||
assert_eq!(queue.pop(), None);
|
||||
|
||||
assert_eq!(queue.push(4, 4), None);
|
||||
assert_eq!(queue.pop(), None);
|
||||
|
||||
assert_eq!(queue.push(1, 1), None);
|
||||
assert_eq!(queue.pop(), Some((1, 1)));
|
||||
assert_eq!(queue.pop(), Some((2, 2)));
|
||||
assert_eq!(queue.pop(), Some((3, 3)));
|
||||
assert_eq!(queue.pop(), Some((4, 4)));
|
||||
assert_eq!(queue.pop(), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_overflow() {
|
||||
let mut queue = ReorderQueueInner::new(4);
|
||||
assert_eq!(queue.push(0, 0), None);
|
||||
assert_eq!(queue.pop(), Some((0, 0)));
|
||||
assert_eq!(queue.pop(), None);
|
||||
|
||||
assert_eq!(queue.push(4, 4), None);
|
||||
assert_eq!(queue.pop(), None);
|
||||
|
||||
assert_eq!(queue.push(2, 2), None);
|
||||
assert_eq!(queue.pop(), None);
|
||||
|
||||
assert_eq!(queue.push(3, 3), None);
|
||||
assert_eq!(queue.pop(), None);
|
||||
|
||||
assert_eq!(queue.push(5, 5), Some(1));
|
||||
assert_eq!(queue.pop(), Some((2, 2)));
|
||||
assert_eq!(queue.pop(), Some((3, 3)));
|
||||
assert_eq!(queue.pop(), Some((4, 4)));
|
||||
assert_eq!(queue.pop(), Some((5, 5)));
|
||||
assert_eq!(queue.pop(), None);
|
||||
}
|
||||
}
|
137
src/task.rs
137
src/task.rs
@ -1,137 +0,0 @@
|
||||
use std::{
|
||||
pin::Pin,
|
||||
sync::{
|
||||
atomic::{AtomicU64, AtomicUsize, Ordering},
|
||||
Arc,
|
||||
},
|
||||
};
|
||||
|
||||
use futures::Future;
|
||||
use tokio::sync::Notify;
|
||||
|
||||
use crate::{
|
||||
builder::Config,
|
||||
chan::{BusSender, Receiver, Sender},
|
||||
handler::HandlerSpawner,
|
||||
message::Msg,
|
||||
Builder, BusInner, Error, Handler, Message,
|
||||
};
|
||||
|
||||
#[derive(Default)]
|
||||
pub(crate) struct TaskCounter {
|
||||
pub(crate) running: AtomicUsize,
|
||||
notify: Notify,
|
||||
}
|
||||
|
||||
pub(crate) struct TaskCounterLease<S: Fn() -> bool> {
|
||||
need_notify: S,
|
||||
counter: Arc<TaskCounter>,
|
||||
}
|
||||
|
||||
impl<S: Fn() -> bool> Drop for TaskCounterLease<S> {
|
||||
fn drop(&mut self) {
|
||||
let notify = (self.need_notify)();
|
||||
let prev = self.counter.running.fetch_sub(1, Ordering::Relaxed);
|
||||
|
||||
if notify && prev == 1 {
|
||||
self.counter.notify.notify_waiters();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: Fn() -> bool> TaskCounterLease<S> {
|
||||
fn new(counter: Arc<TaskCounter>, need_notify: S) -> Self {
|
||||
counter.running.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
Self {
|
||||
counter,
|
||||
need_notify,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TaskCounter {
|
||||
pub fn lease_unit<S: Fn() -> bool>(self: Arc<Self>, need_notify: S) -> TaskCounterLease<S> {
|
||||
TaskCounterLease::new(self, need_notify)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub async fn wait(&self) {
|
||||
self.notify.notified().await
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) trait TaskSpawner<M: Message>: Send + Sync {
|
||||
fn config(&self, stream_id: u32) -> Config;
|
||||
fn is_producer(&self) -> bool;
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn spawn_task(
|
||||
&self,
|
||||
rx: Receiver<Msg<M>>,
|
||||
stream_id: u32,
|
||||
task_id: u32,
|
||||
abort: Arc<Notify>,
|
||||
task_counter: Arc<TaskCounter>,
|
||||
spawn_counter: Arc<TaskCounter>,
|
||||
index_counter: Arc<AtomicU64>,
|
||||
bus: Arc<BusInner>,
|
||||
) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + '_>>;
|
||||
}
|
||||
|
||||
pub(crate) struct TaskSpawnerWrapper<M: Message> {
|
||||
inner: Arc<dyn TaskSpawner<M>>,
|
||||
}
|
||||
|
||||
impl<M: Message> Clone for TaskSpawnerWrapper<M> {
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
inner: self.inner.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<M: Message> TaskSpawnerWrapper<M> {
|
||||
pub fn from_handler<B: Builder<M> + 'static>(builder: B) -> Self
|
||||
where
|
||||
B::Context: Handler<M>,
|
||||
{
|
||||
Self {
|
||||
inner: Arc::new(HandlerSpawner::new(builder)) as _,
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub async fn spawn_task(
|
||||
&self,
|
||||
(tx, rx): (Sender<Msg<M>>, Receiver<Msg<M>>),
|
||||
stream_id: u32,
|
||||
task_id: u32,
|
||||
abort: Arc<Notify>,
|
||||
task_counter: Arc<TaskCounter>,
|
||||
spawn_counter: Arc<TaskCounter>,
|
||||
index_counter: Arc<AtomicU64>,
|
||||
bus: Arc<BusInner>,
|
||||
) -> Result<BusSender<M>, Error> {
|
||||
self.inner
|
||||
.spawn_task(
|
||||
rx,
|
||||
stream_id,
|
||||
task_id,
|
||||
abort,
|
||||
task_counter,
|
||||
spawn_counter,
|
||||
index_counter,
|
||||
bus,
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(BusSender::new(self.inner.is_producer(), tx))
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub(crate) fn config(&self, stream_id: u32) -> Config {
|
||||
self.inner.config(stream_id)
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user