improve completion cancelation

This commit is contained in:
Pascal Kuthe 2024-10-22 00:53:28 +02:00 committed by Philipp Mildenberger
parent 381732197e
commit d73f158b43
6 changed files with 324 additions and 84 deletions

View File

@ -1,15 +1,18 @@
use std::borrow::Borrow;
use std::future::Future; use std::future::Future;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering::Relaxed;
use std::sync::Arc;
pub use oneshot::channel as cancelation; use tokio::sync::Notify;
use tokio::sync::oneshot;
pub type CancelTx = oneshot::Sender<()>; pub async fn cancelable_future<T>(
pub type CancelRx = oneshot::Receiver<()>; future: impl Future<Output = T>,
cancel: impl Borrow<TaskHandle>,
pub async fn cancelable_future<T>(future: impl Future<Output = T>, cancel: CancelRx) -> Option<T> { ) -> Option<T> {
tokio::select! { tokio::select! {
biased; biased;
_ = cancel => { _ = cancel.borrow().canceled() => {
None None
} }
res = future => { res = future => {
@ -17,3 +20,266 @@ pub async fn cancelable_future<T>(future: impl Future<Output = T>, cancel: Cance
} }
} }
} }
#[derive(Default, Debug)]
struct Shared {
state: AtomicU64,
// notify has some features that we don't really need here because it
// supports waking single tasks (notify_one) and does it's own (more
// complicated) state tracking, we could reimplement the waiter linked list
// with modest effort and reduce memory consumption by one word/8 bytes and
// reduce code complexity/number of atomic operations.
//
// I don't think that's worth the complexity (unsafe code).
//
// if we only cared about async code then we could also only use a notify
// (without the generation count), this would be equivalent (or maybe more
// correct if we want to allow cloning the TX) but it would be extremly slow
// to frequently check for cancelation from sync code
notify: Notify,
}
impl Shared {
fn generation(&self) -> u32 {
self.state.load(Relaxed) as u32
}
fn num_running(&self) -> u32 {
(self.state.load(Relaxed) >> 32) as u32
}
/// increments the generation count and sets num_running
/// to the provided value, this operation is not with
/// regard to the generation counter (doesn't use fetch_add)
/// so the calling code must ensure it cannot execute concurrently
/// to maintain correctness (but not safety)
fn inc_generation(&self, num_running: u32) -> (u32, u32) {
let state = self.state.load(Relaxed);
let generation = state as u32;
let prev_running = (state >> 32) as u32;
// no need to create a new generation if the refcount is zero (fastaph)
if prev_running == 0 && num_running == 0 {
return (generation, 0);
}
let new_generation = generation.saturating_add(1);
self.state.store(
new_generation as u64 | ((num_running as u64) << 32),
Relaxed,
);
self.notify.notify_waiters();
(new_generation, prev_running)
}
fn inc_running(&self, generation: u32) {
let mut state = self.state.load(Relaxed);
loop {
let current_generation = state as u32;
if current_generation != generation {
break;
}
let off = 1 << 32;
let res = self.state.compare_exchange_weak(
state,
state.saturating_add(off),
Relaxed,
Relaxed,
);
match res {
Ok(_) => break,
Err(new_state) => state = new_state,
}
}
}
fn dec_running(&self, generation: u32) {
let mut state = self.state.load(Relaxed);
loop {
let current_generation = state as u32;
if current_generation != generation {
break;
}
let num_running = (state >> 32) as u32;
// running can't be zero here, that would mean we misscounted somewhere
assert_ne!(num_running, 0);
let off = 1 << 32;
let res = self
.state
.compare_exchange_weak(state, state - off, Relaxed, Relaxed);
match res {
Ok(_) => break,
Err(new_state) => state = new_state,
}
}
}
}
// this intentionally doesn't implement clone and requires amutable reference
// for cancelation to avoid races (in inc_generation)
/// A task controller allows managing a single subtask enabling the contorller
/// to cancel the subtask and to check wether it is still running. For efficency
/// reasons the controller can be reused/restarted, in that case the previous
/// task is automatically cancelled.
///
/// If the controller is dropped the subtasks are automatically canceled.
#[derive(Default, Debug)]
pub struct TaskController {
shared: Arc<Shared>,
}
impl TaskController {
pub fn new() -> Self {
TaskController::default()
}
/// Cancels the active task (handle)
///
/// returns wether any tasks were still running before the canellation
pub fn cancel(&mut self) -> bool {
self.shared.inc_generation(0).1 != 0
}
/// checks wether there are any task handles
/// that haven't been dropped (or canceled) yet
pub fn is_running(&self) -> bool {
self.shared.num_running() != 0
}
/// Starts a new task and cancels the previous task (handles)
pub fn restart(&mut self) -> TaskHandle {
TaskHandle {
generation: self.shared.inc_generation(1).0,
shared: self.shared.clone(),
}
}
}
impl Drop for TaskController {
fn drop(&mut self) {
self.cancel();
}
}
/// A handle that is used to link a task with a task controller, it can be
/// used to cancel async futures very efficently but can also be checked for
/// cancaellation very quickly (single atomic read) in blocking code. The
/// handle can be cheaply cloned (referenced counted).
///
/// The TaskController can check wether a task is "running" by inspecting the
/// refcount of the (current) tasks handeles. Therefore, if that information
/// is important ensure that the handle is not dropped until the task fully
/// completes
pub struct TaskHandle {
shared: Arc<Shared>,
generation: u32,
}
impl Clone for TaskHandle {
fn clone(&self) -> Self {
self.shared.inc_running(self.generation);
TaskHandle {
shared: self.shared.clone(),
generation: self.generation,
}
}
}
impl Drop for TaskHandle {
fn drop(&mut self) {
self.shared.dec_running(self.generation);
}
}
impl TaskHandle {
/// waits until [`TaskController::cancel`] is called for the corresponding
/// [`TaskController`]. Immidietly returns if `cancel` was already called since
pub async fn canceled(&self) {
let notified = self.shared.notify.notified();
if !self.is_canceled() {
notified.await
}
}
pub fn is_canceled(&self) -> bool {
self.generation != self.shared.generation()
}
}
#[cfg(test)]
mod tests {
use std::future::poll_fn;
use futures_executor::block_on;
use tokio::task::yield_now;
use crate::{cancelable_future, TaskController};
#[test]
fn immidiate_cancel() {
let mut controller = TaskController::new();
let handle = controller.restart();
controller.cancel();
assert!(handle.is_canceled());
controller.restart();
assert!(handle.is_canceled());
let res = block_on(cancelable_future(
poll_fn(|_cx| std::task::Poll::Ready(())),
handle,
));
assert!(res.is_none());
}
#[test]
fn running_count() {
let mut controller = TaskController::new();
let handle = controller.restart();
assert!(controller.is_running());
assert!(!handle.is_canceled());
drop(handle);
assert!(!controller.is_running());
assert!(!controller.cancel());
let handle = controller.restart();
assert!(!handle.is_canceled());
assert!(controller.is_running());
let handle2 = handle.clone();
assert!(!handle.is_canceled());
assert!(controller.is_running());
drop(handle2);
assert!(!handle.is_canceled());
assert!(controller.is_running());
assert!(controller.cancel());
assert!(handle.is_canceled());
assert!(!controller.is_running());
}
#[test]
fn no_cancel() {
let mut controller = TaskController::new();
let handle = controller.restart();
assert!(!handle.is_canceled());
let res = block_on(cancelable_future(
poll_fn(|_cx| std::task::Poll::Ready(())),
handle,
));
assert!(res.is_some());
}
#[test]
fn delayed_cancel() {
let mut controller = TaskController::new();
let handle = controller.restart();
let mut hit = false;
let res = block_on(cancelable_future(
async {
controller.cancel();
hit = true;
yield_now().await;
},
handle,
));
assert!(res.is_none());
assert!(hit);
}
}

View File

@ -32,7 +32,7 @@
//! to helix-view in the future if we manage to detach the compositor from its rendering backend. //! to helix-view in the future if we manage to detach the compositor from its rendering backend.
use anyhow::Result; use anyhow::Result;
pub use cancel::{cancelable_future, cancelation, CancelRx, CancelTx}; pub use cancel::{cancelable_future, TaskController, TaskHandle};
pub use debounce::{send_blocking, AsyncHook}; pub use debounce::{send_blocking, AsyncHook};
pub use redraw::{ pub use redraw::{
lock_frame, redraw_requested, request_redraw, start_frame, RenderLockGuard, RequestRedrawOnDrop, lock_frame, redraw_requested, request_redraw, start_frame, RenderLockGuard, RequestRedrawOnDrop,

View File

@ -1,5 +1,4 @@
use std::collections::HashSet; use std::collections::HashSet;
use std::sync::atomic::AtomicBool;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
@ -8,9 +7,7 @@
use futures_util::FutureExt; use futures_util::FutureExt;
use helix_core::chars::char_is_word; use helix_core::chars::char_is_word;
use helix_core::syntax::LanguageServerFeature; use helix_core::syntax::LanguageServerFeature;
use helix_event::{ use helix_event::{cancelable_future, register_hook, send_blocking, TaskController, TaskHandle};
cancelable_future, cancelation, register_hook, send_blocking, CancelRx, CancelTx,
};
use helix_lsp::lsp; use helix_lsp::lsp;
use helix_lsp::util::pos_to_lsp_pos; use helix_lsp::util::pos_to_lsp_pos;
use helix_stdx::rope::RopeSliceExt; use helix_stdx::rope::RopeSliceExt;
@ -59,12 +56,8 @@ pub(super) struct CompletionHandler {
/// currently active trigger which will cause a /// currently active trigger which will cause a
/// completion request after the timeout /// completion request after the timeout
trigger: Option<Trigger>, trigger: Option<Trigger>,
/// A handle for currently active completion request. in_flight: Option<Trigger>,
/// This can be used to determine whether the current task_controller: TaskController,
/// request is still active (and new triggers should be
/// ignored) and can also be used to abort the current
/// request (by dropping the handle)
request: Option<CancelTx>,
config: Arc<ArcSwap<Config>>, config: Arc<ArcSwap<Config>>,
} }
@ -72,8 +65,9 @@ impl CompletionHandler {
pub fn new(config: Arc<ArcSwap<Config>>) -> CompletionHandler { pub fn new(config: Arc<ArcSwap<Config>>) -> CompletionHandler {
Self { Self {
config, config,
request: None, task_controller: TaskController::new(),
trigger: None, trigger: None,
in_flight: None,
} }
} }
} }
@ -86,6 +80,9 @@ fn handle_event(
event: Self::Event, event: Self::Event,
_old_timeout: Option<Instant>, _old_timeout: Option<Instant>,
) -> Option<Instant> { ) -> Option<Instant> {
if self.in_flight.is_some() && !self.task_controller.is_running() {
self.in_flight = None;
}
match event { match event {
CompletionEvent::AutoTrigger { CompletionEvent::AutoTrigger {
cursor: trigger_pos, cursor: trigger_pos,
@ -96,7 +93,7 @@ fn handle_event(
// but people may create weird keymaps/use the mouse so lets be extra careful // but people may create weird keymaps/use the mouse so lets be extra careful
if self if self
.trigger .trigger
.as_ref() .or(self.in_flight)
.map_or(true, |trigger| trigger.doc != doc || trigger.view != view) .map_or(true, |trigger| trigger.doc != doc || trigger.view != view)
{ {
self.trigger = Some(Trigger { self.trigger = Some(Trigger {
@ -109,7 +106,7 @@ fn handle_event(
} }
CompletionEvent::TriggerChar { cursor, doc, view } => { CompletionEvent::TriggerChar { cursor, doc, view } => {
// immediately request completions and drop all auto completion requests // immediately request completions and drop all auto completion requests
self.request = None; self.task_controller.cancel();
self.trigger = Some(Trigger { self.trigger = Some(Trigger {
pos: cursor, pos: cursor,
view, view,
@ -119,7 +116,6 @@ fn handle_event(
} }
CompletionEvent::ManualTrigger { cursor, doc, view } => { CompletionEvent::ManualTrigger { cursor, doc, view } => {
// immediately request completions and drop all auto completion requests // immediately request completions and drop all auto completion requests
self.request = None;
self.trigger = Some(Trigger { self.trigger = Some(Trigger {
pos: cursor, pos: cursor,
view, view,
@ -132,21 +128,21 @@ fn handle_event(
} }
CompletionEvent::Cancel => { CompletionEvent::Cancel => {
self.trigger = None; self.trigger = None;
self.request = None; self.task_controller.cancel();
} }
CompletionEvent::DeleteText { cursor } => { CompletionEvent::DeleteText { cursor } => {
// if we deleted the original trigger, abort the completion // if we deleted the original trigger, abort the completion
if matches!(self.trigger, Some(Trigger{ pos, .. }) if cursor < pos) { if matches!(self.trigger.or(self.in_flight), Some(Trigger{ pos, .. }) if cursor < pos)
{
self.trigger = None; self.trigger = None;
self.request = None; self.task_controller.cancel();
} }
} }
} }
self.trigger.map(|trigger| { self.trigger.map(|trigger| {
// if the current request was closed forget about it // if the current request was closed forget about it
// otherwise immediately restart the completion request // otherwise immediately restart the completion request
let cancel = self.request.take().map_or(false, |req| !req.is_closed()); let timeout = if trigger.kind == TriggerKind::Auto {
let timeout = if trigger.kind == TriggerKind::Auto && !cancel {
self.config.load().editor.completion_timeout self.config.load().editor.completion_timeout
} else { } else {
// we want almost instant completions for trigger chars // we want almost instant completions for trigger chars
@ -161,17 +157,17 @@ fn handle_event(
fn finish_debounce(&mut self) { fn finish_debounce(&mut self) {
let trigger = self.trigger.take().expect("debounce always has a trigger"); let trigger = self.trigger.take().expect("debounce always has a trigger");
let (tx, rx) = cancelation(); self.in_flight = Some(trigger);
self.request = Some(tx); let handle = self.task_controller.restart();
dispatch_blocking(move |editor, compositor| { dispatch_blocking(move |editor, compositor| {
request_completion(trigger, rx, editor, compositor) request_completion(trigger, handle, editor, compositor)
}); });
} }
} }
fn request_completion( fn request_completion(
mut trigger: Trigger, mut trigger: Trigger,
cancel: CancelRx, handle: TaskHandle,
editor: &mut Editor, editor: &mut Editor,
compositor: &mut Compositor, compositor: &mut Compositor,
) { ) {
@ -202,7 +198,6 @@ fn request_completion(
// necessary from our side too. // necessary from our side too.
trigger.pos = cursor; trigger.pos = cursor;
let trigger_text = text.slice(..cursor); let trigger_text = text.slice(..cursor);
let cancel_completion = Arc::new(AtomicBool::new(false));
let mut seen_language_servers = HashSet::new(); let mut seen_language_servers = HashSet::new();
let mut futures: FuturesUnordered<_> = doc let mut futures: FuturesUnordered<_> = doc
@ -270,12 +265,7 @@ fn request_completion(
} }
.boxed() .boxed()
}) })
.chain(path_completion( .chain(path_completion(cursor, text.clone(), doc, handle.clone()))
cursor,
text.clone(),
doc,
cancel_completion.clone(),
))
.collect(); .collect();
let future = async move { let future = async move {
@ -296,18 +286,13 @@ fn request_completion(
let ui = compositor.find::<ui::EditorView>().unwrap(); let ui = compositor.find::<ui::EditorView>().unwrap();
ui.last_insert.1.push(InsertEvent::RequestCompletion); ui.last_insert.1.push(InsertEvent::RequestCompletion);
tokio::spawn(async move { tokio::spawn(async move {
let items = match cancelable_future(future, cancel).await { let items = cancelable_future(future, &handle).await;
Some(v) => v, let Some(items) = items.filter(|items| !items.is_empty()) else {
None => {
cancel_completion.store(true, std::sync::atomic::Ordering::Relaxed);
Vec::new()
}
};
if items.is_empty() {
return; return;
} };
dispatch(move |editor, compositor| { dispatch(move |editor, compositor| {
show_completion(editor, compositor, items, trigger, savepoint) show_completion(editor, compositor, items, trigger, savepoint);
drop(handle)
}) })
.await .await
}); });

View File

@ -3,15 +3,12 @@
fs, fs,
path::{Path, PathBuf}, path::{Path, PathBuf},
str::FromStr as _, str::FromStr as _,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
}; };
use futures_util::{future::BoxFuture, FutureExt as _}; use futures_util::{future::BoxFuture, FutureExt as _};
use helix_core as core; use helix_core as core;
use helix_core::Transaction; use helix_core::Transaction;
use helix_event::TaskHandle;
use helix_stdx::path::{self, canonicalize, fold_home_dir, get_path_suffix}; use helix_stdx::path::{self, canonicalize, fold_home_dir, get_path_suffix};
use helix_view::Document; use helix_view::Document;
use url::Url; use url::Url;
@ -22,7 +19,7 @@ pub(crate) fn path_completion(
cursor: usize, cursor: usize,
text: core::Rope, text: core::Rope,
doc: &Document, doc: &Document,
cancel: Arc<AtomicBool>, handle: TaskHandle,
) -> Option<BoxFuture<'static, anyhow::Result<Vec<CompletionItem>>>> { ) -> Option<BoxFuture<'static, anyhow::Result<Vec<CompletionItem>>>> {
if !doc.path_completion_enabled() { if !doc.path_completion_enabled() {
return None; return None;
@ -66,7 +63,7 @@ pub(crate) fn path_completion(
} }
})?; })?;
if cancel.load(Ordering::Relaxed) { if handle.is_canceled() {
return None; return None;
} }
@ -75,10 +72,6 @@ pub(crate) fn path_completion(
return Vec::new(); return Vec::new();
}; };
if cancel.load(Ordering::Relaxed) {
return Vec::new();
}
read_dir read_dir
.filter_map(Result::ok) .filter_map(Result::ok)
.filter_map(|dir_entry| { .filter_map(|dir_entry| {
@ -88,7 +81,7 @@ pub(crate) fn path_completion(
.and_then(|md| Some((dir_entry.file_name().into_string().ok()?, md))) .and_then(|md| Some((dir_entry.file_name().into_string().ok()?, md)))
}) })
.map_while(|(file_name, md)| { .map_while(|(file_name, md)| {
if cancel.load(Ordering::Relaxed) { if handle.is_canceled() {
return None; return None;
} }

View File

@ -4,7 +4,7 @@
use tokio::sync::mpsc::Sender; use tokio::sync::mpsc::Sender;
use tokio::time::{Duration, Instant}; use tokio::time::{Duration, Instant};
use helix_event::{send_blocking, AsyncHook, CancelRx}; use helix_event::{send_blocking, AsyncHook, TaskController, TaskHandle};
use helix_view::Editor; use helix_view::Editor;
use super::LspCompletionItem; use super::LspCompletionItem;
@ -31,11 +31,7 @@ impl ResolveHandler {
pub fn new() -> ResolveHandler { pub fn new() -> ResolveHandler {
ResolveHandler { ResolveHandler {
last_request: None, last_request: None,
resolver: ResolveTimeout { resolver: ResolveTimeout::default().spawn(),
next_request: None,
in_flight: None,
}
.spawn(),
} }
} }
@ -101,7 +97,8 @@ struct ResolveRequest {
#[derive(Default)] #[derive(Default)]
struct ResolveTimeout { struct ResolveTimeout {
next_request: Option<ResolveRequest>, next_request: Option<ResolveRequest>,
in_flight: Option<(helix_event::CancelTx, Arc<LspCompletionItem>)>, in_flight: Option<Arc<LspCompletionItem>>,
task_controller: TaskController,
} }
impl AsyncHook for ResolveTimeout { impl AsyncHook for ResolveTimeout {
@ -121,7 +118,7 @@ fn handle_event(
} else if self } else if self
.in_flight .in_flight
.as_ref() .as_ref()
.is_some_and(|(_, old_request)| old_request.item == request.item.item) .is_some_and(|old_request| old_request.item == request.item.item)
{ {
self.next_request = None; self.next_request = None;
None None
@ -135,14 +132,14 @@ fn finish_debounce(&mut self) {
let Some(request) = self.next_request.take() else { let Some(request) = self.next_request.take() else {
return; return;
}; };
let (tx, rx) = helix_event::cancelation(); let token = self.task_controller.restart();
self.in_flight = Some((tx, request.item.clone())); self.in_flight = Some(request.item.clone());
tokio::spawn(request.execute(rx)); tokio::spawn(request.execute(token));
} }
} }
impl ResolveRequest { impl ResolveRequest {
async fn execute(self, cancel: CancelRx) { async fn execute(self, cancel: TaskHandle) {
let future = self.ls.resolve_completion_item(&self.item.item); let future = self.ls.resolve_completion_item(&self.item.item);
let Some(resolved_item) = helix_event::cancelable_future(future, cancel).await else { let Some(resolved_item) = helix_event::cancelable_future(future, cancel).await else {
return; return;

View File

@ -2,9 +2,7 @@
use std::time::Duration; use std::time::Duration;
use helix_core::syntax::LanguageServerFeature; use helix_core::syntax::LanguageServerFeature;
use helix_event::{ use helix_event::{cancelable_future, register_hook, send_blocking, TaskController, TaskHandle};
cancelable_future, cancelation, register_hook, send_blocking, CancelRx, CancelTx,
};
use helix_lsp::lsp::{self, SignatureInformation}; use helix_lsp::lsp::{self, SignatureInformation};
use helix_stdx::rope::RopeSliceExt; use helix_stdx::rope::RopeSliceExt;
use helix_view::document::Mode; use helix_view::document::Mode;
@ -22,11 +20,11 @@
use crate::ui::Popup; use crate::ui::Popup;
use crate::{job, ui}; use crate::{job, ui};
#[derive(Debug)] #[derive(Debug, PartialEq, Eq)]
enum State { enum State {
Open, Open,
Closed, Closed,
Pending { request: CancelTx }, Pending,
} }
/// debounce timeout in ms, value taken from VSCode /// debounce timeout in ms, value taken from VSCode
@ -37,6 +35,7 @@ enum State {
pub(super) struct SignatureHelpHandler { pub(super) struct SignatureHelpHandler {
trigger: Option<SignatureHelpInvoked>, trigger: Option<SignatureHelpInvoked>,
state: State, state: State,
task_controller: TaskController,
} }
impl SignatureHelpHandler { impl SignatureHelpHandler {
@ -44,6 +43,7 @@ pub fn new() -> SignatureHelpHandler {
SignatureHelpHandler { SignatureHelpHandler {
trigger: None, trigger: None,
state: State::Closed, state: State::Closed,
task_controller: TaskController::new(),
} }
} }
} }
@ -76,12 +76,11 @@ fn handle_event(
} }
SignatureHelpEvent::RequestComplete { open } => { SignatureHelpEvent::RequestComplete { open } => {
// don't cancel rerequest that was already triggered // don't cancel rerequest that was already triggered
if let State::Pending { request } = &self.state { if self.state == State::Pending && self.task_controller.is_running() {
if !request.is_closed() { return timeout;
return timeout;
}
} }
self.state = if open { State::Open } else { State::Closed }; self.state = if open { State::Open } else { State::Closed };
self.task_controller.cancel();
return timeout; return timeout;
} }
@ -94,16 +93,16 @@ fn handle_event(
fn finish_debounce(&mut self) { fn finish_debounce(&mut self) {
let invocation = self.trigger.take().unwrap(); let invocation = self.trigger.take().unwrap();
let (tx, rx) = cancelation(); self.state = State::Pending;
self.state = State::Pending { request: tx }; let handle = self.task_controller.restart();
job::dispatch_blocking(move |editor, _| request_signature_help(editor, invocation, rx)) job::dispatch_blocking(move |editor, _| request_signature_help(editor, invocation, handle))
} }
} }
pub fn request_signature_help( pub fn request_signature_help(
editor: &mut Editor, editor: &mut Editor,
invoked: SignatureHelpInvoked, invoked: SignatureHelpInvoked,
cancel: CancelRx, cancel: TaskHandle,
) { ) {
let (view, doc) = current!(editor); let (view, doc) = current!(editor);