This commit is contained in:
Andrey Tkachenko 2023-10-25 21:43:39 +04:00
parent 0297882f47
commit 2ae4e8bfed
12 changed files with 1112 additions and 644 deletions

View File

@ -11,6 +11,9 @@ dashmap = "5.5.0"
futures = "0.3.28"
kanal = "0.1.0-pre8"
log = "0.4.20"
priority-queue = "1.3.2"
rand = { version = "0.8.5", default-features = false, features = ["std_rng", "std"] }
rand_xorshift = "0.3.0"
tokio = { version = "1.32.0", features = ["sync", "rt", "macros"] }
[dev-dependencies]

View File

@ -1,6 +1,8 @@
#![feature(impl_trait_in_assoc_type)]
use std::sync::Arc;
use messagebus::{Bus, Error, Message};
use messagebus::{Builder, Bus, Error, Handler, IntoMessage, Message};
#[derive(Debug, Clone)]
pub struct Msg(pub i32);
@ -10,6 +12,35 @@ pub struct Processor {
state: i32,
}
impl Handler<Msg> for Processor {
type Result = ();
type IntoMessage = impl IntoMessage<Self::Result>;
type HandleFut<'a> = impl futures::Future<Output = Result<Self::IntoMessage, Error>> + 'a;
type FinalizeFut<'a> = impl futures::Future<Output = Result<(), Error>> + 'a;
fn handle(&mut self, msg: Msg, _stream_id: u32, _task_id: u32) -> Self::HandleFut<'_> {
async move { Ok(()) }
}
fn finalize<'a>(self) -> Self::FinalizeFut<'a> {
async move { Ok(()) }
}
}
struct ProcSpawner;
impl Builder<Msg> for ProcSpawner {
type Context = Processor;
type BuildFut<'a> = impl futures::Future<Output = Result<Self::Context, Error>> + 'a;
fn build(&self, stream_id: u32, _task_id: u32) -> Self::BuildFut<'_> {
async move {
Ok(Processor {
state: stream_id as _,
})
}
}
}
impl Processor {
pub async fn spawn(sid: u32) -> Result<(usize, Self), Error> {
Ok((4, Self { state: 0 }))
@ -26,12 +57,7 @@ impl Processor {
async fn run() {
let bus = Bus::new();
bus.register(
4,
Processor::spawn,
Processor::handler_msg,
Processor::finalize_msg_handler,
);
bus.register(ProcSpawner).await;
}
#[tokio::main]

102
src/builder.rs Normal file
View File

@ -0,0 +1,102 @@
use std::{marker::PhantomData, sync::Arc};
use futures::Future;
use crate::{Error, Message};
pub trait Builder<M: Message>: Send + Sync + 'static {
type Context: 'static;
type BuildFut<'a>: Future<Output = Result<Self::Context, Error>> + Send + 'a
where
Self: 'a;
fn parallel(&self, _stream_id: u32) -> (u32, bool) {
(1, false)
}
fn queue_size(&self, _stream_id: u32, _task_id: u32) -> usize {
4
}
fn build(&self, stream_id: u32, _task_id: u32) -> Self::BuildFut<'_>;
}
pub struct DefaultBuilder<M: Message, H>(usize, PhantomData<(M, H)>);
impl<M: Message, H> DefaultBuilder<M, H> {
pub fn new(queue_size: usize) -> Self {
Self(queue_size, Default::default())
}
}
impl<M: Message, H: Sync + Send + Default + 'static> Builder<M> for DefaultBuilder<M, H> {
type Context = H;
type BuildFut<'a> = impl Future<Output = Result<Self::Context, Error>> + Send + 'a;
fn build(&self, _stream_id: u32, _task_id: u32) -> Self::BuildFut<'_> {
async move { Ok(<Self::Context as Default>::default()) }
}
fn queue_size(&self, _stream_id: u32, _task_id: u32) -> usize {
self.0
}
}
pub struct SharedBuilder<M, H, C, F> {
queue_size: usize,
parallel: u32,
stream_handlers: dashmap::DashMap<u32, Arc<H>>,
callback: C,
ordered: bool,
_m: PhantomData<(M, F)>,
}
impl<M, H, C, F> SharedBuilder<M, H, C, F>
where
M: Message,
H: Sync + Send + 'static,
F: Sync + Send + Future<Output = Result<H, Error>> + 'static,
C: Sync + Send + Fn(u32, u32) -> F + 'static,
{
pub fn new(queue_size: usize, parallel: u32, ordered: bool, callback: C) -> Self {
Self {
queue_size,
parallel,
stream_handlers: Default::default(),
callback,
ordered,
_m: PhantomData,
}
}
}
impl<M, H, C, F> Builder<M> for SharedBuilder<M, H, C, F>
where
M: Message,
H: Sync + Send + 'static,
F: Sync + Send + Future<Output = Result<H, Error>> + 'static,
C: Sync + Send + Fn(u32, u32) -> F + 'static,
{
type Context = Arc<H>;
type BuildFut<'a> = impl Future<Output = Result<Self::Context, Error>> + Send + 'a;
fn build(&self, stream_id: u32, task_id: u32) -> Self::BuildFut<'_> {
async move {
if self.stream_handlers.contains_key(&stream_id) {
return Ok(self.stream_handlers.get(&stream_id).unwrap().clone());
}
let val = Arc::new((self.callback)(stream_id, task_id).await?);
self.stream_handlers.insert(stream_id, val.clone());
Ok(val.clone())
}
}
fn queue_size(&self, _stream_id: u32, _task_id: u32) -> usize {
self.queue_size
}
fn parallel(&self, _stream_id: u32) -> (u32, bool) {
(self.parallel, self.ordered)
}
}

