Mutable TaskHandler

This commit is contained in:
Andrey Tkachenko 2023-02-20 13:39:09 +04:00
parent 2423802ecd
commit bc759001d3
7 changed files with 37 additions and 32 deletions

View File

@ -1,5 +1,6 @@
use crossbeam::queue::SegQueue; use crossbeam::queue::SegQueue;
use futures::task::AtomicWaker; use futures::task::AtomicWaker;
use parking_lot::Mutex;
use sharded_slab::Slab; use sharded_slab::Slab;
use std::{ use std::{
sync::{ sync::{
@ -51,7 +52,7 @@ impl WakerHelper {
} }
struct PollEntry { struct PollEntry {
task: TaskHandler, task: Mutex<TaskHandler>,
receiver: Arc<dyn AbstractReceiver>, receiver: Arc<dyn AbstractReceiver>,
multiple: bool, multiple: bool,
} }
@ -84,7 +85,7 @@ impl PollingPool {
WAKER_QUEUE.push( WAKER_QUEUE.push(
self.pool self.pool
.insert(PollEntry { .insert(PollEntry {
task, task: Mutex::new(task),
receiver, receiver,
multiple, multiple,
}) })
@ -113,8 +114,8 @@ impl PollingPool {
let waker = WakerHelper::waker(idx); let waker = WakerHelper::waker(idx);
let mut cx = Context::from_waker(&waker); let mut cx = Context::from_waker(&waker);
let mut lock = entry.task.lock();
match entry.receiver.poll_result(&entry.task, None, &mut cx, bus) { match entry.receiver.poll_result(&mut *lock, None, &mut cx, bus) {
Poll::Ready(res) => { Poll::Ready(res) => {
if !entry.multiple || (entry.multiple && res.is_err()) { if !entry.multiple || (entry.multiple && res.is_err()) {
self.pool.remove(idx); self.pool.remove(idx);

View File

@ -25,7 +25,7 @@ pub trait Receiver<M: Message, R: Message> {
fn poll_result( fn poll_result(
&self, &self,
task: &TaskHandler, task: &mut TaskHandler,
res: Option<&mut ResultCell<R>>, res: Option<&mut ResultCell<R>>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
bus: &Bus, bus: &Bus,
@ -70,14 +70,14 @@ impl<M: Message, R: Message, H: Receiver<M, R> + 'static> ReceiverEx<M, R> for H
poll_fn(move |cx| self.poll_send(&mut cell, Some(cx), &bus)) poll_fn(move |cx| self.poll_send(&mut cell, Some(cx), &bus))
} }
fn process(&self, task: TaskHandler, bus: Bus) -> Self::ProcessFut<'_> { fn process(&self, mut task: TaskHandler, bus: Bus) -> Self::ProcessFut<'_> {
poll_fn(move |cx| self.poll_result(&task, None, cx, &bus)) poll_fn(move |cx| self.poll_result(&mut task, None, cx, &bus))
} }
fn result(&self, task: TaskHandler, bus: Bus) -> Self::ResultFut<'_> { fn result(&self, mut task: TaskHandler, bus: Bus) -> Self::ResultFut<'_> {
async move { async move {
let mut cell = ResultCell::empty(); let mut cell = ResultCell::empty();
poll_fn(|cx| self.poll_result(&task, Some(&mut cell), cx, &bus)).await?; poll_fn(|cx| self.poll_result(&mut task, Some(&mut cell), cx, &bus)).await?;
cell.unwrap() cell.unwrap()
} }
} }
@ -102,7 +102,7 @@ pub trait AbstractReceiver: Send + Sync + 'static {
fn poll_result( fn poll_result(
&self, &self,
task: &TaskHandler, task: &mut TaskHandler,
res: Option<&mut dyn MessageCell>, res: Option<&mut dyn MessageCell>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
bus: &Bus, bus: &Bus,
@ -155,7 +155,7 @@ impl<M: Message, R: Message, H: Receiver<M, R> + Send + Sync + 'static> Abstract
#[inline] #[inline]
fn poll_result( fn poll_result(
&self, &self,
task: &TaskHandler, task: &mut TaskHandler,
res: Option<&mut dyn MessageCell>, res: Option<&mut dyn MessageCell>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
bus: &Bus, bus: &Bus,
@ -213,14 +213,14 @@ impl dyn AbstractReceiver {
} }
#[inline] #[inline]
pub async fn process(&self, task: TaskHandler, bus: Bus) -> Result<(), Error> { pub async fn process(&self, mut task: TaskHandler, bus: Bus) -> Result<(), Error> {
poll_fn(|cx| self.poll_result(&task, None, cx, &bus)).await poll_fn(|cx| self.poll_result(&mut task, None, cx, &bus)).await
} }
#[inline] #[inline]
pub async fn result<R: Message>(&self, task: TaskHandler, bus: Bus) -> Result<R, Error> { pub async fn result<R: Message>(&self, mut task: TaskHandler, bus: Bus) -> Result<R, Error> {
let mut cell = ResultCell::empty(); let mut cell = ResultCell::empty();
poll_fn(|cx| self.poll_result(&task, Some(&mut cell), cx, &bus)).await?; poll_fn(|cx| self.poll_result(&mut task, Some(&mut cell), cx, &bus)).await?;
cell.unwrap() cell.unwrap()
} }

View File

@ -1,7 +1,7 @@
// pub mod limit; // pub mod limit;
// pub mod handle; // pub mod handle;
// pub mod dispatcher; // pub mod dispatcher;
pub mod queue; // pub mod queue;
pub mod spawner; pub mod spawner;
// pub mod unordered; // pub mod unordered;
pub mod producer; pub mod producer;

View File

@ -113,13 +113,11 @@ impl<M: Message, T: MessageProducer<M> + 'static> Receiver<M, T::Message>
fn poll_result( fn poll_result(
&self, &self,
task: &TaskHandler, task: &mut TaskHandler,
resp: Option<&mut ResultCell<T::Message>>, resp: Option<&mut ResultCell<T::Message>>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
bus: &Bus, bus: &Bus,
) -> Poll<Result<(), Error>> { ) -> Poll<Result<(), Error>> {
println!("2222222");
let Some(task_handle) = task.data().downcast_ref::<Mutex<Option<T::NextFuture<'static>>>>() else { let Some(task_handle) = task.data().downcast_ref::<Mutex<Option<T::NextFuture<'static>>>>() else {
println!("cannot cast type"); println!("cannot cast type");
return Poll::Ready(Err(Error::ErrorPollWrongTask(String::new()))); return Poll::Ready(Err(Error::ErrorPollWrongTask(String::new())));
@ -133,30 +131,35 @@ impl<M: Message, T: MessageProducer<M> + 'static> Receiver<M, T::Message>
let mut lock = self.next_fut.lock(); let mut lock = self.next_fut.lock();
loop { loop {
if let Some(fut) = &mut *lock { if let Some(fut) = &mut *lock {
println!("3333333");
// SAFETY: in box, safly can poll it // SAFETY: in box, safly can poll it
let res = ready!(unsafe { Pin::new_unchecked(fut) }.poll(cx)); let res = ready!(unsafe { Pin::new_unchecked(fut) }.poll(cx));
println!("5555555");
drop(lock.take()); drop(lock.take());
drop(lock); drop(lock);
self.send_waker.wake(); self.send_waker.wake();
if let Some(resp_cell) = resp { let res = if let Some(resp_cell) = resp {
resp_cell.put(res); resp_cell.put(res);
Ok(())
} else if TypeId::of::<T::Message>() != TypeId::of::<()>() { } else if TypeId::of::<T::Message>() != TypeId::of::<()>() {
// match res {
// Ok(msg) => bus.send(msg),
// Err(err) => {
println!( println!(
"[{}]: unhandled result message of type `{}`", "[{}]: unhandled result message of type `{}`",
std::any::type_name::<T>(), std::any::type_name::<T>(),
std::any::type_name::<T::Message>() std::any::type_name::<T::Message>()
); );
} Ok(())
// }
return Poll::Ready(Ok(())); // }
} else {
Ok(())
};
return Poll::Ready(res);
} else { } else {
println!("444444");
self.start_next(&mut *lock, bus); self.start_next(&mut *lock, bus);
} }
} }

View File

@ -103,7 +103,7 @@ impl<'a, M: Message, R: Message, T: Receiver<M, R> + 'static> Receiver<M, R> for
Ordering::Relaxed, Ordering::Relaxed,
) { ) {
// fast track // fast track
self.tasks[index].start(msg); // self.tasks[index].start(msg);
let th = TaskHandler::new(vtable, self.tasks.clone(), index); let th = TaskHandler::new(vtable, self.tasks.clone(), index);
Poll::Ready(Ok(th)) Poll::Ready(Ok(th))

View File

@ -102,9 +102,10 @@ impl<M: Message, R: Message, T: Receiver<M, R> + Send + Sync + 'static> SpawnerT
self.send_waker.register(cx.waker()); self.send_waker.register(cx.waker());
if let Some(state) = self.state.lock().as_mut() { if let Some(state) = self.state.lock().as_mut() {
let res = ready!(self let res =
ready!(self
.inner .inner
.poll_result(&state.task, state.result.as_mut(), cx, bus)); .poll_result(&mut state.task, state.result.as_mut(), cx, bus));
self.free_index_queue.push(self.index); self.free_index_queue.push(self.index);
Poll::Ready(res) Poll::Ready(res)
@ -193,7 +194,7 @@ impl<M: Message, R: Message, T: Receiver<M, R> + Send + Sync + 'static> Receiver
#[inline] #[inline]
fn poll_result( fn poll_result(
&self, &self,
task: &TaskHandler, task: &mut TaskHandler,
resp: Option<&mut ResultCell<R>>, resp: Option<&mut ResultCell<R>>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
bus: &Bus, bus: &Bus,

View File

@ -91,7 +91,7 @@ impl<M: Message, T: Handler<M> + 'static> Receiver<M, T::Response> for HandlerWr
fn poll_result( fn poll_result(
&self, &self,
task: &TaskHandler, task: &mut TaskHandler,
resp: Option<&mut ResultCell<T::Response>>, resp: Option<&mut ResultCell<T::Response>>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
_bus: &Bus, _bus: &Bus,