From 800da160ac1af938e23f424477f80d7559836d14 Mon Sep 17 00:00:00 2001 From: Andrey Tkachenko Date: Tue, 28 Feb 2023 14:51:32 +0400 Subject: [PATCH] Queue step 1 --- Cargo.toml | 1 + examples/producer.rs | 10 ++- examples/run.rs | 7 +- src/bus.rs | 46 ++++++++-- src/cell.rs | 147 +++++++++++++++++++++++++----- src/handler.rs | 4 +- src/lib.rs | 88 ++++++++++++++++++ src/message.rs | 5 +- src/receiver.rs | 26 +++--- src/receivers/mod.rs | 5 +- src/receivers/producer.rs | 6 +- src/receivers/queue.rs | 183 ++++++++++++++++++++------------------ src/receivers/wrapper.rs | 4 + src/type_tag.rs | 8 +- 14 files changed, 394 insertions(+), 146 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 33f3363..222193b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,6 +31,7 @@ tokio = { version = "1.17.0", features = ["sync", "rt" ] } smallvec = { version = "1.10.0", features = ["const_new"] } tokio-util = "0.7.7" arc-swap = "1.6.0" +once_cell = "1.17.1" [dev-dependencies] tokio = { version = "1.17.0", features = ["full"] } diff --git a/examples/producer.rs b/examples/producer.rs index fd7005b..53fdbf0 100644 --- a/examples/producer.rs +++ b/examples/producer.rs @@ -86,6 +86,10 @@ impl Message for Msg { { Some(Self(self.0)) } + + fn is_cloneable(&self) -> bool { + false + } } #[derive(Debug, Clone)] struct StartMsg; @@ -152,6 +156,10 @@ impl Message for StartMsg { { Some(Self) } + + fn is_cloneable(&self) -> bool { + false + } } struct Test { @@ -190,7 +198,7 @@ impl Handler for Test { type FlushFuture<'a> = impl Future> + 'a; fn handle(&self, msg: &mut MsgCell, bus: &Bus) -> Self::HandleFuture<'_> { - let msg = msg.take().unwrap(); + let msg = msg.get(); async move { tokio::time::sleep(Duration::from_millis(100)).await; diff --git a/examples/run.rs b/examples/run.rs index 0a7324d..c826cc5 100644 --- a/examples/run.rs +++ b/examples/run.rs @@ -9,7 +9,6 @@ use messagebus::{ error::Error, handler::Handler, message::{Message, SharedMessage}, - receiver::IntoAbstractReceiver, receivers::wrapper::HandlerWrapper, type_tag::{TypeTag, TypeTagInfo}, }; @@ -79,6 +78,10 @@ impl Message for Msg { { Some(Self(self.0)) } + + fn is_cloneable(&self) -> bool { + false + } } struct Test { @@ -91,7 +94,7 @@ impl Handler for Test { type FlushFuture<'a> = impl Future> + 'a; fn handle(&self, msg: &mut MsgCell, _bus: &Bus) -> Self::HandleFuture<'_> { - let msg = msg.take().unwrap(); + let msg = msg.get(); async move { println!("msg {msg:?}"); diff --git a/src/bus.rs b/src/bus.rs index fedbcb4..29ae440 100644 --- a/src/bus.rs +++ b/src/bus.rs @@ -16,6 +16,8 @@ use crate::{ receiver::{AbstractReceiver, IntoAbstractReceiver, Receiver}, }; +pub use crate::handler::*; + pub struct TaskHandlerVTable { pub drop: fn(Arc, usize), } @@ -262,9 +264,17 @@ impl BusReceivers { } } + #[inline] pub fn add(&mut self, mask: MaskMatch, inner: Arc) { self.inner.push(BusReceiver { inner, mask }) } + + #[inline] + pub fn iter(&self, mask: u64) -> impl Iterator> + '_ { + self.inner + .iter() + .filter_map(move |x| x.mask.test(mask).then(move || &x.inner)) + } } impl From> for BusReceivers { @@ -335,10 +345,16 @@ impl BusInner { continue; } - let task = receiver.inner.try_send_dyn(msg, bus)?; + match receiver.inner.try_send_dyn(msg, bus) { + Ok(task) => { + let receiver = receiver.clone(); + self.processing.push(task, receiver.inner, false); + } - let receiver = receiver.clone(); - self.processing.push(task, receiver.inner, false); + Err(err) => { + println!("send failed {}", err); + } + } } Ok(()) @@ -362,10 +378,16 @@ impl BusInner { continue; } - let task = receiver.inner.send_dyn(msg, bus.clone()).await?; + match receiver.inner.send_dyn(msg, bus.clone()).await { + Ok(task) => { + let receiver = receiver.clone(); + self.processing.push(task, receiver.inner, false); + } - let receiver = receiver.clone(); - self.processing.push(task, receiver.inner, false); + Err(err) => { + println!("send failed {}", err); + } + } } Ok(()) @@ -389,10 +411,16 @@ impl BusInner { continue; } - let task = receiver.inner.send_dyn(msg, bus.clone()).await?; + match receiver.inner.send_dyn(msg, bus.clone()).await { + Ok(task) => { + let receiver = receiver.clone(); + self.processing.push(task, receiver.inner, true); + } - let receiver = receiver.clone(); - self.processing.push(task, receiver.inner, true); + Err(err) => { + println!("send failed {}", err); + } + } } Ok(()) diff --git a/src/cell.rs b/src/cell.rs index 23e7fd1..cf763f5 100644 --- a/src/cell.rs +++ b/src/cell.rs @@ -7,6 +7,7 @@ pub trait MessageCell: Send + 'static { fn type_tag(&self) -> TypeTag; fn as_any(&self) -> &dyn Any; fn as_any_mut(&mut self) -> &mut dyn Any; + fn finalize(&mut self); fn deserialize_from(&mut self, de: &mut dyn erased_serde::Deserializer) -> Result<(), Error>; } @@ -49,7 +50,7 @@ impl dyn MessageCell { pub fn take_cell(&mut self) -> Result, Error> { match self.as_any_mut().downcast_mut::>() { - Some(cell) => Ok(MsgCell(cell.take().ok())), + Some(cell) => Ok(MsgCell(cell.0.take())), None => Err(Error::MessageDynamicCastFail( self.type_tag(), T::TYPE_TAG(), @@ -59,7 +60,7 @@ impl dyn MessageCell { pub fn take(&mut self) -> Result { match self.as_any_mut().downcast_mut::>() { - Some(cell) => cell.take(), + Some(cell) => cell.take().ok_or(Error::EmptyMessageCellError), None => Err(Error::MessageDynamicCastFail( self.type_tag(), T::TYPE_TAG(), @@ -67,15 +68,15 @@ impl dyn MessageCell { } } - pub fn cloned(&self) -> Result, Error> { - match self.as_any().downcast_ref::>() { - Some(cell) => Ok(cell.clone()), - None => Err(Error::MessageDynamicCastFail( - self.type_tag(), - T::TYPE_TAG(), - )), - } - } + // pub fn cloned(&self) -> Result, Error> { + // match self.as_any().downcast_ref::>() { + // Some(cell) => Ok(cell.clone()), + // None => Err(Error::MessageDynamicCastFail( + // self.type_tag(), + // T::TYPE_TAG(), + // )), + // } + // } } #[derive(Debug, Clone)] @@ -133,21 +134,107 @@ impl MessageCell for ResultCell { R::TYPE_TAG() } + fn finalize(&mut self) {} + fn deserialize_from(&mut self, _de: &mut dyn erased_serde::Deserializer) -> Result<(), Error> { Ok(()) } } -#[derive(Debug, Clone)] -pub struct MsgCell(Option); +#[derive(Debug)] +enum MsgCellInner { + Empty, + Clonable(M), + Message(M), +} + +impl MsgCellInner { + pub fn as_ref(&self) -> Option<&M> { + match self { + MsgCellInner::Empty => None, + MsgCellInner::Clonable(x) => Some(x), + MsgCellInner::Message(x) => Some(x), + } + } + + #[inline] + fn get(&mut self) -> M { + match std::mem::replace(self, MsgCellInner::Empty) { + MsgCellInner::Empty => panic!("!!!"), + MsgCellInner::Clonable(m) => { + if let Some(x) = m.try_clone() { + *self = MsgCellInner::Clonable(m); + x + } else { + m + } + } + MsgCellInner::Message(m) => m, + } + } + + #[inline] + fn take(&mut self) -> MsgCellInner { + std::mem::replace(self, MsgCellInner::Empty) + } + + #[inline] + fn take_option(self) -> Option { + match self { + MsgCellInner::Empty => None, + MsgCellInner::Clonable(m) => Some(m), + MsgCellInner::Message(m) => Some(m), + } + } + + #[inline] + fn finalize(&mut self) { + *self = match std::mem::replace(self, MsgCellInner::Empty) { + MsgCellInner::Empty => MsgCellInner::Empty, + MsgCellInner::Clonable(m) => MsgCellInner::Message(m), + MsgCellInner::Message(m) => MsgCellInner::Message(m), + } + } + + #[inline] + fn put(&mut self, val: M) { + *self = MsgCellInner::new(val); + } + + #[inline] + fn unwrap_or(self, err: T) -> Result { + match self { + MsgCellInner::Empty => Err(err), + MsgCellInner::Clonable(m) => Ok(m), + MsgCellInner::Message(m) => Ok(m), + } + } + + #[inline] + fn is_empty(&self) -> bool { + matches!(self, MsgCellInner::Empty) + } + + #[inline] + fn new(msg: M) -> MsgCellInner { + if msg.is_cloneable() { + MsgCellInner::Clonable(msg) + } else { + MsgCellInner::Message(msg) + } + } +} + +#[derive(Debug)] +pub struct MsgCell(MsgCellInner); impl MsgCell { pub fn new(msg: M) -> Self { - MsgCell(Some(msg)) + MsgCell(MsgCellInner::new(msg)) } pub fn empty() -> Self { - MsgCell(None) + MsgCell(MsgCellInner::Empty) } #[inline] @@ -156,8 +243,13 @@ impl MsgCell { } #[inline] - pub fn take(&mut self) -> Result { - self.0.take().ok_or(Error::EmptyMessageCellError) + pub fn get(&mut self) -> M { + self.0.get() + } + + #[inline] + pub fn take(&mut self) -> Option { + self.0.take().take_option() } #[inline] @@ -167,7 +259,12 @@ impl MsgCell { #[inline] pub fn put(&mut self, val: M) { - self.0.replace(val); + self.0.put(val); + } + + #[inline] + pub fn make_value(&mut self) { + self.0.finalize() } #[inline] @@ -175,15 +272,15 @@ impl MsgCell { self } - #[inline] - pub fn clone(&self) -> MsgCell { - MsgCell(self.0.as_ref().and_then(|x| x.try_clone())) - } + // #[inline] + // pub fn clone(&self) -> MsgCell { + // MsgCell(MsgCellInner::) + // } } impl MessageCell for MsgCell { fn is_empty(&self) -> bool { - self.0.is_none() + self.0.is_empty() } fn as_any(&self) -> &dyn Any { @@ -198,6 +295,10 @@ impl MessageCell for MsgCell { M::TYPE_TAG() } + fn finalize(&mut self) { + self.0.finalize() + } + fn deserialize_from(&mut self, _de: &mut dyn erased_serde::Deserializer) -> Result<(), Error> { Ok(()) } diff --git a/src/handler.rs b/src/handler.rs index 3bdb834..4f638f0 100644 --- a/src/handler.rs +++ b/src/handler.rs @@ -2,7 +2,7 @@ use futures::Future; use crate::{bus::Bus, cell::MsgCell, error::Error, message::Message}; -pub trait Handler { +pub trait Handler: Send + Sync { type Response: Message; type HandleFuture<'a>: Future> + Send + 'a where @@ -16,7 +16,7 @@ pub trait Handler { fn flush(&mut self, bus: &Bus) -> Self::FlushFuture<'_>; } -pub trait MessageProducer { +pub trait MessageProducer: Send + Sync { type Message: Message; type StartFuture<'a>: Future> + Send + 'a diff --git a/src/lib.rs b/src/lib.rs index c8bf0c9..a74420b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,4 +11,92 @@ pub mod receiver; pub mod receivers; pub mod type_tag; +pub use bus::Bus; +pub use handler::*; +pub use message::*; + mod wakelist; +#[macro_export] +macro_rules! derive_message_clone { + ($const_name: ident, $struct_name: ty, $name: literal) => { + lazy_static::lazy_static! { + static ref $const_name: $crate::type_tag::TypeTag = $crate::type_tag::TypeTagInfo::parse($name).unwrap().into(); + } + + impl $crate::Message for $struct_name { + fn TYPE_TAG() -> $crate::type_tag::TypeTag + where + Self: Sized, + { + $const_name.clone() + } + + fn type_tag(&self) -> $crate::type_tag::TypeTag { + $const_name.clone() + } + + fn type_layout(&self) -> std::alloc::Layout { + std::alloc::Layout::new::() + } + + fn as_any_ref(&self) -> &dyn std::any::Any { + self + } + + fn as_any_mut(&mut self) -> &mut dyn std::any::Any { + self + } + + fn as_any_boxed(self: Box) -> Box { + self + } + + fn as_any_arc(self: std::sync::Arc) -> std::sync::Arc { + self + } + + fn as_shared_ref(&self) -> Option<&dyn $crate::message::SharedMessage> { + None + } + + fn as_shared_mut(&mut self) -> Option<&mut dyn $crate::message::SharedMessage> { + None + } + + fn as_shared_boxed( + self: Box, + ) -> Result, Box> { + Err(self) + } + + fn as_shared_arc( + self: std::sync::Arc, + ) -> Option> { + None + } + + fn try_clone_into(&self, into: &mut dyn $crate::cell::MessageCell) -> bool { + into.into_typed::() + .map(|c| c.put(self.clone())) + .is_ok() + } + + fn try_clone_boxed(&self) -> Option> { + Some(Box::new(self.clone())) + } + + fn is_cloneable(&self) -> bool { + true + } + + fn try_clone(&self) -> Option + where + Self: Sized, + { + Some(self.clone()) + } + } + }; +} + +derive_message_clone!(VOID, (), "void"); diff --git a/src/message.rs b/src/message.rs index b22b604..3ba7e4b 100644 --- a/src/message.rs +++ b/src/message.rs @@ -1,7 +1,7 @@ use core::fmt; use std::{alloc::Layout, any::Any, sync::Arc}; -use crate::type_tag::TypeTag; +use crate::{cell::MessageCell, type_tag::TypeTag}; pub trait ErrorMessage: Message {} @@ -24,8 +24,9 @@ pub trait Message: fmt::Debug + Unpin + Send + Sync + 'static { fn as_shared_boxed(self: Box) -> Result, Box>; fn as_shared_arc(self: Arc) -> Option>; - fn try_clone_into(&self, into: &mut dyn Message) -> bool; + fn try_clone_into(&self, into: &mut dyn MessageCell) -> bool; fn try_clone_boxed(&self) -> Option>; + fn is_cloneable(&self) -> bool; fn try_clone(&self) -> Option where diff --git a/src/receiver.rs b/src/receiver.rs index 5be9fc2..ee0be93 100644 --- a/src/receiver.rs +++ b/src/receiver.rs @@ -15,7 +15,7 @@ use crate::{ type_tag::TypeTagQuery, }; -pub trait Receiver { +pub trait Receiver: Send + Sync { fn poll_send( &self, msg: &mut MsgCell, @@ -33,35 +33,35 @@ pub trait Receiver { } pub trait ReceiverEx: Receiver { - type SendFut<'a>: Future> + 'a + type SendFut<'a>: Future> + Send + 'a where Self: 'a; - type RequestFut<'a>: Future> + 'a + type RequestFut<'a>: Future> + Send + 'a where Self: 'a; - type ResultFut<'a>: Future> + 'a + type ResultFut<'a>: Future> + Send + 'a where Self: 'a; - type ProcessFut<'a>: Future> + 'a + type ProcessFut<'a>: Future> + Send + 'a where Self: 'a; - fn try_send(&self, msg: &mut MsgCell, bus: &Bus) -> Result<(), Error>; + fn try_send(&self, msg: &mut MsgCell, bus: &Bus) -> Result; fn send(&self, msg: MsgCell, bus: Bus) -> Self::SendFut<'_>; fn request(&self, msg: MsgCell, bus: Bus) -> Self::RequestFut<'_>; fn process(&self, task: TaskHandler, bus: Bus) -> Self::ProcessFut<'_>; fn result(&self, task: TaskHandler, bus: Bus) -> Self::ResultFut<'_>; } -impl + 'static> ReceiverEx for H { - type SendFut<'a> = impl Future> + 'a; - type RequestFut<'a> = impl Future> + 'a; - type ResultFut<'a> = impl Future> + 'a; - type ProcessFut<'a> = impl Future> + 'a; +impl + Send + Sync + 'static> ReceiverEx for H { + type SendFut<'a> = impl Future> + Send + 'a; + type RequestFut<'a> = impl Future> + Send + 'a; + type ResultFut<'a> = impl Future> + Send + 'a; + type ProcessFut<'a> = impl Future> + Send + 'a; - fn try_send(&self, cell: &mut MsgCell, bus: &Bus) -> Result<(), Error> { + fn try_send(&self, cell: &mut MsgCell, bus: &Bus) -> Result { match self.poll_send(cell, None, bus) { - Poll::Ready(_) => Ok(()), + Poll::Ready(handler) => handler, Poll::Pending => Err(Error::TrySendError), } } diff --git a/src/receivers/mod.rs b/src/receivers/mod.rs index 62cb1c0..bf52132 100644 --- a/src/receivers/mod.rs +++ b/src/receivers/mod.rs @@ -1,10 +1,9 @@ // pub mod limit; // pub mod handle; // pub mod dispatcher; -// pub mod queue; -pub mod spawner; -// pub mod unordered; pub mod producer; +pub mod queue; +pub mod spawner; pub mod wrapper; // pub use queue::*; diff --git a/src/receivers/producer.rs b/src/receivers/producer.rs index 196c0d0..ae7de71 100644 --- a/src/receivers/producer.rs +++ b/src/receivers/producer.rs @@ -19,7 +19,7 @@ use crate::{ }; type SendFuture + 'static> = - impl Future>; + impl Future> + Send; pub struct ProducerWrapper + 'static> { inner: Arc, @@ -320,6 +320,10 @@ mod tests { { Some(Self(self.0)) } + + fn is_cloneable(&self) -> bool { + false + } } struct Test { diff --git a/src/receivers/queue.rs b/src/receivers/queue.rs index 8917777..dbd06f6 100644 --- a/src/receivers/queue.rs +++ b/src/receivers/queue.rs @@ -1,5 +1,6 @@ use std::{ marker::PhantomData, + pin::Pin, sync::{ atomic::{AtomicU64, AtomicUsize, Ordering}, Arc, @@ -7,84 +8,71 @@ use std::{ task::{ready, Context, Poll, Waker}, }; -use arc_swap::{ArcSwapAny, ArcSwapOption}; -use crossbeam::{atomic::AtomicCell, queue::ArrayQueue}; -use futures::task::AtomicWaker; +use crossbeam::queue::ArrayQueue; +use futures::{task::AtomicWaker, Future}; use parking_lot::Mutex; -use sharded_slab::Slab; use crate::{ bus::{Bus, TaskHandler}, cell::{MsgCell, ResultCell}, error::Error, message::Message, - receiver::Receiver, + receiver::{Receiver, ReceiverEx}, + wakelist::WakeList, }; -enum QueueTaskInner { - Vacant, - Sending(MsgCell), - WaitingResult, -} - -struct QueueTask { +struct QueueItem { index: usize, - waker: AtomicWaker, - generation: AtomicU64, - message: Mutex>, + generation: u64, + message: MsgCell, } -impl QueueTask { - fn check_task(&self, handler: &TaskHandler) -> bool { - false +impl QueueItem { + fn new(index: usize, generation: u64, message: MsgCell) -> QueueItem { + Self { + index, + generation, + message, + } } } +type SendFuture + 'static> = + impl Future> + Send; + pub struct Queue + 'static> { - inner: T, - - queue: ArrayQueue, + inner: Arc, free: ArrayQueue, - enqueued: AtomicUsize, - + queue: ArrayQueue>, + wakers: Arc<[AtomicWaker]>, + limit: usize, + wakelist: Mutex, + current: Mutex>>>>, generation_sequence: AtomicU64, - - tasks: Arc<[Arc>]>, - current_task: AtomicUsize, - _m: PhantomData<(M, R, T)>, } impl + 'static> Queue { pub fn new(inner: T, limit: usize) -> Self { let free = ArrayQueue::new(limit); for i in 0..limit { - free.push(i); + let _ = free.push(i); } Self { - inner, + limit, + inner: Arc::new(inner), free, queue: ArrayQueue::new(limit), - enqueued: AtomicUsize::new(0), - generation_sequence: AtomicU64::new(0), - tasks: (0..limit) - .map(|index| { - Arc::new(QueueTask { - index, - waker: AtomicWaker::new(), - generation: AtomicU64::new(0), - message: Mutex::new(QueueTaskInner::Vacant), - }) - }) - .collect(), - current_task: AtomicUsize::new(usize::MAX), - _m: PhantomData::default(), + wakers: (0..limit).map(|_| AtomicWaker::new()).collect(), + wakelist: Mutex::new(WakeList::new()), + current: Mutex::new(Box::pin(None)), + generation_sequence: AtomicU64::new(1), } } - #[inline] - fn update_task_waker(&self, _task: &TaskHandler, _wakerr: &Waker) { - // TODO + #[inline(always)] + fn next_gen(&self) -> u64 { + self.generation_sequence.fetch_add(1, Ordering::Relaxed) } } @@ -95,30 +83,59 @@ impl<'a, M: Message, R: Message, T: Receiver + 'static> Receiver for cx: Option<&mut Context<'_>>, bus: &Bus, ) -> Poll> { - if let Some(index) = self.free.pop() { - if let Ok(_) = self.current_task.compare_exchange( - usize::MAX, - index, - Ordering::Relaxed, - Ordering::Relaxed, - ) { - // fast track - // self.tasks[index].start(msg); - let th = TaskHandler::new(vtable, self.tasks.clone(), index); + // trying fast track + if self.free.is_full() { + if let Some(mut lock) = self.current.try_lock() { + if lock.is_none() { + let inner = self.inner.clone(); + let bus = bus.clone(); - Poll::Ready(Ok(th)) + enum Val { + Task(TaskHandler), + Cell(MsgCell), + } + + let val = if let Ok(task) = inner.try_send(msg, &bus) { + Val::Task(task) + } else { + Val::Cell(msg.take_cell()) + }; + + drop( + unsafe { (&mut *lock).as_mut().get_unchecked_mut() }.replace(async move { + match val { + Val::Task(task) => inner.result(task, bus).await, + Val::Cell(cell) => inner.request(cell, bus).await, + } + }), + ); + + let task = todo!(); + + return Poll::Ready(Ok(task)); + } } } - if current.is_none() { - let Some(index) = self.free.pop() else { - // TODO - // self.send_wakelist.push(); - return Poll::Pending; - }; + // enqueuing the message + if let Some(index) = self.free.pop() { + let generation = self.next_gen(); + assert!(self + .queue + .push(QueueItem { + index, + generation, + message: msg.take_cell(), + }) + .is_ok()); - // .start(msg); - drop(current); + let task = todo!(); + + return Poll::Ready(Ok(task)); + } + + if let Some(cx) = cx { + self.wakelist.lock().push(cx.waker().clone()); } Poll::Pending @@ -126,34 +143,18 @@ impl<'a, M: Message, R: Message, T: Receiver + 'static> Receiver for fn poll_result( &self, - task: &TaskHandler, - _resp: Option<&mut ResultCell>, + task: &mut TaskHandler, + resp: Option<&mut ResultCell>, cx: &mut Context<'_>, bus: &Bus, ) -> Poll> { - let current_task = self.current_task.lock(); - let index = if let Some(hash) = *current_task { - if hash != task.hash() { - self.update_task_waker(task, cx.waker()); - return Poll::Pending; - } + let mut lock = self.current.lock(); + let mb_fut = lock.as_mut(); + if let Some(fut) = mb_fut.as_pin_mut() { + let result = ready!(fut.poll(cx)); + } - task.index() - } else { - if let Some(index) = self.queue.pop() { - index - } else { - return Poll::Ready(Err(Error::TrySendError)); - } - }; - - let entry = self.tasks.get(index).unwrap(); - - // let task = self.tasks.remove(idx) - - let res = ready!(self - .inner - .poll_send(&mut *entry.message.lock(), Some(cx), bus))?; + drop(unsafe { lock.as_mut().get_unchecked_mut() }.take()); Poll::Pending } @@ -241,6 +242,10 @@ mod tests { { Some(Self(self.0)) } + + fn is_cloneable(&self) -> bool { + false + } } struct Test { diff --git a/src/receivers/wrapper.rs b/src/receivers/wrapper.rs index 81987a2..139661b 100644 --- a/src/receivers/wrapper.rs +++ b/src/receivers/wrapper.rs @@ -230,6 +230,10 @@ mod tests { None } + fn is_cloneable(&self) -> bool { + false + } + fn try_clone(&self) -> Option where Self: Sized, diff --git a/src/type_tag.rs b/src/type_tag.rs index 764509c..9e3071a 100644 --- a/src/type_tag.rs +++ b/src/type_tag.rs @@ -32,11 +32,17 @@ impl<'a> AsRef> for TypeTag { impl From> for TypeTag { fn from(info: TypeTagInfo<'static>) -> Self { + Arc::new(info).into() + } +} + +impl From>> for TypeTag { + fn from(info: Arc>) -> Self { let mut hasher = DefaultHasher::new(); info.hash(&mut hasher); Self { - info: Arc::new(info), + info, hash: hasher.finish(), } }