From bc759001d311de0470326ff9bed9dbf6770683ed Mon Sep 17 00:00:00 2001 From: Andrey Tkachenko Date: Mon, 20 Feb 2023 13:39:09 +0400 Subject: [PATCH] Mutable TaskHandler --- src/polling_pool.rs | 9 +++++---- src/receiver.rs | 22 +++++++++++----------- src/receivers/mod.rs | 2 +- src/receivers/producer.rs | 23 +++++++++++++---------- src/receivers/queue.rs | 2 +- src/receivers/spawner.rs | 9 +++++---- src/receivers/wrapper.rs | 2 +- 7 files changed, 37 insertions(+), 32 deletions(-) diff --git a/src/polling_pool.rs b/src/polling_pool.rs index 1a5de01..2a9c1fa 100644 --- a/src/polling_pool.rs +++ b/src/polling_pool.rs @@ -1,5 +1,6 @@ use crossbeam::queue::SegQueue; use futures::task::AtomicWaker; +use parking_lot::Mutex; use sharded_slab::Slab; use std::{ sync::{ @@ -51,7 +52,7 @@ impl WakerHelper { } struct PollEntry { - task: TaskHandler, + task: Mutex, receiver: Arc, multiple: bool, } @@ -84,7 +85,7 @@ impl PollingPool { WAKER_QUEUE.push( self.pool .insert(PollEntry { - task, + task: Mutex::new(task), receiver, multiple, }) @@ -113,8 +114,8 @@ impl PollingPool { let waker = WakerHelper::waker(idx); let mut cx = Context::from_waker(&waker); - - match entry.receiver.poll_result(&entry.task, None, &mut cx, bus) { + let mut lock = entry.task.lock(); + match entry.receiver.poll_result(&mut *lock, None, &mut cx, bus) { Poll::Ready(res) => { if !entry.multiple || (entry.multiple && res.is_err()) { self.pool.remove(idx); diff --git a/src/receiver.rs b/src/receiver.rs index 1835774..5be9fc2 100644 --- a/src/receiver.rs +++ b/src/receiver.rs @@ -25,7 +25,7 @@ pub trait Receiver { fn poll_result( &self, - task: &TaskHandler, + task: &mut TaskHandler, res: Option<&mut ResultCell>, cx: &mut Context<'_>, bus: &Bus, @@ -70,14 +70,14 @@ impl + 'static> ReceiverEx for H poll_fn(move |cx| self.poll_send(&mut cell, Some(cx), &bus)) } - fn process(&self, task: TaskHandler, bus: Bus) -> Self::ProcessFut<'_> { - poll_fn(move |cx| self.poll_result(&task, None, cx, &bus)) + fn process(&self, mut task: TaskHandler, bus: Bus) -> Self::ProcessFut<'_> { + poll_fn(move |cx| self.poll_result(&mut task, None, cx, &bus)) } - fn result(&self, task: TaskHandler, bus: Bus) -> Self::ResultFut<'_> { + fn result(&self, mut task: TaskHandler, bus: Bus) -> Self::ResultFut<'_> { async move { let mut cell = ResultCell::empty(); - poll_fn(|cx| self.poll_result(&task, Some(&mut cell), cx, &bus)).await?; + poll_fn(|cx| self.poll_result(&mut task, Some(&mut cell), cx, &bus)).await?; cell.unwrap() } } @@ -102,7 +102,7 @@ pub trait AbstractReceiver: Send + Sync + 'static { fn poll_result( &self, - task: &TaskHandler, + task: &mut TaskHandler, res: Option<&mut dyn MessageCell>, cx: &mut Context<'_>, bus: &Bus, @@ -155,7 +155,7 @@ impl + Send + Sync + 'static> Abstract #[inline] fn poll_result( &self, - task: &TaskHandler, + task: &mut TaskHandler, res: Option<&mut dyn MessageCell>, cx: &mut Context<'_>, bus: &Bus, @@ -213,14 +213,14 @@ impl dyn AbstractReceiver { } #[inline] - pub async fn process(&self, task: TaskHandler, bus: Bus) -> Result<(), Error> { - poll_fn(|cx| self.poll_result(&task, None, cx, &bus)).await + pub async fn process(&self, mut task: TaskHandler, bus: Bus) -> Result<(), Error> { + poll_fn(|cx| self.poll_result(&mut task, None, cx, &bus)).await } #[inline] - pub async fn result(&self, task: TaskHandler, bus: Bus) -> Result { + pub async fn result(&self, mut task: TaskHandler, bus: Bus) -> Result { let mut cell = ResultCell::empty(); - poll_fn(|cx| self.poll_result(&task, Some(&mut cell), cx, &bus)).await?; + poll_fn(|cx| self.poll_result(&mut task, Some(&mut cell), cx, &bus)).await?; cell.unwrap() } diff --git a/src/receivers/mod.rs b/src/receivers/mod.rs index f968740..62cb1c0 100644 --- a/src/receivers/mod.rs +++ b/src/receivers/mod.rs @@ -1,7 +1,7 @@ // pub mod limit; // pub mod handle; // pub mod dispatcher; -pub mod queue; +// pub mod queue; pub mod spawner; // pub mod unordered; pub mod producer; diff --git a/src/receivers/producer.rs b/src/receivers/producer.rs index bad2f50..b53b32a 100644 --- a/src/receivers/producer.rs +++ b/src/receivers/producer.rs @@ -113,13 +113,11 @@ impl + 'static> Receiver fn poll_result( &self, - task: &TaskHandler, + task: &mut TaskHandler, resp: Option<&mut ResultCell>, cx: &mut Context<'_>, bus: &Bus, ) -> Poll> { - println!("2222222"); - let Some(task_handle) = task.data().downcast_ref::>>>() else { println!("cannot cast type"); return Poll::Ready(Err(Error::ErrorPollWrongTask(String::new()))); @@ -133,30 +131,35 @@ impl + 'static> Receiver let mut lock = self.next_fut.lock(); loop { if let Some(fut) = &mut *lock { - println!("3333333"); - // SAFETY: in box, safly can poll it let res = ready!(unsafe { Pin::new_unchecked(fut) }.poll(cx)); - println!("5555555"); drop(lock.take()); drop(lock); self.send_waker.wake(); - if let Some(resp_cell) = resp { + let res = if let Some(resp_cell) = resp { resp_cell.put(res); + Ok(()) } else if TypeId::of::() != TypeId::of::<()>() { + // match res { + // Ok(msg) => bus.send(msg), + // Err(err) => { println!( "[{}]: unhandled result message of type `{}`", std::any::type_name::(), std::any::type_name::() ); - } + Ok(()) + // } + // } + } else { + Ok(()) + }; - return Poll::Ready(Ok(())); + return Poll::Ready(res); } else { - println!("444444"); self.start_next(&mut *lock, bus); } } diff --git a/src/receivers/queue.rs b/src/receivers/queue.rs index cc782d7..8917777 100644 --- a/src/receivers/queue.rs +++ b/src/receivers/queue.rs @@ -103,7 +103,7 @@ impl<'a, M: Message, R: Message, T: Receiver + 'static> Receiver for Ordering::Relaxed, ) { // fast track - self.tasks[index].start(msg); + // self.tasks[index].start(msg); let th = TaskHandler::new(vtable, self.tasks.clone(), index); Poll::Ready(Ok(th)) diff --git a/src/receivers/spawner.rs b/src/receivers/spawner.rs index aaae390..24034e7 100644 --- a/src/receivers/spawner.rs +++ b/src/receivers/spawner.rs @@ -102,9 +102,10 @@ impl + Send + Sync + 'static> SpawnerT self.send_waker.register(cx.waker()); if let Some(state) = self.state.lock().as_mut() { - let res = ready!(self - .inner - .poll_result(&state.task, state.result.as_mut(), cx, bus)); + let res = + ready!(self + .inner + .poll_result(&mut state.task, state.result.as_mut(), cx, bus)); self.free_index_queue.push(self.index); Poll::Ready(res) @@ -193,7 +194,7 @@ impl + Send + Sync + 'static> Receiver #[inline] fn poll_result( &self, - task: &TaskHandler, + task: &mut TaskHandler, resp: Option<&mut ResultCell>, cx: &mut Context<'_>, bus: &Bus, diff --git a/src/receivers/wrapper.rs b/src/receivers/wrapper.rs index 26ab73e..81987a2 100644 --- a/src/receivers/wrapper.rs +++ b/src/receivers/wrapper.rs @@ -91,7 +91,7 @@ impl + 'static> Receiver for HandlerWr fn poll_result( &self, - task: &TaskHandler, + task: &mut TaskHandler, resp: Option<&mut ResultCell>, cx: &mut Context<'_>, _bus: &Bus,