Add request_boxed_we

This commit is contained in:
Andrey Tkachenko 2021-08-12 13:16:30 +04:00
parent 6d8dd039b3
commit 7332c04e53
6 changed files with 168 additions and 21 deletions

View File

@ -6,7 +6,7 @@ use proc_macro::TokenStream;
use quote::quote;
use std::fmt::Write;
use syn::parse::{Parse, ParseStream};
use syn::{parenthesized, LitStr, Result};
use syn::{parenthesized, Result};
use syn::{punctuated::Punctuated, token::Comma, DeriveInput};
fn shared_part(_ast: &syn::DeriveInput, has_shared: bool) -> proc_macro2::TokenStream {
@ -55,10 +55,12 @@ fn clone_part(ast: &syn::DeriveInput, has_clone: bool) -> proc_macro2::TokenStre
}
}
fn type_tag_part(ast: &syn::DeriveInput, type_tag: Option<LitStr>) -> proc_macro2::TokenStream {
fn type_tag_part(ast: &syn::DeriveInput, type_tag: Option<String>, namespace: Option<String>) -> proc_macro2::TokenStream {
let class_name = &ast.ident;
let name = if let Some(tt) = type_tag {
tt.value()
tt
} else if let Some(ns) = namespace {
format!("{}::{}", ns, class_name)
} else {
class_name.to_string()
};
@ -189,14 +191,15 @@ impl Parse for Tags {
}
}
#[proc_macro_derive(Message, attributes(type_tag, message))]
#[proc_macro_derive(Message, attributes(type_tag, message, namespace))]
pub fn derive_message(input: TokenStream) -> TokenStream {
let mut tags = Tags::default();
let mut type_tag = None;
let mut namespace = None;
let ast: DeriveInput = syn::parse(input).unwrap();
let name = &ast.ident;
let (impl_generics, ty_generics, where_clause) = ast.generics.split_for_impl();
let (_, ty_generics, where_clause) = ast.generics.split_for_impl();
for attr in &ast.attrs {
if let Some(i) = attr.path.get_ident() {
match i.to_string().as_str() {
@ -207,7 +210,12 @@ pub fn derive_message(input: TokenStream) -> TokenStream {
"type_tag" => {
let tt: TypeTag = syn::parse2(attr.tokens.clone()).unwrap();
type_tag = Some(tt.inner);
type_tag = Some(tt.inner.value());
}
"namespace" => {
let tt: TypeTag = syn::parse2(attr.tokens.clone()).unwrap();
namespace = Some(tt.inner.value());
}
_ => (),
@ -215,7 +223,19 @@ pub fn derive_message(input: TokenStream) -> TokenStream {
}
}
let type_tag_part = type_tag_part(&ast, type_tag);
let mut impl_generics = ast.generics.clone();
for mut param in impl_generics.params.pairs_mut() {
match &mut param.value_mut() {
syn::GenericParam::Lifetime(_) => {}
syn::GenericParam::Type(param) => {
let bound: syn::TypeParamBound = syn::parse_str("messagebus::MessageBounds").unwrap();
param.bounds.push(bound);
}
syn::GenericParam::Const(_param) => {}
}
}
let type_tag_part = type_tag_part(&ast, type_tag, namespace);
let shared_part = shared_part(&ast, tags.has_shared);
let clone_part = clone_part(&ast, tags.has_clone);
@ -236,16 +256,23 @@ pub fn derive_message(input: TokenStream) -> TokenStream {
tokens.into()
}
#[proc_macro_derive(Error, attributes(type_tag))]
#[proc_macro_derive(Error, attributes(type_tag, namespace))]
pub fn derive_error(input: TokenStream) -> TokenStream {
let mut type_tag = None;
let mut namespace = None;
let ast: DeriveInput = syn::parse(input).unwrap();
for attr in &ast.attrs {
if let Some(i) = attr.path.get_ident() {
match i.to_string().as_str() {
"type_tag" => {
let tt: TypeTag = syn::parse2(attr.tokens.clone()).unwrap();
type_tag = Some(tt.inner);
type_tag = Some(tt.inner.value());
}
"namespace" => {
let tt: TypeTag = syn::parse2(attr.tokens.clone()).unwrap();
namespace = Some(tt.inner.value());
}
_ => (),
@ -253,7 +280,7 @@ pub fn derive_error(input: TokenStream) -> TokenStream {
}
}
let type_tag_part = type_tag_part(&ast, type_tag);
let type_tag_part = type_tag_part(&ast, type_tag, namespace);
let tokens = quote! {
#type_tag_part
};

View File

@ -244,6 +244,7 @@ impl Module {
>(
mut self,
) -> Self {
println!("insert {}", M::type_tag_());
self.message_types.insert(
M::type_tag_(),
MessageTypeDescriptor {

View File

@ -1,5 +1,5 @@
use core::fmt;
use std::{any::type_name, borrow::Cow};
use std::{any::type_name};
use thiserror::Error;
use tokio::sync::oneshot;
@ -26,7 +26,7 @@ impl GenericError {
pub fn from_any<T: TypeTagged + fmt::Display>(err: T) -> Self {
GenericError {
type_tag: err.type_tag(),
description: format!("{}", err),
description: format!("{}[{}]", err.type_tag(), err),
}
}
}
@ -40,7 +40,7 @@ impl fmt::Display for GenericError {
impl std::error::Error for GenericError {}
impl TypeTagged for GenericError {
fn type_name(&self) -> Cow<str> {
fn type_tag_() -> TypeTag {
type_name::<GenericError>().into()
}
@ -48,10 +48,7 @@ impl TypeTagged for GenericError {
type_name::<GenericError>().into()
}
fn type_tag_() -> TypeTag
where
Self: Sized,
{
fn type_name(&self) -> TypeTag {
type_name::<GenericError>().into()
}
}
@ -156,6 +153,14 @@ impl<M: fmt::Debug + 'static, E: StdSyncSendError> Error<M, E> {
Error::NotReady => Error::NotReady,
}
}
pub fn try_unwrap(self) -> Option<E> {
match self {
Error::Other(inner) => Some(inner),
Error::OtherBoxed(_) => None,
_ => None,
}
}
}
impl<M: Message, E: StdSyncSendError> Error<M, E> {
@ -242,3 +247,12 @@ impl Error<Box<dyn Message>> {
}
}
}
// impl<M: fmt::Debug> Error<M> {
// pub fn downcast<E>(self) -> Result<E, Self> {
// match self {
// Error::OtherBoxed(inner) => Ok(),
// err => Err(err)
// }
// }
// }

View File

@ -492,6 +492,38 @@ impl Bus {
}
}
pub async fn request_boxed_we<E: StdSyncSendError>(
&self,
req: Box<dyn Message>,
options: SendOptions,
) -> Result<Box<dyn Message>, Error<Box<dyn Message>, E>> {
if self.inner.closed.load(Ordering::SeqCst) {
return Err(SendError::Closed(req).into());
}
let tt = req.type_tag();
let eid = E::type_tag_();
let mut iter = self.select_receivers(&tt, options, None, Some(&eid));
if let Some(rc) = iter.next() {
let (mid, rx) = rc.add_response_waiter_boxed_we().map_err(|x| {
x.map_err(|_| unimplemented!())
.map_msg(|_| unimplemented!())
})?;
rc.send_boxed(
self,
mid | 1 << (usize::BITS - 1),
req,
rc.reserve(&tt).await,
).map_err(|x| x.map_err(|_| unimplemented!()))?;
rx.await.map_err(|x| x.specify::<Box<dyn Message>>())
} else {
Err(Error::NoReceivers)
}
}
pub async fn send_deserialize_one<'a, 'b: 'a, 'c: 'a>(
&'a self,
tt: TypeTag,

View File

@ -70,6 +70,13 @@ pub trait WrapperReturnTypeOnly<R: Message>: Send + Sync {
) -> Result<u64, Error>;
}
pub trait WrapperErrorTypeOnly<E: StdSyncSendError>: Send + Sync {
fn add_response_listener(
&self,
listener: oneshot::Sender<Result<Box<dyn Message>, Error<(), E>>>,
) -> Result<u64, Error>;
}
pub trait WrapperReturnTypeAndError<R: Message, E: StdSyncSendError>: Send + Sync {
fn start_polling_events(
self: Arc<Self>,
@ -252,6 +259,10 @@ where
Waiter::Boxed(sender) => sender
.send(resp.map_err(|e| e.into_dyn()).map(|x| x.into_boxed()))
.unwrap(),
Waiter::BoxedWithError(sender) => sender
.send(resp.map(|x| x.into_boxed()))
.unwrap(),
}
}
@ -277,6 +288,24 @@ where
}
}
impl<M, R, E, S> WrapperErrorTypeOnly<E> for ReceiverWrapper<M, R, E, S>
where
M: Message,
R: Message,
E: StdSyncSendError,
S: ReciveTypedReceiver<R, E> + Send + Sync + 'static,
{
fn add_response_listener(
&self,
listener: oneshot::Sender<Result<Box<dyn Message>, Error<(), E>>>,
) -> Result<u64, Error> {
Ok(self
.waiters
.insert(Waiter::BoxedWithError(listener))
.ok_or_else(|| Error::AddListenerError)? as _)
}
}
impl<M, R, E, S> TypeTagAccept for ReceiverWrapper<M, R, E, S>
where
M: Message,
@ -481,6 +510,7 @@ unsafe impl Send for AnyReceiver<'_> {}
pub struct AnyWrapperRef<'a> {
data: *mut (),
wrapper_r: (TypeId, *mut ()),
wrapper_e: (TypeId, *mut ()),
wrapper_re: (TypeId, *mut ()),
_m: PhantomData<&'a usize>,
}
@ -490,12 +520,14 @@ impl<'a> AnyWrapperRef<'a> {
where
R: Message,
E: StdSyncSendError,
S: WrapperReturnTypeOnly<R> + WrapperReturnTypeAndError<R, E> + 'static,
S: WrapperReturnTypeOnly<R> + WrapperErrorTypeOnly<E> + WrapperReturnTypeAndError<R, E> + 'static,
{
let wrapper_r = rcvr as &(dyn WrapperReturnTypeOnly<R>);
let wrapper_e = rcvr as &(dyn WrapperErrorTypeOnly<E>);
let wrapper_re = rcvr as &(dyn WrapperReturnTypeAndError<R, E>);
let wrapper_r: TraitObject = unsafe { mem::transmute(wrapper_r) };
let wrapper_e: TraitObject = unsafe { mem::transmute(wrapper_e) };
let wrapper_re: TraitObject = unsafe { mem::transmute(wrapper_re) };
Self {
@ -504,6 +536,10 @@ impl<'a> AnyWrapperRef<'a> {
TypeId::of::<dyn WrapperReturnTypeOnly<R>>(),
wrapper_r.vtable,
),
wrapper_e: (
TypeId::of::<dyn WrapperErrorTypeOnly<E>>(),
wrapper_e.vtable,
),
wrapper_re: (
TypeId::of::<dyn WrapperReturnTypeAndError<R, E>>(),
wrapper_re.vtable,
@ -526,6 +562,20 @@ impl<'a> AnyWrapperRef<'a> {
})
}
#[inline]
pub fn cast_error_only<E: StdSyncSendError>(&'a self) -> Option<&'a dyn WrapperErrorTypeOnly<E>> {
if self.wrapper_e.0 != TypeId::of::<dyn WrapperErrorTypeOnly<E>>() {
return None;
}
Some(unsafe {
mem::transmute(TraitObject {
data: self.data,
vtable: self.wrapper_e.1,
})
})
}
#[inline]
pub fn cast_ret_and_error<R: Message, E: StdSyncSendError>(
&'a self,
@ -591,6 +641,7 @@ enum Waiter<R: Message, E: StdSyncSendError> {
WithErrorType(oneshot::Sender<Result<R, Error<(), E>>>),
WithoutErrorType(oneshot::Sender<Result<R, Error>>),
Boxed(oneshot::Sender<Result<Box<dyn Message>, Error>>),
BoxedWithError(oneshot::Sender<Result<Box<dyn Message>, Error<(), E>>>),
}
#[derive(Clone)]
@ -768,6 +819,28 @@ impl Receiver {
}))
}
#[inline]
pub(crate) fn add_response_waiter_boxed_we<E: StdSyncSendError>(
&self,
) -> Result<(u64, impl Future<Output = Result<Box<dyn Message>, Error<(), E>>>), Error> {
if let Some(any_wrapper) = self.inner.wrapper() {
let (tx, rx) = oneshot::channel();
let mid = any_wrapper
.cast_error_only::<E>()
.unwrap()
.add_response_listener(tx)?;
Ok((mid, async move {
match rx.await {
Ok(x) => x,
Err(err) => Err(Error::from(err)),
}
}))
} else {
unimplemented!()
}
}
#[inline]
pub(crate) fn add_response_waiter<R: Message>(
&self,

View File

@ -17,12 +17,12 @@ impl<M: Message> From<error::Error<M>> for Error {
}
#[derive(Debug, Clone, Message)]
#[type_tag("api::Msg")]
pub struct Msg<F: MessageBounds + Clone>(pub F);
#[namespace("api")]
pub struct Msg<F>(pub F);
#[derive(Debug, Clone, Message)]
#[type_tag("api::Query")]
pub struct Qqq<F: MessageBounds + Clone, G: MessageBounds + Clone, H: MessageBounds + Clone>(
pub struct Qqq<F, G, H>(
pub F,
pub G,
pub H,