use async_trait::async_trait; use messagebus::{ derive::{Error as MbError, Message}, error, receivers, AsyncHandler, Bus, Message, }; use thiserror::Error; #[derive(Debug, Error, MbError)] enum Error { #[error("Error({0})")] Error(anyhow::Error), } impl From> for Error { fn from(err: error::Error) -> Self { Self::Error(err.into()) } } #[derive(Debug, Clone, Message)] struct MsgF32(pub f32); #[derive(Debug, Clone, Message)] struct MsgF64(pub f64); struct TmpReceiver; #[async_trait] impl AsyncHandler for TmpReceiver { type Error = Error; type Response = (); async fn handle(&self, _msg: MsgF64, _bus: &Bus) -> Result { std::thread::sleep(std::time::Duration::from_millis(100)); Ok(()) } } #[async_trait] impl AsyncHandler for TmpReceiver { type Error = Error; type Response = (); async fn handle(&self, _msg: MsgF32, bus: &Bus) -> Result { bus.send(MsgF64(12.0)).await.unwrap(); bus.flush::().await; Ok(()) } } #[tokio::test] async fn test_backpressure() { let (b, poller) = Bus::build() .register(TmpReceiver) .subscribe_async::( 1, receivers::BufferUnorderedConfig { buffer_size: 1, max_parallel: 1, }, ) .done() .build(); b.send(MsgF32(10.0)).await.unwrap(); // b.idle_all().await; b.flush_all().await; b.close().await; poller.await; }