diff --git a/helix-event/src/cancel.rs b/helix-event/src/cancel.rs index f027be80e..f80ca3d9b 100644 --- a/helix-event/src/cancel.rs +++ b/helix-event/src/cancel.rs @@ -1,15 +1,18 @@ +use std::borrow::Borrow; use std::future::Future; +use std::sync::atomic::AtomicU64; +use std::sync::atomic::Ordering::Relaxed; +use std::sync::Arc; -pub use oneshot::channel as cancelation; -use tokio::sync::oneshot; +use tokio::sync::Notify; -pub type CancelTx = oneshot::Sender<()>; -pub type CancelRx = oneshot::Receiver<()>; - -pub async fn cancelable_future(future: impl Future, cancel: CancelRx) -> Option { +pub async fn cancelable_future( + future: impl Future, + cancel: impl Borrow, +) -> Option { tokio::select! { biased; - _ = cancel => { + _ = cancel.borrow().canceled() => { None } res = future => { @@ -17,3 +20,266 @@ pub async fn cancelable_future(future: impl Future, cancel: Cance } } } + +#[derive(Default, Debug)] +struct Shared { + state: AtomicU64, + // notify has some features that we don't really need here because it + // supports waking single tasks (notify_one) and does it's own (more + // complicated) state tracking, we could reimplement the waiter linked list + // with modest effort and reduce memory consumption by one word/8 bytes and + // reduce code complexity/number of atomic operations. + // + // I don't think that's worth the complexity (unsafe code). + // + // if we only cared about async code then we could also only use a notify + // (without the generation count), this would be equivalent (or maybe more + // correct if we want to allow cloning the TX) but it would be extremly slow + // to frequently check for cancelation from sync code + notify: Notify, +} + +impl Shared { + fn generation(&self) -> u32 { + self.state.load(Relaxed) as u32 + } + + fn num_running(&self) -> u32 { + (self.state.load(Relaxed) >> 32) as u32 + } + + /// increments the generation count and sets num_running + /// to the provided value, this operation is not with + /// regard to the generation counter (doesn't use fetch_add) + /// so the calling code must ensure it cannot execute concurrently + /// to maintain correctness (but not safety) + fn inc_generation(&self, num_running: u32) -> (u32, u32) { + let state = self.state.load(Relaxed); + let generation = state as u32; + let prev_running = (state >> 32) as u32; + // no need to create a new generation if the refcount is zero (fastaph) + if prev_running == 0 && num_running == 0 { + return (generation, 0); + } + let new_generation = generation.saturating_add(1); + self.state.store( + new_generation as u64 | ((num_running as u64) << 32), + Relaxed, + ); + self.notify.notify_waiters(); + (new_generation, prev_running) + } + + fn inc_running(&self, generation: u32) { + let mut state = self.state.load(Relaxed); + loop { + let current_generation = state as u32; + if current_generation != generation { + break; + } + let off = 1 << 32; + let res = self.state.compare_exchange_weak( + state, + state.saturating_add(off), + Relaxed, + Relaxed, + ); + match res { + Ok(_) => break, + Err(new_state) => state = new_state, + } + } + } + + fn dec_running(&self, generation: u32) { + let mut state = self.state.load(Relaxed); + loop { + let current_generation = state as u32; + if current_generation != generation { + break; + } + let num_running = (state >> 32) as u32; + // running can't be zero here, that would mean we misscounted somewhere + assert_ne!(num_running, 0); + let off = 1 << 32; + let res = self + .state + .compare_exchange_weak(state, state - off, Relaxed, Relaxed); + match res { + Ok(_) => break, + Err(new_state) => state = new_state, + } + } + } +} + +// this intentionally doesn't implement clone and requires amutable reference +// for cancelation to avoid races (in inc_generation) + +/// A task controller allows managing a single subtask enabling the contorller +/// to cancel the subtask and to check wether it is still running. For efficency +/// reasons the controller can be reused/restarted, in that case the previous +/// task is automatically cancelled. +/// +/// If the controller is dropped the subtasks are automatically canceled. +#[derive(Default, Debug)] +pub struct TaskController { + shared: Arc, +} + +impl TaskController { + pub fn new() -> Self { + TaskController::default() + } + /// Cancels the active task (handle) + /// + /// returns wether any tasks were still running before the canellation + pub fn cancel(&mut self) -> bool { + self.shared.inc_generation(0).1 != 0 + } + + /// checks wether there are any task handles + /// that haven't been dropped (or canceled) yet + pub fn is_running(&self) -> bool { + self.shared.num_running() != 0 + } + + /// Starts a new task and cancels the previous task (handles) + pub fn restart(&mut self) -> TaskHandle { + TaskHandle { + generation: self.shared.inc_generation(1).0, + shared: self.shared.clone(), + } + } +} + +impl Drop for TaskController { + fn drop(&mut self) { + self.cancel(); + } +} + +/// A handle that is used to link a task with a task controller, it can be +/// used to cancel async futures very efficently but can also be checked for +/// cancaellation very quickly (single atomic read) in blocking code. The +/// handle can be cheaply cloned (referenced counted). +/// +/// The TaskController can check wether a task is "running" by inspecting the +/// refcount of the (current) tasks handeles. Therefore, if that information +/// is important ensure that the handle is not dropped until the task fully +/// completes +pub struct TaskHandle { + shared: Arc, + generation: u32, +} + +impl Clone for TaskHandle { + fn clone(&self) -> Self { + self.shared.inc_running(self.generation); + TaskHandle { + shared: self.shared.clone(), + generation: self.generation, + } + } +} + +impl Drop for TaskHandle { + fn drop(&mut self) { + self.shared.dec_running(self.generation); + } +} + +impl TaskHandle { + /// waits until [`TaskController::cancel`] is called for the corresponding + /// [`TaskController`]. Immidietly returns if `cancel` was already called since + pub async fn canceled(&self) { + let notified = self.shared.notify.notified(); + if !self.is_canceled() { + notified.await + } + } + + pub fn is_canceled(&self) -> bool { + self.generation != self.shared.generation() + } +} + +#[cfg(test)] +mod tests { + use std::future::poll_fn; + + use futures_executor::block_on; + use tokio::task::yield_now; + + use crate::{cancelable_future, TaskController}; + + #[test] + fn immidiate_cancel() { + let mut controller = TaskController::new(); + let handle = controller.restart(); + controller.cancel(); + assert!(handle.is_canceled()); + controller.restart(); + assert!(handle.is_canceled()); + + let res = block_on(cancelable_future( + poll_fn(|_cx| std::task::Poll::Ready(())), + handle, + )); + assert!(res.is_none()); + } + + #[test] + fn running_count() { + let mut controller = TaskController::new(); + let handle = controller.restart(); + assert!(controller.is_running()); + assert!(!handle.is_canceled()); + drop(handle); + assert!(!controller.is_running()); + assert!(!controller.cancel()); + let handle = controller.restart(); + assert!(!handle.is_canceled()); + assert!(controller.is_running()); + let handle2 = handle.clone(); + assert!(!handle.is_canceled()); + assert!(controller.is_running()); + drop(handle2); + assert!(!handle.is_canceled()); + assert!(controller.is_running()); + assert!(controller.cancel()); + assert!(handle.is_canceled()); + assert!(!controller.is_running()); + } + + #[test] + fn no_cancel() { + let mut controller = TaskController::new(); + let handle = controller.restart(); + assert!(!handle.is_canceled()); + + let res = block_on(cancelable_future( + poll_fn(|_cx| std::task::Poll::Ready(())), + handle, + )); + assert!(res.is_some()); + } + + #[test] + fn delayed_cancel() { + let mut controller = TaskController::new(); + let handle = controller.restart(); + + let mut hit = false; + let res = block_on(cancelable_future( + async { + controller.cancel(); + hit = true; + yield_now().await; + }, + handle, + )); + assert!(res.is_none()); + assert!(hit); + } +} diff --git a/helix-event/src/lib.rs b/helix-event/src/lib.rs index de018a79d..8aa6b52fa 100644 --- a/helix-event/src/lib.rs +++ b/helix-event/src/lib.rs @@ -32,7 +32,7 @@ //! to helix-view in the future if we manage to detach the compositor from its rendering backend. use anyhow::Result; -pub use cancel::{cancelable_future, cancelation, CancelRx, CancelTx}; +pub use cancel::{cancelable_future, TaskController, TaskHandle}; pub use debounce::{send_blocking, AsyncHook}; pub use redraw::{ lock_frame, redraw_requested, request_redraw, start_frame, RenderLockGuard, RequestRedrawOnDrop, diff --git a/helix-term/src/handlers/completion.rs b/helix-term/src/handlers/completion.rs index edd54c5e3..f3223487c 100644 --- a/helix-term/src/handlers/completion.rs +++ b/helix-term/src/handlers/completion.rs @@ -1,5 +1,4 @@ use std::collections::HashSet; -use std::sync::atomic::AtomicBool; use std::sync::Arc; use std::time::Duration; @@ -8,9 +7,7 @@ use futures_util::FutureExt; use helix_core::chars::char_is_word; use helix_core::syntax::LanguageServerFeature; -use helix_event::{ - cancelable_future, cancelation, register_hook, send_blocking, CancelRx, CancelTx, -}; +use helix_event::{cancelable_future, register_hook, send_blocking, TaskController, TaskHandle}; use helix_lsp::lsp; use helix_lsp::util::pos_to_lsp_pos; use helix_stdx::rope::RopeSliceExt; @@ -59,12 +56,8 @@ pub(super) struct CompletionHandler { /// currently active trigger which will cause a /// completion request after the timeout trigger: Option, - /// A handle for currently active completion request. - /// This can be used to determine whether the current - /// request is still active (and new triggers should be - /// ignored) and can also be used to abort the current - /// request (by dropping the handle) - request: Option, + in_flight: Option, + task_controller: TaskController, config: Arc>, } @@ -72,8 +65,9 @@ impl CompletionHandler { pub fn new(config: Arc>) -> CompletionHandler { Self { config, - request: None, + task_controller: TaskController::new(), trigger: None, + in_flight: None, } } } @@ -86,6 +80,9 @@ fn handle_event( event: Self::Event, _old_timeout: Option, ) -> Option { + if self.in_flight.is_some() && !self.task_controller.is_running() { + self.in_flight = None; + } match event { CompletionEvent::AutoTrigger { cursor: trigger_pos, @@ -96,7 +93,7 @@ fn handle_event( // but people may create weird keymaps/use the mouse so lets be extra careful if self .trigger - .as_ref() + .or(self.in_flight) .map_or(true, |trigger| trigger.doc != doc || trigger.view != view) { self.trigger = Some(Trigger { @@ -109,7 +106,7 @@ fn handle_event( } CompletionEvent::TriggerChar { cursor, doc, view } => { // immediately request completions and drop all auto completion requests - self.request = None; + self.task_controller.cancel(); self.trigger = Some(Trigger { pos: cursor, view, @@ -119,7 +116,6 @@ fn handle_event( } CompletionEvent::ManualTrigger { cursor, doc, view } => { // immediately request completions and drop all auto completion requests - self.request = None; self.trigger = Some(Trigger { pos: cursor, view, @@ -132,21 +128,21 @@ fn handle_event( } CompletionEvent::Cancel => { self.trigger = None; - self.request = None; + self.task_controller.cancel(); } CompletionEvent::DeleteText { cursor } => { // if we deleted the original trigger, abort the completion - if matches!(self.trigger, Some(Trigger{ pos, .. }) if cursor < pos) { + if matches!(self.trigger.or(self.in_flight), Some(Trigger{ pos, .. }) if cursor < pos) + { self.trigger = None; - self.request = None; + self.task_controller.cancel(); } } } self.trigger.map(|trigger| { // if the current request was closed forget about it // otherwise immediately restart the completion request - let cancel = self.request.take().map_or(false, |req| !req.is_closed()); - let timeout = if trigger.kind == TriggerKind::Auto && !cancel { + let timeout = if trigger.kind == TriggerKind::Auto { self.config.load().editor.completion_timeout } else { // we want almost instant completions for trigger chars @@ -161,17 +157,17 @@ fn handle_event( fn finish_debounce(&mut self) { let trigger = self.trigger.take().expect("debounce always has a trigger"); - let (tx, rx) = cancelation(); - self.request = Some(tx); + self.in_flight = Some(trigger); + let handle = self.task_controller.restart(); dispatch_blocking(move |editor, compositor| { - request_completion(trigger, rx, editor, compositor) + request_completion(trigger, handle, editor, compositor) }); } } fn request_completion( mut trigger: Trigger, - cancel: CancelRx, + handle: TaskHandle, editor: &mut Editor, compositor: &mut Compositor, ) { @@ -202,7 +198,6 @@ fn request_completion( // necessary from our side too. trigger.pos = cursor; let trigger_text = text.slice(..cursor); - let cancel_completion = Arc::new(AtomicBool::new(false)); let mut seen_language_servers = HashSet::new(); let mut futures: FuturesUnordered<_> = doc @@ -270,12 +265,7 @@ fn request_completion( } .boxed() }) - .chain(path_completion( - cursor, - text.clone(), - doc, - cancel_completion.clone(), - )) + .chain(path_completion(cursor, text.clone(), doc, handle.clone())) .collect(); let future = async move { @@ -296,18 +286,13 @@ fn request_completion( let ui = compositor.find::().unwrap(); ui.last_insert.1.push(InsertEvent::RequestCompletion); tokio::spawn(async move { - let items = match cancelable_future(future, cancel).await { - Some(v) => v, - None => { - cancel_completion.store(true, std::sync::atomic::Ordering::Relaxed); - Vec::new() - } - }; - if items.is_empty() { + let items = cancelable_future(future, &handle).await; + let Some(items) = items.filter(|items| !items.is_empty()) else { return; - } + }; dispatch(move |editor, compositor| { - show_completion(editor, compositor, items, trigger, savepoint) + show_completion(editor, compositor, items, trigger, savepoint); + drop(handle) }) .await }); diff --git a/helix-term/src/handlers/completion/path.rs b/helix-term/src/handlers/completion/path.rs index 2bd04c65f..b7b605073 100644 --- a/helix-term/src/handlers/completion/path.rs +++ b/helix-term/src/handlers/completion/path.rs @@ -3,15 +3,12 @@ fs, path::{Path, PathBuf}, str::FromStr as _, - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, - }, }; use futures_util::{future::BoxFuture, FutureExt as _}; use helix_core as core; use helix_core::Transaction; +use helix_event::TaskHandle; use helix_stdx::path::{self, canonicalize, fold_home_dir, get_path_suffix}; use helix_view::Document; use url::Url; @@ -22,7 +19,7 @@ pub(crate) fn path_completion( cursor: usize, text: core::Rope, doc: &Document, - cancel: Arc, + handle: TaskHandle, ) -> Option>>> { if !doc.path_completion_enabled() { return None; @@ -66,7 +63,7 @@ pub(crate) fn path_completion( } })?; - if cancel.load(Ordering::Relaxed) { + if handle.is_canceled() { return None; } @@ -75,10 +72,6 @@ pub(crate) fn path_completion( return Vec::new(); }; - if cancel.load(Ordering::Relaxed) { - return Vec::new(); - } - read_dir .filter_map(Result::ok) .filter_map(|dir_entry| { @@ -88,7 +81,7 @@ pub(crate) fn path_completion( .and_then(|md| Some((dir_entry.file_name().into_string().ok()?, md))) }) .map_while(|(file_name, md)| { - if cancel.load(Ordering::Relaxed) { + if handle.is_canceled() { return None; } diff --git a/helix-term/src/handlers/completion/resolve.rs b/helix-term/src/handlers/completion/resolve.rs index cc3156017..802d6f51d 100644 --- a/helix-term/src/handlers/completion/resolve.rs +++ b/helix-term/src/handlers/completion/resolve.rs @@ -4,7 +4,7 @@ use tokio::sync::mpsc::Sender; use tokio::time::{Duration, Instant}; -use helix_event::{send_blocking, AsyncHook, CancelRx}; +use helix_event::{send_blocking, AsyncHook, TaskController, TaskHandle}; use helix_view::Editor; use super::LspCompletionItem; @@ -31,11 +31,7 @@ impl ResolveHandler { pub fn new() -> ResolveHandler { ResolveHandler { last_request: None, - resolver: ResolveTimeout { - next_request: None, - in_flight: None, - } - .spawn(), + resolver: ResolveTimeout::default().spawn(), } } @@ -101,7 +97,8 @@ struct ResolveRequest { #[derive(Default)] struct ResolveTimeout { next_request: Option, - in_flight: Option<(helix_event::CancelTx, Arc)>, + in_flight: Option>, + task_controller: TaskController, } impl AsyncHook for ResolveTimeout { @@ -121,7 +118,7 @@ fn handle_event( } else if self .in_flight .as_ref() - .is_some_and(|(_, old_request)| old_request.item == request.item.item) + .is_some_and(|old_request| old_request.item == request.item.item) { self.next_request = None; None @@ -135,14 +132,14 @@ fn finish_debounce(&mut self) { let Some(request) = self.next_request.take() else { return; }; - let (tx, rx) = helix_event::cancelation(); - self.in_flight = Some((tx, request.item.clone())); - tokio::spawn(request.execute(rx)); + let token = self.task_controller.restart(); + self.in_flight = Some(request.item.clone()); + tokio::spawn(request.execute(token)); } } impl ResolveRequest { - async fn execute(self, cancel: CancelRx) { + async fn execute(self, cancel: TaskHandle) { let future = self.ls.resolve_completion_item(&self.item.item); let Some(resolved_item) = helix_event::cancelable_future(future, cancel).await else { return; diff --git a/helix-term/src/handlers/signature_help.rs b/helix-term/src/handlers/signature_help.rs index aaa97b9a0..e4f7e935a 100644 --- a/helix-term/src/handlers/signature_help.rs +++ b/helix-term/src/handlers/signature_help.rs @@ -2,9 +2,7 @@ use std::time::Duration; use helix_core::syntax::LanguageServerFeature; -use helix_event::{ - cancelable_future, cancelation, register_hook, send_blocking, CancelRx, CancelTx, -}; +use helix_event::{cancelable_future, register_hook, send_blocking, TaskController, TaskHandle}; use helix_lsp::lsp::{self, SignatureInformation}; use helix_stdx::rope::RopeSliceExt; use helix_view::document::Mode; @@ -22,11 +20,11 @@ use crate::ui::Popup; use crate::{job, ui}; -#[derive(Debug)] +#[derive(Debug, PartialEq, Eq)] enum State { Open, Closed, - Pending { request: CancelTx }, + Pending, } /// debounce timeout in ms, value taken from VSCode @@ -37,6 +35,7 @@ enum State { pub(super) struct SignatureHelpHandler { trigger: Option, state: State, + task_controller: TaskController, } impl SignatureHelpHandler { @@ -44,6 +43,7 @@ pub fn new() -> SignatureHelpHandler { SignatureHelpHandler { trigger: None, state: State::Closed, + task_controller: TaskController::new(), } } } @@ -76,12 +76,11 @@ fn handle_event( } SignatureHelpEvent::RequestComplete { open } => { // don't cancel rerequest that was already triggered - if let State::Pending { request } = &self.state { - if !request.is_closed() { - return timeout; - } + if self.state == State::Pending && self.task_controller.is_running() { + return timeout; } self.state = if open { State::Open } else { State::Closed }; + self.task_controller.cancel(); return timeout; } @@ -94,16 +93,16 @@ fn handle_event( fn finish_debounce(&mut self) { let invocation = self.trigger.take().unwrap(); - let (tx, rx) = cancelation(); - self.state = State::Pending { request: tx }; - job::dispatch_blocking(move |editor, _| request_signature_help(editor, invocation, rx)) + self.state = State::Pending; + let handle = self.task_controller.restart(); + job::dispatch_blocking(move |editor, _| request_signature_help(editor, invocation, handle)) } } pub fn request_signature_help( editor: &mut Editor, invoked: SignatureHelpInvoked, - cancel: CancelRx, + cancel: TaskHandle, ) { let (view, doc) = current!(editor);