Redis begin & fixes for QUIC

This commit is contained in:
Andrey Tkachenko 2021-09-28 15:19:44 +04:00
parent 73763f8a9d
commit 535c39c9dc
7 changed files with 597 additions and 589 deletions

View File

@ -30,9 +30,10 @@ serde_cbor = "0.11.2"
bytes = "1.1.0"
quinn-proto = "0.7.3"
rustls = "0.19.1"
redis = "0.21.2"
redis = {version = "0.21.2", features = ["aio", "tokio-comp"]}
bitflags = "1.3.2"
serde_json = "1.0.68"
log = "0.4.14"
[dev-dependencies]
anyhow = "1.0.44"

View File

@ -158,7 +158,7 @@ impl ProtocolItem {
}
}
pub fn serialize(self, mut body_type: BodyType, body_buff: &mut Vec<u8>) -> Result<ProtocolPacket<'_>, crate::error::Error> {
pub fn serialize<'a>(&self, mut body_type: BodyType, body_buff: &'a mut Vec<u8>) -> Result<ProtocolPacket<'a>, crate::error::Error> {
let mut argument = 0;
let mut type_tag = None;
let mut body = None;
@ -174,11 +174,11 @@ impl ProtocolItem {
_ => unimplemented!(),
}
ProtocolItem::Send(mid, msg, req) => {
let msg = msg.as_shared_boxed()
.map_err(|_| crate::error::Error::UnknownCodec)?;
let msg = msg.as_shared_ref()
.ok_or(crate::error::Error::UnknownCodec)?;
argument = mid;
flags.set(ProtocolHeaderFlags::ARGUMENT, req);
argument = *mid;
flags.set(ProtocolHeaderFlags::ARGUMENT, *req);
flags.set(ProtocolHeaderFlags::BODY, true);
flags.set(ProtocolHeaderFlags::TYPE_TAG, true);
type_tag = Some(msg.type_tag());
@ -188,15 +188,15 @@ impl ProtocolItem {
},
ProtocolItem::Event(ev) => match ev {
Event::Response(mid, res) => {
argument = mid;
argument = *mid;
flags.set(ProtocolHeaderFlags::ARGUMENT, true);
flags.set(ProtocolHeaderFlags::BODY, true);
flags.set(ProtocolHeaderFlags::TYPE_TAG, true);
match res {
Ok(msg) => {
let msg = msg.as_shared_boxed()
.map_err(|_| crate::error::Error::UnknownCodec)?;
let msg = msg.as_shared_ref()
.ok_or(crate::error::Error::UnknownCodec)?;
type_tag = Some(msg.type_tag());
body = Some(generic_serialize(body_type, &*msg, body_buff)?);
@ -225,7 +225,7 @@ impl ProtocolItem {
ProtocolHeaderActionKind::Error
}
Event::Finished(n) => {
argument = n;
argument = *n;
flags.set(ProtocolHeaderFlags::ARGUMENT, true);
ProtocolHeaderActionKind::BatchComplete
},

View File

@ -1,7 +1,6 @@
// #[cfg(feature = "quic")]
mod quic;
// mod redis;
mod redis;
use futures::Stream;
use messagebus::{error::GenericError, Event, Message, TypeTag};
@ -26,6 +25,11 @@ impl MessageTable {
.push((resp, err));
}
pub fn iter_keys(&self) -> impl Iterator<Item = &str> + '_ {
self.table.keys()
.map(|k|k.as_ref())
}
pub fn accept(&self, msg: &TypeTag, resp: Option<&TypeTag>, err: Option<&TypeTag>) -> bool {
self.table.get(msg).map_or(false, |v| {
v.iter().any(|(r, e)| {

View File

@ -1,22 +1,20 @@
use crate::{error::Error, proto::{BodyType, ProtocolItem, ProtocolPacket}, relays::{GenericEventStream, MessageTable}};
use futures::StreamExt;
use messagebus::{Action, Bus, Event, Message, ReciveUntypedReceiver, SendOptions, SendUntypedReceiver, TypeTag, TypeTagAccept};
use parking_lot::Mutex;
use quinn::IncomingBiStreams;
use std::{net::SocketAddr, sync::{Arc, atomic::{AtomicU64, Ordering}}};
use tokio::sync::mpsc::{self, UnboundedSender, UnboundedReceiver};
use bytes::{Buf, BufMut};
use crate::{error::Error};
use futures::{Future, Stream};
use quinn::{Connecting};
use std::{net::SocketAddr, pin::Pin, task::{Context, Poll}};
pub const ALPN_QUIC_HTTP: &[&[u8]] = &[b"hq-29"];
use super::WaitIdle;
pub struct QuicClientRelayEndpoint {
pub struct QuicClientEndpoint {
addr: SocketAddr,
host: String,
endpoint: quinn::Endpoint,
}
impl QuicClientRelayEndpoint {
pub fn new(cert: &str) -> Result<Self, Error> {
impl QuicClientEndpoint {
pub fn new(cert: &str, addr: SocketAddr, host: String) -> Result<Self, Error> {
let mut client_config = quinn::ClientConfigBuilder::default();
client_config.protocols(ALPN_QUIC_HTTP);
client_config.protocols(super::ALPN_QUIC_HTTP);
client_config.enable_keylog();
let cert_der = std::fs::read(cert)?;
@ -29,221 +27,25 @@ impl QuicClientRelayEndpoint {
let (endpoint, _) = endpoint.bind(&"0.0.0.0:0".parse().unwrap())?;
Ok(Self { endpoint })
}
}
pub struct QuicClientRelay {
// ready_flag: AtomicBool,
self_id: Arc<AtomicU64>,
addr: SocketAddr,
host: String,
endpoint: QuicClientRelayEndpoint,
outgoing_table: MessageTable,
sender: UnboundedSender<ProtocolItem>,
receiver: Mutex<Option<(UnboundedReceiver<ProtocolItem>, UnboundedSender<IncomingBiStreams>)>>,
st_receiver: Mutex<Option<UnboundedReceiver<IncomingBiStreams>>>,
}
impl QuicClientRelay {
pub fn new(cert: &str, addr: SocketAddr, host: String, table: Vec<(TypeTag, TypeTag, TypeTag)>) -> Result<Self, Error> {
let endpoint = QuicClientRelayEndpoint::new(cert)?;
let (sender, receiver) = mpsc::unbounded_channel();
let (st_sender, st_receiver) = mpsc::unbounded_channel();
Ok(Self {
// ready_flag: AtomicBool::new(false),
self_id: Arc::new(AtomicU64::new(0)),
addr,
host,
endpoint,
outgoing_table: MessageTable::from(table),
sender,
receiver: Mutex::new(Some((receiver, st_sender))),
st_receiver: Mutex::new(Some(st_receiver)),
Ok(Self {
addr, host,
endpoint
})
}
}
impl TypeTagAccept for QuicClientRelay {
fn accept(&self, msg: &TypeTag, resp: Option<&TypeTag>, err: Option<&TypeTag>) -> bool {
self.outgoing_table.accept(msg, resp, err)
}
impl Stream for QuicClientEndpoint {
type Item = Connecting;
fn iter_types(&self, cb: &mut dyn FnMut(&TypeTag, &TypeTag, &TypeTag) -> bool) {
let iter = self.outgoing_table.iter_types();
for (m, r, e) in iter {
if cb(m, r, e) {
return;
}
}
}
fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
Poll::Ready(this.endpoint.connect(&this.addr, &this.host).ok())
}
}
impl SendUntypedReceiver for QuicClientRelay {
fn send(&self, msg: Action, _bus: &Bus) -> Result<(), messagebus::error::Error<Action>> {
match msg {
Action::Init(self_id) => {
let (mut rx, recv_stream) = self.receiver.lock().take().unwrap();
let conn = self.endpoint.endpoint.connect(&self.addr, &self.host).unwrap();
self.self_id.store(self_id, Ordering::SeqCst);
tokio::spawn(async move {
let mut body_buff = Vec::new();
let mut header_buff = Vec::new();
let conn = conn.await.unwrap();
recv_stream.send(conn.bi_streams).unwrap();
while let Some(r) = rx.recv().await {
header_buff.clear();
body_buff.clear();
let (mut send, _) = conn.connection.open_bi().await.unwrap();
let pkt = r.serialize(BodyType::Cbor, &mut body_buff).unwrap();
header_buff.put(&b"MBUS"[..]);
header_buff.put_u16(1);
header_buff.put_u16(0);
header_buff.put_u64(header_buff.len() as _);
serde_cbor::to_writer(&mut header_buff, &pkt).unwrap();
send.write_all(&header_buff).await.unwrap();
send.finish().await.unwrap();
println!("sent");
}
});
}
other => self.sender.send(other.into()).unwrap(),
}
Ok(())
impl<'a> WaitIdle<'a> for QuicClientEndpoint {
type Fut = Pin<Box<dyn Future<Output = ()> + Send + 'a>> ;
fn wait_idle(&'a self) -> Self::Fut {
Box::pin(self.endpoint.wait_idle())
}
fn send_msg(
&self,
mid: u64,
msg: Box<dyn Message>,
req: bool,
_bus: &Bus,
) -> Result<(), messagebus::error::Error<Box<dyn Message>>> {
match msg.as_shared_boxed() {
Ok(msg) => {
if let Err(err) = self.sender.send((mid, msg, req).into()) {
Err(messagebus::error::Error::TryAgain(err.0.unwrap_send().unwrap().1.upcast_box()))
} else {
Ok(())
}
}
Err(msg) => Err(messagebus::error::Error::TryAgain(msg)),
}
}
}
impl ReciveUntypedReceiver for QuicClientRelay {
type Stream = GenericEventStream;
fn event_stream(&self, bus: Bus) -> Self::Stream {
let self_id = self.self_id.clone();
let sender = self.sender.clone();
let mut recv = self.st_receiver.lock().take().unwrap();
Box::pin(
futures::stream::poll_fn(move |cx|recv.poll_recv(cx))
.map(move |uni_streams| {
let self_id = self_id.clone();
let bus = bus.clone();
let sender = sender.clone();
// let buff = Bytes::new();
futures::stream::unfold((true, uni_streams, bus, sender, self_id), |(first, mut uni_streams, bus, sender, self_id)| async move {
loop {
if first {
return Some((Event::Ready, (false, uni_streams, bus, sender, self_id)));
}
let (_, recv) = match uni_streams.next().await? {
Ok(recv) => recv,
Err(err) => {
println!("error: {}", err);
return None;
}
};
let buff = recv
.read_to_end(usize::max_value())
.await
.unwrap();
// assert_eq!(&buff[0..4], b"MBUS");
let mut reader = &buff[4..];
let version = reader.get_u16();
let content_type = reader.get_u16();
let body_size = reader.get_u64();
println!("inbound packet {}: v: {}; ct: {}; bs: {}", String::from_utf8_lossy(&buff[0..4]), version, content_type, body_size);
let event = match content_type {
0 => { // CBOR
let proto: ProtocolPacket = serde_cbor::from_slice(&buff[16..]).unwrap();
match proto.deserialize(&bus).unwrap() {
ProtocolItem::Event(ev) => ev.map_msg(|msg|msg.upcast_box()),
ProtocolItem::Action(action) => {
match action {
Action::Close => {
println!("warning: Close recevied - ignoring!");
sender.send(ProtocolItem::Event(Event::Exited)).unwrap();
},
Action::Flush => {
bus.flush().await;
sender.send(ProtocolItem::Event(Event::Flushed)).unwrap();
},
Action::Sync => {
bus.sync().await;
sender.send(ProtocolItem::Event(Event::Synchronized(Ok(())))).unwrap();
},
Action::Init(..) => (),
Action::Stats => (),
_ => (),
}
continue;
}
ProtocolItem::Send(mid, msg, req) => {
if req {
let res = bus.request_boxed(
msg.upcast_box(),
SendOptions::Except(self_id.load(Ordering::SeqCst))
)
.await
.map(|x|x.as_shared_boxed().unwrap())
.map_err(|x|x.map_msg(|_|()));
sender.send(ProtocolItem::Event(Event::Response(mid, res))).unwrap();
} else {
let _ = bus.send_boxed(msg.upcast_box(), Default::default())
.await;
}
continue;
}
_ => unimplemented!()
}
},
_ => unimplemented!()
};
return Some((event, (false, uni_streams, bus, sender, self_id)));
}
})
})
.flatten()
)
}
}
}

View File

@ -1,87 +1,362 @@
mod client;
mod server;
pub use client::QuicClientRelay;
use messagebus::{Bus, ReciveUntypedReceiver};
use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;
pub use client::QuicClientEndpoint;
use messagebus::{Action, Bus, Event, Message, ReciveUntypedReceiver, SendOptions, SendUntypedReceiver, TypeTag, TypeTagAccept};
use parking_lot::Mutex;
use quinn::IncomingUniStreams;
pub use server::QuicServerRelay;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use futures::StreamExt;
use bytes::{Buf};
use crate::proto::{ProtocolItem, ProtocolPacket};
use super::GenericEventStream;
pub struct QuicRelay {
sender: UnboundedSender<ProtocolItem>,
receiver: Mutex<Option<(UnboundedReceiver<ProtocolItem>, UnboundedSender<IncomingUniStreams>)>>,
st_receiver: Mutex<Option<UnboundedReceiver<IncomingUniStreams>>>,
}
use quinn::{Connecting, IncomingBiStreams};
pub use server::QuicServerEndpoint;
use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
use futures::{Future, Stream, StreamExt, pin_mut};
use bytes::{Buf, BufMut};
pub const ALPN_QUIC_HTTP: &[&[u8]] = &[b"hq-29"];
impl ReciveUntypedReceiver for QuicRelay {
use crate::proto::{BodyType, ProtocolItem, ProtocolPacket};
pub type QuicClientRelay = QuicRelay<QuicClientEndpoint>;
pub type QuicServerRelay = QuicRelay<QuicServerEndpoint>;
use super::{GenericEventStream, MessageTable};
pub trait WaitIdle<'a>: Sync {
type Fut: Future + Send + 'a;
fn wait_idle(&'a self) -> Self::Fut;
}
#[derive(Debug)]
enum RecvDo {
Pause,
Ready,
Closed,
Incoming(IncomingBiStreams),
}
pub struct QuicRelay<B> {
base: Mutex<Option<B>>,
self_id: Arc<AtomicU64>,
outgoing_table: MessageTable,
item_sender: UnboundedSender<Option<ProtocolItem>>,
item_receiver: Mutex<Option<UnboundedReceiver<Option<ProtocolItem>>>>,
event_sender: UnboundedSender<RecvDo>,
event_receiver: Mutex<Option<UnboundedReceiver<RecvDo>>>,
}
impl QuicRelay<QuicClientEndpoint> {
pub fn new(cert: &str, addr: SocketAddr, host: String, table: Vec<(TypeTag, TypeTag, TypeTag)>) -> Result<Self, crate::error::Error> {
let (item_sender, item_receiver) = mpsc::unbounded_channel();
let (event_sender, event_receiver) = mpsc::unbounded_channel();
Ok(QuicRelay {
base: Mutex::new(Some(QuicClientEndpoint::new(cert, addr, host)?)),
self_id: Arc::new(AtomicU64::new(0)),
outgoing_table: MessageTable::from(table),
item_sender,
item_receiver: Mutex::new(Some(item_receiver)),
event_sender,
event_receiver: Mutex::new(Some(event_receiver)),
})
}
}
impl QuicRelay<QuicServerEndpoint> {
pub fn new(key_path: &str, cert_path: &str, addr: SocketAddr, table: Vec<(TypeTag, TypeTag, TypeTag)>) -> Result<Self, crate::error::Error> {
let (item_sender, item_receiver) = mpsc::unbounded_channel();
let (event_sender, event_receiver) = mpsc::unbounded_channel();
Ok(QuicRelay {
base: Mutex::new(Some(QuicServerEndpoint::new(key_path, cert_path, &addr )?)),
self_id: Arc::new(AtomicU64::new(0)),
outgoing_table: MessageTable::from(table),
item_sender,
item_receiver: Mutex::new(Some(item_receiver)),
event_sender,
event_receiver: Mutex::new(Some(event_receiver)),
})
}
}
impl<B> TypeTagAccept for QuicRelay<B>
where B: Stream<Item = Connecting> + Send + 'static
{
fn accept(&self, msg: &TypeTag, resp: Option<&TypeTag>, err: Option<&TypeTag>) -> bool {
self.outgoing_table.accept(msg, resp, err)
}
fn iter_types(&self, cb: &mut dyn FnMut(&TypeTag, &TypeTag, &TypeTag) -> bool) {
let iter = self.outgoing_table.iter_types();
for (m, r, e) in iter {
if cb(m, r, e) {
return;
}
}
}
}
impl<B> SendUntypedReceiver for QuicRelay<B>
where B: for<'a> WaitIdle<'a> + Stream<Item = Connecting> + Send + 'static
{
fn send(&self, msg: Action, _bus: &Bus) -> Result<(), messagebus::error::Error<Action>> {
match msg {
Action::Init(self_id) => {
let event_sender = self.event_sender.clone();
let mut rx = self.item_receiver.lock().take().unwrap();
let incoming = self.base.lock().take().unwrap();
self.self_id.store(self_id, Ordering::SeqCst);
tokio::spawn(async move {
pin_mut!(incoming);
let mut body_buff = Vec::new();
let mut header_buff = Vec::new();
let mut item = None;
loop {
println!("begin");
let conn = match incoming.next().await {
Some(x) => x,
None => {
log::warn!("No more connections. Message {:?} has been lost!", item);
break;
}
};
let conn = match conn.await {
Ok(conn) => conn,
Err(err) => {
log::warn!("connection dropped with err {}. waiting next connection", err);
continue;
}
};
event_sender.send(RecvDo::Ready).unwrap();
event_sender.send(RecvDo::Incoming(conn.bi_streams)).unwrap();
loop {
let r = if let Some(r) = item.take() {
r
} else {
match tokio::time::timeout(Duration::from_secs(1), rx.recv()).await {
Ok(Some(Some(r))) => r,
Ok(None) | Ok(Some(None)) => {
conn.connection.close(0u32.into(), b"done");
incoming.wait_idle().await;
break;
},
Err(_) => {
println!("PING");
let (mut send, _) = match conn.connection.open_bi().await {
Ok(x) => x,
Err(err) => {
println!("err {}", err);
break;
}
};
let _ = send.finish().await;
continue;
}
}
};
header_buff.clear();
body_buff.clear();
let (mut send, _) = match conn.connection.open_bi().await {
Ok(x) => x,
Err(err) => {
println!("err {}", err);
break;
}
};
let pkt = r.serialize(BodyType::Cbor, &mut body_buff).unwrap();
header_buff.put(&b"MBUS"[..]);
header_buff.put_u16(1);
header_buff.put_u16(0);
header_buff.put_u64(header_buff.len() as _);
serde_cbor::to_writer(&mut header_buff, &pkt).unwrap();
let result = send.write_all(&header_buff).await;
let result = if result.is_ok() {
send.finish().await
} else {
result
};
if let Err(err) = result {
item = Some(r);
log::warn!("broken connection err {}. try with next connection", err);
break;
}
println!("1");
}
println!("2");
event_sender.send(RecvDo::Pause).unwrap();
}
println!("exit main loop");
});
}
Action::Close => {
self.item_sender.send(None).unwrap();
self.event_sender.send(RecvDo::Closed).unwrap();
}
other => self.item_sender.send(Some(other.into())).unwrap(),
}
Ok(())
}
fn send_msg(
&self,
mid: u64,
msg: Box<dyn Message>,
req: bool,
_bus: &Bus,
) -> Result<(), messagebus::error::Error<Box<dyn Message>>> {
match msg.as_shared_boxed() {
Ok(msg) => {
if let Err(err) = self.item_sender.send(Some((mid, msg, req).into())) {
Err(messagebus::error::Error::TryAgain(err.0.unwrap().unwrap_send().unwrap().1.upcast_box()))
} else {
Ok(())
}
}
Err(msg) => Err(messagebus::error::Error::TryAgain(msg)),
}
}
}
impl<B> ReciveUntypedReceiver for QuicRelay<B>
where B: Send
{
type Stream = GenericEventStream;
fn event_stream(&self, bus: Bus) -> Self::Stream {
let mut recv = self.st_receiver.lock().take().unwrap();
let self_id = self.self_id.clone();
let sender = self.item_sender.clone();
let mut recv = self.event_receiver.lock().take().unwrap();
Box::pin(
futures::stream::poll_fn(move |cx|recv.poll_recv(cx))
.map(move |uni_streams| {
.map(move |recv_do| {
let self_id = self_id.clone();
let bus = bus.clone();
uni_streams.filter_map(move |recv| {
let bus = bus.clone();
let mut buff = Vec::new();
let sender = sender.clone();
async move {
let mut recv = recv.ok()?;
match recv_do {
RecvDo::Incoming(incoming) => {
futures::stream::unfold((incoming, bus, sender, self_id), |(mut incoming, bus, sender, self_id)| async move {
loop {
let (_, recv) = match incoming.next().await? {
Ok(recv) => recv,
Err(err) => {
println!("error: {}", err);
return None;
}
};
let buff = recv
.read_to_end(usize::max_value())
.await
.unwrap();
// assert_eq!(&buff[0..4], b"MBUS");
println!("1");
unsafe { buff.set_len(16) };
recv.read_exact(&mut buff).await.unwrap();
println!("{:?}", buff);
let mut reader = &buff[..];
let mut sign = [0u8; 4];
reader.copy_to_slice(&mut sign);
assert!(&sign != b"MBUS");
let version = reader.get_u16();
assert!(version == 1);
let content_type = reader.get_u16();
let body_size = reader.get_u64();
let diff = buff.capacity() as i64 - body_size as i64;
if diff < 0 {
buff.reserve(-diff as usize);
}
unsafe { buff.set_len(body_size as usize); }
recv.read_exact(&mut buff).await.unwrap();
println!("{:?}", buff);
let event = match content_type {
0 => { // CBOR
let proto: ProtocolPacket = serde_cbor::from_slice(&buff).unwrap();
match proto.deserialize(&bus).unwrap() {
ProtocolItem::Event(ev) => ev.map_msg(|msg|msg.upcast_box()),
_ => unimplemented!()
if buff.is_empty() {
println!("PONG");
continue;
}
},
_ => unimplemented!()
};
Some(event)
let mut reader = &buff[4..];
let version = reader.get_u16();
let content_type = reader.get_u16();
let body_size = reader.get_u64();
println!("inbound packet {}: v: {}; ct: {}; bs: {}", String::from_utf8_lossy(&buff[0..4]), version, content_type, body_size);
let event = match content_type {
0 => { // CBOR
let proto: ProtocolPacket = serde_cbor::from_slice(&buff[16..]).unwrap();
match proto.deserialize(&bus).unwrap() {
ProtocolItem::Event(ev) => ev.map_msg(|msg|msg.upcast_box()),
ProtocolItem::Action(action) => {
match action {
Action::Close => {
println!("warning: Close recevied - ignoring!");
sender.send(Some(ProtocolItem::Event(Event::Exited))).unwrap();
},
Action::Flush => {
bus.flush().await;
sender.send(Some(ProtocolItem::Event(Event::Flushed))).unwrap();
},
Action::Sync => {
bus.sync().await;
sender.send(Some(ProtocolItem::Event(Event::Synchronized(Ok(()))))).unwrap();
},
Action::Init(..) => (),
Action::Stats => (),
_ => (),
}
continue;
}
ProtocolItem::Send(mid, msg, req) => {
if req {
let res = bus.request_boxed(
msg.upcast_box(),
SendOptions::Except(self_id.load(Ordering::SeqCst))
)
.await
.map(|x|x.as_shared_boxed().unwrap())
.map_err(|x|x.map_msg(|_|()));
sender.send(Some(ProtocolItem::Event(Event::Response(mid, res)))).unwrap();
} else {
let _ = bus.send_boxed(msg.upcast_box(), Default::default())
.await;
}
continue;
}
_ => unimplemented!()
}
},
_ => unimplemented!()
};
return Some((event, (incoming, bus, sender, self_id)));
}
}).right_stream()
}
})
other => futures::stream::once(async move {
match other {
RecvDo::Pause => Event::Pause,
RecvDo::Ready => Event::Ready,
RecvDo::Closed => Event::Exited,
_ => unreachable!()
}
}).left_stream()
}
})
.flatten()
)

View File

@ -1,20 +1,16 @@
use crate::{error::Error, proto::{BodyType, ProtocolItem, ProtocolPacket}, relays::{GenericEventStream, MessageTable}};
use futures::StreamExt;
use messagebus::{Action, Bus, Event, Message, ReciveUntypedReceiver, SendOptions, SendUntypedReceiver, TypeTag, TypeTagAccept};
use parking_lot::Mutex;
use quinn::IncomingBiStreams;
use std::{net::SocketAddr, sync::{Arc, atomic::{AtomicU64, Ordering}}};
use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
use bytes::{Buf, BufMut};
use crate::error::Error;
use futures::{Future, Stream};
use quinn::Connecting;
use std::{net::SocketAddr, pin::Pin, sync::Arc, task::{Context, Poll}};
pub const ALPN_QUIC_HTTP: &[&[u8]] = &[b"hq-29"];
use super::WaitIdle;
pub struct QuicServerRelayEndpoint {
// endpoint: Mutex<Option<quinn::Endpoint>>,
incoming: Mutex<Option<quinn::Incoming>>,
pub struct QuicServerEndpoint {
endpoint: quinn::Endpoint,
incoming: quinn::Incoming,
}
impl QuicServerRelayEndpoint {
impl QuicServerEndpoint {
pub fn new(key_path: &str, cert_path: &str, addr: &SocketAddr) -> Result<Self, Error> {
let mut transport_config = quinn::TransportConfig::default();
transport_config.max_concurrent_uni_streams(0)?;
@ -24,7 +20,7 @@ impl QuicServerRelayEndpoint {
let mut server_config = quinn::ServerConfigBuilder::new(server_config);
server_config.protocols(ALPN_QUIC_HTTP);
server_config.protocols(super::ALPN_QUIC_HTTP);
server_config.enable_keylog();
let key = std::fs::read(key_path)?;
@ -40,229 +36,27 @@ impl QuicServerRelayEndpoint {
let mut endpoint = quinn::Endpoint::builder();
endpoint.listen(server_config.build());
let (_, incoming) = endpoint.bind(addr)?;
let (endpoint, incoming) = endpoint.bind(addr)?;
Ok(Self {
// endpoint: Mutex::new(Some(endpoint)),
incoming: Mutex::new(Some(incoming))
endpoint,
incoming
})
}
}
pub struct QuicServerRelay {
// ready_flag: AtomicBool,
self_id: Arc<AtomicU64>,
endpoint: QuicServerRelayEndpoint,
outgoing_table: MessageTable,
sender: UnboundedSender<ProtocolItem>,
receiver: Mutex<Option<(UnboundedReceiver<ProtocolItem>, UnboundedSender<IncomingBiStreams>)>>,
st_receiver: Mutex<Option<UnboundedReceiver<IncomingBiStreams>>>,
impl Stream for QuicServerEndpoint {
type Item = Connecting;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
unsafe { Pin::new_unchecked(&mut this.incoming) }.poll_next(cx)
}
}
impl QuicServerRelay {
pub fn new(key_path: &str, cert_path: &str, addr: SocketAddr, table: Vec<(TypeTag, TypeTag, TypeTag)>) -> Result<Self, Error> {
let endpoint = QuicServerRelayEndpoint::new(key_path, cert_path, &addr)?;
let (sender, receiver) = mpsc::unbounded_channel();
let (st_sender, st_receiver) = mpsc::unbounded_channel();
Ok(Self {
// ready_flag: AtomicBool::new(false),
self_id: Arc::new(AtomicU64::new(0)),
endpoint,
outgoing_table: MessageTable::from(table),
sender,
receiver: Mutex::new(Some((receiver, st_sender))),
st_receiver: Mutex::new(Some(st_receiver)),
})
}
}
impl TypeTagAccept for QuicServerRelay {
fn accept(&self, msg: &TypeTag, resp: Option<&TypeTag>, err: Option<&TypeTag>) -> bool {
self.outgoing_table.accept(msg, resp, err)
}
fn iter_types(&self, cb: &mut dyn FnMut(&TypeTag, &TypeTag, &TypeTag) -> bool) {
let iter = self.outgoing_table.iter_types();
for (m, r, e) in iter {
if cb(m, r, e) {
return;
}
}
}
}
impl SendUntypedReceiver for QuicServerRelay {
fn send(&self, msg: Action, _bus: &Bus) -> Result<(), messagebus::error::Error<Action>> {
match msg {
Action::Init(self_id) => {
let (mut rx, recv_stream) = self.receiver.lock().take().unwrap();
let mut incoming = self.endpoint.incoming.lock().take().unwrap();
self.self_id.store(self_id, Ordering::SeqCst);
tokio::spawn(async move {
let mut body_buff = Vec::new();
let mut header_buff = Vec::new();
loop {
let conn = match incoming.next().await {
Some(x) => x,
None => todo!("message lost!!!")
};
println!("new connection");
let conn = conn.await.unwrap();
recv_stream.send(conn.bi_streams).unwrap();
while let Some(r) = rx.recv().await {
header_buff.clear();
body_buff.clear();
let (mut send, _) = conn.connection.open_bi().await.unwrap();
let pkt = r.serialize(BodyType::Cbor, &mut body_buff).unwrap();
header_buff.put(&b"MBUS"[..]);
header_buff.put_u16(1);
header_buff.put_u16(0);
header_buff.put_u64(header_buff.len() as _);
serde_cbor::to_writer(&mut header_buff, &pkt).unwrap();
send.write_all(&header_buff).await.unwrap();
send.finish().await.unwrap();
}
}
});
}
other => self.sender.send(other.into()).unwrap(),
}
Ok(())
}
fn send_msg(
&self,
mid: u64,
msg: Box<dyn Message>,
req: bool,
_bus: &Bus,
) -> Result<(), messagebus::error::Error<Box<dyn Message>>> {
match msg.as_shared_boxed() {
Ok(msg) => {
if let Err(err) = self.sender.send((mid, msg, req).into()) {
Err(messagebus::error::Error::TryAgain(err.0.unwrap_send().unwrap().1.upcast_box()))
} else {
Ok(())
}
}
Err(msg) => Err(messagebus::error::Error::TryAgain(msg)),
}
}
}
impl ReciveUntypedReceiver for QuicServerRelay {
type Stream = GenericEventStream;
fn event_stream(&self, bus: Bus) -> Self::Stream {
let self_id = self.self_id.clone();
let sender = self.sender.clone();
let mut recv = self.st_receiver.lock().take().unwrap();
Box::pin(
futures::stream::poll_fn(move |cx|recv.poll_recv(cx))
.map(move |uni_streams| {
let self_id = self_id.clone();
let bus = bus.clone();
let sender = sender.clone();
// let buff = Bytes::new();
futures::stream::unfold((true, uni_streams, bus, sender, self_id), |(first, mut uni_streams, bus, sender, self_id)| async move {
loop {
if first {
return Some((Event::Ready, (false, uni_streams, bus, sender, self_id)));
}
let (_, recv) = match uni_streams.next().await? {
Ok(recv) => recv,
Err(err) => {
println!("error: {}", err);
return None;
}
};
let buff = recv
.read_to_end(usize::max_value())
.await
.unwrap();
// assert_eq!(&buff[0..4], b"MBUS");
let mut reader = &buff[4..];
let version = reader.get_u16();
let content_type = reader.get_u16();
let body_size = reader.get_u64();
println!("inbound packet {}: v: {}; ct: {}; bs: {}", String::from_utf8_lossy(&buff[0..4]), version, content_type, body_size);
let event = match content_type {
0 => { // CBOR
let proto: ProtocolPacket = serde_cbor::from_slice(&buff[16..]).unwrap();
match proto.deserialize(&bus).unwrap() {
ProtocolItem::Event(ev) => ev.map_msg(|msg|msg.upcast_box()),
ProtocolItem::Action(action) => {
match action {
Action::Close => {
println!("warning: Close recevied - ignoring!");
sender.send(ProtocolItem::Event(Event::Exited)).unwrap();
},
Action::Flush => {
bus.flush().await;
sender.send(ProtocolItem::Event(Event::Flushed)).unwrap();
},
Action::Sync => {
bus.sync().await;
sender.send(ProtocolItem::Event(Event::Synchronized(Ok(())))).unwrap();
},
Action::Init(..) => (),
Action::Stats => (),
_ => (),
}
continue;
}
ProtocolItem::Send(mid, msg, req) => {
if req {
let res = bus.request_boxed(
msg.upcast_box(),
SendOptions::Except(self_id.load(Ordering::SeqCst))
)
.await
.map(|x|x.as_shared_boxed().unwrap())
.map_err(|x|x.map_msg(|_|()));
sender.send(ProtocolItem::Event(Event::Response(mid, res))).unwrap();
} else {
let _ = bus.send_boxed(msg.upcast_box(), Default::default())
.await;
}
continue;
}
_ => unimplemented!()
}
},
_ => unimplemented!()
};
return Some((event, (false, uni_streams, bus, sender, self_id)));
}
})
})
.flatten()
)
impl<'a> WaitIdle<'a> for QuicServerEndpoint {
type Fut = Pin<Box<dyn Future<Output = ()> + Send + 'a>> ;
fn wait_idle(&'a self) -> Self::Fut {
Box::pin(self.endpoint.wait_idle())
}
}

View File

@ -1,44 +1,61 @@
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use messagebus::{Action, Bus, Event, Message, ReciveUntypedReceiver, SendUntypedReceiver, TypeTag, TypeTagAccept};
use crate::error::Error;
use super::{GenericEventStream, MessageTable};
use parking_lot::Mutex;
use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
use futures::StreamExt;
use crate::proto::{BodyType, ProtocolItem};
use super::{GenericEventStream, MessageTable};
use redis::AsyncCommands;
#[derive(Debug)]
enum RecvDo {
Pause,
Ready,
Closed,
}
pub struct RedisRelay {
in_table: MessageTable,
out_table: MessageTable,
client: Arc<redis::Client>,
self_id: Arc<AtomicU64>,
table: MessageTable,
item_sender: UnboundedSender<Option<ProtocolItem>>,
item_receiver: Mutex<Option<UnboundedReceiver<Option<ProtocolItem>>>>,
event_sender: UnboundedSender<RecvDo>,
event_receiver: Mutex<Option<UnboundedReceiver<RecvDo>>>,
}
impl RedisRelay {
fn new(uri: &str) -> Result<Self, Error> {
let client = redis::Client::open("redis://127.0.0.1/")?;
unimplemented!()
pub fn new(path: &str, table: Vec<(TypeTag, TypeTag, TypeTag)>) -> Result<Self, crate::error::Error> {
let client = redis::Client::open(path)?;
let (item_sender, item_receiver) = mpsc::unbounded_channel();
let (event_sender, event_receiver) = mpsc::unbounded_channel();
// let mut publish_conn = client.get_async_connection().await?;
// let mut pubsub_conn = client.get_async_connection().await?.into_pubsub();
// pubsub_conn.subscribe("wavephone").await?;
// let mut pubsub_stream = pubsub_conn.on_message();
// publish_conn.publish("wavephone", "banana").await?;
// let pubsub_msg: String = pubsub_stream.next().await.unwrap().get_payload()?;
// assert_eq!(&pubsub_msg, "banana");
// Ok(())
Ok(RedisRelay {
client: Arc::new(client),
self_id: Arc::new(AtomicU64::new(0)),
table: MessageTable::from(table),
item_sender,
item_receiver: Mutex::new(Some(item_receiver)),
event_sender,
event_receiver: Mutex::new(Some(event_receiver)),
})
}
}
}
impl TypeTagAccept for RedisRelay {
fn accept(&self, msg: &TypeTag, resp: Option<&TypeTag>, err: Option<&TypeTag>) -> bool {
self.out_table.accept(msg, resp, err)
self.table.accept(msg, resp, err)
}
fn iter_types(&self, cb: &mut dyn FnMut(&TypeTag, &TypeTag, &TypeTag) -> bool) {
let iter = self.out_table.iter_types();
let iter = self.table.iter_types();
for (m, r, e) in iter {
if cb(m, r, e) {
@ -51,26 +68,48 @@ impl TypeTagAccept for RedisRelay {
impl SendUntypedReceiver for RedisRelay {
fn send(&self, msg: Action, _bus: &Bus) -> Result<(), messagebus::error::Error<Action>> {
match msg {
Action::Init => {
// let (sender, mut rx) = self.receiver_send.lock().take().unwrap();
// let conn = self.endpoint.incoming();
Action::Init(self_id) => {
let event_sender = self.event_sender.clone();
let mut rx = self.item_receiver.lock().take().unwrap();
// tokio::spawn(async move {
// let mut conn = conn.await.unwrap();
// sender.send((conn.recv, conn.connection)).unwrap();
// let mut buf1 = Vec::new();
// let mut buf2 = Vec::new();
let client = self.client.clone();
self.self_id.store(self_id, Ordering::SeqCst);
// while let Some(r) = rx.recv().await {
// r.serialize(&mut buf1, &mut buf2, &mut conn.send)
// .await
// .unwrap();
tokio::spawn(async move {
let mut connection = client.get_tokio_connection().await.unwrap();
let mut body_buff = Vec::new();
let mut header_buff = Vec::new();
// let mut item = None;
// }
// });
event_sender.send(RecvDo::Ready).unwrap();
while let Some(Some(item)) = rx.recv().await {
header_buff.clear();
body_buff.clear();
let pkt = item.serialize(BodyType::Cbor, &mut body_buff).unwrap();
serde_cbor::to_writer(&mut header_buff, &pkt).unwrap();
let channel = match &item {
ProtocolItem::Action(_) => "mbus_action".into(),
ProtocolItem::Send(_, msg, _) => format!("mbus_request::{}", msg.type_tag()),
ProtocolItem::Event(ev) => "mbus_response::".into(),
_ => unreachable!()
};
let () = connection.publish(channel, &header_buff).await.unwrap();
}
});
}
other => self.sender.send(other.into()).unwrap(),
Action::Close => {
self.item_sender.send(None).unwrap();
// self.event_sender.send(RecvDo::Closed).unwrap();
}
other => self.item_sender.send(Some(other.into())).unwrap(),
}
Ok(())
@ -85,8 +124,8 @@ impl SendUntypedReceiver for RedisRelay {
) -> Result<(), messagebus::error::Error<Box<dyn Message>>> {
match msg.as_shared_boxed() {
Ok(msg) => {
if let Err(err) = self.sender.send((req, mid, msg).into()) {
Err(messagebus::error::Error::TryAgain(err.0.unwrap_msg().unwrap()))
if let Err(err) = self.item_sender.send(Some((mid, msg, req).into())) {
Err(messagebus::error::Error::TryAgain(err.0.unwrap().unwrap_send().unwrap().1.upcast_box()))
} else {
Ok(())
}
@ -101,24 +140,117 @@ impl ReciveUntypedReceiver for RedisRelay {
type Stream = GenericEventStream;
fn event_stream(&self, bus: Bus) -> Self::Stream {
let recevier = self.recevier.lock().take().unwrap();
let self_id = self.self_id.clone();
let sender = self.item_sender.clone();
let mut recv = self.event_receiver.lock().take().unwrap();
Box::pin(
futures::stream::unfold((recevier, bus), |(recv, bus)| async move {
if let Ok(r) = recv.next().await? {
let stream = futures::stream::unfold((true, r, bus.clone()), |(first, r, bus)| async move {
if first {
return Some((Event::Ready, (false, r, bus)));
}
Some((Event::Pause, (false, r, bus)))
});
futures::stream::poll_fn(move |cx|recv.poll_recv(cx))
.map(move |recv_do| {
let self_id = self_id.clone();
let bus = bus.clone();
let sender = sender.clone();
Some((stream.left_stream(), (recv, bus)))
} else {
Some((futures::stream::once(async move { Event::Pause }).right_stream(), (recv, bus)))
}
}).flatten()
match recv_do {
// RecvDo::Incoming(incoming) => {
// futures::stream::unfold((incoming, bus, sender, self_id), |(mut incoming, bus, sender, self_id)| async move {
// loop {
// let (_, recv) = match incoming.next().await? {
// Ok(recv) => recv,
// Err(err) => {
// println!("error: {}", err);
// return None;
// }
// };
// let buff = recv
// .read_to_end(usize::max_value())
// .await
// .unwrap();
// // assert_eq!(&buff[0..4], b"MBUS");
// if buff.is_empty() {
// println!("PONG");
// continue;
// }
// let mut reader = &buff[4..];
// let version = reader.get_u16();
// let content_type = reader.get_u16();
// let body_size = reader.get_u64();
// println!("inbound packet {}: v: {}; ct: {}; bs: {}", String::from_utf8_lossy(&buff[0..4]), version, content_type, body_size);
// let event = match content_type {
// 0 => { // CBOR
// let proto: ProtocolPacket = serde_cbor::from_slice(&buff[16..]).unwrap();
// match proto.deserialize(&bus).unwrap() {
// ProtocolItem::Event(ev) => ev.map_msg(|msg|msg.upcast_box()),
// ProtocolItem::Action(action) => {
// match action {
// Action::Close => {
// println!("warning: Close recevied - ignoring!");
// sender.send(Some(ProtocolItem::Event(Event::Exited))).unwrap();
// },
// Action::Flush => {
// bus.flush().await;
// sender.send(Some(ProtocolItem::Event(Event::Flushed))).unwrap();
// },
// Action::Sync => {
// bus.sync().await;
// sender.send(Some(ProtocolItem::Event(Event::Synchronized(Ok(()))))).unwrap();
// },
// Action::Init(..) => (),
// Action::Stats => (),
// _ => (),
// }
// continue;
// }
// ProtocolItem::Send(mid, msg, req) => {
// if req {
// let res = bus.request_boxed(
// msg.upcast_box(),
// SendOptions::Except(self_id.load(Ordering::SeqCst))
// )
// .await
// .map(|x|x.as_shared_boxed().unwrap())
// .map_err(|x|x.map_msg(|_|()));
// sender.send(Some(ProtocolItem::Event(Event::Response(mid, res)))).unwrap();
// } else {
// let _ = bus.send_boxed(msg.upcast_box(), Default::default())
// .await;
// }
// continue;
// }
// _ => unimplemented!()
// }
// },
// _ => unimplemented!()
// };
// return Some((event, (incoming, bus, sender, self_id)));
// }
// }).right_stream()
// }
other => futures::stream::once(async move {
match other {
RecvDo::Pause => Event::Pause,
RecvDo::Ready => Event::Ready,
RecvDo::Closed => Event::Exited,
_ => unreachable!()
}
})
// .left_stream()
}
})
.flatten()
)
}
}
}