Queue step 1

This commit is contained in:
Andrey Tkachenko 2023-02-28 14:51:32 +04:00 committed by Andrey Tkachenko
parent 7fbbed15d8
commit 800da160ac
14 changed files with 394 additions and 146 deletions

View File

@ -31,6 +31,7 @@ tokio = { version = "1.17.0", features = ["sync", "rt" ] }
smallvec = { version = "1.10.0", features = ["const_new"] } smallvec = { version = "1.10.0", features = ["const_new"] }
tokio-util = "0.7.7" tokio-util = "0.7.7"
arc-swap = "1.6.0" arc-swap = "1.6.0"
once_cell = "1.17.1"
[dev-dependencies] [dev-dependencies]
tokio = { version = "1.17.0", features = ["full"] } tokio = { version = "1.17.0", features = ["full"] }

View File

@ -86,6 +86,10 @@ impl Message for Msg {
{ {
Some(Self(self.0)) Some(Self(self.0))
} }
fn is_cloneable(&self) -> bool {
false
}
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
struct StartMsg; struct StartMsg;
@ -152,6 +156,10 @@ impl Message for StartMsg {
{ {
Some(Self) Some(Self)
} }
fn is_cloneable(&self) -> bool {
false
}
} }
struct Test { struct Test {
@ -190,7 +198,7 @@ impl Handler<Msg> for Test {
type FlushFuture<'a> = impl Future<Output = Result<(), Error>> + 'a; type FlushFuture<'a> = impl Future<Output = Result<(), Error>> + 'a;
fn handle(&self, msg: &mut MsgCell<Msg>, bus: &Bus) -> Self::HandleFuture<'_> { fn handle(&self, msg: &mut MsgCell<Msg>, bus: &Bus) -> Self::HandleFuture<'_> {
let msg = msg.take().unwrap(); let msg = msg.get();
async move { async move {
tokio::time::sleep(Duration::from_millis(100)).await; tokio::time::sleep(Duration::from_millis(100)).await;

View File

@ -9,7 +9,6 @@ use messagebus::{
error::Error, error::Error,
handler::Handler, handler::Handler,
message::{Message, SharedMessage}, message::{Message, SharedMessage},
receiver::IntoAbstractReceiver,
receivers::wrapper::HandlerWrapper, receivers::wrapper::HandlerWrapper,
type_tag::{TypeTag, TypeTagInfo}, type_tag::{TypeTag, TypeTagInfo},
}; };
@ -79,6 +78,10 @@ impl Message for Msg {
{ {
Some(Self(self.0)) Some(Self(self.0))
} }
fn is_cloneable(&self) -> bool {
false
}
} }
struct Test { struct Test {
@ -91,7 +94,7 @@ impl Handler<Msg> for Test {
type FlushFuture<'a> = impl Future<Output = Result<(), Error>> + 'a; type FlushFuture<'a> = impl Future<Output = Result<(), Error>> + 'a;
fn handle(&self, msg: &mut MsgCell<Msg>, _bus: &Bus) -> Self::HandleFuture<'_> { fn handle(&self, msg: &mut MsgCell<Msg>, _bus: &Bus) -> Self::HandleFuture<'_> {
let msg = msg.take().unwrap(); let msg = msg.get();
async move { async move {
println!("msg {msg:?}"); println!("msg {msg:?}");

View File

@ -16,6 +16,8 @@ use crate::{
receiver::{AbstractReceiver, IntoAbstractReceiver, Receiver}, receiver::{AbstractReceiver, IntoAbstractReceiver, Receiver},
}; };
pub use crate::handler::*;
pub struct TaskHandlerVTable { pub struct TaskHandlerVTable {
pub drop: fn(Arc<dyn Any + Send + Sync>, usize), pub drop: fn(Arc<dyn Any + Send + Sync>, usize),
} }
@ -262,9 +264,17 @@ impl BusReceivers {
} }
} }
#[inline]
pub fn add(&mut self, mask: MaskMatch, inner: Arc<dyn AbstractReceiver>) { pub fn add(&mut self, mask: MaskMatch, inner: Arc<dyn AbstractReceiver>) {
self.inner.push(BusReceiver { inner, mask }) self.inner.push(BusReceiver { inner, mask })
} }
#[inline]
pub fn iter(&self, mask: u64) -> impl Iterator<Item = &Arc<dyn AbstractReceiver>> + '_ {
self.inner
.iter()
.filter_map(move |x| x.mask.test(mask).then(move || &x.inner))
}
} }
impl From<Arc<dyn AbstractReceiver>> for BusReceivers { impl From<Arc<dyn AbstractReceiver>> for BusReceivers {
@ -335,12 +345,18 @@ impl BusInner {
continue; continue;
} }
let task = receiver.inner.try_send_dyn(msg, bus)?; match receiver.inner.try_send_dyn(msg, bus) {
Ok(task) => {
let receiver = receiver.clone(); let receiver = receiver.clone();
self.processing.push(task, receiver.inner, false); self.processing.push(task, receiver.inner, false);
} }
Err(err) => {
println!("send failed {}", err);
}
}
}
Ok(()) Ok(())
} }
@ -362,12 +378,18 @@ impl BusInner {
continue; continue;
} }
let task = receiver.inner.send_dyn(msg, bus.clone()).await?; match receiver.inner.send_dyn(msg, bus.clone()).await {
Ok(task) => {
let receiver = receiver.clone(); let receiver = receiver.clone();
self.processing.push(task, receiver.inner, false); self.processing.push(task, receiver.inner, false);
} }
Err(err) => {
println!("send failed {}", err);
}
}
}
Ok(()) Ok(())
} }
@ -389,12 +411,18 @@ impl BusInner {
continue; continue;
} }
let task = receiver.inner.send_dyn(msg, bus.clone()).await?; match receiver.inner.send_dyn(msg, bus.clone()).await {
Ok(task) => {
let receiver = receiver.clone(); let receiver = receiver.clone();
self.processing.push(task, receiver.inner, true); self.processing.push(task, receiver.inner, true);
} }
Err(err) => {
println!("send failed {}", err);
}
}
}
Ok(()) Ok(())
} }

