revert upgrate to 2021 edition
This commit is contained in:
parent
bed35670c6
commit
16690db44c
@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "messagebus"
|
name = "messagebus"
|
||||||
version = "0.9.6"
|
version = "0.9.7"
|
||||||
authors = ["Andrey Tkachenko <andrey@aidev.ru>"]
|
authors = ["Andrey Tkachenko <andrey@aidev.ru>"]
|
||||||
repository = "https://github.com/andreytkachenko/messagebus.git"
|
repository = "https://github.com/andreytkachenko/messagebus.git"
|
||||||
keywords = ["futures", "async", "tokio", "message", "bus"]
|
keywords = ["futures", "async", "tokio", "message", "bus"]
|
||||||
@ -8,7 +8,7 @@ categories = ["network-programming", "asynchronous"]
|
|||||||
description = "MessageBus allows intercommunicate with messages between modules"
|
description = "MessageBus allows intercommunicate with messages between modules"
|
||||||
license = "MIT OR Apache-2.0"
|
license = "MIT OR Apache-2.0"
|
||||||
exclude = [".gitignore", ".cargo/config", ".github/**", "codecov.yml"]
|
exclude = [".gitignore", ".cargo/config", ".github/**", "codecov.yml"]
|
||||||
edition = "2021"
|
edition = "2018"
|
||||||
|
|
||||||
[workspace]
|
[workspace]
|
||||||
members = [
|
members = [
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "messagebus_derive"
|
name = "messagebus_derive"
|
||||||
version = "0.2.1"
|
version = "0.2.2"
|
||||||
authors = ["Andrey Tkachenko <andrey@aidev.ru>"]
|
authors = ["Andrey Tkachenko <andrey@aidev.ru>"]
|
||||||
repository = "https://github.com/andreytkachenko/messagebus.git"
|
repository = "https://github.com/andreytkachenko/messagebus.git"
|
||||||
keywords = ["futures", "async", "tokio", "message", "bus"]
|
keywords = ["futures", "async", "tokio", "message", "bus"]
|
||||||
@ -8,7 +8,7 @@ categories = ["network-programming", "asynchronous"]
|
|||||||
description = "MessageBus allows intercommunicate with messages between modules"
|
description = "MessageBus allows intercommunicate with messages between modules"
|
||||||
license = "MIT OR Apache-2.0"
|
license = "MIT OR Apache-2.0"
|
||||||
exclude = [".gitignore", ".cargo/config", ".github/**", "codecov.yml"]
|
exclude = [".gitignore", ".cargo/config", ".github/**", "codecov.yml"]
|
||||||
edition = "2021"
|
edition = "2018"
|
||||||
|
|
||||||
[lib]
|
[lib]
|
||||||
proc-macro = true
|
proc-macro = true
|
||||||
|
@ -8,7 +8,7 @@ categories = ["network-programming", "asynchronous"]
|
|||||||
description = "MessageBus remote allows intercommunicate by messages between instances"
|
description = "MessageBus remote allows intercommunicate by messages between instances"
|
||||||
license = "MIT OR Apache-2.0"
|
license = "MIT OR Apache-2.0"
|
||||||
exclude = [".gitignore", ".cargo/config", ".github/**", "codecov.yml"]
|
exclude = [".gitignore", ".cargo/config", ".github/**", "codecov.yml"]
|
||||||
edition = "2021"
|
edition = "2018"
|
||||||
|
|
||||||
# [features]
|
# [features]
|
||||||
# quic = ["quinn"]
|
# quic = ["quinn"]
|
||||||
@ -16,7 +16,7 @@ edition = "2021"
|
|||||||
[dependencies]
|
[dependencies]
|
||||||
thiserror = "1.0"
|
thiserror = "1.0"
|
||||||
messagebus = { path="../../" }
|
messagebus = { path="../../" }
|
||||||
tokio = { version = "1", features = ["parking_lot", "rt-multi-thread", "sync", "time"] }
|
tokio = { version = "1", features = ["parking_lot", "rt-multi-thread", "sync", "time", "io-util"] }
|
||||||
parking_lot = "0.11"
|
parking_lot = "0.11"
|
||||||
quinn = "0.7"
|
quinn = "0.7"
|
||||||
rmp = "0.8.10"
|
rmp = "0.8.10"
|
||||||
@ -28,8 +28,16 @@ futures = "0.3.17"
|
|||||||
cbor = "0.4.1"
|
cbor = "0.4.1"
|
||||||
serde_cbor = "0.11.2"
|
serde_cbor = "0.11.2"
|
||||||
bytes = "1.1.0"
|
bytes = "1.1.0"
|
||||||
|
quinn-proto = "0.7.3"
|
||||||
|
rustls = "0.19.1"
|
||||||
|
redis = "0.21.2"
|
||||||
|
bitflags = "1.3.2"
|
||||||
|
serde_json = "1.0.68"
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
|
anyhow = "1.0.44"
|
||||||
|
async-trait = "0.1.51"
|
||||||
|
thiserror = "1.0.29"
|
||||||
tokio = { version = "1.11.0", features = ["full"] }
|
tokio = { version = "1.11.0", features = ["full"] }
|
||||||
# quinn = { version = "0.7", optional = true }
|
# quinn = { version = "0.7", optional = true }
|
||||||
|
|
||||||
|
@ -1,2 +0,0 @@
|
|||||||
#[tokio::main]
|
|
||||||
async fn main() {}
|
|
BIN
crates/remote/examples/cert.der
Normal file
BIN
crates/remote/examples/cert.der
Normal file
Binary file not shown.
BIN
crates/remote/examples/cert.key
Normal file
BIN
crates/remote/examples/cert.key
Normal file
Binary file not shown.
58
crates/remote/examples/quic_client.rs
Normal file
58
crates/remote/examples/quic_client.rs
Normal file
@ -0,0 +1,58 @@
|
|||||||
|
use messagebus::Bus;
|
||||||
|
use messagebus_remote::relays::QuicClientRelay;
|
||||||
|
use serde_derive::{Serialize, Deserialize};
|
||||||
|
use messagebus::{Message, derive::Message};
|
||||||
|
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize, Debug, Clone, Message)]
|
||||||
|
#[namespace("example")]
|
||||||
|
#[message(shared, clone)]
|
||||||
|
pub struct Req {
|
||||||
|
data: i32,
|
||||||
|
text: String
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize, Debug, Clone, Message)]
|
||||||
|
#[namespace("example")]
|
||||||
|
#[message(shared, clone)]
|
||||||
|
pub struct Resp {
|
||||||
|
data: i32,
|
||||||
|
text: String
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main() {
|
||||||
|
let relay = QuicClientRelay::new(
|
||||||
|
"./examples/cert.der",
|
||||||
|
"127.0.0.1:8083".parse().unwrap(),
|
||||||
|
"localhost".into(),
|
||||||
|
vec![
|
||||||
|
("example::Req".into(), "example::Resp".into(), "GenericError".into())
|
||||||
|
]
|
||||||
|
).unwrap();
|
||||||
|
|
||||||
|
let (b, poller) = Bus::build()
|
||||||
|
.register_shared_message::<Req>()
|
||||||
|
.register_shared_message::<Resp>()
|
||||||
|
.register_relay(relay)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
|
||||||
|
b.ready().await;
|
||||||
|
println!("ready");
|
||||||
|
|
||||||
|
let resp: Resp = b.request(Req {
|
||||||
|
data: 12,
|
||||||
|
text: String::from("test")
|
||||||
|
}, Default::default())
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
println!("resp {:?}", resp);
|
||||||
|
|
||||||
|
b.flush().await;
|
||||||
|
b.close().await;
|
||||||
|
poller.await;
|
||||||
|
}
|
85
crates/remote/examples/quic_server.rs
Normal file
85
crates/remote/examples/quic_server.rs
Normal file
@ -0,0 +1,85 @@
|
|||||||
|
use messagebus::{error, Message, derive::{Message, Error as MbError}, AsyncHandler, Bus};
|
||||||
|
use messagebus_remote::relays::{QuicServerRelay};
|
||||||
|
use serde_derive::{Serialize, Deserialize};
|
||||||
|
use async_trait::async_trait;
|
||||||
|
use thiserror::Error;
|
||||||
|
|
||||||
|
|
||||||
|
#[derive(Debug, Error, MbError)]
|
||||||
|
enum Error {
|
||||||
|
#[error("Error({0})")]
|
||||||
|
Error(anyhow::Error),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<M: Message> From<error::Error<M>> for Error {
|
||||||
|
fn from(err: error::Error<M>) -> Self {
|
||||||
|
Self::Error(err.into())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize, Debug, Clone, Message)]
|
||||||
|
#[namespace("example")]
|
||||||
|
#[message(shared, clone)]
|
||||||
|
pub struct Req {
|
||||||
|
data: i32,
|
||||||
|
text: String
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize, Debug, Clone, Message)]
|
||||||
|
#[namespace("example")]
|
||||||
|
#[message(shared, clone)]
|
||||||
|
pub struct Resp {
|
||||||
|
data: i32,
|
||||||
|
text: String
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
struct TmpReceiver;
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl AsyncHandler<Req> for TmpReceiver {
|
||||||
|
type Error = Error;
|
||||||
|
type Response = Resp;
|
||||||
|
|
||||||
|
async fn handle(&self, msg: Req, _bus: &Bus) -> Result<Self::Response, Self::Error> {
|
||||||
|
println!("TmpReceiver::handle {:?}", msg);
|
||||||
|
Ok(Resp {
|
||||||
|
data: msg.data + 12,
|
||||||
|
text: format!("<< {} >>", msg.text),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> {
|
||||||
|
println!("TmpReceiver::sync");
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main() {
|
||||||
|
let relay = QuicServerRelay::new(
|
||||||
|
"./examples/cert.key",
|
||||||
|
"./examples/cert.der",
|
||||||
|
"0.0.0.0:8083".parse().unwrap(),
|
||||||
|
vec![
|
||||||
|
("example::Req".into(), "example::Resp".into(), "GenericError".into())
|
||||||
|
]
|
||||||
|
).unwrap();
|
||||||
|
|
||||||
|
let (b, poller) = Bus::build()
|
||||||
|
.register_shared_message::<Req>()
|
||||||
|
.register_shared_message::<Resp>()
|
||||||
|
.register_relay(relay)
|
||||||
|
.register(TmpReceiver)
|
||||||
|
.subscribe_async::<Req>(8, Default::default())
|
||||||
|
.done()
|
||||||
|
.build();
|
||||||
|
|
||||||
|
b.ready().await;
|
||||||
|
|
||||||
|
println!("ready");
|
||||||
|
|
||||||
|
poller.await;
|
||||||
|
}
|
@ -24,4 +24,22 @@ pub enum Error {
|
|||||||
|
|
||||||
#[error("QuinnConnectError: {0}")]
|
#[error("QuinnConnectError: {0}")]
|
||||||
QuinnParseError(#[from] ParseError),
|
QuinnParseError(#[from] ParseError),
|
||||||
|
|
||||||
|
#[error("ConfigError: {0}")]
|
||||||
|
ConfigError(#[from] quinn_proto::ConfigError),
|
||||||
|
|
||||||
|
#[error("TLSError: {0}")]
|
||||||
|
TLSError(#[from] rustls::TLSError),
|
||||||
|
|
||||||
|
#[error("Redis: {0}")]
|
||||||
|
Redis(#[from] redis::RedisError),
|
||||||
|
|
||||||
|
#[error("ProtocolParseError")]
|
||||||
|
ProtocolParseError,
|
||||||
|
|
||||||
|
#[error("UnknownCodec")]
|
||||||
|
UnknownCodec,
|
||||||
|
|
||||||
|
#[error("SerdeErased {0}")]
|
||||||
|
SerdeErased(#[from] erased_serde::Error),
|
||||||
}
|
}
|
||||||
|
@ -1,2 +1,3 @@
|
|||||||
pub mod error;
|
pub mod error;
|
||||||
|
pub mod proto;
|
||||||
pub mod relays;
|
pub mod relays;
|
||||||
|
510
crates/remote/src/proto.rs
Normal file
510
crates/remote/src/proto.rs
Normal file
@ -0,0 +1,510 @@
|
|||||||
|
use messagebus::{Action, Bus, Event, SharedMessage, TypeTag};
|
||||||
|
use serde_derive::{Deserialize, Serialize};
|
||||||
|
use std::borrow::Cow;
|
||||||
|
|
||||||
|
#[derive(Debug, Deserialize, Serialize)]
|
||||||
|
#[repr(u16)]
|
||||||
|
pub enum ProtocolHeaderActionKind {
|
||||||
|
Nop,
|
||||||
|
Error,
|
||||||
|
|
||||||
|
Send,
|
||||||
|
Response,
|
||||||
|
BatchComplete,
|
||||||
|
|
||||||
|
Flush,
|
||||||
|
Flushed,
|
||||||
|
|
||||||
|
Synchronize,
|
||||||
|
Synchronized,
|
||||||
|
|
||||||
|
Close,
|
||||||
|
Exited,
|
||||||
|
|
||||||
|
Initialize,
|
||||||
|
Ready,
|
||||||
|
|
||||||
|
Pause,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Copy, Clone, Deserialize, Serialize)]
|
||||||
|
#[non_exhaustive]
|
||||||
|
#[repr(u32)]
|
||||||
|
pub enum BodyType {
|
||||||
|
None,
|
||||||
|
Utf8,
|
||||||
|
Cbor,
|
||||||
|
MessagePack,
|
||||||
|
Json,
|
||||||
|
Bson,
|
||||||
|
}
|
||||||
|
|
||||||
|
bitflags::bitflags! {
|
||||||
|
#[derive(Deserialize, Serialize)]
|
||||||
|
pub struct ProtocolHeaderFlags: u32 {
|
||||||
|
const TYPE_TAG = 0b00000001;
|
||||||
|
const BODY = 0b00000010;
|
||||||
|
const ERROR = 0b00000100;
|
||||||
|
const ARGUMENT = 0b00001000;
|
||||||
|
const TT_AND_ERROR = Self::TYPE_TAG.bits | Self::ERROR.bits;
|
||||||
|
const TT_AND_BODY = Self::TYPE_TAG.bits | Self::BODY.bits;
|
||||||
|
const TT_ERROR_AND_ARGUMENT = Self::TYPE_TAG.bits | Self::ERROR.bits | Self::ARGUMENT.bits;
|
||||||
|
const TT_BODY_AND_ARGUMENT = Self::TYPE_TAG.bits | Self::BODY.bits | Self::ARGUMENT.bits;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Deserialize, Serialize)]
|
||||||
|
pub struct ProtocolHeader<'a> {
|
||||||
|
pub kind: ProtocolHeaderActionKind,
|
||||||
|
pub type_tag: Option<Cow<'a, [u8]>>,
|
||||||
|
pub flags: ProtocolHeaderFlags,
|
||||||
|
pub body_type: BodyType,
|
||||||
|
pub argument: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a> ProtocolHeader<'a> {
|
||||||
|
pub fn send(mid: u64, tt: &'a TypeTag, body_type: BodyType) -> ProtocolHeader<'a> {
|
||||||
|
ProtocolHeader {
|
||||||
|
kind: ProtocolHeaderActionKind::Send,
|
||||||
|
type_tag: Some(tt.as_bytes().into()),
|
||||||
|
flags: ProtocolHeaderFlags::TT_BODY_AND_ARGUMENT,
|
||||||
|
body_type,
|
||||||
|
argument: mid,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn flush() -> Self {
|
||||||
|
ProtocolHeader {
|
||||||
|
kind: ProtocolHeaderActionKind::Flush,
|
||||||
|
type_tag: None,
|
||||||
|
flags: ProtocolHeaderFlags::empty(),
|
||||||
|
body_type: BodyType::None,
|
||||||
|
argument: 0,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn close() -> Self {
|
||||||
|
ProtocolHeader {
|
||||||
|
kind: ProtocolHeaderActionKind::Close,
|
||||||
|
type_tag: None,
|
||||||
|
flags: ProtocolHeaderFlags::empty(),
|
||||||
|
body_type: BodyType::None,
|
||||||
|
argument: 0,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn sync() -> Self {
|
||||||
|
ProtocolHeader {
|
||||||
|
kind: ProtocolHeaderActionKind::Synchronize,
|
||||||
|
type_tag: None,
|
||||||
|
flags: ProtocolHeaderFlags::empty(),
|
||||||
|
body_type: BodyType::None,
|
||||||
|
argument: 0,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// pub fn init() -> Self {
|
||||||
|
// ProtocolHeader {
|
||||||
|
// kind: ProtocolHeaderActionKind::Initialize,
|
||||||
|
// type_tag: None,
|
||||||
|
// failed: false,
|
||||||
|
// body_encoding: 0,
|
||||||
|
// argument: 0,
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
|
||||||
|
// pub fn pause() -> Self {
|
||||||
|
// ProtocolHeader {
|
||||||
|
// kind: ProtocolHeaderActionKind::Pause,
|
||||||
|
// type_tag: None,
|
||||||
|
// failed: false,
|
||||||
|
// body_encoding: 0,
|
||||||
|
// argument: 0,
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub enum ProtocolItem {
|
||||||
|
Nop,
|
||||||
|
Event(Event<Box<dyn SharedMessage>, messagebus::error::GenericError>),
|
||||||
|
Action(Action),
|
||||||
|
Send(u64, Box<dyn SharedMessage>, bool)
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<Action> for ProtocolItem {
|
||||||
|
fn from(action: Action) -> Self {
|
||||||
|
ProtocolItem::Action(action)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<Event<Box<dyn SharedMessage>, messagebus::error::GenericError>> for ProtocolItem {
|
||||||
|
fn from(ev: Event<Box<dyn SharedMessage>, messagebus::error::GenericError>) -> Self {
|
||||||
|
ProtocolItem::Event(ev)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<(u64, Box<dyn SharedMessage>, bool)> for ProtocolItem {
|
||||||
|
fn from(msg: (u64, Box<dyn SharedMessage>, bool)) -> Self {
|
||||||
|
ProtocolItem::Send(msg.0, msg.1, msg.2)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ProtocolItem {
|
||||||
|
pub fn unwrap_send(self) -> Result<(u64, Box<dyn SharedMessage>, bool), ProtocolItem> {
|
||||||
|
match self {
|
||||||
|
ProtocolItem::Send(a, b, c) => Ok((a, b, c)),
|
||||||
|
other => Err(other)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn serialize(self, mut body_type: BodyType, body_buff: &mut Vec<u8>) -> Result<ProtocolPacket<'_>, crate::error::Error> {
|
||||||
|
let mut argument = 0;
|
||||||
|
let mut type_tag = None;
|
||||||
|
let mut body = None;
|
||||||
|
let mut flags = ProtocolHeaderFlags::empty();
|
||||||
|
|
||||||
|
let kind = match self {
|
||||||
|
ProtocolItem::Nop => ProtocolHeaderActionKind::Nop,
|
||||||
|
ProtocolItem::Action(action) => match action {
|
||||||
|
Action::Close => ProtocolHeaderActionKind::Close,
|
||||||
|
Action::Flush => ProtocolHeaderActionKind::Flush,
|
||||||
|
Action::Init => ProtocolHeaderActionKind::Initialize,
|
||||||
|
Action::Sync => ProtocolHeaderActionKind::Synchronize,
|
||||||
|
_ => unimplemented!(),
|
||||||
|
}
|
||||||
|
ProtocolItem::Send(mid, msg, req) => {
|
||||||
|
let msg = msg.as_shared_boxed()
|
||||||
|
.map_err(|_| crate::error::Error::UnknownCodec)?;
|
||||||
|
|
||||||
|
argument = mid;
|
||||||
|
flags.set(ProtocolHeaderFlags::ARGUMENT, req);
|
||||||
|
flags.set(ProtocolHeaderFlags::BODY, true);
|
||||||
|
flags.set(ProtocolHeaderFlags::TYPE_TAG, true);
|
||||||
|
type_tag = Some(msg.type_tag());
|
||||||
|
body = Some(generic_serialize(body_type, &*msg, body_buff)?);
|
||||||
|
|
||||||
|
ProtocolHeaderActionKind::Send
|
||||||
|
},
|
||||||
|
ProtocolItem::Event(ev) => match ev {
|
||||||
|
Event::Response(mid, res) => {
|
||||||
|
argument = mid;
|
||||||
|
flags.set(ProtocolHeaderFlags::ARGUMENT, true);
|
||||||
|
flags.set(ProtocolHeaderFlags::BODY, true);
|
||||||
|
flags.set(ProtocolHeaderFlags::TYPE_TAG, true);
|
||||||
|
|
||||||
|
match res {
|
||||||
|
Ok(msg) => {
|
||||||
|
let msg = msg.as_shared_boxed()
|
||||||
|
.map_err(|_| crate::error::Error::UnknownCodec)?;
|
||||||
|
|
||||||
|
type_tag = Some(msg.type_tag());
|
||||||
|
body = Some(generic_serialize(body_type, &*msg, body_buff)?);
|
||||||
|
}
|
||||||
|
|
||||||
|
Err(err) => {
|
||||||
|
flags.set(ProtocolHeaderFlags::ERROR, true);
|
||||||
|
|
||||||
|
type_tag = Some("GenericError".into());
|
||||||
|
body_type = BodyType::Utf8;
|
||||||
|
body = Some(format!("{}", err).into_bytes().into());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ProtocolHeaderActionKind::Response
|
||||||
|
},
|
||||||
|
Event::Error(err) => {
|
||||||
|
flags.set(ProtocolHeaderFlags::ERROR, true);
|
||||||
|
flags.set(ProtocolHeaderFlags::BODY, true);
|
||||||
|
flags.set(ProtocolHeaderFlags::TYPE_TAG, true);
|
||||||
|
|
||||||
|
type_tag = Some("GenericError".into());
|
||||||
|
body_type = BodyType::Utf8;
|
||||||
|
body = Some(format!("{}", err).into_bytes().into());
|
||||||
|
|
||||||
|
ProtocolHeaderActionKind::Error
|
||||||
|
}
|
||||||
|
Event::Finished(n) => {
|
||||||
|
argument = n;
|
||||||
|
flags.set(ProtocolHeaderFlags::ARGUMENT, true);
|
||||||
|
ProtocolHeaderActionKind::BatchComplete
|
||||||
|
},
|
||||||
|
Event::Synchronized(res) => {
|
||||||
|
match res {
|
||||||
|
Ok(_) => {}
|
||||||
|
Err(err) => {
|
||||||
|
flags.set(ProtocolHeaderFlags::BODY, true);
|
||||||
|
flags.set(ProtocolHeaderFlags::ERROR, true);
|
||||||
|
flags.set(ProtocolHeaderFlags::TYPE_TAG, true);
|
||||||
|
|
||||||
|
type_tag = Some("GenericError".into());
|
||||||
|
body_type = BodyType::Utf8;
|
||||||
|
body = Some(format!("{}", err).into_bytes().into());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ProtocolHeaderActionKind::Synchronized
|
||||||
|
}
|
||||||
|
Event::InitFailed(err) => {
|
||||||
|
flags.set(ProtocolHeaderFlags::BODY, true);
|
||||||
|
flags.set(ProtocolHeaderFlags::ERROR, true);
|
||||||
|
flags.set(ProtocolHeaderFlags::TYPE_TAG, true);
|
||||||
|
|
||||||
|
type_tag = Some("GenericError".into());
|
||||||
|
body_type = BodyType::Utf8;
|
||||||
|
body = Some(format!("{}", err).into_bytes().into());
|
||||||
|
|
||||||
|
ProtocolHeaderActionKind::Ready
|
||||||
|
},
|
||||||
|
Event::Ready => ProtocolHeaderActionKind::Ready,
|
||||||
|
Event::Pause => ProtocolHeaderActionKind::Pause,
|
||||||
|
Event::Exited => ProtocolHeaderActionKind::Exited,
|
||||||
|
Event::Flushed => ProtocolHeaderActionKind::Flushed,
|
||||||
|
_ => unimplemented!()
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(ProtocolPacket {
|
||||||
|
header: ProtocolHeader {
|
||||||
|
kind,
|
||||||
|
type_tag: type_tag.map(|x| x.to_string().into_bytes().into()),
|
||||||
|
flags,
|
||||||
|
body_type,
|
||||||
|
argument,
|
||||||
|
},
|
||||||
|
body
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
#[derive(Debug, Deserialize, Serialize)]
|
||||||
|
pub struct ProtocolPacket<'a> {
|
||||||
|
pub header: ProtocolHeader<'a>,
|
||||||
|
pub body: Option<Cow<'a, [u8]>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a> ProtocolPacket<'a> {
|
||||||
|
pub fn deserialize(
|
||||||
|
self,
|
||||||
|
bus: &Bus,
|
||||||
|
) -> Result<ProtocolItem, crate::error::Error>
|
||||||
|
{
|
||||||
|
let type_tag: Option<TypeTag> = if self.header.flags.contains(ProtocolHeaderFlags::TYPE_TAG) {
|
||||||
|
self.header
|
||||||
|
.type_tag
|
||||||
|
.map(|x| String::from_utf8_lossy(x.as_ref()).to_string().into())
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
|
|
||||||
|
let (body, error) = if self.header.flags.contains(ProtocolHeaderFlags::ERROR) {
|
||||||
|
let error = messagebus::error::GenericError {
|
||||||
|
type_tag: type_tag.unwrap(),
|
||||||
|
description: self.body.map(|x|String::from_utf8_lossy(x.as_ref()).to_string()).unwrap_or_default(),
|
||||||
|
};
|
||||||
|
|
||||||
|
(None, Some(messagebus::error::Error::Other(error)))
|
||||||
|
} else if self.header.flags.contains(ProtocolHeaderFlags::TT_AND_BODY) {
|
||||||
|
let body = self.body.ok_or(crate::error::Error::ProtocolParseError)?;
|
||||||
|
let res = generic_deserialize(self.header.body_type, body.as_ref(), |de| {
|
||||||
|
bus.deserialize_message(type_tag.unwrap(), de)
|
||||||
|
.map_err(|x| x.map_msg(|_| ()))
|
||||||
|
})?;
|
||||||
|
|
||||||
|
match res {
|
||||||
|
Ok(body) => (Some(body), None),
|
||||||
|
Err(err) => (None, Some(err)),
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
(None, None)
|
||||||
|
};
|
||||||
|
|
||||||
|
let argument = if self.header.flags.contains(ProtocolHeaderFlags::ARGUMENT) {
|
||||||
|
Some(self.header.argument)
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(ProtocolItem::Event(match self.header.kind {
|
||||||
|
ProtocolHeaderActionKind::Response => Event::Response(
|
||||||
|
argument
|
||||||
|
.ok_or(crate::error::Error::ProtocolParseError)?,
|
||||||
|
error
|
||||||
|
.map(Err)
|
||||||
|
.or_else(|| body.map(Ok))
|
||||||
|
.ok_or(crate::error::Error::ProtocolParseError)?,
|
||||||
|
),
|
||||||
|
ProtocolHeaderActionKind::Synchronized => {
|
||||||
|
Event::Synchronized(error.map(Err).unwrap_or(Ok(())))
|
||||||
|
}
|
||||||
|
ProtocolHeaderActionKind::Error => {
|
||||||
|
Event::Error(error.ok_or(crate::error::Error::ProtocolParseError)?)
|
||||||
|
}
|
||||||
|
ProtocolHeaderActionKind::BatchComplete => Event::Finished(self.header.argument),
|
||||||
|
ProtocolHeaderActionKind::Flushed => Event::Flushed,
|
||||||
|
ProtocolHeaderActionKind::Exited => Event::Exited,
|
||||||
|
ProtocolHeaderActionKind::Ready => Event::Ready,
|
||||||
|
ProtocolHeaderActionKind::Pause => Event::Pause,
|
||||||
|
|
||||||
|
other => return Ok(ProtocolItem::Action(match other {
|
||||||
|
ProtocolHeaderActionKind::Initialize => Action::Init,
|
||||||
|
ProtocolHeaderActionKind::Close => Action::Close,
|
||||||
|
ProtocolHeaderActionKind::Flush => Action::Flush,
|
||||||
|
ProtocolHeaderActionKind::Synchronize => Action::Sync,
|
||||||
|
ProtocolHeaderActionKind::Send => {
|
||||||
|
let req = argument.is_some();
|
||||||
|
let mid = self.header.argument;
|
||||||
|
let body = body.ok_or(crate::error::Error::ProtocolParseError)?;
|
||||||
|
|
||||||
|
return Ok(ProtocolItem::Send(mid, body, req));
|
||||||
|
},
|
||||||
|
ProtocolHeaderActionKind::Nop => return Ok(ProtocolItem::Nop),
|
||||||
|
|
||||||
|
_ => unreachable!()
|
||||||
|
})),
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn generic_deserialize<F, T>(k: BodyType, data: &[u8], f: F) -> Result<T, crate::error::Error>
|
||||||
|
where
|
||||||
|
F: FnOnce(&mut dyn erased_serde::Deserializer<'_>) -> T,
|
||||||
|
{
|
||||||
|
match k {
|
||||||
|
BodyType::Cbor => {
|
||||||
|
let mut cbor_de = serde_cbor::Deserializer::from_slice(data);
|
||||||
|
let mut de = <dyn erased_serde::Deserializer>::erase(&mut cbor_de);
|
||||||
|
|
||||||
|
Ok(f(&mut de))
|
||||||
|
}
|
||||||
|
|
||||||
|
BodyType::Json => {
|
||||||
|
let mut json_de = serde_json::Deserializer::from_slice(data);
|
||||||
|
let mut de = <dyn erased_serde::Deserializer>::erase(&mut json_de);
|
||||||
|
|
||||||
|
Ok(f(&mut de))
|
||||||
|
}
|
||||||
|
|
||||||
|
_ => Err(crate::error::Error::UnknownCodec),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn generic_serialize<'a>(kind: BodyType, msg: &dyn SharedMessage, buffer: &'a mut Vec<u8>) -> Result<Cow<'a, [u8]>, crate::error::Error> {
|
||||||
|
match kind {
|
||||||
|
BodyType::Cbor => {
|
||||||
|
let mut cbor_se = serde_cbor::Serializer::new(&mut *buffer);
|
||||||
|
let mut se = <dyn erased_serde::Serializer>::erase(&mut cbor_se);
|
||||||
|
msg.erased_serialize(&mut se)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
BodyType::Json => {
|
||||||
|
let mut json_se = serde_json::Serializer::new(&mut *buffer);
|
||||||
|
let mut se = <dyn erased_serde::Serializer>::erase(&mut json_se);
|
||||||
|
msg.erased_serialize(&mut se)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
_ => return Err(crate::error::Error::UnknownCodec),
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(buffer.as_slice().into())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use crate::proto::ProtocolItem;
|
||||||
|
|
||||||
|
use super::{
|
||||||
|
BodyType, ProtocolHeader, ProtocolHeaderActionKind, ProtocolHeaderFlags, ProtocolPacket,
|
||||||
|
};
|
||||||
|
use messagebus::Event;
|
||||||
|
use messagebus::{derive::Message, Bus, Message, TypeTagged};
|
||||||
|
use serde_derive::{Deserialize, Serialize};
|
||||||
|
use std::borrow::Cow;
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize, Debug, Clone, Message)]
|
||||||
|
#[namespace("test")]
|
||||||
|
#[message(shared, clone)]
|
||||||
|
struct TestSharedMessage {
|
||||||
|
test: String,
|
||||||
|
value: i32,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_proto_pack_event() {
|
||||||
|
let (bus, _) = Bus::build()
|
||||||
|
.register_shared_message::<TestSharedMessage>()
|
||||||
|
.build();
|
||||||
|
|
||||||
|
let pkt = ProtocolPacket {
|
||||||
|
header: ProtocolHeader {
|
||||||
|
kind: ProtocolHeaderActionKind::Response,
|
||||||
|
type_tag: Some(TestSharedMessage::type_tag_().as_bytes().to_vec().into()),
|
||||||
|
flags: ProtocolHeaderFlags::TT_BODY_AND_ARGUMENT,
|
||||||
|
body_type: BodyType::Json,
|
||||||
|
argument: 222,
|
||||||
|
},
|
||||||
|
body: Some(Cow::Borrowed(br#"{"test":"my test","value":12}"#)),
|
||||||
|
};
|
||||||
|
|
||||||
|
let event = match pkt.deserialize(&bus).unwrap() {
|
||||||
|
ProtocolItem::Event(ev) => ev,
|
||||||
|
_ => unreachable!()
|
||||||
|
};
|
||||||
|
|
||||||
|
assert!(matches!(event, Event::Response(..)));
|
||||||
|
|
||||||
|
match event {
|
||||||
|
Event::Response(mid, msg) => {
|
||||||
|
assert_eq!(mid, 222);
|
||||||
|
// assert!(msg.is_ok());
|
||||||
|
let msg = msg.unwrap();
|
||||||
|
|
||||||
|
assert_eq!(msg.type_tag(), TestSharedMessage::type_tag_());
|
||||||
|
let m: Box<TestSharedMessage> = msg.as_any_boxed().downcast().unwrap();
|
||||||
|
|
||||||
|
assert_eq!(m.value, 12);
|
||||||
|
assert_eq!(&m.test, "my test");
|
||||||
|
}
|
||||||
|
_ => unreachable!(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_proto_pack_event_error() {
|
||||||
|
let (bus, _) = Bus::build()
|
||||||
|
.register_shared_message::<TestSharedMessage>()
|
||||||
|
.build();
|
||||||
|
|
||||||
|
let pkt = ProtocolPacket {
|
||||||
|
header: ProtocolHeader {
|
||||||
|
kind: ProtocolHeaderActionKind::Response,
|
||||||
|
type_tag: Some(TestSharedMessage::type_tag_().as_bytes().to_vec().into()),
|
||||||
|
flags: ProtocolHeaderFlags::TT_ERROR_AND_ARGUMENT,
|
||||||
|
body_type: BodyType::Utf8,
|
||||||
|
argument: 222,
|
||||||
|
},
|
||||||
|
body: Some(Cow::Borrowed(br#"error description"#)),
|
||||||
|
};
|
||||||
|
|
||||||
|
let event = match pkt.deserialize(&bus).unwrap() {
|
||||||
|
ProtocolItem::Event(ev) => ev,
|
||||||
|
_ => unreachable!()
|
||||||
|
};
|
||||||
|
|
||||||
|
assert!(matches!(event, Event::Response(..)));
|
||||||
|
|
||||||
|
#[allow(clippy::unit_cmp)]
|
||||||
|
match event {
|
||||||
|
Event::Response(mid, msg) => {
|
||||||
|
assert_eq!(mid, 222);
|
||||||
|
let msg = msg.unwrap_err();
|
||||||
|
|
||||||
|
assert!(matches!(msg, messagebus::error::Error::Other(val) if (
|
||||||
|
assert_eq!(val.type_tag, TestSharedMessage::type_tag_()) == () &&
|
||||||
|
assert_eq!(val.description, "error description") == ()
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
_ => unreachable!(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -1,6 +1,8 @@
|
|||||||
// #[cfg(feature = "quic")]
|
// #[cfg(feature = "quic")]
|
||||||
mod quic;
|
mod quic;
|
||||||
|
|
||||||
|
// mod redis;
|
||||||
|
|
||||||
use futures::Stream;
|
use futures::Stream;
|
||||||
use messagebus::{error::GenericError, Event, Message, TypeTag};
|
use messagebus::{error::GenericError, Event, Message, TypeTag};
|
||||||
use std::{collections::HashMap, pin::Pin};
|
use std::{collections::HashMap, pin::Pin};
|
||||||
@ -11,21 +13,15 @@ pub use quic::*;
|
|||||||
pub(crate) type GenericEventStream =
|
pub(crate) type GenericEventStream =
|
||||||
Pin<Box<dyn Stream<Item = Event<Box<dyn Message>, GenericError>> + Send>>;
|
Pin<Box<dyn Stream<Item = Event<Box<dyn Message>, GenericError>> + Send>>;
|
||||||
|
|
||||||
|
|
||||||
#[derive(Debug, Default)]
|
#[derive(Debug, Default)]
|
||||||
pub(crate) struct MessageTable {
|
pub struct MessageTable {
|
||||||
table: HashMap<TypeTag, Vec<(TypeTag, TypeTag)>>,
|
table: HashMap<TypeTag, Vec<(TypeTag, TypeTag)>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl MessageTable {
|
impl MessageTable {
|
||||||
pub fn new() -> Self {
|
|
||||||
Self {
|
|
||||||
table: HashMap::new(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn add(&mut self, req: TypeTag, resp: TypeTag, err: TypeTag) {
|
pub fn add(&mut self, req: TypeTag, resp: TypeTag, err: TypeTag) {
|
||||||
self.table.entry(req)
|
self.table
|
||||||
|
.entry(req)
|
||||||
.or_insert_with(Vec::new)
|
.or_insert_with(Vec::new)
|
||||||
.push((resp, err));
|
.push((resp, err));
|
||||||
}
|
}
|
||||||
@ -47,7 +43,6 @@ impl MessageTable {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
impl From<Vec<(TypeTag, TypeTag, TypeTag)>> for MessageTable {
|
impl From<Vec<(TypeTag, TypeTag, TypeTag)>> for MessageTable {
|
||||||
fn from(table: Vec<(TypeTag, TypeTag, TypeTag)>) -> Self {
|
fn from(table: Vec<(TypeTag, TypeTag, TypeTag)>) -> Self {
|
||||||
let mut outgoing_table = MessageTable::default();
|
let mut outgoing_table = MessageTable::default();
|
||||||
@ -56,4 +51,4 @@ impl From<Vec<(TypeTag, TypeTag, TypeTag)>> for MessageTable {
|
|||||||
}
|
}
|
||||||
outgoing_table
|
outgoing_table
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,15 +1,9 @@
|
|||||||
use crate::{
|
use crate::{error::Error, proto::{BodyType, ProtocolItem, ProtocolPacket}, relays::{GenericEventStream, MessageTable}};
|
||||||
error::Error,
|
use futures::{Future, FutureExt};
|
||||||
relays::{GenericEventStream, MessageTable},
|
use messagebus::{Action, Bus, Event, Message, ReciveUntypedReceiver, SendUntypedReceiver, TypeTag, TypeTagAccept};
|
||||||
};
|
|
||||||
use futures::{Future, FutureExt, Stream, pin_mut};
|
|
||||||
use messagebus::{Action, Bus, Event, Message, ReciveUntypedReceiver, SendUntypedReceiver, SharedMessage, TypeTag, TypeTagAccept, TypeTagged, error::{GenericError, SendError}};
|
|
||||||
use parking_lot::Mutex;
|
use parking_lot::Mutex;
|
||||||
use serde::Deserialize;
|
use std::{net::SocketAddr, sync::atomic::AtomicBool};
|
||||||
use serde_derive::{Deserialize, Serialize};
|
use tokio::{sync::{mpsc::{self, UnboundedSender, UnboundedReceiver}, oneshot}};
|
||||||
use core::slice::SlicePattern;
|
|
||||||
use std::{collections::{HashMap, HashSet}, net::SocketAddr, pin::Pin, sync::atomic::AtomicBool, task::Poll};
|
|
||||||
use tokio::{io::{AsyncRead, AsyncWrite, AsyncWriteExt, ReadBuf}, sync::{mpsc::{self, UnboundedSender, UnboundedReceiver}, oneshot}};
|
|
||||||
use bytes::{Buf, BufMut};
|
use bytes::{Buf, BufMut};
|
||||||
|
|
||||||
pub const ALPN_QUIC_HTTP: &[&[u8]] = &[b"hq-29"];
|
pub const ALPN_QUIC_HTTP: &[&[u8]] = &[b"hq-29"];
|
||||||
@ -74,15 +68,14 @@ pub struct QuicClientRelay {
|
|||||||
host: String,
|
host: String,
|
||||||
endpoint: QuicClientRelayEndpoint,
|
endpoint: QuicClientRelayEndpoint,
|
||||||
outgoing_table: MessageTable,
|
outgoing_table: MessageTable,
|
||||||
sender: UnboundedSender<Request>,
|
sender: UnboundedSender<ProtocolItem>,
|
||||||
receiver_send: Mutex<Option<(oneshot::Sender<(quinn::RecvStream, quinn::Connection)>, UnboundedReceiver<Request>)>>,
|
receiver_send: Mutex<Option<(oneshot::Sender<(quinn::RecvStream, quinn::Connection)>, UnboundedReceiver<ProtocolItem>)>>,
|
||||||
receiver_recv: Mutex<Option<oneshot::Receiver<(quinn::RecvStream, quinn::Connection)>>>,
|
receiver_recv: Mutex<Option<oneshot::Receiver<(quinn::RecvStream, quinn::Connection)>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl QuicClientRelay {
|
impl QuicClientRelay {
|
||||||
pub fn new(cert: &str, addr: SocketAddr, host: String, table: Vec<(TypeTag, TypeTag, TypeTag)>) -> Result<Self, Error> {
|
pub fn new(cert: &str, addr: SocketAddr, host: String, table: Vec<(TypeTag, TypeTag, TypeTag)>) -> Result<Self, Error> {
|
||||||
let endpoint = QuicClientRelayEndpoint::new(cert)?;
|
let endpoint = QuicClientRelayEndpoint::new(cert)?;
|
||||||
let mut outgoing_table = MessageTable::from(table);
|
|
||||||
let (sender, receiver) = mpsc::unbounded_channel();
|
let (sender, receiver) = mpsc::unbounded_channel();
|
||||||
let (recv_send, recv_recv) = oneshot::channel();
|
let (recv_send, recv_recv) = oneshot::channel();
|
||||||
|
|
||||||
@ -91,7 +84,7 @@ impl QuicClientRelay {
|
|||||||
addr,
|
addr,
|
||||||
host,
|
host,
|
||||||
endpoint,
|
endpoint,
|
||||||
outgoing_table,
|
outgoing_table: MessageTable::from(table),
|
||||||
sender,
|
sender,
|
||||||
receiver_send: Mutex::new(Some((recv_send, receiver))),
|
receiver_send: Mutex::new(Some((recv_send, receiver))),
|
||||||
receiver_recv: Mutex::new(Some(recv_recv)),
|
receiver_recv: Mutex::new(Some(recv_recv)),
|
||||||
@ -99,195 +92,6 @@ impl QuicClientRelay {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Deserialize, Serialize)]
|
|
||||||
#[repr(u16)]
|
|
||||||
pub enum ProtocolHeaderActionKind {
|
|
||||||
Nop,
|
|
||||||
Send,
|
|
||||||
Response,
|
|
||||||
Flush,
|
|
||||||
Flushed,
|
|
||||||
Synchronize,
|
|
||||||
Synchronized,
|
|
||||||
BatchComplete,
|
|
||||||
Close,
|
|
||||||
Exited,
|
|
||||||
Initialize,
|
|
||||||
Ready,
|
|
||||||
Pause,
|
|
||||||
Paused,
|
|
||||||
Error,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Deserialize, Serialize)]
|
|
||||||
pub struct ProtocolHeader<'a> {
|
|
||||||
kind: ProtocolHeaderActionKind,
|
|
||||||
type_tag: Option<&'a [u8]>,
|
|
||||||
failed: bool,
|
|
||||||
body_encoding: u32,
|
|
||||||
argument: u64,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<'a> ProtocolHeader<'a> {
|
|
||||||
pub fn send(mid: u64, tt: &'a TypeTag) -> ProtocolHeader<'a> {
|
|
||||||
ProtocolHeader {
|
|
||||||
kind: ProtocolHeaderActionKind::Send,
|
|
||||||
type_tag: Some(tt.as_bytes()),
|
|
||||||
failed: false,
|
|
||||||
body_encoding: 0,
|
|
||||||
argument: mid,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn flush() -> Self {
|
|
||||||
ProtocolHeader {
|
|
||||||
kind: ProtocolHeaderActionKind::Flush,
|
|
||||||
type_tag: None,
|
|
||||||
failed: false,
|
|
||||||
body_encoding: 0,
|
|
||||||
argument: 0,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn close() -> Self {
|
|
||||||
ProtocolHeader {
|
|
||||||
kind: ProtocolHeaderActionKind::Close,
|
|
||||||
type_tag: None,
|
|
||||||
failed: false,
|
|
||||||
body_encoding: 0,
|
|
||||||
argument: 0,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn sync() -> Self {
|
|
||||||
ProtocolHeader {
|
|
||||||
kind: ProtocolHeaderActionKind::Synchronize,
|
|
||||||
type_tag: None,
|
|
||||||
failed: false,
|
|
||||||
body_encoding: 0,
|
|
||||||
argument: 0,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn init() -> Self {
|
|
||||||
ProtocolHeader {
|
|
||||||
kind: ProtocolHeaderActionKind::Initialize,
|
|
||||||
type_tag: None,
|
|
||||||
failed: false,
|
|
||||||
body_encoding: 0,
|
|
||||||
argument: 0,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn pause() -> Self {
|
|
||||||
ProtocolHeader {
|
|
||||||
kind: ProtocolHeaderActionKind::Pause,
|
|
||||||
type_tag: None,
|
|
||||||
failed: false,
|
|
||||||
body_encoding: 0,
|
|
||||||
argument: 0,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Deserialize, Serialize)]
|
|
||||||
pub struct ProtocolPacket<'a> {
|
|
||||||
header: ProtocolHeader<'a>,
|
|
||||||
body: &'a [u8]
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub enum Request {
|
|
||||||
Init,
|
|
||||||
Flush,
|
|
||||||
Sync,
|
|
||||||
Close,
|
|
||||||
Stats,
|
|
||||||
Send(u64, Box<dyn SharedMessage>, bool),
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Request {
|
|
||||||
pub async fn serialize<W: AsyncWrite + Unpin>(&self, mut header_buff: &mut Vec<u8>, body_buff: &mut Vec<u8>, conn: &mut W) -> Result<(), Error> {
|
|
||||||
body_buff.clear();
|
|
||||||
header_buff.clear();
|
|
||||||
|
|
||||||
let mut tt = TypeTag::Borrowed("");
|
|
||||||
let header = match self {
|
|
||||||
Request::Init => unimplemented!(),
|
|
||||||
Request::Flush => ProtocolHeader::flush(),
|
|
||||||
Request::Sync => ProtocolHeader::sync(),
|
|
||||||
Request::Close => ProtocolHeader::close(),
|
|
||||||
Request::Stats => unimplemented!(),
|
|
||||||
Request::Send(mid, msg, req) => {
|
|
||||||
tt = msg.type_tag();
|
|
||||||
|
|
||||||
let mut cbor_se = serde_cbor::Serializer::new(&mut *body_buff);
|
|
||||||
let mut se = <dyn erased_serde::Serializer>::erase(&mut cbor_se);
|
|
||||||
|
|
||||||
msg.erased_serialize(&mut se);
|
|
||||||
|
|
||||||
ProtocolHeader::send(if *req {*mid} else {0}, &tt)
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
serde_cbor::to_writer(&mut header_buff, &ProtocolPacket {
|
|
||||||
header,
|
|
||||||
body: body_buff.as_slice(),
|
|
||||||
}).unwrap();
|
|
||||||
|
|
||||||
let mut buf = [0u8; 16];
|
|
||||||
let mut writer = &mut buf[..];
|
|
||||||
|
|
||||||
writer.put(&b"MBUS"[..]);
|
|
||||||
writer.put_u16(1);
|
|
||||||
writer.put_u16(0);
|
|
||||||
writer.put_u64(header_buff.len() as _);
|
|
||||||
|
|
||||||
conn.write_all(&buf).await?;
|
|
||||||
conn.write_all(&header_buff).await?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
// buff.
|
|
||||||
|
|
||||||
|
|
||||||
// let x = match self {
|
|
||||||
// Request::Init => unimplemented!(),
|
|
||||||
// Request::Flush => ProtocolHeader::Flush,
|
|
||||||
// Request::Sync => ProtocolHeader::Synchronize,
|
|
||||||
// Request::Close => ProtocolHeader::Close,
|
|
||||||
// Request::Stats => unimplemented!(),
|
|
||||||
// Request::Send(_mid, msg) => {
|
|
||||||
// let ser = serde_cbor::Serializer::new(buff);
|
|
||||||
|
|
||||||
// ProtocolHeader::Send(msg.type_tag())
|
|
||||||
// }
|
|
||||||
|
|
||||||
// Request::Request(mid, msg) => {
|
|
||||||
// ProtocolHeader::Request(*mid, msg.type_tag())
|
|
||||||
// }
|
|
||||||
// };
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl From<Action> for Request {
|
|
||||||
fn from(action: Action) -> Self {
|
|
||||||
match action {
|
|
||||||
Action::Init => Request::Init,
|
|
||||||
Action::Flush => Request::Flush,
|
|
||||||
Action::Sync => Request::Sync,
|
|
||||||
Action::Close => Request::Close,
|
|
||||||
Action::Stats => Request::Stats,
|
|
||||||
_ => unimplemented!(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl From<(bool, u64, Box<dyn SharedMessage>)> for Request {
|
|
||||||
fn from((req, mid, msg): (bool, u64, Box<dyn SharedMessage>)) -> Self {
|
|
||||||
Request::Send(mid, msg, req)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl TypeTagAccept for QuicClientRelay {
|
impl TypeTagAccept for QuicClientRelay {
|
||||||
fn accept(&self, msg: &TypeTag, resp: Option<&TypeTag>, err: Option<&TypeTag>) -> bool {
|
fn accept(&self, msg: &TypeTag, resp: Option<&TypeTag>, err: Option<&TypeTag>) -> bool {
|
||||||
self.outgoing_table.accept(msg, resp, err)
|
self.outgoing_table.accept(msg, resp, err)
|
||||||
@ -312,13 +116,33 @@ impl SendUntypedReceiver for QuicClientRelay {
|
|||||||
let conn = self.endpoint.connect(&self.addr, &self.host);
|
let conn = self.endpoint.connect(&self.addr, &self.host);
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
|
println!("spawn");
|
||||||
let mut conn = conn.await.unwrap();
|
let mut conn = conn.await.unwrap();
|
||||||
sender.send((conn.recv, conn.connection)).unwrap();
|
sender.send((conn.recv, conn.connection)).unwrap();
|
||||||
let mut buf1 = Vec::new();
|
let mut body_buff = Vec::new();
|
||||||
let mut buf2 = Vec::new();
|
let mut header_buff = Vec::new();
|
||||||
|
|
||||||
while let Some(r) = rx.recv().await {
|
while let Some(r) = rx.recv().await {
|
||||||
r.serialize(&mut buf1, &mut buf2, &mut conn.send);
|
body_buff.clear();
|
||||||
|
header_buff.clear();
|
||||||
|
|
||||||
|
let pkt = r.serialize(BodyType::Cbor, &mut body_buff).unwrap();
|
||||||
|
serde_cbor::to_writer(&mut header_buff, &pkt).unwrap();
|
||||||
|
|
||||||
|
println!("msg {:?}", pkt);
|
||||||
|
|
||||||
|
let mut buf = [0u8; 16];
|
||||||
|
let mut writer = &mut buf[..];
|
||||||
|
|
||||||
|
writer.put(&b"MBUS"[..]);
|
||||||
|
writer.put_u16(1);
|
||||||
|
writer.put_u16(0);
|
||||||
|
writer.put_u64(header_buff.len() as _);
|
||||||
|
|
||||||
|
conn.send.write_all(&buf).await.unwrap();
|
||||||
|
println!("header sent");
|
||||||
|
conn.send.write_all(&header_buff).await.unwrap();
|
||||||
|
println!("body sent");
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@ -336,11 +160,16 @@ impl SendUntypedReceiver for QuicClientRelay {
|
|||||||
req: bool,
|
req: bool,
|
||||||
_bus: &Bus,
|
_bus: &Bus,
|
||||||
) -> Result<(), messagebus::error::Error<Box<dyn Message>>> {
|
) -> Result<(), messagebus::error::Error<Box<dyn Message>>> {
|
||||||
if let Ok(val) = msg.as_shared_boxed() {
|
match msg.as_shared_boxed() {
|
||||||
self.sender.send((req, mid, msg).into()).unwrap();
|
Ok(msg) => {
|
||||||
Ok(())
|
if let Err(err) = self.sender.send((mid, msg, req).into()) {
|
||||||
} else {
|
Err(messagebus::error::Error::TryAgain(err.0.unwrap_send().unwrap().1.upcast_box()))
|
||||||
Err(SendError:)
|
} else {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Err(msg) => Err(messagebus::error::Error::TryAgain(msg)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -360,7 +189,7 @@ impl ReciveUntypedReceiver for QuicClientRelay {
|
|||||||
if first {
|
if first {
|
||||||
return Some((Event::Ready, (false, recv, conn, bus, buff)));
|
return Some((Event::Ready, (false, recv, conn, bus, buff)));
|
||||||
}
|
}
|
||||||
|
|
||||||
unsafe { buff.set_len(16) };
|
unsafe { buff.set_len(16) };
|
||||||
recv.read_exact(&mut buff).await.unwrap();
|
recv.read_exact(&mut buff).await.unwrap();
|
||||||
|
|
||||||
@ -386,64 +215,8 @@ impl ReciveUntypedReceiver for QuicClientRelay {
|
|||||||
let event = match content_type {
|
let event = match content_type {
|
||||||
0 => { // CBOR
|
0 => { // CBOR
|
||||||
let proto: ProtocolPacket = serde_cbor::from_slice(&buff).unwrap();
|
let proto: ProtocolPacket = serde_cbor::from_slice(&buff).unwrap();
|
||||||
|
match proto.deserialize(&bus).unwrap() {
|
||||||
use ProtocolHeaderActionKind::*;
|
ProtocolItem::Event(ev) => ev.map_msg(|msg|msg.upcast_box()),
|
||||||
match proto.header.kind {
|
|
||||||
Nop => unimplemented!(),
|
|
||||||
Response => {
|
|
||||||
let tt = String::from_utf8_lossy(proto.header.type_tag.unwrap()).to_string();
|
|
||||||
|
|
||||||
let res = if proto.header.failed {
|
|
||||||
Err(
|
|
||||||
messagebus::error::Error::Other(GenericError {
|
|
||||||
type_tag: tt.into(),
|
|
||||||
description: "unknown".into()
|
|
||||||
}
|
|
||||||
))
|
|
||||||
} else {
|
|
||||||
let mut cbor_de = serde_cbor::Deserializer::from_slice(proto.body);
|
|
||||||
let mut de = <dyn erased_serde::Deserializer>::erase(&mut cbor_de);
|
|
||||||
|
|
||||||
bus.deserialize_message(tt.into(), &mut de)
|
|
||||||
.map_err(|x| x.map_msg(|_| ()))
|
|
||||||
};
|
|
||||||
|
|
||||||
Event::Response(proto.header.argument, res)
|
|
||||||
}
|
|
||||||
Ready => {
|
|
||||||
if proto.header.failed {
|
|
||||||
let tt = String::from_utf8_lossy(proto.header.type_tag.unwrap()).to_string();
|
|
||||||
Event::InitFailed(messagebus::error::Error::Other(GenericError {
|
|
||||||
type_tag: tt.into(),
|
|
||||||
description: "unknown".into()
|
|
||||||
}))
|
|
||||||
} else {
|
|
||||||
Event::Ready
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Exited => Event::Exited,
|
|
||||||
Pause => Event::Pause,
|
|
||||||
BatchComplete => Event::Finished(proto.header.argument),
|
|
||||||
Flushed => Event::Flushed,
|
|
||||||
Error => {
|
|
||||||
let tt = String::from_utf8_lossy(proto.header.type_tag.unwrap()).to_string();
|
|
||||||
Event::Error(GenericError {
|
|
||||||
type_tag: tt.into(),
|
|
||||||
description: "unknown".into()
|
|
||||||
})
|
|
||||||
}
|
|
||||||
Synchronized => {
|
|
||||||
if proto.header.failed {
|
|
||||||
let tt = String::from_utf8_lossy(proto.header.type_tag.unwrap()).to_string();
|
|
||||||
Event::Synchronized( Err(messagebus::error::Error::Other(GenericError {
|
|
||||||
type_tag: tt.into(),
|
|
||||||
description: "unknown".into()
|
|
||||||
})))
|
|
||||||
} else {
|
|
||||||
Event::Synchronized(Ok(()))
|
|
||||||
}
|
|
||||||
},
|
|
||||||
|
|
||||||
_ => unimplemented!()
|
_ => unimplemented!()
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
@ -3,3 +3,6 @@ mod server;
|
|||||||
|
|
||||||
pub use client::QuicClientRelay;
|
pub use client::QuicClientRelay;
|
||||||
pub use server::QuicServerRelay;
|
pub use server::QuicServerRelay;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@ -1 +1,258 @@
|
|||||||
pub struct QuicServerRelay {}
|
use crate::{error::Error, proto::{BodyType, ProtocolItem, ProtocolPacket}, relays::{GenericEventStream, MessageTable}};
|
||||||
|
use futures::{Future, FutureExt, StreamExt};
|
||||||
|
use messagebus::{Action, Bus, Event, Message, ReciveUntypedReceiver, SendUntypedReceiver, TypeTag, TypeTagAccept};
|
||||||
|
use parking_lot::Mutex;
|
||||||
|
use std::{net::SocketAddr, sync::{Arc, atomic::AtomicBool}};
|
||||||
|
use tokio::{sync::{mpsc::{self, UnboundedSender, UnboundedReceiver}, oneshot}};
|
||||||
|
use bytes::{Buf, BufMut};
|
||||||
|
|
||||||
|
pub const ALPN_QUIC_HTTP: &[&[u8]] = &[b"hq-29"];
|
||||||
|
|
||||||
|
pub struct QuicServerRelayEndpoint {
|
||||||
|
endpoint: quinn::Endpoint,
|
||||||
|
incoming: Mutex<Option<quinn::Incoming>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl QuicServerRelayEndpoint {
|
||||||
|
pub fn new(key_path: &str, cert_path: &str, addr: &SocketAddr) -> Result<Self, Error> {
|
||||||
|
let mut transport_config = quinn::TransportConfig::default();
|
||||||
|
transport_config.max_concurrent_uni_streams(0)?;
|
||||||
|
|
||||||
|
let mut server_config = quinn::ServerConfig::default();
|
||||||
|
server_config.transport = Arc::new(transport_config);
|
||||||
|
|
||||||
|
let mut server_config = quinn::ServerConfigBuilder::new(server_config);
|
||||||
|
|
||||||
|
server_config.protocols(ALPN_QUIC_HTTP);
|
||||||
|
server_config.enable_keylog();
|
||||||
|
|
||||||
|
let key = std::fs::read(key_path)?;
|
||||||
|
let cert_der = std::fs::read(cert_path)?;
|
||||||
|
|
||||||
|
let key = quinn::PrivateKey::from_der(&key)?;
|
||||||
|
let cert_chain = quinn::Certificate::from_der(&cert_der)?;
|
||||||
|
|
||||||
|
let cert = quinn::CertificateChain::from_certs([cert_chain]);
|
||||||
|
|
||||||
|
server_config.certificate(cert, key)?;
|
||||||
|
|
||||||
|
let mut endpoint = quinn::Endpoint::builder();
|
||||||
|
endpoint.listen(server_config.build());
|
||||||
|
|
||||||
|
let (endpoint, incoming) = endpoint.bind(addr)?;
|
||||||
|
|
||||||
|
Ok(Self { endpoint, incoming: Mutex::new(Some(incoming)) })
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn incoming(&self) -> impl Future<Output = Result<QuicServerConnection, Error>> {
|
||||||
|
let mut conn = self.incoming.lock().take().unwrap();
|
||||||
|
|
||||||
|
async move {
|
||||||
|
let conn = conn.next().await.unwrap();
|
||||||
|
let quinn::NewConnection { connection, .. } = conn.await?;
|
||||||
|
|
||||||
|
Ok(QuicServerConnection {
|
||||||
|
connection,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn connect(
|
||||||
|
&self,
|
||||||
|
addr: &SocketAddr,
|
||||||
|
host: &str,
|
||||||
|
) -> impl Future<Output = Result<QuicServerConnection, Error>> {
|
||||||
|
let conn = self.endpoint.connect(addr, host);
|
||||||
|
|
||||||
|
async move {
|
||||||
|
let quinn::NewConnection { connection, .. } = conn?.await?;
|
||||||
|
|
||||||
|
Ok(QuicServerConnection {
|
||||||
|
connection,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
pub async fn wait_idle(&self) {
|
||||||
|
self.endpoint.wait_idle().await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct QuicServerConnection {
|
||||||
|
connection: quinn::Connection,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct QuicServerRelay {
|
||||||
|
ready_flag: AtomicBool,
|
||||||
|
endpoint: QuicServerRelayEndpoint,
|
||||||
|
outgoing_table: MessageTable,
|
||||||
|
sender: UnboundedSender<ProtocolItem>,
|
||||||
|
receiver_send: Mutex<Option<(UnboundedSender<quinn::RecvStream>, UnboundedReceiver<ProtocolItem>)>>,
|
||||||
|
receiver_recv: Mutex<Option<UnboundedReceiver<quinn::RecvStream>>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl QuicServerRelay {
|
||||||
|
pub fn new(key_path: &str, cert_path: &str, addr: SocketAddr, table: Vec<(TypeTag, TypeTag, TypeTag)>) -> Result<Self, Error> {
|
||||||
|
let endpoint = QuicServerRelayEndpoint::new(key_path, cert_path, &addr)?;
|
||||||
|
let (sender, receiver) = mpsc::unbounded_channel();
|
||||||
|
let (recv_send, recv_recv) = mpsc::unbounded_channel();
|
||||||
|
|
||||||
|
Ok(Self {
|
||||||
|
ready_flag: AtomicBool::new(false),
|
||||||
|
endpoint,
|
||||||
|
outgoing_table: MessageTable::from(table),
|
||||||
|
sender,
|
||||||
|
receiver_send: Mutex::new(Some((recv_send, receiver))),
|
||||||
|
receiver_recv: Mutex::new(Some(recv_recv)),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TypeTagAccept for QuicServerRelay {
|
||||||
|
fn accept(&self, msg: &TypeTag, resp: Option<&TypeTag>, err: Option<&TypeTag>) -> bool {
|
||||||
|
self.outgoing_table.accept(msg, resp, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn iter_types(&self, cb: &mut dyn FnMut(&TypeTag, &TypeTag, &TypeTag) -> bool) {
|
||||||
|
let iter = self.outgoing_table.iter_types();
|
||||||
|
|
||||||
|
for (m, r, e) in iter {
|
||||||
|
if cb(m, r, e) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SendUntypedReceiver for QuicServerRelay {
|
||||||
|
fn send(&self, msg: Action, _bus: &Bus) -> Result<(), messagebus::error::Error<Action>> {
|
||||||
|
match msg {
|
||||||
|
Action::Init => {
|
||||||
|
let (sender, mut rx) = self.receiver_send.lock().take().unwrap();
|
||||||
|
let conn = self.endpoint.incoming();
|
||||||
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let conn = conn.await.unwrap();
|
||||||
|
|
||||||
|
let mut body_buff = Vec::new();
|
||||||
|
let mut header_buff = Vec::new();
|
||||||
|
|
||||||
|
while let Some(r) = rx.recv().await {
|
||||||
|
let (mut send, recv) = conn.connection.open_bi().await.unwrap();
|
||||||
|
|
||||||
|
sender.send(recv).unwrap();
|
||||||
|
|
||||||
|
body_buff.clear();
|
||||||
|
header_buff.clear();
|
||||||
|
|
||||||
|
let pkt = r.serialize(BodyType::Cbor, &mut body_buff).unwrap();
|
||||||
|
serde_cbor::to_writer(&mut header_buff, &pkt).unwrap();
|
||||||
|
|
||||||
|
let mut buf = [0u8; 16];
|
||||||
|
let mut writer = &mut buf[..];
|
||||||
|
|
||||||
|
writer.put(&b"MBUS"[..]);
|
||||||
|
writer.put_u16(1);
|
||||||
|
writer.put_u16(0);
|
||||||
|
writer.put_u64(header_buff.len() as _);
|
||||||
|
|
||||||
|
send.write_all(&buf).await.unwrap();
|
||||||
|
send.write_all(&header_buff).await.unwrap();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
other => self.sender.send(other.into()).unwrap(),
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn send_msg(
|
||||||
|
&self,
|
||||||
|
mid: u64,
|
||||||
|
msg: Box<dyn Message>,
|
||||||
|
req: bool,
|
||||||
|
_bus: &Bus,
|
||||||
|
) -> Result<(), messagebus::error::Error<Box<dyn Message>>> {
|
||||||
|
match msg.as_shared_boxed() {
|
||||||
|
Ok(msg) => {
|
||||||
|
if let Err(err) = self.sender.send((mid, msg, req).into()) {
|
||||||
|
Err(messagebus::error::Error::TryAgain(err.0.unwrap_send().unwrap().1.upcast_box()))
|
||||||
|
} else {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Err(msg) => Err(messagebus::error::Error::TryAgain(msg)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ReciveUntypedReceiver for QuicServerRelay {
|
||||||
|
type Stream = GenericEventStream;
|
||||||
|
|
||||||
|
fn event_stream(&self, bus: Bus) -> Self::Stream {
|
||||||
|
let mut recv = self.receiver_recv.lock().take().unwrap();
|
||||||
|
|
||||||
|
Box::pin(futures::stream::poll_fn(move |cx|recv.poll_recv(cx))
|
||||||
|
.map(move |recv| {
|
||||||
|
let buff = Vec::with_capacity(16);
|
||||||
|
let bus = bus.clone();
|
||||||
|
|
||||||
|
futures::stream::unfold(
|
||||||
|
(true, recv, bus, buff),
|
||||||
|
|(first, mut recv, bus, mut buff)| async move {
|
||||||
|
if first {
|
||||||
|
return Some((Event::Ready, (false, recv, bus, buff)));
|
||||||
|
}
|
||||||
|
|
||||||
|
println!("1");
|
||||||
|
|
||||||
|
unsafe { buff.set_len(16) };
|
||||||
|
recv.read_exact(&mut buff).await.unwrap();
|
||||||
|
|
||||||
|
println!("{:?}", buff);
|
||||||
|
|
||||||
|
let mut reader = &buff[..];
|
||||||
|
let mut sign = [0u8; 4];
|
||||||
|
reader.copy_to_slice(&mut sign);
|
||||||
|
assert!(&sign != b"MBUS");
|
||||||
|
|
||||||
|
let version = reader.get_u16();
|
||||||
|
assert!(version == 1);
|
||||||
|
|
||||||
|
let content_type = reader.get_u16();
|
||||||
|
|
||||||
|
let body_size = reader.get_u64();
|
||||||
|
let diff = buff.capacity() as i64 - body_size as i64;
|
||||||
|
if diff < 0 {
|
||||||
|
buff.reserve(-diff as usize);
|
||||||
|
}
|
||||||
|
|
||||||
|
unsafe { buff.set_len(body_size as usize); }
|
||||||
|
recv.read_exact(&mut buff).await.unwrap();
|
||||||
|
|
||||||
|
println!("{:?}", buff);
|
||||||
|
|
||||||
|
let event = match content_type {
|
||||||
|
0 => { // CBOR
|
||||||
|
let proto: ProtocolPacket = serde_cbor::from_slice(&buff).unwrap();
|
||||||
|
|
||||||
|
match proto.deserialize(&bus).unwrap() {
|
||||||
|
ProtocolItem::Event(ev) => ev.map_msg(|msg|msg.upcast_box()),
|
||||||
|
_ => unimplemented!()
|
||||||
|
}
|
||||||
|
},
|
||||||
|
_ => unimplemented!()
|
||||||
|
};
|
||||||
|
|
||||||
|
Some((event, (false, recv, bus, buff)))
|
||||||
|
},
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.flatten()
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
124
crates/remote/src/relays/redis/mod.rs
Normal file
124
crates/remote/src/relays/redis/mod.rs
Normal file
@ -0,0 +1,124 @@
|
|||||||
|
use messagebus::{Action, Bus, Event, Message, ReciveUntypedReceiver, SendUntypedReceiver, TypeTag, TypeTagAccept};
|
||||||
|
|
||||||
|
use crate::error::Error;
|
||||||
|
|
||||||
|
use super::{GenericEventStream, MessageTable};
|
||||||
|
use futures::StreamExt;
|
||||||
|
|
||||||
|
pub struct RedisRelay {
|
||||||
|
in_table: MessageTable,
|
||||||
|
out_table: MessageTable,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl RedisRelay {
|
||||||
|
fn new(uri: &str) -> Result<Self, Error> {
|
||||||
|
let client = redis::Client::open("redis://127.0.0.1/")?;
|
||||||
|
unimplemented!()
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
// let mut publish_conn = client.get_async_connection().await?;
|
||||||
|
// let mut pubsub_conn = client.get_async_connection().await?.into_pubsub();
|
||||||
|
|
||||||
|
// pubsub_conn.subscribe("wavephone").await?;
|
||||||
|
// let mut pubsub_stream = pubsub_conn.on_message();
|
||||||
|
|
||||||
|
// publish_conn.publish("wavephone", "banana").await?;
|
||||||
|
|
||||||
|
// let pubsub_msg: String = pubsub_stream.next().await.unwrap().get_payload()?;
|
||||||
|
// assert_eq!(&pubsub_msg, "banana");
|
||||||
|
|
||||||
|
// Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TypeTagAccept for RedisRelay {
|
||||||
|
fn accept(&self, msg: &TypeTag, resp: Option<&TypeTag>, err: Option<&TypeTag>) -> bool {
|
||||||
|
self.out_table.accept(msg, resp, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn iter_types(&self, cb: &mut dyn FnMut(&TypeTag, &TypeTag, &TypeTag) -> bool) {
|
||||||
|
let iter = self.out_table.iter_types();
|
||||||
|
|
||||||
|
for (m, r, e) in iter {
|
||||||
|
if cb(m, r, e) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SendUntypedReceiver for RedisRelay {
|
||||||
|
fn send(&self, msg: Action, _bus: &Bus) -> Result<(), messagebus::error::Error<Action>> {
|
||||||
|
match msg {
|
||||||
|
Action::Init => {
|
||||||
|
// let (sender, mut rx) = self.receiver_send.lock().take().unwrap();
|
||||||
|
// let conn = self.endpoint.incoming();
|
||||||
|
|
||||||
|
// tokio::spawn(async move {
|
||||||
|
// let mut conn = conn.await.unwrap();
|
||||||
|
// sender.send((conn.recv, conn.connection)).unwrap();
|
||||||
|
// let mut buf1 = Vec::new();
|
||||||
|
// let mut buf2 = Vec::new();
|
||||||
|
|
||||||
|
// while let Some(r) = rx.recv().await {
|
||||||
|
// r.serialize(&mut buf1, &mut buf2, &mut conn.send)
|
||||||
|
// .await
|
||||||
|
// .unwrap();
|
||||||
|
|
||||||
|
// }
|
||||||
|
// });
|
||||||
|
}
|
||||||
|
|
||||||
|
other => self.sender.send(other.into()).unwrap(),
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn send_msg(
|
||||||
|
&self,
|
||||||
|
mid: u64,
|
||||||
|
msg: Box<dyn Message>,
|
||||||
|
req: bool,
|
||||||
|
_bus: &Bus,
|
||||||
|
) -> Result<(), messagebus::error::Error<Box<dyn Message>>> {
|
||||||
|
match msg.as_shared_boxed() {
|
||||||
|
Ok(msg) => {
|
||||||
|
if let Err(err) = self.sender.send((req, mid, msg).into()) {
|
||||||
|
Err(messagebus::error::Error::TryAgain(err.0.unwrap_msg().unwrap()))
|
||||||
|
} else {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Err(msg) => Err(messagebus::error::Error::TryAgain(msg)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ReciveUntypedReceiver for RedisRelay {
|
||||||
|
type Stream = GenericEventStream;
|
||||||
|
|
||||||
|
fn event_stream(&self, bus: Bus) -> Self::Stream {
|
||||||
|
let recevier = self.recevier.lock().take().unwrap();
|
||||||
|
|
||||||
|
Box::pin(
|
||||||
|
futures::stream::unfold((recevier, bus), |(recv, bus)| async move {
|
||||||
|
if let Ok(r) = recv.next().await? {
|
||||||
|
let stream = futures::stream::unfold((true, r, bus.clone()), |(first, r, bus)| async move {
|
||||||
|
if first {
|
||||||
|
return Some((Event::Ready, (false, r, bus)));
|
||||||
|
}
|
||||||
|
|
||||||
|
Some((Event::Pause, (false, r, bus)))
|
||||||
|
});
|
||||||
|
|
||||||
|
Some((stream.left_stream(), (recv, bus)))
|
||||||
|
} else {
|
||||||
|
Some((futures::stream::once(async move { Event::Pause }).right_stream(), (recv, bus)))
|
||||||
|
}
|
||||||
|
}).flatten()
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
@ -6,20 +6,13 @@ use futures::{Future, FutureExt};
|
|||||||
use smallvec::SmallVec;
|
use smallvec::SmallVec;
|
||||||
use tokio::sync::Mutex;
|
use tokio::sync::Mutex;
|
||||||
|
|
||||||
use crate::{
|
use crate::{AsyncBatchHandler, AsyncBatchSynchronizedHandler, AsyncHandler, AsyncSynchronizedHandler, BatchHandler, BatchSynchronizedHandler, Bus, BusInner, Handler, Message, Relay, SharedMessage, SynchronizedHandler, Untyped, envelop::{IntoSharedMessage, TypeTag}, error::{Error, StdSyncSendError}, receiver::{
|
||||||
envelop::TypeTag,
|
|
||||||
error::{Error, StdSyncSendError},
|
|
||||||
receiver::{
|
|
||||||
BusPollerCallback, Receiver, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver,
|
BusPollerCallback, Receiver, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver,
|
||||||
UntypedPollerCallback,
|
UntypedPollerCallback,
|
||||||
},
|
}, receivers};
|
||||||
receivers, AsyncBatchHandler, AsyncBatchSynchronizedHandler, AsyncHandler,
|
|
||||||
AsyncSynchronizedHandler, BatchHandler, BatchSynchronizedHandler, Bus, BusInner, Handler,
|
|
||||||
IntoBoxedMessage, Message, Relay, SynchronizedHandler, Untyped,
|
|
||||||
};
|
|
||||||
|
|
||||||
type MessageDeserializerCallback = Box<
|
type MessageDeserializerCallback = Box<
|
||||||
dyn Fn(&mut dyn erased_serde::Deserializer<'_>) -> Result<Box<dyn Message>, Error>
|
dyn Fn(&mut dyn erased_serde::Deserializer<'_>) -> Result<Box<dyn SharedMessage>, Error>
|
||||||
+ Send
|
+ Send
|
||||||
+ Sync,
|
+ Sync,
|
||||||
>;
|
>;
|
||||||
@ -219,7 +212,7 @@ impl MessageTypeDescriptor {
|
|||||||
pub fn deserialize_boxed(
|
pub fn deserialize_boxed(
|
||||||
&self,
|
&self,
|
||||||
de: &mut dyn erased_serde::Deserializer<'_>,
|
de: &mut dyn erased_serde::Deserializer<'_>,
|
||||||
) -> Result<Box<dyn Message>, Error> {
|
) -> Result<Box<dyn SharedMessage>, Error> {
|
||||||
(self.de)(de)
|
(self.de)(de)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -249,7 +242,7 @@ impl Module {
|
|||||||
self.message_types.insert(
|
self.message_types.insert(
|
||||||
M::type_tag_(),
|
M::type_tag_(),
|
||||||
MessageTypeDescriptor {
|
MessageTypeDescriptor {
|
||||||
de: Box::new(move |de| Ok(M::deserialize(de)?.into_boxed())),
|
de: Box::new(move |de| Ok(M::deserialize(de)?.into_shared())),
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -154,8 +154,29 @@ impl<T: Message> IntoBoxedMessage for T {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub trait SharedMessage: Message + erased_serde::Serialize {}
|
pub trait IntoSharedMessage {
|
||||||
impl<T: Message + erased_serde::Serialize> SharedMessage for T {}
|
fn into_shared(self) -> Box<dyn SharedMessage>;
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: Message + serde::Serialize> IntoSharedMessage for T {
|
||||||
|
fn into_shared(self) -> Box<dyn SharedMessage> {
|
||||||
|
Box::new(self)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
pub trait SharedMessage: Message + erased_serde::Serialize {
|
||||||
|
fn upcast_arc(self: Arc<Self>) -> Arc<dyn Message>;
|
||||||
|
fn upcast_box(self: Box<Self>) -> Box<dyn Message>;
|
||||||
|
fn upcast_ref(&self) -> &dyn Message;
|
||||||
|
fn upcast_mut(&mut self) -> &mut dyn Message;
|
||||||
|
}
|
||||||
|
impl<T: Message + erased_serde::Serialize> SharedMessage for T {
|
||||||
|
fn upcast_arc(self: Arc<Self>) -> Arc<dyn Message> { self }
|
||||||
|
fn upcast_box(self: Box<Self>) -> Box<dyn Message> { self }
|
||||||
|
fn upcast_ref(&self) -> &dyn Message { self }
|
||||||
|
fn upcast_mut(&mut self) -> &mut dyn Message { self }
|
||||||
|
}
|
||||||
|
|
||||||
// pub trait IntoTakeable {
|
// pub trait IntoTakeable {
|
||||||
// fn into_takeable(&mut self) -> Takeable<'_>;
|
// fn into_takeable(&mut self) -> Takeable<'_>;
|
||||||
|
@ -553,7 +553,7 @@ impl Bus {
|
|||||||
|
|
||||||
if let Some(rs) = self.inner.receivers.get(&tt).and_then(|rs| rs.first()) {
|
if let Some(rs) = self.inner.receivers.get(&tt).and_then(|rs| rs.first()) {
|
||||||
let msg = self.deserialize_message(tt.clone(), de)?;
|
let msg = self.deserialize_message(tt.clone(), de)?;
|
||||||
Ok(rs.send_boxed(self, mid, msg, false, rs.reserve(&tt).await)?)
|
Ok(rs.send_boxed(self, mid, msg.upcast_box(), false, rs.reserve(&tt).await)?)
|
||||||
} else {
|
} else {
|
||||||
Err(Error::NoReceivers)
|
Err(Error::NoReceivers)
|
||||||
}
|
}
|
||||||
@ -578,7 +578,7 @@ impl Bus {
|
|||||||
rc.send_boxed(
|
rc.send_boxed(
|
||||||
self,
|
self,
|
||||||
mid | 1 << (usize::BITS - 1),
|
mid | 1 << (usize::BITS - 1),
|
||||||
msg,
|
msg.upcast_box(),
|
||||||
true,
|
true,
|
||||||
rc.reserve(&tt).await,
|
rc.reserve(&tt).await,
|
||||||
)?;
|
)?;
|
||||||
@ -593,7 +593,7 @@ impl Bus {
|
|||||||
&self,
|
&self,
|
||||||
tt: TypeTag,
|
tt: TypeTag,
|
||||||
de: &mut dyn erased_serde::Deserializer<'_>,
|
de: &mut dyn erased_serde::Deserializer<'_>,
|
||||||
) -> Result<Box<dyn Message>, Error<Box<dyn Message>>> {
|
) -> Result<Box<dyn SharedMessage>, Error<Box<dyn Message>>> {
|
||||||
let md = self
|
let md = self
|
||||||
.inner
|
.inner
|
||||||
.message_types
|
.message_types
|
||||||
|
@ -152,7 +152,7 @@ pub enum Event<M, E: StdSyncSendError> {
|
|||||||
Response(u64, Result<M, Error<(), E>>),
|
Response(u64, Result<M, Error<(), E>>),
|
||||||
Synchronized(Result<(), Error<(), E>>),
|
Synchronized(Result<(), Error<(), E>>),
|
||||||
Finished(u64),
|
Finished(u64),
|
||||||
Error(E),
|
Error(Error<(), E>),
|
||||||
InitFailed(Error<(), E>),
|
InitFailed(Error<(), E>),
|
||||||
Stats(Stats),
|
Stats(Stats),
|
||||||
Flushed,
|
Flushed,
|
||||||
@ -161,6 +161,23 @@ pub enum Event<M, E: StdSyncSendError> {
|
|||||||
Pause,
|
Pause,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl<M, E: StdSyncSendError> Event<M, E> {
|
||||||
|
pub fn map_msg<U, F: FnOnce(M) -> U>(self, f: F) -> Event<U, E> {
|
||||||
|
match self {
|
||||||
|
Event::Response(mid, res) => Event::Response(mid, res.map(f)),
|
||||||
|
Event::Synchronized(res) => Event::Synchronized(res),
|
||||||
|
Event::Finished(cnt) => Event::Finished(cnt),
|
||||||
|
Event::Error(err) => Event::Error(err),
|
||||||
|
Event::InitFailed(err) => Event::InitFailed(err),
|
||||||
|
Event::Stats(st) => Event::Stats(st),
|
||||||
|
Event::Flushed => Event::Flushed,
|
||||||
|
Event::Exited => Event::Exited,
|
||||||
|
Event::Ready => Event::Ready,
|
||||||
|
Event::Pause => Event::Pause,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
struct ReceiverWrapper<M, R, E, S>
|
struct ReceiverWrapper<M, R, E, S>
|
||||||
where
|
where
|
||||||
M: Message,
|
M: Message,
|
||||||
|
@ -40,7 +40,7 @@ macro_rules! process_batch_result {
|
|||||||
.unwrap();
|
.unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
$stx.send(Event::Error(er)).unwrap();
|
$stx.send(Event::Error(Error::Other(er))).unwrap();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
Loading…
Reference in New Issue
Block a user