Compare commits
No commits in common. "next" and "master" have entirely different histories.
16
Cargo.toml
16
Cargo.toml
@ -1,26 +1,16 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "messagebus"
|
name = "messagebus"
|
||||||
version = "0.15.1"
|
version = "0.1.0"
|
||||||
authors = ["Andrey Tkachenko <andrey@aidev.ru>"]
|
|
||||||
repository = "https://github.com/andreytkachenko/messagebus.git"
|
|
||||||
keywords = ["futures", "async", "tokio", "message", "bus"]
|
|
||||||
categories = ["network-programming", "asynchronous"]
|
|
||||||
description = "MessageBus allows intercommunicate with messages between modules"
|
|
||||||
exclude = [".gitignore", ".cargo/config", ".github/**", ".drone.yml"]
|
|
||||||
license = "MIT OR Apache-2.0"
|
|
||||||
edition = "2021"
|
edition = "2021"
|
||||||
|
|
||||||
|
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
anyhow = "1.0.75"
|
|
||||||
boxcar = "0.2.3"
|
boxcar = "0.2.3"
|
||||||
dashmap = "5.5.0"
|
dashmap = "5.5.0"
|
||||||
futures = "0.3.28"
|
futures = "0.3.28"
|
||||||
kanal = "0.1.0-pre8"
|
kanal = "0.1.0-pre8"
|
||||||
log = "0.4.20"
|
log = "0.4.20"
|
||||||
pin-project-lite = "0.2.13"
|
|
||||||
priority-queue = "1.3.2"
|
|
||||||
rand = { version = "0.8.5", default-features = false, features = ["std_rng", "std"] }
|
|
||||||
rand_xorshift = "0.3.0"
|
|
||||||
tokio = { version = "1.32.0", features = ["sync", "rt", "macros"] }
|
tokio = { version = "1.32.0", features = ["sync", "rt", "macros"] }
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
|
@ -1,83 +1,37 @@
|
|||||||
#![feature(return_position_impl_trait_in_trait)]
|
|
||||||
#![feature(async_fn_in_trait)]
|
|
||||||
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use anyhow::Error;
|
use messagebus::{Bus, Error, Message};
|
||||||
use messagebus::{Builder, Bus, Context, Handler, IntoMessages, Message};
|
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct Msg(pub i32);
|
pub struct Msg(pub i32);
|
||||||
impl Message for Msg {}
|
impl Message for Msg {}
|
||||||
|
|
||||||
pub struct Processor {
|
pub struct Processor {
|
||||||
_state: i32,
|
state: i32,
|
||||||
}
|
|
||||||
|
|
||||||
impl Handler<Msg> for Processor {
|
|
||||||
type Result = ();
|
|
||||||
type Error = Error;
|
|
||||||
|
|
||||||
async fn handle(
|
|
||||||
&mut self,
|
|
||||||
_msg: Msg,
|
|
||||||
_ctx: Context,
|
|
||||||
_bus: Bus,
|
|
||||||
) -> Result<impl IntoMessages<Self::Result, Self::Error>, Self::Error> {
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn handle_error(
|
|
||||||
&mut self,
|
|
||||||
_err: messagebus::Error,
|
|
||||||
_ctx: Context,
|
|
||||||
_bus: messagebus::Bus,
|
|
||||||
) -> Result<impl IntoMessages<Self::Result, Self::Error> + Send + '_, Self::Error> {
|
|
||||||
Ok(None)
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn finalize(self, _bus: Bus) -> Result<(), Self::Error> {
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
struct ProcSpawner;
|
|
||||||
impl Builder<Msg> for ProcSpawner {
|
|
||||||
type Context = Processor;
|
|
||||||
|
|
||||||
async fn build(
|
|
||||||
&self,
|
|
||||||
stream_id: u32,
|
|
||||||
_task_id: u32,
|
|
||||||
) -> Result<Self::Context, messagebus::Error> {
|
|
||||||
Ok(Processor {
|
|
||||||
_state: stream_id as _,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Processor {
|
impl Processor {
|
||||||
pub async fn spawn(_sid: u32) -> Result<(usize, Self), Error> {
|
pub async fn spawn(sid: u32) -> Result<(usize, Self), Error> {
|
||||||
Ok((4, Self { _state: 0 }))
|
Ok((4, Self { state: 0 }))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn handler_msg(
|
pub async fn handler_msg(self: Arc<Self>, sid: u32, tid: u32, msg: Msg) -> Result<(), Error> {
|
||||||
self: Arc<Self>,
|
|
||||||
_sid: u32,
|
|
||||||
_tid: u32,
|
|
||||||
_msg: Msg,
|
|
||||||
) -> Result<(), Error> {
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn finalize_msg_handler(self: Arc<Self>, _sid: u32) -> Result<(), Error> {
|
pub async fn finalize_msg_handler(self: Arc<Self>, sid: u32) -> Result<(), Error> {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn run() {
|
async fn run() {
|
||||||
let bus = Bus::new();
|
let bus = Bus::new();
|
||||||
bus.register(ProcSpawner).await;
|
bus.register(
|
||||||
|
4,
|
||||||
|
Processor::spawn,
|
||||||
|
Processor::handler_msg,
|
||||||
|
Processor::finalize_msg_handler,
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
|
@ -1,141 +0,0 @@
|
|||||||
use std::{marker::PhantomData, pin::Pin};
|
|
||||||
|
|
||||||
use futures::{Stream, StreamExt};
|
|
||||||
|
|
||||||
pub trait AsyncIterator: Send {
|
|
||||||
type Item: Send;
|
|
||||||
|
|
||||||
fn next(self: Pin<&mut Self>) -> impl futures::Future<Output = Option<Self::Item>> + Send + '_;
|
|
||||||
|
|
||||||
fn size_hint(&self) -> (usize, Option<usize>) {
|
|
||||||
(0, None)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn map<U: Send, C: Send + FnMut(Self::Item) -> U>(self, cb: C) -> impl AsyncIterator<Item = U>
|
|
||||||
where
|
|
||||||
Self: Sized,
|
|
||||||
{
|
|
||||||
Map { inner: self, cb }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pin_project_lite::pin_project! {
|
|
||||||
pub struct Map<U, I: AsyncIterator, F: FnMut(I::Item) ->U> {
|
|
||||||
#[pin]
|
|
||||||
inner: I,
|
|
||||||
cb: F,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<U: Send, I: AsyncIterator, F: Send + FnMut(I::Item) -> U> AsyncIterator for Map<U, I, F> {
|
|
||||||
type Item = U;
|
|
||||||
|
|
||||||
async fn next(self: Pin<&mut Self>) -> Option<U> {
|
|
||||||
let this = self.project();
|
|
||||||
Some((this.cb)(this.inner.next().await?))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct Iter<I: Iterator> {
|
|
||||||
inner: I,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<I: Send + Iterator + Unpin> AsyncIterator for Iter<I>
|
|
||||||
where
|
|
||||||
I::Item: Send,
|
|
||||||
{
|
|
||||||
type Item = I::Item;
|
|
||||||
|
|
||||||
#[inline]
|
|
||||||
async fn next(self: Pin<&mut Self>) -> Option<Self::Item> {
|
|
||||||
self.get_mut().inner.next()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn size_hint(&self) -> (usize, Option<usize>) {
|
|
||||||
self.inner.size_hint()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn iter<I: IntoIterator + Send>(inner: I) -> Iter<I::IntoIter>
|
|
||||||
where
|
|
||||||
I::IntoIter: Send + Unpin,
|
|
||||||
I::Item: Send,
|
|
||||||
{
|
|
||||||
Iter {
|
|
||||||
inner: inner.into_iter(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
pin_project_lite::pin_project! {
|
|
||||||
pub struct StreamIter<S: Stream> {
|
|
||||||
#[pin]
|
|
||||||
inner: S,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<S: Send + Stream> AsyncIterator for StreamIter<S>
|
|
||||||
where
|
|
||||||
S::Item: Send,
|
|
||||||
{
|
|
||||||
type Item = S::Item;
|
|
||||||
|
|
||||||
#[inline]
|
|
||||||
async fn next(self: Pin<&mut Self>) -> Option<Self::Item> {
|
|
||||||
self.project().inner.next().await
|
|
||||||
}
|
|
||||||
|
|
||||||
fn size_hint(&self) -> (usize, Option<usize>) {
|
|
||||||
self.inner.size_hint()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn stream<S: Send + Stream>(inner: S) -> StreamIter<S>
|
|
||||||
where
|
|
||||||
S::Item: Send,
|
|
||||||
{
|
|
||||||
StreamIter { inner }
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct Once<I>(Option<I>);
|
|
||||||
impl<I: Send + Unpin> AsyncIterator for Once<I> {
|
|
||||||
type Item = I;
|
|
||||||
|
|
||||||
async fn next(self: Pin<&mut Self>) -> Option<Self::Item> {
|
|
||||||
self.get_mut().0.take()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn once<I: Send + Unpin>(item: I) -> Once<I> {
|
|
||||||
Once(Some(item))
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct Empty<I>(PhantomData<I>);
|
|
||||||
impl<I: Send> AsyncIterator for Empty<I> {
|
|
||||||
type Item = I;
|
|
||||||
|
|
||||||
async fn next(self: Pin<&mut Self>) -> Option<Self::Item> {
|
|
||||||
None
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn empty<I: Send + Unpin>() -> Empty<I> {
|
|
||||||
Empty(Default::default())
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<I: Send> AsyncIterator for tokio::sync::mpsc::Receiver<I> {
|
|
||||||
type Item = I;
|
|
||||||
|
|
||||||
async fn next(self: Pin<&mut Self>) -> Option<Self::Item> {
|
|
||||||
self.get_mut().recv().await
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<I: Send> AsyncIterator for tokio::sync::oneshot::Receiver<I> {
|
|
||||||
type Item = I;
|
|
||||||
|
|
||||||
async fn next(self: Pin<&mut Self>) -> Option<Self::Item> {
|
|
||||||
match self.await {
|
|
||||||
Ok(item) => Some(item),
|
|
||||||
Err(_) => None,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
227
src/builder.rs
227
src/builder.rs
@ -1,227 +0,0 @@
|
|||||||
use std::{marker::PhantomData, sync::Arc};
|
|
||||||
|
|
||||||
use futures::Future;
|
|
||||||
|
|
||||||
use crate::{Error, Message};
|
|
||||||
|
|
||||||
#[derive(Clone, Copy, Debug)]
|
|
||||||
pub struct Config {
|
|
||||||
pub queue_size: usize,
|
|
||||||
pub queue_per_task: bool,
|
|
||||||
pub ordered: bool,
|
|
||||||
pub task_count: u32,
|
|
||||||
pub lazy_task_creation: bool,
|
|
||||||
pub stream_per_message: bool,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Default for Config {
|
|
||||||
fn default() -> Self {
|
|
||||||
Self {
|
|
||||||
queue_size: 4,
|
|
||||||
queue_per_task: false,
|
|
||||||
ordered: false,
|
|
||||||
task_count: 1,
|
|
||||||
lazy_task_creation: true,
|
|
||||||
stream_per_message: false,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub trait Builder<M: Message>: Send + Sync + 'static {
|
|
||||||
type Context: 'static;
|
|
||||||
|
|
||||||
fn config(&self, _stream_id: u32) -> Config {
|
|
||||||
Default::default()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn build(
|
|
||||||
&self,
|
|
||||||
stream_id: u32,
|
|
||||||
_task_id: u32,
|
|
||||||
) -> impl Future<Output = Result<Self::Context, Error>> + Send + '_;
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct DefaultBuilder<M: Send + Sync, H: Send + Sync, C: Send + Sync, F: Send> {
|
|
||||||
config: Config,
|
|
||||||
callback: C,
|
|
||||||
_m: PhantomData<(M, H, F)>,
|
|
||||||
}
|
|
||||||
|
|
||||||
unsafe impl<M: Send + Sync, H: Send + Sync, C: Send + Sync, F: Send> Sync
|
|
||||||
for DefaultBuilder<M, H, C, F>
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<M, H, C, F> DefaultBuilder<M, H, C, F>
|
|
||||||
where
|
|
||||||
M: Message,
|
|
||||||
H: Sync + Send + 'static,
|
|
||||||
F: Send + Future<Output = Result<H, Error>> + 'static,
|
|
||||||
C: Sync + Send + Fn(u32, u32) -> F + 'static,
|
|
||||||
{
|
|
||||||
pub fn new(queue_size: usize, callback: C) -> Self {
|
|
||||||
Self {
|
|
||||||
config: Config {
|
|
||||||
queue_size,
|
|
||||||
queue_per_task: false,
|
|
||||||
ordered: false,
|
|
||||||
task_count: 1,
|
|
||||||
lazy_task_creation: true,
|
|
||||||
stream_per_message: false,
|
|
||||||
},
|
|
||||||
callback,
|
|
||||||
_m: PhantomData,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn ordered(self) -> Self {
|
|
||||||
let mut config = self.config;
|
|
||||||
config.ordered = true;
|
|
||||||
|
|
||||||
Self {
|
|
||||||
config,
|
|
||||||
callback: self.callback,
|
|
||||||
_m: PhantomData,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn stream_per_message(self) -> Self {
|
|
||||||
let mut config = self.config;
|
|
||||||
config.stream_per_message = true;
|
|
||||||
|
|
||||||
Self {
|
|
||||||
config,
|
|
||||||
callback: self.callback,
|
|
||||||
_m: PhantomData,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn tasks(self, tasks: u32) -> Self {
|
|
||||||
let mut config = self.config;
|
|
||||||
config.task_count = tasks;
|
|
||||||
|
|
||||||
Self {
|
|
||||||
config,
|
|
||||||
callback: self.callback,
|
|
||||||
_m: PhantomData,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<M, H, C, F> Builder<M> for DefaultBuilder<M, H, C, F>
|
|
||||||
where
|
|
||||||
M: Message,
|
|
||||||
H: Sync + Send + 'static,
|
|
||||||
F: Send + Future<Output = Result<H, Error>> + 'static,
|
|
||||||
C: Sync + Send + Fn(u32, u32) -> F + 'static,
|
|
||||||
{
|
|
||||||
type Context = H;
|
|
||||||
|
|
||||||
async fn build(&self, stream_id: u32, task_id: u32) -> Result<Self::Context, Error> {
|
|
||||||
(self.callback)(stream_id, task_id).await
|
|
||||||
}
|
|
||||||
|
|
||||||
fn config(&self, _stream_id: u32) -> Config {
|
|
||||||
self.config
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct SharedBuilder<M, H, C, F> {
|
|
||||||
config: Config,
|
|
||||||
stream_handlers: dashmap::DashMap<u32, Arc<H>>,
|
|
||||||
callback: C,
|
|
||||||
_m: PhantomData<(M, F)>,
|
|
||||||
}
|
|
||||||
|
|
||||||
unsafe impl<M: Send + Sync, H: Send + Sync, C: Send + Sync, F: Send> Sync
|
|
||||||
for SharedBuilder<M, H, C, F>
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<M, H, C, F> SharedBuilder<M, H, C, F>
|
|
||||||
where
|
|
||||||
M: Message,
|
|
||||||
H: Sync + Send + 'static,
|
|
||||||
F: Send + Future<Output = Result<H, Error>> + 'static,
|
|
||||||
C: Sync + Send + Fn(u32, u32) -> F + 'static,
|
|
||||||
{
|
|
||||||
pub fn new(queue_size: usize, task_count: u32, callback: C) -> Self {
|
|
||||||
Self {
|
|
||||||
config: Config {
|
|
||||||
queue_size,
|
|
||||||
queue_per_task: false,
|
|
||||||
ordered: false,
|
|
||||||
task_count,
|
|
||||||
lazy_task_creation: true,
|
|
||||||
stream_per_message: false,
|
|
||||||
},
|
|
||||||
stream_handlers: Default::default(),
|
|
||||||
callback,
|
|
||||||
_m: PhantomData,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn stream_per_message(self) -> Self {
|
|
||||||
let mut config = self.config;
|
|
||||||
config.stream_per_message = true;
|
|
||||||
|
|
||||||
Self {
|
|
||||||
config,
|
|
||||||
callback: self.callback,
|
|
||||||
_m: PhantomData,
|
|
||||||
stream_handlers: Default::default(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn ordered(self) -> Self {
|
|
||||||
let mut config = self.config;
|
|
||||||
config.ordered = true;
|
|
||||||
|
|
||||||
Self {
|
|
||||||
config,
|
|
||||||
stream_handlers: self.stream_handlers,
|
|
||||||
callback: self.callback,
|
|
||||||
_m: PhantomData,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn queue_per_task(self) -> Self {
|
|
||||||
let mut config = self.config;
|
|
||||||
config.queue_per_task = true;
|
|
||||||
|
|
||||||
Self {
|
|
||||||
config,
|
|
||||||
stream_handlers: self.stream_handlers,
|
|
||||||
callback: self.callback,
|
|
||||||
_m: PhantomData,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<M, H, C, F> Builder<M> for SharedBuilder<M, H, C, F>
|
|
||||||
where
|
|
||||||
M: Message,
|
|
||||||
H: Sync + Send + 'static,
|
|
||||||
F: Send + Future<Output = Result<H, Error>> + 'static,
|
|
||||||
C: Sync + Send + Fn(u32, u32) -> F + 'static,
|
|
||||||
{
|
|
||||||
type Context = Arc<H>;
|
|
||||||
|
|
||||||
async fn build(&self, stream_id: u32, task_id: u32) -> Result<Self::Context, Error> {
|
|
||||||
if self.stream_handlers.contains_key(&stream_id) {
|
|
||||||
return Ok(self.stream_handlers.get(&stream_id).unwrap().clone());
|
|
||||||
}
|
|
||||||
|
|
||||||
let val = match (self.callback)(stream_id, task_id).await {
|
|
||||||
Ok(val) => Arc::new(val),
|
|
||||||
Err(err) => return Err(err),
|
|
||||||
};
|
|
||||||
|
|
||||||
self.stream_handlers.insert(stream_id, val.clone());
|
|
||||||
Ok(val)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn config(&self, _stream_id: u32) -> Config {
|
|
||||||
self.config
|
|
||||||
}
|
|
||||||
}
|
|
147
src/chan.rs
147
src/chan.rs
@ -1,147 +0,0 @@
|
|||||||
use std::{
|
|
||||||
any::{Any, TypeId},
|
|
||||||
pin::Pin,
|
|
||||||
};
|
|
||||||
|
|
||||||
use futures::Future;
|
|
||||||
|
|
||||||
use crate::{message::Msg, Error, Message};
|
|
||||||
|
|
||||||
enum ChannelItem<T> {
|
|
||||||
Value(T),
|
|
||||||
Close,
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) trait AbstractSender: Any + Send + Sync {
|
|
||||||
fn upcast(&self) -> &(dyn Any + Send + Sync);
|
|
||||||
fn close(&self);
|
|
||||||
fn message_type_id(&self) -> TypeId;
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<M: Any + Send + Sync> AbstractSender for kanal::AsyncSender<M> {
|
|
||||||
fn upcast(&self) -> &(dyn Any + Send + Sync) {
|
|
||||||
self
|
|
||||||
}
|
|
||||||
|
|
||||||
fn close(&self) {
|
|
||||||
self.close();
|
|
||||||
}
|
|
||||||
|
|
||||||
fn message_type_id(&self) -> TypeId {
|
|
||||||
TypeId::of::<M>()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) trait BusSenderClose: Any + Send + Sync {
|
|
||||||
fn upcast(&self) -> &(dyn Any + Send + Sync);
|
|
||||||
fn is_producer(&self) -> bool;
|
|
||||||
fn load(&self) -> (usize, usize);
|
|
||||||
fn stop(&self) -> Pin<Box<dyn Future<Output = Result<(), Error>> + '_>>;
|
|
||||||
fn terminate(&self) -> Result<(), Error>;
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) struct BusSender<M: Message> {
|
|
||||||
is_producer: bool,
|
|
||||||
tx: Sender<Msg<M>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<M: Message> BusSenderClose for BusSender<M> {
|
|
||||||
fn upcast(&self) -> &(dyn Any + Send + Sync) {
|
|
||||||
self
|
|
||||||
}
|
|
||||||
|
|
||||||
fn stop(&self) -> Pin<Box<dyn Future<Output = Result<(), Error>> + '_>> {
|
|
||||||
Box::pin(async move {
|
|
||||||
self.tx.stop().await?;
|
|
||||||
Ok(())
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
fn terminate(&self) -> Result<(), Error> {
|
|
||||||
self.tx.close()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn is_producer(&self) -> bool {
|
|
||||||
self.is_producer
|
|
||||||
}
|
|
||||||
|
|
||||||
fn load(&self) -> (usize, usize) {
|
|
||||||
self.tx.load()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<M: Message> BusSender<M> {
|
|
||||||
pub async fn send(&self, m: Msg<M>) -> Result<(), Error> {
|
|
||||||
self.tx.send(m).await
|
|
||||||
}
|
|
||||||
|
|
||||||
#[inline]
|
|
||||||
pub(crate) fn new(is_producer: bool, tx: Sender<Msg<M>>) -> BusSender<M> {
|
|
||||||
Self { is_producer, tx }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
|
||||||
pub(crate) struct Sender<T> {
|
|
||||||
inner: kanal::AsyncSender<ChannelItem<T>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T> Sender<T> {
|
|
||||||
pub async fn send(&self, msg: T) -> Result<(), Error> {
|
|
||||||
self.inner
|
|
||||||
.send(ChannelItem::Value(msg))
|
|
||||||
.await
|
|
||||||
.map_err(Error::SendError)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn stop(&self) -> Result<(), Error> {
|
|
||||||
self.inner
|
|
||||||
.send(ChannelItem::Close)
|
|
||||||
.await
|
|
||||||
.map_err(Error::SendError)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn close(&self) -> Result<(), Error> {
|
|
||||||
self.inner.close();
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn load(&self) -> (usize, usize) {
|
|
||||||
(self.inner.len(), self.inner.capacity())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
|
||||||
pub(crate) struct Receiver<T> {
|
|
||||||
inner: kanal::AsyncReceiver<ChannelItem<T>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T> Receiver<T> {
|
|
||||||
#[inline]
|
|
||||||
pub async fn recv(&self) -> Option<T> {
|
|
||||||
let Ok(item) = self.inner.recv().await else {
|
|
||||||
return None;
|
|
||||||
};
|
|
||||||
|
|
||||||
match item {
|
|
||||||
ChannelItem::Value(val) => Some(val),
|
|
||||||
ChannelItem::Close => None,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[inline]
|
|
||||||
pub fn load(&self) -> (u32, u32) {
|
|
||||||
(self.inner.len() as _, self.inner.capacity() as _)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[inline]
|
|
||||||
pub fn is_empty(&self) -> bool {
|
|
||||||
self.inner.is_empty()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) fn channel<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
|
|
||||||
let (tx, rx) = kanal::bounded_async(cap);
|
|
||||||
|
|
||||||
(Sender { inner: tx }, Receiver { inner: rx })
|
|
||||||
}
|
|
91
src/error.rs
91
src/error.rs
@ -1,91 +0,0 @@
|
|||||||
use std::{fmt, sync::Arc};
|
|
||||||
|
|
||||||
use kanal::{ReceiveError, SendError};
|
|
||||||
|
|
||||||
use crate::message::ErrorMessage;
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub enum Error {
|
|
||||||
HandlerIsNotRegistered,
|
|
||||||
Aborted,
|
|
||||||
SendError(SendError),
|
|
||||||
ReceiveError(kanal::ReceiveError),
|
|
||||||
ReorderingDropMessage(u64),
|
|
||||||
HandlerError(Arc<dyn ErrorMessage>),
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub enum VoidError {}
|
|
||||||
impl std::fmt::Display for VoidError {
|
|
||||||
fn fmt(&self, _f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl std::error::Error for Error {}
|
|
||||||
impl ErrorMessage for VoidError {}
|
|
||||||
|
|
||||||
impl Eq for Error {}
|
|
||||||
|
|
||||||
impl PartialEq for Error {
|
|
||||||
fn eq(&self, other: &Self) -> bool {
|
|
||||||
match (self, other) {
|
|
||||||
(Error::HandlerIsNotRegistered, Error::HandlerIsNotRegistered) => true,
|
|
||||||
(Error::Aborted, Error::Aborted) => true,
|
|
||||||
(Error::SendError(err1), Error::SendError(err2)) => err1.eq(err2),
|
|
||||||
(Error::ReceiveError(err1), Error::ReceiveError(err2)) => err1.eq(err2),
|
|
||||||
(Error::ReorderingDropMessage(idx1), Error::ReorderingDropMessage(idx2)) => {
|
|
||||||
idx1.eq(idx2)
|
|
||||||
}
|
|
||||||
(Error::HandlerError(err1), Error::HandlerError(err2)) => Arc::ptr_eq(err1, err2),
|
|
||||||
_ => false,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl fmt::Display for Error {
|
|
||||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
|
||||||
match self {
|
|
||||||
Error::HandlerError(err) => writeln!(f, "Handler Error: {}", err)?,
|
|
||||||
Error::HandlerIsNotRegistered => writeln!(f, "Handle is not registered!")?,
|
|
||||||
Error::Aborted => writeln!(f, "Operation Aborted!")?,
|
|
||||||
Error::SendError(reason) => writeln!(f, "Channel send error; reason {}", reason)?,
|
|
||||||
Error::ReceiveError(reason) => writeln!(f, "Channel receive error; reason {}", reason)?,
|
|
||||||
Error::ReorderingDropMessage(index) => writeln!(
|
|
||||||
f,
|
|
||||||
"Reordering drop message #{} (out of bound the queue)",
|
|
||||||
index
|
|
||||||
)?,
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Clone for Error {
|
|
||||||
fn clone(&self) -> Self {
|
|
||||||
match self {
|
|
||||||
Error::HandlerError(err) => Error::HandlerError(err.clone()),
|
|
||||||
Error::SendError(err) => match err {
|
|
||||||
SendError::Closed => Error::SendError(SendError::Closed),
|
|
||||||
SendError::ReceiveClosed => Error::SendError(SendError::ReceiveClosed),
|
|
||||||
},
|
|
||||||
Error::ReceiveError(err) => match err {
|
|
||||||
ReceiveError::Closed => Error::ReceiveError(ReceiveError::Closed),
|
|
||||||
ReceiveError::SendClosed => Error::ReceiveError(ReceiveError::SendClosed),
|
|
||||||
},
|
|
||||||
Error::HandlerIsNotRegistered => Error::HandlerIsNotRegistered,
|
|
||||||
Error::Aborted => Error::Aborted,
|
|
||||||
Error::ReorderingDropMessage(idx) => Error::ReorderingDropMessage(*idx),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<E> From<E> for Error
|
|
||||||
where
|
|
||||||
E: ErrorMessage,
|
|
||||||
{
|
|
||||||
fn from(error: E) -> Self {
|
|
||||||
Self::HandlerError(Arc::new(error))
|
|
||||||
}
|
|
||||||
}
|
|
243
src/handler.rs
243
src/handler.rs
@ -1,243 +0,0 @@
|
|||||||
use std::{
|
|
||||||
any::{type_name, Any},
|
|
||||||
marker::PhantomData,
|
|
||||||
pin::{pin, Pin},
|
|
||||||
sync::{
|
|
||||||
atomic::{AtomicU64, Ordering},
|
|
||||||
Arc,
|
|
||||||
},
|
|
||||||
};
|
|
||||||
|
|
||||||
use futures::Future;
|
|
||||||
use tokio::sync::Notify;
|
|
||||||
|
|
||||||
use crate::{
|
|
||||||
builder::Builder,
|
|
||||||
chan::Receiver,
|
|
||||||
message::{IntoMessages, Msg},
|
|
||||||
task::{TaskCounter, TaskSpawner},
|
|
||||||
AsyncIterator, Bus, BusInner, Error, ErrorMessage, Message,
|
|
||||||
};
|
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
|
||||||
#[non_exhaustive]
|
|
||||||
pub struct Context {
|
|
||||||
pub stream_id: u32,
|
|
||||||
pub task_id: u32,
|
|
||||||
pub index: u64,
|
|
||||||
pub load: (u32, u32),
|
|
||||||
}
|
|
||||||
|
|
||||||
pub trait Handler<M: Message>: Send + 'static {
|
|
||||||
type Result: Message + Unpin;
|
|
||||||
type Error: ErrorMessage + Unpin;
|
|
||||||
|
|
||||||
fn handle(
|
|
||||||
&mut self,
|
|
||||||
msg: M,
|
|
||||||
ctx: Context,
|
|
||||||
bus: crate::Bus,
|
|
||||||
) -> impl Future<
|
|
||||||
Output = Result<impl IntoMessages<Self::Result, Self::Error> + Send + '_, Self::Error>,
|
|
||||||
> + Send
|
|
||||||
+ '_;
|
|
||||||
|
|
||||||
fn handle_error(
|
|
||||||
&mut self,
|
|
||||||
_err: Error,
|
|
||||||
_ctx: Context,
|
|
||||||
_bus: crate::Bus,
|
|
||||||
) -> impl Future<
|
|
||||||
Output = Result<impl IntoMessages<Self::Result, Self::Error> + Send + '_, Self::Error>,
|
|
||||||
> + Send
|
|
||||||
+ '_;
|
|
||||||
|
|
||||||
fn finalize(self, bus: crate::Bus) -> impl Future<Output = Result<(), Self::Error>> + Send;
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) struct HandlerSpawner<M, B> {
|
|
||||||
pub(crate) builder: B,
|
|
||||||
_m: PhantomData<M>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<M, B> HandlerSpawner<M, B> {
|
|
||||||
pub(crate) fn new(builder: B) -> Self {
|
|
||||||
Self {
|
|
||||||
builder,
|
|
||||||
_m: PhantomData,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<M: Message, B: Builder<M>> TaskSpawner<M> for HandlerSpawner<M, B>
|
|
||||||
where
|
|
||||||
B::Context: Any + Handler<M>,
|
|
||||||
{
|
|
||||||
fn spawn_task(
|
|
||||||
&self,
|
|
||||||
rx: Receiver<Msg<M>>,
|
|
||||||
stream_id: u32,
|
|
||||||
task_id: u32,
|
|
||||||
_abort: Arc<Notify>,
|
|
||||||
task_counter: Arc<TaskCounter>,
|
|
||||||
spawn_counter: Arc<TaskCounter>,
|
|
||||||
index_counter: Arc<AtomicU64>,
|
|
||||||
bus: Arc<BusInner>,
|
|
||||||
) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + '_>> {
|
|
||||||
Box::pin(async move {
|
|
||||||
let bus = Bus { inner: bus.clone() };
|
|
||||||
|
|
||||||
let config = self.builder.config(stream_id);
|
|
||||||
let mut handler = self.builder.build(stream_id, task_id).await?;
|
|
||||||
|
|
||||||
let _handle = tokio::spawn(async move {
|
|
||||||
let _test = spawn_counter.clone().lease_unit(|| true);
|
|
||||||
|
|
||||||
while let Some(msg) = rx.recv().await {
|
|
||||||
let _test = task_counter.clone().lease_unit(|| rx.is_empty());
|
|
||||||
let ctx = Context {
|
|
||||||
stream_id,
|
|
||||||
task_id,
|
|
||||||
index: msg.index,
|
|
||||||
load: rx.load(),
|
|
||||||
};
|
|
||||||
|
|
||||||
let res = match msg.inner {
|
|
||||||
Some(Ok(m)) => {
|
|
||||||
send_result(
|
|
||||||
&bus.inner,
|
|
||||||
&index_counter,
|
|
||||||
msg.index,
|
|
||||||
stream_id,
|
|
||||||
&config,
|
|
||||||
Some(
|
|
||||||
handler
|
|
||||||
.handle(m, ctx, bus.clone())
|
|
||||||
.await
|
|
||||||
.map(IntoMessages::into_messages),
|
|
||||||
),
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
}
|
|
||||||
Some(Err(err)) => {
|
|
||||||
send_result(
|
|
||||||
&bus.inner,
|
|
||||||
&index_counter,
|
|
||||||
msg.index,
|
|
||||||
stream_id,
|
|
||||||
&config,
|
|
||||||
Some(
|
|
||||||
handler
|
|
||||||
.handle_error(err, ctx, bus.clone())
|
|
||||||
.await
|
|
||||||
.map(IntoMessages::into_messages),
|
|
||||||
),
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
}
|
|
||||||
None => {
|
|
||||||
send_result::<
|
|
||||||
<B::Context as Handler<M>>::Result,
|
|
||||||
<B::Context as Handler<M>>::Error,
|
|
||||||
>(
|
|
||||||
&bus.inner,
|
|
||||||
&index_counter,
|
|
||||||
msg.index,
|
|
||||||
stream_id,
|
|
||||||
&config,
|
|
||||||
None::<Result<crate::Empty<_>, _>>,
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
if let Err(err) = res {
|
|
||||||
println!(
|
|
||||||
"Messagebus Send Error: {}/{} {}: {}",
|
|
||||||
stream_id,
|
|
||||||
task_id,
|
|
||||||
type_name::<<B::Context as Handler<M>>::Result>(),
|
|
||||||
err,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
println!(
|
|
||||||
"TASK #{} of type `{}` ENDED",
|
|
||||||
task_id,
|
|
||||||
std::any::type_name::<B>()
|
|
||||||
);
|
|
||||||
|
|
||||||
if let Err(err) = handler.finalize(bus.clone()).await {
|
|
||||||
println!("TASK FINALIZE ERROR: {:?}", err);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
Ok(())
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
fn is_producer(&self) -> bool {
|
|
||||||
false
|
|
||||||
}
|
|
||||||
|
|
||||||
fn config(&self, stream_id: u32) -> crate::builder::Config {
|
|
||||||
self.builder.config(stream_id)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn send_result<'a, M: Message, E: ErrorMessage>(
|
|
||||||
bus: &Arc<BusInner>,
|
|
||||||
index_counter: &AtomicU64,
|
|
||||||
index: u64,
|
|
||||||
stream_id: u32,
|
|
||||||
config: &crate::builder::Config,
|
|
||||||
res: Option<Result<impl AsyncIterator<Item = Result<M, E>> + Send + 'a, E>>,
|
|
||||||
) -> Result<(), Error> {
|
|
||||||
let reorder_buff = if config.ordered && config.task_count > 1 {
|
|
||||||
config.task_count
|
|
||||||
} else {
|
|
||||||
0
|
|
||||||
};
|
|
||||||
|
|
||||||
let one = match res {
|
|
||||||
Some(Ok(iter)) => {
|
|
||||||
let hint = iter.size_hint();
|
|
||||||
let mut iter = pin!(iter);
|
|
||||||
match hint {
|
|
||||||
(_, Some(0)) => None,
|
|
||||||
(_, Some(1)) => iter.next().await,
|
|
||||||
_ => {
|
|
||||||
while let Some(item) = iter.as_mut().next().await {
|
|
||||||
let index = index_counter.fetch_add(1, Ordering::Relaxed);
|
|
||||||
let stream_id = if config.stream_per_message {
|
|
||||||
bus.next_stream_id()
|
|
||||||
} else {
|
|
||||||
stream_id
|
|
||||||
};
|
|
||||||
|
|
||||||
bus.send::<M>(
|
|
||||||
Some(item.map_err(Into::into)),
|
|
||||||
index,
|
|
||||||
stream_id,
|
|
||||||
reorder_buff,
|
|
||||||
)
|
|
||||||
.await?;
|
|
||||||
}
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Some(Err(err)) => Some(Err(err)),
|
|
||||||
None => None,
|
|
||||||
};
|
|
||||||
|
|
||||||
bus.send(
|
|
||||||
one.map(|x| x.map_err(Into::into)),
|
|
||||||
index,
|
|
||||||
stream_id,
|
|
||||||
reorder_buff,
|
|
||||||
)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
1217
src/lib.rs
1217
src/lib.rs
File diff suppressed because it is too large
Load Diff
@ -1,76 +0,0 @@
|
|||||||
use core::fmt;
|
|
||||||
use std::any::Any;
|
|
||||||
|
|
||||||
use futures::Stream;
|
|
||||||
|
|
||||||
use crate::{async_iter::AsyncIterator, Error, Iter, StreamIter};
|
|
||||||
|
|
||||||
pub trait Message: Any + fmt::Debug + Clone + Send + Sync + Unpin {}
|
|
||||||
pub trait ErrorMessage: Any + fmt::Debug + fmt::Display + Send + Sync + Unpin {}
|
|
||||||
pub trait IntoMessages<M: Message, E: ErrorMessage> {
|
|
||||||
fn into_messages(self) -> impl AsyncIterator<Item = Result<M, E>>;
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct AsyncIter<I: AsyncIterator>(I);
|
|
||||||
pub fn async_iter<I: AsyncIterator>(i: I) -> AsyncIter<I> {
|
|
||||||
AsyncIter(i)
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Message for () {}
|
|
||||||
impl ErrorMessage for anyhow::Error {}
|
|
||||||
|
|
||||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
|
||||||
pub(crate) struct Msg<M: Message> {
|
|
||||||
pub(crate) inner: Option<Result<M, Error>>,
|
|
||||||
pub(crate) index: u64,
|
|
||||||
pub(crate) stream_id: u32,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<M: Message, E: ErrorMessage, S: Stream<Item = Result<M, E>> + Send> IntoMessages<M, E>
|
|
||||||
for StreamIter<S>
|
|
||||||
{
|
|
||||||
fn into_messages(self) -> impl AsyncIterator<Item = Result<M, E>> {
|
|
||||||
self
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<E: ErrorMessage, I: Iterator + Send + Unpin> IntoMessages<I::Item, E> for Iter<I>
|
|
||||||
where
|
|
||||||
I::Item: Message,
|
|
||||||
{
|
|
||||||
fn into_messages(self) -> impl AsyncIterator<Item = Result<I::Item, E>> {
|
|
||||||
self.map(Ok)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<M: Message, E: ErrorMessage, I: AsyncIterator<Item = Result<M, E>>> IntoMessages<M, E>
|
|
||||||
for AsyncIter<I>
|
|
||||||
{
|
|
||||||
fn into_messages(self) -> impl AsyncIterator<Item = Result<M, E>> {
|
|
||||||
self.0
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<E: ErrorMessage> IntoMessages<(), E> for () {
|
|
||||||
fn into_messages(self) -> impl AsyncIterator<Item = Result<(), E>> {
|
|
||||||
crate::empty()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<const N: usize, M: Message, E: ErrorMessage> IntoMessages<M, E> for [M; N] {
|
|
||||||
fn into_messages(self) -> impl AsyncIterator<Item = Result<M, E>> {
|
|
||||||
crate::iter(self.into_iter().map(Ok))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<M: Message, E: ErrorMessage> IntoMessages<M, E> for Vec<M> {
|
|
||||||
fn into_messages(self) -> impl AsyncIterator<Item = Result<M, E>> {
|
|
||||||
crate::iter(self.into_iter().map(Ok))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<M: Message, E: ErrorMessage> IntoMessages<M, E> for Option<M> {
|
|
||||||
fn into_messages(self) -> impl AsyncIterator<Item = Result<M, E>> {
|
|
||||||
crate::iter(self.into_iter().map(Ok))
|
|
||||||
}
|
|
||||||
}
|
|
75
src/rand.rs
75
src/rand.rs
@ -1,75 +0,0 @@
|
|||||||
use std::sync::Mutex;
|
|
||||||
|
|
||||||
use rand::{RngCore, SeedableRng};
|
|
||||||
use rand_xorshift::XorShiftRng;
|
|
||||||
|
|
||||||
static RNG_SEEDS: [u8; 256] = [
|
|
||||||
7, 86, 77, 188, 83, 136, 60, 245, 248, 212, 156, 114, 143, 47, 160, 72, 190, 243, 158, 20, 240,
|
|
||||||
198, 25, 8, 27, 229, 179, 165, 186, 148, 239, 118, 187, 24, 152, 154, 102, 45, 101, 159, 8, 33,
|
|
||||||
158, 161, 42, 183, 189, 173, 58, 200, 121, 45, 11, 168, 245, 161, 186, 43, 244, 251, 244, 246,
|
|
||||||
137, 244, 112, 157, 102, 234, 65, 138, 23, 105, 46, 192, 114, 52, 233, 28, 93, 207, 186, 94,
|
|
||||||
55, 24, 182, 170, 61, 90, 180, 120, 32, 229, 219, 144, 227, 255, 18, 148, 69, 118, 164, 66, 5,
|
|
||||||
243, 18, 190, 21, 224, 151, 225, 229, 136, 112, 1, 181, 246, 64, 53, 249, 166, 22, 104, 255,
|
|
||||||
239, 127, 125, 29, 174, 115, 212, 213, 211, 230, 111, 194, 16, 20, 115, 192, 109, 254, 157,
|
|
||||||
175, 228, 10, 173, 10, 208, 214, 129, 57, 120, 53, 188, 24, 147, 223, 108, 77, 151, 1, 245,
|
|
||||||
151, 38, 186, 95, 28, 242, 0, 2, 161, 86, 154, 73, 138, 225, 40, 235, 26, 195, 42, 229, 15,
|
|
||||||
149, 41, 53, 230, 175, 36, 88, 205, 61, 113, 204, 253, 150, 193, 47, 231, 102, 23, 182, 225,
|
|
||||||
197, 29, 193, 120, 171, 198, 164, 148, 206, 57, 197, 193, 201, 102, 147, 249, 4, 230, 22, 75,
|
|
||||||
40, 145, 144, 113, 152, 115, 170, 41, 29, 154, 166, 74, 71, 23, 148, 139, 244, 114, 139, 66,
|
|
||||||
73, 231, 167, 59, 201, 33, 58, 188, 120, 70, 142, 251, 95,
|
|
||||||
];
|
|
||||||
|
|
||||||
struct RngGenInner {
|
|
||||||
rng: XorShiftRng,
|
|
||||||
counter: u32,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl RngGenInner {
|
|
||||||
#[inline]
|
|
||||||
fn next_u32_pair(&mut self, count: u32) -> (u32, u32) {
|
|
||||||
if self.counter >= 16 {
|
|
||||||
self.reseed_next();
|
|
||||||
self.counter = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
let r1 = self.rng.next_u32() % count;
|
|
||||||
let r2 = loop {
|
|
||||||
let val = self.rng.next_u32() % count;
|
|
||||||
if r1 != val {
|
|
||||||
break val;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
self.counter += 1;
|
|
||||||
|
|
||||||
(r1, r2)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn reseed_next(&mut self) {
|
|
||||||
let offset = (self.rng.next_u32() % 240) as usize;
|
|
||||||
let seed: [u8; 16] = RNG_SEEDS[offset..offset + 16].try_into().unwrap();
|
|
||||||
self.rng = XorShiftRng::from_seed(seed);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) struct RndGen {
|
|
||||||
inner: Mutex<RngGenInner>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Default for RndGen {
|
|
||||||
fn default() -> Self {
|
|
||||||
Self {
|
|
||||||
inner: Mutex::new(RngGenInner {
|
|
||||||
rng: XorShiftRng::from_seed(RNG_SEEDS[0..16].try_into().unwrap()),
|
|
||||||
counter: 0,
|
|
||||||
}),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl RndGen {
|
|
||||||
#[inline]
|
|
||||||
pub fn next_u32_pair(&self, count: u32) -> (u32, u32) {
|
|
||||||
self.inner.lock().unwrap().next_u32_pair(count)
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,147 +0,0 @@
|
|||||||
use std::{cmp::Ordering, collections::BinaryHeap};
|
|
||||||
|
|
||||||
struct Entry<M> {
|
|
||||||
inner: M,
|
|
||||||
index: u64,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<M> PartialOrd for Entry<M> {
|
|
||||||
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
|
|
||||||
Some(other.index.cmp(&self.index))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<M> Ord for Entry<M> {
|
|
||||||
fn cmp(&self, other: &Self) -> Ordering {
|
|
||||||
other.index.cmp(&self.index)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<M> PartialEq for Entry<M> {
|
|
||||||
fn eq(&self, other: &Self) -> bool {
|
|
||||||
other.index.eq(&self.index)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<M> Eq for Entry<M> {}
|
|
||||||
|
|
||||||
pub(crate) struct ReorderQueueInner<M> {
|
|
||||||
cap: usize,
|
|
||||||
recent_index: Option<u64>,
|
|
||||||
heap: BinaryHeap<Entry<M>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<M> ReorderQueueInner<M> {
|
|
||||||
pub fn new(cap: usize) -> Self {
|
|
||||||
Self {
|
|
||||||
cap,
|
|
||||||
recent_index: None,
|
|
||||||
heap: BinaryHeap::with_capacity(cap + 1),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn push(&mut self, index: u64, inner: M) -> Option<u64> {
|
|
||||||
if let Some(ri) = self.recent_index {
|
|
||||||
if index <= ri {
|
|
||||||
return Some(index);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
self.heap.push(Entry { inner, index });
|
|
||||||
|
|
||||||
if self.heap.len() > self.cap {
|
|
||||||
let _ = self.heap.pop();
|
|
||||||
|
|
||||||
self.recent_index = self.recent_index.map(|x| x + 1);
|
|
||||||
self.recent_index
|
|
||||||
} else {
|
|
||||||
None
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn pop(&mut self) -> Option<(u64, M)> {
|
|
||||||
if let Some(ri) = self.recent_index {
|
|
||||||
let e = self.heap.peek()?;
|
|
||||||
if e.index == ri + 1 {
|
|
||||||
self.recent_index = Some(e.index);
|
|
||||||
Some((e.index, self.heap.pop()?.inner))
|
|
||||||
} else {
|
|
||||||
None
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
let e = self.heap.peek()?;
|
|
||||||
|
|
||||||
if e.index == 0 {
|
|
||||||
let e = self.heap.pop()?;
|
|
||||||
self.recent_index = Some(e.index);
|
|
||||||
Some((e.index, e.inner))
|
|
||||||
} else {
|
|
||||||
None
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn force_pop(&mut self) -> Option<(u64, M)> {
|
|
||||||
let e = self.heap.pop()?;
|
|
||||||
self.recent_index = Some(e.index);
|
|
||||||
Some((e.index, e.inner))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod tests {
|
|
||||||
use crate::Message;
|
|
||||||
|
|
||||||
use super::ReorderQueueInner;
|
|
||||||
|
|
||||||
impl Message for i32 {}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_reordering() {
|
|
||||||
let mut queue = ReorderQueueInner::new(8);
|
|
||||||
|
|
||||||
assert_eq!(queue.push(0, 0), None);
|
|
||||||
assert_eq!(queue.pop(), Some((0, 0)));
|
|
||||||
assert_eq!(queue.pop(), None);
|
|
||||||
|
|
||||||
assert_eq!(queue.push(3, 3), None);
|
|
||||||
assert_eq!(queue.pop(), None);
|
|
||||||
|
|
||||||
assert_eq!(queue.push(2, 2), None);
|
|
||||||
assert_eq!(queue.pop(), None);
|
|
||||||
|
|
||||||
assert_eq!(queue.push(4, 4), None);
|
|
||||||
assert_eq!(queue.pop(), None);
|
|
||||||
|
|
||||||
assert_eq!(queue.push(1, 1), None);
|
|
||||||
assert_eq!(queue.pop(), Some((1, 1)));
|
|
||||||
assert_eq!(queue.pop(), Some((2, 2)));
|
|
||||||
assert_eq!(queue.pop(), Some((3, 3)));
|
|
||||||
assert_eq!(queue.pop(), Some((4, 4)));
|
|
||||||
assert_eq!(queue.pop(), None);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_overflow() {
|
|
||||||
let mut queue = ReorderQueueInner::new(4);
|
|
||||||
assert_eq!(queue.push(0, 0), None);
|
|
||||||
assert_eq!(queue.pop(), Some((0, 0)));
|
|
||||||
assert_eq!(queue.pop(), None);
|
|
||||||
|
|
||||||
assert_eq!(queue.push(4, 4), None);
|
|
||||||
assert_eq!(queue.pop(), None);
|
|
||||||
|
|
||||||
assert_eq!(queue.push(2, 2), None);
|
|
||||||
assert_eq!(queue.pop(), None);
|
|
||||||
|
|
||||||
assert_eq!(queue.push(3, 3), None);
|
|
||||||
assert_eq!(queue.pop(), None);
|
|
||||||
|
|
||||||
assert_eq!(queue.push(5, 5), Some(1));
|
|
||||||
assert_eq!(queue.pop(), Some((2, 2)));
|
|
||||||
assert_eq!(queue.pop(), Some((3, 3)));
|
|
||||||
assert_eq!(queue.pop(), Some((4, 4)));
|
|
||||||
assert_eq!(queue.pop(), Some((5, 5)));
|
|
||||||
assert_eq!(queue.pop(), None);
|
|
||||||
}
|
|
||||||
}
|
|
137
src/task.rs
137
src/task.rs
@ -1,137 +0,0 @@
|
|||||||
use std::{
|
|
||||||
pin::Pin,
|
|
||||||
sync::{
|
|
||||||
atomic::{AtomicU64, AtomicUsize, Ordering},
|
|
||||||
Arc,
|
|
||||||
},
|
|
||||||
};
|
|
||||||
|
|
||||||
use futures::Future;
|
|
||||||
use tokio::sync::Notify;
|
|
||||||
|
|
||||||
use crate::{
|
|
||||||
builder::Config,
|
|
||||||
chan::{BusSender, Receiver, Sender},
|
|
||||||
handler::HandlerSpawner,
|
|
||||||
message::Msg,
|
|
||||||
Builder, BusInner, Error, Handler, Message,
|
|
||||||
};
|
|
||||||
|
|
||||||
#[derive(Default)]
|
|
||||||
pub(crate) struct TaskCounter {
|
|
||||||
pub(crate) running: AtomicUsize,
|
|
||||||
notify: Notify,
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) struct TaskCounterLease<S: Fn() -> bool> {
|
|
||||||
need_notify: S,
|
|
||||||
counter: Arc<TaskCounter>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<S: Fn() -> bool> Drop for TaskCounterLease<S> {
|
|
||||||
fn drop(&mut self) {
|
|
||||||
let notify = (self.need_notify)();
|
|
||||||
let prev = self.counter.running.fetch_sub(1, Ordering::Relaxed);
|
|
||||||
|
|
||||||
if notify && prev == 1 {
|
|
||||||
self.counter.notify.notify_waiters();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<S: Fn() -> bool> TaskCounterLease<S> {
|
|
||||||
fn new(counter: Arc<TaskCounter>, need_notify: S) -> Self {
|
|
||||||
counter.running.fetch_add(1, Ordering::Relaxed);
|
|
||||||
|
|
||||||
Self {
|
|
||||||
counter,
|
|
||||||
need_notify,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl TaskCounter {
|
|
||||||
pub fn lease_unit<S: Fn() -> bool>(self: Arc<Self>, need_notify: S) -> TaskCounterLease<S> {
|
|
||||||
TaskCounterLease::new(self, need_notify)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[inline]
|
|
||||||
pub async fn wait(&self) {
|
|
||||||
self.notify.notified().await
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) trait TaskSpawner<M: Message>: Send + Sync {
|
|
||||||
fn config(&self, stream_id: u32) -> Config;
|
|
||||||
fn is_producer(&self) -> bool;
|
|
||||||
|
|
||||||
#[allow(clippy::too_many_arguments)]
|
|
||||||
fn spawn_task(
|
|
||||||
&self,
|
|
||||||
rx: Receiver<Msg<M>>,
|
|
||||||
stream_id: u32,
|
|
||||||
task_id: u32,
|
|
||||||
abort: Arc<Notify>,
|
|
||||||
task_counter: Arc<TaskCounter>,
|
|
||||||
spawn_counter: Arc<TaskCounter>,
|
|
||||||
index_counter: Arc<AtomicU64>,
|
|
||||||
bus: Arc<BusInner>,
|
|
||||||
) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + '_>>;
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) struct TaskSpawnerWrapper<M: Message> {
|
|
||||||
inner: Arc<dyn TaskSpawner<M>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<M: Message> Clone for TaskSpawnerWrapper<M> {
|
|
||||||
fn clone(&self) -> Self {
|
|
||||||
Self {
|
|
||||||
inner: self.inner.clone(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<M: Message> TaskSpawnerWrapper<M> {
|
|
||||||
pub fn from_handler<B: Builder<M> + 'static>(builder: B) -> Self
|
|
||||||
where
|
|
||||||
B::Context: Handler<M>,
|
|
||||||
{
|
|
||||||
Self {
|
|
||||||
inner: Arc::new(HandlerSpawner::new(builder)) as _,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[inline]
|
|
||||||
#[allow(clippy::too_many_arguments)]
|
|
||||||
pub async fn spawn_task(
|
|
||||||
&self,
|
|
||||||
(tx, rx): (Sender<Msg<M>>, Receiver<Msg<M>>),
|
|
||||||
stream_id: u32,
|
|
||||||
task_id: u32,
|
|
||||||
abort: Arc<Notify>,
|
|
||||||
task_counter: Arc<TaskCounter>,
|
|
||||||
spawn_counter: Arc<TaskCounter>,
|
|
||||||
index_counter: Arc<AtomicU64>,
|
|
||||||
bus: Arc<BusInner>,
|
|
||||||
) -> Result<BusSender<M>, Error> {
|
|
||||||
self.inner
|
|
||||||
.spawn_task(
|
|
||||||
rx,
|
|
||||||
stream_id,
|
|
||||||
task_id,
|
|
||||||
abort,
|
|
||||||
task_counter,
|
|
||||||
spawn_counter,
|
|
||||||
index_counter,
|
|
||||||
bus,
|
|
||||||
)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
Ok(BusSender::new(self.inner.is_producer(), tx))
|
|
||||||
}
|
|
||||||
|
|
||||||
#[inline]
|
|
||||||
pub(crate) fn config(&self, stream_id: u32) -> Config {
|
|
||||||
self.inner.config(stream_id)
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in New Issue
Block a user