113
src/chan.rs Normal file
View File

@ -0,0 +1,113 @@
use std::{any::Any, pin::Pin};
use futures::Future;
use crate::{message::Msg, Error, Message};
enum ChannelItem<T> {
Value(T),
Close,
}
pub(crate) trait BusSenderClose: Any + Send + Sync {
fn upcast(&self) -> &(dyn Any + Send + Sync);
fn is_producer(&self) -> bool;
fn load(&self) -> (usize, usize);
fn stop(&self) -> Pin<Box<dyn Future<Output = Result<(), Error>> + '_>>;
fn terminate(&self) -> Result<(), Error>;
}
pub(crate) struct BusSender<M: Message> {
is_producer: bool,
tx: Sender<Msg<M>>,
}
impl<M: Message> BusSenderClose for BusSender<M> {
fn upcast(&self) -> &(dyn Any + Send + Sync) {
self
}
fn stop(&self) -> Pin<Box<dyn Future<Output = Result<(), Error>> + '_>> {
Box::pin(async move {
self.tx.stop().await?;
Ok(())
})
}
fn terminate(&self) -> Result<(), Error> {
self.tx.close()
}
fn is_producer(&self) -> bool {
self.is_producer
}
fn load(&self) -> (usize, usize) {
self.tx.load()
}
}
impl<M: Message> BusSender<M> {
pub async fn send(&self, m: Msg<M>) -> Result<(), Error> {
self.tx.send(m).await
}
#[inline]
pub(crate) fn new(is_producer: bool, tx: Sender<Msg<M>>) -> BusSender<M> {
Self { is_producer, tx }
}
}
pub(crate) struct Sender<T> {
inner: kanal::AsyncSender<ChannelItem<T>>,
}
impl<T> Sender<T> {
pub async fn send(&self, msg: T) -> Result<(), Error> {
self.inner.send(ChannelItem::Value(msg)).await?;
Ok(())
}
pub async fn stop(&self) -> Result<(), Error> {
self.inner.send(ChannelItem::Close).await?;
Ok(())
}
pub fn close(&self) -> Result<(), Error> {
self.inner.close();
Ok(())
}
pub fn load(&self) -> (usize, usize) {
(self.inner.len(), self.inner.capacity())
}
}
pub(crate) struct Receiver<T> {
inner: kanal::AsyncReceiver<ChannelItem<T>>,
}
impl<T> Receiver<T> {
#[inline]
pub async fn recv(&self) -> Option<T> {
let Ok(item) = self.inner.recv().await else {
return None;
};
match item {
ChannelItem::Value(val) => Some(val),
ChannelItem::Close => None,
}
}
#[inline]
pub fn is_empty(&self) -> bool {
self.inner.is_empty()
}
}
pub(crate) fn channel<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
let (tx, rx) = kanal::bounded_async(cap);
(Sender { inner: tx }, Receiver { inner: rx })
}

