Compare commits

..

No commits in common. "next" and "master" have entirely different histories.
next ... master

12 changed files with 634 additions and 1981 deletions

View File

@ -1,26 +1,16 @@
[package]
name = "messagebus"
version = "0.15.1"
authors = ["Andrey Tkachenko <andrey@aidev.ru>"]
repository = "https://github.com/andreytkachenko/messagebus.git"
keywords = ["futures", "async", "tokio", "message", "bus"]
categories = ["network-programming", "asynchronous"]
description = "MessageBus allows intercommunicate with messages between modules"
exclude = [".gitignore", ".cargo/config", ".github/**", ".drone.yml"]
license = "MIT OR Apache-2.0"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
anyhow = "1.0.75"
boxcar = "0.2.3"
dashmap = "5.5.0"
futures = "0.3.28"
kanal = "0.1.0-pre8"
log = "0.4.20"
pin-project-lite = "0.2.13"
priority-queue = "1.3.2"
rand = { version = "0.8.5", default-features = false, features = ["std_rng", "std"] }
rand_xorshift = "0.3.0"
tokio = { version = "1.32.0", features = ["sync", "rt", "macros"] }
[dev-dependencies]

View File

@ -1,83 +1,37 @@
#![feature(return_position_impl_trait_in_trait)]
#![feature(async_fn_in_trait)]
use std::sync::Arc;
use anyhow::Error;
use messagebus::{Builder, Bus, Context, Handler, IntoMessages, Message};
use messagebus::{Bus, Error, Message};
#[derive(Debug, Clone)]
pub struct Msg(pub i32);
impl Message for Msg {}
pub struct Processor {
_state: i32,
}
impl Handler<Msg> for Processor {
type Result = ();
type Error = Error;
async fn handle(
&mut self,
_msg: Msg,
_ctx: Context,
_bus: Bus,
) -> Result<impl IntoMessages<Self::Result, Self::Error>, Self::Error> {
Ok(())
}
async fn handle_error(
&mut self,
_err: messagebus::Error,
_ctx: Context,
_bus: messagebus::Bus,
) -> Result<impl IntoMessages<Self::Result, Self::Error> + Send + '_, Self::Error> {
Ok(None)
}
async fn finalize(self, _bus: Bus) -> Result<(), Self::Error> {
Ok(())
}
}
struct ProcSpawner;
impl Builder<Msg> for ProcSpawner {
type Context = Processor;
async fn build(
&self,
stream_id: u32,
_task_id: u32,
) -> Result<Self::Context, messagebus::Error> {
Ok(Processor {
_state: stream_id as _,
})
}
state: i32,
}
impl Processor {
pub async fn spawn(_sid: u32) -> Result<(usize, Self), Error> {
Ok((4, Self { _state: 0 }))
pub async fn spawn(sid: u32) -> Result<(usize, Self), Error> {
Ok((4, Self { state: 0 }))
}
pub async fn handler_msg(
self: Arc<Self>,
_sid: u32,
_tid: u32,
_msg: Msg,
) -> Result<(), Error> {
pub async fn handler_msg(self: Arc<Self>, sid: u32, tid: u32, msg: Msg) -> Result<(), Error> {
Ok(())
}
pub async fn finalize_msg_handler(self: Arc<Self>, _sid: u32) -> Result<(), Error> {
pub async fn finalize_msg_handler(self: Arc<Self>, sid: u32) -> Result<(), Error> {
Ok(())
}
}
async fn run() {
let bus = Bus::new();
bus.register(ProcSpawner).await;
bus.register(
4,
Processor::spawn,
Processor::handler_msg,
Processor::finalize_msg_handler,
);
}
#[tokio::main]

View File

@ -1,141 +0,0 @@
use std::{marker::PhantomData, pin::Pin};
use futures::{Stream, StreamExt};
pub trait AsyncIterator: Send {
type Item: Send;
fn next(self: Pin<&mut Self>) -> impl futures::Future<Output = Option<Self::Item>> + Send + '_;
fn size_hint(&self) -> (usize, Option<usize>) {
(0, None)
}
fn map<U: Send, C: Send + FnMut(Self::Item) -> U>(self, cb: C) -> impl AsyncIterator<Item = U>
where
Self: Sized,
{
Map { inner: self, cb }
}
}
pin_project_lite::pin_project! {
pub struct Map<U, I: AsyncIterator, F: FnMut(I::Item) ->U> {
#[pin]
inner: I,
cb: F,
}
}
impl<U: Send, I: AsyncIterator, F: Send + FnMut(I::Item) -> U> AsyncIterator for Map<U, I, F> {
type Item = U;
async fn next(self: Pin<&mut Self>) -> Option<U> {
let this = self.project();
Some((this.cb)(this.inner.next().await?))
}
}
pub struct Iter<I: Iterator> {
inner: I,
}
impl<I: Send + Iterator + Unpin> AsyncIterator for Iter<I>
where
I::Item: Send,
{
type Item = I::Item;
#[inline]
async fn next(self: Pin<&mut Self>) -> Option<Self::Item> {
self.get_mut().inner.next()
}
fn size_hint(&self) -> (usize, Option<usize>) {
self.inner.size_hint()
}
}
pub fn iter<I: IntoIterator + Send>(inner: I) -> Iter<I::IntoIter>
where
I::IntoIter: Send + Unpin,
I::Item: Send,
{
Iter {
inner: inner.into_iter(),
}
}
pin_project_lite::pin_project! {
pub struct StreamIter<S: Stream> {
#[pin]
inner: S,
}
}
impl<S: Send + Stream> AsyncIterator for StreamIter<S>
where
S::Item: Send,
{
type Item = S::Item;
#[inline]
async fn next(self: Pin<&mut Self>) -> Option<Self::Item> {
self.project().inner.next().await
}
fn size_hint(&self) -> (usize, Option<usize>) {
self.inner.size_hint()
}
}
pub fn stream<S: Send + Stream>(inner: S) -> StreamIter<S>
where
S::Item: Send,
{
StreamIter { inner }
}
pub struct Once<I>(Option<I>);
impl<I: Send + Unpin> AsyncIterator for Once<I> {
type Item = I;
async fn next(self: Pin<&mut Self>) -> Option<Self::Item> {
self.get_mut().0.take()
}
}
pub fn once<I: Send + Unpin>(item: I) -> Once<I> {
Once(Some(item))
}
pub struct Empty<I>(PhantomData<I>);
impl<I: Send> AsyncIterator for Empty<I> {
type Item = I;
async fn next(self: Pin<&mut Self>) -> Option<Self::Item> {
None
}
}
pub fn empty<I: Send + Unpin>() -> Empty<I> {
Empty(Default::default())
}
impl<I: Send> AsyncIterator for tokio::sync::mpsc::Receiver<I> {
type Item = I;
async fn next(self: Pin<&mut Self>) -> Option<Self::Item> {
self.get_mut().recv().await
}
}
impl<I: Send> AsyncIterator for tokio::sync::oneshot::Receiver<I> {
type Item = I;
async fn next(self: Pin<&mut Self>) -> Option<Self::Item> {
match self.await {
Ok(item) => Some(item),
Err(_) => None,
}
}
}

View File

@ -1,227 +0,0 @@
use std::{marker::PhantomData, sync::Arc};
use futures::Future;
use crate::{Error, Message};
#[derive(Clone, Copy, Debug)]
pub struct Config {
pub queue_size: usize,
pub queue_per_task: bool,
pub ordered: bool,
pub task_count: u32,
pub lazy_task_creation: bool,
pub stream_per_message: bool,
}
impl Default for Config {
fn default() -> Self {
Self {
queue_size: 4,
queue_per_task: false,
ordered: false,
task_count: 1,
lazy_task_creation: true,
stream_per_message: false,
}
}
}
pub trait Builder<M: Message>: Send + Sync + 'static {
type Context: 'static;
fn config(&self, _stream_id: u32) -> Config {
Default::default()
}
fn build(
&self,
stream_id: u32,
_task_id: u32,
) -> impl Future<Output = Result<Self::Context, Error>> + Send + '_;
}
pub struct DefaultBuilder<M: Send + Sync, H: Send + Sync, C: Send + Sync, F: Send> {
config: Config,
callback: C,
_m: PhantomData<(M, H, F)>,
}
unsafe impl<M: Send + Sync, H: Send + Sync, C: Send + Sync, F: Send> Sync
for DefaultBuilder<M, H, C, F>
{
}
impl<M, H, C, F> DefaultBuilder<M, H, C, F>
where
M: Message,
H: Sync + Send + 'static,
F: Send + Future<Output = Result<H, Error>> + 'static,
C: Sync + Send + Fn(u32, u32) -> F + 'static,
{
pub fn new(queue_size: usize, callback: C) -> Self {
Self {
config: Config {
queue_size,
queue_per_task: false,
ordered: false,
task_count: 1,
lazy_task_creation: true,
stream_per_message: false,
},
callback,
_m: PhantomData,
}
}
pub fn ordered(self) -> Self {
let mut config = self.config;
config.ordered = true;
Self {
config,
callback: self.callback,
_m: PhantomData,
}
}
pub fn stream_per_message(self) -> Self {
let mut config = self.config;
config.stream_per_message = true;
Self {
config,
callback: self.callback,
_m: PhantomData,
}
}
pub fn tasks(self, tasks: u32) -> Self {
let mut config = self.config;
config.task_count = tasks;
Self {
config,
callback: self.callback,
_m: PhantomData,
}
}
}
impl<M, H, C, F> Builder<M> for DefaultBuilder<M, H, C, F>
where
M: Message,
H: Sync + Send + 'static,
F: Send + Future<Output = Result<H, Error>> + 'static,
C: Sync + Send + Fn(u32, u32) -> F + 'static,
{
type Context = H;
async fn build(&self, stream_id: u32, task_id: u32) -> Result<Self::Context, Error> {
(self.callback)(stream_id, task_id).await
}
fn config(&self, _stream_id: u32) -> Config {
self.config
}
}
pub struct SharedBuilder<M, H, C, F> {
config: Config,
stream_handlers: dashmap::DashMap<u32, Arc<H>>,
callback: C,
_m: PhantomData<(M, F)>,
}
unsafe impl<M: Send + Sync, H: Send + Sync, C: Send + Sync, F: Send> Sync
for SharedBuilder<M, H, C, F>
{
}
impl<M, H, C, F> SharedBuilder<M, H, C, F>
where
M: Message,
H: Sync + Send + 'static,
F: Send + Future<Output = Result<H, Error>> + 'static,
C: Sync + Send + Fn(u32, u32) -> F + 'static,
{
pub fn new(queue_size: usize, task_count: u32, callback: C) -> Self {
Self {
config: Config {
queue_size,
queue_per_task: false,
ordered: false,
task_count,
lazy_task_creation: true,
stream_per_message: false,
},
stream_handlers: Default::default(),
callback,
_m: PhantomData,
}
}
pub fn stream_per_message(self) -> Self {
let mut config = self.config;
config.stream_per_message = true;
Self {
config,
callback: self.callback,
_m: PhantomData,
stream_handlers: Default::default(),
}
}
pub fn ordered(self) -> Self {
let mut config = self.config;
config.ordered = true;
Self {
config,
stream_handlers: self.stream_handlers,
callback: self.callback,
_m: PhantomData,
}
}
pub fn queue_per_task(self) -> Self {
let mut config = self.config;
config.queue_per_task = true;
Self {
config,
stream_handlers: self.stream_handlers,
callback: self.callback,
_m: PhantomData,
}
}
}
impl<M, H, C, F> Builder<M> for SharedBuilder<M, H, C, F>
where
M: Message,
H: Sync + Send + 'static,
F: Send + Future<Output = Result<H, Error>> + 'static,
C: Sync + Send + Fn(u32, u32) -> F + 'static,
{
type Context = Arc<H>;
async fn build(&self, stream_id: u32, task_id: u32) -> Result<Self::Context, Error> {
if self.stream_handlers.contains_key(&stream_id) {
return Ok(self.stream_handlers.get(&stream_id).unwrap().clone());
}
let val = match (self.callback)(stream_id, task_id).await {
Ok(val) => Arc::new(val),
Err(err) => return Err(err),
};
self.stream_handlers.insert(stream_id, val.clone());
Ok(val)
}
fn config(&self, _stream_id: u32) -> Config {
self.config
}
}

View File

@ -1,147 +0,0 @@
use std::{
any::{Any, TypeId},
pin::Pin,
};
use futures::Future;
use crate::{message::Msg, Error, Message};
enum ChannelItem<T> {
Value(T),
Close,
}
pub(crate) trait AbstractSender: Any + Send + Sync {
fn upcast(&self) -> &(dyn Any + Send + Sync);
fn close(&self);
fn message_type_id(&self) -> TypeId;
}
impl<M: Any + Send + Sync> AbstractSender for kanal::AsyncSender<M> {
fn upcast(&self) -> &(dyn Any + Send + Sync) {
self
}
fn close(&self) {
self.close();
}
fn message_type_id(&self) -> TypeId {
TypeId::of::<M>()
}
}
pub(crate) trait BusSenderClose: Any + Send + Sync {
fn upcast(&self) -> &(dyn Any + Send + Sync);
fn is_producer(&self) -> bool;
fn load(&self) -> (usize, usize);
fn stop(&self) -> Pin<Box<dyn Future<Output = Result<(), Error>> + '_>>;
fn terminate(&self) -> Result<(), Error>;
}
pub(crate) struct BusSender<M: Message> {
is_producer: bool,
tx: Sender<Msg<M>>,
}
impl<M: Message> BusSenderClose for BusSender<M> {
fn upcast(&self) -> &(dyn Any + Send + Sync) {
self
}
fn stop(&self) -> Pin<Box<dyn Future<Output = Result<(), Error>> + '_>> {
Box::pin(async move {
self.tx.stop().await?;
Ok(())
})
}
fn terminate(&self) -> Result<(), Error> {
self.tx.close()
}
fn is_producer(&self) -> bool {
self.is_producer
}
fn load(&self) -> (usize, usize) {
self.tx.load()
}
}
impl<M: Message> BusSender<M> {
pub async fn send(&self, m: Msg<M>) -> Result<(), Error> {
self.tx.send(m).await
}
#[inline]
pub(crate) fn new(is_producer: bool, tx: Sender<Msg<M>>) -> BusSender<M> {
Self { is_producer, tx }
}
}
#[derive(Clone)]
pub(crate) struct Sender<T> {
inner: kanal::AsyncSender<ChannelItem<T>>,
}
impl<T> Sender<T> {
pub async fn send(&self, msg: T) -> Result<(), Error> {
self.inner
.send(ChannelItem::Value(msg))
.await
.map_err(Error::SendError)
}
pub async fn stop(&self) -> Result<(), Error> {
self.inner
.send(ChannelItem::Close)
.await
.map_err(Error::SendError)
}
pub fn close(&self) -> Result<(), Error> {
self.inner.close();
Ok(())
}
pub fn load(&self) -> (usize, usize) {
(self.inner.len(), self.inner.capacity())
}
}
#[derive(Clone)]
pub(crate) struct Receiver<T> {
inner: kanal::AsyncReceiver<ChannelItem<T>>,
}
impl<T> Receiver<T> {
#[inline]
pub async fn recv(&self) -> Option<T> {
let Ok(item) = self.inner.recv().await else {
return None;
};
match item {
ChannelItem::Value(val) => Some(val),
ChannelItem::Close => None,
}
}
#[inline]
pub fn load(&self) -> (u32, u32) {
(self.inner.len() as _, self.inner.capacity() as _)
}
#[inline]
pub fn is_empty(&self) -> bool {
self.inner.is_empty()
}
}
pub(crate) fn channel<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
let (tx, rx) = kanal::bounded_async(cap);
(Sender { inner: tx }, Receiver { inner: rx })
}

