cleanup receivers

This commit is contained in:
Andrey Tkachenko 2021-09-16 12:09:09 +04:00
parent 81e5376fe2
commit ac12bb08ab
15 changed files with 126 additions and 194 deletions

View File

@ -1,6 +1,6 @@
[package]
name = "messagebus"
version = "0.9.3"
version = "0.9.4"
authors = ["Andrey Tkachenko <andrey@aidev.ru>"]
repository = "https://github.com/andreytkachenko/messagebus.git"
keywords = ["futures", "async", "tokio", "message", "bus"]

View File

@ -118,7 +118,6 @@ impl Bus {
pub async fn close(&self) {
let _handle = self.inner.maintain.lock().await;
self.inner.closed.store(true, Ordering::SeqCst);
for (_, rs) in &self.inner.receivers {
@ -167,11 +166,12 @@ impl Bus {
}
pub async fn flush_and_sync(&self) {
self.flush().await;
let _handle = self.inner.maintain.lock().await;
for (_, rs) in &self.inner.receivers {
for r in rs {
let err = tokio::time::timeout(Duration::from_secs(60), r.sync(self)).await;
let err = tokio::time::timeout(Duration::from_secs(30), r.sync(self)).await;
if let Err(err) = err {
error!("Sync timeout on {}: {}", r.name(), err);

View File

@ -20,12 +20,22 @@ use super::{BufferUnorderedConfig, BufferUnorderedStats};
use futures::Future;
use parking_lot::Mutex;
use tokio::sync::mpsc;
use tokio::sync::mpsc::{self, UnboundedSender};
buffer_unordered_poller_macro!(
T,
AsyncHandler,
|msg, bus, ut: Arc<T>| async move { ut.handle(msg, &bus).await },
|mid, msg, bus, ut: Arc<T>, stx: UnboundedSender<_>, task_permit, flush_permit| {
tokio::spawn(async move {
let resp = ut.handle(msg, &bus).await;
drop(task_permit);
drop(flush_permit);
stx.send(Event::Response(mid, resp.map_err(Error::Other)))
.unwrap();
})
},
|bus, ut: Arc<T>| { async move { ut.sync(&bus).await } }
);

View File

@ -67,22 +67,13 @@ macro_rules! buffer_unordered_poller_macro {
Request::Request(mid, msg, _req) => {
let bus = bus.clone();
let ut = ut.clone();
let state = state.clone();
let stx = stx.clone();
let state = state.clone();
let task_permit = state.semaphore.acquire_owned().await;
let flush_permit = state.flush_lock.read_owned().await;
let task_permit = state.semaphore.acquire_owned().await;
let _ = tokio::spawn(async move {
let resp = ($st1)(msg, bus, ut)
.await;
drop(task_permit);
drop(flush_permit);
stx.send(Event::Response(mid, resp.map_err(Error::Other)))
.unwrap();
});
let _ = ($st1)(mid, msg, bus, ut, stx, task_permit, flush_permit);
}
Request::Action(Action::Init) => { stx.send(Event::Ready).unwrap(); }
Request::Action(Action::Close) => { rx.close(); }
@ -95,6 +86,7 @@ macro_rules! buffer_unordered_poller_macro {
let lock = state.flush_lock.write().await;
let resp = ($st2)(bus.clone(), ut.clone()).await;
drop(lock);
stx.send(Event::Synchronized(resp.map_err(Error::Other)))
.unwrap();
}

View File

@ -19,16 +19,21 @@ use crate::{
use futures::Future;
use parking_lot::Mutex;
use tokio::sync::mpsc;
use tokio::sync::mpsc::{self, UnboundedSender};
buffer_unordered_poller_macro!(
T,
Handler,
|msg, bus, ut: Arc<T>| async move {
tokio::task::spawn_blocking(move || ut.handle(msg, &bus))
.await
.unwrap()
},
|mid, msg, bus, ut: Arc<T>, stx: UnboundedSender<_>, task_permit, flush_permit|
tokio::task::spawn_blocking(move || {
let resp = ut.handle(msg, &bus);
drop(task_permit);
drop(flush_permit);
stx.send(Event::Response(mid, resp.map_err(Error::Other)))
.unwrap();
}),
|bus, ut: Arc<T>| async move {
tokio::task::spawn_blocking(move || ut.sync(&bus))
.await

View File

@ -19,12 +19,21 @@ use crate::{
use super::{BufferUnorderedBatchedConfig, BufferUnorderedBatchedStats};
use futures::Future;
use parking_lot::Mutex;
use tokio::sync::mpsc;
use tokio::sync::mpsc::{self, UnboundedSender};
buffer_unordered_batch_poller_macro!(
T,
AsyncBatchHandler,
|msgs, bus, ut: Arc<T>| async move { ut.handle(msgs, &bus).await },
|mids: Vec<_>, msgs, bus, ut: Arc<T>, flush_permit, task_permit, stx: UnboundedSender<_>| {
tokio::spawn(async move {
let resp = ut.handle(msgs, &bus).await;
drop(task_permit);
drop(flush_permit);
crate::process_batch_result!(resp, mids, stx);
})
},
|bus, ut: Arc<T>| { async move { ut.sync(&bus).await } }
);

View File

@ -88,41 +88,7 @@ macro_rules! buffer_unordered_batch_poller_macro {
let buffer_mid_clone = buffer_mid.drain(..).collect::<Vec<_>>();
let buffer_clone = buffer.drain(..).collect();
let _ = tokio::spawn(async move {
let resp = ($st1)(buffer_clone, bus, ut).await;
drop(task_permit);
drop(flush_permit);
let mids = buffer_mid_clone.into_iter();
match resp {
Ok(re) => {
let mut mids = mids.into_iter();
let mut re = re.into_iter();
while let Some((mid, _req)) = mids.next() {
if let Some(r) = re.next() {
stx.send(Event::Response(mid, Ok(r)))
.unwrap();
} else {
stx.send(Event::Response(mid, Err(Error::NoResponse)))
.unwrap();
}
}
}
Err(er) => {
for (mid, _req) in mids {
stx.send(Event::Response(
mid,
Err(Error::Other(er.clone())),
)).unwrap();
}
stx.send(Event::Error(er)).unwrap();
}
}
});
let _ = ($st1)(buffer_mid_clone, buffer_clone, bus, ut, flush_permit, task_permit, stx);
}
}
Request::Action(Action::Init) => { stx.send(Event::Ready).unwrap(); }
@ -131,47 +97,13 @@ macro_rules! buffer_unordered_batch_poller_macro {
let stx_clone = stx.clone();
if !buffer_mid.is_empty() {
let task_permit = state.semaphore.acquire_owned().await;
let flush_permit = state.flush_lock.clone().read_owned().await;
let buffer_mid_clone = buffer_mid.drain(..).collect::<Vec<_>>();
let buffer_clone = buffer.drain(..).collect();
let _ = tokio::spawn(async move {
let resp = ($st1)(buffer_clone, bus, ut).await;
drop(task_permit);
drop(flush_permit);
let flush_permit = state.flush_lock.clone().read_owned().await;
let task_permit = state.semaphore.acquire_owned().await;
let mids = buffer_mid_clone.into_iter();
match resp {
Ok(re) => {
let mut mids = mids.into_iter();
let mut re = re.into_iter();
while let Some((mid, _req)) = mids.next() {
if let Some(r) = re.next() {
stx.send(Event::Response(mid, Ok(r)))
.unwrap();
} else {
stx.send(Event::Response(mid, Err(Error::NoResponse)))
.unwrap();
}
}
}
Err(er) => {
for (mid, _req) in mids {
stx.send(Event::Response(
mid,
Err(Error::Other(er.clone())),
)).unwrap();
}
stx.send(Event::Error(er)).unwrap();
}
}
});
let _ = ($st1)(buffer_mid_clone, buffer_clone, bus, ut, flush_permit, task_permit, stx);
}
state.flush_lock.write().await;
@ -191,6 +123,5 @@ macro_rules! buffer_unordered_batch_poller_macro {
}
}
}
};
}

View File

@ -19,15 +19,20 @@ use crate::{
use futures::Future;
use parking_lot::Mutex;
use tokio::sync::mpsc;
use tokio::sync::mpsc::{self, UnboundedSender};
buffer_unordered_batch_poller_macro!(
T,
BatchHandler,
|msgs, bus, ut: Arc<T>| async move {
tokio::task::spawn_blocking(move || ut.handle(msgs, &bus))
.await
.unwrap()
|mids: Vec<_>, msgs, bus, ut: Arc<T>, flush_permit, task_permit, stx: UnboundedSender<_>| {
tokio::task::spawn_blocking(move || {
let resp = ut.handle(msgs, &bus);
drop(task_permit);
drop(flush_permit);
crate::process_batch_result!(resp, mids, stx);
})
},
|bus, ut: Arc<T>| {
async move {

View File

@ -15,6 +15,41 @@ pub use synchronize_batched::{
use crate::receiver::Action;
#[macro_export]
macro_rules! process_batch_result {
($resp: expr, $mids: expr, $stx: expr) => {
let mids = $mids;
match $resp {
Ok(re) => {
let mut mids = mids.into_iter();
let mut re = re.into_iter();
while let Some((mid, _req)) = mids.next() {
if let Some(r) = re.next() {
$stx.send(Event::Response(mid, Ok(r)))
.unwrap();
} else {
$stx.send(Event::Response(mid, Err(Error::NoResponse)))
.unwrap();
}
}
}
Err(er) => {
for (mid, _req) in mids {
$stx.send(Event::Response(
mid,
Err(Error::Other(er.clone())),
)).unwrap();
}
$stx.send(Event::Error(er)).unwrap();
}
}
};
}
#[derive(Debug)]
pub(crate) enum Request<M> {
Action(Action),

View File

@ -17,13 +17,17 @@ use futures::Future;
use super::SynchronizedBatchedConfig;
use tokio::sync::{mpsc, Mutex};
use tokio::sync::{Mutex, mpsc::{self, UnboundedSender}};
batch_synchronized_poller_macro! {
T,
AsyncBatchSynchronizedHandler,
|msgs, bus, ut: Arc<Mutex<T>>| async move {
ut.lock().await.handle(msgs, &bus).await
|mids: Vec<_>, msgs, bus, ut: Arc<Mutex<T>>, stx: UnboundedSender<_>| {
tokio::spawn(async move {
let resp = ut.lock().await.handle(msgs, &bus).await;
crate::process_batch_result!(resp, mids, stx);
})
},
|bus, ut: Arc<Mutex<T>>| { async move { ut.lock().await.sync(&bus).await } }
}

View File

@ -69,37 +69,7 @@ macro_rules! batch_synchronized_poller_macro {
let buffer_mid_clone = buffer_mid.drain(..).collect::<Vec<_>>();
let buffer_clone = buffer.drain(..).collect();
tokio::spawn(async move {
let resp = ($st1)(buffer_clone, bus, ut).await;
let mids = buffer_mid_clone.into_iter();
match resp {
Ok(re) => {
let mut mids = mids.into_iter();
let mut re = re.into_iter();
while let Some((mid, _req)) = mids.next() {
if let Some(r) = re.next() {
stx.send(Event::Response(mid, Ok(r)))
.unwrap();
} else {
stx.send(Event::Response(mid, Err(Error::NoResponse)))
.unwrap();
}
}
}
Err(er) => {
for (mid, _req) in mids {
stx.send(Event::Response(
mid,
Err(Error::Other(er.clone())),
)).unwrap();
}
stx.send(Event::Error(er)).unwrap();
}
}
}).await.unwrap();
let _ = ($st1)(buffer_mid_clone, buffer_clone, bus, ut, stx);
}
}
Request::Action(Action::Init) => { stx.send(Event::Ready).unwrap(); }
@ -111,37 +81,7 @@ macro_rules! batch_synchronized_poller_macro {
let buffer_mid_clone = buffer_mid.drain(..).collect::<Vec<_>>();
let buffer_clone = buffer.drain(..).collect();
let _ = tokio::spawn(async move {
let resp = ($st1)(buffer_clone, bus, ut).await;
let mids = buffer_mid_clone.into_iter();
match resp {
Ok(re) => {
let mut mids = mids.into_iter();
let mut re = re.into_iter();
while let Some((mid, _req)) = mids.next() {
if let Some(r) = re.next() {
stx.send(Event::Response(mid, Ok(r)))
.unwrap();
} else {
stx.send(Event::Response(mid, Err(Error::NoResponse)))
.unwrap();
}
}
}
Err(er) => {
for (mid, _req) in mids {
stx.send(Event::Response(
mid,
Err(Error::Other(er.clone())),
)).unwrap();
}
stx.send(Event::Error(er)).unwrap();
}
}
});
let _ = ($st1)(buffer_mid_clone, buffer_clone, bus, ut, stx);
}
stx_clone.send(Event::Flushed).unwrap();

View File

@ -15,15 +15,17 @@ use crate::{
use super::SynchronizedBatchedConfig;
use futures::{executor::block_on, Future};
use tokio::sync::{mpsc, Mutex};
use tokio::sync::{Mutex, mpsc::{self, UnboundedSender}};
batch_synchronized_poller_macro! {
T,
BatchSynchronizedHandler,
|msgs, bus, ut: Arc<Mutex<T>>| async move {
tokio::task::spawn_blocking(move || block_on(ut.lock()).handle(msgs, &bus))
.await
.unwrap()
|mids: Vec<_>, msgs, bus, ut: Arc<Mutex<T>>, stx: UnboundedSender<_>| {
tokio::task::spawn_blocking(move || {
let resp = block_on(ut.lock()).handle(msgs, &bus);
crate::process_batch_result!(resp, mids, stx);
})
},
|bus, ut: Arc<Mutex<T>>| async move {
tokio::task::spawn_blocking(move || block_on(ut.lock()).sync(&bus))

View File

@ -15,14 +15,20 @@ use crate::{
receivers::Request,
AsyncSynchronizedHandler, Bus, Message, Untyped,
};
use tokio::sync::{mpsc, Mutex};
use tokio::sync::{Mutex, mpsc::{self, UnboundedSender}};
synchronized_poller_macro! {
T,
AsyncSynchronizedHandler,
|msg, bus, ut: Arc<Mutex<T>>| async move {
ut.lock().await.handle(msg, &bus).await
|mid, msg, bus, ut: Arc<Mutex<T>>, stx: UnboundedSender<_>| {
tokio::spawn(async move {
let resp = ut.lock().await.handle(msg, &bus).await;
stx.send(Event::Response(mid, resp.map_err(Error::Other)))
.unwrap();
})
},
|bus, ut: Arc<Mutex<T>>| async move {
ut.lock().await.sync(&bus).await
}

View File

@ -45,19 +45,9 @@ macro_rules! synchronized_poller_macro {
while let Some(msg) = rx.recv().await {
match msg {
Request::Request(mid, msg, _req) => {
let bus = bus.clone();
let ut = ut.clone();
let stx = stx.clone();
tokio::spawn(async move {
let resp = ($st1)(msg, bus, ut)
.await;
stx.send(Event::Response(mid, resp.map_err(Error::Other)))
.unwrap();
})
.await
.unwrap();
($st1)(mid, msg, bus.clone(), ut.clone(), stx.clone())
.await
.unwrap()
}
Request::Action(Action::Init) => { stx.send(Event::Ready).unwrap(); }
Request::Action(Action::Close) => { rx.close(); }

View File

@ -15,15 +15,18 @@ use crate::{
receivers::Request,
Bus, Message, SynchronizedHandler, Untyped,
};
use tokio::sync::{mpsc, Mutex};
use tokio::sync::{Mutex, mpsc::{self, UnboundedSender}};
synchronized_poller_macro! {
T,
SynchronizedHandler,
|msg, bus, ut: Arc<Mutex<T>>| async move {
tokio::task::spawn_blocking(move || block_on(ut.lock()).handle(msg, &bus))
.await
.unwrap()
|mid, msg, bus, ut: Arc<Mutex<T>>, stx: UnboundedSender<_>| {
tokio::task::spawn_blocking(move || {
let resp = block_on(ut.lock()).handle(msg, &bus);
stx.send(Event::Response(mid, resp.map_err(Error::Other)))
.unwrap();
})
},
|bus, ut: Arc<Mutex<T>>| async move {
tokio::task::spawn_blocking(move || block_on(ut.lock()).sync(&bus))