25
src/error.rs Normal file
View File

@ -0,0 +1,25 @@
#[derive(Debug)]
pub enum Error {
HandlerIsNotRegistered,
Aborted,
SendError(String),
ReceiveError(kanal::ReceiveError),
}
impl From<kanal::SendError> for Error {
fn from(value: kanal::SendError) -> Self {
Self::SendError(format!("{}", value))
}
}
impl From<kanal::ReceiveError> for Error {
fn from(value: kanal::ReceiveError) -> Self {
Self::ReceiveError(value)
}
}
// impl<M> From<mpsc::error::SendError<M>> for Error {
// fn from(value: mpsc::error::SendError<M>) -> Self {
// Self::SendError(format!("{}", value))
// }
// }

110
src/handler.rs Normal file
View File

@ -0,0 +1,110 @@
use std::{
any::{Any, TypeId},
marker::PhantomData,
sync::Arc,
};
use futures::Future;
use tokio::sync::Notify;
use crate::{
builder::Builder,
chan::{channel, Sender},
message::Msg,
task::{TaskCounter, TaskSpawner},
BusInner, Error, IntoMessage, Message,
};
pub trait Handler<M: Message>: Send + Sync + 'static {
type Result: Message;
type IntoMessage: IntoMessage<Self::Result>;
type HandleFut<'a>: Future<Output = Result<Self::IntoMessage, Error>> + Send + 'a
where
Self: 'a;
type FinalizeFut<'a>: Future<Output = Result<(), Error>> + Send + 'a
where
Self: 'a;
fn handle(&mut self, msg: M, stream_id: u32, task_id: u32) -> Self::HandleFut<'_>;
fn finalize<'a>(self) -> Self::FinalizeFut<'a>;
}
pub(crate) struct HandlerSpawner<M, B> {
pub(crate) builder: B,
_m: PhantomData<M>,
}
impl<M, B> HandlerSpawner<M, B> {
pub(crate) fn new(builder: B) -> Self {
Self {
builder,
_m: PhantomData,
}
}
}
impl<M: Message, B: Builder<M>> TaskSpawner<M> for HandlerSpawner<M, B>
where
B::Context: Any + Handler<M>,
{
fn spawn_task(
&self,
stream_id: u32,
task_id: u32,
_abort: Arc<Notify>,
task_counter: Arc<TaskCounter>,
bus: Arc<BusInner>,
) -> Box<dyn Future<Output = Result<Sender<Msg<M>>, Error>> + Send + '_> {
Box::new(async move {
let bus = bus.clone();
let (tx, rx) = channel::<Msg<M>>(self.builder.queue_size(stream_id, task_id));
let mut ctx = self.builder.build(stream_id, task_id).await?;
let _handle = tokio::spawn(async move {
while let Some(msg) = rx.recv().await {
task_counter.inc_running();
let res_msg = match ctx.handle(msg.inner, stream_id, task_id).await {
Ok(res) => res.into_message(),
Err(err) => {
println!("TASK HANDLE ERROR: {:?}", err);
continue;
}
};
if let Some(inner) = res_msg {
if inner.type_id() != TypeId::of::<()>() {
if let Err(err) = bus
.send(Msg {
inner,
index: msg.index,
stream_id,
})
.await
{
println!("BUS SEND ERROR: {:?}", err);
continue;
}
}
}
task_counter.dec_running(rx.is_empty());
}
if let Err(err) = ctx.finalize().await {
println!("TASK FINALIZE ERROR: {:?}", err);
}
});
Ok(tx)
})
}
fn is_producer(&self) -> bool {
false
}
fn parallel(&self, stream_id: u32) -> (u32, bool) {
self.builder.parallel(stream_id)
}
}