View File

@ -1,91 +0,0 @@
use std::{fmt, sync::Arc};
use kanal::{ReceiveError, SendError};
use crate::message::ErrorMessage;
#[derive(Debug)]
pub enum Error {
HandlerIsNotRegistered,
Aborted,
SendError(SendError),
ReceiveError(kanal::ReceiveError),
ReorderingDropMessage(u64),
HandlerError(Arc<dyn ErrorMessage>),
}
#[derive(Debug)]
pub enum VoidError {}
impl std::fmt::Display for VoidError {
fn fmt(&self, _f: &mut fmt::Formatter<'_>) -> fmt::Result {
Ok(())
}
}
impl std::error::Error for Error {}
impl ErrorMessage for VoidError {}
impl Eq for Error {}
impl PartialEq for Error {
fn eq(&self, other: &Self) -> bool {
match (self, other) {
(Error::HandlerIsNotRegistered, Error::HandlerIsNotRegistered) => true,
(Error::Aborted, Error::Aborted) => true,
(Error::SendError(err1), Error::SendError(err2)) => err1.eq(err2),
(Error::ReceiveError(err1), Error::ReceiveError(err2)) => err1.eq(err2),
(Error::ReorderingDropMessage(idx1), Error::ReorderingDropMessage(idx2)) => {
idx1.eq(idx2)
}
(Error::HandlerError(err1), Error::HandlerError(err2)) => Arc::ptr_eq(err1, err2),
_ => false,
}
}
}
impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Error::HandlerError(err) => writeln!(f, "Handler Error: {}", err)?,
Error::HandlerIsNotRegistered => writeln!(f, "Handle is not registered!")?,
Error::Aborted => writeln!(f, "Operation Aborted!")?,
Error::SendError(reason) => writeln!(f, "Channel send error; reason {}", reason)?,
Error::ReceiveError(reason) => writeln!(f, "Channel receive error; reason {}", reason)?,
Error::ReorderingDropMessage(index) => writeln!(
f,
"Reordering drop message #{} (out of bound the queue)",
index
)?,
}
Ok(())
}
}
impl Clone for Error {
fn clone(&self) -> Self {
match self {
Error::HandlerError(err) => Error::HandlerError(err.clone()),
Error::SendError(err) => match err {
SendError::Closed => Error::SendError(SendError::Closed),
SendError::ReceiveClosed => Error::SendError(SendError::ReceiveClosed),
},
Error::ReceiveError(err) => match err {
ReceiveError::Closed => Error::ReceiveError(ReceiveError::Closed),
ReceiveError::SendClosed => Error::ReceiveError(ReceiveError::SendClosed),
},
Error::HandlerIsNotRegistered => Error::HandlerIsNotRegistered,
Error::Aborted => Error::Aborted,
Error::ReorderingDropMessage(idx) => Error::ReorderingDropMessage(*idx),
}
}
}
impl<E> From<E> for Error
where
E: ErrorMessage,
{
fn from(error: E) -> Self {
Self::HandlerError(Arc::new(error))
}
}

