From 16690db44c6e882412c01357b500ac0b464b0456 Mon Sep 17 00:00:00 2001 From: Andrey Tkachenko Date: Fri, 24 Sep 2021 12:46:15 +0400 Subject: [PATCH] revert upgrate to 2021 edition --- Cargo.toml | 4 +- crates/derive/Cargo.toml | 4 +- crates/remote/Cargo.toml | 12 +- crates/remote/examples/basic.rs | 2 - crates/remote/examples/cert.der | Bin 0 -> 342 bytes crates/remote/examples/cert.key | Bin 0 -> 138 bytes crates/remote/examples/quic_client.rs | 58 +++ crates/remote/examples/quic_server.rs | 85 ++++ crates/remote/src/error.rs | 18 + crates/remote/src/lib.rs | 1 + crates/remote/src/proto.rs | 510 ++++++++++++++++++++++++ crates/remote/src/relays/mod.rs | 17 +- crates/remote/src/relays/quic/client.rs | 315 ++------------- crates/remote/src/relays/quic/mod.rs | 3 + crates/remote/src/relays/quic/server.rs | 259 +++++++++++- crates/remote/src/relays/redis/mod.rs | 124 ++++++ src/builder.rs | 17 +- src/envelop.rs | 25 +- src/lib.rs | 6 +- src/receiver.rs | 19 +- src/receivers/mod.rs | 2 +- 21 files changed, 1171 insertions(+), 310 deletions(-) delete mode 100644 crates/remote/examples/basic.rs create mode 100644 crates/remote/examples/cert.der create mode 100644 crates/remote/examples/cert.key create mode 100644 crates/remote/examples/quic_client.rs create mode 100644 crates/remote/examples/quic_server.rs create mode 100644 crates/remote/src/proto.rs create mode 100644 crates/remote/src/relays/redis/mod.rs diff --git a/Cargo.toml b/Cargo.toml index cb94d6a..7b9867f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "messagebus" -version = "0.9.6" +version = "0.9.7" authors = ["Andrey Tkachenko "] repository = "https://github.com/andreytkachenko/messagebus.git" keywords = ["futures", "async", "tokio", "message", "bus"] @@ -8,7 +8,7 @@ categories = ["network-programming", "asynchronous"] description = "MessageBus allows intercommunicate with messages between modules" license = "MIT OR Apache-2.0" exclude = [".gitignore", ".cargo/config", ".github/**", "codecov.yml"] -edition = "2021" +edition = "2018" [workspace] members = [ diff --git a/crates/derive/Cargo.toml b/crates/derive/Cargo.toml index e425ab8..eedb621 100644 --- a/crates/derive/Cargo.toml +++ b/crates/derive/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "messagebus_derive" -version = "0.2.1" +version = "0.2.2" authors = ["Andrey Tkachenko "] repository = "https://github.com/andreytkachenko/messagebus.git" keywords = ["futures", "async", "tokio", "message", "bus"] @@ -8,7 +8,7 @@ categories = ["network-programming", "asynchronous"] description = "MessageBus allows intercommunicate with messages between modules" license = "MIT OR Apache-2.0" exclude = [".gitignore", ".cargo/config", ".github/**", "codecov.yml"] -edition = "2021" +edition = "2018" [lib] proc-macro = true diff --git a/crates/remote/Cargo.toml b/crates/remote/Cargo.toml index 0d18abc..bea8eb0 100644 --- a/crates/remote/Cargo.toml +++ b/crates/remote/Cargo.toml @@ -8,7 +8,7 @@ categories = ["network-programming", "asynchronous"] description = "MessageBus remote allows intercommunicate by messages between instances" license = "MIT OR Apache-2.0" exclude = [".gitignore", ".cargo/config", ".github/**", "codecov.yml"] -edition = "2021" +edition = "2018" # [features] # quic = ["quinn"] @@ -16,7 +16,7 @@ edition = "2021" [dependencies] thiserror = "1.0" messagebus = { path="../../" } -tokio = { version = "1", features = ["parking_lot", "rt-multi-thread", "sync", "time"] } +tokio = { version = "1", features = ["parking_lot", "rt-multi-thread", "sync", "time", "io-util"] } parking_lot = "0.11" quinn = "0.7" rmp = "0.8.10" @@ -28,8 +28,16 @@ futures = "0.3.17" cbor = "0.4.1" serde_cbor = "0.11.2" bytes = "1.1.0" +quinn-proto = "0.7.3" +rustls = "0.19.1" +redis = "0.21.2" +bitflags = "1.3.2" +serde_json = "1.0.68" [dev-dependencies] +anyhow = "1.0.44" +async-trait = "0.1.51" +thiserror = "1.0.29" tokio = { version = "1.11.0", features = ["full"] } # quinn = { version = "0.7", optional = true } diff --git a/crates/remote/examples/basic.rs b/crates/remote/examples/basic.rs deleted file mode 100644 index 7f755fb..0000000 --- a/crates/remote/examples/basic.rs +++ /dev/null @@ -1,2 +0,0 @@ -#[tokio::main] -async fn main() {} diff --git a/crates/remote/examples/cert.der b/crates/remote/examples/cert.der new file mode 100644 index 0000000000000000000000000000000000000000..4509c788f134d749c07eaf0affe0c3e5400ed45b GIT binary patch literal 342 zcmXqLVhl29{JDUciIItkli`E#Rk7C!r~M4L*f_M>JkHs&Ff$n_8p<2UvN4CUF!P8N zC8ww6DHNyXq$w0^781vNrwb{UTUv@`)IL*ftY~^&WeWlua>(iqZq{p0WS#uz3=hTG1I46ePM gyKla9+?Dlb79Cmo@QcK(>pR}4X-gVie(5a;0J=7GQUCw| literal 0 HcmV?d00001 diff --git a/crates/remote/examples/cert.key b/crates/remote/examples/cert.key new file mode 100644 index 0000000000000000000000000000000000000000..0d401a820c8a1c2551e2f91629ac24dddc7cd289 GIT binary patch literal 138 zcmV;50CoQ`frkPC05B5<2P%e0&OHJF1_&yKNX|V20S5$aFlzz<0R$jBhr() + .register_shared_message::() + .register_relay(relay) + .build(); + + + b.ready().await; + println!("ready"); + + let resp: Resp = b.request(Req { + data: 12, + text: String::from("test") + }, Default::default()) + .await + .unwrap(); + + println!("resp {:?}", resp); + + b.flush().await; + b.close().await; + poller.await; +} diff --git a/crates/remote/examples/quic_server.rs b/crates/remote/examples/quic_server.rs new file mode 100644 index 0000000..b76e3e9 --- /dev/null +++ b/crates/remote/examples/quic_server.rs @@ -0,0 +1,85 @@ +use messagebus::{error, Message, derive::{Message, Error as MbError}, AsyncHandler, Bus}; +use messagebus_remote::relays::{QuicServerRelay}; +use serde_derive::{Serialize, Deserialize}; +use async_trait::async_trait; +use thiserror::Error; + + +#[derive(Debug, Error, MbError)] +enum Error { + #[error("Error({0})")] + Error(anyhow::Error), +} + +impl From> for Error { + fn from(err: error::Error) -> Self { + Self::Error(err.into()) + } +} + + +#[derive(Serialize, Deserialize, Debug, Clone, Message)] +#[namespace("example")] +#[message(shared, clone)] +pub struct Req { + data: i32, + text: String +} + +#[derive(Serialize, Deserialize, Debug, Clone, Message)] +#[namespace("example")] +#[message(shared, clone)] +pub struct Resp { + data: i32, + text: String +} + + +struct TmpReceiver; + +#[async_trait] +impl AsyncHandler for TmpReceiver { + type Error = Error; + type Response = Resp; + + async fn handle(&self, msg: Req, _bus: &Bus) -> Result { + println!("TmpReceiver::handle {:?}", msg); + Ok(Resp { + data: msg.data + 12, + text: format!("<< {} >>", msg.text), + }) + } + + async fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> { + println!("TmpReceiver::sync"); + Ok(()) + } +} + + +#[tokio::main] +async fn main() { + let relay = QuicServerRelay::new( + "./examples/cert.key", + "./examples/cert.der", + "0.0.0.0:8083".parse().unwrap(), + vec![ + ("example::Req".into(), "example::Resp".into(), "GenericError".into()) + ] + ).unwrap(); + + let (b, poller) = Bus::build() + .register_shared_message::() + .register_shared_message::() + .register_relay(relay) + .register(TmpReceiver) + .subscribe_async::(8, Default::default()) + .done() + .build(); + + b.ready().await; + + println!("ready"); + + poller.await; +} diff --git a/crates/remote/src/error.rs b/crates/remote/src/error.rs index 49e4fcd..944f9da 100644 --- a/crates/remote/src/error.rs +++ b/crates/remote/src/error.rs @@ -24,4 +24,22 @@ pub enum Error { #[error("QuinnConnectError: {0}")] QuinnParseError(#[from] ParseError), + + #[error("ConfigError: {0}")] + ConfigError(#[from] quinn_proto::ConfigError), + + #[error("TLSError: {0}")] + TLSError(#[from] rustls::TLSError), + + #[error("Redis: {0}")] + Redis(#[from] redis::RedisError), + + #[error("ProtocolParseError")] + ProtocolParseError, + + #[error("UnknownCodec")] + UnknownCodec, + + #[error("SerdeErased {0}")] + SerdeErased(#[from] erased_serde::Error), } diff --git a/crates/remote/src/lib.rs b/crates/remote/src/lib.rs index b65150f..92dffa8 100644 --- a/crates/remote/src/lib.rs +++ b/crates/remote/src/lib.rs @@ -1,2 +1,3 @@ pub mod error; +pub mod proto; pub mod relays; diff --git a/crates/remote/src/proto.rs b/crates/remote/src/proto.rs new file mode 100644 index 0000000..67a7195 --- /dev/null +++ b/crates/remote/src/proto.rs @@ -0,0 +1,510 @@ +use messagebus::{Action, Bus, Event, SharedMessage, TypeTag}; +use serde_derive::{Deserialize, Serialize}; +use std::borrow::Cow; + +#[derive(Debug, Deserialize, Serialize)] +#[repr(u16)] +pub enum ProtocolHeaderActionKind { + Nop, + Error, + + Send, + Response, + BatchComplete, + + Flush, + Flushed, + + Synchronize, + Synchronized, + + Close, + Exited, + + Initialize, + Ready, + + Pause, +} + +#[derive(Debug, Copy, Clone, Deserialize, Serialize)] +#[non_exhaustive] +#[repr(u32)] +pub enum BodyType { + None, + Utf8, + Cbor, + MessagePack, + Json, + Bson, +} + +bitflags::bitflags! { + #[derive(Deserialize, Serialize)] + pub struct ProtocolHeaderFlags: u32 { + const TYPE_TAG = 0b00000001; + const BODY = 0b00000010; + const ERROR = 0b00000100; + const ARGUMENT = 0b00001000; + const TT_AND_ERROR = Self::TYPE_TAG.bits | Self::ERROR.bits; + const TT_AND_BODY = Self::TYPE_TAG.bits | Self::BODY.bits; + const TT_ERROR_AND_ARGUMENT = Self::TYPE_TAG.bits | Self::ERROR.bits | Self::ARGUMENT.bits; + const TT_BODY_AND_ARGUMENT = Self::TYPE_TAG.bits | Self::BODY.bits | Self::ARGUMENT.bits; + } +} + +#[derive(Debug, Deserialize, Serialize)] +pub struct ProtocolHeader<'a> { + pub kind: ProtocolHeaderActionKind, + pub type_tag: Option>, + pub flags: ProtocolHeaderFlags, + pub body_type: BodyType, + pub argument: u64, +} + +impl<'a> ProtocolHeader<'a> { + pub fn send(mid: u64, tt: &'a TypeTag, body_type: BodyType) -> ProtocolHeader<'a> { + ProtocolHeader { + kind: ProtocolHeaderActionKind::Send, + type_tag: Some(tt.as_bytes().into()), + flags: ProtocolHeaderFlags::TT_BODY_AND_ARGUMENT, + body_type, + argument: mid, + } + } + + pub fn flush() -> Self { + ProtocolHeader { + kind: ProtocolHeaderActionKind::Flush, + type_tag: None, + flags: ProtocolHeaderFlags::empty(), + body_type: BodyType::None, + argument: 0, + } + } + + pub fn close() -> Self { + ProtocolHeader { + kind: ProtocolHeaderActionKind::Close, + type_tag: None, + flags: ProtocolHeaderFlags::empty(), + body_type: BodyType::None, + argument: 0, + } + } + + pub fn sync() -> Self { + ProtocolHeader { + kind: ProtocolHeaderActionKind::Synchronize, + type_tag: None, + flags: ProtocolHeaderFlags::empty(), + body_type: BodyType::None, + argument: 0, + } + } + + // pub fn init() -> Self { + // ProtocolHeader { + // kind: ProtocolHeaderActionKind::Initialize, + // type_tag: None, + // failed: false, + // body_encoding: 0, + // argument: 0, + // } + // } + + // pub fn pause() -> Self { + // ProtocolHeader { + // kind: ProtocolHeaderActionKind::Pause, + // type_tag: None, + // failed: false, + // body_encoding: 0, + // argument: 0, + // } + // } +} + +#[derive(Debug)] +pub enum ProtocolItem { + Nop, + Event(Event, messagebus::error::GenericError>), + Action(Action), + Send(u64, Box, bool) +} + +impl From for ProtocolItem { + fn from(action: Action) -> Self { + ProtocolItem::Action(action) + } +} + +impl From, messagebus::error::GenericError>> for ProtocolItem { + fn from(ev: Event, messagebus::error::GenericError>) -> Self { + ProtocolItem::Event(ev) + } +} + +impl From<(u64, Box, bool)> for ProtocolItem { + fn from(msg: (u64, Box, bool)) -> Self { + ProtocolItem::Send(msg.0, msg.1, msg.2) + } +} + +impl ProtocolItem { + pub fn unwrap_send(self) -> Result<(u64, Box, bool), ProtocolItem> { + match self { + ProtocolItem::Send(a, b, c) => Ok((a, b, c)), + other => Err(other) + } + } + + pub fn serialize(self, mut body_type: BodyType, body_buff: &mut Vec) -> Result, crate::error::Error> { + let mut argument = 0; + let mut type_tag = None; + let mut body = None; + let mut flags = ProtocolHeaderFlags::empty(); + + let kind = match self { + ProtocolItem::Nop => ProtocolHeaderActionKind::Nop, + ProtocolItem::Action(action) => match action { + Action::Close => ProtocolHeaderActionKind::Close, + Action::Flush => ProtocolHeaderActionKind::Flush, + Action::Init => ProtocolHeaderActionKind::Initialize, + Action::Sync => ProtocolHeaderActionKind::Synchronize, + _ => unimplemented!(), + } + ProtocolItem::Send(mid, msg, req) => { + let msg = msg.as_shared_boxed() + .map_err(|_| crate::error::Error::UnknownCodec)?; + + argument = mid; + flags.set(ProtocolHeaderFlags::ARGUMENT, req); + flags.set(ProtocolHeaderFlags::BODY, true); + flags.set(ProtocolHeaderFlags::TYPE_TAG, true); + type_tag = Some(msg.type_tag()); + body = Some(generic_serialize(body_type, &*msg, body_buff)?); + + ProtocolHeaderActionKind::Send + }, + ProtocolItem::Event(ev) => match ev { + Event::Response(mid, res) => { + argument = mid; + flags.set(ProtocolHeaderFlags::ARGUMENT, true); + flags.set(ProtocolHeaderFlags::BODY, true); + flags.set(ProtocolHeaderFlags::TYPE_TAG, true); + + match res { + Ok(msg) => { + let msg = msg.as_shared_boxed() + .map_err(|_| crate::error::Error::UnknownCodec)?; + + type_tag = Some(msg.type_tag()); + body = Some(generic_serialize(body_type, &*msg, body_buff)?); + } + + Err(err) => { + flags.set(ProtocolHeaderFlags::ERROR, true); + + type_tag = Some("GenericError".into()); + body_type = BodyType::Utf8; + body = Some(format!("{}", err).into_bytes().into()); + } + } + + ProtocolHeaderActionKind::Response + }, + Event::Error(err) => { + flags.set(ProtocolHeaderFlags::ERROR, true); + flags.set(ProtocolHeaderFlags::BODY, true); + flags.set(ProtocolHeaderFlags::TYPE_TAG, true); + + type_tag = Some("GenericError".into()); + body_type = BodyType::Utf8; + body = Some(format!("{}", err).into_bytes().into()); + + ProtocolHeaderActionKind::Error + } + Event::Finished(n) => { + argument = n; + flags.set(ProtocolHeaderFlags::ARGUMENT, true); + ProtocolHeaderActionKind::BatchComplete + }, + Event::Synchronized(res) => { + match res { + Ok(_) => {} + Err(err) => { + flags.set(ProtocolHeaderFlags::BODY, true); + flags.set(ProtocolHeaderFlags::ERROR, true); + flags.set(ProtocolHeaderFlags::TYPE_TAG, true); + + type_tag = Some("GenericError".into()); + body_type = BodyType::Utf8; + body = Some(format!("{}", err).into_bytes().into()); + } + } + ProtocolHeaderActionKind::Synchronized + } + Event::InitFailed(err) => { + flags.set(ProtocolHeaderFlags::BODY, true); + flags.set(ProtocolHeaderFlags::ERROR, true); + flags.set(ProtocolHeaderFlags::TYPE_TAG, true); + + type_tag = Some("GenericError".into()); + body_type = BodyType::Utf8; + body = Some(format!("{}", err).into_bytes().into()); + + ProtocolHeaderActionKind::Ready + }, + Event::Ready => ProtocolHeaderActionKind::Ready, + Event::Pause => ProtocolHeaderActionKind::Pause, + Event::Exited => ProtocolHeaderActionKind::Exited, + Event::Flushed => ProtocolHeaderActionKind::Flushed, + _ => unimplemented!() + } + }; + + Ok(ProtocolPacket { + header: ProtocolHeader { + kind, + type_tag: type_tag.map(|x| x.to_string().into_bytes().into()), + flags, + body_type, + argument, + }, + body + }) + } +} + + +#[derive(Debug, Deserialize, Serialize)] +pub struct ProtocolPacket<'a> { + pub header: ProtocolHeader<'a>, + pub body: Option>, +} + +impl<'a> ProtocolPacket<'a> { + pub fn deserialize( + self, + bus: &Bus, + ) -> Result + { + let type_tag: Option = if self.header.flags.contains(ProtocolHeaderFlags::TYPE_TAG) { + self.header + .type_tag + .map(|x| String::from_utf8_lossy(x.as_ref()).to_string().into()) + } else { + None + }; + + let (body, error) = if self.header.flags.contains(ProtocolHeaderFlags::ERROR) { + let error = messagebus::error::GenericError { + type_tag: type_tag.unwrap(), + description: self.body.map(|x|String::from_utf8_lossy(x.as_ref()).to_string()).unwrap_or_default(), + }; + + (None, Some(messagebus::error::Error::Other(error))) + } else if self.header.flags.contains(ProtocolHeaderFlags::TT_AND_BODY) { + let body = self.body.ok_or(crate::error::Error::ProtocolParseError)?; + let res = generic_deserialize(self.header.body_type, body.as_ref(), |de| { + bus.deserialize_message(type_tag.unwrap(), de) + .map_err(|x| x.map_msg(|_| ())) + })?; + + match res { + Ok(body) => (Some(body), None), + Err(err) => (None, Some(err)), + } + } else { + (None, None) + }; + + let argument = if self.header.flags.contains(ProtocolHeaderFlags::ARGUMENT) { + Some(self.header.argument) + } else { + None + }; + + Ok(ProtocolItem::Event(match self.header.kind { + ProtocolHeaderActionKind::Response => Event::Response( + argument + .ok_or(crate::error::Error::ProtocolParseError)?, + error + .map(Err) + .or_else(|| body.map(Ok)) + .ok_or(crate::error::Error::ProtocolParseError)?, + ), + ProtocolHeaderActionKind::Synchronized => { + Event::Synchronized(error.map(Err).unwrap_or(Ok(()))) + } + ProtocolHeaderActionKind::Error => { + Event::Error(error.ok_or(crate::error::Error::ProtocolParseError)?) + } + ProtocolHeaderActionKind::BatchComplete => Event::Finished(self.header.argument), + ProtocolHeaderActionKind::Flushed => Event::Flushed, + ProtocolHeaderActionKind::Exited => Event::Exited, + ProtocolHeaderActionKind::Ready => Event::Ready, + ProtocolHeaderActionKind::Pause => Event::Pause, + + other => return Ok(ProtocolItem::Action(match other { + ProtocolHeaderActionKind::Initialize => Action::Init, + ProtocolHeaderActionKind::Close => Action::Close, + ProtocolHeaderActionKind::Flush => Action::Flush, + ProtocolHeaderActionKind::Synchronize => Action::Sync, + ProtocolHeaderActionKind::Send => { + let req = argument.is_some(); + let mid = self.header.argument; + let body = body.ok_or(crate::error::Error::ProtocolParseError)?; + + return Ok(ProtocolItem::Send(mid, body, req)); + }, + ProtocolHeaderActionKind::Nop => return Ok(ProtocolItem::Nop), + + _ => unreachable!() + })), + })) + } +} + +fn generic_deserialize(k: BodyType, data: &[u8], f: F) -> Result +where + F: FnOnce(&mut dyn erased_serde::Deserializer<'_>) -> T, +{ + match k { + BodyType::Cbor => { + let mut cbor_de = serde_cbor::Deserializer::from_slice(data); + let mut de = ::erase(&mut cbor_de); + + Ok(f(&mut de)) + } + + BodyType::Json => { + let mut json_de = serde_json::Deserializer::from_slice(data); + let mut de = ::erase(&mut json_de); + + Ok(f(&mut de)) + } + + _ => Err(crate::error::Error::UnknownCodec), + } +} + +fn generic_serialize<'a>(kind: BodyType, msg: &dyn SharedMessage, buffer: &'a mut Vec) -> Result, crate::error::Error> { + match kind { + BodyType::Cbor => { + let mut cbor_se = serde_cbor::Serializer::new(&mut *buffer); + let mut se = ::erase(&mut cbor_se); + msg.erased_serialize(&mut se)?; + } + + BodyType::Json => { + let mut json_se = serde_json::Serializer::new(&mut *buffer); + let mut se = ::erase(&mut json_se); + msg.erased_serialize(&mut se)?; + } + + _ => return Err(crate::error::Error::UnknownCodec), + } + + Ok(buffer.as_slice().into()) +} + +#[cfg(test)] +mod tests { + use crate::proto::ProtocolItem; + + use super::{ + BodyType, ProtocolHeader, ProtocolHeaderActionKind, ProtocolHeaderFlags, ProtocolPacket, + }; + use messagebus::Event; + use messagebus::{derive::Message, Bus, Message, TypeTagged}; + use serde_derive::{Deserialize, Serialize}; + use std::borrow::Cow; + + #[derive(Serialize, Deserialize, Debug, Clone, Message)] + #[namespace("test")] + #[message(shared, clone)] + struct TestSharedMessage { + test: String, + value: i32, + } + + #[test] + fn test_proto_pack_event() { + let (bus, _) = Bus::build() + .register_shared_message::() + .build(); + + let pkt = ProtocolPacket { + header: ProtocolHeader { + kind: ProtocolHeaderActionKind::Response, + type_tag: Some(TestSharedMessage::type_tag_().as_bytes().to_vec().into()), + flags: ProtocolHeaderFlags::TT_BODY_AND_ARGUMENT, + body_type: BodyType::Json, + argument: 222, + }, + body: Some(Cow::Borrowed(br#"{"test":"my test","value":12}"#)), + }; + + let event = match pkt.deserialize(&bus).unwrap() { + ProtocolItem::Event(ev) => ev, + _ => unreachable!() + }; + + assert!(matches!(event, Event::Response(..))); + + match event { + Event::Response(mid, msg) => { + assert_eq!(mid, 222); + // assert!(msg.is_ok()); + let msg = msg.unwrap(); + + assert_eq!(msg.type_tag(), TestSharedMessage::type_tag_()); + let m: Box = msg.as_any_boxed().downcast().unwrap(); + + assert_eq!(m.value, 12); + assert_eq!(&m.test, "my test"); + } + _ => unreachable!(), + } + } + + #[test] + fn test_proto_pack_event_error() { + let (bus, _) = Bus::build() + .register_shared_message::() + .build(); + + let pkt = ProtocolPacket { + header: ProtocolHeader { + kind: ProtocolHeaderActionKind::Response, + type_tag: Some(TestSharedMessage::type_tag_().as_bytes().to_vec().into()), + flags: ProtocolHeaderFlags::TT_ERROR_AND_ARGUMENT, + body_type: BodyType::Utf8, + argument: 222, + }, + body: Some(Cow::Borrowed(br#"error description"#)), + }; + + let event = match pkt.deserialize(&bus).unwrap() { + ProtocolItem::Event(ev) => ev, + _ => unreachable!() + }; + + assert!(matches!(event, Event::Response(..))); + + #[allow(clippy::unit_cmp)] + match event { + Event::Response(mid, msg) => { + assert_eq!(mid, 222); + let msg = msg.unwrap_err(); + + assert!(matches!(msg, messagebus::error::Error::Other(val) if ( + assert_eq!(val.type_tag, TestSharedMessage::type_tag_()) == () && + assert_eq!(val.description, "error description") == () + ))); + } + _ => unreachable!(), + } + } +} diff --git a/crates/remote/src/relays/mod.rs b/crates/remote/src/relays/mod.rs index 36637d9..78101a2 100644 --- a/crates/remote/src/relays/mod.rs +++ b/crates/remote/src/relays/mod.rs @@ -1,6 +1,8 @@ // #[cfg(feature = "quic")] mod quic; +// mod redis; + use futures::Stream; use messagebus::{error::GenericError, Event, Message, TypeTag}; use std::{collections::HashMap, pin::Pin}; @@ -11,21 +13,15 @@ pub use quic::*; pub(crate) type GenericEventStream = Pin, GenericError>> + Send>>; - #[derive(Debug, Default)] -pub(crate) struct MessageTable { +pub struct MessageTable { table: HashMap>, } impl MessageTable { - pub fn new() -> Self { - Self { - table: HashMap::new(), - } - } - pub fn add(&mut self, req: TypeTag, resp: TypeTag, err: TypeTag) { - self.table.entry(req) + self.table + .entry(req) .or_insert_with(Vec::new) .push((resp, err)); } @@ -47,7 +43,6 @@ impl MessageTable { } } - impl From> for MessageTable { fn from(table: Vec<(TypeTag, TypeTag, TypeTag)>) -> Self { let mut outgoing_table = MessageTable::default(); @@ -56,4 +51,4 @@ impl From> for MessageTable { } outgoing_table } -} \ No newline at end of file +} diff --git a/crates/remote/src/relays/quic/client.rs b/crates/remote/src/relays/quic/client.rs index 474b143..b184e63 100644 --- a/crates/remote/src/relays/quic/client.rs +++ b/crates/remote/src/relays/quic/client.rs @@ -1,15 +1,9 @@ -use crate::{ - error::Error, - relays::{GenericEventStream, MessageTable}, -}; -use futures::{Future, FutureExt, Stream, pin_mut}; -use messagebus::{Action, Bus, Event, Message, ReciveUntypedReceiver, SendUntypedReceiver, SharedMessage, TypeTag, TypeTagAccept, TypeTagged, error::{GenericError, SendError}}; +use crate::{error::Error, proto::{BodyType, ProtocolItem, ProtocolPacket}, relays::{GenericEventStream, MessageTable}}; +use futures::{Future, FutureExt}; +use messagebus::{Action, Bus, Event, Message, ReciveUntypedReceiver, SendUntypedReceiver, TypeTag, TypeTagAccept}; use parking_lot::Mutex; -use serde::Deserialize; -use serde_derive::{Deserialize, Serialize}; -use core::slice::SlicePattern; -use std::{collections::{HashMap, HashSet}, net::SocketAddr, pin::Pin, sync::atomic::AtomicBool, task::Poll}; -use tokio::{io::{AsyncRead, AsyncWrite, AsyncWriteExt, ReadBuf}, sync::{mpsc::{self, UnboundedSender, UnboundedReceiver}, oneshot}}; +use std::{net::SocketAddr, sync::atomic::AtomicBool}; +use tokio::{sync::{mpsc::{self, UnboundedSender, UnboundedReceiver}, oneshot}}; use bytes::{Buf, BufMut}; pub const ALPN_QUIC_HTTP: &[&[u8]] = &[b"hq-29"]; @@ -74,15 +68,14 @@ pub struct QuicClientRelay { host: String, endpoint: QuicClientRelayEndpoint, outgoing_table: MessageTable, - sender: UnboundedSender, - receiver_send: Mutex, UnboundedReceiver)>>, + sender: UnboundedSender, + receiver_send: Mutex, UnboundedReceiver)>>, receiver_recv: Mutex>>, } impl QuicClientRelay { pub fn new(cert: &str, addr: SocketAddr, host: String, table: Vec<(TypeTag, TypeTag, TypeTag)>) -> Result { let endpoint = QuicClientRelayEndpoint::new(cert)?; - let mut outgoing_table = MessageTable::from(table); let (sender, receiver) = mpsc::unbounded_channel(); let (recv_send, recv_recv) = oneshot::channel(); @@ -91,7 +84,7 @@ impl QuicClientRelay { addr, host, endpoint, - outgoing_table, + outgoing_table: MessageTable::from(table), sender, receiver_send: Mutex::new(Some((recv_send, receiver))), receiver_recv: Mutex::new(Some(recv_recv)), @@ -99,195 +92,6 @@ impl QuicClientRelay { } } -#[derive(Deserialize, Serialize)] -#[repr(u16)] -pub enum ProtocolHeaderActionKind { - Nop, - Send, - Response, - Flush, - Flushed, - Synchronize, - Synchronized, - BatchComplete, - Close, - Exited, - Initialize, - Ready, - Pause, - Paused, - Error, -} - -#[derive(Deserialize, Serialize)] -pub struct ProtocolHeader<'a> { - kind: ProtocolHeaderActionKind, - type_tag: Option<&'a [u8]>, - failed: bool, - body_encoding: u32, - argument: u64, -} - -impl<'a> ProtocolHeader<'a> { - pub fn send(mid: u64, tt: &'a TypeTag) -> ProtocolHeader<'a> { - ProtocolHeader { - kind: ProtocolHeaderActionKind::Send, - type_tag: Some(tt.as_bytes()), - failed: false, - body_encoding: 0, - argument: mid, - } - } - - pub fn flush() -> Self { - ProtocolHeader { - kind: ProtocolHeaderActionKind::Flush, - type_tag: None, - failed: false, - body_encoding: 0, - argument: 0, - } - } - - pub fn close() -> Self { - ProtocolHeader { - kind: ProtocolHeaderActionKind::Close, - type_tag: None, - failed: false, - body_encoding: 0, - argument: 0, - } - } - - pub fn sync() -> Self { - ProtocolHeader { - kind: ProtocolHeaderActionKind::Synchronize, - type_tag: None, - failed: false, - body_encoding: 0, - argument: 0, - } - } - - pub fn init() -> Self { - ProtocolHeader { - kind: ProtocolHeaderActionKind::Initialize, - type_tag: None, - failed: false, - body_encoding: 0, - argument: 0, - } - } - - pub fn pause() -> Self { - ProtocolHeader { - kind: ProtocolHeaderActionKind::Pause, - type_tag: None, - failed: false, - body_encoding: 0, - argument: 0, - } - } -} - -#[derive(Deserialize, Serialize)] -pub struct ProtocolPacket<'a> { - header: ProtocolHeader<'a>, - body: &'a [u8] -} - -#[derive(Debug)] -pub enum Request { - Init, - Flush, - Sync, - Close, - Stats, - Send(u64, Box, bool), -} - -impl Request { - pub async fn serialize(&self, mut header_buff: &mut Vec, body_buff: &mut Vec, conn: &mut W) -> Result<(), Error> { - body_buff.clear(); - header_buff.clear(); - - let mut tt = TypeTag::Borrowed(""); - let header = match self { - Request::Init => unimplemented!(), - Request::Flush => ProtocolHeader::flush(), - Request::Sync => ProtocolHeader::sync(), - Request::Close => ProtocolHeader::close(), - Request::Stats => unimplemented!(), - Request::Send(mid, msg, req) => { - tt = msg.type_tag(); - - let mut cbor_se = serde_cbor::Serializer::new(&mut *body_buff); - let mut se = ::erase(&mut cbor_se); - - msg.erased_serialize(&mut se); - - ProtocolHeader::send(if *req {*mid} else {0}, &tt) - } - }; - - serde_cbor::to_writer(&mut header_buff, &ProtocolPacket { - header, - body: body_buff.as_slice(), - }).unwrap(); - - let mut buf = [0u8; 16]; - let mut writer = &mut buf[..]; - - writer.put(&b"MBUS"[..]); - writer.put_u16(1); - writer.put_u16(0); - writer.put_u64(header_buff.len() as _); - - conn.write_all(&buf).await?; - conn.write_all(&header_buff).await?; - - Ok(()) - // buff. - - - // let x = match self { - // Request::Init => unimplemented!(), - // Request::Flush => ProtocolHeader::Flush, - // Request::Sync => ProtocolHeader::Synchronize, - // Request::Close => ProtocolHeader::Close, - // Request::Stats => unimplemented!(), - // Request::Send(_mid, msg) => { - // let ser = serde_cbor::Serializer::new(buff); - - // ProtocolHeader::Send(msg.type_tag()) - // } - - // Request::Request(mid, msg) => { - // ProtocolHeader::Request(*mid, msg.type_tag()) - // } - // }; - } -} - -impl From for Request { - fn from(action: Action) -> Self { - match action { - Action::Init => Request::Init, - Action::Flush => Request::Flush, - Action::Sync => Request::Sync, - Action::Close => Request::Close, - Action::Stats => Request::Stats, - _ => unimplemented!(), - } - } -} - -impl From<(bool, u64, Box)> for Request { - fn from((req, mid, msg): (bool, u64, Box)) -> Self { - Request::Send(mid, msg, req) - } -} - impl TypeTagAccept for QuicClientRelay { fn accept(&self, msg: &TypeTag, resp: Option<&TypeTag>, err: Option<&TypeTag>) -> bool { self.outgoing_table.accept(msg, resp, err) @@ -312,13 +116,33 @@ impl SendUntypedReceiver for QuicClientRelay { let conn = self.endpoint.connect(&self.addr, &self.host); tokio::spawn(async move { + println!("spawn"); let mut conn = conn.await.unwrap(); sender.send((conn.recv, conn.connection)).unwrap(); - let mut buf1 = Vec::new(); - let mut buf2 = Vec::new(); + let mut body_buff = Vec::new(); + let mut header_buff = Vec::new(); while let Some(r) = rx.recv().await { - r.serialize(&mut buf1, &mut buf2, &mut conn.send); + body_buff.clear(); + header_buff.clear(); + + let pkt = r.serialize(BodyType::Cbor, &mut body_buff).unwrap(); + serde_cbor::to_writer(&mut header_buff, &pkt).unwrap(); + + println!("msg {:?}", pkt); + + let mut buf = [0u8; 16]; + let mut writer = &mut buf[..]; + + writer.put(&b"MBUS"[..]); + writer.put_u16(1); + writer.put_u16(0); + writer.put_u64(header_buff.len() as _); + + conn.send.write_all(&buf).await.unwrap(); + println!("header sent"); + conn.send.write_all(&header_buff).await.unwrap(); + println!("body sent"); } }); } @@ -336,11 +160,16 @@ impl SendUntypedReceiver for QuicClientRelay { req: bool, _bus: &Bus, ) -> Result<(), messagebus::error::Error>> { - if let Ok(val) = msg.as_shared_boxed() { - self.sender.send((req, mid, msg).into()).unwrap(); - Ok(()) - } else { - Err(SendError:) + match msg.as_shared_boxed() { + Ok(msg) => { + if let Err(err) = self.sender.send((mid, msg, req).into()) { + Err(messagebus::error::Error::TryAgain(err.0.unwrap_send().unwrap().1.upcast_box())) + } else { + Ok(()) + } + } + + Err(msg) => Err(messagebus::error::Error::TryAgain(msg)), } } } @@ -360,7 +189,7 @@ impl ReciveUntypedReceiver for QuicClientRelay { if first { return Some((Event::Ready, (false, recv, conn, bus, buff))); } - + unsafe { buff.set_len(16) }; recv.read_exact(&mut buff).await.unwrap(); @@ -386,64 +215,8 @@ impl ReciveUntypedReceiver for QuicClientRelay { let event = match content_type { 0 => { // CBOR let proto: ProtocolPacket = serde_cbor::from_slice(&buff).unwrap(); - - use ProtocolHeaderActionKind::*; - match proto.header.kind { - Nop => unimplemented!(), - Response => { - let tt = String::from_utf8_lossy(proto.header.type_tag.unwrap()).to_string(); - - let res = if proto.header.failed { - Err( - messagebus::error::Error::Other(GenericError { - type_tag: tt.into(), - description: "unknown".into() - } - )) - } else { - let mut cbor_de = serde_cbor::Deserializer::from_slice(proto.body); - let mut de = ::erase(&mut cbor_de); - - bus.deserialize_message(tt.into(), &mut de) - .map_err(|x| x.map_msg(|_| ())) - }; - - Event::Response(proto.header.argument, res) - } - Ready => { - if proto.header.failed { - let tt = String::from_utf8_lossy(proto.header.type_tag.unwrap()).to_string(); - Event::InitFailed(messagebus::error::Error::Other(GenericError { - type_tag: tt.into(), - description: "unknown".into() - })) - } else { - Event::Ready - } - } - Exited => Event::Exited, - Pause => Event::Pause, - BatchComplete => Event::Finished(proto.header.argument), - Flushed => Event::Flushed, - Error => { - let tt = String::from_utf8_lossy(proto.header.type_tag.unwrap()).to_string(); - Event::Error(GenericError { - type_tag: tt.into(), - description: "unknown".into() - }) - } - Synchronized => { - if proto.header.failed { - let tt = String::from_utf8_lossy(proto.header.type_tag.unwrap()).to_string(); - Event::Synchronized( Err(messagebus::error::Error::Other(GenericError { - type_tag: tt.into(), - description: "unknown".into() - }))) - } else { - Event::Synchronized(Ok(())) - } - }, - + match proto.deserialize(&bus).unwrap() { + ProtocolItem::Event(ev) => ev.map_msg(|msg|msg.upcast_box()), _ => unimplemented!() } }, diff --git a/crates/remote/src/relays/quic/mod.rs b/crates/remote/src/relays/quic/mod.rs index 67a445b..6f99013 100644 --- a/crates/remote/src/relays/quic/mod.rs +++ b/crates/remote/src/relays/quic/mod.rs @@ -3,3 +3,6 @@ mod server; pub use client::QuicClientRelay; pub use server::QuicServerRelay; + + + diff --git a/crates/remote/src/relays/quic/server.rs b/crates/remote/src/relays/quic/server.rs index 5efbf15..2172e74 100644 --- a/crates/remote/src/relays/quic/server.rs +++ b/crates/remote/src/relays/quic/server.rs @@ -1 +1,258 @@ -pub struct QuicServerRelay {} +use crate::{error::Error, proto::{BodyType, ProtocolItem, ProtocolPacket}, relays::{GenericEventStream, MessageTable}}; +use futures::{Future, FutureExt, StreamExt}; +use messagebus::{Action, Bus, Event, Message, ReciveUntypedReceiver, SendUntypedReceiver, TypeTag, TypeTagAccept}; +use parking_lot::Mutex; +use std::{net::SocketAddr, sync::{Arc, atomic::AtomicBool}}; +use tokio::{sync::{mpsc::{self, UnboundedSender, UnboundedReceiver}, oneshot}}; +use bytes::{Buf, BufMut}; + +pub const ALPN_QUIC_HTTP: &[&[u8]] = &[b"hq-29"]; + +pub struct QuicServerRelayEndpoint { + endpoint: quinn::Endpoint, + incoming: Mutex>, +} + +impl QuicServerRelayEndpoint { + pub fn new(key_path: &str, cert_path: &str, addr: &SocketAddr) -> Result { + let mut transport_config = quinn::TransportConfig::default(); + transport_config.max_concurrent_uni_streams(0)?; + + let mut server_config = quinn::ServerConfig::default(); + server_config.transport = Arc::new(transport_config); + + let mut server_config = quinn::ServerConfigBuilder::new(server_config); + + server_config.protocols(ALPN_QUIC_HTTP); + server_config.enable_keylog(); + + let key = std::fs::read(key_path)?; + let cert_der = std::fs::read(cert_path)?; + + let key = quinn::PrivateKey::from_der(&key)?; + let cert_chain = quinn::Certificate::from_der(&cert_der)?; + + let cert = quinn::CertificateChain::from_certs([cert_chain]); + + server_config.certificate(cert, key)?; + + let mut endpoint = quinn::Endpoint::builder(); + endpoint.listen(server_config.build()); + + let (endpoint, incoming) = endpoint.bind(addr)?; + + Ok(Self { endpoint, incoming: Mutex::new(Some(incoming)) }) + } + + pub fn incoming(&self) -> impl Future> { + let mut conn = self.incoming.lock().take().unwrap(); + + async move { + let conn = conn.next().await.unwrap(); + let quinn::NewConnection { connection, .. } = conn.await?; + + Ok(QuicServerConnection { + connection, + }) + } + } + + pub fn connect( + &self, + addr: &SocketAddr, + host: &str, + ) -> impl Future> { + let conn = self.endpoint.connect(addr, host); + + async move { + let quinn::NewConnection { connection, .. } = conn?.await?; + + Ok(QuicServerConnection { + connection, + }) + } + } + + #[inline] + pub async fn wait_idle(&self) { + self.endpoint.wait_idle().await; + } +} + +pub struct QuicServerConnection { + connection: quinn::Connection, +} + +pub struct QuicServerRelay { + ready_flag: AtomicBool, + endpoint: QuicServerRelayEndpoint, + outgoing_table: MessageTable, + sender: UnboundedSender, + receiver_send: Mutex, UnboundedReceiver)>>, + receiver_recv: Mutex>>, +} + +impl QuicServerRelay { + pub fn new(key_path: &str, cert_path: &str, addr: SocketAddr, table: Vec<(TypeTag, TypeTag, TypeTag)>) -> Result { + let endpoint = QuicServerRelayEndpoint::new(key_path, cert_path, &addr)?; + let (sender, receiver) = mpsc::unbounded_channel(); + let (recv_send, recv_recv) = mpsc::unbounded_channel(); + + Ok(Self { + ready_flag: AtomicBool::new(false), + endpoint, + outgoing_table: MessageTable::from(table), + sender, + receiver_send: Mutex::new(Some((recv_send, receiver))), + receiver_recv: Mutex::new(Some(recv_recv)), + }) + } +} + +impl TypeTagAccept for QuicServerRelay { + fn accept(&self, msg: &TypeTag, resp: Option<&TypeTag>, err: Option<&TypeTag>) -> bool { + self.outgoing_table.accept(msg, resp, err) + } + + fn iter_types(&self, cb: &mut dyn FnMut(&TypeTag, &TypeTag, &TypeTag) -> bool) { + let iter = self.outgoing_table.iter_types(); + + for (m, r, e) in iter { + if cb(m, r, e) { + return; + } + } + } +} + +impl SendUntypedReceiver for QuicServerRelay { + fn send(&self, msg: Action, _bus: &Bus) -> Result<(), messagebus::error::Error> { + match msg { + Action::Init => { + let (sender, mut rx) = self.receiver_send.lock().take().unwrap(); + let conn = self.endpoint.incoming(); + + tokio::spawn(async move { + let conn = conn.await.unwrap(); + + let mut body_buff = Vec::new(); + let mut header_buff = Vec::new(); + + while let Some(r) = rx.recv().await { + let (mut send, recv) = conn.connection.open_bi().await.unwrap(); + + sender.send(recv).unwrap(); + + body_buff.clear(); + header_buff.clear(); + + let pkt = r.serialize(BodyType::Cbor, &mut body_buff).unwrap(); + serde_cbor::to_writer(&mut header_buff, &pkt).unwrap(); + + let mut buf = [0u8; 16]; + let mut writer = &mut buf[..]; + + writer.put(&b"MBUS"[..]); + writer.put_u16(1); + writer.put_u16(0); + writer.put_u64(header_buff.len() as _); + + send.write_all(&buf).await.unwrap(); + send.write_all(&header_buff).await.unwrap(); + } + }); + } + + other => self.sender.send(other.into()).unwrap(), + } + + Ok(()) + } + + fn send_msg( + &self, + mid: u64, + msg: Box, + req: bool, + _bus: &Bus, + ) -> Result<(), messagebus::error::Error>> { + match msg.as_shared_boxed() { + Ok(msg) => { + if let Err(err) = self.sender.send((mid, msg, req).into()) { + Err(messagebus::error::Error::TryAgain(err.0.unwrap_send().unwrap().1.upcast_box())) + } else { + Ok(()) + } + } + + Err(msg) => Err(messagebus::error::Error::TryAgain(msg)), + } + } +} + +impl ReciveUntypedReceiver for QuicServerRelay { + type Stream = GenericEventStream; + + fn event_stream(&self, bus: Bus) -> Self::Stream { + let mut recv = self.receiver_recv.lock().take().unwrap(); + + Box::pin(futures::stream::poll_fn(move |cx|recv.poll_recv(cx)) + .map(move |recv| { + let buff = Vec::with_capacity(16); + let bus = bus.clone(); + + futures::stream::unfold( + (true, recv, bus, buff), + |(first, mut recv, bus, mut buff)| async move { + if first { + return Some((Event::Ready, (false, recv, bus, buff))); + } + + println!("1"); + + unsafe { buff.set_len(16) }; + recv.read_exact(&mut buff).await.unwrap(); + + println!("{:?}", buff); + + let mut reader = &buff[..]; + let mut sign = [0u8; 4]; + reader.copy_to_slice(&mut sign); + assert!(&sign != b"MBUS"); + + let version = reader.get_u16(); + assert!(version == 1); + + let content_type = reader.get_u16(); + + let body_size = reader.get_u64(); + let diff = buff.capacity() as i64 - body_size as i64; + if diff < 0 { + buff.reserve(-diff as usize); + } + + unsafe { buff.set_len(body_size as usize); } + recv.read_exact(&mut buff).await.unwrap(); + + println!("{:?}", buff); + + let event = match content_type { + 0 => { // CBOR + let proto: ProtocolPacket = serde_cbor::from_slice(&buff).unwrap(); + + match proto.deserialize(&bus).unwrap() { + ProtocolItem::Event(ev) => ev.map_msg(|msg|msg.upcast_box()), + _ => unimplemented!() + } + }, + _ => unimplemented!() + }; + + Some((event, (false, recv, bus, buff))) + }, + ) + }) + .flatten() + ) + } +} \ No newline at end of file diff --git a/crates/remote/src/relays/redis/mod.rs b/crates/remote/src/relays/redis/mod.rs new file mode 100644 index 0000000..805acaf --- /dev/null +++ b/crates/remote/src/relays/redis/mod.rs @@ -0,0 +1,124 @@ +use messagebus::{Action, Bus, Event, Message, ReciveUntypedReceiver, SendUntypedReceiver, TypeTag, TypeTagAccept}; + +use crate::error::Error; + +use super::{GenericEventStream, MessageTable}; +use futures::StreamExt; + +pub struct RedisRelay { + in_table: MessageTable, + out_table: MessageTable, +} + +impl RedisRelay { + fn new(uri: &str) -> Result { + let client = redis::Client::open("redis://127.0.0.1/")?; + unimplemented!() + + + + // let mut publish_conn = client.get_async_connection().await?; + // let mut pubsub_conn = client.get_async_connection().await?.into_pubsub(); + + // pubsub_conn.subscribe("wavephone").await?; + // let mut pubsub_stream = pubsub_conn.on_message(); + + // publish_conn.publish("wavephone", "banana").await?; + + // let pubsub_msg: String = pubsub_stream.next().await.unwrap().get_payload()?; + // assert_eq!(&pubsub_msg, "banana"); + + // Ok(()) + } +} + +impl TypeTagAccept for RedisRelay { + fn accept(&self, msg: &TypeTag, resp: Option<&TypeTag>, err: Option<&TypeTag>) -> bool { + self.out_table.accept(msg, resp, err) + } + + fn iter_types(&self, cb: &mut dyn FnMut(&TypeTag, &TypeTag, &TypeTag) -> bool) { + let iter = self.out_table.iter_types(); + + for (m, r, e) in iter { + if cb(m, r, e) { + return; + } + } + } +} + +impl SendUntypedReceiver for RedisRelay { + fn send(&self, msg: Action, _bus: &Bus) -> Result<(), messagebus::error::Error> { + match msg { + Action::Init => { + // let (sender, mut rx) = self.receiver_send.lock().take().unwrap(); + // let conn = self.endpoint.incoming(); + + // tokio::spawn(async move { + // let mut conn = conn.await.unwrap(); + // sender.send((conn.recv, conn.connection)).unwrap(); + // let mut buf1 = Vec::new(); + // let mut buf2 = Vec::new(); + + // while let Some(r) = rx.recv().await { + // r.serialize(&mut buf1, &mut buf2, &mut conn.send) + // .await + // .unwrap(); + + // } + // }); + } + + other => self.sender.send(other.into()).unwrap(), + } + + Ok(()) + } + + fn send_msg( + &self, + mid: u64, + msg: Box, + req: bool, + _bus: &Bus, + ) -> Result<(), messagebus::error::Error>> { + match msg.as_shared_boxed() { + Ok(msg) => { + if let Err(err) = self.sender.send((req, mid, msg).into()) { + Err(messagebus::error::Error::TryAgain(err.0.unwrap_msg().unwrap())) + } else { + Ok(()) + } + } + + Err(msg) => Err(messagebus::error::Error::TryAgain(msg)), + } + } +} + +impl ReciveUntypedReceiver for RedisRelay { + type Stream = GenericEventStream; + + fn event_stream(&self, bus: Bus) -> Self::Stream { + let recevier = self.recevier.lock().take().unwrap(); + + Box::pin( + futures::stream::unfold((recevier, bus), |(recv, bus)| async move { + if let Ok(r) = recv.next().await? { + let stream = futures::stream::unfold((true, r, bus.clone()), |(first, r, bus)| async move { + if first { + return Some((Event::Ready, (false, r, bus))); + } + + Some((Event::Pause, (false, r, bus))) + }); + + Some((stream.left_stream(), (recv, bus))) + } else { + Some((futures::stream::once(async move { Event::Pause }).right_stream(), (recv, bus))) + } + }).flatten() + ) + } +} diff --git a/src/builder.rs b/src/builder.rs index 553fc94..3af2bed 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -6,20 +6,13 @@ use futures::{Future, FutureExt}; use smallvec::SmallVec; use tokio::sync::Mutex; -use crate::{ - envelop::TypeTag, - error::{Error, StdSyncSendError}, - receiver::{ +use crate::{AsyncBatchHandler, AsyncBatchSynchronizedHandler, AsyncHandler, AsyncSynchronizedHandler, BatchHandler, BatchSynchronizedHandler, Bus, BusInner, Handler, Message, Relay, SharedMessage, SynchronizedHandler, Untyped, envelop::{IntoSharedMessage, TypeTag}, error::{Error, StdSyncSendError}, receiver::{ BusPollerCallback, Receiver, ReciveTypedReceiver, SendTypedReceiver, SendUntypedReceiver, UntypedPollerCallback, - }, - receivers, AsyncBatchHandler, AsyncBatchSynchronizedHandler, AsyncHandler, - AsyncSynchronizedHandler, BatchHandler, BatchSynchronizedHandler, Bus, BusInner, Handler, - IntoBoxedMessage, Message, Relay, SynchronizedHandler, Untyped, -}; + }, receivers}; type MessageDeserializerCallback = Box< - dyn Fn(&mut dyn erased_serde::Deserializer<'_>) -> Result, Error> + dyn Fn(&mut dyn erased_serde::Deserializer<'_>) -> Result, Error> + Send + Sync, >; @@ -219,7 +212,7 @@ impl MessageTypeDescriptor { pub fn deserialize_boxed( &self, de: &mut dyn erased_serde::Deserializer<'_>, - ) -> Result, Error> { + ) -> Result, Error> { (self.de)(de) } } @@ -249,7 +242,7 @@ impl Module { self.message_types.insert( M::type_tag_(), MessageTypeDescriptor { - de: Box::new(move |de| Ok(M::deserialize(de)?.into_boxed())), + de: Box::new(move |de| Ok(M::deserialize(de)?.into_shared())), }, ); diff --git a/src/envelop.rs b/src/envelop.rs index 575e1e0..f15d440 100644 --- a/src/envelop.rs +++ b/src/envelop.rs @@ -154,8 +154,29 @@ impl IntoBoxedMessage for T { } } -pub trait SharedMessage: Message + erased_serde::Serialize {} -impl SharedMessage for T {} +pub trait IntoSharedMessage { + fn into_shared(self) -> Box; +} + +impl IntoSharedMessage for T { + fn into_shared(self) -> Box { + Box::new(self) + } +} + + +pub trait SharedMessage: Message + erased_serde::Serialize { + fn upcast_arc(self: Arc) -> Arc; + fn upcast_box(self: Box) -> Box; + fn upcast_ref(&self) -> &dyn Message; + fn upcast_mut(&mut self) -> &mut dyn Message; +} +impl SharedMessage for T { + fn upcast_arc(self: Arc) -> Arc { self } + fn upcast_box(self: Box) -> Box { self } + fn upcast_ref(&self) -> &dyn Message { self } + fn upcast_mut(&mut self) -> &mut dyn Message { self } +} // pub trait IntoTakeable { // fn into_takeable(&mut self) -> Takeable<'_>; diff --git a/src/lib.rs b/src/lib.rs index 11a3a1a..353dd45 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -553,7 +553,7 @@ impl Bus { if let Some(rs) = self.inner.receivers.get(&tt).and_then(|rs| rs.first()) { let msg = self.deserialize_message(tt.clone(), de)?; - Ok(rs.send_boxed(self, mid, msg, false, rs.reserve(&tt).await)?) + Ok(rs.send_boxed(self, mid, msg.upcast_box(), false, rs.reserve(&tt).await)?) } else { Err(Error::NoReceivers) } @@ -578,7 +578,7 @@ impl Bus { rc.send_boxed( self, mid | 1 << (usize::BITS - 1), - msg, + msg.upcast_box(), true, rc.reserve(&tt).await, )?; @@ -593,7 +593,7 @@ impl Bus { &self, tt: TypeTag, de: &mut dyn erased_serde::Deserializer<'_>, - ) -> Result, Error>> { + ) -> Result, Error>> { let md = self .inner .message_types diff --git a/src/receiver.rs b/src/receiver.rs index a765cd9..0276267 100644 --- a/src/receiver.rs +++ b/src/receiver.rs @@ -152,7 +152,7 @@ pub enum Event { Response(u64, Result>), Synchronized(Result<(), Error<(), E>>), Finished(u64), - Error(E), + Error(Error<(), E>), InitFailed(Error<(), E>), Stats(Stats), Flushed, @@ -161,6 +161,23 @@ pub enum Event { Pause, } +impl Event { + pub fn map_msg U>(self, f: F) -> Event { + match self { + Event::Response(mid, res) => Event::Response(mid, res.map(f)), + Event::Synchronized(res) => Event::Synchronized(res), + Event::Finished(cnt) => Event::Finished(cnt), + Event::Error(err) => Event::Error(err), + Event::InitFailed(err) => Event::InitFailed(err), + Event::Stats(st) => Event::Stats(st), + Event::Flushed => Event::Flushed, + Event::Exited => Event::Exited, + Event::Ready => Event::Ready, + Event::Pause => Event::Pause, + } + } +} + struct ReceiverWrapper where M: Message, diff --git a/src/receivers/mod.rs b/src/receivers/mod.rs index adc176a..0c3a19f 100644 --- a/src/receivers/mod.rs +++ b/src/receivers/mod.rs @@ -40,7 +40,7 @@ macro_rules! process_batch_result { .unwrap(); } - $stx.send(Event::Error(er)).unwrap(); + $stx.send(Event::Error(Error::Other(er))).unwrap(); } } };