lsp: Don't send notifications until initialize completes

Then send open events for all documents with the LSP attached.
This commit is contained in:
Blaž Hrastnik 2021-09-02 13:55:08 +09:00
parent 2793ff3832
commit 46f3c69f06
4 changed files with 105 additions and 46 deletions

View File

@ -226,6 +226,8 @@ pub fn parse(method: &str, params: jsonrpc::Params) -> Option<MethodCall> {
#[derive(Debug, PartialEq, Clone)] #[derive(Debug, PartialEq, Clone)]
pub enum Notification { pub enum Notification {
// we inject this notification to signal the LSP is ready
Initialized,
PublishDiagnostics(lsp::PublishDiagnosticsParams), PublishDiagnostics(lsp::PublishDiagnosticsParams),
ShowMessage(lsp::ShowMessageParams), ShowMessage(lsp::ShowMessageParams),
LogMessage(lsp::LogMessageParams), LogMessage(lsp::LogMessageParams),
@ -237,6 +239,7 @@ pub fn parse(method: &str, params: jsonrpc::Params) -> Option<Notification> {
use lsp::notification::Notification as _; use lsp::notification::Notification as _;
let notification = match method { let notification = match method {
lsp::notification::Initialized::METHOD => Self::Initialized,
lsp::notification::PublishDiagnostics::METHOD => { lsp::notification::PublishDiagnostics::METHOD => {
let params: lsp::PublishDiagnosticsParams = params let params: lsp::PublishDiagnosticsParams = params
.parse() .parse()
@ -294,7 +297,7 @@ pub fn new() -> Self {
} }
} }
pub fn get_by_id(&mut self, id: usize) -> Option<&Client> { pub fn get_by_id(&self, id: usize) -> Option<&Client> {
self.inner self.inner
.values() .values()
.find(|(client_id, _)| client_id == &id) .find(|(client_id, _)| client_id == &id)
@ -302,55 +305,52 @@ pub fn get_by_id(&mut self, id: usize) -> Option<&Client> {
} }
pub fn get(&mut self, language_config: &LanguageConfiguration) -> Result<Arc<Client>> { pub fn get(&mut self, language_config: &LanguageConfiguration) -> Result<Arc<Client>> {
if let Some(config) = &language_config.language_server { let config = match &language_config.language_server {
// avoid borrow issues Some(config) => config,
let inner = &mut self.inner; None => return Err(Error::LspNotDefined),
let s_incoming = &mut self.incoming; };
match inner.entry(language_config.scope.clone()) { match self.inner.entry(language_config.scope.clone()) {
Entry::Occupied(entry) => Ok(entry.get().1.clone()), Entry::Occupied(entry) => Ok(entry.get().1.clone()),
Entry::Vacant(entry) => { Entry::Vacant(entry) => {
// initialize a new client // initialize a new client
let id = self.counter.fetch_add(1, Ordering::Relaxed); let id = self.counter.fetch_add(1, Ordering::Relaxed);
let (client, incoming, initialize_notify) = Client::start( let (client, incoming, initialize_notify) = Client::start(
&config.command, &config.command,
&config.args, &config.args,
serde_json::from_str(language_config.config.as_deref().unwrap_or("")).ok(), serde_json::from_str(language_config.config.as_deref().unwrap_or("")).ok(),
id, id,
)?; )?;
s_incoming.push(UnboundedReceiverStream::new(incoming)); self.incoming.push(UnboundedReceiverStream::new(incoming));
let client = Arc::new(client); let client = Arc::new(client);
let _client = client.clone(); // Initialize the client asynchronously
// Initialize the client asynchronously let _client = client.clone();
tokio::spawn(async move { tokio::spawn(async move {
use futures_util::TryFutureExt; use futures_util::TryFutureExt;
let value = _client let value = _client
.capabilities .capabilities
.get_or_try_init(|| { .get_or_try_init(|| {
_client _client
.initialize() .initialize()
.map_ok(|response| response.capabilities) .map_ok(|response| response.capabilities)
}) })
.await; .await;
value.expect("failed to initialize capabilities"); value.expect("failed to initialize capabilities");
// next up, notify<initialized> // next up, notify<initialized>
_client _client
.notify::<lsp::notification::Initialized>(lsp::InitializedParams {}) .notify::<lsp::notification::Initialized>(lsp::InitializedParams {})
.await .await
.unwrap(); .unwrap();
initialize_notify.notify_one(); initialize_notify.notify_one();
}); });
entry.insert((id, client.clone())); entry.insert((id, client.clone()));
Ok(client) Ok(client)
}
} }
} else {
Err(Error::LspNotDefined)
} }
} }

View File

@ -64,11 +64,16 @@ pub fn start(
let transport = Arc::new(transport); let transport = Arc::new(transport);
tokio::spawn(Self::recv(transport.clone(), server_stdout, client_tx)); tokio::spawn(Self::recv(
transport.clone(),
server_stdout,
client_tx.clone(),
));
tokio::spawn(Self::err(transport.clone(), server_stderr)); tokio::spawn(Self::err(transport.clone(), server_stderr));
tokio::spawn(Self::send( tokio::spawn(Self::send(
transport, transport,
server_stdin, server_stdin,
client_tx,
client_rx, client_rx,
notify.clone(), notify.clone(),
)); ));
@ -269,6 +274,7 @@ async fn err(_transport: Arc<Self>, mut server_stderr: BufReader<ChildStderr>) {
async fn send( async fn send(
transport: Arc<Self>, transport: Arc<Self>,
mut server_stdin: BufWriter<ChildStdin>, mut server_stdin: BufWriter<ChildStdin>,
mut client_tx: UnboundedSender<(usize, jsonrpc::Call)>,
mut client_rx: UnboundedReceiver<Payload>, mut client_rx: UnboundedReceiver<Payload>,
initialize_notify: Arc<Notify>, initialize_notify: Arc<Notify>,
) { ) {
@ -303,6 +309,22 @@ fn is_initialize(payload: &Payload) -> bool {
_ = initialize_notify.notified() => { // TODO: notified is technically not cancellation safe _ = initialize_notify.notified() => { // TODO: notified is technically not cancellation safe
// server successfully initialized // server successfully initialized
is_pending = false; is_pending = false;
use lsp_types::notification::Notification;
// Hack: inject an initialized notification so we trigger code that needs to happen after init
let notification = ServerMessage::Call(jsonrpc::Call::Notification(jsonrpc::Notification {
jsonrpc: None,
method: lsp_types::notification::Initialized::METHOD.to_string(),
params: jsonrpc::Params::None,
}));
match transport.process_server_message(&mut client_tx, notification).await {
Ok(_) => {}
Err(err) => {
error!("err: <- {:?}", err);
}
}
// drain the pending queue and send payloads to server // drain the pending queue and send payloads to server
for msg in pending_messages.drain(..) { for msg in pending_messages.drain(..) {
log::info!("Draining pending message {:?}", msg); log::info!("Draining pending message {:?}", msg);
@ -317,6 +339,11 @@ fn is_initialize(payload: &Payload) -> bool {
msg = client_rx.recv() => { msg = client_rx.recv() => {
if let Some(msg) = msg { if let Some(msg) = msg {
if is_pending && !is_initialize(&msg) { if is_pending && !is_initialize(&msg) {
// ignore notifications
if let Payload::Notification(_) = msg {
continue;
}
log::info!("Language server not initialized, delaying request"); log::info!("Language server not initialized, delaying request");
pending_messages.push(msg); pending_messages.push(msg);
} else { } else {

View File

@ -275,6 +275,37 @@ pub async fn handle_language_server_message(
}; };
match notification { match notification {
Notification::Initialized => {
let language_server =
match self.editor.language_servers.get_by_id(server_id) {
Some(language_server) => language_server,
None => {
warn!("can't find language server with id `{}`", server_id);
return;
}
};
let docs = self.editor.documents().filter(|doc| {
doc.language_server().map(|server| server.id()) == Some(server_id)
});
// trigger textDocument/didOpen for docs that are already open
for doc in docs {
// TODO: extract and share with editor.open
let language_id = doc
.language()
.and_then(|s| s.split('.').last()) // source.rust
.map(ToOwned::to_owned)
.unwrap_or_default();
tokio::spawn(language_server.text_document_did_open(
doc.url().unwrap(),
doc.version(),
doc.text(),
language_id,
));
}
}
Notification::PublishDiagnostics(params) => { Notification::PublishDiagnostics(params) => {
let path = params.uri.to_file_path().unwrap(); let path = params.uri.to_file_path().unwrap();
let doc = self.editor.document_by_path_mut(&path); let doc = self.editor.document_by_path_mut(&path);

View File

@ -255,20 +255,21 @@ pub fn open(&mut self, path: PathBuf, action: Action) -> Result<DocumentId, Erro
.and_then(|language| self.language_servers.get(language).ok()); .and_then(|language| self.language_servers.get(language).ok());
if let Some(language_server) = language_server { if let Some(language_server) = language_server {
doc.set_language_server(Some(language_server.clone()));
let language_id = doc let language_id = doc
.language() .language()
.and_then(|s| s.split('.').last()) // source.rust .and_then(|s| s.split('.').last()) // source.rust
.map(ToOwned::to_owned) .map(ToOwned::to_owned)
.unwrap_or_default(); .unwrap_or_default();
// TODO: this now races with on_init code if the init happens too quickly
tokio::spawn(language_server.text_document_did_open( tokio::spawn(language_server.text_document_did_open(
doc.url().unwrap(), doc.url().unwrap(),
doc.version(), doc.version(),
doc.text(), doc.text(),
language_id, language_id,
)); ));
doc.set_language_server(Some(language_server));
} }
let id = self.documents.insert(doc); let id = self.documents.insert(doc);