View File

@ -1,243 +0,0 @@
use std::{
any::{type_name, Any},
marker::PhantomData,
pin::{pin, Pin},
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
};
use futures::Future;
use tokio::sync::Notify;
use crate::{
builder::Builder,
chan::Receiver,
message::{IntoMessages, Msg},
task::{TaskCounter, TaskSpawner},
AsyncIterator, Bus, BusInner, Error, ErrorMessage, Message,
};
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct Context {
pub stream_id: u32,
pub task_id: u32,
pub index: u64,
pub load: (u32, u32),
}
pub trait Handler<M: Message>: Send + 'static {
type Result: Message + Unpin;
type Error: ErrorMessage + Unpin;
fn handle(
&mut self,
msg: M,
ctx: Context,
bus: crate::Bus,
) -> impl Future<
Output = Result<impl IntoMessages<Self::Result, Self::Error> + Send + '_, Self::Error>,
> + Send
+ '_;
fn handle_error(
&mut self,
_err: Error,
_ctx: Context,
_bus: crate::Bus,
) -> impl Future<
Output = Result<impl IntoMessages<Self::Result, Self::Error> + Send + '_, Self::Error>,
> + Send
+ '_;
fn finalize(self, bus: crate::Bus) -> impl Future<Output = Result<(), Self::Error>> + Send;
}
pub(crate) struct HandlerSpawner<M, B> {
pub(crate) builder: B,
_m: PhantomData<M>,
}
impl<M, B> HandlerSpawner<M, B> {
pub(crate) fn new(builder: B) -> Self {
Self {
builder,
_m: PhantomData,
}
}
}
impl<M: Message, B: Builder<M>> TaskSpawner<M> for HandlerSpawner<M, B>
where
B::Context: Any + Handler<M>,
{
fn spawn_task(
&self,
rx: Receiver<Msg<M>>,
stream_id: u32,
task_id: u32,
_abort: Arc<Notify>,
task_counter: Arc<TaskCounter>,
spawn_counter: Arc<TaskCounter>,
index_counter: Arc<AtomicU64>,
bus: Arc<BusInner>,
) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + '_>> {
Box::pin(async move {
let bus = Bus { inner: bus.clone() };
let config = self.builder.config(stream_id);
let mut handler = self.builder.build(stream_id, task_id).await?;
let _handle = tokio::spawn(async move {
let _test = spawn_counter.clone().lease_unit(|| true);
while let Some(msg) = rx.recv().await {
let _test = task_counter.clone().lease_unit(|| rx.is_empty());
let ctx = Context {
stream_id,
task_id,
index: msg.index,
load: rx.load(),
};
let res = match msg.inner {
Some(Ok(m)) => {
send_result(
&bus.inner,
&index_counter,
msg.index,
stream_id,
&config,
Some(
handler
.handle(m, ctx, bus.clone())
.await
.map(IntoMessages::into_messages),
),
)
.await
}
Some(Err(err)) => {
send_result(
&bus.inner,
&index_counter,
msg.index,
stream_id,
&config,
Some(
handler
.handle_error(err, ctx, bus.clone())
.await
.map(IntoMessages::into_messages),
),
)
.await
}
None => {
send_result::<
<B::Context as Handler<M>>::Result,
<B::Context as Handler<M>>::Error,
>(
&bus.inner,
&index_counter,
msg.index,
stream_id,
&config,
None::<Result<crate::Empty<_>, _>>,
)
.await
}
};
if let Err(err) = res {
println!(
"Messagebus Send Error: {}/{} {}: {}",
stream_id,
task_id,
type_name::<<B::Context as Handler<M>>::Result>(),
err,
);
}
}
println!(
"TASK #{} of type `{}` ENDED",
task_id,
std::any::type_name::<B>()
);
if let Err(err) = handler.finalize(bus.clone()).await {
println!("TASK FINALIZE ERROR: {:?}", err);
}
});
Ok(())
})
}
fn is_producer(&self) -> bool {
false
}
fn config(&self, stream_id: u32) -> crate::builder::Config {
self.builder.config(stream_id)
}
}
async fn send_result<'a, M: Message, E: ErrorMessage>(
bus: &Arc<BusInner>,
index_counter: &AtomicU64,
index: u64,
stream_id: u32,
config: &crate::builder::Config,
res: Option<Result<impl AsyncIterator<Item = Result<M, E>> + Send + 'a, E>>,
) -> Result<(), Error> {
let reorder_buff = if config.ordered && config.task_count > 1 {
config.task_count
} else {
0
};
let one = match res {
Some(Ok(iter)) => {
let hint = iter.size_hint();
let mut iter = pin!(iter);
match hint {
(_, Some(0)) => None,
(_, Some(1)) => iter.next().await,
_ => {
while let Some(item) = iter.as_mut().next().await {
let index = index_counter.fetch_add(1, Ordering::Relaxed);
let stream_id = if config.stream_per_message {
bus.next_stream_id()
} else {
stream_id
};
bus.send::<M>(
Some(item.map_err(Into::into)),
index,
stream_id,
reorder_buff,
)
.await?;
}
return Ok(());
}
}
}
Some(Err(err)) => Some(Err(err)),
None => None,
};
bus.send(
one.map(|x| x.map_err(Into::into)),
index,
stream_id,
reorder_buff,
)
.await?;
Ok(())
}

