QUIC Relay progress
This commit is contained in:
parent
22d1d4a569
commit
6ef65cbfd1
@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "messagebus"
|
name = "messagebus"
|
||||||
version = "0.9.5"
|
version = "0.9.6"
|
||||||
authors = ["Andrey Tkachenko <andrey@aidev.ru>"]
|
authors = ["Andrey Tkachenko <andrey@aidev.ru>"]
|
||||||
repository = "https://github.com/andreytkachenko/messagebus.git"
|
repository = "https://github.com/andreytkachenko/messagebus.git"
|
||||||
keywords = ["futures", "async", "tokio", "message", "bus"]
|
keywords = ["futures", "async", "tokio", "message", "bus"]
|
||||||
@ -17,7 +17,7 @@ members = [
|
|||||||
]
|
]
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
messagebus_derive = "0.1"
|
messagebus_derive = "0.2"
|
||||||
tokio = { version = "1", features = ["parking_lot", "rt-multi-thread", "sync", "time"] }
|
tokio = { version = "1", features = ["parking_lot", "rt-multi-thread", "sync", "time"] }
|
||||||
parking_lot = "0.11"
|
parking_lot = "0.11"
|
||||||
async-trait = "0.1"
|
async-trait = "0.1"
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "messagebus_derive"
|
name = "messagebus_derive"
|
||||||
version = "0.1.0"
|
version = "0.2.0"
|
||||||
authors = ["Andrey Tkachenko <andrey@aidev.ru>"]
|
authors = ["Andrey Tkachenko <andrey@aidev.ru>"]
|
||||||
repository = "https://github.com/andreytkachenko/messagebus.git"
|
repository = "https://github.com/andreytkachenko/messagebus.git"
|
||||||
keywords = ["futures", "async", "tokio", "message", "bus"]
|
keywords = ["futures", "async", "tokio", "message", "bus"]
|
||||||
|
@ -14,14 +14,14 @@ fn shared_part(_ast: &syn::DeriveInput, has_shared: bool) -> proc_macro2::TokenS
|
|||||||
quote! {
|
quote! {
|
||||||
fn as_shared_ref(&self) -> std::option::Option<&dyn messagebus::SharedMessage> {Some(self)}
|
fn as_shared_ref(&self) -> std::option::Option<&dyn messagebus::SharedMessage> {Some(self)}
|
||||||
fn as_shared_mut(&mut self) -> std::option::Option<&mut dyn messagebus::SharedMessage>{Some(self)}
|
fn as_shared_mut(&mut self) -> std::option::Option<&mut dyn messagebus::SharedMessage>{Some(self)}
|
||||||
fn as_shared_boxed(self: std::boxed::Box<Self>) -> Option<std::boxed::Box<dyn messagebus::SharedMessage>>{Some(self)}
|
fn as_shared_boxed(self: Box<Self>) -> Result<Box<dyn SharedMessage>, Box<dyn Message>> {Ok(self)}
|
||||||
fn as_shared_arc(self: std::sync::Arc<Self>) -> Option<std::sync::Arc<dyn messagebus::SharedMessage>>{Some(self)}
|
fn as_shared_arc(self: std::sync::Arc<Self>) -> Option<std::sync::Arc<dyn messagebus::SharedMessage>>{Some(self)}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
quote! {
|
quote! {
|
||||||
fn as_shared_ref(&self) -> std::option::Option<&dyn messagebus::SharedMessage> {None}
|
fn as_shared_ref(&self) -> std::option::Option<&dyn messagebus::SharedMessage> {None}
|
||||||
fn as_shared_mut(&mut self) -> std::option::Option<&mut dyn messagebus::SharedMessage> {None}
|
fn as_shared_mut(&mut self) -> std::option::Option<&mut dyn messagebus::SharedMessage> {None}
|
||||||
fn as_shared_boxed(self: std::boxed::Box<Self>) -> Option<std::boxed::Box<dyn messagebus::SharedMessage>>{None}
|
fn as_shared_boxed(self: Box<Self>) -> Result<Box<dyn SharedMessage>, Box<dyn Message>> {Err(self)}
|
||||||
fn as_shared_arc(self: std::sync::Arc<Self>) -> Option<std::sync::Arc<dyn messagebus::SharedMessage>> {None}
|
fn as_shared_arc(self: std::sync::Arc<Self>) -> Option<std::sync::Arc<dyn messagebus::SharedMessage>> {None}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -25,6 +25,9 @@ erased-serde = "0.3.16"
|
|||||||
serde_derive = "1.0.130"
|
serde_derive = "1.0.130"
|
||||||
serde = "1.0.130"
|
serde = "1.0.130"
|
||||||
futures = "0.3.17"
|
futures = "0.3.17"
|
||||||
|
cbor = "0.4.1"
|
||||||
|
serde_cbor = "0.11.2"
|
||||||
|
bytes = "1.1.0"
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
tokio = { version = "1.11.0", features = ["full"] }
|
tokio = { version = "1.11.0", features = ["full"] }
|
||||||
|
@ -1,5 +1,59 @@
|
|||||||
// #[cfg(feature = "quic")]
|
// #[cfg(feature = "quic")]
|
||||||
mod quic;
|
mod quic;
|
||||||
|
|
||||||
|
use futures::Stream;
|
||||||
|
use messagebus::{error::GenericError, Event, Message, TypeTag};
|
||||||
|
use std::{collections::HashMap, pin::Pin};
|
||||||
|
|
||||||
// #[cfg(feature = "quic")]
|
// #[cfg(feature = "quic")]
|
||||||
pub use quic::*;
|
pub use quic::*;
|
||||||
|
|
||||||
|
pub(crate) type GenericEventStream =
|
||||||
|
Pin<Box<dyn Stream<Item = Event<Box<dyn Message>, GenericError>> + Send>>;
|
||||||
|
|
||||||
|
|
||||||
|
#[derive(Debug, Default)]
|
||||||
|
pub(crate) struct MessageTable {
|
||||||
|
table: HashMap<TypeTag, Vec<(TypeTag, TypeTag)>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl MessageTable {
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Self {
|
||||||
|
table: HashMap::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn add(&mut self, req: TypeTag, resp: TypeTag, err: TypeTag) {
|
||||||
|
self.table.entry(req)
|
||||||
|
.or_insert_with(Vec::new)
|
||||||
|
.push((resp, err));
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn accept(&self, msg: &TypeTag, resp: Option<&TypeTag>, err: Option<&TypeTag>) -> bool {
|
||||||
|
self.table.get(msg).map_or(false, |v| {
|
||||||
|
v.iter().any(|(r, e)| {
|
||||||
|
resp.map_or(true, |resp| resp.as_ref() == r.as_ref())
|
||||||
|
&& err.map_or(true, |err| err.as_ref() == e.as_ref())
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn iter_types(&self) -> impl Iterator<Item = (&'_ TypeTag, &'_ TypeTag, &'_ TypeTag)> + '_ {
|
||||||
|
self.table
|
||||||
|
.iter()
|
||||||
|
.map(|(k, v)| v.iter().map(move |(e, r)| (k, r, e)))
|
||||||
|
.flatten()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
impl From<Vec<(TypeTag, TypeTag, TypeTag)>> for MessageTable {
|
||||||
|
fn from(table: Vec<(TypeTag, TypeTag, TypeTag)>) -> Self {
|
||||||
|
let mut outgoing_table = MessageTable::default();
|
||||||
|
for (x, y, z) in table {
|
||||||
|
outgoing_table.add(x, y, z);
|
||||||
|
}
|
||||||
|
outgoing_table
|
||||||
|
}
|
||||||
|
}
|
@ -1,15 +1,16 @@
|
|||||||
use crate::error::Error;
|
use crate::{
|
||||||
use futures::Stream;
|
error::Error,
|
||||||
use messagebus::{
|
relays::{GenericEventStream, MessageTable},
|
||||||
error::{GenericError, SendError},
|
|
||||||
Action, Bus, Event, Message, ReciveUntypedReceiver, SendUntypedReceiver, TypeTag,
|
|
||||||
TypeTagAccept,
|
|
||||||
};
|
|
||||||
use std::{
|
|
||||||
collections::{HashMap, HashSet},
|
|
||||||
net::SocketAddr,
|
|
||||||
pin::Pin,
|
|
||||||
};
|
};
|
||||||
|
use futures::{Future, FutureExt, Stream, pin_mut};
|
||||||
|
use messagebus::{Action, Bus, Event, Message, ReciveUntypedReceiver, SendUntypedReceiver, SharedMessage, TypeTag, TypeTagAccept, TypeTagged, error::{GenericError, SendError}};
|
||||||
|
use parking_lot::Mutex;
|
||||||
|
use serde::Deserialize;
|
||||||
|
use serde_derive::{Deserialize, Serialize};
|
||||||
|
use core::slice::SlicePattern;
|
||||||
|
use std::{collections::{HashMap, HashSet}, net::SocketAddr, pin::Pin, sync::atomic::AtomicBool, task::Poll};
|
||||||
|
use tokio::{io::{AsyncRead, AsyncWrite, AsyncWriteExt, ReadBuf}, sync::{mpsc::{self, UnboundedSender, UnboundedReceiver}, oneshot}};
|
||||||
|
use bytes::{Buf, BufMut};
|
||||||
|
|
||||||
pub const ALPN_QUIC_HTTP: &[&[u8]] = &[b"hq-29"];
|
pub const ALPN_QUIC_HTTP: &[&[u8]] = &[b"hq-29"];
|
||||||
|
|
||||||
@ -35,12 +36,16 @@ impl QuicClientRelayEndpoint {
|
|||||||
|
|
||||||
Ok(Self { endpoint })
|
Ok(Self { endpoint })
|
||||||
}
|
}
|
||||||
pub async fn connect(
|
|
||||||
|
pub fn connect(
|
||||||
&self,
|
&self,
|
||||||
addr: SocketAddr,
|
addr: &SocketAddr,
|
||||||
host: &str,
|
host: &str,
|
||||||
) -> Result<QuicClientConnection, Error> {
|
) -> impl Future<Output = Result<QuicClientConnection, Error>> {
|
||||||
let quinn::NewConnection { connection, .. } = self.endpoint.connect(&addr, host)?.await?;
|
let conn = self.endpoint.connect(addr, host);
|
||||||
|
|
||||||
|
async move {
|
||||||
|
let quinn::NewConnection { connection, .. } = conn?.await?;
|
||||||
let (send, recv) = connection.open_bi().await?;
|
let (send, recv) = connection.open_bi().await?;
|
||||||
|
|
||||||
Ok(QuicClientConnection {
|
Ok(QuicClientConnection {
|
||||||
@ -49,6 +54,7 @@ impl QuicClientRelayEndpoint {
|
|||||||
recv,
|
recv,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
pub async fn wait_idle(&self) {
|
pub async fn wait_idle(&self) {
|
||||||
@ -62,39 +68,233 @@ pub struct QuicClientConnection {
|
|||||||
recv: quinn::RecvStream,
|
recv: quinn::RecvStream,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl QuicClientConnection {
|
|
||||||
#[inline]
|
|
||||||
pub fn send(&self, req: Request) -> Result<(), Error> {
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct TypeTable {
|
|
||||||
type_tags: HashSet<TypeTag>,
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct QuicClientRelay {
|
pub struct QuicClientRelay {
|
||||||
connection: QuicClientConnection,
|
ready_flag: AtomicBool,
|
||||||
incoming_table: HashMap<TypeTag, Vec<(TypeTag, TypeTag)>>,
|
addr: SocketAddr,
|
||||||
outgoing_table: HashMap<TypeTag, Vec<(TypeTag, TypeTag)>>,
|
host: String,
|
||||||
|
endpoint: QuicClientRelayEndpoint,
|
||||||
|
outgoing_table: MessageTable,
|
||||||
|
sender: UnboundedSender<Request>,
|
||||||
|
receiver_send: Mutex<Option<(oneshot::Sender<(quinn::RecvStream, quinn::Connection)>, UnboundedReceiver<Request>)>>,
|
||||||
|
receiver_recv: Mutex<Option<oneshot::Receiver<(quinn::RecvStream, quinn::Connection)>>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl QuicClientRelay {
|
||||||
|
pub fn new(cert: &str, addr: SocketAddr, host: String, table: Vec<(TypeTag, TypeTag, TypeTag)>) -> Result<Self, Error> {
|
||||||
|
let endpoint = QuicClientRelayEndpoint::new(cert)?;
|
||||||
|
let mut outgoing_table = MessageTable::from(table);
|
||||||
|
let (sender, receiver) = mpsc::unbounded_channel();
|
||||||
|
let (recv_send, recv_recv) = oneshot::channel();
|
||||||
|
|
||||||
|
Ok(Self {
|
||||||
|
ready_flag: AtomicBool::new(false),
|
||||||
|
addr,
|
||||||
|
host,
|
||||||
|
endpoint,
|
||||||
|
outgoing_table,
|
||||||
|
sender,
|
||||||
|
receiver_send: Mutex::new(Some((recv_send, receiver))),
|
||||||
|
receiver_recv: Mutex::new(Some(recv_recv)),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Deserialize, Serialize)]
|
||||||
|
#[repr(u16)]
|
||||||
|
pub enum ProtocolHeaderActionKind {
|
||||||
|
Nop,
|
||||||
|
Send,
|
||||||
|
Response,
|
||||||
|
Flush,
|
||||||
|
Flushed,
|
||||||
|
Synchronize,
|
||||||
|
Synchronized,
|
||||||
|
BatchComplete,
|
||||||
|
Close,
|
||||||
|
Exited,
|
||||||
|
Initialize,
|
||||||
|
Ready,
|
||||||
|
Pause,
|
||||||
|
Paused,
|
||||||
|
Error,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Deserialize, Serialize)]
|
||||||
|
pub struct ProtocolHeader<'a> {
|
||||||
|
kind: ProtocolHeaderActionKind,
|
||||||
|
type_tag: Option<&'a [u8]>,
|
||||||
|
failed: bool,
|
||||||
|
body_encoding: u32,
|
||||||
|
argument: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a> ProtocolHeader<'a> {
|
||||||
|
pub fn send(mid: u64, tt: &'a TypeTag) -> ProtocolHeader<'a> {
|
||||||
|
ProtocolHeader {
|
||||||
|
kind: ProtocolHeaderActionKind::Send,
|
||||||
|
type_tag: Some(tt.as_bytes()),
|
||||||
|
failed: false,
|
||||||
|
body_encoding: 0,
|
||||||
|
argument: mid,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn flush() -> Self {
|
||||||
|
ProtocolHeader {
|
||||||
|
kind: ProtocolHeaderActionKind::Flush,
|
||||||
|
type_tag: None,
|
||||||
|
failed: false,
|
||||||
|
body_encoding: 0,
|
||||||
|
argument: 0,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn close() -> Self {
|
||||||
|
ProtocolHeader {
|
||||||
|
kind: ProtocolHeaderActionKind::Close,
|
||||||
|
type_tag: None,
|
||||||
|
failed: false,
|
||||||
|
body_encoding: 0,
|
||||||
|
argument: 0,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn sync() -> Self {
|
||||||
|
ProtocolHeader {
|
||||||
|
kind: ProtocolHeaderActionKind::Synchronize,
|
||||||
|
type_tag: None,
|
||||||
|
failed: false,
|
||||||
|
body_encoding: 0,
|
||||||
|
argument: 0,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn init() -> Self {
|
||||||
|
ProtocolHeader {
|
||||||
|
kind: ProtocolHeaderActionKind::Initialize,
|
||||||
|
type_tag: None,
|
||||||
|
failed: false,
|
||||||
|
body_encoding: 0,
|
||||||
|
argument: 0,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn pause() -> Self {
|
||||||
|
ProtocolHeader {
|
||||||
|
kind: ProtocolHeaderActionKind::Pause,
|
||||||
|
type_tag: None,
|
||||||
|
failed: false,
|
||||||
|
body_encoding: 0,
|
||||||
|
argument: 0,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Deserialize, Serialize)]
|
||||||
|
pub struct ProtocolPacket<'a> {
|
||||||
|
header: ProtocolHeader<'a>,
|
||||||
|
body: &'a [u8]
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub enum Request {
|
||||||
|
Init,
|
||||||
|
Flush,
|
||||||
|
Sync,
|
||||||
|
Close,
|
||||||
|
Stats,
|
||||||
|
Send(u64, Box<dyn SharedMessage>, bool),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Request {
|
||||||
|
pub async fn serialize<W: AsyncWrite + Unpin>(&self, mut header_buff: &mut Vec<u8>, body_buff: &mut Vec<u8>, conn: &mut W) -> Result<(), Error> {
|
||||||
|
body_buff.clear();
|
||||||
|
header_buff.clear();
|
||||||
|
|
||||||
|
let mut tt = TypeTag::Borrowed("");
|
||||||
|
let header = match self {
|
||||||
|
Request::Init => unimplemented!(),
|
||||||
|
Request::Flush => ProtocolHeader::flush(),
|
||||||
|
Request::Sync => ProtocolHeader::sync(),
|
||||||
|
Request::Close => ProtocolHeader::close(),
|
||||||
|
Request::Stats => unimplemented!(),
|
||||||
|
Request::Send(mid, msg, req) => {
|
||||||
|
tt = msg.type_tag();
|
||||||
|
|
||||||
|
let mut cbor_se = serde_cbor::Serializer::new(&mut *body_buff);
|
||||||
|
let mut se = <dyn erased_serde::Serializer>::erase(&mut cbor_se);
|
||||||
|
|
||||||
|
msg.erased_serialize(&mut se);
|
||||||
|
|
||||||
|
ProtocolHeader::send(if *req {*mid} else {0}, &tt)
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
serde_cbor::to_writer(&mut header_buff, &ProtocolPacket {
|
||||||
|
header,
|
||||||
|
body: body_buff.as_slice(),
|
||||||
|
}).unwrap();
|
||||||
|
|
||||||
|
let mut buf = [0u8; 16];
|
||||||
|
let mut writer = &mut buf[..];
|
||||||
|
|
||||||
|
writer.put(&b"MBUS"[..]);
|
||||||
|
writer.put_u16(1);
|
||||||
|
writer.put_u16(0);
|
||||||
|
writer.put_u64(header_buff.len() as _);
|
||||||
|
|
||||||
|
conn.write_all(&buf).await?;
|
||||||
|
conn.write_all(&header_buff).await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
// buff.
|
||||||
|
|
||||||
|
|
||||||
|
// let x = match self {
|
||||||
|
// Request::Init => unimplemented!(),
|
||||||
|
// Request::Flush => ProtocolHeader::Flush,
|
||||||
|
// Request::Sync => ProtocolHeader::Synchronize,
|
||||||
|
// Request::Close => ProtocolHeader::Close,
|
||||||
|
// Request::Stats => unimplemented!(),
|
||||||
|
// Request::Send(_mid, msg) => {
|
||||||
|
// let ser = serde_cbor::Serializer::new(buff);
|
||||||
|
|
||||||
|
// ProtocolHeader::Send(msg.type_tag())
|
||||||
|
// }
|
||||||
|
|
||||||
|
// Request::Request(mid, msg) => {
|
||||||
|
// ProtocolHeader::Request(*mid, msg.type_tag())
|
||||||
|
// }
|
||||||
|
// };
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<Action> for Request {
|
||||||
|
fn from(action: Action) -> Self {
|
||||||
|
match action {
|
||||||
|
Action::Init => Request::Init,
|
||||||
|
Action::Flush => Request::Flush,
|
||||||
|
Action::Sync => Request::Sync,
|
||||||
|
Action::Close => Request::Close,
|
||||||
|
Action::Stats => Request::Stats,
|
||||||
|
_ => unimplemented!(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<(bool, u64, Box<dyn SharedMessage>)> for Request {
|
||||||
|
fn from((req, mid, msg): (bool, u64, Box<dyn SharedMessage>)) -> Self {
|
||||||
|
Request::Send(mid, msg, req)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TypeTagAccept for QuicClientRelay {
|
impl TypeTagAccept for QuicClientRelay {
|
||||||
fn accept(&self, msg: &TypeTag, resp: Option<&TypeTag>, err: Option<&TypeTag>) -> bool {
|
fn accept(&self, msg: &TypeTag, resp: Option<&TypeTag>, err: Option<&TypeTag>) -> bool {
|
||||||
self.outgoing_table.get(msg).map_or(false, |v| {
|
self.outgoing_table.accept(msg, resp, err)
|
||||||
v.into_iter().any(|(r, e)| {
|
|
||||||
resp.map_or(true, |resp| resp.as_ref() == r.as_ref())
|
|
||||||
&& err.map_or(true, |err| err.as_ref() == e.as_ref())
|
|
||||||
})
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn iter_types(&self, cb: &mut dyn FnMut(&TypeTag, &TypeTag, &TypeTag) -> bool) {
|
fn iter_types(&self, cb: &mut dyn FnMut(&TypeTag, &TypeTag, &TypeTag) -> bool) {
|
||||||
let iter = self
|
let iter = self.outgoing_table.iter_types();
|
||||||
.outgoing_table
|
|
||||||
.iter()
|
|
||||||
.map(|(k, v)| v.iter().map(move |(e, r)| (k, r, e)))
|
|
||||||
.flatten();
|
|
||||||
|
|
||||||
for (m, r, e) in iter {
|
for (m, r, e) in iter {
|
||||||
if cb(m, r, e) {
|
if cb(m, r, e) {
|
||||||
@ -104,28 +304,27 @@ impl TypeTagAccept for QuicClientRelay {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub enum Request {
|
|
||||||
Flush,
|
|
||||||
Sync,
|
|
||||||
Close,
|
|
||||||
Stats,
|
|
||||||
Send(u64, Box<dyn Message>),
|
|
||||||
Request(u64, Box<dyn Message>),
|
|
||||||
}
|
|
||||||
|
|
||||||
impl SendUntypedReceiver for QuicClientRelay {
|
impl SendUntypedReceiver for QuicClientRelay {
|
||||||
fn send(&self, msg: Action, _bus: &Bus) -> Result<(), SendError<Action>> {
|
fn send(&self, msg: Action, _bus: &Bus) -> Result<(), SendError<Action>> {
|
||||||
self.connection
|
match msg {
|
||||||
.send(match msg {
|
Action::Init => {
|
||||||
Action::Init => return Ok(()),
|
let (sender, mut rx) = self.receiver_send.lock().take().unwrap();
|
||||||
Action::Flush => Request::Flush,
|
let conn = self.endpoint.connect(&self.addr, &self.host);
|
||||||
Action::Sync => Request::Sync,
|
|
||||||
Action::Close => Request::Close,
|
tokio::spawn(async move {
|
||||||
Action::Stats => Request::Stats,
|
let mut conn = conn.await.unwrap();
|
||||||
_ => unimplemented!(),
|
sender.send((conn.recv, conn.connection)).unwrap();
|
||||||
})
|
let mut buf1 = Vec::new();
|
||||||
.unwrap();
|
let mut buf2 = Vec::new();
|
||||||
|
|
||||||
|
while let Some(r) = rx.recv().await {
|
||||||
|
r.serialize(&mut buf1, &mut buf2, &mut conn.send);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
other => self.sender.send(other.into()).unwrap(),
|
||||||
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@ -137,26 +336,121 @@ impl SendUntypedReceiver for QuicClientRelay {
|
|||||||
req: bool,
|
req: bool,
|
||||||
_bus: &Bus,
|
_bus: &Bus,
|
||||||
) -> Result<(), SendError<Box<dyn Message>>> {
|
) -> Result<(), SendError<Box<dyn Message>>> {
|
||||||
self.connection
|
msg.as_shared_boxed()
|
||||||
.send(if req {
|
self.sender.send((req, mid, msg).into()).unwrap();
|
||||||
Request::Request(mid, msg)
|
|
||||||
} else {
|
|
||||||
Request::Send(mid, msg)
|
|
||||||
})
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ReciveUntypedReceiver for QuicClientRelay {
|
impl ReciveUntypedReceiver for QuicClientRelay {
|
||||||
type Stream = Pin<Box<dyn Stream<Item = Event<Box<dyn Message>, GenericError>> + Send>>;
|
type Stream = GenericEventStream;
|
||||||
|
|
||||||
fn event_stream(&self) -> Self::Stream {
|
fn event_stream(&self, bus: Bus) -> Self::Stream {
|
||||||
// let mut rx = self.srx.lock().take().unwrap();
|
let recv = self.receiver_recv.lock().take().unwrap();
|
||||||
|
|
||||||
// Box::pin(futures::stream::poll_fn(move |cx|rx.poll_recv(cx)))
|
Box::pin(async move {
|
||||||
Box::pin(futures::stream::poll_fn(move |_cx| {
|
let buff = Vec::with_capacity(1024);
|
||||||
std::task::Poll::Pending
|
let (recv, conn) = recv.await.unwrap();
|
||||||
|
futures::stream::unfold(
|
||||||
|
(true, recv, conn, bus, buff),
|
||||||
|
|(first, mut recv, conn, bus, mut buff)| async move {
|
||||||
|
if first {
|
||||||
|
return Some((Event::Ready, (false, recv, conn, bus, buff)));
|
||||||
|
}
|
||||||
|
|
||||||
|
unsafe { buff.set_len(16) };
|
||||||
|
recv.read_exact(&mut buff).await.unwrap();
|
||||||
|
|
||||||
|
let mut reader = &buff[..];
|
||||||
|
let mut sign = [0u8; 4];
|
||||||
|
reader.copy_to_slice(&mut sign);
|
||||||
|
assert!(&sign != b"MBUS");
|
||||||
|
|
||||||
|
let version = reader.get_u16();
|
||||||
|
assert!(version == 1);
|
||||||
|
|
||||||
|
let content_type = reader.get_u16();
|
||||||
|
|
||||||
|
let body_size = reader.get_u64();
|
||||||
|
let diff = buff.capacity() as i64 - body_size as i64;
|
||||||
|
if diff < 0 {
|
||||||
|
buff.reserve(-diff as usize);
|
||||||
|
}
|
||||||
|
|
||||||
|
unsafe { buff.set_len(body_size as usize); }
|
||||||
|
recv.read_exact(&mut buff).await.unwrap();
|
||||||
|
|
||||||
|
let event = match content_type {
|
||||||
|
0 => { // CBOR
|
||||||
|
let proto: ProtocolPacket = serde_cbor::from_slice(&buff).unwrap();
|
||||||
|
|
||||||
|
use ProtocolHeaderActionKind::*;
|
||||||
|
match proto.header.kind {
|
||||||
|
Nop => unimplemented!(),
|
||||||
|
Response => {
|
||||||
|
let tt = String::from_utf8_lossy(proto.header.type_tag.unwrap()).to_string();
|
||||||
|
|
||||||
|
let res = if proto.header.failed {
|
||||||
|
Err(
|
||||||
|
messagebus::error::Error::Other(GenericError {
|
||||||
|
type_tag: tt.into(),
|
||||||
|
description: "unknown".into()
|
||||||
|
}
|
||||||
|
))
|
||||||
|
} else {
|
||||||
|
let mut cbor_de = serde_cbor::Deserializer::from_slice(proto.body);
|
||||||
|
let mut de = <dyn erased_serde::Deserializer>::erase(&mut cbor_de);
|
||||||
|
|
||||||
|
bus.deserialize_message(tt.into(), &mut de)
|
||||||
|
.map_err(|x| x.map_msg(|_| ()))
|
||||||
|
};
|
||||||
|
|
||||||
|
Event::Response(proto.header.argument, res)
|
||||||
|
}
|
||||||
|
Ready => {
|
||||||
|
if proto.header.failed {
|
||||||
|
let tt = String::from_utf8_lossy(proto.header.type_tag.unwrap()).to_string();
|
||||||
|
Event::InitFailed(messagebus::error::Error::Other(GenericError {
|
||||||
|
type_tag: tt.into(),
|
||||||
|
description: "unknown".into()
|
||||||
}))
|
}))
|
||||||
|
} else {
|
||||||
|
Event::Ready
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Exited => Event::Exited,
|
||||||
|
Pause => Event::Pause,
|
||||||
|
BatchComplete => Event::Finished(proto.header.argument),
|
||||||
|
Flushed => Event::Flushed,
|
||||||
|
Error => {
|
||||||
|
let tt = String::from_utf8_lossy(proto.header.type_tag.unwrap()).to_string();
|
||||||
|
Event::Error(GenericError {
|
||||||
|
type_tag: tt.into(),
|
||||||
|
description: "unknown".into()
|
||||||
|
})
|
||||||
|
}
|
||||||
|
Synchronized => {
|
||||||
|
if proto.header.failed {
|
||||||
|
let tt = String::from_utf8_lossy(proto.header.type_tag.unwrap()).to_string();
|
||||||
|
Event::Synchronized( Err(messagebus::error::Error::Other(GenericError {
|
||||||
|
type_tag: tt.into(),
|
||||||
|
description: "unknown".into()
|
||||||
|
})))
|
||||||
|
} else {
|
||||||
|
Event::Synchronized(Ok(()))
|
||||||
|
}
|
||||||
|
},
|
||||||
|
|
||||||
|
_ => unimplemented!()
|
||||||
|
}
|
||||||
|
},
|
||||||
|
_ => unimplemented!()
|
||||||
|
};
|
||||||
|
|
||||||
|
Some((event, (false, recv, conn, bus, buff)))
|
||||||
|
},
|
||||||
|
)
|
||||||
|
}.flatten_stream())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -27,7 +27,7 @@ pub trait Message: MessageBounds {
|
|||||||
|
|
||||||
fn as_shared_ref(&self) -> Option<&dyn SharedMessage>;
|
fn as_shared_ref(&self) -> Option<&dyn SharedMessage>;
|
||||||
fn as_shared_mut(&mut self) -> Option<&mut dyn SharedMessage>;
|
fn as_shared_mut(&mut self) -> Option<&mut dyn SharedMessage>;
|
||||||
fn as_shared_boxed(self: Box<Self>) -> Option<Box<dyn SharedMessage>>;
|
fn as_shared_boxed(self: Box<Self>) -> Result<Box<dyn SharedMessage>, Box<dyn Message>>;
|
||||||
fn as_shared_arc(self: Arc<Self>) -> Option<Arc<dyn SharedMessage>>;
|
fn as_shared_arc(self: Arc<Self>) -> Option<Arc<dyn SharedMessage>>;
|
||||||
|
|
||||||
fn try_clone_into(&self, into: &mut dyn Any) -> bool;
|
fn try_clone_into(&self, into: &mut dyn Any) -> bool;
|
||||||
@ -123,8 +123,8 @@ impl Message for () {
|
|||||||
fn as_shared_mut(&mut self) -> Option<&mut dyn SharedMessage> {
|
fn as_shared_mut(&mut self) -> Option<&mut dyn SharedMessage> {
|
||||||
Some(self)
|
Some(self)
|
||||||
}
|
}
|
||||||
fn as_shared_boxed(self: Box<Self>) -> Option<Box<dyn SharedMessage>> {
|
fn as_shared_boxed(self: Box<Self>) -> Result<Box<dyn SharedMessage>, Box<dyn Message>> {
|
||||||
Some(self)
|
Ok(self)
|
||||||
}
|
}
|
||||||
fn as_shared_arc(self: Arc<Self>) -> Option<Arc<dyn SharedMessage>> {
|
fn as_shared_arc(self: Arc<Self>) -> Option<Arc<dyn SharedMessage>> {
|
||||||
Some(self)
|
Some(self)
|
||||||
@ -221,8 +221,8 @@ mod tests {
|
|||||||
fn as_shared_mut(&mut self) -> Option<&mut dyn SharedMessage> {
|
fn as_shared_mut(&mut self) -> Option<&mut dyn SharedMessage> {
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
fn as_shared_boxed(self: Box<Self>) -> Option<Box<dyn SharedMessage>> {
|
fn as_shared_boxed(self: Box<Self>) -> Result<Box<dyn SharedMessage>, Box<dyn Message>> {
|
||||||
None
|
Err(self)
|
||||||
}
|
}
|
||||||
fn as_shared_arc(self: Arc<Self>) -> Option<Arc<dyn SharedMessage>> {
|
fn as_shared_arc(self: Arc<Self>) -> Option<Arc<dyn SharedMessage>> {
|
||||||
None
|
None
|
||||||
@ -270,8 +270,8 @@ mod tests {
|
|||||||
fn as_shared_mut(&mut self) -> Option<&mut dyn SharedMessage> {
|
fn as_shared_mut(&mut self) -> Option<&mut dyn SharedMessage> {
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
fn as_shared_boxed(self: Box<Self>) -> Option<Box<dyn SharedMessage>> {
|
fn as_shared_boxed(self: Box<Self>) -> Result<Box<dyn SharedMessage>, Box<dyn Message>> {
|
||||||
None
|
Err(self)
|
||||||
}
|
}
|
||||||
fn as_shared_arc(self: Arc<Self>) -> Option<Arc<dyn SharedMessage>> {
|
fn as_shared_arc(self: Arc<Self>) -> Option<Arc<dyn SharedMessage>> {
|
||||||
None
|
None
|
||||||
@ -328,8 +328,8 @@ mod tests {
|
|||||||
fn as_shared_mut(&mut self) -> Option<&mut dyn SharedMessage> {
|
fn as_shared_mut(&mut self) -> Option<&mut dyn SharedMessage> {
|
||||||
Some(self)
|
Some(self)
|
||||||
}
|
}
|
||||||
fn as_shared_boxed(self: Box<Self>) -> Option<Box<dyn SharedMessage>> {
|
fn as_shared_boxed(self: Box<Self>) -> Result<Box<dyn SharedMessage>, Box<dyn Message>> {
|
||||||
Some(self)
|
Ok(self)
|
||||||
}
|
}
|
||||||
fn as_shared_arc(self: Arc<Self>) -> Option<Arc<dyn SharedMessage>> {
|
fn as_shared_arc(self: Arc<Self>) -> Option<Arc<dyn SharedMessage>> {
|
||||||
Some(self)
|
Some(self)
|
||||||
|
@ -29,6 +29,13 @@ impl GenericError {
|
|||||||
description: format!("{}[{}]", err.type_tag(), err),
|
description: format!("{}[{}]", err.type_tag(), err),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn from_err(tt: TypeTag, err: impl fmt::Display) -> Self {
|
||||||
|
GenericError {
|
||||||
|
description: format!("{}[{}]", tt, err),
|
||||||
|
type_tag: tt,
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl fmt::Display for GenericError {
|
impl fmt::Display for GenericError {
|
||||||
|
@ -54,13 +54,13 @@ where
|
|||||||
{
|
{
|
||||||
type Stream: Stream<Item = Event<M, E>> + Send;
|
type Stream: Stream<Item = Event<M, E>> + Send;
|
||||||
|
|
||||||
fn event_stream(&self) -> Self::Stream;
|
fn event_stream(&self, bus: Bus) -> Self::Stream;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub trait ReciveUntypedReceiver: Sync {
|
pub trait ReciveUntypedReceiver: Sync {
|
||||||
type Stream: Stream<Item = Event<Box<dyn Message>, GenericError>> + Send;
|
type Stream: Stream<Item = Event<Box<dyn Message>, GenericError>> + Send;
|
||||||
|
|
||||||
fn event_stream(&self) -> Self::Stream;
|
fn event_stream(&self, bus: Bus) -> Self::Stream;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub trait WrapperReturnTypeOnly<R: Message>: Send + Sync {
|
pub trait WrapperReturnTypeOnly<R: Message>: Send + Sync {
|
||||||
@ -182,10 +182,10 @@ where
|
|||||||
S: SendUntypedReceiver + ReciveTypedReceiver<R, E> + Send + Sync + 'static,
|
S: SendUntypedReceiver + ReciveTypedReceiver<R, E> + Send + Sync + 'static,
|
||||||
{
|
{
|
||||||
fn start_polling_events(self: Arc<Self>) -> BusPollerCallback {
|
fn start_polling_events(self: Arc<Self>) -> BusPollerCallback {
|
||||||
Box::new(move |_| {
|
Box::new(move |bus| {
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
let this = self.clone();
|
let this = self.clone();
|
||||||
let events = this.inner.event_stream();
|
let events = this.inner.event_stream(bus);
|
||||||
pin_mut!(events);
|
pin_mut!(events);
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
|
@ -137,7 +137,7 @@ where
|
|||||||
{
|
{
|
||||||
type Stream = Pin<Box<dyn Stream<Item = Event<R, E>> + Send>>;
|
type Stream = Pin<Box<dyn Stream<Item = Event<R, E>> + Send>>;
|
||||||
|
|
||||||
fn event_stream(&self) -> Self::Stream {
|
fn event_stream(&self, _: Bus) -> Self::Stream {
|
||||||
let mut rx = self.srx.lock().take().unwrap();
|
let mut rx = self.srx.lock().take().unwrap();
|
||||||
|
|
||||||
Box::pin(futures::stream::poll_fn(move |cx| rx.poll_recv(cx)))
|
Box::pin(futures::stream::poll_fn(move |cx| rx.poll_recv(cx)))
|
||||||
|
@ -140,7 +140,7 @@ where
|
|||||||
{
|
{
|
||||||
type Stream = Pin<Box<dyn Stream<Item = Event<R, E>> + Send>>;
|
type Stream = Pin<Box<dyn Stream<Item = Event<R, E>> + Send>>;
|
||||||
|
|
||||||
fn event_stream(&self) -> Self::Stream {
|
fn event_stream(&self, _: Bus) -> Self::Stream {
|
||||||
let mut rx = self.srx.lock().take().unwrap();
|
let mut rx = self.srx.lock().take().unwrap();
|
||||||
|
|
||||||
Box::pin(futures::stream::poll_fn(move |cx| rx.poll_recv(cx)))
|
Box::pin(futures::stream::poll_fn(move |cx| rx.poll_recv(cx)))
|
||||||
|
@ -138,7 +138,7 @@ where
|
|||||||
{
|
{
|
||||||
type Stream = Pin<Box<dyn Stream<Item = Event<R, E>> + Send>>;
|
type Stream = Pin<Box<dyn Stream<Item = Event<R, E>> + Send>>;
|
||||||
|
|
||||||
fn event_stream(&self) -> Self::Stream {
|
fn event_stream(&self, _: Bus) -> Self::Stream {
|
||||||
let mut rx = self.srx.lock().take().unwrap();
|
let mut rx = self.srx.lock().take().unwrap();
|
||||||
|
|
||||||
Box::pin(futures::stream::poll_fn(move |cx| rx.poll_recv(cx)))
|
Box::pin(futures::stream::poll_fn(move |cx| rx.poll_recv(cx)))
|
||||||
|
@ -144,7 +144,7 @@ where
|
|||||||
{
|
{
|
||||||
type Stream = Pin<Box<dyn Stream<Item = Event<R, E>> + Send>>;
|
type Stream = Pin<Box<dyn Stream<Item = Event<R, E>> + Send>>;
|
||||||
|
|
||||||
fn event_stream(&self) -> Self::Stream {
|
fn event_stream(&self, _: Bus) -> Self::Stream {
|
||||||
let mut rx = self.srx.lock().take().unwrap();
|
let mut rx = self.srx.lock().take().unwrap();
|
||||||
|
|
||||||
Box::pin(futures::stream::poll_fn(move |cx| rx.poll_recv(cx)))
|
Box::pin(futures::stream::poll_fn(move |cx| rx.poll_recv(cx)))
|
||||||
|
@ -111,7 +111,7 @@ where
|
|||||||
{
|
{
|
||||||
type Stream = Pin<Box<dyn Stream<Item = Event<R, E>> + Send>>;
|
type Stream = Pin<Box<dyn Stream<Item = Event<R, E>> + Send>>;
|
||||||
|
|
||||||
fn event_stream(&self) -> Self::Stream {
|
fn event_stream(&self, _: Bus) -> Self::Stream {
|
||||||
let mut rx = self.srx.lock().take().unwrap();
|
let mut rx = self.srx.lock().take().unwrap();
|
||||||
|
|
||||||
Box::pin(futures::stream::poll_fn(move |cx| rx.poll_recv(cx)))
|
Box::pin(futures::stream::poll_fn(move |cx| rx.poll_recv(cx)))
|
||||||
|
@ -115,7 +115,7 @@ where
|
|||||||
{
|
{
|
||||||
type Stream = Pin<Box<dyn Stream<Item = Event<R, E>> + Send>>;
|
type Stream = Pin<Box<dyn Stream<Item = Event<R, E>> + Send>>;
|
||||||
|
|
||||||
fn event_stream(&self) -> Self::Stream {
|
fn event_stream(&self, _: Bus) -> Self::Stream {
|
||||||
let mut rx = self.srx.lock().take().unwrap();
|
let mut rx = self.srx.lock().take().unwrap();
|
||||||
|
|
||||||
Box::pin(futures::stream::poll_fn(move |cx| rx.poll_recv(cx)))
|
Box::pin(futures::stream::poll_fn(move |cx| rx.poll_recv(cx)))
|
||||||
|
@ -111,7 +111,7 @@ where
|
|||||||
{
|
{
|
||||||
type Stream = Pin<Box<dyn Stream<Item = Event<R, E>> + Send>>;
|
type Stream = Pin<Box<dyn Stream<Item = Event<R, E>> + Send>>;
|
||||||
|
|
||||||
fn event_stream(&self) -> Self::Stream {
|
fn event_stream(&self, _: Bus) -> Self::Stream {
|
||||||
let mut rx = self.srx.lock().take().unwrap();
|
let mut rx = self.srx.lock().take().unwrap();
|
||||||
|
|
||||||
Box::pin(futures::stream::poll_fn(move |cx| rx.poll_recv(cx)))
|
Box::pin(futures::stream::poll_fn(move |cx| rx.poll_recv(cx)))
|
||||||
|
@ -112,7 +112,7 @@ where
|
|||||||
{
|
{
|
||||||
type Stream = Pin<Box<dyn Stream<Item = Event<R, E>> + Send>>;
|
type Stream = Pin<Box<dyn Stream<Item = Event<R, E>> + Send>>;
|
||||||
|
|
||||||
fn event_stream(&self) -> Self::Stream {
|
fn event_stream(&self, _: Bus) -> Self::Stream {
|
||||||
let mut rx = self.srx.lock().take().unwrap();
|
let mut rx = self.srx.lock().take().unwrap();
|
||||||
|
|
||||||
Box::pin(futures::stream::poll_fn(move |cx| rx.poll_recv(cx)))
|
Box::pin(futures::stream::poll_fn(move |cx| rx.poll_recv(cx)))
|
||||||
|
@ -221,10 +221,10 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn start_polling(self: Arc<Self>) -> BusPollerCallback {
|
fn start_polling(self: Arc<Self>) -> BusPollerCallback {
|
||||||
Box::new(move |_| {
|
Box::new(move |bus| {
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
let this = self.clone();
|
let this = self.clone();
|
||||||
let events = this.inner.event_stream();
|
let events = this.inner.event_stream(bus);
|
||||||
pin_mut!(events);
|
pin_mut!(events);
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
|
Loading…
Reference in New Issue
Block a user