Rework idle detection

This commit is contained in:
Andrey Tkachenko 2021-12-16 16:12:28 +04:00
parent b06eac6504
commit 73aeefb46f
10 changed files with 80 additions and 182 deletions

View File

@ -1,6 +1,6 @@
[package]
name = "messagebus"
version = "0.9.12"
version = "0.9.13"
authors = ["Andrey Tkachenko <andrey@aidev.ru>"]
repository = "https://github.com/andreytkachenko/messagebus.git"
keywords = ["futures", "async", "tokio", "message", "bus"]

View File

@ -88,6 +88,7 @@ impl<T, F, P, B> RegisterEntry<UnsyncEntry, T, F, P, B> {
true,
inner,
);
let poller2 = receiver.start_polling();
self.receivers.insert(receiver);
self.pollers.push(poller(self.item.clone()));

View File

@ -165,6 +165,7 @@ impl Bus {
let fuse_count = 32i32;
let mut breaked = false;
let mut iters = 0usize;
for _ in 0..fuse_count {
iters += 1;
let mut flushed = false;
@ -264,15 +265,12 @@ impl Bus {
}
pub async fn sync_all(&self) {
let _handle = self.inner.maintain.lock().await;
for r in self.inner.receivers.iter() {
r.sync(self).await;
}
}
pub async fn sync<M: Message>(&self) {
let _handle = self.inner.maintain.lock().await;
let receivers =
self.select_receivers(M::type_tag_(), Default::default(), None, None, false);
@ -282,8 +280,6 @@ impl Bus {
}
pub async fn sync2<M1: Message, M2: Message>(&self) {
let _handle = self.inner.maintain.lock().await;
let receivers1 =
self.select_receivers(M1::type_tag_(), Default::default(), None, None, false);
@ -295,21 +291,68 @@ impl Bus {
}
}
pub async fn idle_all(&self) {
for r in self.inner.receivers.iter() {
r.flush(self).await;
r.idle().await;
}
}
pub async fn idle<M: Message>(&self) {
let receivers =
self.select_receivers(M::type_tag_(), Default::default(), None, None, false);
for r in receivers {
r.flush(self).await;
r.idle().await;
}
}
pub async fn idle2<M1: Message, M2: Message>(&self) {
let receivers1 =
self.select_receivers(M1::type_tag_(), Default::default(), None, None, false);
let receivers2 =
self.select_receivers(M2::type_tag_(), Default::default(), None, None, false);
for r in receivers1.chain(receivers2) {
r.flush(self).await;
r.idle().await;
}
}
#[inline]
pub async fn flush_and_sync_all(&self) {
pub async fn flush_and_sync_all(&self, force: bool) {
if !force {
self.idle_all().await;
}
println!("flushing all begin");
self.flush_all().await;
self.sync_all().await;
}
#[inline]
pub async fn flush_and_sync<M: Message>(&self) {
pub async fn flush_and_sync<M: Message>(&self, force: bool) {
if !force {
self.idle::<M>().await;
}
println!("flushing 1 begin");
self.flush::<M>().await;
self.sync::<M>().await;
}
#[inline]
pub async fn flush_and_sync2<M1: Message, M2: Message>(&self) {
pub async fn flush_and_sync2<M1: Message, M2: Message>(&self, force: bool) {
if !force {
self.idle2::<M1, M2>().await;
}
println!("flushing 2 begin");
self.flush2::<M1, M2>().await;
self.sync2::<M1, M2>().await;
}
fn try_reserve(&self, tt: &TypeTag, rs: &[Receiver]) -> Option<SmallVec<[Permit; 32]>> {
let mut permits = SmallVec::<[Permit; 32]>::new();

View File

@ -168,8 +168,6 @@ pub enum Event<M, E: StdSyncSendError> {
Exited,
Ready,
Pause,
IdleBegin,
IdleEnd,
}
impl<M, E: StdSyncSendError> Event<M, E> {
@ -185,8 +183,6 @@ impl<M, E: StdSyncSendError> Event<M, E> {
Event::Exited => Event::Exited,
Event::Ready => Event::Ready,
Event::Pause => Event::Pause,
Event::IdleBegin => Event::IdleBegin,
Event::IdleEnd => Event::IdleEnd,
}
}
}
@ -250,7 +246,11 @@ where
}
Event::Synchronized(_res) => self.context.synchronized.notify_waiters(),
Event::Response(mid, resp) => {
self.context.processing.fetch_sub(1, Ordering::SeqCst);
let prev_value = self.context.processing.fetch_sub(1, Ordering::SeqCst);
if prev_value == 1 { // last task completes
self.context.idle.notify_waiters();
}
self.context.response.notify_one();
match self.response(mid, resp) {
@ -275,15 +275,6 @@ where
}
}
Event::IdleBegin => {
self.context.idling_flag.store(true, Ordering::SeqCst);
self.context.idle.notify_waiters();
}
Event::IdleEnd => {
self.context.idling_flag.store(false, Ordering::SeqCst);
}
_ => unimplemented!(),
}
}
@ -490,7 +481,7 @@ where
}
fn is_idling(&self) -> bool {
self.context.idling_flag.load(Ordering::SeqCst)
self.context.processing.load(Ordering::SeqCst) == 0
}
fn need_flush(&self) -> bool {
@ -715,7 +706,6 @@ struct ReceiverContext {
processing: AtomicI64,
need_flush: AtomicBool,
ready_flag: AtomicBool,
idling_flag: AtomicBool,
flushed: Notify,
synchronized: Notify,
closed: Notify,
@ -784,7 +774,6 @@ impl Receiver {
processing: AtomicI64::new(0),
need_flush: AtomicBool::new(false),
ready_flag: AtomicBool::new(false),
idling_flag: AtomicBool::new(true),
init_sent: AtomicBool::new(false),
flushed: Notify::new(),
synchronized: Notify::new(),
@ -1087,8 +1076,6 @@ impl Receiver {
#[inline]
pub async fn flush(&self, bus: &Bus) {
self.idle().await;
let notify = self.inner.flush_notify().notified();
if self.inner.send_action(bus, Action::Flush).is_ok() {

View File

@ -46,45 +46,10 @@ macro_rules! buffer_unordered_poller_macro {
R: Message,
E: StdSyncSendError,
{
use futures::{future, pin_mut, select, FutureExt};
let ut = ut.downcast::<$t>().unwrap();
let semaphore = Arc::new(tokio::sync::Semaphore::new(cfg.max_parallel));
let mut idling = true;
loop {
let wait_fut = async move {
if idling {
let () = future::pending().await;
} else {
let _ = tokio::time::sleep(std::time::Duration::from_millis(100))
.fuse()
.await;
}
};
pin_mut!(wait_fut);
let msg = select! {
m = rx.recv().fuse() => if let Some(msg) = m {
msg
} else {
break;
},
_ = wait_fut.fuse() => {
idling = true;
stx.send(Event::IdleBegin).unwrap();
continue;
}
};
if idling {
stx.send(Event::IdleEnd).unwrap();
}
idling = false;
while let Some(msg) = rx.recv().await {
match msg {
Request::Request(mid, msg, _req) => {
#[allow(clippy::redundant_closure_call)]

View File

@ -51,47 +51,13 @@ macro_rules! buffer_unordered_batch_poller_macro {
M: Message,
R: Message,
{
use futures::{future, pin_mut, select, FutureExt};
let ut = ut.downcast::<$t>().unwrap();
let semaphore = Arc::new(tokio::sync::Semaphore::new(cfg.max_parallel));
let mut buffer_mid = Vec::with_capacity(cfg.batch_size);
let mut buffer = Vec::with_capacity(cfg.batch_size);
let mut idling = true;
loop {
let wait_fut = async move {
if idling {
let () = future::pending().await;
} else {
let _ = tokio::time::sleep(std::time::Duration::from_millis(100))
.fuse()
.await;
}
};
pin_mut!(wait_fut);
let msg = select! {
m = rx.recv().fuse() => if let Some(msg) = m {
msg
} else {
break;
},
_ = wait_fut.fuse() => {
idling = true;
stx.send(Event::IdleBegin).unwrap();
continue;
}
};
if idling {
stx.send(Event::IdleEnd).unwrap();
}
idling = false;
while let Some(msg) = rx.recv().await {
let bus = bus.clone();
let ut = ut.clone();
let semaphore = semaphore.clone();
@ -109,8 +75,14 @@ macro_rules! buffer_unordered_batch_poller_macro {
let buffer_clone = buffer.drain(..).collect();
#[allow(clippy::redundant_closure_call)]
let _ =
($st1)(buffer_mid_clone, buffer_clone, bus, ut, task_permit, stx);
let _ = ($st1)(
buffer_mid_clone,
buffer_clone,
bus,
ut,
task_permit,
stx,
);
}
}
Request::Action(Action::Init(..)) => {
@ -128,8 +100,14 @@ macro_rules! buffer_unordered_batch_poller_macro {
let task_permit = semaphore.clone().acquire_owned().await;
#[allow(clippy::redundant_closure_call)]
let _ =
($st1)(buffer_mid_clone, buffer_clone, bus, ut, task_permit, stx);
let _ = ($st1)(
buffer_mid_clone,
buffer_clone,
bus,
ut,
task_permit,
stx,
);
}
let _ = semaphore.acquire_many(cfg.max_parallel as _).await;

View File

@ -48,46 +48,12 @@ macro_rules! batch_synchronized_poller_macro {
M: Message,
R: Message,
{
use futures::{future, pin_mut, select, FutureExt};
let ut = ut.downcast::<Mutex<T>>().unwrap();
let mut buffer_mid = Vec::with_capacity(cfg.batch_size);
let mut buffer = Vec::with_capacity(cfg.batch_size);
let mut idling = true;
loop {
let wait_fut = async move {
if idling {
let () = future::pending().await;
} else {
let _ = tokio::time::sleep(std::time::Duration::from_millis(100))
.fuse()
.await;
}
};
pin_mut!(wait_fut);
let msg = select! {
m = rx.recv().fuse() => if let Some(msg) = m {
msg
} else {
break;
},
_ = wait_fut.fuse() => {
idling = true;
stx.send(Event::IdleBegin).unwrap();
continue;
}
};
if idling {
stx.send(Event::IdleEnd).unwrap();
}
idling = false;
while let Some(msg) = rx.recv().await {
let bus = bus.clone();
let ut = ut.clone();
let stx = stx.clone();

View File

@ -38,42 +38,9 @@ macro_rules! synchronized_poller_macro {
M: Message,
R: Message,
{
use futures::{future, pin_mut, select, FutureExt};
let ut = ut.downcast::<Mutex<T>>().unwrap();
let mut idling = true;
loop {
let wait_fut = async move {
if idling {
let () = future::pending().await;
} else {
let _ = tokio::time::sleep(std::time::Duration::from_millis(100))
.fuse()
.await;
}
};
pin_mut!(wait_fut);
let msg = select! {
m = rx.recv().fuse() => if let Some(msg) = m {
msg
} else {
break;
},
_ = wait_fut.fuse() => {
idling = true;
stx.send(Event::IdleBegin).unwrap();
continue;
}
};
if idling {
stx.send(Event::IdleEnd).unwrap();
}
idling = false;
while let Some(msg) = rx.recv().await {
match msg {
Request::Request(mid, msg, _req) =>
{

View File

@ -311,15 +311,6 @@ where
}
}
}
Event::IdleBegin => {
self.context.idling_flag.store(true, Ordering::SeqCst);
self.context.idle.notify_waiters();
}
Event::IdleEnd => {
self.context.idling_flag.store(false, Ordering::SeqCst);
}
_ => unimplemented!(),
}
}

View File

@ -79,7 +79,7 @@ async fn test_sync() {
b.send(MsgU16(11u16)).await.unwrap();
b.send(MsgU32(32u32)).await.unwrap();
b.flush_and_sync_all().await;
b.flush_and_sync_all(false).await;
b.close().await;
poller.await;
}