File diff suppressed because it is too large Load Diff

28
src/message.rs Normal file
View File

@ -0,0 +1,28 @@
use core::fmt;
#[derive(Debug, Clone)]
pub(crate) struct Msg<M: Message> {
pub(crate) inner: M,
pub(crate) index: u64,
pub(crate) stream_id: u32,
}
pub trait Message: fmt::Debug + Clone + Send + Sync + 'static {}
impl Message for () {}
pub trait IntoMessage<M>: Send {
fn into_message(self) -> Option<M>;
}
impl<M: Message> IntoMessage<M> for Option<M> {
fn into_message(self) -> Option<M> {
self
}
}
impl<M: Message> IntoMessage<M> for M {
fn into_message(self) -> Option<M> {
Some(self)
}
}

116
src/producer.rs Normal file
View File

@ -0,0 +1,116 @@
use std::{marker::PhantomData, pin::pin, sync::Arc};
use futures::{Future, Stream, StreamExt};
use tokio::sync::Notify;
use crate::{
builder::Builder,
chan::{channel, Sender},
message::Msg,
task::{TaskCounter, TaskSpawner},
BusInner, Error, IntoMessage, Message,
};
pub trait Producer<M: Message>: Send + Sync + 'static {
type Item: Message;
type IntoMessage: IntoMessage<Self::Item>;
type Stream<'a>: Stream<Item = Result<Self::IntoMessage, Error>> + Send + 'a
where
Self: 'a;
type FinalizeFut<'a>: Future<Output = Result<(), Error>> + Send + 'a
where
Self: 'a;
fn stream(&mut self, msg: M, stream_id: u32, task_id: u32) -> Self::Stream<'_>;
fn finalize<'a>(self) -> Self::FinalizeFut<'a>;
}
pub(crate) struct ProducerSpawner<M, B> {
pub(crate) builder: B,
_m: PhantomData<M>,
}
impl<M, B> ProducerSpawner<M, B> {
pub(crate) fn new(builder: B) -> Self {
Self {
builder,
_m: PhantomData,
}
}
}
impl<M: Message, B: Builder<M>> TaskSpawner<M> for ProducerSpawner<M, B>
where
B::Context: Producer<M>,
{
fn spawn_task(
&self,
stream_id: u32,
task_id: u32,
abort: Arc<Notify>,
task_counter: Arc<TaskCounter>,
bus: Arc<BusInner>,
) -> Box<dyn Future<Output = Result<Sender<Msg<M>>, Error>> + Send + '_> {
Box::new(async move {
let (tx, rx) = channel::<Msg<M>>(self.builder.queue_size(stream_id, task_id));
let mut ctx = self.builder.build(stream_id, task_id).await?;
let _handle = tokio::spawn(async move {
while let Some(recv_msg) = rx.recv().await {
task_counter.inc_running();
let mut stream = pin!(ctx
.stream(recv_msg.inner, stream_id, task_id)
.take_until(abort.notified()));
let mut index = 0;
loop {
index += 1;
match stream.next().await {
Some(Ok(msg)) => {
if let Some(inner) = msg.into_message() {
if let Err(err) = bus
.send(Msg {
inner,
index: index - 1,
stream_id,
})
.await
{
println!("BUS SEND ERROR: {:?}", err);
continue;
}
}
}
Some(Err(err)) => {
println!("PRODUCER ERROR: {:?}", err);
continue;
}
None => break,
}
}
task_counter.dec_running(rx.is_empty());
}
if let Err(err) = ctx.finalize().await {
println!("TASK FINALIZE ERROR: {:?}", err);
}
});
Ok(tx)
})
}
fn is_producer(&self) -> bool {
true
}
fn parallel(&self, stream_id: u32) -> (u32, bool) {
self.builder.parallel(stream_id)
}
}