1245
src/lib.rs

File diff suppressed because it is too large Load Diff

View File

@ -1,76 +0,0 @@
use core::fmt;
use std::any::Any;
use futures::Stream;
use crate::{async_iter::AsyncIterator, Error, Iter, StreamIter};
pub trait Message: Any + fmt::Debug + Clone + Send + Sync + Unpin {}
pub trait ErrorMessage: Any + fmt::Debug + fmt::Display + Send + Sync + Unpin {}
pub trait IntoMessages<M: Message, E: ErrorMessage> {
fn into_messages(self) -> impl AsyncIterator<Item = Result<M, E>>;
}
pub struct AsyncIter<I: AsyncIterator>(I);
pub fn async_iter<I: AsyncIterator>(i: I) -> AsyncIter<I> {
AsyncIter(i)
}
impl Message for () {}
impl ErrorMessage for anyhow::Error {}
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct Msg<M: Message> {
pub(crate) inner: Option<Result<M, Error>>,
pub(crate) index: u64,
pub(crate) stream_id: u32,
}
impl<M: Message, E: ErrorMessage, S: Stream<Item = Result<M, E>> + Send> IntoMessages<M, E>
for StreamIter<S>
{
fn into_messages(self) -> impl AsyncIterator<Item = Result<M, E>> {
self
}
}
impl<E: ErrorMessage, I: Iterator + Send + Unpin> IntoMessages<I::Item, E> for Iter<I>
where
I::Item: Message,
{
fn into_messages(self) -> impl AsyncIterator<Item = Result<I::Item, E>> {
self.map(Ok)
}
}
impl<M: Message, E: ErrorMessage, I: AsyncIterator<Item = Result<M, E>>> IntoMessages<M, E>
for AsyncIter<I>
{
fn into_messages(self) -> impl AsyncIterator<Item = Result<M, E>> {
self.0
}
}
impl<E: ErrorMessage> IntoMessages<(), E> for () {
fn into_messages(self) -> impl AsyncIterator<Item = Result<(), E>> {
crate::empty()
}
}
impl<const N: usize, M: Message, E: ErrorMessage> IntoMessages<M, E> for [M; N] {
fn into_messages(self) -> impl AsyncIterator<Item = Result<M, E>> {
crate::iter(self.into_iter().map(Ok))
}
}
impl<M: Message, E: ErrorMessage> IntoMessages<M, E> for Vec<M> {
fn into_messages(self) -> impl AsyncIterator<Item = Result<M, E>> {
crate::iter(self.into_iter().map(Ok))
}
}
impl<M: Message, E: ErrorMessage> IntoMessages<M, E> for Option<M> {
fn into_messages(self) -> impl AsyncIterator<Item = Result<M, E>> {
crate::iter(self.into_iter().map(Ok))
}
}

