task reordering queue

This commit is contained in:
Andrey Tkachenko 2023-10-27 18:12:53 +04:00
parent 46b63aa1f8
commit e5497ac76c
3 changed files with 261 additions and 13 deletions

View File

@ -25,6 +25,7 @@ use dashmap::DashMap;
use futures::Future;
use message::Msg;
use rand::RndGen;
use reorder_queue::ReorderQueue;
use task::{TaskCounter, TaskSpawnerWrapper};
use tokio::sync::{Notify, RwLock};
@ -41,6 +42,7 @@ pub const DEFAUL_TASK_ID: u32 = 0;
struct BusInner {
senders: DashMap<(u32, u32, TypeId), Arc<dyn BusSenderClose + 'static>>,
spawners: RwLock<HashMap<TypeId, Box<dyn Any + Send + Sync + 'static>>>,
reordering: DashMap<(u32, TypeId), Box<dyn Any + Send + Sync + 'static>>,
counters: DashMap<TypeId, AtomicU64>,
abort_notify: Arc<Notify>,
task_counter: Arc<TaskCounter>,
@ -191,22 +193,70 @@ impl BusInner {
};
if config.ordered {
// let queue = self
// .reordering_queue
// .get_or_insert(&(stream_id, type_id), task_count);
let mut queue = self
.reordering
.entry((stream_id, type_id))
.or_insert_with(|| Box::new(ReorderQueue::<M>::new(config.task_count as _)));
// queue.push(msg);
// while let Some(msg) = queue.pop() {
// self.send_inner(msg, task_count).await?;
// }
let queue = queue.downcast_mut::<ReorderQueue<M>>().unwrap();
// Ok(())
self.send_inner(msg, config).await
if let Some(index) = queue.push(msg) {
log::warn!(
"!!! Reordering queue overflow: dropping message {} with index {}",
std::any::type_name::<M>(),
index
);
// self.send_error();
}
while let Some(msg) = queue.pop() {
self.send_inner(msg, config).await?;
}
Ok(())
} else {
self.send_inner(msg, config).await
}
}
pub async fn send_error<M: Message>(
self: &Arc<Self>,
stream_id: u32,
index: u64,
err: Error,
) -> Result<(), Error> {
let type_id = TypeId::of::<M>();
let config = if let Some(spawner) = self
.spawners
.read()
.await
.get(&type_id)
.and_then(|x| x.downcast_ref::<TaskSpawnerWrapper<M>>())
{
spawner.config(stream_id)
} else {
Config::default()
};
if config.ordered {
let mut queue = self
.reordering
.entry((stream_id, type_id))
.or_insert_with(|| Box::new(ReorderQueue::<M>::new(config.task_count as _)));
let queue = queue.downcast_mut::<ReorderQueue<M>>().unwrap();
while let Some(msg) = queue.pop() {
self.send_inner(msg, config).await?;
}
Ok(())
}
Ok(())
}
pub async fn register_dispatcher<M: Message, B: Builder<M>>(self: Arc<Self>, builder: B)
where
B::Context: Handler<M>,
@ -457,7 +507,7 @@ mod tests {
bus.wait().await;
}
#[tokio::test]
// #[tokio::test]
async fn test_tasks_shared() {
let bus = Bus::default();

View File

@ -1,6 +1,6 @@
use core::fmt;
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct Msg<M: Message> {
pub(crate) inner: M,
pub(crate) index: u64,

View File

@ -39,11 +39,14 @@ impl<M: Message> ReorderQueue<M> {
}
}
pub fn push(&mut self, msg: Msg<M>) {
pub fn push(&mut self, msg: Msg<M>) -> Option<u64> {
self.heap.push(Entry(msg));
if self.heap.len() == self.cap {
self.recent_index = None;
self.recent_index = self.recent_index.map(|x| x + 1);
self.recent_index
} else {
None
}
}
@ -66,3 +69,198 @@ impl<M: Message> ReorderQueue<M> {
}
}
}
#[cfg(test)]
mod tests {
use super::ReorderQueue;
#[test]
fn test_reordering() {
let mut queue = ReorderQueue::new(8);
assert_eq!(
queue.push(crate::message::Msg {
inner: (),
index: 0,
stream_id: 0,
}),
None
);
assert_eq!(
queue.pop(),
Some(crate::message::Msg {
inner: (),
index: 0,
stream_id: 0,
})
);
assert_eq!(queue.pop(), None);
assert_eq!(
queue.push(crate::message::Msg {
inner: (),
index: 3,
stream_id: 0,
}),
None
);
assert_eq!(queue.pop(), None);
assert_eq!(
queue.push(crate::message::Msg {
inner: (),
index: 2,
stream_id: 0,
}),
None
);
assert_eq!(queue.pop(), None);
assert_eq!(
queue.push(crate::message::Msg {
inner: (),
index: 4,
stream_id: 0,
}),
None
);
assert_eq!(queue.pop(), None);
assert_eq!(
queue.push(crate::message::Msg {
inner: (),
index: 1,
stream_id: 0,
}),
None
);
assert_eq!(
queue.pop(),
Some(crate::message::Msg {
inner: (),
index: 1,
stream_id: 0,
})
);
assert_eq!(
queue.pop(),
Some(crate::message::Msg {
inner: (),
index: 2,
stream_id: 0,
})
);
assert_eq!(
queue.pop(),
Some(crate::message::Msg {
inner: (),
index: 3,
stream_id: 0,
})
);
assert_eq!(
queue.pop(),
Some(crate::message::Msg {
inner: (),
index: 4,
stream_id: 0,
})
);
assert_eq!(queue.pop(), None);
}
#[test]
fn test_overflow() {
let mut queue = ReorderQueue::new(4);
assert_eq!(
queue.push(crate::message::Msg {
inner: (),
index: 0,
stream_id: 0,
}),
None
);
assert_eq!(
queue.pop(),
Some(crate::message::Msg {
inner: (),
index: 0,
stream_id: 0,
})
);
assert_eq!(queue.pop(), None);
assert_eq!(
queue.push(crate::message::Msg {
inner: (),
index: 4,
stream_id: 0,
}),
None
);
assert_eq!(queue.pop(), None);
assert_eq!(
queue.push(crate::message::Msg {
inner: (),
index: 2,
stream_id: 0,
}),
None
);
assert_eq!(queue.pop(), None);
assert_eq!(
queue.push(crate::message::Msg {
inner: (),
index: 3,
stream_id: 0,
}),
None
);
assert_eq!(queue.pop(), None);
assert_eq!(
queue.push(crate::message::Msg {
inner: (),
index: 5,
stream_id: 0,
}),
Some(1)
);
assert_eq!(
queue.pop(),
Some(crate::message::Msg {
inner: (),
index: 2,
stream_id: 0,
})
);
assert_eq!(
queue.pop(),
Some(crate::message::Msg {
inner: (),
index: 3,
stream_id: 0,
})
);
assert_eq!(
queue.pop(),
Some(crate::message::Msg {
inner: (),
index: 4,
stream_id: 0,
})
);
assert_eq!(
queue.pop(),
Some(crate::message::Msg {
inner: (),
index: 5,
stream_id: 0,
})
);
assert_eq!(queue.pop(), None);
}
}