75
src/rand.rs Normal file
View File

@ -0,0 +1,75 @@
use std::sync::Mutex;
use rand::{RngCore, SeedableRng};
use rand_xorshift::XorShiftRng;
static RNG_SEEDS: [u8; 256] = [
7, 86, 77, 188, 83, 136, 60, 245, 248, 212, 156, 114, 143, 47, 160, 72, 190, 243, 158, 20, 240,
198, 25, 8, 27, 229, 179, 165, 186, 148, 239, 118, 187, 24, 152, 154, 102, 45, 101, 159, 8, 33,
158, 161, 42, 183, 189, 173, 58, 200, 121, 45, 11, 168, 245, 161, 186, 43, 244, 251, 244, 246,
137, 244, 112, 157, 102, 234, 65, 138, 23, 105, 46, 192, 114, 52, 233, 28, 93, 207, 186, 94,
55, 24, 182, 170, 61, 90, 180, 120, 32, 229, 219, 144, 227, 255, 18, 148, 69, 118, 164, 66, 5,
243, 18, 190, 21, 224, 151, 225, 229, 136, 112, 1, 181, 246, 64, 53, 249, 166, 22, 104, 255,
239, 127, 125, 29, 174, 115, 212, 213, 211, 230, 111, 194, 16, 20, 115, 192, 109, 254, 157,
175, 228, 10, 173, 10, 208, 214, 129, 57, 120, 53, 188, 24, 147, 223, 108, 77, 151, 1, 245,
151, 38, 186, 95, 28, 242, 0, 2, 161, 86, 154, 73, 138, 225, 40, 235, 26, 195, 42, 229, 15,
149, 41, 53, 230, 175, 36, 88, 205, 61, 113, 204, 253, 150, 193, 47, 231, 102, 23, 182, 225,
197, 29, 193, 120, 171, 198, 164, 148, 206, 57, 197, 193, 201, 102, 147, 249, 4, 230, 22, 75,
40, 145, 144, 113, 152, 115, 170, 41, 29, 154, 166, 74, 71, 23, 148, 139, 244, 114, 139, 66,
73, 231, 167, 59, 201, 33, 58, 188, 120, 70, 142, 251, 95,
];
struct RngGenInner {
rng: XorShiftRng,
counter: u32,
}
impl RngGenInner {
#[inline]
fn next_u32_pair(&mut self, count: u32) -> (u32, u32) {
if self.counter >= 16 {
self.reseed_next();
self.counter = 0;
}
let r1 = self.rng.next_u32() % count;
let r2 = loop {
let val = self.rng.next_u32() % count;
if r1 != val {
break val;
}
};
self.counter += 1;
(r1, r2)
}
fn reseed_next(&mut self) {
let offset = (self.rng.next_u32() % 240) as usize;
let seed: [u8; 16] = RNG_SEEDS[offset..offset + 16].try_into().unwrap();
self.rng = XorShiftRng::from_seed(seed);
}
}
pub(crate) struct RndGen {
inner: Mutex<RngGenInner>,
}
impl Default for RndGen {
fn default() -> Self {
Self {
inner: Mutex::new(RngGenInner {
rng: XorShiftRng::from_seed(RNG_SEEDS[0..16].try_into().unwrap()),
counter: 0,
}),
}
}
}
impl RndGen {
#[inline]
pub fn next_u32_pair(&self, count: u32) -> (u32, u32) {
self.inner.lock().unwrap().next_u32_pair(count)
}
}

68
src/reorder_queue.rs Normal file
View File

