Synchronized handlers
This commit is contained in:
parent
db1d22df7f
commit
500e1ce178
@ -1,25 +1,33 @@
|
||||
use messagebus::{receivers, Bus, Result as MbusResult, SynchronizedHandler};
|
||||
use async_trait::async_trait;
|
||||
use messagebus::{receivers, Bus, SynchronizedHandler, AsyncSynchronizedHandler};
|
||||
use receivers::SynchronizedConfig;
|
||||
|
||||
struct TmpReceiver;
|
||||
|
||||
impl SynchronizedHandler<f32> for TmpReceiver {
|
||||
fn handle(&mut self, msg: f32, _bus: &Bus) -> MbusResult {
|
||||
println!("---> f32 {}", msg);
|
||||
type Error = anyhow::Error;
|
||||
type Response = ();
|
||||
|
||||
std::thread::sleep(std::time::Duration::from_secs(1));
|
||||
fn handle(&mut self, msg: f32, _bus: &Bus) -> Result<Self::Response, Self::Error> {
|
||||
|
||||
// std::thread::sleep(std::time::Duration::from_millis(100));
|
||||
println!("---> f32 {}", msg);
|
||||
|
||||
println!("done");
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl SynchronizedHandler<i16> for TmpReceiver {
|
||||
fn handle(&mut self, msg: i16, _bus: &Bus) -> MbusResult {
|
||||
#[async_trait]
|
||||
impl AsyncSynchronizedHandler<i16> for TmpReceiver {
|
||||
type Error = anyhow::Error;
|
||||
type Response = ();
|
||||
|
||||
async fn handle(&mut self, msg: i16, _bus: &Bus) -> Result<Self::Response, Self::Error> {
|
||||
|
||||
std::thread::sleep(std::time::Duration::from_millis(100));
|
||||
println!("---> i16 {}", msg);
|
||||
|
||||
std::thread::sleep(std::time::Duration::from_secs(1));
|
||||
|
||||
println!("done");
|
||||
Ok(())
|
||||
}
|
||||
@ -29,8 +37,8 @@ impl SynchronizedHandler<i16> for TmpReceiver {
|
||||
async fn main() {
|
||||
let (b, poller) = Bus::build()
|
||||
.register_unsync(TmpReceiver)
|
||||
.subscribe::<f32, receivers::SynchronizedSync<_>>(SynchronizedConfig { buffer_size: 1 })
|
||||
.subscribe::<i16, receivers::SynchronizedSync<_>>(Default::default())
|
||||
.subscribe::<f32, receivers::SynchronizedSync<_>, _, _>(8, Default::default())
|
||||
.subscribe::<i16, receivers::SynchronizedAsync<_>, _, _>(8, Default::default())
|
||||
.done()
|
||||
.build();
|
||||
|
||||
@ -51,7 +59,14 @@ async fn main() {
|
||||
b.send(12.0f32).await.unwrap();
|
||||
b.send(1i16).await.unwrap();
|
||||
|
||||
println!("finish");
|
||||
b.send(12.0f32).await.unwrap();
|
||||
b.send(1i16).await.unwrap();
|
||||
|
||||
println!("closing");
|
||||
|
||||
b.close().await;
|
||||
|
||||
poller.await;
|
||||
|
||||
println!("[done]");
|
||||
}
|
||||
|
@ -59,14 +59,21 @@ where
|
||||
(mid, ut.handle(msg, &bus))
|
||||
}));
|
||||
}
|
||||
Request::Action(Action::Flush) => need_flush = true,
|
||||
Request::Action(Action::Sync) => need_sync = true,
|
||||
Request::Action(Action::Flush) => {
|
||||
need_flush = true;
|
||||
break;
|
||||
}
|
||||
Request::Action(Action::Sync) => {
|
||||
need_sync = true;
|
||||
break;
|
||||
}
|
||||
Request::Action(Action::Close) => rx.close(),
|
||||
_ => unimplemented!(),
|
||||
},
|
||||
Poll::Ready(None) => {
|
||||
need_sync = true;
|
||||
rx_closed = true;
|
||||
break;
|
||||
}
|
||||
Poll::Pending => break,
|
||||
}
|
||||
|
@ -1,8 +1,8 @@
|
||||
mod buffer_unordered;
|
||||
mod synchronized;
|
||||
// mod buffer_unordered_batched;
|
||||
// mod mpsc_futures;
|
||||
// mod synchronize_batched;
|
||||
// mod synchronized;
|
||||
|
||||
// mod mpsc;
|
||||
// mod mpsc {
|
||||
@ -10,6 +10,7 @@ mod buffer_unordered;
|
||||
// }
|
||||
|
||||
pub use buffer_unordered::{BufferUnorderedAsync, BufferUnorderedConfig, BufferUnorderedSync};
|
||||
pub use synchronized::{SynchronizedAsync, SynchronizedConfig, SynchronizedSync};
|
||||
|
||||
use crate::receiver::Action;
|
||||
|
||||
@ -23,11 +24,6 @@ pub(crate) enum Request<M> {
|
||||
// BufferUnorderedBatchedSync, BufferUnorderedBatchedSyncSubscriber,
|
||||
// };
|
||||
|
||||
// pub use synchronized::{
|
||||
// SynchronizedAsync, SynchronizedAsyncSubscriber, SynchronizedConfig, SynchronizedSync,
|
||||
// SynchronizedSyncSubscriber,
|
||||
// };
|
||||
|
||||
// pub use synchronize_batched::{
|
||||
// SynchronizeBatchedAsync, SynchronizeBatchedAsyncSubscriber, SynchronizeBatchedConfig,
|
||||
// SynchronizeBatchedSync, SynchronizeBatchedSyncSubscriber,
|
||||
|
@ -1,186 +1,203 @@
|
||||
use std::{
|
||||
any::TypeId,
|
||||
marker::PhantomData,
|
||||
pin::Pin,
|
||||
sync::{
|
||||
atomic::{AtomicU64, Ordering},
|
||||
Arc,
|
||||
},
|
||||
task::{Context, Poll},
|
||||
};
|
||||
|
||||
use crate::{receiver::ReceiverStats, receivers::mpsc};
|
||||
use futures::{Future, StreamExt};
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
use super::{SynchronizedConfig, SynchronizedStats};
|
||||
use crate::{
|
||||
builder::{ReceiverSubscriber, ReceiverSubscriberBuilder},
|
||||
msgs,
|
||||
receiver::{AnyReceiver, ReceiverTrait, SendError, TypedReceiver},
|
||||
receiver::{Action, Event, ReciveTypedReceiver, SendUntypedReceiver},
|
||||
receivers::Request,
|
||||
};
|
||||
use anyhow::Result;
|
||||
use futures::Future;
|
||||
|
||||
use super::SynchronizedConfig;
|
||||
use crate::{
|
||||
builder::ReceiverSubscriberBuilder,
|
||||
receiver::{SendError, SendTypedReceiver},
|
||||
AsyncSynchronizedHandler, Bus, Message, Untyped,
|
||||
};
|
||||
use tokio::sync::{mpsc, Mutex};
|
||||
|
||||
pub struct SynchronizedAsyncSubscriber<T, M>
|
||||
fn synchronized_poller<T, M, R, E>(
|
||||
mut rx: mpsc::UnboundedReceiver<Request<M>>,
|
||||
bus: Bus,
|
||||
ut: Untyped,
|
||||
stx: mpsc::UnboundedSender<Event<R, E>>,
|
||||
) -> impl Future<Output = ()>
|
||||
where
|
||||
T: AsyncSynchronizedHandler<M> + 'static,
|
||||
T: AsyncSynchronizedHandler<M, Response = R, Error = E> + 'static,
|
||||
M: Message,
|
||||
R: Message,
|
||||
E: crate::Error,
|
||||
{
|
||||
cfg: SynchronizedConfig,
|
||||
_m: PhantomData<(T, M)>,
|
||||
let ut = ut.downcast::<Mutex<T>>().unwrap();
|
||||
let mut handle_future: Option<Pin<Box<dyn Future<Output = (u64, Result<R, E>)> + Send>>> = None;
|
||||
let mut sync_future: Option<Pin<Box<dyn Future<Output = Result<(), E>> + Send>>> = None;
|
||||
let mut need_sync = false;
|
||||
let mut rx_closed = false;
|
||||
|
||||
futures::future::poll_fn(move |cx| loop {
|
||||
if let Some(mut fut) = handle_future.take() {
|
||||
match fut.as_mut().poll(cx) {
|
||||
Poll::Pending => {
|
||||
handle_future = Some(fut);
|
||||
return Poll::Pending;
|
||||
}
|
||||
|
||||
Poll::Ready((mid, resp)) => {
|
||||
stx.send(Event::Response(mid, resp)).ok();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if !rx_closed && !need_sync {
|
||||
match rx.poll_recv(cx) {
|
||||
Poll::Ready(Some(a)) => match a {
|
||||
Request::Request(mid, msg) => {
|
||||
let bus = bus.clone();
|
||||
let ut = ut.clone();
|
||||
handle_future.replace(Box::pin(async move {
|
||||
(mid, ut.lock().await.handle(msg, &bus).await)
|
||||
}));
|
||||
continue;
|
||||
}
|
||||
Request::Action(Action::Flush) => {
|
||||
stx.send(Event::Flushed).ok();
|
||||
}
|
||||
Request::Action(Action::Sync) => need_sync = true,
|
||||
Request::Action(Action::Close) => {
|
||||
rx.close();
|
||||
continue;
|
||||
}
|
||||
_ => unimplemented!(),
|
||||
},
|
||||
Poll::Ready(None) => {
|
||||
need_sync = true;
|
||||
rx_closed = true;
|
||||
|
||||
}
|
||||
Poll::Pending => {},
|
||||
}
|
||||
}
|
||||
|
||||
if need_sync {
|
||||
if let Some(mut fut) = sync_future.take() {
|
||||
match fut.as_mut().poll(cx) {
|
||||
Poll::Pending => {
|
||||
sync_future = Some(fut);
|
||||
return Poll::Pending;
|
||||
}
|
||||
Poll::Ready(res) => {
|
||||
need_sync = false;
|
||||
stx.send(Event::Synchronized(res)).ok();
|
||||
}
|
||||
}
|
||||
} else {
|
||||
let ut = ut.clone();
|
||||
let bus_clone = bus.clone();
|
||||
sync_future.replace(Box::pin(
|
||||
async move { ut.lock().await.sync(&bus_clone).await },
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
return if rx_closed {
|
||||
Poll::Ready(())
|
||||
} else {
|
||||
Poll::Pending
|
||||
};
|
||||
})
|
||||
}
|
||||
|
||||
impl<T, M> ReceiverSubscriber<T> for SynchronizedAsyncSubscriber<T, M>
|
||||
pub struct SynchronizedAsync<M, R = (), E = anyhow::Error>
|
||||
where
|
||||
T: AsyncSynchronizedHandler<M> + 'static,
|
||||
M: Message,
|
||||
R: Message,
|
||||
E: crate::Error,
|
||||
{
|
||||
fn subscribe(
|
||||
self,
|
||||
tx: mpsc::UnboundedSender<Request<M>>,
|
||||
srx: parking_lot::Mutex<mpsc::UnboundedReceiver<Event<R, E>>>,
|
||||
}
|
||||
|
||||
impl<T, M, R, E> ReceiverSubscriberBuilder<T, M, R, E> for SynchronizedAsync<M, R, E>
|
||||
where
|
||||
T: AsyncSynchronizedHandler<M, Response = R, Error = E> + 'static,
|
||||
R: Message,
|
||||
M: Message,
|
||||
E: crate::Error,
|
||||
{
|
||||
type Config = SynchronizedConfig;
|
||||
|
||||
fn build(
|
||||
_cfg: Self::Config,
|
||||
) -> (
|
||||
Arc<dyn ReceiverTrait>,
|
||||
Self,
|
||||
Box<
|
||||
dyn FnOnce(Untyped) -> Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>,
|
||||
>,
|
||||
) {
|
||||
let cfg = self.cfg;
|
||||
let (tx, rx) = mpsc::channel(cfg.buffer_size);
|
||||
let stats = Arc::new(SynchronizedStats {
|
||||
buffer: AtomicU64::new(0),
|
||||
buffer_total: AtomicU64::new(cfg.buffer_size as _),
|
||||
});
|
||||
|
||||
let arc = Arc::new(SynchronizedAsync::<M> {
|
||||
tx,
|
||||
stats: stats.clone(),
|
||||
});
|
||||
let (stx, srx) = mpsc::unbounded_channel();
|
||||
let (tx, rx) = mpsc::unbounded_channel();
|
||||
|
||||
let poller = Box::new(move |ut| {
|
||||
Box::new(move |bus| {
|
||||
Box::pin(buffer_unordered_poller::<T, M>(rx, bus, ut, stats, cfg))
|
||||
Box::pin(synchronized_poller::<T, M, R, E>(rx, bus, ut, stx))
|
||||
as Pin<Box<dyn Future<Output = ()> + Send>>
|
||||
}) as Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>
|
||||
});
|
||||
|
||||
(arc, poller)
|
||||
(
|
||||
SynchronizedAsync::<M, R, E> {
|
||||
tx,
|
||||
srx: parking_lot::Mutex::new(srx),
|
||||
},
|
||||
poller,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
async fn buffer_unordered_poller<T, M>(
|
||||
rx: mpsc::Receiver<M>,
|
||||
bus: Bus,
|
||||
ut: Untyped,
|
||||
stats: Arc<SynchronizedStats>,
|
||||
_cfg: SynchronizedConfig,
|
||||
) where
|
||||
T: AsyncSynchronizedHandler<M> + 'static,
|
||||
M: Message,
|
||||
{
|
||||
let ut = ut.downcast::<Mutex<T>>().unwrap();
|
||||
let mut x = rx.then(|msg| {
|
||||
let bus = bus.clone();
|
||||
let ut = ut.clone();
|
||||
|
||||
tokio::task::spawn(async move { ut.lock().await.handle(msg, &bus).await })
|
||||
});
|
||||
|
||||
while let Some(err) = x.next().await {
|
||||
stats.buffer.fetch_sub(1, Ordering::Relaxed);
|
||||
|
||||
match err {
|
||||
Ok(Err(err)) => {
|
||||
let _ = bus.send(msgs::Error(Arc::new(err))).await;
|
||||
}
|
||||
_ => (),
|
||||
}
|
||||
}
|
||||
|
||||
let ut = ut.clone();
|
||||
let bus_clone = bus.clone();
|
||||
let res = tokio::task::spawn(async move { ut.lock().await.sync(&bus_clone).await }).await;
|
||||
|
||||
match res {
|
||||
Ok(Err(err)) => {
|
||||
let _ = bus.send(msgs::Error(Arc::new(err))).await;
|
||||
}
|
||||
_ => (),
|
||||
}
|
||||
|
||||
println!("[EXIT] SynchronizedAsync<{}>", std::any::type_name::<M>());
|
||||
}
|
||||
|
||||
pub struct SynchronizedAsync<M: Message> {
|
||||
tx: mpsc::Sender<M>,
|
||||
stats: Arc<SynchronizedStats>,
|
||||
}
|
||||
|
||||
impl<T, M> ReceiverSubscriberBuilder<M, T> for SynchronizedAsync<M>
|
||||
impl<M, R, E> SendUntypedReceiver for SynchronizedAsync<M, R, E>
|
||||
where
|
||||
T: AsyncSynchronizedHandler<M> + 'static,
|
||||
M: Message,
|
||||
R: Message,
|
||||
E: crate::Error,
|
||||
{
|
||||
type Entry = SynchronizedAsyncSubscriber<T, M>;
|
||||
type Config = SynchronizedConfig;
|
||||
|
||||
fn build(cfg: Self::Config) -> Self::Entry {
|
||||
SynchronizedAsyncSubscriber {
|
||||
cfg,
|
||||
_m: Default::default(),
|
||||
fn send(&self, m: Action) -> Result<(), SendError<Action>> {
|
||||
match self.tx.send(Request::Action(m)) {
|
||||
Ok(_) => Ok(()),
|
||||
Err(mpsc::error::SendError(Request::Action(msg))) => Err(SendError::Closed(msg)),
|
||||
_ => unimplemented!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<M: Message> TypedReceiver<M> for SynchronizedAsync<M> {
|
||||
fn poll_ready(&self, ctx: &mut Context<'_>) -> Poll<()> {
|
||||
match self.tx.poll_ready(ctx) {
|
||||
Poll::Ready(_) => Poll::Ready(()),
|
||||
impl<M, R, E> SendTypedReceiver<M> for SynchronizedAsync<M, R, E>
|
||||
where
|
||||
M: Message,
|
||||
R: Message,
|
||||
E: crate::Error,
|
||||
{
|
||||
fn send(&self, mid: u64, m: M) -> Result<(), SendError<M>> {
|
||||
match self.tx.send(Request::Request(mid, m)) {
|
||||
Ok(_) => Ok(()),
|
||||
Err(mpsc::error::SendError(Request::Request(_, msg))) => Err(SendError::Closed(msg)),
|
||||
_ => unimplemented!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<M, R, E> ReciveTypedReceiver<R, E> for SynchronizedAsync<M, R, E>
|
||||
where
|
||||
M: Message,
|
||||
R: Message,
|
||||
E: crate::Error,
|
||||
{
|
||||
fn poll_events(&self, ctx: &mut Context<'_>) -> Poll<Event<R, E>> {
|
||||
let poll = self.srx.lock().poll_recv(ctx);
|
||||
match poll {
|
||||
Poll::Pending => Poll::Pending,
|
||||
}
|
||||
}
|
||||
|
||||
fn try_send(&self, m: M) -> Result<(), SendError<M>> {
|
||||
match self.tx.try_send(m) {
|
||||
Ok(_) => {
|
||||
self.stats.buffer.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
Err(err) => Err(err),
|
||||
Poll::Ready(Some(event)) => Poll::Ready(event),
|
||||
Poll::Ready(None) => Poll::Ready(Event::Exited),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<M: Message> ReceiverTrait for SynchronizedAsync<M> {
|
||||
fn typed(&self) -> AnyReceiver<'_> {
|
||||
AnyReceiver::new(self)
|
||||
}
|
||||
|
||||
fn type_id(&self) -> TypeId {
|
||||
TypeId::of::<SynchronizedAsync<M>>()
|
||||
}
|
||||
|
||||
fn stats(&self) -> ReceiverStats {
|
||||
ReceiverStats {
|
||||
name: std::any::type_name::<M>().into(),
|
||||
fields: vec![
|
||||
("buffer".into(), self.stats.buffer.load(Ordering::SeqCst)),
|
||||
(
|
||||
"buffer_total".into(),
|
||||
self.stats.buffer_total.load(Ordering::SeqCst),
|
||||
),
|
||||
],
|
||||
}
|
||||
}
|
||||
|
||||
fn close(&self) {
|
||||
self.tx.close();
|
||||
}
|
||||
|
||||
fn sync(&self) {
|
||||
self.tx.flush();
|
||||
}
|
||||
|
||||
fn poll_synchronized(&self, _ctx: &mut Context<'_>) -> Poll<()> {
|
||||
Poll::Ready(())
|
||||
}
|
||||
}
|
||||
|
@ -3,9 +3,8 @@ mod sync;
|
||||
|
||||
use std::sync::atomic::AtomicU64;
|
||||
|
||||
pub use sync::{SynchronizedSync, SynchronizedSyncSubscriber};
|
||||
|
||||
pub use r#async::{SynchronizedAsync, SynchronizedAsyncSubscriber};
|
||||
pub use r#async::SynchronizedAsync;
|
||||
pub use sync::SynchronizedSync;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct SynchronizedStats {
|
||||
|
@ -1,9 +1,4 @@
|
||||
use super::{SynchronizedConfig, SynchronizedStats};
|
||||
use crate::{receiver::ReceiverStats, receivers::mpsc};
|
||||
use futures::{executor::block_on, Future, StreamExt};
|
||||
use std::{
|
||||
any::TypeId,
|
||||
marker::PhantomData,
|
||||
pin::Pin,
|
||||
sync::{
|
||||
atomic::{AtomicU64, Ordering},
|
||||
@ -11,180 +6,216 @@ use std::{
|
||||
},
|
||||
task::{Context, Poll},
|
||||
};
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
use crate::{
|
||||
builder::{ReceiverSubscriber, ReceiverSubscriberBuilder},
|
||||
msgs,
|
||||
receiver::{AnyReceiver, ReceiverTrait, SendError, TypedReceiver},
|
||||
receiver::{Action, Event, ReceiverStats, ReciveTypedReceiver, SendUntypedReceiver},
|
||||
receivers::Request,
|
||||
};
|
||||
use anyhow::Result;
|
||||
use futures::{Future, executor::block_on};
|
||||
|
||||
use super::{SynchronizedConfig, SynchronizedStats};
|
||||
use crate::{
|
||||
builder::ReceiverSubscriberBuilder,
|
||||
receiver::{SendError, SendTypedReceiver},
|
||||
Bus, Message, SynchronizedHandler, Untyped,
|
||||
};
|
||||
use tokio::sync::{mpsc, Mutex};
|
||||
|
||||
pub struct SynchronizedSyncSubscriber<T, M>
|
||||
|
||||
fn synchronized_poller<T, M, R, E>(
|
||||
mut rx: mpsc::UnboundedReceiver<Request<M>>,
|
||||
bus: Bus,
|
||||
ut: Untyped,
|
||||
stx: mpsc::UnboundedSender<Event<R, E>>,
|
||||
) -> impl Future<Output = ()>
|
||||
where
|
||||
T: SynchronizedHandler<M> + 'static,
|
||||
T: SynchronizedHandler<M, Response = R, Error = E> + 'static,
|
||||
M: Message,
|
||||
R: Message,
|
||||
E: crate::Error,
|
||||
{
|
||||
cfg: SynchronizedConfig,
|
||||
_m: PhantomData<(M, T)>,
|
||||
let ut = ut.downcast::<Mutex<T>>().unwrap();
|
||||
let mut handle_future: Option<Pin<Box<dyn Future<Output = (u64, Result<R, E>)> + Send>>> = None;
|
||||
let mut sync_future: Option<Pin<Box<dyn Future<Output = Result<(), E>> + Send>>> = None;
|
||||
let mut need_sync = false;
|
||||
let mut rx_closed = false;
|
||||
|
||||
futures::future::poll_fn(move |cx| loop {
|
||||
if let Some(mut fut) = handle_future.take() {
|
||||
match fut.as_mut().poll(cx) {
|
||||
Poll::Pending => {
|
||||
handle_future = Some(fut);
|
||||
return Poll::Pending;
|
||||
}
|
||||
|
||||
Poll::Ready((mid, resp)) => {
|
||||
stx.send(Event::Response(mid, resp)).ok();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if !rx_closed && !need_sync {
|
||||
match rx.poll_recv(cx) {
|
||||
Poll::Ready(Some(a)) => match a {
|
||||
Request::Request(mid, msg) => {
|
||||
let bus = bus.clone();
|
||||
let ut = ut.clone();
|
||||
handle_future.replace(Box::pin(async move {
|
||||
(mid, tokio::task::spawn_blocking(move || block_on(ut.lock()).handle(msg, &bus)).await.unwrap())
|
||||
}));
|
||||
continue;
|
||||
}
|
||||
Request::Action(Action::Flush) => {stx.send(Event::Flushed).ok();}
|
||||
Request::Action(Action::Sync) => need_sync = true,
|
||||
Request::Action(Action::Close) => {
|
||||
rx.close();
|
||||
continue;
|
||||
},
|
||||
_ => unimplemented!(),
|
||||
},
|
||||
Poll::Ready(None) => {
|
||||
need_sync = true;
|
||||
rx_closed = true;
|
||||
}
|
||||
Poll::Pending => {},
|
||||
}
|
||||
}
|
||||
|
||||
if need_sync {
|
||||
if let Some(mut fut) = sync_future.take() {
|
||||
match fut.as_mut().poll(cx) {
|
||||
Poll::Pending => {
|
||||
sync_future = Some(fut);
|
||||
return Poll::Pending;
|
||||
}
|
||||
Poll::Ready(res) => {
|
||||
need_sync = false;
|
||||
stx.send(Event::Synchronized(res)).ok();
|
||||
}
|
||||
}
|
||||
} else {
|
||||
let ut = ut.clone();
|
||||
let bus_clone = bus.clone();
|
||||
sync_future.replace(Box::pin(async move {
|
||||
tokio::task::spawn_blocking(move || block_on(ut.lock()).sync(&bus_clone)).await.unwrap()
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
||||
return if rx_closed {
|
||||
Poll::Ready(())
|
||||
} else {
|
||||
Poll::Pending
|
||||
};
|
||||
})
|
||||
}
|
||||
|
||||
impl<T, M> ReceiverSubscriber<T> for SynchronizedSyncSubscriber<T, M>
|
||||
pub struct SynchronizedSync<M, R = (), E = anyhow::Error>
|
||||
where
|
||||
T: SynchronizedHandler<M> + 'static,
|
||||
M: Message,
|
||||
R: Message,
|
||||
E: crate::Error,
|
||||
{
|
||||
fn subscribe(
|
||||
self,
|
||||
tx: mpsc::UnboundedSender<Request<M>>,
|
||||
stats: Arc<SynchronizedStats>,
|
||||
srx: parking_lot::Mutex<mpsc::UnboundedReceiver<Event<R, E>>>,
|
||||
}
|
||||
|
||||
impl<T, M, R, E> ReceiverSubscriberBuilder<T, M, R, E> for SynchronizedSync<M, R, E>
|
||||
where
|
||||
T: SynchronizedHandler<M, Response = R, Error = E> + 'static,
|
||||
R: Message,
|
||||
M: Message,
|
||||
E: crate::Error,
|
||||
{
|
||||
type Config = SynchronizedConfig;
|
||||
|
||||
fn build(
|
||||
cfg: Self::Config,
|
||||
) -> (
|
||||
Arc<dyn ReceiverTrait>,
|
||||
Self,
|
||||
Box<
|
||||
dyn FnOnce(Untyped) -> Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>,
|
||||
>,
|
||||
) {
|
||||
let cfg = self.cfg;
|
||||
let (tx, rx) = mpsc::channel(cfg.buffer_size);
|
||||
let stats = Arc::new(SynchronizedStats {
|
||||
buffer: AtomicU64::new(0),
|
||||
buffer_total: AtomicU64::new(cfg.buffer_size as _),
|
||||
});
|
||||
|
||||
let arc = Arc::new(SynchronizedSync::<M> {
|
||||
tx,
|
||||
stats: stats.clone(),
|
||||
});
|
||||
let (stx, srx) = mpsc::unbounded_channel();
|
||||
let (tx, rx) = mpsc::unbounded_channel();
|
||||
let stats_clone = stats.clone();
|
||||
|
||||
let poller = Box::new(move |ut| {
|
||||
Box::new(move |bus| {
|
||||
Box::pin(buffer_unordered_poller::<T, M>(rx, bus, ut, stats, cfg))
|
||||
as Pin<Box<dyn Future<Output = ()> + Send>>
|
||||
Box::pin(synchronized_poller::<T, M, R, E>(
|
||||
rx,
|
||||
bus,
|
||||
ut,
|
||||
stx,
|
||||
)) as Pin<Box<dyn Future<Output = ()> + Send>>
|
||||
}) as Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>>
|
||||
});
|
||||
|
||||
(arc, poller)
|
||||
(
|
||||
SynchronizedSync::<M, R, E> {
|
||||
tx,
|
||||
stats,
|
||||
srx: parking_lot::Mutex::new(srx),
|
||||
},
|
||||
poller,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
async fn buffer_unordered_poller<T, M>(
|
||||
rx: mpsc::Receiver<M>,
|
||||
bus: Bus,
|
||||
ut: Untyped,
|
||||
stats: Arc<SynchronizedStats>,
|
||||
_cfg: SynchronizedConfig,
|
||||
) where
|
||||
T: SynchronizedHandler<M> + 'static,
|
||||
M: Message,
|
||||
{
|
||||
let ut = ut.downcast::<Mutex<T>>().unwrap();
|
||||
let mut x = rx.then(|msg| {
|
||||
let ut = ut.clone();
|
||||
let bus = bus.clone();
|
||||
|
||||
tokio::task::spawn_blocking(move || block_on(ut.lock()).handle(msg, &bus))
|
||||
});
|
||||
|
||||
while let Some(err) = x.next().await {
|
||||
stats.buffer.fetch_sub(1, Ordering::Relaxed);
|
||||
|
||||
match err {
|
||||
Ok(Err(err)) => {
|
||||
let _ = bus.send(msgs::Error(Arc::new(err))).await;
|
||||
}
|
||||
_ => (),
|
||||
}
|
||||
}
|
||||
|
||||
let ut = ut.clone();
|
||||
let bus_clone = bus.clone();
|
||||
let res = tokio::task::spawn_blocking(move || {
|
||||
futures::executor::block_on(ut.lock()).sync(&bus_clone)
|
||||
})
|
||||
.await;
|
||||
|
||||
match res {
|
||||
Ok(Err(err)) => {
|
||||
let _ = bus.send(msgs::Error(Arc::new(err))).await;
|
||||
}
|
||||
_ => (),
|
||||
}
|
||||
|
||||
println!(
|
||||
"[EXIT] BufferUnorderedSync<{}>",
|
||||
std::any::type_name::<M>()
|
||||
);
|
||||
}
|
||||
|
||||
pub struct SynchronizedSync<M: Message> {
|
||||
tx: mpsc::Sender<M>,
|
||||
stats: Arc<SynchronizedStats>,
|
||||
}
|
||||
|
||||
impl<T, M> ReceiverSubscriberBuilder<M, T> for SynchronizedSync<M>
|
||||
impl<M, R, E> SendUntypedReceiver for SynchronizedSync<M, R, E>
|
||||
where
|
||||
T: SynchronizedHandler<M> + 'static,
|
||||
M: Message,
|
||||
R: Message,
|
||||
E: crate::Error,
|
||||
{
|
||||
type Entry = SynchronizedSyncSubscriber<T, M>;
|
||||
type Config = SynchronizedConfig;
|
||||
|
||||
fn build(cfg: Self::Config) -> Self::Entry {
|
||||
SynchronizedSyncSubscriber {
|
||||
cfg,
|
||||
_m: Default::default(),
|
||||
fn send(&self, msg: Action) -> Result<(), SendError<Action>> {
|
||||
match self.tx.send(Request::Action(msg)) {
|
||||
Ok(_) => Ok(()),
|
||||
Err(mpsc::error::SendError(Request::Action(msg))) => Err(SendError::Closed(msg)),
|
||||
_ => unimplemented!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<M: Message> TypedReceiver<M> for SynchronizedSync<M> {
|
||||
fn poll_ready(&self, ctx: &mut Context<'_>) -> Poll<()> {
|
||||
match self.tx.poll_ready(ctx) {
|
||||
Poll::Ready(_) => Poll::Ready(()),
|
||||
Poll::Pending => Poll::Pending,
|
||||
}
|
||||
}
|
||||
|
||||
fn try_send(&self, m: M) -> Result<(), SendError<M>> {
|
||||
match self.tx.try_send(m) {
|
||||
impl<M, R, E> SendTypedReceiver<M> for SynchronizedSync<M, R, E>
|
||||
where
|
||||
M: Message,
|
||||
R: Message,
|
||||
E: crate::Error,
|
||||
{
|
||||
fn send(&self, mid: u64, m: M) -> Result<(), SendError<M>> {
|
||||
match self.tx.send(Request::Request(mid, m)) {
|
||||
Ok(_) => {
|
||||
self.stats.buffer.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
Err(err) => Err(err),
|
||||
Err(mpsc::error::SendError(Request::Request(_, msg))) => Err(SendError::Closed(msg)),
|
||||
_ => unimplemented!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<M: Message> ReceiverTrait for SynchronizedSync<M> {
|
||||
fn typed(&self) -> AnyReceiver<'_> {
|
||||
AnyReceiver::new(self)
|
||||
}
|
||||
|
||||
fn type_id(&self) -> TypeId {
|
||||
TypeId::of::<SynchronizedSync<M>>()
|
||||
}
|
||||
|
||||
fn stats(&self) -> ReceiverStats {
|
||||
ReceiverStats {
|
||||
name: std::any::type_name::<M>().into(),
|
||||
fields: vec![
|
||||
("buffer".into(), self.stats.buffer.load(Ordering::SeqCst)),
|
||||
(
|
||||
"buffer_total".into(),
|
||||
self.stats.buffer_total.load(Ordering::SeqCst),
|
||||
),
|
||||
],
|
||||
impl<M, R, E> ReciveTypedReceiver<R, E> for SynchronizedSync<M, R, E>
|
||||
where
|
||||
M: Message,
|
||||
R: Message,
|
||||
E: crate::Error,
|
||||
{
|
||||
fn poll_events(&self, ctx: &mut Context<'_>) -> Poll<Event<R, E>> {
|
||||
let poll = self.srx.lock().poll_recv(ctx);
|
||||
match poll {
|
||||
Poll::Pending => Poll::Pending,
|
||||
Poll::Ready(Some(event)) => Poll::Ready(event),
|
||||
Poll::Ready(None) => Poll::Ready(Event::Exited),
|
||||
}
|
||||
}
|
||||
|
||||
fn close(&self) {
|
||||
self.tx.close();
|
||||
}
|
||||
|
||||
fn sync(&self) {
|
||||
self.tx.flush();
|
||||
}
|
||||
|
||||
fn poll_synchronized(&self, _ctx: &mut Context<'_>) -> Poll<()> {
|
||||
Poll::Ready(())
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user