QUIC Client Relay progress

This commit is contained in:
Andrey Tkachenko 2021-09-16 19:04:34 +04:00
parent a0544f01e4
commit 5c03068c79
7 changed files with 166 additions and 40 deletions

View File

@ -10,6 +10,12 @@ license = "MIT OR Apache-2.0"
exclude = [".gitignore", ".cargo/config", ".github/**", "codecov.yml"] exclude = [".gitignore", ".cargo/config", ".github/**", "codecov.yml"]
edition = "2018" edition = "2018"
[workspace]
members = [
"crates/remote",
"crates/derive",
]
[dependencies] [dependencies]
messagebus_derive = "0.1" messagebus_derive = "0.1"
tokio = { version = "1", features = ["parking_lot", "rt-multi-thread", "sync", "time"] } tokio = { version = "1", features = ["parking_lot", "rt-multi-thread", "sync", "time"] }

View File

@ -14,8 +14,15 @@ edition = "2018"
# quic = ["quinn"] # quic = ["quinn"]
[dependencies] [dependencies]
messagebus = "0.9" thiserror = "1.0"
messagebus = { path="../../" }
tokio = { version = "1", features = ["parking_lot", "rt-multi-thread", "sync", "time"] } tokio = { version = "1", features = ["parking_lot", "rt-multi-thread", "sync", "time"] }
parking_lot = "0.11" parking_lot = "0.11"
quinn = { version = "0.7" } quinn = "0.7"
thiserror = "1.0.29" rmp = "0.8.10"
rmp-serde = "0.15.5"
erased-serde = "0.3.16"
serde_derive = "1.0.130"
serde = "1.0.130"
# quinn = { version = "0.7", optional = true }

View File