@ -0,0 +1,68 @@
use std::{cmp::Ordering, collections::BinaryHeap};
use crate::{message::Msg, Message};
struct Entry<M: Message>(Msg<M>);
impl<M: Message> PartialOrd for Entry<M> {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(other.0.index.cmp(&self.0.index))
}
}
impl<M: Message> Ord for Entry<M> {
fn cmp(&self, other: &Self) -> Ordering {
other.0.index.cmp(&self.0.index)
}
}
impl<M: Message> PartialEq for Entry<M> {
fn eq(&self, other: &Self) -> bool {
other.0.index.eq(&self.0.index)
}
}
impl<M: Message> Eq for Entry<M> {}
pub(crate) struct ReorderQueue<M: Message> {
cap: usize,
recent_index: Option<u64>,
heap: BinaryHeap<Entry<M>>,
}
impl<M: Message> ReorderQueue<M> {
pub fn new(cap: usize) -> Self {
Self {
cap,
recent_index: None,
heap: BinaryHeap::with_capacity(cap + 1),
}
}
pub fn push(&mut self, msg: Msg<M>) {
self.heap.push(Entry(msg));
if self.heap.len() == self.cap {
self.recent_index = None;
}
}
pub fn pop(&mut self) -> Option<Msg<M>> {
match self.recent_index {
None => {
let e = self.heap.pop()?;
self.recent_index = Some(e.0.index);
Some(e.0)
}
Some(ri) => {
let e = self.heap.peek()?;
if e.0.index == ri + 1 {
self.recent_index = Some(e.0.index);
Some(self.heap.pop()?.0)
} else {
None
}
}
}
}
}

110
src/task.rs Normal file
View File

@ -0,0 +1,110 @@
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
use futures::Future;
use tokio::sync::Notify;
use crate::{
chan::{BusSender, Sender},
handler::HandlerSpawner,
message::Msg,
producer::ProducerSpawner,
Builder, BusInner, Error, Handler, Message, Producer,
};
#[derive(Default)]
pub(crate) struct TaskCounter {
running: AtomicUsize,
notify: Notify,
}
impl TaskCounter {
#[inline]
pub fn inc_running(&self) {
self.running.fetch_add(1, Ordering::Relaxed);
}
#[inline]
pub fn dec_running(&self, notify: bool) {
let prev = self.running.fetch_sub(1, Ordering::Relaxed);
if notify && prev == 1 {
self.notify.notify_waiters();
}
}
#[inline]
pub async fn wait(&self) {
self.notify.notified().await
}
}
pub(crate) trait TaskSpawner<M: Message>: Send + Sync {
fn parallel(&self, stream_id: u32) -> (u32, bool);
fn is_producer(&self) -> bool;
fn spawn_task(
&self,
stream_id: u32,
task_id: u32,
abort: Arc<Notify>,
task_counter: Arc<TaskCounter>,
bus: Arc<BusInner>,
) -> Box<dyn Future<Output = Result<Sender<Msg<M>>, Error>> + Send + '_>;
}
pub(crate) struct TaskSpawnerWrapper<M: Message> {
inner: Arc<dyn TaskSpawner<M>>,
}
impl<M: Message> Clone for TaskSpawnerWrapper<M> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
}
}
}
impl<M: Message> TaskSpawnerWrapper<M> {
pub fn from_handler<B: Builder<M> + 'static>(builder: B) -> Self
where
B::Context: Handler<M>,
{
Self {
inner: Arc::new(HandlerSpawner::new(builder)) as _,
}
}
pub fn from_producer<B: Builder<M> + 'static>(builder: B) -> Self
where
B::Context: Producer<M>,
{
Self {
inner: Arc::new(ProducerSpawner::new(builder)) as _,
}
}
#[inline]
pub async fn spawn_task(
&self,
stream_id: u32,
task_id: u32,
abort: Arc<Notify>,
task_counter: Arc<TaskCounter>,
bus: Arc<BusInner>,
) -> Result<BusSender<M>, Error> {
Ok(BusSender::new(
self.inner.is_producer(),
Box::into_pin(
self.inner
.spawn_task(stream_id, task_id, abort, task_counter, bus),
)
.await?,
))
}
#[inline]
pub(crate) fn parallel(&self, stream_id: u32) -> (u32, bool) {
self.inner.parallel(stream_id)
}
}