View File

@ -7,6 +7,7 @@ pub trait MessageCell: Send + 'static {
fn type_tag(&self) -> TypeTag; fn type_tag(&self) -> TypeTag;
fn as_any(&self) -> &dyn Any; fn as_any(&self) -> &dyn Any;
fn as_any_mut(&mut self) -> &mut dyn Any; fn as_any_mut(&mut self) -> &mut dyn Any;
fn finalize(&mut self);
fn deserialize_from(&mut self, de: &mut dyn erased_serde::Deserializer) -> Result<(), Error>; fn deserialize_from(&mut self, de: &mut dyn erased_serde::Deserializer) -> Result<(), Error>;
} }
@ -49,7 +50,7 @@ impl dyn MessageCell {
pub fn take_cell<T: Message>(&mut self) -> Result<MsgCell<T>, Error> { pub fn take_cell<T: Message>(&mut self) -> Result<MsgCell<T>, Error> {
match self.as_any_mut().downcast_mut::<MsgCell<T>>() { match self.as_any_mut().downcast_mut::<MsgCell<T>>() {
Some(cell) => Ok(MsgCell(cell.take().ok())), Some(cell) => Ok(MsgCell(cell.0.take())),
None => Err(Error::MessageDynamicCastFail( None => Err(Error::MessageDynamicCastFail(
self.type_tag(), self.type_tag(),
T::TYPE_TAG(), T::TYPE_TAG(),
@ -59,7 +60,7 @@ impl dyn MessageCell {
pub fn take<T: Message>(&mut self) -> Result<T, Error> { pub fn take<T: Message>(&mut self) -> Result<T, Error> {
match self.as_any_mut().downcast_mut::<MsgCell<T>>() { match self.as_any_mut().downcast_mut::<MsgCell<T>>() {
Some(cell) => cell.take(), Some(cell) => cell.take().ok_or(Error::EmptyMessageCellError),
None => Err(Error::MessageDynamicCastFail( None => Err(Error::MessageDynamicCastFail(
self.type_tag(), self.type_tag(),
T::TYPE_TAG(), T::TYPE_TAG(),
@ -67,15 +68,15 @@ impl dyn MessageCell {
} }
} }
pub fn cloned<T: Message>(&self) -> Result<MsgCell<T>, Error> { // pub fn cloned<T: Message>(&self) -> Result<MsgCell<T>, Error> {
match self.as_any().downcast_ref::<MsgCell<T>>() { // match self.as_any().downcast_ref::<MsgCell<T>>() {
Some(cell) => Ok(cell.clone()), // Some(cell) => Ok(cell.clone()),
None => Err(Error::MessageDynamicCastFail( // None => Err(Error::MessageDynamicCastFail(
self.type_tag(), // self.type_tag(),
T::TYPE_TAG(), // T::TYPE_TAG(),
)), // )),
} // }
} // }
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
@ -133,21 +134,107 @@ impl<R: Message> MessageCell for ResultCell<R> {
R::TYPE_TAG() R::TYPE_TAG()
} }
fn finalize(&mut self) {}
fn deserialize_from(&mut self, _de: &mut dyn erased_serde::Deserializer) -> Result<(), Error> { fn deserialize_from(&mut self, _de: &mut dyn erased_serde::Deserializer) -> Result<(), Error> {
Ok(()) Ok(())
} }
} }
#[derive(Debug, Clone)] #[derive(Debug)]
pub struct MsgCell<M>(Option<M>); enum MsgCellInner<M> {
Empty,
Clonable(M),
Message(M),
}
impl<M: Message> MsgCellInner<M> {
pub fn as_ref(&self) -> Option<&M> {
match self {
MsgCellInner::Empty => None,
MsgCellInner::Clonable(x) => Some(x),
MsgCellInner::Message(x) => Some(x),
}
}
#[inline]
fn get(&mut self) -> M {
match std::mem::replace(self, MsgCellInner::Empty) {
MsgCellInner::Empty => panic!("!!!"),
MsgCellInner::Clonable(m) => {
if let Some(x) = m.try_clone() {
*self = MsgCellInner::Clonable(m);
x
} else {
m
}
}
MsgCellInner::Message(m) => m,
}
}
#[inline]
fn take(&mut self) -> MsgCellInner<M> {
std::mem::replace(self, MsgCellInner::Empty)
}
#[inline]
fn take_option(self) -> Option<M> {
match self {
MsgCellInner::Empty => None,
MsgCellInner::Clonable(m) => Some(m),
MsgCellInner::Message(m) => Some(m),
}
}
#[inline]
fn finalize(&mut self) {
*self = match std::mem::replace(self, MsgCellInner::Empty) {
MsgCellInner::Empty => MsgCellInner::Empty,
MsgCellInner::Clonable(m) => MsgCellInner::Message(m),
MsgCellInner::Message(m) => MsgCellInner::Message(m),
}
}
#[inline]
fn put(&mut self, val: M) {
*self = MsgCellInner::new(val);
}
#[inline]
fn unwrap_or<T>(self, err: T) -> Result<M, T> {
match self {
MsgCellInner::Empty => Err(err),
MsgCellInner::Clonable(m) => Ok(m),
MsgCellInner::Message(m) => Ok(m),
}
}
#[inline]
fn is_empty(&self) -> bool {
matches!(self, MsgCellInner::Empty)
}
#[inline]
fn new(msg: M) -> MsgCellInner<M> {
if msg.is_cloneable() {
MsgCellInner::Clonable(msg)
} else {
MsgCellInner::Message(msg)
}
}
}
#[derive(Debug)]
pub struct MsgCell<M>(MsgCellInner<M>);
impl<M: Message> MsgCell<M> { impl<M: Message> MsgCell<M> {
pub fn new(msg: M) -> Self { pub fn new(msg: M) -> Self {
MsgCell(Some(msg)) MsgCell(MsgCellInner::new(msg))
} }
pub fn empty() -> Self { pub fn empty() -> Self {
MsgCell(None) MsgCell(MsgCellInner::Empty)
} }
#[inline] #[inline]
@ -156,8 +243,13 @@ impl<M: Message> MsgCell<M> {
} }
#[inline] #[inline]
pub fn take(&mut self) -> Result<M, Error> { pub fn get(&mut self) -> M {
self.0.take().ok_or(Error::EmptyMessageCellError) self.0.get()
}
#[inline]
pub fn take(&mut self) -> Option<M> {
self.0.take().take_option()
} }
#[inline] #[inline]
@ -167,7 +259,12 @@ impl<M: Message> MsgCell<M> {
#[inline] #[inline]
pub fn put(&mut self, val: M) { pub fn put(&mut self, val: M) {
self.0.replace(val); self.0.put(val);
}
#[inline]
pub fn make_value(&mut self) {
self.0.finalize()
} }
#[inline] #[inline]
@ -175,15 +272,15 @@ impl<M: Message> MsgCell<M> {
self self
} }
#[inline] // #[inline]
pub fn clone(&self) -> MsgCell<M> { // pub fn clone(&self) -> MsgCell<M> {
MsgCell(self.0.as_ref().and_then(|x| x.try_clone())) // MsgCell(MsgCellInner::)
} // }
} }
impl<M: Message> MessageCell for MsgCell<M> { impl<M: Message> MessageCell for MsgCell<M> {
fn is_empty(&self) -> bool { fn is_empty(&self) -> bool {
self.0.is_none() self.0.is_empty()
} }
fn as_any(&self) -> &dyn Any { fn as_any(&self) -> &dyn Any {
@ -198,6 +295,10 @@ impl<M: Message> MessageCell for MsgCell<M> {
M::TYPE_TAG() M::TYPE_TAG()
} }
fn finalize(&mut self) {
self.0.finalize()
}
fn deserialize_from(&mut self, _de: &mut dyn erased_serde::Deserializer) -> Result<(), Error> { fn deserialize_from(&mut self, _de: &mut dyn erased_serde::Deserializer) -> Result<(), Error> {
Ok(()) Ok(())
} }

View File

@ -2,7 +2,7 @@ use futures::Future;
use crate::{bus::Bus, cell::MsgCell, error::Error, message::Message}; use crate::{bus::Bus, cell::MsgCell, error::Error, message::Message};
pub trait Handler<M: Message> { pub trait Handler<M: Message>: Send + Sync {
type Response: Message; type Response: Message;
type HandleFuture<'a>: Future<Output = Result<Self::Response, Error>> + Send + 'a type HandleFuture<'a>: Future<Output = Result<Self::Response, Error>> + Send + 'a
where where
@ -16,7 +16,7 @@ pub trait Handler<M: Message> {
fn flush(&mut self, bus: &Bus) -> Self::FlushFuture<'_>; fn flush(&mut self, bus: &Bus) -> Self::FlushFuture<'_>;
} }
pub trait MessageProducer<M: Message> { pub trait MessageProducer<M: Message>: Send + Sync {
type Message: Message; type Message: Message;
type StartFuture<'a>: Future<Output = Result<(), Error>> + Send + 'a type StartFuture<'a>: Future<Output = Result<(), Error>> + Send + 'a

View File

@ -11,4 +11,92 @@ pub mod receiver;
pub mod receivers; pub mod receivers;
pub mod type_tag; pub mod type_tag;
pub use bus::Bus;
pub use handler::*;
pub use message::*;
mod wakelist; mod wakelist;
#[macro_export]
macro_rules! derive_message_clone {
($const_name: ident, $struct_name: ty, $name: literal) => {
lazy_static::lazy_static! {
static ref $const_name: $crate::type_tag::TypeTag = $crate::type_tag::TypeTagInfo::parse($name).unwrap().into();
}
impl $crate::Message for $struct_name {
fn TYPE_TAG() -> $crate::type_tag::TypeTag
where
Self: Sized,
{
$const_name.clone()
}
fn type_tag(&self) -> $crate::type_tag::TypeTag {
$const_name.clone()
}
fn type_layout(&self) -> std::alloc::Layout {
std::alloc::Layout::new::<Self>()
}
fn as_any_ref(&self) -> &dyn std::any::Any {
self
}
fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
self
}
fn as_any_boxed(self: Box<Self>) -> Box<dyn std::any::Any> {
self
}
fn as_any_arc(self: std::sync::Arc<Self>) -> std::sync::Arc<dyn std::any::Any> {
self
}
fn as_shared_ref(&self) -> Option<&dyn $crate::message::SharedMessage> {
None
}
fn as_shared_mut(&mut self) -> Option<&mut dyn $crate::message::SharedMessage> {
None
}
fn as_shared_boxed(
self: Box<Self>,
) -> Result<Box<dyn $crate::message::SharedMessage>, Box<dyn $crate::Message>> {
Err(self)
}
fn as_shared_arc(
self: std::sync::Arc<Self>,
) -> Option<std::sync::Arc<dyn $crate::message::SharedMessage>> {
None
}
fn try_clone_into(&self, into: &mut dyn $crate::cell::MessageCell) -> bool {
into.into_typed::<Self>()
.map(|c| c.put(self.clone()))
.is_ok()
}
fn try_clone_boxed(&self) -> Option<Box<dyn $crate::Message>> {
Some(Box::new(self.clone()))
}
fn is_cloneable(&self) -> bool {
true
}
fn try_clone(&self) -> Option<Self>
where
Self: Sized,
{
Some(self.clone())
}
}
};
}
derive_message_clone!(VOID, (), "void");

View File

@ -1,7 +1,7 @@
use core::fmt; use core::fmt;
use std::{alloc::Layout, any::Any, sync::Arc}; use std::{alloc::Layout, any::Any, sync::Arc};
use crate::type_tag::TypeTag; use crate::{cell::MessageCell, type_tag::TypeTag};
pub trait ErrorMessage: Message {} pub trait ErrorMessage: Message {}
@ -24,8 +24,9 @@ pub trait Message: fmt::Debug + Unpin + Send + Sync + 'static {
fn as_shared_boxed(self: Box<Self>) -> Result<Box<dyn SharedMessage>, Box<dyn Message>>; fn as_shared_boxed(self: Box<Self>) -> Result<Box<dyn SharedMessage>, Box<dyn Message>>;
fn as_shared_arc(self: Arc<Self>) -> Option<Arc<dyn SharedMessage>>; fn as_shared_arc(self: Arc<Self>) -> Option<Arc<dyn SharedMessage>>;
fn try_clone_into(&self, into: &mut dyn Message) -> bool; fn try_clone_into(&self, into: &mut dyn MessageCell) -> bool;
fn try_clone_boxed(&self) -> Option<Box<dyn Message>>; fn try_clone_boxed(&self) -> Option<Box<dyn Message>>;
fn is_cloneable(&self) -> bool;
fn try_clone(&self) -> Option<Self> fn try_clone(&self) -> Option<Self>
where where

View File

@ -15,7 +15,7 @@ use crate::{
type_tag::TypeTagQuery, type_tag::TypeTagQuery,
}; };
pub trait Receiver<M: Message, R: Message> { pub trait Receiver<M: Message, R: Message>: Send + Sync {
fn poll_send( fn poll_send(
&self, &self,
msg: &mut MsgCell<M>, msg: &mut MsgCell<M>,
@ -33,35 +33,35 @@ pub trait Receiver<M: Message, R: Message> {
} }
pub trait ReceiverEx<M: Message, R: Message>: Receiver<M, R> { pub trait ReceiverEx<M: Message, R: Message>: Receiver<M, R> {
type SendFut<'a>: Future<Output = Result<TaskHandler, Error>> + 'a type SendFut<'a>: Future<Output = Result<TaskHandler, Error>> + Send + 'a
where where
Self: 'a; Self: 'a;
type RequestFut<'a>: Future<Output = Result<R, Error>> + 'a type RequestFut<'a>: Future<Output = Result<R, Error>> + Send + 'a
where where
Self: 'a; Self: 'a;
type ResultFut<'a>: Future<Output = Result<R, Error>> + 'a type ResultFut<'a>: Future<Output = Result<R, Error>> + Send + 'a
where where
Self: 'a; Self: 'a;
type ProcessFut<'a>: Future<Output = Result<(), Error>> + 'a type ProcessFut<'a>: Future<Output = Result<(), Error>> + Send + 'a
where where
Self: 'a; Self: 'a;
fn try_send(&self, msg: &mut MsgCell<M>, bus: &Bus) -> Result<(), Error>; fn try_send(&self, msg: &mut MsgCell<M>, bus: &Bus) -> Result<TaskHandler, Error>;
fn send(&self, msg: MsgCell<M>, bus: Bus) -> Self::SendFut<'_>; fn send(&self, msg: MsgCell<M>, bus: Bus) -> Self::SendFut<'_>;
fn request(&self, msg: MsgCell<M>, bus: Bus) -> Self::RequestFut<'_>; fn request(&self, msg: MsgCell<M>, bus: Bus) -> Self::RequestFut<'_>;
fn process(&self, task: TaskHandler, bus: Bus) -> Self::ProcessFut<'_>; fn process(&self, task: TaskHandler, bus: Bus) -> Self::ProcessFut<'_>;
fn result(&self, task: TaskHandler, bus: Bus) -> Self::ResultFut<'_>; fn result(&self, task: TaskHandler, bus: Bus) -> Self::ResultFut<'_>;
} }
impl<M: Message, R: Message, H: Receiver<M, R> + 'static> ReceiverEx<M, R> for H { impl<M: Message, R: Message, H: Receiver<M, R> + Send + Sync + 'static> ReceiverEx<M, R> for H {
type SendFut<'a> = impl Future<Output = Result<TaskHandler, Error>> + 'a; type SendFut<'a> = impl Future<Output = Result<TaskHandler, Error>> + Send + 'a;
type RequestFut<'a> = impl Future<Output = Result<R, Error>> + 'a; type RequestFut<'a> = impl Future<Output = Result<R, Error>> + Send + 'a;
type ResultFut<'a> = impl Future<Output = Result<R, Error>> + 'a; type ResultFut<'a> = impl Future<Output = Result<R, Error>> + Send + 'a;
type ProcessFut<'a> = impl Future<Output = Result<(), Error>> + 'a; type ProcessFut<'a> = impl Future<Output = Result<(), Error>> + Send + 'a;
fn try_send(&self, cell: &mut MsgCell<M>, bus: &Bus) -> Result<(), Error> { fn try_send(&self, cell: &mut MsgCell<M>, bus: &Bus) -> Result<TaskHandler, Error> {
match self.poll_send(cell, None, bus) { match self.poll_send(cell, None, bus) {
Poll::Ready(_) => Ok(()), Poll::Ready(handler) => handler,
Poll::Pending => Err(Error::TrySendError), Poll::Pending => Err(Error::TrySendError),
} }
} }

View File

@ -1,10 +1,9 @@
// pub mod limit; // pub mod limit;
// pub mod handle; // pub mod handle;
// pub mod dispatcher; // pub mod dispatcher;
// pub mod queue;
pub mod spawner;
// pub mod unordered;
pub mod producer; pub mod producer;
pub mod queue;
pub mod spawner;
pub mod wrapper; pub mod wrapper;
// pub use queue::*; // pub use queue::*;

View File

@ -19,7 +19,7 @@ use crate::{
}; };
type SendFuture<M: Message, T: MessageProducer<M> + 'static> = type SendFuture<M: Message, T: MessageProducer<M> + 'static> =
impl Future<Output = Result<(), Error>>; impl Future<Output = Result<(), Error>> + Send;
pub struct ProducerWrapper<M: Message, T: MessageProducer<M> + 'static> { pub struct ProducerWrapper<M: Message, T: MessageProducer<M> + 'static> {
inner: Arc<T>, inner: Arc<T>,
@ -320,6 +320,10 @@ mod tests {
{ {
Some(Self(self.0)) Some(Self(self.0))
} }
fn is_cloneable(&self) -> bool {
false
}
} }
struct Test { struct Test {

View File

@ -1,5 +1,6 @@
use std::{ use std::{
marker::PhantomData, marker::PhantomData,
pin::Pin,
sync::{ sync::{
atomic::{AtomicU64, AtomicUsize, Ordering}, atomic::{AtomicU64, AtomicUsize, Ordering},
Arc, Arc,
@ -7,84 +8,71 @@ use std::{
task::{ready, Context, Poll, Waker}, task::{ready, Context, Poll, Waker},
}; };
use arc_swap::{ArcSwapAny, ArcSwapOption}; use crossbeam::queue::ArrayQueue;
use crossbeam::{atomic::AtomicCell, queue::ArrayQueue}; use futures::{task::AtomicWaker, Future};
use futures::task::AtomicWaker;
use parking_lot::Mutex; use parking_lot::Mutex;
use sharded_slab::Slab;
use crate::{ use crate::{
bus::{Bus, TaskHandler}, bus::{Bus, TaskHandler},
cell::{MsgCell, ResultCell}, cell::{MsgCell, ResultCell},
error::Error, error::Error,
message::Message, message::Message,
receiver::Receiver, receiver::{Receiver, ReceiverEx},
wakelist::WakeList,
}; };
enum QueueTaskInner<M> { struct QueueItem<M: Message> {
Vacant,
Sending(MsgCell<M>),
WaitingResult,
}
struct QueueTask<M: Message> {
index: usize, index: usize,
waker: AtomicWaker, generation: u64,
generation: AtomicU64, message: MsgCell<M>,
message: Mutex<QueueTaskInner<M>>,
} }
impl<M: Message> QueueTask<M> { impl<M: Message> QueueItem<M> {
fn check_task(&self, handler: &TaskHandler) -> bool { fn new(index: usize, generation: u64, message: MsgCell<M>) -> QueueItem<M> {
false Self {
index,
generation,
message,
}
} }
} }
type SendFuture<M: Message, R: Message, T: Receiver<M, R> + 'static> =
impl Future<Output = Result<R, Error>> + Send;
pub struct Queue<M: Message, R: Message, T: Receiver<M, R> + 'static> { pub struct Queue<M: Message, R: Message, T: Receiver<M, R> + 'static> {
inner: T, inner: Arc<T>,
queue: ArrayQueue<usize>,
free: ArrayQueue<usize>, free: ArrayQueue<usize>,
enqueued: AtomicUsize, queue: ArrayQueue<QueueItem<M>>,
wakers: Arc<[AtomicWaker]>,
limit: usize,
wakelist: Mutex<WakeList>,
current: Mutex<Pin<Box<Option<SendFuture<M, R, T>>>>>,
generation_sequence: AtomicU64, generation_sequence: AtomicU64,
tasks: Arc<[Arc<QueueTask<M>>]>,
current_task: AtomicUsize,
_m: PhantomData<(M, R, T)>,
} }
impl<M: Message, R: Message, T: Receiver<M, R> + 'static> Queue<M, R, T> { impl<M: Message, R: Message, T: Receiver<M, R> + 'static> Queue<M, R, T> {
pub fn new(inner: T, limit: usize) -> Self { pub fn new(inner: T, limit: usize) -> Self {
let free = ArrayQueue::new(limit); let free = ArrayQueue::new(limit);
for i in 0..limit { for i in 0..limit {
free.push(i); let _ = free.push(i);
} }
Self { Self {
inner, limit,
inner: Arc::new(inner),
free, free,
queue: ArrayQueue::new(limit), queue: ArrayQueue::new(limit),
enqueued: AtomicUsize::new(0), wakers: (0..limit).map(|_| AtomicWaker::new()).collect(),
generation_sequence: AtomicU64::new(0), wakelist: Mutex::new(WakeList::new()),
tasks: (0..limit) current: Mutex::new(Box::pin(None)),
.map(|index| { generation_sequence: AtomicU64::new(1),
Arc::new(QueueTask {
index,
waker: AtomicWaker::new(),
generation: AtomicU64::new(0),
message: Mutex::new(QueueTaskInner::Vacant),
})
})
.collect(),
current_task: AtomicUsize::new(usize::MAX),
_m: PhantomData::default(),
} }
} }
#[inline] #[inline(always)]
fn update_task_waker(&self, _task: &TaskHandler, _wakerr: &Waker) { fn next_gen(&self) -> u64 {
// TODO self.generation_sequence.fetch_add(1, Ordering::Relaxed)
} }
} }
@ -95,30 +83,59 @@ impl<'a, M: Message, R: Message, T: Receiver<M, R> + 'static> Receiver<M, R> for
cx: Option<&mut Context<'_>>, cx: Option<&mut Context<'_>>,
bus: &Bus, bus: &Bus,
) -> Poll<Result<TaskHandler, Error>> { ) -> Poll<Result<TaskHandler, Error>> {
if let Some(index) = self.free.pop() { // trying fast track
if let Ok(_) = self.current_task.compare_exchange( if self.free.is_full() {
usize::MAX, if let Some(mut lock) = self.current.try_lock() {
index, if lock.is_none() {
Ordering::Relaxed, let inner = self.inner.clone();
Ordering::Relaxed, let bus = bus.clone();
) {
// fast track
// self.tasks[index].start(msg);
let th = TaskHandler::new(vtable, self.tasks.clone(), index);
Poll::Ready(Ok(th)) enum Val<M: Message> {
} Task(TaskHandler),
Cell(MsgCell<M>),
} }
if current.is_none() { let val = if let Ok(task) = inner.try_send(msg, &bus) {
let Some(index) = self.free.pop() else { Val::Task(task)
// TODO } else {
// self.send_wakelist.push(); Val::Cell(msg.take_cell())
return Poll::Pending;
}; };
// .start(msg); drop(
drop(current); unsafe { (&mut *lock).as_mut().get_unchecked_mut() }.replace(async move {
match val {
Val::Task(task) => inner.result(task, bus).await,
Val::Cell(cell) => inner.request(cell, bus).await,
}
}),
);
let task = todo!();
return Poll::Ready(Ok(task));
}
}
}
// enqueuing the message
if let Some(index) = self.free.pop() {
let generation = self.next_gen();
assert!(self
.queue
.push(QueueItem {
index,
generation,
message: msg.take_cell(),
})
.is_ok());
let task = todo!();
return Poll::Ready(Ok(task));
}
if let Some(cx) = cx {
self.wakelist.lock().push(cx.waker().clone());
} }
Poll::Pending Poll::Pending
@ -126,34 +143,18 @@ impl<'a, M: Message, R: Message, T: Receiver<M, R> + 'static> Receiver<M, R> for
fn poll_result( fn poll_result(
&self, &self,
task: &TaskHandler, task: &mut TaskHandler,
_resp: Option<&mut ResultCell<R>>, resp: Option<&mut ResultCell<R>>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
bus: &Bus, bus: &Bus,
) -> Poll<Result<(), Error>> { ) -> Poll<Result<(), Error>> {
let current_task = self.current_task.lock(); let mut lock = self.current.lock();
let index = if let Some(hash) = *current_task { let mb_fut = lock.as_mut();
if hash != task.hash() { if let Some(fut) = mb_fut.as_pin_mut() {
self.update_task_waker(task, cx.waker()); let result = ready!(fut.poll(cx));
return Poll::Pending;
} }
task.index() drop(unsafe { lock.as_mut().get_unchecked_mut() }.take());
} else {
if let Some(index) = self.queue.pop() {
index
} else {
return Poll::Ready(Err(Error::TrySendError));
}
};
let entry = self.tasks.get(index).unwrap();
// let task = self.tasks.remove(idx)
let res = ready!(self
.inner
.poll_send(&mut *entry.message.lock(), Some(cx), bus))?;
Poll::Pending Poll::Pending
} }
@ -241,6 +242,10 @@ mod tests {
{ {
Some(Self(self.0)) Some(Self(self.0))
} }
fn is_cloneable(&self) -> bool {
false
}
} }
struct Test { struct Test {

View File

@ -230,6 +230,10 @@ mod tests {
None None
} }
fn is_cloneable(&self) -> bool {
false
}
fn try_clone(&self) -> Option<Self> fn try_clone(&self) -> Option<Self>
where where
Self: Sized, Self: Sized,

View File

@ -32,11 +32,17 @@ impl<'a> AsRef<TypeTagInfo<'a>> for TypeTag {
impl From<TypeTagInfo<'static>> for TypeTag { impl From<TypeTagInfo<'static>> for TypeTag {
fn from(info: TypeTagInfo<'static>) -> Self { fn from(info: TypeTagInfo<'static>) -> Self {
Arc::new(info).into()
}
}
impl From<Arc<TypeTagInfo<'static>>> for TypeTag {
fn from(info: Arc<TypeTagInfo<'static>>) -> Self {
let mut hasher = DefaultHasher::new(); let mut hasher = DefaultHasher::new();
info.hash(&mut hasher); info.hash(&mut hasher);
Self { Self {
info: Arc::new(info), info,
hash: hasher.finish(), hash: hasher.finish(),
} }
} }