View File

@ -1,75 +0,0 @@
use std::sync::Mutex;
use rand::{RngCore, SeedableRng};
use rand_xorshift::XorShiftRng;
static RNG_SEEDS: [u8; 256] = [
7, 86, 77, 188, 83, 136, 60, 245, 248, 212, 156, 114, 143, 47, 160, 72, 190, 243, 158, 20, 240,
198, 25, 8, 27, 229, 179, 165, 186, 148, 239, 118, 187, 24, 152, 154, 102, 45, 101, 159, 8, 33,
158, 161, 42, 183, 189, 173, 58, 200, 121, 45, 11, 168, 245, 161, 186, 43, 244, 251, 244, 246,
137, 244, 112, 157, 102, 234, 65, 138, 23, 105, 46, 192, 114, 52, 233, 28, 93, 207, 186, 94,
55, 24, 182, 170, 61, 90, 180, 120, 32, 229, 219, 144, 227, 255, 18, 148, 69, 118, 164, 66, 5,
243, 18, 190, 21, 224, 151, 225, 229, 136, 112, 1, 181, 246, 64, 53, 249, 166, 22, 104, 255,
239, 127, 125, 29, 174, 115, 212, 213, 211, 230, 111, 194, 16, 20, 115, 192, 109, 254, 157,
175, 228, 10, 173, 10, 208, 214, 129, 57, 120, 53, 188, 24, 147, 223, 108, 77, 151, 1, 245,
151, 38, 186, 95, 28, 242, 0, 2, 161, 86, 154, 73, 138, 225, 40, 235, 26, 195, 42, 229, 15,
149, 41, 53, 230, 175, 36, 88, 205, 61, 113, 204, 253, 150, 193, 47, 231, 102, 23, 182, 225,
197, 29, 193, 120, 171, 198, 164, 148, 206, 57, 197, 193, 201, 102, 147, 249, 4, 230, 22, 75,
40, 145, 144, 113, 152, 115, 170, 41, 29, 154, 166, 74, 71, 23, 148, 139, 244, 114, 139, 66,
73, 231, 167, 59, 201, 33, 58, 188, 120, 70, 142, 251, 95,
];
struct RngGenInner {
rng: XorShiftRng,
counter: u32,
}
impl RngGenInner {
#[inline]
fn next_u32_pair(&mut self, count: u32) -> (u32, u32) {
if self.counter >= 16 {
self.reseed_next();
self.counter = 0;
}
let r1 = self.rng.next_u32() % count;
let r2 = loop {
let val = self.rng.next_u32() % count;
if r1 != val {
break val;
}
};
self.counter += 1;
(r1, r2)
}
fn reseed_next(&mut self) {
let offset = (self.rng.next_u32() % 240) as usize;
let seed: [u8; 16] = RNG_SEEDS[offset..offset + 16].try_into().unwrap();
self.rng = XorShiftRng::from_seed(seed);
}
}
pub(crate) struct RndGen {
inner: Mutex<RngGenInner>,
}
impl Default for RndGen {
fn default() -> Self {
Self {
inner: Mutex::new(RngGenInner {
rng: XorShiftRng::from_seed(RNG_SEEDS[0..16].try_into().unwrap()),
counter: 0,
}),
}
}
}
impl RndGen {
#[inline]
pub fn next_u32_pair(&self, count: u32) -> (u32, u32) {
self.inner.lock().unwrap().next_u32_pair(count)
}
}

