diff --git a/.gitignore b/.gitignore index 2c6804a..ce31095 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,3 @@ /target -/derive/target +/crates/*/target Cargo.lock diff --git a/Cargo.toml b/Cargo.toml index 90d2ad3..682db7a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,9 +10,6 @@ license = "MIT OR Apache-2.0" exclude = [".gitignore", ".cargo/config", ".github/**", "codecov.yml"] edition = "2018" -[features] -quic = ["quinn"] - [dependencies] messagebus_derive = "0.1" tokio = { version = "1", features = ["parking_lot", "rt-multi-thread", "sync", "time"] } @@ -27,8 +24,6 @@ erased-serde = "0.3" serde = "1" serde_derive = "1" dashmap = "4.0.2" -quinn = { version = "0.7", optional = true } - [dev-dependencies] anyhow = "1.0" diff --git a/derive/Cargo.toml b/crates/derive/Cargo.toml similarity index 100% rename from derive/Cargo.toml rename to crates/derive/Cargo.toml diff --git a/derive/src/lib.rs b/crates/derive/src/lib.rs similarity index 97% rename from derive/src/lib.rs rename to crates/derive/src/lib.rs index 97e739d..c7efe25 100644 --- a/derive/src/lib.rs +++ b/crates/derive/src/lib.rs @@ -55,7 +55,11 @@ fn clone_part(ast: &syn::DeriveInput, has_clone: bool) -> proc_macro2::TokenStre } } -fn type_tag_part(ast: &syn::DeriveInput, type_tag: Option, namespace: Option) -> proc_macro2::TokenStream { +fn type_tag_part( + ast: &syn::DeriveInput, + type_tag: Option, + namespace: Option, +) -> proc_macro2::TokenStream { let class_name = &ast.ident; let name = if let Some(tt) = type_tag { tt @@ -228,7 +232,8 @@ pub fn derive_message(input: TokenStream) -> TokenStream { match &mut param.value_mut() { syn::GenericParam::Lifetime(_) => {} syn::GenericParam::Type(param) => { - let bound: syn::TypeParamBound = syn::parse_str("messagebus::MessageBounds").unwrap(); + let bound: syn::TypeParamBound = + syn::parse_str("messagebus::MessageBounds").unwrap(); param.bounds.push(bound); } syn::GenericParam::Const(_param) => {} diff --git a/crates/remote/Cargo.toml b/crates/remote/Cargo.toml new file mode 100644 index 0000000..482021c --- /dev/null +++ b/crates/remote/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "messagebus_remote" +version = "0.1.0" +authors = ["Andrey Tkachenko "] +repository = "https://github.com/andreytkachenko/messagebus.git" +keywords = ["futures", "async", "tokio", "message", "bus", "quic", "remote", "rpc", "parallel", "computing"] +categories = ["network-programming", "asynchronous"] +description = "MessageBus remote allows intercommunicate by messages between instances" +license = "MIT OR Apache-2.0" +exclude = [".gitignore", ".cargo/config", ".github/**", "codecov.yml"] +edition = "2018" + +# [features] +# quic = ["quinn"] + +[dependencies] +messagebus = "0.9" +tokio = { version = "1", features = ["parking_lot", "rt-multi-thread", "sync", "time"] } +parking_lot = "0.11" +quinn = { version = "0.7" } +thiserror = "1.0.29" diff --git a/crates/remote/src/error.rs b/crates/remote/src/error.rs new file mode 100644 index 0000000..9caf34a --- /dev/null +++ b/crates/remote/src/error.rs @@ -0,0 +1,28 @@ +use quinn::{ConnectError, ConnectionError, ParseError, EndpointError, WriteError, ReadToEndError}; + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("IO Error: {0}")] + Io(#[from] std::io::Error), + + // #[error("ReadToEnd: {0}")] + // ReadToEnd(#[from] ReadToEndError), + #[error("ConnectionError: {0}")] + ConnectionError(#[from ] ConnectionError), + + #[error("ConnectError: {0}")] + ConnectError(#[from ] ConnectError), + + #[error("EndpointError: {0}")] + EndpointError(#[from ] EndpointError), + + #[error("WriteError: {0}")] + WriteError(#[from ] WriteError), + + #[error("ReadToEndError: {0}")] + ReadToEndError(#[from ] ReadToEndError), + + #[error("QuinnConnectError: {0}")] + QuinnParseError(#[from] ParseError), + +} \ No newline at end of file diff --git a/crates/remote/src/lib.rs b/crates/remote/src/lib.rs new file mode 100644 index 0000000..385a0cc --- /dev/null +++ b/crates/remote/src/lib.rs @@ -0,0 +1,2 @@ +pub mod relays; +pub mod error; diff --git a/crates/remote/src/relays/mod.rs b/crates/remote/src/relays/mod.rs new file mode 100644 index 0000000..f73dbbf --- /dev/null +++ b/crates/remote/src/relays/mod.rs @@ -0,0 +1,5 @@ +// #[cfg(feature = "quic")] +mod quic; + +// #[cfg(feature = "quic")] +pub use quic::*; diff --git a/crates/remote/src/relays/quic/client.rs b/crates/remote/src/relays/quic/client.rs new file mode 100644 index 0000000..883f56f --- /dev/null +++ b/crates/remote/src/relays/quic/client.rs @@ -0,0 +1,51 @@ +use std::net::SocketAddr; +use crate::error::Error; + +pub const ALPN_QUIC_HTTP: &[&[u8]] = &[b"hq-29"]; + +pub struct QuicClientRelay { + endpoint: quinn::Endpoint, +} + +impl QuicClientRelay { + pub fn new(cert: &str) -> Result { + let mut client_config = quinn::ClientConfigBuilder::default(); + client_config.protocols(ALPN_QUIC_HTTP); + client_config.enable_keylog(); + + let cert_der = std::fs::read(cert)?; + let cert = quinn::Certificate::from_der(&cert_der)?; + + client_config.add_certificate_authority(cert).unwrap(); + + let mut endpoint = quinn::Endpoint::builder(); + endpoint.default_client_config(client_config.build()); + + let (endpoint, _) = endpoint.bind(&"0.0.0.0:0".parse().unwrap())?; + + Ok(Self { endpoint }) + } + + pub async fn connect(&self, addr: SocketAddr, host: &str) -> Result<(), Error> { + let new_conn = self.endpoint + .connect(&addr, host)? + .await?; + + let quinn::NewConnection { connection: conn, .. } = new_conn; + + let (mut send, recv) = conn.open_bi().await?; + + send.write_all("hi".as_bytes()).await?; + send.finish().await?; + + let _resp = recv + .read_to_end(usize::max_value()) + .await?; + + conn.close(0u32.into(), b"done"); + + self.endpoint.wait_idle().await; + + Ok(()) + } +} diff --git a/src/relays/quic/mod.rs b/crates/remote/src/relays/quic/mod.rs similarity index 100% rename from src/relays/quic/mod.rs rename to crates/remote/src/relays/quic/mod.rs diff --git a/crates/remote/src/relays/quic/server.rs b/crates/remote/src/relays/quic/server.rs new file mode 100644 index 0000000..ef8232a --- /dev/null +++ b/crates/remote/src/relays/quic/server.rs @@ -0,0 +1,3 @@ +pub struct QuicServerRelay { + +} diff --git a/src/lib.rs b/src/lib.rs index 3a2275f..f03b4e9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,7 +5,6 @@ mod handler; mod receiver; pub mod receivers; mod relay; -pub mod relays; mod stats; mod trait_object; @@ -42,7 +41,7 @@ pub use handler::*; pub use relay::Relay; pub use receiver::{ Action, Event, ReciveTypedReceiver, - ReciveUnypedReceiver, SendTypedReceiver, + ReciveUntypedReceiver, SendTypedReceiver, SendUntypedReceiver, TypeTagAccept, }; diff --git a/src/receiver.rs b/src/receiver.rs index f18b1b7..004b265 100644 --- a/src/receiver.rs +++ b/src/receiver.rs @@ -55,7 +55,7 @@ where fn poll_events(&self, ctx: &mut Context<'_>, bus: &Bus) -> Poll>; } -pub trait ReciveUnypedReceiver: Sync { +pub trait ReciveUntypedReceiver: Sync { fn poll_events( &self, ctx: &mut Context<'_>, diff --git a/src/relay.rs b/src/relay.rs index 278b391..558af90 100644 --- a/src/relay.rs +++ b/src/relay.rs @@ -5,7 +5,7 @@ use crate::{ TypeTagAccept, }, stats::Stats, - Bus, Event, Message, Permit, ReciveUnypedReceiver, TypeTag, + Bus, Event, Message, Permit, ReciveUntypedReceiver, TypeTag, }; use dashmap::DashMap; use std::sync::Arc; @@ -16,8 +16,8 @@ use core::{ use futures::{future::poll_fn, Future}; use tokio::sync::{oneshot, Notify}; -pub trait Relay: TypeTagAccept + SendUntypedReceiver + ReciveUnypedReceiver + 'static {} -impl Relay for T {} +pub trait Relay: TypeTagAccept + SendUntypedReceiver + ReciveUntypedReceiver + 'static {} +impl Relay for T {} struct SlabCfg; impl sharded_slab::Config for SlabCfg { diff --git a/src/relays/mod.rs b/src/relays/mod.rs deleted file mode 100644 index e79c1ec..0000000 --- a/src/relays/mod.rs +++ /dev/null @@ -1,9 +0,0 @@ -#[cfg(feature = "quic")] -mod quic; - -pub enum AuthKind { - Token(String), -} - -#[cfg(feature = "quic")] -pub use quic::*; diff --git a/src/relays/quic/client.rs b/src/relays/quic/client.rs deleted file mode 100644 index e34db86..0000000 --- a/src/relays/quic/client.rs +++ /dev/null @@ -1 +0,0 @@ -pub struct QuicClientRelay {} diff --git a/src/relays/quic/server.rs b/src/relays/quic/server.rs deleted file mode 100644 index 5efbf15..0000000 --- a/src/relays/quic/server.rs +++ /dev/null @@ -1 +0,0 @@ -pub struct QuicServerRelay {} diff --git a/tests/test_relay.rs b/tests/test_relay.rs index ca901bc..70eacee 100644 --- a/tests/test_relay.rs +++ b/tests/test_relay.rs @@ -4,7 +4,7 @@ use async_trait::async_trait; use messagebus::{ derive::{Error as MbError, Message}, error::{self, GenericError}, - receivers, Action, AsyncHandler, Bus, Event, Message, MessageBounds, ReciveUnypedReceiver, + receivers, Action, AsyncHandler, Bus, Event, Message, MessageBounds, ReciveUntypedReceiver, SendUntypedReceiver, TypeTagAccept, TypeTagged, }; use parking_lot::Mutex; @@ -153,7 +153,7 @@ impl SendUntypedReceiver for TestRelay { } } -impl ReciveUnypedReceiver for TestRelay { +impl ReciveUntypedReceiver for TestRelay { fn poll_events( &self, ctx: &mut Context<'_>,