MessageBus Remote split out; crate reorganization

This commit is contained in:
Andrey Tkachenko 2021-09-16 16:37:45 +04:00
parent c9ea551e0d
commit a0544f01e4
18 changed files with 125 additions and 27 deletions

2
.gitignore vendored
View File

@ -1,3 +1,3 @@
/target
/derive/target
/crates/*/target
Cargo.lock

View File

@ -10,9 +10,6 @@ license = "MIT OR Apache-2.0"
exclude = [".gitignore", ".cargo/config", ".github/**", "codecov.yml"]
edition = "2018"
[features]
quic = ["quinn"]
[dependencies]
messagebus_derive = "0.1"
tokio = { version = "1", features = ["parking_lot", "rt-multi-thread", "sync", "time"] }
@ -27,8 +24,6 @@ erased-serde = "0.3"
serde = "1"
serde_derive = "1"
dashmap = "4.0.2"
quinn = { version = "0.7", optional = true }
[dev-dependencies]
anyhow = "1.0"

View File

@ -55,7 +55,11 @@ fn clone_part(ast: &syn::DeriveInput, has_clone: bool) -> proc_macro2::TokenStre
}
}
fn type_tag_part(ast: &syn::DeriveInput, type_tag: Option<String>, namespace: Option<String>) -> proc_macro2::TokenStream {
fn type_tag_part(
ast: &syn::DeriveInput,
type_tag: Option<String>,
namespace: Option<String>,
) -> proc_macro2::TokenStream {
let class_name = &ast.ident;
let name = if let Some(tt) = type_tag {
tt
@ -228,7 +232,8 @@ pub fn derive_message(input: TokenStream) -> TokenStream {
match &mut param.value_mut() {
syn::GenericParam::Lifetime(_) => {}
syn::GenericParam::Type(param) => {
let bound: syn::TypeParamBound = syn::parse_str("messagebus::MessageBounds").unwrap();
let bound: syn::TypeParamBound =
syn::parse_str("messagebus::MessageBounds").unwrap();
param.bounds.push(bound);
}
syn::GenericParam::Const(_param) => {}

21
crates/remote/Cargo.toml Normal file
View File

@ -0,0 +1,21 @@
[package]
name = "messagebus_remote"
version = "0.1.0"
authors = ["Andrey Tkachenko <andrey@aidev.ru>"]
repository = "https://github.com/andreytkachenko/messagebus.git"
keywords = ["futures", "async", "tokio", "message", "bus", "quic", "remote", "rpc", "parallel", "computing"]
categories = ["network-programming", "asynchronous"]
description = "MessageBus remote allows intercommunicate by messages between instances"
license = "MIT OR Apache-2.0"
exclude = [".gitignore", ".cargo/config", ".github/**", "codecov.yml"]
edition = "2018"
# [features]
# quic = ["quinn"]
[dependencies]
messagebus = "0.9"
tokio = { version = "1", features = ["parking_lot", "rt-multi-thread", "sync", "time"] }
parking_lot = "0.11"
quinn = { version = "0.7" }
thiserror = "1.0.29"

View File

@ -0,0 +1,28 @@
use quinn::{ConnectError, ConnectionError, ParseError, EndpointError, WriteError, ReadToEndError};
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("IO Error: {0}")]
Io(#[from] std::io::Error),
// #[error("ReadToEnd: {0}")]
// ReadToEnd(#[from] ReadToEndError),
#[error("ConnectionError: {0}")]
ConnectionError(#[from ] ConnectionError),
#[error("ConnectError: {0}")]
ConnectError(#[from ] ConnectError),
#[error("EndpointError: {0}")]
EndpointError(#[from ] EndpointError),
#[error("WriteError: {0}")]
WriteError(#[from ] WriteError),
#[error("ReadToEndError: {0}")]
ReadToEndError(#[from ] ReadToEndError),
#[error("QuinnConnectError: {0}")]
QuinnParseError(#[from] ParseError),
}

2
crates/remote/src/lib.rs Normal file
View File

@ -0,0 +1,2 @@
pub mod relays;
pub mod error;

View File

@ -0,0 +1,5 @@
// #[cfg(feature = "quic")]
mod quic;
// #[cfg(feature = "quic")]
pub use quic::*;

View File

@ -0,0 +1,51 @@
use std::net::SocketAddr;
use crate::error::Error;
pub const ALPN_QUIC_HTTP: &[&[u8]] = &[b"hq-29"];
pub struct QuicClientRelay {
endpoint: quinn::Endpoint,
}
impl QuicClientRelay {
pub fn new(cert: &str) -> Result<Self, Error> {
let mut client_config = quinn::ClientConfigBuilder::default();
client_config.protocols(ALPN_QUIC_HTTP);
client_config.enable_keylog();
let cert_der = std::fs::read(cert)?;
let cert = quinn::Certificate::from_der(&cert_der)?;
client_config.add_certificate_authority(cert).unwrap();
let mut endpoint = quinn::Endpoint::builder();
endpoint.default_client_config(client_config.build());
let (endpoint, _) = endpoint.bind(&"0.0.0.0:0".parse().unwrap())?;
Ok(Self { endpoint })
}
pub async fn connect(&self, addr: SocketAddr, host: &str) -> Result<(), Error> {
let new_conn = self.endpoint
.connect(&addr, host)?
.await?;
let quinn::NewConnection { connection: conn, .. } = new_conn;
let (mut send, recv) = conn.open_bi().await?;
send.write_all("hi".as_bytes()).await?;
send.finish().await?;
let _resp = recv
.read_to_end(usize::max_value())
.await?;
conn.close(0u32.into(), b"done");
self.endpoint.wait_idle().await;
Ok(())
}
}

View File

@ -0,0 +1,3 @@
pub struct QuicServerRelay {
}

View File

@ -5,7 +5,6 @@ mod handler;
mod receiver;
pub mod receivers;
mod relay;
pub mod relays;
mod stats;
mod trait_object;
@ -42,7 +41,7 @@ pub use handler::*;
pub use relay::Relay;
pub use receiver::{
Action, Event, ReciveTypedReceiver,
ReciveUnypedReceiver, SendTypedReceiver,
ReciveUntypedReceiver, SendTypedReceiver,
SendUntypedReceiver, TypeTagAccept,
};

View File

@ -55,7 +55,7 @@ where
fn poll_events(&self, ctx: &mut Context<'_>, bus: &Bus) -> Poll<Event<M, E>>;
}
pub trait ReciveUnypedReceiver: Sync {
pub trait ReciveUntypedReceiver: Sync {
fn poll_events(
&self,
ctx: &mut Context<'_>,

View File

@ -5,7 +5,7 @@ use crate::{
TypeTagAccept,
},
stats::Stats,
Bus, Event, Message, Permit, ReciveUnypedReceiver, TypeTag,
Bus, Event, Message, Permit, ReciveUntypedReceiver, TypeTag,
};
use dashmap::DashMap;
use std::sync::Arc;
@ -16,8 +16,8 @@ use core::{
use futures::{future::poll_fn, Future};
use tokio::sync::{oneshot, Notify};
pub trait Relay: TypeTagAccept + SendUntypedReceiver + ReciveUnypedReceiver + 'static {}
impl<T: TypeTagAccept + SendUntypedReceiver + ReciveUnypedReceiver + 'static> Relay for T {}
pub trait Relay: TypeTagAccept + SendUntypedReceiver + ReciveUntypedReceiver + 'static {}
impl<T: TypeTagAccept + SendUntypedReceiver + ReciveUntypedReceiver + 'static> Relay for T {}
struct SlabCfg;
impl sharded_slab::Config for SlabCfg {

View File

@ -1,9 +0,0 @@
#[cfg(feature = "quic")]
mod quic;
pub enum AuthKind {
Token(String),
}
#[cfg(feature = "quic")]
pub use quic::*;

View File

@ -1 +0,0 @@
pub struct QuicClientRelay {}

View File

@ -1 +0,0 @@
pub struct QuicServerRelay {}

View File

@ -4,7 +4,7 @@ use async_trait::async_trait;
use messagebus::{
derive::{Error as MbError, Message},
error::{self, GenericError},
receivers, Action, AsyncHandler, Bus, Event, Message, MessageBounds, ReciveUnypedReceiver,
receivers, Action, AsyncHandler, Bus, Event, Message, MessageBounds, ReciveUntypedReceiver,
SendUntypedReceiver, TypeTagAccept, TypeTagged,
};
use parking_lot::Mutex;
@ -153,7 +153,7 @@ impl SendUntypedReceiver for TestRelay {
}
}
impl ReciveUnypedReceiver for TestRelay {
impl ReciveUntypedReceiver for TestRelay {
fn poll_events(
&self,
ctx: &mut Context<'_>,