From 6ef65cbfd108b14f03005521709924f7ca2f5ca2 Mon Sep 17 00:00:00 2001 From: Andrey Tkachenko Date: Wed, 22 Sep 2021 16:47:25 +0400 Subject: [PATCH] QUIC Relay progress --- Cargo.toml | 4 +- crates/derive/Cargo.toml | 2 +- crates/derive/src/lib.rs | 8 +- crates/remote/Cargo.toml | 3 + crates/remote/src/relays/mod.rs | 54 +++ crates/remote/src/relays/quic/client.rs | 450 +++++++++++++++--- src/envelop.rs | 18 +- src/error.rs | 7 + src/receiver.rs | 8 +- src/receivers/buffer_unordered/async.rs | 2 +- src/receivers/buffer_unordered/sync.rs | 2 +- .../buffer_unordered_batched/async.rs | 2 +- .../buffer_unordered_batched/sync.rs | 2 +- src/receivers/synchronize_batched/async.rs | 2 +- src/receivers/synchronize_batched/sync.rs | 2 +- src/receivers/synchronized/async.rs | 2 +- src/receivers/synchronized/sync.rs | 2 +- src/relay.rs | 4 +- 18 files changed, 466 insertions(+), 108 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 13fd2dd..cb94d6a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "messagebus" -version = "0.9.5" +version = "0.9.6" authors = ["Andrey Tkachenko "] repository = "https://github.com/andreytkachenko/messagebus.git" keywords = ["futures", "async", "tokio", "message", "bus"] @@ -17,7 +17,7 @@ members = [ ] [dependencies] -messagebus_derive = "0.1" +messagebus_derive = "0.2" tokio = { version = "1", features = ["parking_lot", "rt-multi-thread", "sync", "time"] } parking_lot = "0.11" async-trait = "0.1" diff --git a/crates/derive/Cargo.toml b/crates/derive/Cargo.toml index 498d50c..d7ef8ce 100644 --- a/crates/derive/Cargo.toml +++ b/crates/derive/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "messagebus_derive" -version = "0.1.0" +version = "0.2.0" authors = ["Andrey Tkachenko "] repository = "https://github.com/andreytkachenko/messagebus.git" keywords = ["futures", "async", "tokio", "message", "bus"] diff --git a/crates/derive/src/lib.rs b/crates/derive/src/lib.rs index 772c6f4..b3ac1ac 100644 --- a/crates/derive/src/lib.rs +++ b/crates/derive/src/lib.rs @@ -14,15 +14,15 @@ fn shared_part(_ast: &syn::DeriveInput, has_shared: bool) -> proc_macro2::TokenS quote! { fn as_shared_ref(&self) -> std::option::Option<&dyn messagebus::SharedMessage> {Some(self)} fn as_shared_mut(&mut self) -> std::option::Option<&mut dyn messagebus::SharedMessage>{Some(self)} - fn as_shared_boxed(self: std::boxed::Box) -> Option>{Some(self)} + fn as_shared_boxed(self: Box) -> Result, Box> {Ok(self)} fn as_shared_arc(self: std::sync::Arc) -> Option>{Some(self)} } } else { quote! { fn as_shared_ref(&self) -> std::option::Option<&dyn messagebus::SharedMessage> {None} - fn as_shared_mut(&mut self) -> std::option::Option<&mut dyn messagebus::SharedMessage>{None} - fn as_shared_boxed(self: std::boxed::Box) -> Option>{None} - fn as_shared_arc(self: std::sync::Arc) -> Option>{None} + fn as_shared_mut(&mut self) -> std::option::Option<&mut dyn messagebus::SharedMessage> {None} + fn as_shared_boxed(self: Box) -> Result, Box> {Err(self)} + fn as_shared_arc(self: std::sync::Arc) -> Option> {None} } } } diff --git a/crates/remote/Cargo.toml b/crates/remote/Cargo.toml index 8f21434..0d18abc 100644 --- a/crates/remote/Cargo.toml +++ b/crates/remote/Cargo.toml @@ -25,6 +25,9 @@ erased-serde = "0.3.16" serde_derive = "1.0.130" serde = "1.0.130" futures = "0.3.17" +cbor = "0.4.1" +serde_cbor = "0.11.2" +bytes = "1.1.0" [dev-dependencies] tokio = { version = "1.11.0", features = ["full"] } diff --git a/crates/remote/src/relays/mod.rs b/crates/remote/src/relays/mod.rs index f73dbbf..36637d9 100644 --- a/crates/remote/src/relays/mod.rs +++ b/crates/remote/src/relays/mod.rs @@ -1,5 +1,59 @@ // #[cfg(feature = "quic")] mod quic; +use futures::Stream; +use messagebus::{error::GenericError, Event, Message, TypeTag}; +use std::{collections::HashMap, pin::Pin}; + // #[cfg(feature = "quic")] pub use quic::*; + +pub(crate) type GenericEventStream = + Pin, GenericError>> + Send>>; + + +#[derive(Debug, Default)] +pub(crate) struct MessageTable { + table: HashMap>, +} + +impl MessageTable { + pub fn new() -> Self { + Self { + table: HashMap::new(), + } + } + + pub fn add(&mut self, req: TypeTag, resp: TypeTag, err: TypeTag) { + self.table.entry(req) + .or_insert_with(Vec::new) + .push((resp, err)); + } + + pub fn accept(&self, msg: &TypeTag, resp: Option<&TypeTag>, err: Option<&TypeTag>) -> bool { + self.table.get(msg).map_or(false, |v| { + v.iter().any(|(r, e)| { + resp.map_or(true, |resp| resp.as_ref() == r.as_ref()) + && err.map_or(true, |err| err.as_ref() == e.as_ref()) + }) + }) + } + + pub fn iter_types(&self) -> impl Iterator + '_ { + self.table + .iter() + .map(|(k, v)| v.iter().map(move |(e, r)| (k, r, e))) + .flatten() + } +} + + +impl From> for MessageTable { + fn from(table: Vec<(TypeTag, TypeTag, TypeTag)>) -> Self { + let mut outgoing_table = MessageTable::default(); + for (x, y, z) in table { + outgoing_table.add(x, y, z); + } + outgoing_table + } +} \ No newline at end of file diff --git a/crates/remote/src/relays/quic/client.rs b/crates/remote/src/relays/quic/client.rs index 77afd60..8372dce 100644 --- a/crates/remote/src/relays/quic/client.rs +++ b/crates/remote/src/relays/quic/client.rs @@ -1,15 +1,16 @@ -use crate::error::Error; -use futures::Stream; -use messagebus::{ - error::{GenericError, SendError}, - Action, Bus, Event, Message, ReciveUntypedReceiver, SendUntypedReceiver, TypeTag, - TypeTagAccept, -}; -use std::{ - collections::{HashMap, HashSet}, - net::SocketAddr, - pin::Pin, +use crate::{ + error::Error, + relays::{GenericEventStream, MessageTable}, }; +use futures::{Future, FutureExt, Stream, pin_mut}; +use messagebus::{Action, Bus, Event, Message, ReciveUntypedReceiver, SendUntypedReceiver, SharedMessage, TypeTag, TypeTagAccept, TypeTagged, error::{GenericError, SendError}}; +use parking_lot::Mutex; +use serde::Deserialize; +use serde_derive::{Deserialize, Serialize}; +use core::slice::SlicePattern; +use std::{collections::{HashMap, HashSet}, net::SocketAddr, pin::Pin, sync::atomic::AtomicBool, task::Poll}; +use tokio::{io::{AsyncRead, AsyncWrite, AsyncWriteExt, ReadBuf}, sync::{mpsc::{self, UnboundedSender, UnboundedReceiver}, oneshot}}; +use bytes::{Buf, BufMut}; pub const ALPN_QUIC_HTTP: &[&[u8]] = &[b"hq-29"]; @@ -35,19 +36,24 @@ impl QuicClientRelayEndpoint { Ok(Self { endpoint }) } - pub async fn connect( - &self, - addr: SocketAddr, - host: &str, - ) -> Result { - let quinn::NewConnection { connection, .. } = self.endpoint.connect(&addr, host)?.await?; - let (send, recv) = connection.open_bi().await?; - Ok(QuicClientConnection { - connection, - send, - recv, - }) + pub fn connect( + &self, + addr: &SocketAddr, + host: &str, + ) -> impl Future> { + let conn = self.endpoint.connect(addr, host); + + async move { + let quinn::NewConnection { connection, .. } = conn?.await?; + let (send, recv) = connection.open_bi().await?; + + Ok(QuicClientConnection { + connection, + send, + recv, + }) + } } #[inline] @@ -62,39 +68,233 @@ pub struct QuicClientConnection { recv: quinn::RecvStream, } -impl QuicClientConnection { - #[inline] - pub fn send(&self, req: Request) -> Result<(), Error> { - Ok(()) +pub struct QuicClientRelay { + ready_flag: AtomicBool, + addr: SocketAddr, + host: String, + endpoint: QuicClientRelayEndpoint, + outgoing_table: MessageTable, + sender: UnboundedSender, + receiver_send: Mutex, UnboundedReceiver)>>, + receiver_recv: Mutex>>, +} + +impl QuicClientRelay { + pub fn new(cert: &str, addr: SocketAddr, host: String, table: Vec<(TypeTag, TypeTag, TypeTag)>) -> Result { + let endpoint = QuicClientRelayEndpoint::new(cert)?; + let mut outgoing_table = MessageTable::from(table); + let (sender, receiver) = mpsc::unbounded_channel(); + let (recv_send, recv_recv) = oneshot::channel(); + + Ok(Self { + ready_flag: AtomicBool::new(false), + addr, + host, + endpoint, + outgoing_table, + sender, + receiver_send: Mutex::new(Some((recv_send, receiver))), + receiver_recv: Mutex::new(Some(recv_recv)), + }) } } -pub struct TypeTable { - type_tags: HashSet, +#[derive(Deserialize, Serialize)] +#[repr(u16)] +pub enum ProtocolHeaderActionKind { + Nop, + Send, + Response, + Flush, + Flushed, + Synchronize, + Synchronized, + BatchComplete, + Close, + Exited, + Initialize, + Ready, + Pause, + Paused, + Error, } -pub struct QuicClientRelay { - connection: QuicClientConnection, - incoming_table: HashMap>, - outgoing_table: HashMap>, +#[derive(Deserialize, Serialize)] +pub struct ProtocolHeader<'a> { + kind: ProtocolHeaderActionKind, + type_tag: Option<&'a [u8]>, + failed: bool, + body_encoding: u32, + argument: u64, +} + +impl<'a> ProtocolHeader<'a> { + pub fn send(mid: u64, tt: &'a TypeTag) -> ProtocolHeader<'a> { + ProtocolHeader { + kind: ProtocolHeaderActionKind::Send, + type_tag: Some(tt.as_bytes()), + failed: false, + body_encoding: 0, + argument: mid, + } + } + + pub fn flush() -> Self { + ProtocolHeader { + kind: ProtocolHeaderActionKind::Flush, + type_tag: None, + failed: false, + body_encoding: 0, + argument: 0, + } + } + + pub fn close() -> Self { + ProtocolHeader { + kind: ProtocolHeaderActionKind::Close, + type_tag: None, + failed: false, + body_encoding: 0, + argument: 0, + } + } + + pub fn sync() -> Self { + ProtocolHeader { + kind: ProtocolHeaderActionKind::Synchronize, + type_tag: None, + failed: false, + body_encoding: 0, + argument: 0, + } + } + + pub fn init() -> Self { + ProtocolHeader { + kind: ProtocolHeaderActionKind::Initialize, + type_tag: None, + failed: false, + body_encoding: 0, + argument: 0, + } + } + + pub fn pause() -> Self { + ProtocolHeader { + kind: ProtocolHeaderActionKind::Pause, + type_tag: None, + failed: false, + body_encoding: 0, + argument: 0, + } + } +} + +#[derive(Deserialize, Serialize)] +pub struct ProtocolPacket<'a> { + header: ProtocolHeader<'a>, + body: &'a [u8] +} + +#[derive(Debug)] +pub enum Request { + Init, + Flush, + Sync, + Close, + Stats, + Send(u64, Box, bool), +} + +impl Request { + pub async fn serialize(&self, mut header_buff: &mut Vec, body_buff: &mut Vec, conn: &mut W) -> Result<(), Error> { + body_buff.clear(); + header_buff.clear(); + + let mut tt = TypeTag::Borrowed(""); + let header = match self { + Request::Init => unimplemented!(), + Request::Flush => ProtocolHeader::flush(), + Request::Sync => ProtocolHeader::sync(), + Request::Close => ProtocolHeader::close(), + Request::Stats => unimplemented!(), + Request::Send(mid, msg, req) => { + tt = msg.type_tag(); + + let mut cbor_se = serde_cbor::Serializer::new(&mut *body_buff); + let mut se = ::erase(&mut cbor_se); + + msg.erased_serialize(&mut se); + + ProtocolHeader::send(if *req {*mid} else {0}, &tt) + } + }; + + serde_cbor::to_writer(&mut header_buff, &ProtocolPacket { + header, + body: body_buff.as_slice(), + }).unwrap(); + + let mut buf = [0u8; 16]; + let mut writer = &mut buf[..]; + + writer.put(&b"MBUS"[..]); + writer.put_u16(1); + writer.put_u16(0); + writer.put_u64(header_buff.len() as _); + + conn.write_all(&buf).await?; + conn.write_all(&header_buff).await?; + + Ok(()) + // buff. + + + // let x = match self { + // Request::Init => unimplemented!(), + // Request::Flush => ProtocolHeader::Flush, + // Request::Sync => ProtocolHeader::Synchronize, + // Request::Close => ProtocolHeader::Close, + // Request::Stats => unimplemented!(), + // Request::Send(_mid, msg) => { + // let ser = serde_cbor::Serializer::new(buff); + + // ProtocolHeader::Send(msg.type_tag()) + // } + + // Request::Request(mid, msg) => { + // ProtocolHeader::Request(*mid, msg.type_tag()) + // } + // }; + } +} + +impl From for Request { + fn from(action: Action) -> Self { + match action { + Action::Init => Request::Init, + Action::Flush => Request::Flush, + Action::Sync => Request::Sync, + Action::Close => Request::Close, + Action::Stats => Request::Stats, + _ => unimplemented!(), + } + } +} + +impl From<(bool, u64, Box)> for Request { + fn from((req, mid, msg): (bool, u64, Box)) -> Self { + Request::Send(mid, msg, req) + } } impl TypeTagAccept for QuicClientRelay { fn accept(&self, msg: &TypeTag, resp: Option<&TypeTag>, err: Option<&TypeTag>) -> bool { - self.outgoing_table.get(msg).map_or(false, |v| { - v.into_iter().any(|(r, e)| { - resp.map_or(true, |resp| resp.as_ref() == r.as_ref()) - && err.map_or(true, |err| err.as_ref() == e.as_ref()) - }) - }) + self.outgoing_table.accept(msg, resp, err) } fn iter_types(&self, cb: &mut dyn FnMut(&TypeTag, &TypeTag, &TypeTag) -> bool) { - let iter = self - .outgoing_table - .iter() - .map(|(k, v)| v.iter().map(move |(e, r)| (k, r, e))) - .flatten(); + let iter = self.outgoing_table.iter_types(); for (m, r, e) in iter { if cb(m, r, e) { @@ -104,28 +304,27 @@ impl TypeTagAccept for QuicClientRelay { } } -#[derive(Debug)] -pub enum Request { - Flush, - Sync, - Close, - Stats, - Send(u64, Box), - Request(u64, Box), -} - impl SendUntypedReceiver for QuicClientRelay { fn send(&self, msg: Action, _bus: &Bus) -> Result<(), SendError> { - self.connection - .send(match msg { - Action::Init => return Ok(()), - Action::Flush => Request::Flush, - Action::Sync => Request::Sync, - Action::Close => Request::Close, - Action::Stats => Request::Stats, - _ => unimplemented!(), - }) - .unwrap(); + match msg { + Action::Init => { + let (sender, mut rx) = self.receiver_send.lock().take().unwrap(); + let conn = self.endpoint.connect(&self.addr, &self.host); + + tokio::spawn(async move { + let mut conn = conn.await.unwrap(); + sender.send((conn.recv, conn.connection)).unwrap(); + let mut buf1 = Vec::new(); + let mut buf2 = Vec::new(); + + while let Some(r) = rx.recv().await { + r.serialize(&mut buf1, &mut buf2, &mut conn.send); + } + }); + } + + other => self.sender.send(other.into()).unwrap(), + } Ok(()) } @@ -137,26 +336,121 @@ impl SendUntypedReceiver for QuicClientRelay { req: bool, _bus: &Bus, ) -> Result<(), SendError>> { - self.connection - .send(if req { - Request::Request(mid, msg) - } else { - Request::Send(mid, msg) - }) - .unwrap(); + msg.as_shared_boxed() + self.sender.send((req, mid, msg).into()).unwrap(); Ok(()) } } + impl ReciveUntypedReceiver for QuicClientRelay { - type Stream = Pin, GenericError>> + Send>>; + type Stream = GenericEventStream; - fn event_stream(&self) -> Self::Stream { - // let mut rx = self.srx.lock().take().unwrap(); + fn event_stream(&self, bus: Bus) -> Self::Stream { + let recv = self.receiver_recv.lock().take().unwrap(); - // Box::pin(futures::stream::poll_fn(move |cx|rx.poll_recv(cx))) - Box::pin(futures::stream::poll_fn(move |_cx| { - std::task::Poll::Pending - })) + Box::pin(async move { + let buff = Vec::with_capacity(1024); + let (recv, conn) = recv.await.unwrap(); + futures::stream::unfold( + (true, recv, conn, bus, buff), + |(first, mut recv, conn, bus, mut buff)| async move { + if first { + return Some((Event::Ready, (false, recv, conn, bus, buff))); + } + + unsafe { buff.set_len(16) }; + recv.read_exact(&mut buff).await.unwrap(); + + let mut reader = &buff[..]; + let mut sign = [0u8; 4]; + reader.copy_to_slice(&mut sign); + assert!(&sign != b"MBUS"); + + let version = reader.get_u16(); + assert!(version == 1); + + let content_type = reader.get_u16(); + + let body_size = reader.get_u64(); + let diff = buff.capacity() as i64 - body_size as i64; + if diff < 0 { + buff.reserve(-diff as usize); + } + + unsafe { buff.set_len(body_size as usize); } + recv.read_exact(&mut buff).await.unwrap(); + + let event = match content_type { + 0 => { // CBOR + let proto: ProtocolPacket = serde_cbor::from_slice(&buff).unwrap(); + + use ProtocolHeaderActionKind::*; + match proto.header.kind { + Nop => unimplemented!(), + Response => { + let tt = String::from_utf8_lossy(proto.header.type_tag.unwrap()).to_string(); + + let res = if proto.header.failed { + Err( + messagebus::error::Error::Other(GenericError { + type_tag: tt.into(), + description: "unknown".into() + } + )) + } else { + let mut cbor_de = serde_cbor::Deserializer::from_slice(proto.body); + let mut de = ::erase(&mut cbor_de); + + bus.deserialize_message(tt.into(), &mut de) + .map_err(|x| x.map_msg(|_| ())) + }; + + Event::Response(proto.header.argument, res) + } + Ready => { + if proto.header.failed { + let tt = String::from_utf8_lossy(proto.header.type_tag.unwrap()).to_string(); + Event::InitFailed(messagebus::error::Error::Other(GenericError { + type_tag: tt.into(), + description: "unknown".into() + })) + } else { + Event::Ready + } + } + Exited => Event::Exited, + Pause => Event::Pause, + BatchComplete => Event::Finished(proto.header.argument), + Flushed => Event::Flushed, + Error => { + let tt = String::from_utf8_lossy(proto.header.type_tag.unwrap()).to_string(); + Event::Error(GenericError { + type_tag: tt.into(), + description: "unknown".into() + }) + } + Synchronized => { + if proto.header.failed { + let tt = String::from_utf8_lossy(proto.header.type_tag.unwrap()).to_string(); + Event::Synchronized( Err(messagebus::error::Error::Other(GenericError { + type_tag: tt.into(), + description: "unknown".into() + }))) + } else { + Event::Synchronized(Ok(())) + } + }, + + _ => unimplemented!() + } + }, + _ => unimplemented!() + }; + + Some((event, (false, recv, conn, bus, buff))) + }, + ) + }.flatten_stream()) } } diff --git a/src/envelop.rs b/src/envelop.rs index cc6c7da..575e1e0 100644 --- a/src/envelop.rs +++ b/src/envelop.rs @@ -27,7 +27,7 @@ pub trait Message: MessageBounds { fn as_shared_ref(&self) -> Option<&dyn SharedMessage>; fn as_shared_mut(&mut self) -> Option<&mut dyn SharedMessage>; - fn as_shared_boxed(self: Box) -> Option>; + fn as_shared_boxed(self: Box) -> Result, Box>; fn as_shared_arc(self: Arc) -> Option>; fn try_clone_into(&self, into: &mut dyn Any) -> bool; @@ -123,8 +123,8 @@ impl Message for () { fn as_shared_mut(&mut self) -> Option<&mut dyn SharedMessage> { Some(self) } - fn as_shared_boxed(self: Box) -> Option> { - Some(self) + fn as_shared_boxed(self: Box) -> Result, Box> { + Ok(self) } fn as_shared_arc(self: Arc) -> Option> { Some(self) @@ -221,8 +221,8 @@ mod tests { fn as_shared_mut(&mut self) -> Option<&mut dyn SharedMessage> { None } - fn as_shared_boxed(self: Box) -> Option> { - None + fn as_shared_boxed(self: Box) -> Result, Box> { + Err(self) } fn as_shared_arc(self: Arc) -> Option> { None @@ -270,8 +270,8 @@ mod tests { fn as_shared_mut(&mut self) -> Option<&mut dyn SharedMessage> { None } - fn as_shared_boxed(self: Box) -> Option> { - None + fn as_shared_boxed(self: Box) -> Result, Box> { + Err(self) } fn as_shared_arc(self: Arc) -> Option> { None @@ -328,8 +328,8 @@ mod tests { fn as_shared_mut(&mut self) -> Option<&mut dyn SharedMessage> { Some(self) } - fn as_shared_boxed(self: Box) -> Option> { - Some(self) + fn as_shared_boxed(self: Box) -> Result, Box> { + Ok(self) } fn as_shared_arc(self: Arc) -> Option> { Some(self) diff --git a/src/error.rs b/src/error.rs index d25d138..f0175e0 100644 --- a/src/error.rs +++ b/src/error.rs @@ -29,6 +29,13 @@ impl GenericError { description: format!("{}[{}]", err.type_tag(), err), } } + + pub fn from_err(tt: TypeTag, err: impl fmt::Display) -> Self { + GenericError { + description: format!("{}[{}]", tt, err), + type_tag: tt, + } + } } impl fmt::Display for GenericError { diff --git a/src/receiver.rs b/src/receiver.rs index 1ee1b6e..265aaa2 100644 --- a/src/receiver.rs +++ b/src/receiver.rs @@ -54,13 +54,13 @@ where { type Stream: Stream> + Send; - fn event_stream(&self) -> Self::Stream; + fn event_stream(&self, bus: Bus) -> Self::Stream; } pub trait ReciveUntypedReceiver: Sync { type Stream: Stream, GenericError>> + Send; - fn event_stream(&self) -> Self::Stream; + fn event_stream(&self, bus: Bus) -> Self::Stream; } pub trait WrapperReturnTypeOnly: Send + Sync { @@ -182,10 +182,10 @@ where S: SendUntypedReceiver + ReciveTypedReceiver + Send + Sync + 'static, { fn start_polling_events(self: Arc) -> BusPollerCallback { - Box::new(move |_| { + Box::new(move |bus| { Box::pin(async move { let this = self.clone(); - let events = this.inner.event_stream(); + let events = this.inner.event_stream(bus); pin_mut!(events); loop { diff --git a/src/receivers/buffer_unordered/async.rs b/src/receivers/buffer_unordered/async.rs index 0c66ef5..cefac00 100644 --- a/src/receivers/buffer_unordered/async.rs +++ b/src/receivers/buffer_unordered/async.rs @@ -137,7 +137,7 @@ where { type Stream = Pin> + Send>>; - fn event_stream(&self) -> Self::Stream { + fn event_stream(&self, _: Bus) -> Self::Stream { let mut rx = self.srx.lock().take().unwrap(); Box::pin(futures::stream::poll_fn(move |cx| rx.poll_recv(cx))) diff --git a/src/receivers/buffer_unordered/sync.rs b/src/receivers/buffer_unordered/sync.rs index 03bef8c..4f089da 100644 --- a/src/receivers/buffer_unordered/sync.rs +++ b/src/receivers/buffer_unordered/sync.rs @@ -140,7 +140,7 @@ where { type Stream = Pin> + Send>>; - fn event_stream(&self) -> Self::Stream { + fn event_stream(&self, _: Bus) -> Self::Stream { let mut rx = self.srx.lock().take().unwrap(); Box::pin(futures::stream::poll_fn(move |cx| rx.poll_recv(cx))) diff --git a/src/receivers/buffer_unordered_batched/async.rs b/src/receivers/buffer_unordered_batched/async.rs index b4c6dd8..168230d 100644 --- a/src/receivers/buffer_unordered_batched/async.rs +++ b/src/receivers/buffer_unordered_batched/async.rs @@ -138,7 +138,7 @@ where { type Stream = Pin> + Send>>; - fn event_stream(&self) -> Self::Stream { + fn event_stream(&self, _: Bus) -> Self::Stream { let mut rx = self.srx.lock().take().unwrap(); Box::pin(futures::stream::poll_fn(move |cx| rx.poll_recv(cx))) diff --git a/src/receivers/buffer_unordered_batched/sync.rs b/src/receivers/buffer_unordered_batched/sync.rs index 3b62f26..a4fd363 100644 --- a/src/receivers/buffer_unordered_batched/sync.rs +++ b/src/receivers/buffer_unordered_batched/sync.rs @@ -144,7 +144,7 @@ where { type Stream = Pin> + Send>>; - fn event_stream(&self) -> Self::Stream { + fn event_stream(&self, _: Bus) -> Self::Stream { let mut rx = self.srx.lock().take().unwrap(); Box::pin(futures::stream::poll_fn(move |cx| rx.poll_recv(cx))) diff --git a/src/receivers/synchronize_batched/async.rs b/src/receivers/synchronize_batched/async.rs index 8e9c053..53ebd5d 100644 --- a/src/receivers/synchronize_batched/async.rs +++ b/src/receivers/synchronize_batched/async.rs @@ -111,7 +111,7 @@ where { type Stream = Pin> + Send>>; - fn event_stream(&self) -> Self::Stream { + fn event_stream(&self, _: Bus) -> Self::Stream { let mut rx = self.srx.lock().take().unwrap(); Box::pin(futures::stream::poll_fn(move |cx| rx.poll_recv(cx))) diff --git a/src/receivers/synchronize_batched/sync.rs b/src/receivers/synchronize_batched/sync.rs index 9042268..34b01de 100644 --- a/src/receivers/synchronize_batched/sync.rs +++ b/src/receivers/synchronize_batched/sync.rs @@ -115,7 +115,7 @@ where { type Stream = Pin> + Send>>; - fn event_stream(&self) -> Self::Stream { + fn event_stream(&self, _: Bus) -> Self::Stream { let mut rx = self.srx.lock().take().unwrap(); Box::pin(futures::stream::poll_fn(move |cx| rx.poll_recv(cx))) diff --git a/src/receivers/synchronized/async.rs b/src/receivers/synchronized/async.rs index cb86ea3..3e22a8f 100644 --- a/src/receivers/synchronized/async.rs +++ b/src/receivers/synchronized/async.rs @@ -111,7 +111,7 @@ where { type Stream = Pin> + Send>>; - fn event_stream(&self) -> Self::Stream { + fn event_stream(&self, _: Bus) -> Self::Stream { let mut rx = self.srx.lock().take().unwrap(); Box::pin(futures::stream::poll_fn(move |cx| rx.poll_recv(cx))) diff --git a/src/receivers/synchronized/sync.rs b/src/receivers/synchronized/sync.rs index b44e788..18ec341 100644 --- a/src/receivers/synchronized/sync.rs +++ b/src/receivers/synchronized/sync.rs @@ -112,7 +112,7 @@ where { type Stream = Pin> + Send>>; - fn event_stream(&self) -> Self::Stream { + fn event_stream(&self, _: Bus) -> Self::Stream { let mut rx = self.srx.lock().take().unwrap(); Box::pin(futures::stream::poll_fn(move |cx| rx.poll_recv(cx))) diff --git a/src/relay.rs b/src/relay.rs index d4a6aba..7fbedf8 100644 --- a/src/relay.rs +++ b/src/relay.rs @@ -221,10 +221,10 @@ where } fn start_polling(self: Arc) -> BusPollerCallback { - Box::new(move |_| { + Box::new(move |bus| { Box::pin(async move { let this = self.clone(); - let events = this.inner.event_stream(); + let events = this.inner.event_stream(bus); pin_mut!(events); loop {