View File

@ -1,147 +0,0 @@
use std::{cmp::Ordering, collections::BinaryHeap};
struct Entry<M> {
inner: M,
index: u64,
}
impl<M> PartialOrd for Entry<M> {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(other.index.cmp(&self.index))
}
}
impl<M> Ord for Entry<M> {
fn cmp(&self, other: &Self) -> Ordering {
other.index.cmp(&self.index)
}
}
impl<M> PartialEq for Entry<M> {
fn eq(&self, other: &Self) -> bool {
other.index.eq(&self.index)
}
}
impl<M> Eq for Entry<M> {}
pub(crate) struct ReorderQueueInner<M> {
cap: usize,
recent_index: Option<u64>,
heap: BinaryHeap<Entry<M>>,
}
impl<M> ReorderQueueInner<M> {
pub fn new(cap: usize) -> Self {
Self {
cap,
recent_index: None,
heap: BinaryHeap::with_capacity(cap + 1),
}
}
pub fn push(&mut self, index: u64, inner: M) -> Option<u64> {
if let Some(ri) = self.recent_index {
if index <= ri {
return Some(index);
}
}
self.heap.push(Entry { inner, index });
if self.heap.len() > self.cap {
let _ = self.heap.pop();
self.recent_index = self.recent_index.map(|x| x + 1);
self.recent_index
} else {
None
}
}
pub fn pop(&mut self) -> Option<(u64, M)> {
if let Some(ri) = self.recent_index {
let e = self.heap.peek()?;
if e.index == ri + 1 {
self.recent_index = Some(e.index);
Some((e.index, self.heap.pop()?.inner))
} else {
None
}
} else {
let e = self.heap.peek()?;
if e.index == 0 {
let e = self.heap.pop()?;
self.recent_index = Some(e.index);
Some((e.index, e.inner))
} else {
None
}
}
}
pub fn force_pop(&mut self) -> Option<(u64, M)> {
let e = self.heap.pop()?;
self.recent_index = Some(e.index);
Some((e.index, e.inner))
}
}
#[cfg(test)]
mod tests {
use crate::Message;
use super::ReorderQueueInner;
impl Message for i32 {}
#[test]
fn test_reordering() {
let mut queue = ReorderQueueInner::new(8);
assert_eq!(queue.push(0, 0), None);
assert_eq!(queue.pop(), Some((0, 0)));
assert_eq!(queue.pop(), None);
assert_eq!(queue.push(3, 3), None);
assert_eq!(queue.pop(), None);
assert_eq!(queue.push(2, 2), None);
assert_eq!(queue.pop(), None);
assert_eq!(queue.push(4, 4), None);
assert_eq!(queue.pop(), None);
assert_eq!(queue.push(1, 1), None);
assert_eq!(queue.pop(), Some((1, 1)));
assert_eq!(queue.pop(), Some((2, 2)));
assert_eq!(queue.pop(), Some((3, 3)));
assert_eq!(queue.pop(), Some((4, 4)));
assert_eq!(queue.pop(), None);
}
#[test]
fn test_overflow() {
let mut queue = ReorderQueueInner::new(4);
assert_eq!(queue.push(0, 0), None);
assert_eq!(queue.pop(), Some((0, 0)));
assert_eq!(queue.pop(), None);
assert_eq!(queue.push(4, 4), None);
assert_eq!(queue.pop(), None);
assert_eq!(queue.push(2, 2), None);
assert_eq!(queue.pop(), None);
assert_eq!(queue.push(3, 3), None);
assert_eq!(queue.pop(), None);
assert_eq!(queue.push(5, 5), Some(1));
assert_eq!(queue.pop(), Some((2, 2)));
assert_eq!(queue.pop(), Some((3, 3)));
assert_eq!(queue.pop(), Some((4, 4)));
assert_eq!(queue.pop(), Some((5, 5)));
assert_eq!(queue.pop(), None);
}
}

