Fix race condition

This commit is contained in:
Andrey Tkachenko 2021-06-23 16:20:10 +04:00
parent 500e1ce178
commit dc4bb94ec5
13 changed files with 151 additions and 137 deletions

View File

@ -21,6 +21,8 @@ async-stream = "0.3.2"
smallvec = "1.6.1" smallvec = "1.6.1"
log = "0.4.14" log = "0.4.14"
sharded-slab = "0.1.1" sharded-slab = "0.1.1"
empty-box = "0.1.1"
[dev-dependencies] [dev-dependencies]
env_logger = "0.8.4"
tokio = { version = "1", features = ["macros", "parking_lot", "rt-multi-thread", "io-util", "sync"] } tokio = { version = "1", features = ["macros", "parking_lot", "rt-multi-thread", "io-util", "sync"] }

View File

@ -135,6 +135,8 @@ impl Handler<i16> for TmpReceiver2 {
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
env_logger::init();
let (b, poller) = Bus::build() let (b, poller) = Bus::build()
.register(TmpReceiver) .register(TmpReceiver)
.subscribe::<f32, receivers::BufferUnorderedAsync<_>, _, _>(8, Default::default()) .subscribe::<f32, receivers::BufferUnorderedAsync<_>, _, _>(8, Default::default())
@ -157,6 +159,8 @@ async fn main() {
println!("close"); println!("close");
b.close().await; b.close().await;
println!("closed");
poller.await; poller.await;
println!("[done]"); println!("[done]");
} }

View File

@ -71,6 +71,8 @@ async fn main() {
println!("close"); println!("close");
b.close().await; b.close().await;
println!("closed");
poller.await; poller.await;
println!("[done]"); println!("[done]");
} }

View File

@ -167,6 +167,8 @@ async fn main() {
println!("close"); println!("close");
b.close().await; b.close().await;
println!("closed");
poller.await; poller.await;
println!("[done]"); println!("[done]");
} }

View File

@ -57,6 +57,8 @@ async fn main() {
println!("close"); println!("close");
b.close().await; b.close().await;
println!("closed");
poller.await; poller.await;
println!("[done]"); println!("[done]");
} }

View File

