Update registrator

This commit is contained in:
Andrey Tkachenko 2023-04-19 15:04:00 +04:00
parent 71f911a9b7
commit d8cb4fedf6
8 changed files with 164 additions and 138 deletions

View File

@ -1,6 +1,6 @@
[package]
name = "messagebus"
version = "0.11.0"
version = "0.12.1"
authors = ["Andrey Tkachenko <andrey@aidev.ru>"]
repository = "https://github.com/andreytkachenko/messagebus.git"
keywords = ["futures", "async", "tokio", "message", "bus"]

View File

@ -28,7 +28,11 @@ impl MessageProducer<StartMsg> for Test {
type CloseFuture<'a> = impl Future<Output = Result<(), Error>> + 'a;
fn init(&self, _bus: &Bus) -> Self::InitFuture<'_> {
async move { Ok(()) }
async move {
println!("Message producer initialized !!!");
Ok(())
}
}
fn start(&self, msg: &mut MsgCell<StartMsg>, _: &Bus) -> Self::StartFuture<'_> {
@ -97,10 +101,12 @@ impl Handler<Msg> for Test {
async fn run() -> Result<(), Error> {
let bus = Bus::new();
let test = Arc::new(Test {});
bus.register_producer(test.clone(), MaskMatch::all());
bus.register(test, MaskMatch::all());
bus.init().await;
bus.register(Test {})
.handler(MaskMatch::all())
.await?
.producer(MaskMatch::all())
.await?;
bus.send(StartMsg(0)).await?;
bus.send(StartMsg(100)).await?;

View File

@ -52,17 +52,17 @@ impl Handler<Msg> for Test {
async fn run() -> Result<(), Error> {
let bus = Bus::new();
bus.register(Test { inner: 12 })
.handler(MaskMatch::all())
.await?;
let wrapper = HandlerWrapper::new(Arc::new(Test { inner: 12 }));
// bus.register(wrapper, MaskMatch::all());
let res: () = bus.request(Msg(13)).await?.result().await?;
println!("request result got {:?}", res);
// let res: () = bus.request(Msg(13)).await?.result().await?;
// println!("request result got {:?}", res);
bus.send(Msg(12)).await?;
// bus.send(Msg(12)).await?;
// bus.close().await;
// bus.wait().await;
bus.close().await;
bus.wait().await;
Ok(())
}

View File

@ -43,6 +43,57 @@ pub struct BusContext {
name: String,
}
pub struct Registrator<'a, H: Send + Sync + 'static> {
bus: &'a Bus,
r: Arc<H>,
}
impl<'a, H: Send + Sync + 'static> Registrator<'a, H> {
pub async fn handler<M: Message, R: Message>(
self,
mask: MaskMatch,
) -> Result<Registrator<'a, H>, Error>
where
H: IntoAsyncReceiver<M, R> + Send + Sync + 'static,
{
self.bus
.inner
.register(
M::TYPE_TAG(),
R::TYPE_TAG(),
self.r.clone().into_async_receiver().into_abstract_arc(),
mask,
false,
self.bus,
)
.await?;
Ok(self)
}
pub async fn producer<M: Message, R: Message>(
self,
mask: MaskMatch,
) -> Result<Registrator<'a, H>, Error>
where
H: IntoAsyncProducer<M, R> + Send + Sync + 'static,
{
self.bus
.inner
.register(
M::TYPE_TAG(),
R::TYPE_TAG(),
self.r.clone().into_async_producer().into_abstract_arc(),
mask,
true,
self.bus,
)
.await?;
Ok(self)
}
}
#[derive(Clone)]
pub struct Bus {
inner: Arc<BusInner>,
@ -66,6 +117,14 @@ impl Bus {
self.inner.is_closed()
}
#[inline]
pub fn register<H: Send + Sync + 'static>(&self, r: H) -> Registrator<H> {
Registrator {
bus: self,
r: Arc::new(r),
}
}
#[inline]
pub fn try_send<M: Message>(&self, msg: M) -> Result<(), Error> {
let mut msg = MsgCell::new(msg);
@ -76,11 +135,6 @@ impl Bus {
Ok(())
}
#[inline]
pub async fn init(&self) {
self.inner.init(self).await
}
#[inline]
pub async fn send<M: Message>(&self, msg: M) -> Result<(), Error> {
let mut msg = MsgCell::new(msg);
@ -95,46 +149,6 @@ impl Bus {
futures::executor::block_on(self.send(msg))
}
#[inline]
pub fn register<M: Message, R: Message, H: IntoAsyncReceiver<M, R> + Send + Sync + 'static>(
&self,
r: H,
mask: MaskMatch,
) {
self.inner.register(
M::TYPE_TAG(),
R::TYPE_TAG(),
r.into_async_receiver().into_abstract_arc(),
mask,
false,
)
}
#[inline]
pub fn register_producer<
M: Message,
R: Message,
P: IntoAsyncProducer<M, R> + Send + Sync + 'static,
>(
&self,
r: P,
mask: MaskMatch,
) {
log::info!(
"reg producer start: {}, msg: {}",
M::TYPE_TAG(),
R::TYPE_TAG()
);
self.inner.register(
M::TYPE_TAG(),
R::TYPE_TAG(),
r.into_async_producer().into_abstract_arc(),
mask,
true,
)
}
#[inline]
pub async fn request<M: Message, R: Message>(
&self,
@ -173,7 +187,7 @@ impl Bus {
}
}
#[derive(Debug, Clone, Copy)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct MaskMatch {
pos: u64,
neg: u64,
@ -227,6 +241,20 @@ struct BusReceiver {
mask: MaskMatch,
}
impl std::hash::Hash for BusReceiver {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
state.write_usize(Arc::as_ptr(&self.inner) as *const () as _);
}
}
impl PartialEq for BusReceiver {
fn eq(&self, other: &Self) -> bool {
Arc::ptr_eq(&self.inner, &other.inner)
}
}
impl Eq for BusReceiver {}
struct BusReceivers {
is_producer: bool,
inner: SegVec<BusReceiver>,
@ -253,6 +281,15 @@ pub struct BusInner {
}
impl BusInner {
pub(crate) async fn close(&self) {
self.processing.close();
}
#[inline]
pub(crate) fn is_closed(&self) -> bool {
self.processing.is_closed()
}
pub(crate) fn new() -> Self {
Self {
state: AtomicU8::new(0),
@ -261,19 +298,17 @@ impl BusInner {
}
}
#[inline]
pub(crate) fn is_closed(&self) -> bool {
self.processing.is_closed()
}
pub(crate) fn register(
pub(crate) async fn register(
&self,
mtt: TypeTag,
rtt: TypeTag,
receiver: Arc<dyn AbstractReceiver>,
mask: MaskMatch,
is_producer: bool,
) {
bus: &Bus,
) -> Result<(), Error> {
receiver.initialize(bus).await?;
self.receivers
.entry((mtt.hash, rtt.hash))
.or_insert_with(|| BusReceivers::new(is_producer))
@ -283,52 +318,38 @@ impl BusInner {
.entry((mtt.hash, 0))
.or_insert_with(|| BusReceivers::new(is_producer))
.add(mask, receiver.clone());
Ok(())
}
pub(crate) async fn init(&self, bus: &Bus) {
let mut vec = Vec::new();
for recvs in self.receivers.iter() {
for recv in recvs.inner.iter().cloned() {
vec.push(async move { (recv.inner.initialize(bus).await, recv) });
}
}
for res in futures::future::join_all(vec.into_iter()).await {
println!("init {:?}", res.0);
}
}
pub(crate) fn try_send(
pub(crate) async fn request<M: Message, R: Message>(
&self,
msg: &mut dyn MessageCell,
options: SendOptions,
msg: M,
bus: &Bus,
) -> Result<(), Error> {
let tt = msg.type_tag();
) -> Result<RequestHandler<M, R>, Error> {
let mtt = M::TYPE_TAG();
let rtt = R::TYPE_TAG();
let receivers = self
.receivers
.get(&(tt.hash, 0))
.ok_or_else(|| ErrorKind::NoSuchReceiver(tt, None))?;
.get(&(mtt.hash, rtt.hash))
.ok_or_else(|| ErrorKind::NoSuchReceiver(mtt.clone(), Some(rtt.clone())))?;
for receiver in receivers.inner.iter() {
if !receiver.mask.test(options.mask) {
continue;
}
if let Some(receiver) = receivers.inner.iter().next() {
let task = receiver
.inner
.send(&mut MsgCell::new(msg), bus.clone())
.await?;
match receiver.inner.try_send_dyn(msg, bus) {
Ok(task) => {
let receiver = receiver.clone();
self.processing.push(task, receiver.inner, false);
}
Err(err) => {
println!("send failed {}", err);
}
}
Ok(RequestHandler {
task,
receiver: receiver.clone(),
bus: bus.clone(),
_m: Default::default(),
})
} else {
Err(ErrorKind::NoSuchReceiver(mtt, Some(rtt)).into())
}
Ok(())
}
pub(crate) async fn send(
@ -365,34 +386,30 @@ impl BusInner {
Ok(())
}
pub(crate) async fn request<M: Message, R: Message>(
pub(crate) fn try_send(
&self,
msg: M,
msg: &mut dyn MessageCell,
options: SendOptions,
bus: &Bus,
) -> Result<RequestHandler<M, R>, Error> {
let mtt = M::TYPE_TAG();
let rtt = R::TYPE_TAG();
) -> Result<(), Error> {
let tt = msg.type_tag();
let receivers = self
.receivers
.get(&(mtt.hash, rtt.hash))
.ok_or_else(|| ErrorKind::NoSuchReceiver(mtt.clone(), Some(rtt.clone())))?;
.get(&(tt.hash, 0))
.ok_or_else(|| ErrorKind::NoSuchReceiver(tt, None))?;
if let Some(receiver) = receivers.inner.iter().next() {
let task = receiver
.inner
.send(&mut MsgCell::new(msg), bus.clone())
.await?;
for receiver in receivers.inner.iter() {
if !receiver.mask.test(options.mask) {
continue;
}
Ok(RequestHandler {
task,
receiver: receiver.clone(),
bus: bus.clone(),
_m: Default::default(),
})
} else {
Err(ErrorKind::NoSuchReceiver(mtt, Some(rtt)).into())
let task = receiver.inner.try_send_dyn(msg, bus)?;
let receiver = receiver.clone();
self.processing.push(task, receiver.inner, false);
}
Ok(())
}
pub(crate) async fn wait(&self, bus: &Bus) {
@ -400,10 +417,6 @@ impl BusInner {
poll_fn(move |cx| pool.poll(cx, bus)).await
}
pub(crate) async fn close(&self) {
self.processing.close();
}
}
pub struct RequestHandler<M: Message, R: Message> {

View File

@ -1,4 +1,4 @@
#![feature(type_alias_impl_trait, impl_trait_in_assoc_type)]
#![feature(type_alias_impl_trait)]
pub mod bus;
pub mod cell;

View File

@ -70,12 +70,15 @@ pub trait ReceiverEx<M: Message, R: Message>: Receiver<M, R> {
type SendFut<'a>: Future<Output = Result<TaskHandler, Error>> + Send + 'a
where
Self: 'a;
type RequestFut<'a>: Future<Output = Result<R, Error>> + Send + 'a
where
Self: 'a;
type ResultFut<'a>: Future<Output = Result<R, Error>> + Send + 'a
where
Self: 'a;
type ProcessFut<'a>: Future<Output = Result<(), Error>> + Send + 'a
where
Self: 'a;

View File

@ -22,7 +22,7 @@ pub trait IntoAsyncProducer<M: Message, R: Message>
where
Self: crate::MessageProducer<M, Message = R> + Send + Sync + 'static,
{
fn into_async_producer(self) -> ProducerWrapper<M, Self>
fn into_async_producer(self: Arc<Self>) -> ProducerWrapper<M, Self>
where
Self: Sized + 'static;
}
@ -30,11 +30,11 @@ where
impl<M: Message, H: MessageProducer<M> + Send + Sync + 'static> IntoAsyncProducer<M, H::Message>
for H
{
fn into_async_producer(self) -> ProducerWrapper<M, H>
fn into_async_producer(self: Arc<Self>) -> ProducerWrapper<M, H>
where
Self: Sized + 'static,
{
ProducerWrapper::new(Arc::new(self))
ProducerWrapper::new(self)
}
}
@ -171,13 +171,17 @@ impl<M: Message, T: MessageProducer<M>> ProducerWrapper<M, T> {
impl<M: Message, T: MessageProducer<M> + 'static> Receiver<M, T::Message>
for ProducerWrapper<M, T>
{
type InitFuture<'a> = impl Future<Output = Result<(), Error>> + 'a;
type InitFuture<'a> = T::InitFuture<'a>;
type CloseFuture<'a> = impl Future<Output = Result<(), Error>> + 'a;
type FlushFuture<'a> = impl Future<Output = Result<(), Error>> + 'a;
#[inline]
fn close(&self) -> Self::CloseFuture<'_> {
async move { Ok(()) }
async move {
// self.producers.iter()
Ok(())
}
}
#[inline]
@ -186,8 +190,8 @@ impl<M: Message, T: MessageProducer<M> + 'static> Receiver<M, T::Message>
}
#[inline]
fn init(&self, _bus: &Bus) -> Self::InitFuture<'_> {
async move { Ok(()) }
fn init(&self, bus: &Bus) -> Self::InitFuture<'_> {
self.inner.init(bus)
}
fn poll_send(

View File

@ -22,7 +22,7 @@ pub trait IntoAsyncReceiver<M: Message, R: Message>
where
Self: Handler<M, Response = R> + Send + Sync + 'static,
{
fn into_async_receiver(self) -> HandlerWrapper<M, Self>
fn into_async_receiver(self: Arc<Self>) -> HandlerWrapper<M, Self>
where
Self: Sized + 'static;
}
@ -30,11 +30,11 @@ where
impl<M: Message, R: Message, H: Handler<M, Response = R> + Send + Sync + 'static>
IntoAsyncReceiver<M, R> for H
{
fn into_async_receiver(self) -> HandlerWrapper<M, H>
fn into_async_receiver(self: Arc<Self>) -> HandlerWrapper<M, H>
where
Self: Sized + 'static,
{
HandlerWrapper::new(Arc::new(self))
HandlerWrapper::new(self)
}
}