View File

@ -1,137 +0,0 @@
use std::{
pin::Pin,
sync::{
atomic::{AtomicU64, AtomicUsize, Ordering},
Arc,
},
};
use futures::Future;
use tokio::sync::Notify;
use crate::{
builder::Config,
chan::{BusSender, Receiver, Sender},
handler::HandlerSpawner,
message::Msg,
Builder, BusInner, Error, Handler, Message,
};
#[derive(Default)]
pub(crate) struct TaskCounter {
pub(crate) running: AtomicUsize,
notify: Notify,
}
pub(crate) struct TaskCounterLease<S: Fn() -> bool> {
need_notify: S,
counter: Arc<TaskCounter>,
}
impl<S: Fn() -> bool> Drop for TaskCounterLease<S> {
fn drop(&mut self) {
let notify = (self.need_notify)();
let prev = self.counter.running.fetch_sub(1, Ordering::Relaxed);
if notify && prev == 1 {
self.counter.notify.notify_waiters();
}
}
}
impl<S: Fn() -> bool> TaskCounterLease<S> {
fn new(counter: Arc<TaskCounter>, need_notify: S) -> Self {
counter.running.fetch_add(1, Ordering::Relaxed);
Self {
counter,
need_notify,
}
}
}
impl TaskCounter {
pub fn lease_unit<S: Fn() -> bool>(self: Arc<Self>, need_notify: S) -> TaskCounterLease<S> {
TaskCounterLease::new(self, need_notify)
}
#[inline]
pub async fn wait(&self) {
self.notify.notified().await
}
}
pub(crate) trait TaskSpawner<M: Message>: Send + Sync {
fn config(&self, stream_id: u32) -> Config;
fn is_producer(&self) -> bool;
#[allow(clippy::too_many_arguments)]
fn spawn_task(
&self,
rx: Receiver<Msg<M>>,
stream_id: u32,
task_id: u32,
abort: Arc<Notify>,
task_counter: Arc<TaskCounter>,
spawn_counter: Arc<TaskCounter>,
index_counter: Arc<AtomicU64>,
bus: Arc<BusInner>,
) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + '_>>;
}
pub(crate) struct TaskSpawnerWrapper<M: Message> {
inner: Arc<dyn TaskSpawner<M>>,
}
impl<M: Message> Clone for TaskSpawnerWrapper<M> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
}
}
}
impl<M: Message> TaskSpawnerWrapper<M> {
pub fn from_handler<B: Builder<M> + 'static>(builder: B) -> Self
where
B::Context: Handler<M>,
{
Self {
inner: Arc::new(HandlerSpawner::new(builder)) as _,
}
}
#[inline]
#[allow(clippy::too_many_arguments)]
pub async fn spawn_task(
&self,
(tx, rx): (Sender<Msg<M>>, Receiver<Msg<M>>),
stream_id: u32,
task_id: u32,
abort: Arc<Notify>,
task_counter: Arc<TaskCounter>,
spawn_counter: Arc<TaskCounter>,
index_counter: Arc<AtomicU64>,
bus: Arc<BusInner>,
) -> Result<BusSender<M>, Error> {
self.inner
.spawn_task(
rx,
stream_id,
task_id,
abort,
task_counter,
spawn_counter,
index_counter,
bus,
)
.await?;
Ok(BusSender::new(self.inner.is_producer(), tx))
}
#[inline]
pub(crate) fn config(&self, stream_id: u32) -> Config {
self.inner.config(stream_id)
}
}