@ -1,4 +1,4 @@
use quinn::{ConnectError, ConnectionError, ParseError, EndpointError, WriteError, ReadToEndError}; use quinn::{ConnectError, ConnectionError, EndpointError, ParseError, ReadToEndError, WriteError};
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
pub enum Error { pub enum Error {
@ -24,5 +24,4 @@ pub enum Error {
#[error("QuinnConnectError: {0}")] #[error("QuinnConnectError: {0}")]
QuinnParseError(#[from] ParseError), QuinnParseError(#[from] ParseError),
} }

View File

@ -1,2 +1,2 @@
pub mod relays;
pub mod error; pub mod error;
pub mod relays;

View File

@ -1,13 +1,22 @@
use std::net::SocketAddr;
use crate::error::Error; use crate::error::Error;
use core::task::{Context, Poll};
use messagebus::{
error::{GenericError, SendError},
Action, Bus, Event, Message, ReciveUntypedReceiver, SendUntypedReceiver, TypeTag,
TypeTagAccept,
};
use std::{
collections::{HashMap, HashSet},
net::SocketAddr,
};
pub const ALPN_QUIC_HTTP: &[&[u8]] = &[b"hq-29"]; pub const ALPN_QUIC_HTTP: &[&[u8]] = &[b"hq-29"];
pub struct QuicClientRelay { pub struct QuicClientRelayEndpoint {
endpoint: quinn::Endpoint, endpoint: quinn::Endpoint,
} }
impl QuicClientRelay { impl QuicClientRelayEndpoint {
pub fn new(cert: &str) -> Result<Self, Error> { pub fn new(cert: &str) -> Result<Self, Error> {
let mut client_config = quinn::ClientConfigBuilder::default(); let mut client_config = quinn::ClientConfigBuilder::default();
client_config.protocols(ALPN_QUIC_HTTP); client_config.protocols(ALPN_QUIC_HTTP);
@ -26,26 +35,133 @@ impl QuicClientRelay {
Ok(Self { endpoint }) Ok(Self { endpoint })
} }
pub async fn connect(&self, addr: SocketAddr, host: &str) -> Result<(), Error> { pub async fn connect(
let new_conn = self.endpoint &self,
.connect(&addr, host)? addr: SocketAddr,
.await?; host: &str,
) -> Result<QuicClientConnection, Error> {
let quinn::NewConnection { connection, .. } = self.endpoint.connect(&addr, host)?.await?;
let (send, recv) = connection.open_bi().await?;
let quinn::NewConnection { connection: conn, .. } = new_conn; Ok(QuicClientConnection {
connection,
let (mut send, recv) = conn.open_bi().await?; send,
recv,
send.write_all("hi".as_bytes()).await?; })
send.finish().await?; }
let _resp = recv
.read_to_end(usize::max_value())
.await?;
conn.close(0u32.into(), b"done");
#[inline]
pub async fn wait_idle(&self) {
self.endpoint.wait_idle().await; self.endpoint.wait_idle().await;
}
}
pub struct QuicClientConnection {
connection: quinn::Connection,
send: quinn::SendStream,
recv: quinn::RecvStream,
}
impl QuicClientConnection {
pub fn send(&self, req: Request) -> Result<(), Error> {
Ok(())
}
}
pub struct TypeTable {
type_tags: HashSet<TypeTag>,
}
pub struct QuicClientRelay {
connection: QuicClientConnection,
incoming_table: HashMap<TypeTag, Vec<(TypeTag, TypeTag)>>,
outgoing_table: HashMap<TypeTag, Vec<(TypeTag, TypeTag)>>,
}
impl TypeTagAccept for QuicClientRelay {
fn accept(&self, msg: &TypeTag, resp: Option<&TypeTag>, err: Option<&TypeTag>) -> bool {
self.outgoing_table.get(msg).map_or(false, |v| {
v.into_iter().any(|(r, e)| {
resp.map_or(true, |resp| resp.as_ref() == r.as_ref())
&& err.map_or(true, |err| err.as_ref() == e.as_ref())
})
})
}
fn iter_types(&self, cb: &mut dyn FnMut(&TypeTag, &TypeTag, &TypeTag) -> bool) {
let iter = self
.outgoing_table
.iter()
.map(|(k, v)| v.iter().map(move |(e, r)| (k, r, e)))
.flatten();
for (m, r, e) in iter {
if cb(m, r, e) {
return;
}
}
}
}
#[derive(Debug)]
pub enum Request {
Flush,
Sync,
Close,
Stats,
Send(u64, Box<dyn Message>),
Request(u64, Box<dyn Message>),
}
impl SendUntypedReceiver for QuicClientRelay {
fn send(&self, msg: Action, _bus: &Bus) -> Result<(), SendError<Action>> {
self.connection
.send(match msg {
Action::Init => return Ok(()),
Action::Flush => Request::Flush,
Action::Sync => Request::Sync,
Action::Close => Request::Close,
Action::Stats => Request::Stats,
_ => unimplemented!(),
})
.unwrap();
Ok(())
}
fn send_msg(
&self,
mid: u64,
msg: Box<dyn Message>,
req: bool,
_bus: &Bus,
) -> Result<(), SendError<Box<dyn Message>>> {
self.connection
.send(if req {
Request::Request(mid, msg)
} else {
Request::Send(mid, msg)
})
.unwrap();
Ok(()) Ok(())
} }
} }
impl ReciveUntypedReceiver for QuicClientRelay {
fn poll_events(
&self,
ctx: &mut Context<'_>,
_bus: &Bus,
) -> Poll<Event<Box<dyn Message>, GenericError>> {
Poll::Pending
// let poll = self.srx.lock().poll_recv(ctx);
// match poll {
// Poll::Pending => Poll::Pending,
// Poll::Ready(Some(event)) => Poll::Ready(event),
// Poll::Ready(None) => Poll::Ready(Event::Exited),
// }
}
}

View File

@ -1,3 +1 @@
pub struct QuicServerRelay { pub struct QuicServerRelay {}
}

View File

@ -248,7 +248,7 @@ where
break; break;
} }
Event::Flushed => { Event::Flushed => {
self.context.need_flush.store(true, Ordering::SeqCst); self.context.need_flush.store(false, Ordering::SeqCst);
self.context.flushed.notify_waiters() self.context.flushed.notify_waiters()
} }
Event::Synchronized(_res) => self.context.synchronized.notify_waiters(), Event::Synchronized(_res) => self.context.synchronized.notify_waiters(),