fix flush

This commit is contained in:
Andrey Tkachenko 2021-09-07 19:01:53 +04:00
parent e81a70a197
commit 21e28ffa12
5 changed files with 28 additions and 5 deletions

View File

@ -1,6 +1,6 @@
[package]
name = "messagebus"
version = "0.9.0"
version = "0.9.1"
authors = ["Andrey Tkachenko <andrey@aidev.ru>"]
repository = "https://github.com/andreytkachenko/messagebus.git"
keywords = ["futures", "async", "tokio", "message", "bus"]

View File

@ -121,6 +121,7 @@ pub trait ReceiverTrait: TypeTagAccept + Send + Sync {
fn is_init_sent(&self) -> bool;
fn is_ready(&self) -> bool;
fn need_flush(&self) -> bool;
fn set_need_flush(&self);
fn try_reserve(&self, tt: &TypeTag) -> Option<Permit>;
fn reserve_notify(&self, tt: &TypeTag) -> Arc<Notify>;
@ -153,6 +154,7 @@ pub enum Action {
pub enum Event<M, E: StdSyncSendError> {
Response(u64, Result<M, Error<(), E>>),
Synchronized(Result<(), Error<(), E>>),
Error(E),
InitFailed(Error<(), E>),
Stats(Stats),
Flushed,
@ -192,6 +194,7 @@ where
let event = poll_fn(move |ctx| this.inner.poll_events(ctx, &bus)).await;
match event {
Event::Error(err) => error!("Batch Error: {}", err),
Event::Pause => self.context.ready_flag.store(false, Ordering::SeqCst),
Event::Ready => {
self.context.ready.notify_waiters();
@ -207,14 +210,17 @@ where
self.context.closed.notify_waiters();
break;
}
Event::Flushed => self.context.flushed.notify_waiters(),
Event::Flushed => {
self.context.need_flush.store(false, Ordering::SeqCst);
self.context.flushed.notify_waiters();
}
Event::Synchronized(_res) => self.context.synchronized.notify_waiters(),
Event::Response(mid, resp) => {
self.context.processing.fetch_sub(1, Ordering::SeqCst);
self.context.response.notify_one();
if let Err(err) = self.response(mid, resp) {
error!("Response error: {}", err);
error!("Response Error: {}", err);
}
}
@ -328,7 +334,7 @@ where
S: SendUntypedReceiver + SendTypedReceiver<M> + ReciveTypedReceiver<R, E> + 'static,
{
fn name(&self) -> &str {
std::any::type_name::<Self>()
std::any::type_name::<S>()
}
fn typed(&self) -> Option<AnyReceiver<'_>> {
@ -372,6 +378,10 @@ where
Ok(SendUntypedReceiver::send(&self.inner, action, bus)?)
}
fn set_need_flush(&self) {
self.context.need_flush.store(true, Ordering::SeqCst);
}
fn close_notify(&self) -> &Notify {
&self.context.closed
}
@ -758,6 +768,7 @@ impl Receiver {
};
permit.fuse = true;
self.inner.set_need_flush();
res
}
@ -781,6 +792,7 @@ impl Receiver {
.map_err(|err| err.map_msg(|b| *b.as_any_boxed().downcast::<M>().unwrap()))
.map(|_| ())
};
self.inner.set_need_flush();
res
}
@ -794,6 +806,7 @@ impl Receiver {
mut permit: Permit,
) -> Result<(), Error<Box<dyn Message>>> {
let res = self.inner.send_boxed(mid, msg, bus);
self.inner.set_need_flush();
permit.fuse = true;
res
}

View File

@ -155,6 +155,7 @@ macro_rules! buffer_unordered_batch_poller_macro {
))
.ok();
}
stx.send(Event::Error(er)).ok();
}
},
Poll::Ready(None) => break,

View File

@ -86,6 +86,7 @@ macro_rules! batch_synchronized_poller_macro {
stx.send(Event::Response(mid, Err(Error::Other(er.clone()))))
.ok();
}
stx.send(Event::Error(er)).ok();
}
},
}

View File

@ -124,6 +124,10 @@ where
self.context.need_flush.load(Ordering::SeqCst)
}
fn set_need_flush(&self) {
self.context.need_flush.store(true, Ordering::SeqCst);
}
fn stats(&self) -> Stats {
unimplemented!()
}
@ -219,6 +223,7 @@ where
let event = poll_fn(move |ctx| this.inner.poll_events(ctx, &bus)).await;
match event {
Event::Error(err) => error!("Batch Error: {}", err),
Event::Pause => self.context.ready_flag.store(false, Ordering::SeqCst),
Event::Ready => {
self.context.ready.notify_waiters();
@ -234,7 +239,10 @@ where
self.context.closed.notify_waiters();
break;
}
Event::Flushed => self.context.flushed.notify_waiters(),
Event::Flushed => {
self.context.need_flush.store(true, Ordering::SeqCst);
self.context.flushed.notify_waiters()
}
Event::Synchronized(_res) => self.context.synchronized.notify_waiters(),
Event::Response(mid, resp) => {
let tt = if let Ok(bm) = &resp {