@ -1,15 +1,14 @@
use async_trait::async_trait; use async_trait::async_trait;
use messagebus::{receivers, Bus, SynchronizedHandler, AsyncSynchronizedHandler}; use messagebus::{receivers, AsyncSynchronizedHandler, Bus, SynchronizedHandler};
use receivers::SynchronizedConfig;
struct TmpReceiver; struct TmpReceiver;
impl SynchronizedHandler<f32> for TmpReceiver { #[async_trait]
impl AsyncSynchronizedHandler<f32> for TmpReceiver {
type Error = anyhow::Error; type Error = anyhow::Error;
type Response = (); type Response = ();
fn handle(&mut self, msg: f32, _bus: &Bus) -> Result<Self::Response, Self::Error> { async fn handle(&mut self, msg: f32, _bus: &Bus) -> Result<Self::Response, Self::Error> {
// std::thread::sleep(std::time::Duration::from_millis(100)); // std::thread::sleep(std::time::Duration::from_millis(100));
println!("---> f32 {}", msg); println!("---> f32 {}", msg);
@ -22,9 +21,8 @@ impl SynchronizedHandler<f32> for TmpReceiver {
impl AsyncSynchronizedHandler<i16> for TmpReceiver { impl AsyncSynchronizedHandler<i16> for TmpReceiver {
type Error = anyhow::Error; type Error = anyhow::Error;
type Response = (); type Response = ();
async fn handle(&mut self, msg: i16, _bus: &Bus) -> Result<Self::Response, Self::Error> { async fn handle(&mut self, msg: i16, _bus: &Bus) -> Result<Self::Response, Self::Error> {
std::thread::sleep(std::time::Duration::from_millis(100)); std::thread::sleep(std::time::Duration::from_millis(100));
println!("---> i16 {}", msg); println!("---> i16 {}", msg);
@ -37,7 +35,7 @@ impl AsyncSynchronizedHandler<i16> for TmpReceiver {
async fn main() { async fn main() {
let (b, poller) = Bus::build() let (b, poller) = Bus::build()
.register_unsync(TmpReceiver) .register_unsync(TmpReceiver)
.subscribe::<f32, receivers::SynchronizedSync<_>, _, _>(8, Default::default()) .subscribe::<f32, receivers::SynchronizedAsync<_>, _, _>(8, Default::default())
.subscribe::<i16, receivers::SynchronizedAsync<_>, _, _>(8, Default::default()) .subscribe::<i16, receivers::SynchronizedAsync<_>, _, _>(8, Default::default())
.done() .done()
.build(); .build();
@ -62,10 +60,16 @@ async fn main() {
b.send(12.0f32).await.unwrap(); b.send(12.0f32).await.unwrap();
b.send(1i16).await.unwrap(); b.send(1i16).await.unwrap();
println!("flush");
b.flush().await;
println!("closing"); println!("closing");
b.close().await; b.close().await;
println!("closed");
poller.await; poller.await;
println!("[done]"); println!("[done]");

View File

@ -14,7 +14,7 @@ use builder::BusBuilder;
pub use envelop::Message; pub use envelop::Message;
pub use handler::*; pub use handler::*;
pub use receiver::SendError; pub use receiver::SendError;
use receiver::{Receiver, ReceiverStats}; use receiver::Receiver;
use smallvec::SmallVec; use smallvec::SmallVec;
use tokio::sync::oneshot; use tokio::sync::oneshot;

View File

@ -45,6 +45,7 @@ where
pub trait ReceiverTrait: Send + Sync { pub trait ReceiverTrait: Send + Sync {
fn typed(&self) -> AnyReceiver<'_>; fn typed(&self) -> AnyReceiver<'_>;
fn poller(&self) -> AnyPoller<'_>; fn poller(&self) -> AnyPoller<'_>;
fn name(&self) -> &str;
fn stats(&self) -> Result<(), SendError<()>>; fn stats(&self) -> Result<(), SendError<()>>;
fn close(&self) -> Result<(), SendError<()>>; fn close(&self) -> Result<(), SendError<()>>;
fn sync(&self) -> Result<(), SendError<()>>; fn sync(&self) -> Result<(), SendError<()>>;
@ -111,6 +112,10 @@ where
E: Error, E: Error,
S: SendUntypedReceiver + SendTypedReceiver<M> + ReciveTypedReceiver<R, E> + 'static, S: SendUntypedReceiver + SendTypedReceiver<M> + ReciveTypedReceiver<R, E> + 'static,
{ {
fn name(&self) -> &str {
std::any::type_name::<Self>()
}
fn typed(&self) -> AnyReceiver<'_> { fn typed(&self) -> AnyReceiver<'_> {
AnyReceiver::new(&self.inner) AnyReceiver::new(&self.inner)
} }
@ -269,7 +274,6 @@ struct ReceiverContext {
synchronized: Notify, synchronized: Notify,
closed: Notify, closed: Notify,
response: Notify, response: Notify,
statistics: Notify,
} }
impl PermitDrop for ReceiverContext { impl PermitDrop for ReceiverContext {
@ -318,7 +322,6 @@ impl Receiver {
synchronized: Notify::new(), synchronized: Notify::new(),
closed: Notify::new(), closed: Notify::new(),
response: Notify::new(), response: Notify::new(),
statistics: Notify::new(),
}), }),
inner: Arc::new(ReceiverWrapper { inner: Arc::new(ReceiverWrapper {
inner, inner,
@ -444,7 +447,6 @@ impl Receiver {
loop { loop {
let event = poll_fn(move |ctx| receiver.poll_events(ctx)).await; let event = poll_fn(move |ctx| receiver.poll_events(ctx)).await;
match event { match event {
Event::Exited => { Event::Exited => {
ctx_clone.closed.notify_waiters(); ctx_clone.closed.notify_waiters();
@ -493,20 +495,11 @@ impl Receiver {
Some(idx) Some(idx)
} }
// #[inline]
// pub fn stats(&self) -> ReceiverStats {
// if self.inner.stats().is_ok() {
// self.context.stats.notified()
// .await
// } else {
// warn!("close failed!");
// }
// }
#[inline] #[inline]
pub async fn close(&self) { pub async fn close(&self) {
let notified = self.context.closed.notified();
if self.inner.close().is_ok() { if self.inner.close().is_ok() {
self.context.closed.notified().await notified.await;
} else { } else {
warn!("close failed!"); warn!("close failed!");
} }
@ -514,8 +507,9 @@ impl Receiver {
#[inline] #[inline]
pub async fn sync(&self) { pub async fn sync(&self) {
let notified = self.context.synchronized.notified();
if self.inner.sync().is_ok() { if self.inner.sync().is_ok() {
self.context.synchronized.notified().await notified.await
} else { } else {
warn!("sync failed!"); warn!("sync failed!");
} }
@ -523,9 +517,9 @@ impl Receiver {
#[inline] #[inline]
pub async fn flush(&self) { pub async fn flush(&self) {
let notified = self.context.flushed.notified();
if self.inner.flush().is_ok() { if self.inner.flush().is_ok() {
self.context.flushed.notified().await; notified.await;
self.context.need_flush.store(false, Ordering::SeqCst); self.context.need_flush.store(false, Ordering::SeqCst);
} else { } else {
warn!("flush failed!"); warn!("flush failed!");

View File

@ -8,8 +8,8 @@ use std::{
}; };
use crate::{ use crate::{
receiver::{Action, Event, ReceiverStats, ReciveTypedReceiver, SendUntypedReceiver}, receiver::{Action, Event, ReciveTypedReceiver, SendUntypedReceiver},
receivers::Request, receivers::{fix_type2, Request},
}; };
use anyhow::Result; use anyhow::Result;
use futures::{stream::FuturesUnordered, Future, StreamExt}; use futures::{stream::FuturesUnordered, Future, StreamExt};
@ -39,7 +39,7 @@ where
{ {
let ut = ut.downcast::<T>().unwrap(); let ut = ut.downcast::<T>().unwrap();
let mut queue = FuturesUnordered::new(); let mut queue = FuturesUnordered::new();
let mut sync_future: Option<Pin<Box<dyn Future<Output = Result<(), E>> + Send>>> = None; let mut sync_future = None;
let mut need_sync = false; let mut need_sync = false;
let mut rx_closed = false; let mut rx_closed = false;
let mut need_flush = false; let mut need_flush = false;
@ -55,18 +55,20 @@ where
let bus = bus.clone(); let bus = bus.clone();
let ut = ut.clone(); let ut = ut.clone();
queue.push(tokio::task::spawn(async move { queue.push(async move { (mid, ut.handle(msg, &bus).await) });
(mid, ut.handle(msg, &bus).await)
}));
} }
Request::Action(Action::Flush) => need_flush = true, Request::Action(Action::Flush) => need_flush = true,
Request::Action(Action::Sync) => need_sync = true,
Request::Action(Action::Close) => rx.close(), Request::Action(Action::Close) => rx.close(),
Request::Action(Action::Sync) => {
need_sync = true;
break;
}
_ => unimplemented!(), _ => unimplemented!(),
}, },
Poll::Ready(None) => { Poll::Ready(None) => {
need_sync = true; need_sync = true;
rx_closed = true; rx_closed = true;
break;
} }
Poll::Pending => break, Poll::Pending => break,
} }
@ -80,11 +82,10 @@ where
loop { loop {
match queue.poll_next_unpin(cx) { match queue.poll_next_unpin(cx) {
Poll::Pending => return Poll::Pending, Poll::Pending => return Poll::Pending,
Poll::Ready(Some(Ok((mid, res)))) => { Poll::Ready(Some((mid, res))) => {
stx.send(Event::Response(mid, res)).ok(); stx.send(Event::Response(mid, res)).ok();
} }
Poll::Ready(None) => break, Poll::Ready(None) => break,
_ => {}
} }
} }
} }
@ -95,21 +96,20 @@ where
} }
if need_sync { if need_sync {
if let Some(mut fut) = sync_future.take() { if let Some(fut) = sync_future.as_mut() {
match fut.as_mut().poll(cx) { match unsafe { fix_type2(fut) }.poll(cx) {
Poll::Pending => { Poll::Pending => return Poll::Pending,
sync_future = Some(fut);
return Poll::Pending;
}
Poll::Ready(res) => { Poll::Ready(res) => {
need_sync = false;
stx.send(Event::Synchronized(res)).ok(); stx.send(Event::Synchronized(res)).ok();
} }
} }
need_sync = false;
sync_future = None;
continue;
} else { } else {
let ut = ut.clone(); let ut = ut.clone();
let bus_clone = bus.clone(); let bus_clone = bus.clone();
sync_future.replace(Box::pin(async move { ut.sync(&bus_clone).await })); sync_future.replace(async move { ut.sync(&bus_clone).await });
} }
} else { } else {
break; break;

View File

@ -8,8 +8,8 @@ use std::{
}; };
use crate::{ use crate::{
receiver::{Action, Event, ReceiverStats, ReciveTypedReceiver, SendUntypedReceiver}, receiver::{Action, Event, ReciveTypedReceiver, SendUntypedReceiver},
receivers::Request, receivers::{fix_type2, Request},
}; };
use anyhow::Result; use anyhow::Result;
use futures::{stream::FuturesUnordered, Future, StreamExt}; use futures::{stream::FuturesUnordered, Future, StreamExt};
@ -39,7 +39,7 @@ where
{ {
let ut = ut.downcast::<T>().unwrap(); let ut = ut.downcast::<T>().unwrap();
let mut queue = FuturesUnordered::new(); let mut queue = FuturesUnordered::new();
let mut sync_future: Option<Pin<Box<dyn Future<Output = Result<(), E>> + Send>>> = None; let mut sync_future = None;
let mut need_sync = false; let mut need_sync = false;
let mut rx_closed = false; let mut rx_closed = false;
let mut need_flush = false; let mut need_flush = false;
@ -59,15 +59,12 @@ where
(mid, ut.handle(msg, &bus)) (mid, ut.handle(msg, &bus))
})); }));
} }
Request::Action(Action::Flush) => { Request::Action(Action::Flush) => need_flush = true,
need_flush = true; Request::Action(Action::Close) => rx.close(),
break;
}
Request::Action(Action::Sync) => { Request::Action(Action::Sync) => {
need_sync = true; need_sync = true;
break; break;
} }
Request::Action(Action::Close) => rx.close(),
_ => unimplemented!(), _ => unimplemented!(),
}, },
Poll::Ready(None) => { Poll::Ready(None) => {
@ -102,25 +99,26 @@ where
} }
if need_sync { if need_sync {
if let Some(mut fut) = sync_future.take() { if let Some(fut) = sync_future.as_mut() {
match fut.as_mut().poll(cx) { // SAFETY: safe bacause pinnet to async generator `stack` which should be pinned
Poll::Pending => { match unsafe { fix_type2(fut) }.poll(cx) {
sync_future = Some(fut); Poll::Pending => return Poll::Pending,
return Poll::Pending;
}
Poll::Ready(res) => { Poll::Ready(res) => {
need_sync = false;
stx.send(Event::Synchronized(res)).ok(); stx.send(Event::Synchronized(res)).ok();
} }
} }
need_sync = false;
sync_future = None;
continue;
} else { } else {
let ut = ut.clone(); let ut = ut.clone();
let bus_clone = bus.clone(); let bus_clone = bus.clone();
sync_future.replace(Box::pin(async move { sync_future.replace(async move {
tokio::task::spawn_blocking(move || ut.sync(&bus_clone)) tokio::task::spawn_blocking(move || ut.sync(&bus_clone))
.await .await
.unwrap() .unwrap()
})); });
} }
} else { } else {
break; break;

View File

@ -9,11 +9,34 @@ mod synchronized;
// pub use super::mpsc_futures::*; // pub use super::mpsc_futures::*;
// } // }
use std::pin::Pin;
pub use buffer_unordered::{BufferUnorderedAsync, BufferUnorderedConfig, BufferUnorderedSync}; pub use buffer_unordered::{BufferUnorderedAsync, BufferUnorderedConfig, BufferUnorderedSync};
use futures::Future;
pub use synchronized::{SynchronizedAsync, SynchronizedConfig, SynchronizedSync}; pub use synchronized::{SynchronizedAsync, SynchronizedConfig, SynchronizedSync};
use crate::receiver::Action; use crate::receiver::Action;
#[inline(always)]
pub(crate) unsafe fn fix_type1<'a, F, R, E>(
x: &'a mut F,
) -> Pin<&'a mut (impl Future<Output = (u64, Result<R, E>)> + Send)>
where
F: Future<Output = (u64, Result<R, E>)> + Send,
{
Pin::new_unchecked(x)
}
#[inline(always)]
pub(crate) unsafe fn fix_type2<'a, F, E>(
x: &'a mut F,
) -> Pin<&'a mut (impl Future<Output = Result<(), E>> + Send)>
where
F: Future<Output = Result<(), E>> + Send,
{
Pin::new_unchecked(x)
}
pub(crate) enum Request<M> { pub(crate) enum Request<M> {
Action(Action), Action(Action),
Request(u64, M), Request(u64, M),

View File

@ -5,7 +5,7 @@ use std::{
use crate::{ use crate::{
receiver::{Action, Event, ReciveTypedReceiver, SendUntypedReceiver}, receiver::{Action, Event, ReciveTypedReceiver, SendUntypedReceiver},
receivers::Request, receivers::{fix_type1, fix_type2, Request},
}; };
use anyhow::Result; use anyhow::Result;
use futures::Future; use futures::Future;
@ -31,24 +31,22 @@ where
E: crate::Error, E: crate::Error,
{ {
let ut = ut.downcast::<Mutex<T>>().unwrap(); let ut = ut.downcast::<Mutex<T>>().unwrap();
let mut handle_future: Option<Pin<Box<dyn Future<Output = (u64, Result<R, E>)> + Send>>> = None; let mut handle_future = None;
let mut sync_future: Option<Pin<Box<dyn Future<Output = Result<(), E>> + Send>>> = None; let mut sync_future = None;
let mut need_sync = false; let mut need_sync = false;
let mut rx_closed = false; let mut rx_closed = false;
futures::future::poll_fn(move |cx| loop { futures::future::poll_fn(move |cx| loop {
if let Some(mut fut) = handle_future.take() { if let Some(fut) = handle_future.as_mut() {
match fut.as_mut().poll(cx) { // SAFETY: safe bacause pinnet to async generator `stack` which should be pinned
Poll::Pending => { match unsafe { fix_type1(fut) }.poll(cx) {
handle_future = Some(fut); Poll::Pending => return Poll::Pending,
return Poll::Pending;
}
Poll::Ready((mid, resp)) => { Poll::Ready((mid, resp)) => {
stx.send(Event::Response(mid, resp)).ok(); stx.send(Event::Response(mid, resp)).ok();
} }
} }
} }
handle_future = None;
if !rx_closed && !need_sync { if !rx_closed && !need_sync {
match rx.poll_recv(cx) { match rx.poll_recv(cx) {
@ -56,13 +54,13 @@ where
Request::Request(mid, msg) => { Request::Request(mid, msg) => {
let bus = bus.clone(); let bus = bus.clone();
let ut = ut.clone(); let ut = ut.clone();
handle_future.replace(Box::pin(async move { handle_future
(mid, ut.lock().await.handle(msg, &bus).await) .replace(async move { (mid, ut.lock().await.handle(msg, &bus).await) });
}));
continue; continue;
} }
Request::Action(Action::Flush) => { Request::Action(Action::Flush) => {
stx.send(Event::Flushed).ok(); stx.send(Event::Flushed).ok();
continue;
} }
Request::Action(Action::Sync) => need_sync = true, Request::Action(Action::Sync) => need_sync = true,
Request::Action(Action::Close) => { Request::Action(Action::Close) => {
@ -74,30 +72,26 @@ where
Poll::Ready(None) => { Poll::Ready(None) => {
need_sync = true; need_sync = true;
rx_closed = true; rx_closed = true;
} }
Poll::Pending => {}, Poll::Pending => {}
} }
} }
if need_sync { if need_sync {
if let Some(mut fut) = sync_future.take() { if let Some(fut) = sync_future.as_mut() {
match fut.as_mut().poll(cx) { // SAFETY: safe bacause pinnet to async generator `stack` which should be pinned
Poll::Pending => { match unsafe { fix_type2(fut) }.poll(cx) {
sync_future = Some(fut); Poll::Pending => return Poll::Pending,
return Poll::Pending;
}
Poll::Ready(res) => { Poll::Ready(res) => {
need_sync = false; need_sync = false;
stx.send(Event::Synchronized(res)).ok(); stx.send(Event::Synchronized(res)).ok();
} }
} }
sync_future = None;
} else { } else {
let ut = ut.clone(); let ut = ut.clone();
let bus_clone = bus.clone(); let bus_clone = bus.clone();
sync_future.replace(Box::pin( sync_future.replace(async move { ut.lock().await.sync(&bus_clone).await });
async move { ut.lock().await.sync(&bus_clone).await },
));
} }
} }

View File

@ -1,20 +1,16 @@
use std::{ use std::{
pin::Pin, pin::Pin,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
task::{Context, Poll}, task::{Context, Poll},
}; };
use crate::{ use crate::{
receiver::{Action, Event, ReceiverStats, ReciveTypedReceiver, SendUntypedReceiver}, receiver::{Action, Event, ReciveTypedReceiver, SendUntypedReceiver},
receivers::Request, receivers::{fix_type1, fix_type2, Request},
}; };
use anyhow::Result; use anyhow::Result;
use futures::{Future, executor::block_on}; use futures::{executor::block_on, Future};
use super::{SynchronizedConfig, SynchronizedStats}; use super::SynchronizedConfig;
use crate::{ use crate::{
builder::ReceiverSubscriberBuilder, builder::ReceiverSubscriberBuilder,
receiver::{SendError, SendTypedReceiver}, receiver::{SendError, SendTypedReceiver},
@ -22,7 +18,6 @@ use crate::{
}; };
use tokio::sync::{mpsc, Mutex}; use tokio::sync::{mpsc, Mutex};
fn synchronized_poller<T, M, R, E>( fn synchronized_poller<T, M, R, E>(
mut rx: mpsc::UnboundedReceiver<Request<M>>, mut rx: mpsc::UnboundedReceiver<Request<M>>,
bus: Bus, bus: Bus,
@ -36,24 +31,22 @@ where
E: crate::Error, E: crate::Error,
{ {
let ut = ut.downcast::<Mutex<T>>().unwrap(); let ut = ut.downcast::<Mutex<T>>().unwrap();
let mut handle_future: Option<Pin<Box<dyn Future<Output = (u64, Result<R, E>)> + Send>>> = None; let mut handle_future = None;
let mut sync_future: Option<Pin<Box<dyn Future<Output = Result<(), E>> + Send>>> = None; let mut sync_future = None;
let mut need_sync = false; let mut need_sync = false;
let mut rx_closed = false; let mut rx_closed = false;
futures::future::poll_fn(move |cx| loop { futures::future::poll_fn(move |cx| loop {
if let Some(mut fut) = handle_future.take() { if let Some(fut) = handle_future.as_mut() {
match fut.as_mut().poll(cx) { // SAFETY: safe bacause pinnet to async generator `stack` which should be pinned
Poll::Pending => { match unsafe { fix_type1(fut) }.poll(cx) {
handle_future = Some(fut); Poll::Pending => return Poll::Pending,
return Poll::Pending;
}
Poll::Ready((mid, resp)) => { Poll::Ready((mid, resp)) => {
stx.send(Event::Response(mid, resp)).ok(); stx.send(Event::Response(mid, resp)).ok();
} }
} }
} }
handle_future = None;
if !rx_closed && !need_sync { if !rx_closed && !need_sync {
match rx.poll_recv(cx) { match rx.poll_recv(cx) {
@ -61,45 +54,57 @@ where
Request::Request(mid, msg) => { Request::Request(mid, msg) => {
let bus = bus.clone(); let bus = bus.clone();
let ut = ut.clone(); let ut = ut.clone();
handle_future.replace(Box::pin(async move { handle_future.replace(async move {
(mid, tokio::task::spawn_blocking(move || block_on(ut.lock()).handle(msg, &bus)).await.unwrap()) (
})); mid,
tokio::task::spawn_blocking(move || {
block_on(ut.lock()).handle(msg, &bus)
})
.await
.unwrap(),
)
});
continue;
}
Request::Action(Action::Flush) => {
stx.send(Event::Flushed).ok();
continue; continue;
} }
Request::Action(Action::Flush) => {stx.send(Event::Flushed).ok();}
Request::Action(Action::Sync) => need_sync = true, Request::Action(Action::Sync) => need_sync = true,
Request::Action(Action::Close) => { Request::Action(Action::Close) => {
rx.close(); rx.close();
continue; continue;
}, }
_ => unimplemented!(), _ => unimplemented!(),
}, },
Poll::Ready(None) => { Poll::Ready(None) => {
need_sync = true; need_sync = true;
rx_closed = true; rx_closed = true;
} }
Poll::Pending => {}, Poll::Pending => {}
} }
} }
if need_sync { if need_sync {
if let Some(mut fut) = sync_future.take() { if let Some(fut) = sync_future.as_mut() {
match fut.as_mut().poll(cx) { // SAFETY: safe bacause pinnet to async generator `stack` which should be pinned
Poll::Pending => { match unsafe { fix_type2(fut) }.poll(cx) {
sync_future = Some(fut); Poll::Pending => return Poll::Pending,
return Poll::Pending;
}
Poll::Ready(res) => { Poll::Ready(res) => {
need_sync = false; need_sync = false;
stx.send(Event::Synchronized(res)).ok(); stx.send(Event::Synchronized(res)).ok();
} }
} }
sync_future = None;
} else { } else {
let ut = ut.clone(); let ut = ut.clone();
let bus_clone = bus.clone(); let bus_clone = bus.clone();
sync_future.replace(Box::pin(async move { sync_future.replace(async move {
tokio::task::spawn_blocking(move || block_on(ut.lock()).sync(&bus_clone)).await.unwrap() tokio::task::spawn_blocking(move || block_on(ut.lock()).sync(&bus_clone))
})); .await
.unwrap()
});
} }
} }
@ -118,7 +123,6 @@ where
E: crate::Error, E: crate::Error,
{ {
tx: mpsc::UnboundedSender<Request<M>>, tx: mpsc::UnboundedSender<Request<M>>,
stats: Arc<SynchronizedStats>,
srx: parking_lot::Mutex<mpsc::UnboundedReceiver<Event<R, E>>>, srx: parking_lot::Mutex<mpsc::UnboundedReceiver<Event<R, E>>>,
} }
@ -132,37 +136,26 @@ where
type Config = SynchronizedConfig; type Config = SynchronizedConfig;
fn build( fn build(
cfg: Self::Config, _cfg: Self::Config,
) -> ( ) -> (
Self, Self,
Box< Box<
dyn FnOnce(Untyped) -> Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>, dyn FnOnce(Untyped) -> Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>,
>, >,
) { ) {
let stats = Arc::new(SynchronizedStats {
buffer: AtomicU64::new(0),
buffer_total: AtomicU64::new(cfg.buffer_size as _),
});
let (stx, srx) = mpsc::unbounded_channel(); let (stx, srx) = mpsc::unbounded_channel();
let (tx, rx) = mpsc::unbounded_channel(); let (tx, rx) = mpsc::unbounded_channel();
let stats_clone = stats.clone();
let poller = Box::new(move |ut| { let poller = Box::new(move |ut| {
Box::new(move |bus| { Box::new(move |bus| {
Box::pin(synchronized_poller::<T, M, R, E>( Box::pin(synchronized_poller::<T, M, R, E>(rx, bus, ut, stx))
rx, as Pin<Box<dyn Future<Output = ()> + Send>>
bus,
ut,
stx,
)) as Pin<Box<dyn Future<Output = ()> + Send>>
}) as Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>> }) as Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>
}); });
( (
SynchronizedSync::<M, R, E> { SynchronizedSync::<M, R, E> {
tx, tx,
stats,
srx: parking_lot::Mutex::new(srx), srx: parking_lot::Mutex::new(srx),
}, },
poller, poller,
@ -193,11 +186,7 @@ where
{ {
fn send(&self, mid: u64, m: M) -> Result<(), SendError<M>> { fn send(&self, mid: u64, m: M) -> Result<(), SendError<M>> {
match self.tx.send(Request::Request(mid, m)) { match self.tx.send(Request::Request(mid, m)) {
Ok(_) => { Ok(_) => Ok(()),
self.stats.buffer.fetch_add(1, Ordering::Relaxed);
Ok(())
}
Err(mpsc::error::SendError(Request::Request(_, msg))) => Err(SendError::Closed(msg)), Err(mpsc::error::SendError(Request::Request(_, msg))) => Err(SendError::Closed(msg)),
_ => unimplemented!(), _ => unimplemented!(),
} }