Refacroting ReciveTypedReceiver

This commit is contained in:
Andrey Tkachenko 2021-09-20 19:31:06 +04:00
parent ce64f7f4bc
commit 9d014d38a1
15 changed files with 137 additions and 158 deletions

View File

@ -29,7 +29,7 @@ thiserror = "1"
erased-serde = "0.3"
serde = "1"
serde_derive = "1"
dashmap = "4.0.2"
dashmap = "4.0"
[dev-dependencies]
anyhow = "1.0"

View File

@ -24,5 +24,9 @@ rmp-serde = "0.15.5"
erased-serde = "0.3.16"
serde_derive = "1.0.130"
serde = "1.0.130"
futures = "0.3.17"
[dev-dependencies]
tokio = { version = "1.11.0", features = ["full"] }
# quinn = { version = "0.7", optional = true }

View File

@ -0,0 +1,2 @@
#[tokio::main]
async fn main() {}

View File

@ -1,5 +1,5 @@
use crate::error::Error;
use core::task::{Context, Poll};
use futures::Stream;
use messagebus::{
error::{GenericError, SendError},
Action, Bus, Event, Message, ReciveUntypedReceiver, SendUntypedReceiver, TypeTag,
@ -8,6 +8,7 @@ use messagebus::{
use std::{
collections::{HashMap, HashSet},
net::SocketAddr,
pin::Pin,
};
pub const ALPN_QUIC_HTTP: &[&[u8]] = &[b"hq-29"];
@ -34,7 +35,6 @@ impl QuicClientRelayEndpoint {
Ok(Self { endpoint })
}
pub async fn connect(
&self,
addr: SocketAddr,
@ -63,6 +63,7 @@ pub struct QuicClientConnection {
}
impl QuicClientConnection {
#[inline]
pub fn send(&self, req: Request) -> Result<(), Error> {
Ok(())
}
@ -147,21 +148,15 @@ impl SendUntypedReceiver for QuicClientRelay {
Ok(())
}
}
impl ReciveUntypedReceiver for QuicClientRelay {
fn poll_events(
&self,
ctx: &mut Context<'_>,
_bus: &Bus,
) -> Poll<Event<Box<dyn Message>, GenericError>> {
Poll::Pending
type Stream = Pin<Box<dyn Stream<Item = Event<Box<dyn Message>, GenericError>> + Send>>;
// let poll = self.srx.lock().poll_recv(ctx);
fn event_stream(&self) -> Self::Stream {
// let mut rx = self.srx.lock().take().unwrap();
// match poll {
// Poll::Pending => Poll::Pending,
// Poll::Ready(Some(event)) => Poll::Ready(event),
// Poll::Ready(None) => Poll::Ready(Event::Exited),
// }
// Box::pin(futures::stream::poll_fn(move |cx|rx.poll_recv(cx)))
Box::pin(futures::stream::poll_fn(move |_cx| {
std::task::Poll::Pending
}))
}
}

View File

@ -13,9 +13,9 @@ use core::{
mem,
pin::Pin,
sync::atomic::{AtomicBool, AtomicI64, Ordering},
task::{Context, Poll},
};
use futures::{future::poll_fn, Future, FutureExt};
use futures::{pin_mut, Stream};
use futures::{Future, FutureExt, StreamExt};
use std::{borrow::Cow, sync::Arc};
use tokio::sync::{oneshot, Notify};
@ -48,15 +48,15 @@ where
M: Message,
E: StdSyncSendError,
{
fn poll_events(&self, ctx: &mut Context<'_>, bus: &Bus) -> Poll<Event<M, E>>;
type Stream: Stream<Item = Event<M, E>> + Send;
fn event_stream(&self) -> Self::Stream;
}
pub trait ReciveUntypedReceiver: Sync {
fn poll_events(
&self,
ctx: &mut Context<'_>,
bus: &Bus,
) -> Poll<Event<Box<dyn Message>, GenericError>>;
type Stream: Stream<Item = Event<Box<dyn Message>, GenericError>> + Send;
fn event_stream(&self) -> Self::Stream;
}
pub trait WrapperReturnTypeOnly<R: Message>: Send + Sync {
@ -184,12 +184,19 @@ where
fn start_polling_events(
self: Arc<Self>,
) -> Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>> {
Box::new(move |bus| {
Box::new(move |_| {
Box::pin(async move {
loop {
let this = self.clone();
let bus = bus.clone();
let event = poll_fn(move |ctx| this.inner.poll_events(ctx, &bus)).await;
let events = this.inner.event_stream();
pin_mut!(events);
loop {
let event = if let Some(event) = events.next().await {
event
} else {
self.context.closed.notify_waiters();
break;
};
match event {
Event::Error(err) => error!("Batch Error: {}", err),
@ -489,12 +496,10 @@ pub struct AnyReceiver<'a> {
}
impl<'a> AnyReceiver<'a> {
pub fn new<M, R, E, S>(rcvr: &'a S) -> Self
pub fn new<M, S>(rcvr: &'a S) -> Self
where
M: Message,
R: Message,
E: StdSyncSendError,
S: SendTypedReceiver<M> + ReciveTypedReceiver<R, E> + 'static,
S: SendTypedReceiver<M> + 'static,
{
let send_typed_receiver = rcvr as &(dyn SendTypedReceiver<M>);
let send_typed_receiver: TraitObject = unsafe { mem::transmute(send_typed_receiver) };

View File

@ -4,7 +4,6 @@ use std::{
atomic::{AtomicU64, Ordering},
Arc,
},
task::{Context, Poll},
};
use crate::{
@ -18,7 +17,7 @@ use crate::{
use super::{BufferUnorderedConfig, BufferUnorderedStats};
use futures::Future;
use futures::{Future, Stream};
use parking_lot::Mutex;
use tokio::sync::mpsc::{self, UnboundedSender};
@ -45,7 +44,7 @@ where
{
tx: mpsc::UnboundedSender<Request<M>>,
stats: Arc<BufferUnorderedStats>,
srx: Mutex<mpsc::UnboundedReceiver<Event<R, E>>>,
srx: Mutex<Option<mpsc::UnboundedReceiver<Event<R, E>>>>,
}
impl<T, M, R, E> ReceiverSubscriberBuilder<T, M, R, E> for BufferUnorderedAsync<M, R, E>
@ -93,7 +92,7 @@ where
BufferUnorderedAsync::<M, R, E> {
tx,
stats,
srx: Mutex::new(srx),
srx: Mutex::new(Some(srx)),
},
poller,
)
@ -140,12 +139,11 @@ where
R: Message,
E: StdSyncSendError,
{
fn poll_events(&self, ctx: &mut Context<'_>, _bus: &Bus) -> Poll<Event<R, E>> {
let poll = self.srx.lock().poll_recv(ctx);
match poll {
Poll::Pending => Poll::Pending,
Poll::Ready(Some(event)) => Poll::Ready(event),
Poll::Ready(None) => Poll::Ready(Event::Exited),
}
type Stream = Pin<Box<dyn Stream<Item = Event<R, E>> + Send>>;
fn event_stream(&self) -> Self::Stream {
let mut rx = self.srx.lock().take().unwrap();
Box::pin(futures::stream::poll_fn(move |cx| rx.poll_recv(cx)))
}
}

View File

@ -4,7 +4,6 @@ use std::{
atomic::{AtomicU64, Ordering},
Arc,
},
task::{Context, Poll},
};
use super::{BufferUnorderedConfig, BufferUnorderedStats};
@ -17,7 +16,7 @@ use crate::{
Bus, Handler, Message, Untyped,
};
use futures::Future;
use futures::{Future, Stream};
use parking_lot::Mutex;
use tokio::sync::mpsc::{self, UnboundedSender};
@ -48,7 +47,7 @@ where
{
tx: mpsc::UnboundedSender<Request<M>>,
stats: Arc<BufferUnorderedStats>,
srx: Mutex<mpsc::UnboundedReceiver<Event<R, E>>>,
srx: Mutex<Option<mpsc::UnboundedReceiver<Event<R, E>>>>,
}
impl<T, M, R, E> ReceiverSubscriberBuilder<T, M, R, E> for BufferUnorderedSync<M, R, E>
@ -96,7 +95,7 @@ where
BufferUnorderedSync::<M, R, E> {
tx,
stats,
srx: Mutex::new(srx),
srx: Mutex::new(Some(srx)),
},
poller,
)
@ -143,12 +142,11 @@ where
R: Message,
E: StdSyncSendError,
{
fn poll_events(&self, ctx: &mut Context<'_>, _bus: &Bus) -> Poll<Event<R, E>> {
let poll = self.srx.lock().poll_recv(ctx);
match poll {
Poll::Pending => Poll::Pending,
Poll::Ready(Some(event)) => Poll::Ready(event),
Poll::Ready(None) => Poll::Ready(Event::Exited),
}
type Stream = Pin<Box<dyn Stream<Item = Event<R, E>> + Send>>;
fn event_stream(&self) -> Self::Stream {
let mut rx = self.srx.lock().take().unwrap();
Box::pin(futures::stream::poll_fn(move |cx| rx.poll_recv(cx)))
}
}

View File

@ -4,7 +4,6 @@ use std::{
atomic::{AtomicU64, Ordering},
Arc,
},
task::{Context, Poll},
};
use crate::{
@ -17,7 +16,7 @@ use crate::{
};
use super::{BufferUnorderedBatchedConfig, BufferUnorderedBatchedStats};
use futures::Future;
use futures::{Future, Stream};
use parking_lot::Mutex;
use tokio::sync::mpsc::{self, UnboundedSender};
@ -43,7 +42,7 @@ where
{
tx: mpsc::UnboundedSender<Request<M>>,
stats: Arc<BufferUnorderedBatchedStats>,
srx: Mutex<mpsc::UnboundedReceiver<Event<R, E>>>,
srx: Mutex<Option<mpsc::UnboundedReceiver<Event<R, E>>>>,
}
impl<T, M, R> ReceiverSubscriberBuilder<T, M, R, T::Error>
@ -94,7 +93,7 @@ where
BufferUnorderedBatchedAsync::<M, R, T::Error> {
tx,
stats,
srx: Mutex::new(srx),
srx: Mutex::new(Some(srx)),
},
poller,
)
@ -141,12 +140,11 @@ where
R: Message,
E: StdSyncSendError,
{
fn poll_events(&self, ctx: &mut Context<'_>, _bus: &Bus) -> Poll<Event<R, E>> {
let poll = self.srx.lock().poll_recv(ctx);
match poll {
Poll::Pending => Poll::Pending,
Poll::Ready(Some(event)) => Poll::Ready(event),
Poll::Ready(None) => Poll::Ready(Event::Exited),
}
type Stream = Pin<Box<dyn Stream<Item = Event<R, E>> + Send>>;
fn event_stream(&self) -> Self::Stream {
let mut rx = self.srx.lock().take().unwrap();
Box::pin(futures::stream::poll_fn(move |cx| rx.poll_recv(cx)))
}
}

View File

@ -4,7 +4,6 @@ use std::{
atomic::{AtomicU64, Ordering},
Arc,
},
task::{Context, Poll},
};
use super::{BufferUnorderedBatchedConfig, BufferUnorderedBatchedStats};
@ -17,7 +16,7 @@ use crate::{
BatchHandler, Bus, Message, Untyped,
};
use futures::Future;
use futures::{Future, Stream};
use parking_lot::Mutex;
use tokio::sync::mpsc::{self, UnboundedSender};
@ -49,7 +48,7 @@ where
{
tx: mpsc::UnboundedSender<Request<M>>,
stats: Arc<BufferUnorderedBatchedStats>,
srx: Mutex<mpsc::UnboundedReceiver<Event<R, E>>>,
srx: Mutex<Option<mpsc::UnboundedReceiver<Event<R, E>>>>,
}
impl<T, M, R> ReceiverSubscriberBuilder<T, M, R, T::Error>
@ -100,7 +99,7 @@ where
BufferUnorderedBatchedSync::<M, R, T::Error> {
tx,
stats,
srx: Mutex::new(srx),
srx: Mutex::new(Some(srx)),
},
poller,
)
@ -147,12 +146,11 @@ where
R: Message,
E: StdSyncSendError,
{
fn poll_events(&self, ctx: &mut Context<'_>, _bus: &Bus) -> Poll<Event<R, E>> {
let poll = self.srx.lock().poll_recv(ctx);
match poll {
Poll::Pending => Poll::Pending,
Poll::Ready(Some(event)) => Poll::Ready(event),
Poll::Ready(None) => Poll::Ready(Event::Exited),
}
type Stream = Pin<Box<dyn Stream<Item = Event<R, E>> + Send>>;
fn event_stream(&self) -> Self::Stream {
let mut rx = self.srx.lock().take().unwrap();
Box::pin(futures::stream::poll_fn(move |cx| rx.poll_recv(cx)))
}
}

View File

@ -1,9 +1,6 @@
use std::{
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use std::{pin::Pin, sync::Arc};
use super::SynchronizedBatchedConfig;
use crate::{
batch_synchronized_poller_macro,
builder::ReceiverSubscriberBuilder,
@ -13,10 +10,7 @@ use crate::{
AsyncBatchSynchronizedHandler, Bus, Message, Untyped,
};
use futures::Future;
use super::SynchronizedBatchedConfig;
use futures::{Future, Stream};
use tokio::sync::{
mpsc::{self, UnboundedSender},
Mutex,
@ -42,7 +36,7 @@ where
E: StdSyncSendError,
{
tx: mpsc::UnboundedSender<Request<M>>,
srx: parking_lot::Mutex<mpsc::UnboundedReceiver<Event<R, E>>>,
srx: parking_lot::Mutex<Option<mpsc::UnboundedReceiver<Event<R, E>>>>,
}
impl<T, M, R> ReceiverSubscriberBuilder<T, M, R, T::Error>
@ -76,7 +70,7 @@ where
(
SynchronizedBatchedAsync::<M, R, T::Error> {
tx,
srx: parking_lot::Mutex::new(srx),
srx: parking_lot::Mutex::new(Some(srx)),
},
poller,
)
@ -119,12 +113,11 @@ where
R: Message,
E: StdSyncSendError,
{
fn poll_events(&self, ctx: &mut Context<'_>, _bus: &Bus) -> Poll<Event<R, E>> {
let poll = self.srx.lock().poll_recv(ctx);
match poll {
Poll::Pending => Poll::Pending,
Poll::Ready(Some(event)) => Poll::Ready(event),
Poll::Ready(None) => Poll::Ready(Event::Exited),
}
type Stream = Pin<Box<dyn Stream<Item = Event<R, E>> + Send>>;
fn event_stream(&self) -> Self::Stream {
let mut rx = self.srx.lock().take().unwrap();
Box::pin(futures::stream::poll_fn(move |cx| rx.poll_recv(cx)))
}
}

View File

@ -1,8 +1,4 @@
use std::{
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use std::{pin::Pin, sync::Arc};
use crate::{
batch_synchronized_poller_macro,
@ -14,7 +10,7 @@ use crate::{
};
use super::SynchronizedBatchedConfig;
use futures::{executor::block_on, Future};
use futures::{executor::block_on, Future, Stream};
use tokio::sync::{
mpsc::{self, UnboundedSender},
Mutex,
@ -44,7 +40,7 @@ where
E: StdSyncSendError,
{
tx: mpsc::UnboundedSender<Request<M>>,
srx: parking_lot::Mutex<mpsc::UnboundedReceiver<Event<R, E>>>,
srx: parking_lot::Mutex<Option<mpsc::UnboundedReceiver<Event<R, E>>>>,
}
impl<T, M, R> ReceiverSubscriberBuilder<T, M, R, T::Error>
@ -78,7 +74,7 @@ where
(
SynchronizedBatchedSync::<M, R, T::Error> {
tx,
srx: parking_lot::Mutex::new(srx),
srx: parking_lot::Mutex::new(Some(srx)),
},
poller,
)
@ -121,12 +117,11 @@ where
R: Message,
E: StdSyncSendError,
{
fn poll_events(&self, ctx: &mut Context<'_>, _bus: &Bus) -> Poll<Event<R, E>> {
let poll = self.srx.lock().poll_recv(ctx);
match poll {
Poll::Pending => Poll::Pending,
Poll::Ready(Some(event)) => Poll::Ready(event),
Poll::Ready(None) => Poll::Ready(Event::Exited),
}
type Stream = Pin<Box<dyn Stream<Item = Event<R, E>> + Send>>;
fn event_stream(&self) -> Self::Stream {
let mut rx = self.srx.lock().take().unwrap();
Box::pin(futures::stream::poll_fn(move |cx| rx.poll_recv(cx)))
}
}

View File

@ -1,11 +1,7 @@
use std::{
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use std::{pin::Pin, sync::Arc};
use crate::synchronized_poller_macro;
use futures::Future;
use futures::{Future, Stream};
use super::SynchronizedConfig;
use crate::{
@ -44,7 +40,7 @@ where
E: StdSyncSendError,
{
tx: mpsc::UnboundedSender<Request<M>>,
srx: parking_lot::Mutex<mpsc::UnboundedReceiver<Event<R, E>>>,
srx: parking_lot::Mutex<Option<mpsc::UnboundedReceiver<Event<R, E>>>>,
}
impl<T, M, R, E> ReceiverSubscriberBuilder<T, M, R, E> for SynchronizedAsync<M, R, E>
@ -77,7 +73,7 @@ where
(
SynchronizedAsync::<M, R, E> {
tx,
srx: parking_lot::Mutex::new(srx),
srx: parking_lot::Mutex::new(Some(srx)),
},
poller,
)
@ -120,12 +116,11 @@ where
R: Message,
E: StdSyncSendError,
{
fn poll_events(&self, ctx: &mut Context<'_>, _bus: &Bus) -> Poll<Event<R, E>> {
let poll = self.srx.lock().poll_recv(ctx);
match poll {
Poll::Pending => Poll::Pending,
Poll::Ready(Some(event)) => Poll::Ready(event),
Poll::Ready(None) => Poll::Ready(Event::Exited),
}
type Stream = Pin<Box<dyn Stream<Item = Event<R, E>> + Send>>;
fn event_stream(&self) -> Self::Stream {
let mut rx = self.srx.lock().take().unwrap();
Box::pin(futures::stream::poll_fn(move |cx| rx.poll_recv(cx)))
}
}

View File

@ -1,11 +1,7 @@
use std::{
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use std::{pin::Pin, sync::Arc};
use crate::synchronized_poller_macro;
use futures::{executor::block_on, Future};
use futures::{executor::block_on, Future, Stream};
use super::SynchronizedConfig;
use crate::{
@ -45,7 +41,7 @@ where
E: StdSyncSendError,
{
tx: mpsc::UnboundedSender<Request<M>>,
srx: parking_lot::Mutex<mpsc::UnboundedReceiver<Event<R, E>>>,
srx: parking_lot::Mutex<Option<mpsc::UnboundedReceiver<Event<R, E>>>>,
}
impl<T, M, R, E> ReceiverSubscriberBuilder<T, M, R, E> for SynchronizedSync<M, R, E>
@ -78,7 +74,7 @@ where
(
SynchronizedSync::<M, R, E> {
tx,
srx: parking_lot::Mutex::new(srx),
srx: parking_lot::Mutex::new(Some(srx)),
},
poller,
)
@ -121,12 +117,11 @@ where
R: Message,
E: StdSyncSendError,
{
fn poll_events(&self, ctx: &mut Context<'_>, _bus: &Bus) -> Poll<Event<R, E>> {
let poll = self.srx.lock().poll_recv(ctx);
match poll {
Poll::Pending => Poll::Pending,
Poll::Ready(Some(event)) => Poll::Ready(event),
Poll::Ready(None) => Poll::Ready(Event::Exited),
}
type Stream = Pin<Box<dyn Stream<Item = Event<R, E>> + Send>>;
fn event_stream(&self) -> Self::Stream {
let mut rx = self.srx.lock().take().unwrap();
Box::pin(futures::stream::poll_fn(move |cx| rx.poll_recv(cx)))
}
}

View File

@ -12,7 +12,7 @@ use core::{
sync::atomic::{AtomicBool, AtomicU64, Ordering},
};
use dashmap::DashMap;
use futures::{future::poll_fn, Future};
use futures::{pin_mut, Future, StreamExt};
use std::sync::Arc;
use tokio::sync::{oneshot, Notify};
@ -226,12 +226,19 @@ where
fn start_polling(
self: Arc<Self>,
) -> Box<dyn FnOnce(Bus) -> Pin<Box<dyn Future<Output = ()> + Send>>> {
Box::new(move |bus| {
Box::new(move |_| {
Box::pin(async move {
loop {
let this = self.clone();
let bus = bus.clone();
let event = poll_fn(move |ctx| this.inner.poll_events(ctx, &bus)).await;
let events = this.inner.event_stream();
pin_mut!(events);
loop {
let event = if let Some(event) = events.next().await {
event
} else {
self.context.closed.notify_waiters();
break;
};
match event {
Event::Error(err) => error!("Batch Error: {}", err),

View File

@ -1,6 +1,7 @@
use std::task::{Context, Poll};
use std::pin::Pin;
use async_trait::async_trait;
use futures::Stream;
use messagebus::{
derive::{Error as MbError, Message},
error::{self, GenericError},
@ -46,7 +47,7 @@ impl AsyncHandler<Msg<i32>> for TmpReceiver {
pub struct TestRelay {
stx: mpsc::UnboundedSender<Event<Box<dyn Message>, GenericError>>,
srx: Mutex<mpsc::UnboundedReceiver<Event<Box<dyn Message>, GenericError>>>,
srx: Mutex<Option<mpsc::UnboundedReceiver<Event<Box<dyn Message>, GenericError>>>>,
}
impl TypeTagAccept for TestRelay {
@ -154,17 +155,12 @@ impl SendUntypedReceiver for TestRelay {
}
impl ReciveUntypedReceiver for TestRelay {
fn poll_events(
&self,
ctx: &mut Context<'_>,
_bus: &Bus,
) -> Poll<Event<Box<dyn Message>, error::GenericError>> {
let poll = self.srx.lock().poll_recv(ctx);
match poll {
Poll::Pending => Poll::Pending,
Poll::Ready(Some(event)) => Poll::Ready(event),
Poll::Ready(None) => Poll::Ready(Event::Exited),
}
type Stream = Pin<Box<dyn Stream<Item = Event<Box<dyn Message>, error::GenericError>> + Send>>;
fn event_stream(&self) -> Self::Stream {
let mut rx = self.srx.lock().take().unwrap();
Box::pin(futures::stream::poll_fn(move |cx| rx.poll_recv(cx)))
}
}
@ -173,7 +169,7 @@ async fn test_relay() {
let (stx, srx) = mpsc::unbounded_channel();
let relay = TestRelay {
stx,
srx: Mutex::new(srx),
srx: Mutex::new(Some(srx)),
};
let (b, poller) = Bus::build()