diff --git a/Cargo.lock b/Cargo.lock index e5ffee0bd..2c9d1f316 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -389,6 +389,7 @@ dependencies = [ "signal-hook", "signal-hook-tokio", "tokio", + "tokio-stream", "toml", ] diff --git a/helix-dap/src/client.rs b/helix-dap/src/client.rs index f2d5b865b..bc7a93d8f 100644 --- a/helix-dap/src/client.rs +++ b/helix-dap/src/client.rs @@ -1,5 +1,5 @@ use crate::{ - transport::{Event, Payload, Request, Transport}, + transport::{Payload, Request, Transport}, types::*, Result, }; @@ -9,19 +9,13 @@ net::{IpAddr, Ipv4Addr, SocketAddr}, path::PathBuf, process::Stdio, - sync::{ - atomic::{AtomicU64, Ordering}, - Arc, - }, + sync::atomic::{AtomicU64, Ordering}, }; use tokio::{ io::{AsyncBufRead, AsyncWrite, BufReader, BufWriter}, net::TcpStream, process::{Child, Command}, - sync::{ - mpsc::{channel, Receiver, Sender, UnboundedReceiver, UnboundedSender}, - Mutex, - }, + sync::mpsc::{channel, unbounded_channel, UnboundedReceiver, UnboundedSender}, time, }; @@ -32,8 +26,6 @@ pub struct Client { server_tx: UnboundedSender, request_counter: AtomicU64, capabilities: Option, - awaited_events: Arc>>>, - // pub breakpoints: HashMap>, // TODO: multiple threads support @@ -46,8 +38,9 @@ pub fn streams( tx: Box, id: usize, process: Option, - ) -> Result { + ) -> Result<(Self, UnboundedReceiver)> { let (server_rx, server_tx) = Transport::start(rx, tx, id); + let (client_rx, client_tx) = unbounded_channel(); let client = Self { id, @@ -55,24 +48,30 @@ pub fn streams( server_tx, request_counter: AtomicU64::new(0), capabilities: None, - awaited_events: Arc::new(Mutex::new(HashMap::default())), // breakpoints: HashMap::new(), stack_pointer: None, }; - tokio::spawn(Self::recv(Arc::clone(&client.awaited_events), server_rx)); + tokio::spawn(Self::recv(server_rx, client_rx)); - Ok(client) + Ok((client, client_tx)) } - pub async fn tcp(addr: std::net::SocketAddr, id: usize) -> Result { + pub async fn tcp( + addr: std::net::SocketAddr, + id: usize, + ) -> Result<(Self, UnboundedReceiver)> { let stream = TcpStream::connect(addr).await?; let (rx, tx) = stream.into_split(); Self::streams(Box::new(BufReader::new(rx)), Box::new(tx), id, None) } - pub fn stdio(cmd: &str, args: Vec<&str>, id: usize) -> Result { + pub fn stdio( + cmd: &str, + args: Vec<&str>, + id: usize, + ) -> Result<(Self, UnboundedReceiver)> { let process = Command::new(cmd) .args(args) .stdin(Stdio::piped()) @@ -114,7 +113,7 @@ pub async fn tcp_process( args: Vec<&str>, port_format: &str, id: usize, - ) -> Result { + ) -> Result<(Self, UnboundedReceiver)> { let port = Self::get_port().await.unwrap(); let process = Command::new(cmd) @@ -145,43 +144,22 @@ pub async fn tcp_process( ) } - async fn recv( - awaited_events: Arc>>>, - mut server_rx: UnboundedReceiver, - ) { + async fn recv(mut server_rx: UnboundedReceiver, client_tx: UnboundedSender) { while let Some(msg) = server_rx.recv().await { match msg { Payload::Event(ev) => { - let name = ev.event.clone(); - let hashmap = awaited_events.lock().await; - let tx = hashmap.get(&name); - - match tx { - Some(tx) => match tx.send(ev).await { - Ok(_) => (), - Err(_) => error!( - "Tried sending event into a closed channel (name={:?})", - name - ), - }, - None => { - info!("unhandled event {}", name); - // client_tx.send(Payload::Event(ev)).expect("Failed to send"); - } - } + client_tx.send(Payload::Event(ev)).expect("Failed to send"); } Payload::Response(_) => unreachable!(), - Payload::Request(_) => todo!(), + Payload::Request(req) => { + client_tx + .send(Payload::Request(req)) + .expect("Failed to send"); + } } } } - pub async fn listen_for_event(&self, name: String) -> Receiver { - let (rx, tx) = channel(1); - self.awaited_events.lock().await.insert(name.clone(), rx); - tx - } - pub fn id(&self) -> usize { self.id } @@ -248,8 +226,6 @@ pub async fn disconnect(&mut self) -> Result<()> { } pub async fn launch(&mut self, args: serde_json::Value) -> Result<()> { - // TODO: buffer these until initialized arrives - let response = self.request::(args).await?; log::error!("launch response {}", response); @@ -257,8 +233,6 @@ pub async fn launch(&mut self, args: serde_json::Value) -> Result<()> { } pub async fn attach(&mut self, args: serde_json::Value) -> Result<()> { - // TODO: buffer these until initialized arrives - let response = self.request::(args).await?; log::error!("attach response {}", response); diff --git a/helix-term/Cargo.toml b/helix-term/Cargo.toml index 6ed60e028..c0b2cfe56 100644 --- a/helix-term/Cargo.toml +++ b/helix-term/Cargo.toml @@ -34,7 +34,7 @@ num_cpus = "1" tui = { path = "../helix-tui", package = "helix-tui", default-features = false, features = ["crossterm"] } crossterm = { version = "0.20", features = ["event-stream"] } signal-hook = "0.3" - +tokio-stream = "0.1" futures-util = { version = "0.3", features = ["std", "async-await"], default-features = false } # Logging diff --git a/helix-term/src/application.rs b/helix-term/src/application.rs index 011dafeef..dad6df2bc 100644 --- a/helix-term/src/application.rs +++ b/helix-term/src/application.rs @@ -1,4 +1,5 @@ use helix_core::syntax; +use helix_dap::Payload; use helix_lsp::{lsp, util::lsp_pos_to_pos, LspProgressMap}; use helix_view::{theme, Editor}; @@ -184,6 +185,29 @@ pub async fn event_loop(&mut self) { last_render = Instant::now(); } } + Some(payload) = self.editor.debugger_events.next() => { + let mut debugger = self.editor.debugger.as_mut().unwrap(); + match payload { + Payload::Event(ev) => { + match &ev.event[..] { + "stopped" => { + let main = debugger + .threads() + .await + .ok() + .and_then(|threads| threads.get(0).cloned()); + if let Some(main) = main { + let (bt, _) = debugger.stack_trace(main.id).await.unwrap(); + debugger.stack_pointer = bt.get(0).cloned(); + } + } + _ => {} + } + }, + Payload::Response(_) => unreachable!(), + Payload::Request(_) => todo!(), + } + } Some(callback) = self.jobs.futures.next() => { self.jobs.handle_callback(&mut self.editor, &mut self.compositor, callback); self.render(); diff --git a/helix-term/src/commands.rs b/helix-term/src/commands.rs index 9a8e1ee68..34a146c23 100644 --- a/helix-term/src/commands.rs +++ b/helix-term/src/commands.rs @@ -24,17 +24,17 @@ }; use insert::*; use movement::Movement; -use tokio::sync::{mpsc::Receiver, Mutex}; use crate::{ compositor::{self, Component, Compositor}, ui::{self, FilePicker, Picker, Popup, Prompt, PromptEvent}, }; +use tokio_stream::wrappers::UnboundedReceiverStream; use crate::job::{self, Job, Jobs}; use futures_util::FutureExt; +use std::num::NonZeroUsize; use std::{collections::HashMap, fmt, future::Future}; -use std::{num::NonZeroUsize, sync::Arc}; use std::{ borrow::Cow, @@ -4255,8 +4255,8 @@ fn dap_start(cx: &mut Context) { // look up config for filetype // if multiple available, open picker - let debugger = Client::tcp_process("dlv", vec!["dap"], "-l 127.0.0.1:{}", 0); - let mut debugger = block_on(debugger).unwrap(); + let started = Client::tcp_process("dlv", vec!["dap"], "-l 127.0.0.1:{}", 0); + let (mut debugger, events) = block_on(started).unwrap(); let request = debugger.initialize("go".to_owned()); let _ = block_on(request).unwrap(); @@ -4268,32 +4268,10 @@ fn dap_start(cx: &mut Context) { let request = debugger.launch(to_value(args).unwrap()); let _ = block_on(request).unwrap(); - let stopped = block_on(debugger.listen_for_event("stopped".to_owned())); - let debugger = Arc::new(Mutex::new(debugger)); - tokio::spawn(dap_listen_stopped(stopped, Arc::clone(&debugger))); - // TODO: either await "initialized" or buffer commands until event is received cx.editor.debugger = Some(debugger); -} - -async fn dap_listen_stopped( - mut stopped: Receiver, - debugger: Arc>, -) { - loop { - stopped.recv().await; - - let mut dbg = debugger.lock().await; - let main = dbg - .threads() - .await - .ok() - .and_then(|threads| threads.get(0).cloned()); - if let Some(main) = main { - let (a, _) = dbg.stack_trace(main.id).await.unwrap(); - dbg.stack_pointer = a.get(0).cloned(); - } - } + let stream = UnboundedReceiverStream::new(events); + cx.editor.debugger_events.push(stream); } fn dap_toggle_breakpoint(cx: &mut Context) { @@ -4321,7 +4299,6 @@ fn dap_toggle_breakpoint(cx: &mut Context) { // we shouldn't really allow editing while debug is running though if let Some(debugger) = &mut cx.editor.debugger { - let mut debugger = block_on(debugger.lock()); let breakpoints = debugger.breakpoints.entry(path.clone()).or_default(); if let Some(pos) = breakpoints.iter().position(|b| b.line == breakpoint.line) { breakpoints.remove(pos); @@ -4340,7 +4317,6 @@ fn dap_run(cx: &mut Context) { use helix_lsp::block_on; if let Some(debugger) = &mut cx.editor.debugger { - let mut debugger = block_on(debugger.lock()); let request = debugger.configuration_done(); let _ = block_on(request).unwrap(); } diff --git a/helix-term/src/ui/editor.rs b/helix-term/src/ui/editor.rs index 7f441a7d2..f8035ae4c 100644 --- a/helix-term/src/ui/editor.rs +++ b/helix-term/src/ui/editor.rs @@ -25,8 +25,7 @@ keyboard::{KeyCode, KeyModifiers}, Document, Editor, Theme, View, }; -use std::{borrow::Cow, sync::Arc}; -use tokio::sync::Mutex; +use std::borrow::Cow; use crossterm::event::{Event, MouseButton, MouseEvent, MouseEventKind}; use tui::buffer::Buffer as Surface; @@ -73,7 +72,7 @@ pub fn render_view( is_focused: bool, loader: &syntax::Loader, config: &helix_view::editor::Config, - debugger: Option>>, + debugger: &Option, ) { let inner = view.inner_area(); let area = view.area; @@ -414,9 +413,8 @@ pub fn render_gutter( theme: &Theme, is_focused: bool, config: &helix_view::editor::Config, - debugger: Option>>, + debugger: &Option, ) { - use helix_lsp::block_on; let text = doc.text().slice(..); let last_line = view.last_line(doc); @@ -449,9 +447,8 @@ pub fn render_gutter( let mut stack_pointer: Option = None; if let Some(debugger) = debugger { if let Some(path) = doc.path() { - let dbg = block_on(debugger.lock()); - breakpoints = dbg.breakpoints.get(path).cloned(); - stack_pointer = dbg.stack_pointer.clone() + breakpoints = debugger.breakpoints.get(path).cloned(); + stack_pointer = debugger.stack_pointer.clone() } } @@ -1047,10 +1044,6 @@ fn render(&mut self, area: Rect, surface: &mut Surface, cx: &mut Context) { for (view, is_focused) in cx.editor.tree.views() { let doc = cx.editor.document(view.doc).unwrap(); let loader = &cx.editor.syn_loader; - let mut dbg: Option>> = None; - if let Some(debugger) = &cx.editor.debugger { - dbg = Some(Arc::clone(debugger)); - } self.render_view( doc, view, @@ -1060,7 +1053,7 @@ fn render(&mut self, area: Rect, surface: &mut Surface, cx: &mut Context) { is_focused, loader, &cx.editor.config, - dbg, + &cx.editor.debugger, ); } diff --git a/helix-view/src/editor.rs b/helix-view/src/editor.rs index 2309379af..17319327e 100644 --- a/helix-view/src/editor.rs +++ b/helix-view/src/editor.rs @@ -8,7 +8,6 @@ use futures_util::future; use futures_util::stream::select_all::SelectAll; -use tokio::sync::Mutex; use tokio_stream::wrappers::UnboundedReceiverStream; use std::{ @@ -75,8 +74,8 @@ pub struct Editor { pub theme: Theme, pub language_servers: helix_lsp::Registry, - pub debugger: Option>>, - pub debuggers: SelectAll>, + pub debugger: Option, + pub debugger_events: SelectAll>, pub clipboard_provider: Box, @@ -116,7 +115,7 @@ pub fn new( theme: themes.default(), language_servers, debugger: None, - debuggers: SelectAll::new(), + debugger_events: SelectAll::new(), syn_loader: config_loader, theme_loader: themes, registers: Registers::default(),