From 5c03068c79c74e17dd287104ef9a890edc0adf5f Mon Sep 17 00:00:00 2001 From: Andrey Tkachenko Date: Thu, 16 Sep 2021 19:04:34 +0400 Subject: [PATCH] QUIC Client Relay progress --- Cargo.toml | 6 + crates/remote/Cargo.toml | 13 +- crates/remote/src/error.rs | 17 ++- crates/remote/src/lib.rs | 2 +- crates/remote/src/relays/quic/client.rs | 162 ++++++++++++++++++++---- crates/remote/src/relays/quic/server.rs | 4 +- src/relay.rs | 2 +- 7 files changed, 166 insertions(+), 40 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 682db7a..6e2d09b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,12 @@ license = "MIT OR Apache-2.0" exclude = [".gitignore", ".cargo/config", ".github/**", "codecov.yml"] edition = "2018" +[workspace] +members = [ + "crates/remote", + "crates/derive", +] + [dependencies] messagebus_derive = "0.1" tokio = { version = "1", features = ["parking_lot", "rt-multi-thread", "sync", "time"] } diff --git a/crates/remote/Cargo.toml b/crates/remote/Cargo.toml index 482021c..320b919 100644 --- a/crates/remote/Cargo.toml +++ b/crates/remote/Cargo.toml @@ -14,8 +14,15 @@ edition = "2018" # quic = ["quinn"] [dependencies] -messagebus = "0.9" +thiserror = "1.0" +messagebus = { path="../../" } tokio = { version = "1", features = ["parking_lot", "rt-multi-thread", "sync", "time"] } parking_lot = "0.11" -quinn = { version = "0.7" } -thiserror = "1.0.29" +quinn = "0.7" +rmp = "0.8.10" +rmp-serde = "0.15.5" +erased-serde = "0.3.16" +serde_derive = "1.0.130" +serde = "1.0.130" +# quinn = { version = "0.7", optional = true } + diff --git a/crates/remote/src/error.rs b/crates/remote/src/error.rs index 9caf34a..49e4fcd 100644 --- a/crates/remote/src/error.rs +++ b/crates/remote/src/error.rs @@ -1,4 +1,4 @@ -use quinn::{ConnectError, ConnectionError, ParseError, EndpointError, WriteError, ReadToEndError}; +use quinn::{ConnectError, ConnectionError, EndpointError, ParseError, ReadToEndError, WriteError}; #[derive(Debug, thiserror::Error)] pub enum Error { @@ -8,21 +8,20 @@ pub enum Error { // #[error("ReadToEnd: {0}")] // ReadToEnd(#[from] ReadToEndError), #[error("ConnectionError: {0}")] - ConnectionError(#[from ] ConnectionError), + ConnectionError(#[from] ConnectionError), #[error("ConnectError: {0}")] - ConnectError(#[from ] ConnectError), - + ConnectError(#[from] ConnectError), + #[error("EndpointError: {0}")] - EndpointError(#[from ] EndpointError), + EndpointError(#[from] EndpointError), #[error("WriteError: {0}")] - WriteError(#[from ] WriteError), + WriteError(#[from] WriteError), #[error("ReadToEndError: {0}")] - ReadToEndError(#[from ] ReadToEndError), + ReadToEndError(#[from] ReadToEndError), #[error("QuinnConnectError: {0}")] QuinnParseError(#[from] ParseError), - -} \ No newline at end of file +} diff --git a/crates/remote/src/lib.rs b/crates/remote/src/lib.rs index 385a0cc..b65150f 100644 --- a/crates/remote/src/lib.rs +++ b/crates/remote/src/lib.rs @@ -1,2 +1,2 @@ -pub mod relays; pub mod error; +pub mod relays; diff --git a/crates/remote/src/relays/quic/client.rs b/crates/remote/src/relays/quic/client.rs index 883f56f..0ecd87c 100644 --- a/crates/remote/src/relays/quic/client.rs +++ b/crates/remote/src/relays/quic/client.rs @@ -1,51 +1,167 @@ -use std::net::SocketAddr; use crate::error::Error; +use core::task::{Context, Poll}; +use messagebus::{ + error::{GenericError, SendError}, + Action, Bus, Event, Message, ReciveUntypedReceiver, SendUntypedReceiver, TypeTag, + TypeTagAccept, +}; +use std::{ + collections::{HashMap, HashSet}, + net::SocketAddr, +}; pub const ALPN_QUIC_HTTP: &[&[u8]] = &[b"hq-29"]; -pub struct QuicClientRelay { +pub struct QuicClientRelayEndpoint { endpoint: quinn::Endpoint, } -impl QuicClientRelay { +impl QuicClientRelayEndpoint { pub fn new(cert: &str) -> Result { let mut client_config = quinn::ClientConfigBuilder::default(); client_config.protocols(ALPN_QUIC_HTTP); client_config.enable_keylog(); - + let cert_der = std::fs::read(cert)?; let cert = quinn::Certificate::from_der(&cert_der)?; - + client_config.add_certificate_authority(cert).unwrap(); - + let mut endpoint = quinn::Endpoint::builder(); endpoint.default_client_config(client_config.build()); - + let (endpoint, _) = endpoint.bind(&"0.0.0.0:0".parse().unwrap())?; Ok(Self { endpoint }) } - pub async fn connect(&self, addr: SocketAddr, host: &str) -> Result<(), Error> { - let new_conn = self.endpoint - .connect(&addr, host)? - .await?; - - let quinn::NewConnection { connection: conn, .. } = new_conn; - - let (mut send, recv) = conn.open_bi().await?; - - send.write_all("hi".as_bytes()).await?; - send.finish().await?; - - let _resp = recv - .read_to_end(usize::max_value()) - .await?; + pub async fn connect( + &self, + addr: SocketAddr, + host: &str, + ) -> Result { + let quinn::NewConnection { connection, .. } = self.endpoint.connect(&addr, host)?.await?; + let (send, recv) = connection.open_bi().await?; - conn.close(0u32.into(), b"done"); + Ok(QuicClientConnection { + connection, + send, + recv, + }) + } + #[inline] + pub async fn wait_idle(&self) { self.endpoint.wait_idle().await; + } +} + +pub struct QuicClientConnection { + connection: quinn::Connection, + send: quinn::SendStream, + recv: quinn::RecvStream, +} + +impl QuicClientConnection { + pub fn send(&self, req: Request) -> Result<(), Error> { + Ok(()) + } +} + +pub struct TypeTable { + type_tags: HashSet, +} + +pub struct QuicClientRelay { + connection: QuicClientConnection, + incoming_table: HashMap>, + outgoing_table: HashMap>, +} + +impl TypeTagAccept for QuicClientRelay { + fn accept(&self, msg: &TypeTag, resp: Option<&TypeTag>, err: Option<&TypeTag>) -> bool { + self.outgoing_table.get(msg).map_or(false, |v| { + v.into_iter().any(|(r, e)| { + resp.map_or(true, |resp| resp.as_ref() == r.as_ref()) + && err.map_or(true, |err| err.as_ref() == e.as_ref()) + }) + }) + } + + fn iter_types(&self, cb: &mut dyn FnMut(&TypeTag, &TypeTag, &TypeTag) -> bool) { + let iter = self + .outgoing_table + .iter() + .map(|(k, v)| v.iter().map(move |(e, r)| (k, r, e))) + .flatten(); + + for (m, r, e) in iter { + if cb(m, r, e) { + return; + } + } + } +} + +#[derive(Debug)] +pub enum Request { + Flush, + Sync, + Close, + Stats, + Send(u64, Box), + Request(u64, Box), +} + +impl SendUntypedReceiver for QuicClientRelay { + fn send(&self, msg: Action, _bus: &Bus) -> Result<(), SendError> { + self.connection + .send(match msg { + Action::Init => return Ok(()), + Action::Flush => Request::Flush, + Action::Sync => Request::Sync, + Action::Close => Request::Close, + Action::Stats => Request::Stats, + _ => unimplemented!(), + }) + .unwrap(); + + Ok(()) + } + + fn send_msg( + &self, + mid: u64, + msg: Box, + req: bool, + _bus: &Bus, + ) -> Result<(), SendError>> { + self.connection + .send(if req { + Request::Request(mid, msg) + } else { + Request::Send(mid, msg) + }) + .unwrap(); Ok(()) } } + +impl ReciveUntypedReceiver for QuicClientRelay { + fn poll_events( + &self, + ctx: &mut Context<'_>, + _bus: &Bus, + ) -> Poll, GenericError>> { + Poll::Pending + + // let poll = self.srx.lock().poll_recv(ctx); + + // match poll { + // Poll::Pending => Poll::Pending, + // Poll::Ready(Some(event)) => Poll::Ready(event), + // Poll::Ready(None) => Poll::Ready(Event::Exited), + // } + } +} diff --git a/crates/remote/src/relays/quic/server.rs b/crates/remote/src/relays/quic/server.rs index ef8232a..5efbf15 100644 --- a/crates/remote/src/relays/quic/server.rs +++ b/crates/remote/src/relays/quic/server.rs @@ -1,3 +1 @@ -pub struct QuicServerRelay { - -} +pub struct QuicServerRelay {} diff --git a/src/relay.rs b/src/relay.rs index 558af90..7db661f 100644 --- a/src/relay.rs +++ b/src/relay.rs @@ -248,7 +248,7 @@ where break; } Event::Flushed => { - self.context.need_flush.store(true, Ordering::SeqCst); + self.context.need_flush.store(false, Ordering::SeqCst); self.context.flushed.notify_waiters() } Event::Synchronized(_res) => self.context.synchronized.notify_waiters(),