diff --git a/crates/remote/Cargo.toml b/crates/remote/Cargo.toml index bea8eb0..47bc5dc 100644 --- a/crates/remote/Cargo.toml +++ b/crates/remote/Cargo.toml @@ -30,9 +30,10 @@ serde_cbor = "0.11.2" bytes = "1.1.0" quinn-proto = "0.7.3" rustls = "0.19.1" -redis = "0.21.2" +redis = {version = "0.21.2", features = ["aio", "tokio-comp"]} bitflags = "1.3.2" serde_json = "1.0.68" +log = "0.4.14" [dev-dependencies] anyhow = "1.0.44" diff --git a/crates/remote/src/proto.rs b/crates/remote/src/proto.rs index b38f325..774f03f 100644 --- a/crates/remote/src/proto.rs +++ b/crates/remote/src/proto.rs @@ -158,7 +158,7 @@ impl ProtocolItem { } } - pub fn serialize(self, mut body_type: BodyType, body_buff: &mut Vec) -> Result, crate::error::Error> { + pub fn serialize<'a>(&self, mut body_type: BodyType, body_buff: &'a mut Vec) -> Result, crate::error::Error> { let mut argument = 0; let mut type_tag = None; let mut body = None; @@ -174,11 +174,11 @@ impl ProtocolItem { _ => unimplemented!(), } ProtocolItem::Send(mid, msg, req) => { - let msg = msg.as_shared_boxed() - .map_err(|_| crate::error::Error::UnknownCodec)?; + let msg = msg.as_shared_ref() + .ok_or(crate::error::Error::UnknownCodec)?; - argument = mid; - flags.set(ProtocolHeaderFlags::ARGUMENT, req); + argument = *mid; + flags.set(ProtocolHeaderFlags::ARGUMENT, *req); flags.set(ProtocolHeaderFlags::BODY, true); flags.set(ProtocolHeaderFlags::TYPE_TAG, true); type_tag = Some(msg.type_tag()); @@ -188,15 +188,15 @@ impl ProtocolItem { }, ProtocolItem::Event(ev) => match ev { Event::Response(mid, res) => { - argument = mid; + argument = *mid; flags.set(ProtocolHeaderFlags::ARGUMENT, true); flags.set(ProtocolHeaderFlags::BODY, true); flags.set(ProtocolHeaderFlags::TYPE_TAG, true); match res { Ok(msg) => { - let msg = msg.as_shared_boxed() - .map_err(|_| crate::error::Error::UnknownCodec)?; + let msg = msg.as_shared_ref() + .ok_or(crate::error::Error::UnknownCodec)?; type_tag = Some(msg.type_tag()); body = Some(generic_serialize(body_type, &*msg, body_buff)?); @@ -225,7 +225,7 @@ impl ProtocolItem { ProtocolHeaderActionKind::Error } Event::Finished(n) => { - argument = n; + argument = *n; flags.set(ProtocolHeaderFlags::ARGUMENT, true); ProtocolHeaderActionKind::BatchComplete }, diff --git a/crates/remote/src/relays/mod.rs b/crates/remote/src/relays/mod.rs index 78101a2..f210ea6 100644 --- a/crates/remote/src/relays/mod.rs +++ b/crates/remote/src/relays/mod.rs @@ -1,7 +1,6 @@ // #[cfg(feature = "quic")] mod quic; - -// mod redis; +mod redis; use futures::Stream; use messagebus::{error::GenericError, Event, Message, TypeTag}; @@ -26,6 +25,11 @@ impl MessageTable { .push((resp, err)); } + pub fn iter_keys(&self) -> impl Iterator + '_ { + self.table.keys() + .map(|k|k.as_ref()) + } + pub fn accept(&self, msg: &TypeTag, resp: Option<&TypeTag>, err: Option<&TypeTag>) -> bool { self.table.get(msg).map_or(false, |v| { v.iter().any(|(r, e)| { diff --git a/crates/remote/src/relays/quic/client.rs b/crates/remote/src/relays/quic/client.rs index 33b16f5..f005444 100644 --- a/crates/remote/src/relays/quic/client.rs +++ b/crates/remote/src/relays/quic/client.rs @@ -1,22 +1,20 @@ -use crate::{error::Error, proto::{BodyType, ProtocolItem, ProtocolPacket}, relays::{GenericEventStream, MessageTable}}; -use futures::StreamExt; -use messagebus::{Action, Bus, Event, Message, ReciveUntypedReceiver, SendOptions, SendUntypedReceiver, TypeTag, TypeTagAccept}; -use parking_lot::Mutex; -use quinn::IncomingBiStreams; -use std::{net::SocketAddr, sync::{Arc, atomic::{AtomicU64, Ordering}}}; -use tokio::sync::mpsc::{self, UnboundedSender, UnboundedReceiver}; -use bytes::{Buf, BufMut}; +use crate::{error::Error}; +use futures::{Future, Stream}; +use quinn::{Connecting}; +use std::{net::SocketAddr, pin::Pin, task::{Context, Poll}}; -pub const ALPN_QUIC_HTTP: &[&[u8]] = &[b"hq-29"]; +use super::WaitIdle; -pub struct QuicClientRelayEndpoint { +pub struct QuicClientEndpoint { + addr: SocketAddr, + host: String, endpoint: quinn::Endpoint, } -impl QuicClientRelayEndpoint { - pub fn new(cert: &str) -> Result { +impl QuicClientEndpoint { + pub fn new(cert: &str, addr: SocketAddr, host: String) -> Result { let mut client_config = quinn::ClientConfigBuilder::default(); - client_config.protocols(ALPN_QUIC_HTTP); + client_config.protocols(super::ALPN_QUIC_HTTP); client_config.enable_keylog(); let cert_der = std::fs::read(cert)?; @@ -29,221 +27,25 @@ impl QuicClientRelayEndpoint { let (endpoint, _) = endpoint.bind(&"0.0.0.0:0".parse().unwrap())?; - Ok(Self { endpoint }) - } -} - -pub struct QuicClientRelay { - // ready_flag: AtomicBool, - self_id: Arc, - addr: SocketAddr, - host: String, - endpoint: QuicClientRelayEndpoint, - outgoing_table: MessageTable, - sender: UnboundedSender, - receiver: Mutex, UnboundedSender)>>, - st_receiver: Mutex>>, -} - -impl QuicClientRelay { - pub fn new(cert: &str, addr: SocketAddr, host: String, table: Vec<(TypeTag, TypeTag, TypeTag)>) -> Result { - let endpoint = QuicClientRelayEndpoint::new(cert)?; - let (sender, receiver) = mpsc::unbounded_channel(); - let (st_sender, st_receiver) = mpsc::unbounded_channel(); - - Ok(Self { - // ready_flag: AtomicBool::new(false), - self_id: Arc::new(AtomicU64::new(0)), - addr, - host, - endpoint, - outgoing_table: MessageTable::from(table), - sender, - receiver: Mutex::new(Some((receiver, st_sender))), - st_receiver: Mutex::new(Some(st_receiver)), + Ok(Self { + addr, host, + endpoint }) } } -impl TypeTagAccept for QuicClientRelay { - fn accept(&self, msg: &TypeTag, resp: Option<&TypeTag>, err: Option<&TypeTag>) -> bool { - self.outgoing_table.accept(msg, resp, err) - } +impl Stream for QuicClientEndpoint { + type Item = Connecting; - fn iter_types(&self, cb: &mut dyn FnMut(&TypeTag, &TypeTag, &TypeTag) -> bool) { - let iter = self.outgoing_table.iter_types(); - - for (m, r, e) in iter { - if cb(m, r, e) { - return; - } - } - } + fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + Poll::Ready(this.endpoint.connect(&this.addr, &this.host).ok()) + } } -impl SendUntypedReceiver for QuicClientRelay { - fn send(&self, msg: Action, _bus: &Bus) -> Result<(), messagebus::error::Error> { - match msg { - Action::Init(self_id) => { - let (mut rx, recv_stream) = self.receiver.lock().take().unwrap(); - let conn = self.endpoint.endpoint.connect(&self.addr, &self.host).unwrap(); - self.self_id.store(self_id, Ordering::SeqCst); - - tokio::spawn(async move { - let mut body_buff = Vec::new(); - let mut header_buff = Vec::new(); - let conn = conn.await.unwrap(); - - recv_stream.send(conn.bi_streams).unwrap(); - - while let Some(r) = rx.recv().await { - header_buff.clear(); - body_buff.clear(); - - let (mut send, _) = conn.connection.open_bi().await.unwrap(); - let pkt = r.serialize(BodyType::Cbor, &mut body_buff).unwrap(); - - header_buff.put(&b"MBUS"[..]); - header_buff.put_u16(1); - header_buff.put_u16(0); - header_buff.put_u64(header_buff.len() as _); - serde_cbor::to_writer(&mut header_buff, &pkt).unwrap(); - - send.write_all(&header_buff).await.unwrap(); - send.finish().await.unwrap(); - - println!("sent"); - } - }); - } - - other => self.sender.send(other.into()).unwrap(), - } - - Ok(()) +impl<'a> WaitIdle<'a> for QuicClientEndpoint { + type Fut = Pin + Send + 'a>> ; + fn wait_idle(&'a self) -> Self::Fut { + Box::pin(self.endpoint.wait_idle()) } - - fn send_msg( - &self, - mid: u64, - msg: Box, - req: bool, - _bus: &Bus, - ) -> Result<(), messagebus::error::Error>> { - match msg.as_shared_boxed() { - Ok(msg) => { - if let Err(err) = self.sender.send((mid, msg, req).into()) { - Err(messagebus::error::Error::TryAgain(err.0.unwrap_send().unwrap().1.upcast_box())) - } else { - Ok(()) - } - } - - Err(msg) => Err(messagebus::error::Error::TryAgain(msg)), - } - } -} - -impl ReciveUntypedReceiver for QuicClientRelay { - type Stream = GenericEventStream; - - fn event_stream(&self, bus: Bus) -> Self::Stream { - let self_id = self.self_id.clone(); - let sender = self.sender.clone(); - let mut recv = self.st_receiver.lock().take().unwrap(); - - Box::pin( - futures::stream::poll_fn(move |cx|recv.poll_recv(cx)) - .map(move |uni_streams| { - let self_id = self_id.clone(); - let bus = bus.clone(); - let sender = sender.clone(); - // let buff = Bytes::new(); - - futures::stream::unfold((true, uni_streams, bus, sender, self_id), |(first, mut uni_streams, bus, sender, self_id)| async move { - loop { - if first { - return Some((Event::Ready, (false, uni_streams, bus, sender, self_id))); - } - - let (_, recv) = match uni_streams.next().await? { - Ok(recv) => recv, - Err(err) => { - println!("error: {}", err); - return None; - } - }; - - let buff = recv - .read_to_end(usize::max_value()) - .await - .unwrap(); - - // assert_eq!(&buff[0..4], b"MBUS"); - - let mut reader = &buff[4..]; - - let version = reader.get_u16(); - let content_type = reader.get_u16(); - let body_size = reader.get_u64(); - - println!("inbound packet {}: v: {}; ct: {}; bs: {}", String::from_utf8_lossy(&buff[0..4]), version, content_type, body_size); - - let event = match content_type { - 0 => { // CBOR - let proto: ProtocolPacket = serde_cbor::from_slice(&buff[16..]).unwrap(); - - match proto.deserialize(&bus).unwrap() { - ProtocolItem::Event(ev) => ev.map_msg(|msg|msg.upcast_box()), - ProtocolItem::Action(action) => { - match action { - Action::Close => { - println!("warning: Close recevied - ignoring!"); - sender.send(ProtocolItem::Event(Event::Exited)).unwrap(); - }, - Action::Flush => { - bus.flush().await; - sender.send(ProtocolItem::Event(Event::Flushed)).unwrap(); - }, - Action::Sync => { - bus.sync().await; - sender.send(ProtocolItem::Event(Event::Synchronized(Ok(())))).unwrap(); - }, - Action::Init(..) => (), - Action::Stats => (), - _ => (), - } - continue; - } - ProtocolItem::Send(mid, msg, req) => { - if req { - let res = bus.request_boxed( - msg.upcast_box(), - SendOptions::Except(self_id.load(Ordering::SeqCst)) - ) - .await - .map(|x|x.as_shared_boxed().unwrap()) - .map_err(|x|x.map_msg(|_|())); - - sender.send(ProtocolItem::Event(Event::Response(mid, res))).unwrap(); - } else { - let _ = bus.send_boxed(msg.upcast_box(), Default::default()) - .await; - } - - continue; - } - _ => unimplemented!() - } - }, - _ => unimplemented!() - }; - - return Some((event, (false, uni_streams, bus, sender, self_id))); - } - }) - }) - .flatten() - ) - } -} +} \ No newline at end of file diff --git a/crates/remote/src/relays/quic/mod.rs b/crates/remote/src/relays/quic/mod.rs index 50e6a94..1dc8980 100644 --- a/crates/remote/src/relays/quic/mod.rs +++ b/crates/remote/src/relays/quic/mod.rs @@ -1,87 +1,362 @@ mod client; mod server; -pub use client::QuicClientRelay; -use messagebus::{Bus, ReciveUntypedReceiver}; +use std::net::SocketAddr; +use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::time::Duration; + +pub use client::QuicClientEndpoint; +use messagebus::{Action, Bus, Event, Message, ReciveUntypedReceiver, SendOptions, SendUntypedReceiver, TypeTag, TypeTagAccept}; use parking_lot::Mutex; -use quinn::IncomingUniStreams; -pub use server::QuicServerRelay; -use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; -use futures::StreamExt; -use bytes::{Buf}; - -use crate::proto::{ProtocolItem, ProtocolPacket}; - -use super::GenericEventStream; - -pub struct QuicRelay { - sender: UnboundedSender, - receiver: Mutex, UnboundedSender)>>, - st_receiver: Mutex>>, -} +use quinn::{Connecting, IncomingBiStreams}; +pub use server::QuicServerEndpoint; +use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender}; +use futures::{Future, Stream, StreamExt, pin_mut}; +use bytes::{Buf, BufMut}; pub const ALPN_QUIC_HTTP: &[&[u8]] = &[b"hq-29"]; -impl ReciveUntypedReceiver for QuicRelay { +use crate::proto::{BodyType, ProtocolItem, ProtocolPacket}; + +pub type QuicClientRelay = QuicRelay; +pub type QuicServerRelay = QuicRelay; + +use super::{GenericEventStream, MessageTable}; + + +pub trait WaitIdle<'a>: Sync { + type Fut: Future + Send + 'a; + fn wait_idle(&'a self) -> Self::Fut; +} + +#[derive(Debug)] +enum RecvDo { + Pause, + Ready, + Closed, + Incoming(IncomingBiStreams), +} + +pub struct QuicRelay { + base: Mutex>, + self_id: Arc, + outgoing_table: MessageTable, + + item_sender: UnboundedSender>, + item_receiver: Mutex>>>, + event_sender: UnboundedSender, + event_receiver: Mutex>>, +} + +impl QuicRelay { + pub fn new(cert: &str, addr: SocketAddr, host: String, table: Vec<(TypeTag, TypeTag, TypeTag)>) -> Result { + let (item_sender, item_receiver) = mpsc::unbounded_channel(); + let (event_sender, event_receiver) = mpsc::unbounded_channel(); + + Ok(QuicRelay { + base: Mutex::new(Some(QuicClientEndpoint::new(cert, addr, host)?)), + self_id: Arc::new(AtomicU64::new(0)), + outgoing_table: MessageTable::from(table), + item_sender, + item_receiver: Mutex::new(Some(item_receiver)), + event_sender, + event_receiver: Mutex::new(Some(event_receiver)), + }) + } +} + +impl QuicRelay { + pub fn new(key_path: &str, cert_path: &str, addr: SocketAddr, table: Vec<(TypeTag, TypeTag, TypeTag)>) -> Result { + let (item_sender, item_receiver) = mpsc::unbounded_channel(); + let (event_sender, event_receiver) = mpsc::unbounded_channel(); + + Ok(QuicRelay { + base: Mutex::new(Some(QuicServerEndpoint::new(key_path, cert_path, &addr )?)), + self_id: Arc::new(AtomicU64::new(0)), + outgoing_table: MessageTable::from(table), + item_sender, + item_receiver: Mutex::new(Some(item_receiver)), + event_sender, + event_receiver: Mutex::new(Some(event_receiver)), + }) + } +} + +impl TypeTagAccept for QuicRelay +where B: Stream + Send + 'static +{ + fn accept(&self, msg: &TypeTag, resp: Option<&TypeTag>, err: Option<&TypeTag>) -> bool { + self.outgoing_table.accept(msg, resp, err) + } + + fn iter_types(&self, cb: &mut dyn FnMut(&TypeTag, &TypeTag, &TypeTag) -> bool) { + let iter = self.outgoing_table.iter_types(); + + for (m, r, e) in iter { + if cb(m, r, e) { + return; + } + } + } +} + +impl SendUntypedReceiver for QuicRelay + where B: for<'a> WaitIdle<'a> + Stream + Send + 'static +{ + fn send(&self, msg: Action, _bus: &Bus) -> Result<(), messagebus::error::Error> { + match msg { + Action::Init(self_id) => { + let event_sender = self.event_sender.clone(); + let mut rx = self.item_receiver.lock().take().unwrap(); + + let incoming = self.base.lock().take().unwrap(); + self.self_id.store(self_id, Ordering::SeqCst); + + tokio::spawn(async move { + pin_mut!(incoming); + + let mut body_buff = Vec::new(); + let mut header_buff = Vec::new(); + let mut item = None; + + loop { + println!("begin"); + let conn = match incoming.next().await { + Some(x) => x, + None => { + log::warn!("No more connections. Message {:?} has been lost!", item); + break; + } + }; + + let conn = match conn.await { + Ok(conn) => conn, + Err(err) => { + log::warn!("connection dropped with err {}. waiting next connection", err); + continue; + } + }; + + event_sender.send(RecvDo::Ready).unwrap(); + event_sender.send(RecvDo::Incoming(conn.bi_streams)).unwrap(); + + loop { + let r = if let Some(r) = item.take() { + r + } else { + match tokio::time::timeout(Duration::from_secs(1), rx.recv()).await { + Ok(Some(Some(r))) => r, + Ok(None) | Ok(Some(None)) => { + conn.connection.close(0u32.into(), b"done"); + incoming.wait_idle().await; + break; + }, + Err(_) => { + println!("PING"); + let (mut send, _) = match conn.connection.open_bi().await { + Ok(x) => x, + Err(err) => { + println!("err {}", err); + break; + } + }; + let _ = send.finish().await; + continue; + } + } + }; + + header_buff.clear(); + body_buff.clear(); + + let (mut send, _) = match conn.connection.open_bi().await { + Ok(x) => x, + Err(err) => { + println!("err {}", err); + break; + } + }; + + let pkt = r.serialize(BodyType::Cbor, &mut body_buff).unwrap(); + + header_buff.put(&b"MBUS"[..]); + header_buff.put_u16(1); + header_buff.put_u16(0); + header_buff.put_u64(header_buff.len() as _); + serde_cbor::to_writer(&mut header_buff, &pkt).unwrap(); + + let result = send.write_all(&header_buff).await; + let result = if result.is_ok() { + send.finish().await + } else { + result + }; + + if let Err(err) = result { + item = Some(r); + log::warn!("broken connection err {}. try with next connection", err); + break; + } + + println!("1"); + } + + println!("2"); + + event_sender.send(RecvDo::Pause).unwrap(); + } + + println!("exit main loop"); + }); + } + + Action::Close => { + self.item_sender.send(None).unwrap(); + self.event_sender.send(RecvDo::Closed).unwrap(); + } + + other => self.item_sender.send(Some(other.into())).unwrap(), + } + + Ok(()) + } + + fn send_msg( + &self, + mid: u64, + msg: Box, + req: bool, + _bus: &Bus, + ) -> Result<(), messagebus::error::Error>> { + match msg.as_shared_boxed() { + Ok(msg) => { + if let Err(err) = self.item_sender.send(Some((mid, msg, req).into())) { + Err(messagebus::error::Error::TryAgain(err.0.unwrap().unwrap_send().unwrap().1.upcast_box())) + } else { + Ok(()) + } + } + + Err(msg) => Err(messagebus::error::Error::TryAgain(msg)), + } + } +} + +impl ReciveUntypedReceiver for QuicRelay + where B: Send +{ type Stream = GenericEventStream; fn event_stream(&self, bus: Bus) -> Self::Stream { - let mut recv = self.st_receiver.lock().take().unwrap(); + let self_id = self.self_id.clone(); + let sender = self.item_sender.clone(); + let mut recv = self.event_receiver.lock().take().unwrap(); Box::pin( futures::stream::poll_fn(move |cx|recv.poll_recv(cx)) - .map(move |uni_streams| { + .map(move |recv_do| { + let self_id = self_id.clone(); let bus = bus.clone(); - uni_streams.filter_map(move |recv| { - - let bus = bus.clone(); - let mut buff = Vec::new(); + let sender = sender.clone(); - async move { - let mut recv = recv.ok()?; + match recv_do { + RecvDo::Incoming(incoming) => { + futures::stream::unfold((incoming, bus, sender, self_id), |(mut incoming, bus, sender, self_id)| async move { + loop { + let (_, recv) = match incoming.next().await? { + Ok(recv) => recv, + Err(err) => { + println!("error: {}", err); + return None; + } + }; + + let buff = recv + .read_to_end(usize::max_value()) + .await + .unwrap(); + + // assert_eq!(&buff[0..4], b"MBUS"); - println!("1"); - - unsafe { buff.set_len(16) }; - recv.read_exact(&mut buff).await.unwrap(); - - println!("{:?}", buff); - - let mut reader = &buff[..]; - let mut sign = [0u8; 4]; - reader.copy_to_slice(&mut sign); - assert!(&sign != b"MBUS"); - - let version = reader.get_u16(); - assert!(version == 1); - - let content_type = reader.get_u16(); - - let body_size = reader.get_u64(); - let diff = buff.capacity() as i64 - body_size as i64; - if diff < 0 { - buff.reserve(-diff as usize); - } - - unsafe { buff.set_len(body_size as usize); } - recv.read_exact(&mut buff).await.unwrap(); - - println!("{:?}", buff); - - let event = match content_type { - 0 => { // CBOR - let proto: ProtocolPacket = serde_cbor::from_slice(&buff).unwrap(); - - match proto.deserialize(&bus).unwrap() { - ProtocolItem::Event(ev) => ev.map_msg(|msg|msg.upcast_box()), - _ => unimplemented!() + if buff.is_empty() { + println!("PONG"); + continue; } - }, - _ => unimplemented!() - }; - Some(event) + + let mut reader = &buff[4..]; + + let version = reader.get_u16(); + let content_type = reader.get_u16(); + let body_size = reader.get_u64(); + + println!("inbound packet {}: v: {}; ct: {}; bs: {}", String::from_utf8_lossy(&buff[0..4]), version, content_type, body_size); + + let event = match content_type { + 0 => { // CBOR + let proto: ProtocolPacket = serde_cbor::from_slice(&buff[16..]).unwrap(); + match proto.deserialize(&bus).unwrap() { + ProtocolItem::Event(ev) => ev.map_msg(|msg|msg.upcast_box()), + ProtocolItem::Action(action) => { + match action { + Action::Close => { + println!("warning: Close recevied - ignoring!"); + sender.send(Some(ProtocolItem::Event(Event::Exited))).unwrap(); + }, + Action::Flush => { + bus.flush().await; + sender.send(Some(ProtocolItem::Event(Event::Flushed))).unwrap(); + }, + Action::Sync => { + bus.sync().await; + sender.send(Some(ProtocolItem::Event(Event::Synchronized(Ok(()))))).unwrap(); + }, + Action::Init(..) => (), + Action::Stats => (), + _ => (), + } + continue; + } + ProtocolItem::Send(mid, msg, req) => { + if req { + let res = bus.request_boxed( + msg.upcast_box(), + SendOptions::Except(self_id.load(Ordering::SeqCst)) + ) + .await + .map(|x|x.as_shared_boxed().unwrap()) + .map_err(|x|x.map_msg(|_|())); + + sender.send(Some(ProtocolItem::Event(Event::Response(mid, res)))).unwrap(); + } else { + let _ = bus.send_boxed(msg.upcast_box(), Default::default()) + .await; + } + + continue; + } + _ => unimplemented!() + } + }, + _ => unimplemented!() + }; + + return Some((event, (incoming, bus, sender, self_id))); + } + }).right_stream() } - }) + + other => futures::stream::once(async move { + match other { + RecvDo::Pause => Event::Pause, + RecvDo::Ready => Event::Ready, + RecvDo::Closed => Event::Exited, + _ => unreachable!() + } + }).left_stream() + } + + }) .flatten() ) diff --git a/crates/remote/src/relays/quic/server.rs b/crates/remote/src/relays/quic/server.rs index 4eabe19..5e0a30d 100644 --- a/crates/remote/src/relays/quic/server.rs +++ b/crates/remote/src/relays/quic/server.rs @@ -1,20 +1,16 @@ -use crate::{error::Error, proto::{BodyType, ProtocolItem, ProtocolPacket}, relays::{GenericEventStream, MessageTable}}; -use futures::StreamExt; -use messagebus::{Action, Bus, Event, Message, ReciveUntypedReceiver, SendOptions, SendUntypedReceiver, TypeTag, TypeTagAccept}; -use parking_lot::Mutex; -use quinn::IncomingBiStreams; -use std::{net::SocketAddr, sync::{Arc, atomic::{AtomicU64, Ordering}}}; -use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender}; -use bytes::{Buf, BufMut}; +use crate::error::Error; +use futures::{Future, Stream}; +use quinn::Connecting; +use std::{net::SocketAddr, pin::Pin, sync::Arc, task::{Context, Poll}}; -pub const ALPN_QUIC_HTTP: &[&[u8]] = &[b"hq-29"]; +use super::WaitIdle; -pub struct QuicServerRelayEndpoint { - // endpoint: Mutex>, - incoming: Mutex>, +pub struct QuicServerEndpoint { + endpoint: quinn::Endpoint, + incoming: quinn::Incoming, } -impl QuicServerRelayEndpoint { +impl QuicServerEndpoint { pub fn new(key_path: &str, cert_path: &str, addr: &SocketAddr) -> Result { let mut transport_config = quinn::TransportConfig::default(); transport_config.max_concurrent_uni_streams(0)?; @@ -24,7 +20,7 @@ impl QuicServerRelayEndpoint { let mut server_config = quinn::ServerConfigBuilder::new(server_config); - server_config.protocols(ALPN_QUIC_HTTP); + server_config.protocols(super::ALPN_QUIC_HTTP); server_config.enable_keylog(); let key = std::fs::read(key_path)?; @@ -40,229 +36,27 @@ impl QuicServerRelayEndpoint { let mut endpoint = quinn::Endpoint::builder(); endpoint.listen(server_config.build()); - let (_, incoming) = endpoint.bind(addr)?; + let (endpoint, incoming) = endpoint.bind(addr)?; Ok(Self { - // endpoint: Mutex::new(Some(endpoint)), - incoming: Mutex::new(Some(incoming)) + endpoint, + incoming }) } } -pub struct QuicServerRelay { - // ready_flag: AtomicBool, - self_id: Arc, - endpoint: QuicServerRelayEndpoint, - outgoing_table: MessageTable, - sender: UnboundedSender, - receiver: Mutex, UnboundedSender)>>, - st_receiver: Mutex>>, +impl Stream for QuicServerEndpoint { + type Item = Connecting; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + unsafe { Pin::new_unchecked(&mut this.incoming) }.poll_next(cx) + } } -impl QuicServerRelay { - pub fn new(key_path: &str, cert_path: &str, addr: SocketAddr, table: Vec<(TypeTag, TypeTag, TypeTag)>) -> Result { - let endpoint = QuicServerRelayEndpoint::new(key_path, cert_path, &addr)?; - let (sender, receiver) = mpsc::unbounded_channel(); - let (st_sender, st_receiver) = mpsc::unbounded_channel(); - - Ok(Self { - // ready_flag: AtomicBool::new(false), - self_id: Arc::new(AtomicU64::new(0)), - endpoint, - outgoing_table: MessageTable::from(table), - sender, - receiver: Mutex::new(Some((receiver, st_sender))), - st_receiver: Mutex::new(Some(st_receiver)), - }) - } -} - -impl TypeTagAccept for QuicServerRelay { - fn accept(&self, msg: &TypeTag, resp: Option<&TypeTag>, err: Option<&TypeTag>) -> bool { - self.outgoing_table.accept(msg, resp, err) - } - - fn iter_types(&self, cb: &mut dyn FnMut(&TypeTag, &TypeTag, &TypeTag) -> bool) { - let iter = self.outgoing_table.iter_types(); - - for (m, r, e) in iter { - if cb(m, r, e) { - return; - } - } - } -} - -impl SendUntypedReceiver for QuicServerRelay { - fn send(&self, msg: Action, _bus: &Bus) -> Result<(), messagebus::error::Error> { - match msg { - Action::Init(self_id) => { - let (mut rx, recv_stream) = self.receiver.lock().take().unwrap(); - let mut incoming = self.endpoint.incoming.lock().take().unwrap(); - self.self_id.store(self_id, Ordering::SeqCst); - - tokio::spawn(async move { - let mut body_buff = Vec::new(); - let mut header_buff = Vec::new(); - - loop { - let conn = match incoming.next().await { - Some(x) => x, - None => todo!("message lost!!!") - }; - - println!("new connection"); - - let conn = conn.await.unwrap(); - - recv_stream.send(conn.bi_streams).unwrap(); - - while let Some(r) = rx.recv().await { - header_buff.clear(); - body_buff.clear(); - - let (mut send, _) = conn.connection.open_bi().await.unwrap(); - let pkt = r.serialize(BodyType::Cbor, &mut body_buff).unwrap(); - - header_buff.put(&b"MBUS"[..]); - header_buff.put_u16(1); - header_buff.put_u16(0); - header_buff.put_u64(header_buff.len() as _); - serde_cbor::to_writer(&mut header_buff, &pkt).unwrap(); - - send.write_all(&header_buff).await.unwrap(); - send.finish().await.unwrap(); - } - } - }); - } - - other => self.sender.send(other.into()).unwrap(), - } - - Ok(()) - } - - fn send_msg( - &self, - mid: u64, - msg: Box, - req: bool, - _bus: &Bus, - ) -> Result<(), messagebus::error::Error>> { - match msg.as_shared_boxed() { - Ok(msg) => { - if let Err(err) = self.sender.send((mid, msg, req).into()) { - Err(messagebus::error::Error::TryAgain(err.0.unwrap_send().unwrap().1.upcast_box())) - } else { - Ok(()) - } - } - - Err(msg) => Err(messagebus::error::Error::TryAgain(msg)), - } - } -} - -impl ReciveUntypedReceiver for QuicServerRelay { - type Stream = GenericEventStream; - - fn event_stream(&self, bus: Bus) -> Self::Stream { - let self_id = self.self_id.clone(); - let sender = self.sender.clone(); - let mut recv = self.st_receiver.lock().take().unwrap(); - - Box::pin( - futures::stream::poll_fn(move |cx|recv.poll_recv(cx)) - .map(move |uni_streams| { - let self_id = self_id.clone(); - let bus = bus.clone(); - let sender = sender.clone(); - // let buff = Bytes::new(); - - futures::stream::unfold((true, uni_streams, bus, sender, self_id), |(first, mut uni_streams, bus, sender, self_id)| async move { - loop { - if first { - return Some((Event::Ready, (false, uni_streams, bus, sender, self_id))); - } - - let (_, recv) = match uni_streams.next().await? { - Ok(recv) => recv, - Err(err) => { - println!("error: {}", err); - return None; - } - }; - - let buff = recv - .read_to_end(usize::max_value()) - .await - .unwrap(); - - // assert_eq!(&buff[0..4], b"MBUS"); - - let mut reader = &buff[4..]; - - let version = reader.get_u16(); - let content_type = reader.get_u16(); - let body_size = reader.get_u64(); - - println!("inbound packet {}: v: {}; ct: {}; bs: {}", String::from_utf8_lossy(&buff[0..4]), version, content_type, body_size); - - let event = match content_type { - 0 => { // CBOR - let proto: ProtocolPacket = serde_cbor::from_slice(&buff[16..]).unwrap(); - match proto.deserialize(&bus).unwrap() { - ProtocolItem::Event(ev) => ev.map_msg(|msg|msg.upcast_box()), - ProtocolItem::Action(action) => { - match action { - Action::Close => { - println!("warning: Close recevied - ignoring!"); - sender.send(ProtocolItem::Event(Event::Exited)).unwrap(); - }, - Action::Flush => { - bus.flush().await; - sender.send(ProtocolItem::Event(Event::Flushed)).unwrap(); - }, - Action::Sync => { - bus.sync().await; - sender.send(ProtocolItem::Event(Event::Synchronized(Ok(())))).unwrap(); - }, - Action::Init(..) => (), - Action::Stats => (), - _ => (), - } - continue; - } - ProtocolItem::Send(mid, msg, req) => { - if req { - let res = bus.request_boxed( - msg.upcast_box(), - SendOptions::Except(self_id.load(Ordering::SeqCst)) - ) - .await - .map(|x|x.as_shared_boxed().unwrap()) - .map_err(|x|x.map_msg(|_|())); - - sender.send(ProtocolItem::Event(Event::Response(mid, res))).unwrap(); - } else { - let _ = bus.send_boxed(msg.upcast_box(), Default::default()) - .await; - } - - continue; - } - _ => unimplemented!() - } - }, - _ => unimplemented!() - }; - - return Some((event, (false, uni_streams, bus, sender, self_id))); - } - }) - }) - .flatten() - ) +impl<'a> WaitIdle<'a> for QuicServerEndpoint { + type Fut = Pin + Send + 'a>> ; + fn wait_idle(&'a self) -> Self::Fut { + Box::pin(self.endpoint.wait_idle()) } } \ No newline at end of file diff --git a/crates/remote/src/relays/redis/mod.rs b/crates/remote/src/relays/redis/mod.rs index 805acaf..fa5e730 100644 --- a/crates/remote/src/relays/redis/mod.rs +++ b/crates/remote/src/relays/redis/mod.rs @@ -1,44 +1,61 @@ + +use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; + use messagebus::{Action, Bus, Event, Message, ReciveUntypedReceiver, SendUntypedReceiver, TypeTag, TypeTagAccept}; - -use crate::error::Error; - -use super::{GenericEventStream, MessageTable}; +use parking_lot::Mutex; +use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender}; use futures::StreamExt; +use crate::proto::{BodyType, ProtocolItem}; +use super::{GenericEventStream, MessageTable}; +use redis::AsyncCommands; + + +#[derive(Debug)] +enum RecvDo { + Pause, + Ready, + Closed, +} + + pub struct RedisRelay { - in_table: MessageTable, - out_table: MessageTable, + client: Arc, + self_id: Arc, + table: MessageTable, + item_sender: UnboundedSender>, + item_receiver: Mutex>>>, + event_sender: UnboundedSender, + event_receiver: Mutex>>, } impl RedisRelay { - fn new(uri: &str) -> Result { - let client = redis::Client::open("redis://127.0.0.1/")?; - unimplemented!() + pub fn new(path: &str, table: Vec<(TypeTag, TypeTag, TypeTag)>) -> Result { + let client = redis::Client::open(path)?; + let (item_sender, item_receiver) = mpsc::unbounded_channel(); + let (event_sender, event_receiver) = mpsc::unbounded_channel(); - - // let mut publish_conn = client.get_async_connection().await?; - // let mut pubsub_conn = client.get_async_connection().await?.into_pubsub(); - - // pubsub_conn.subscribe("wavephone").await?; - // let mut pubsub_stream = pubsub_conn.on_message(); - - // publish_conn.publish("wavephone", "banana").await?; - - // let pubsub_msg: String = pubsub_stream.next().await.unwrap().get_payload()?; - // assert_eq!(&pubsub_msg, "banana"); - - // Ok(()) + Ok(RedisRelay { + client: Arc::new(client), + self_id: Arc::new(AtomicU64::new(0)), + table: MessageTable::from(table), + item_sender, + item_receiver: Mutex::new(Some(item_receiver)), + event_sender, + event_receiver: Mutex::new(Some(event_receiver)), + }) } -} +} impl TypeTagAccept for RedisRelay { fn accept(&self, msg: &TypeTag, resp: Option<&TypeTag>, err: Option<&TypeTag>) -> bool { - self.out_table.accept(msg, resp, err) + self.table.accept(msg, resp, err) } fn iter_types(&self, cb: &mut dyn FnMut(&TypeTag, &TypeTag, &TypeTag) -> bool) { - let iter = self.out_table.iter_types(); + let iter = self.table.iter_types(); for (m, r, e) in iter { if cb(m, r, e) { @@ -51,26 +68,48 @@ impl TypeTagAccept for RedisRelay { impl SendUntypedReceiver for RedisRelay { fn send(&self, msg: Action, _bus: &Bus) -> Result<(), messagebus::error::Error> { match msg { - Action::Init => { - // let (sender, mut rx) = self.receiver_send.lock().take().unwrap(); - // let conn = self.endpoint.incoming(); + Action::Init(self_id) => { + let event_sender = self.event_sender.clone(); + let mut rx = self.item_receiver.lock().take().unwrap(); - // tokio::spawn(async move { - // let mut conn = conn.await.unwrap(); - // sender.send((conn.recv, conn.connection)).unwrap(); - // let mut buf1 = Vec::new(); - // let mut buf2 = Vec::new(); + let client = self.client.clone(); + self.self_id.store(self_id, Ordering::SeqCst); - // while let Some(r) = rx.recv().await { - // r.serialize(&mut buf1, &mut buf2, &mut conn.send) - // .await - // .unwrap(); + tokio::spawn(async move { + let mut connection = client.get_tokio_connection().await.unwrap(); + let mut body_buff = Vec::new(); + let mut header_buff = Vec::new(); + // let mut item = None; - // } - // }); + event_sender.send(RecvDo::Ready).unwrap(); + + while let Some(Some(item)) = rx.recv().await { + header_buff.clear(); + body_buff.clear(); + + let pkt = item.serialize(BodyType::Cbor, &mut body_buff).unwrap(); + + serde_cbor::to_writer(&mut header_buff, &pkt).unwrap(); + + let channel = match &item { + ProtocolItem::Action(_) => "mbus_action".into(), + ProtocolItem::Send(_, msg, _) => format!("mbus_request::{}", msg.type_tag()), + ProtocolItem::Event(ev) => "mbus_response::".into(), + _ => unreachable!() + }; + + let () = connection.publish(channel, &header_buff).await.unwrap(); + } + + }); } - other => self.sender.send(other.into()).unwrap(), + Action::Close => { + self.item_sender.send(None).unwrap(); + // self.event_sender.send(RecvDo::Closed).unwrap(); + } + + other => self.item_sender.send(Some(other.into())).unwrap(), } Ok(()) @@ -85,8 +124,8 @@ impl SendUntypedReceiver for RedisRelay { ) -> Result<(), messagebus::error::Error>> { match msg.as_shared_boxed() { Ok(msg) => { - if let Err(err) = self.sender.send((req, mid, msg).into()) { - Err(messagebus::error::Error::TryAgain(err.0.unwrap_msg().unwrap())) + if let Err(err) = self.item_sender.send(Some((mid, msg, req).into())) { + Err(messagebus::error::Error::TryAgain(err.0.unwrap().unwrap_send().unwrap().1.upcast_box())) } else { Ok(()) } @@ -101,24 +140,117 @@ impl ReciveUntypedReceiver for RedisRelay { type Stream = GenericEventStream; fn event_stream(&self, bus: Bus) -> Self::Stream { - let recevier = self.recevier.lock().take().unwrap(); + let self_id = self.self_id.clone(); + let sender = self.item_sender.clone(); + let mut recv = self.event_receiver.lock().take().unwrap(); Box::pin( - futures::stream::unfold((recevier, bus), |(recv, bus)| async move { - if let Ok(r) = recv.next().await? { - let stream = futures::stream::unfold((true, r, bus.clone()), |(first, r, bus)| async move { - if first { - return Some((Event::Ready, (false, r, bus))); - } - - Some((Event::Pause, (false, r, bus))) - }); + futures::stream::poll_fn(move |cx|recv.poll_recv(cx)) + .map(move |recv_do| { + let self_id = self_id.clone(); + let bus = bus.clone(); + let sender = sender.clone(); - Some((stream.left_stream(), (recv, bus))) - } else { - Some((futures::stream::once(async move { Event::Pause }).right_stream(), (recv, bus))) - } - }).flatten() + match recv_do { + // RecvDo::Incoming(incoming) => { + // futures::stream::unfold((incoming, bus, sender, self_id), |(mut incoming, bus, sender, self_id)| async move { + // loop { + // let (_, recv) = match incoming.next().await? { + // Ok(recv) => recv, + // Err(err) => { + // println!("error: {}", err); + // return None; + // } + // }; + + // let buff = recv + // .read_to_end(usize::max_value()) + // .await + // .unwrap(); + + // // assert_eq!(&buff[0..4], b"MBUS"); + + // if buff.is_empty() { + // println!("PONG"); + // continue; + // } + + // let mut reader = &buff[4..]; + + // let version = reader.get_u16(); + // let content_type = reader.get_u16(); + // let body_size = reader.get_u64(); + + // println!("inbound packet {}: v: {}; ct: {}; bs: {}", String::from_utf8_lossy(&buff[0..4]), version, content_type, body_size); + + // let event = match content_type { + // 0 => { // CBOR + // let proto: ProtocolPacket = serde_cbor::from_slice(&buff[16..]).unwrap(); + // match proto.deserialize(&bus).unwrap() { + // ProtocolItem::Event(ev) => ev.map_msg(|msg|msg.upcast_box()), + // ProtocolItem::Action(action) => { + // match action { + // Action::Close => { + // println!("warning: Close recevied - ignoring!"); + // sender.send(Some(ProtocolItem::Event(Event::Exited))).unwrap(); + // }, + // Action::Flush => { + // bus.flush().await; + // sender.send(Some(ProtocolItem::Event(Event::Flushed))).unwrap(); + // }, + // Action::Sync => { + // bus.sync().await; + // sender.send(Some(ProtocolItem::Event(Event::Synchronized(Ok(()))))).unwrap(); + // }, + // Action::Init(..) => (), + // Action::Stats => (), + // _ => (), + // } + // continue; + // } + // ProtocolItem::Send(mid, msg, req) => { + // if req { + // let res = bus.request_boxed( + // msg.upcast_box(), + // SendOptions::Except(self_id.load(Ordering::SeqCst)) + // ) + // .await + // .map(|x|x.as_shared_boxed().unwrap()) + // .map_err(|x|x.map_msg(|_|())); + + // sender.send(Some(ProtocolItem::Event(Event::Response(mid, res)))).unwrap(); + // } else { + // let _ = bus.send_boxed(msg.upcast_box(), Default::default()) + // .await; + // } + + // continue; + // } + // _ => unimplemented!() + // } + // }, + // _ => unimplemented!() + // }; + + // return Some((event, (incoming, bus, sender, self_id))); + // } + // }).right_stream() + // } + + other => futures::stream::once(async move { + match other { + RecvDo::Pause => Event::Pause, + RecvDo::Ready => Event::Ready, + RecvDo::Closed => Event::Exited, + _ => unreachable!() + } + }) + // .left_stream() + } + + + }) + .flatten() ) } -} +} \ No newline at end of file