Skip to content

Commit 2988cdf

Browse files
committed
feat: re-implement and improve persistent sessions in TaskManager
- Overhaul session management and process termination logic. - Resolve bugs in session teardown and task handling. - Improve test coverage and stability for background tasks.
1 parent 067a2bd commit 2988cdf

5 files changed

Lines changed: 272 additions & 370 deletions

File tree

R/languageserver.R

Lines changed: 35 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -23,23 +23,18 @@ LanguageServer <- R6::R6Class("LanguageServer",
2323
inputcon = NULL,
2424
outputcon = NULL,
2525
exit_flag = NULL,
26-
2726
documents = NULL,
2827
workspace = NULL,
29-
3028
processId = NULL,
3129
rootUri = NULL,
3230
rootPath = NULL,
3331
initializationOptions = NULL,
3432
ClientCapabilities = NULL,
3533
ServerCapabilities = NULL,
36-
3734
diagnostics_task_manager = NULL,
3835
parse_task_manager = NULL,
3936
resolve_task_manager = NULL,
40-
4137
pending_replies = NULL,
42-
4338
initialize = function(host, port) {
4439
if (is.null(port)) {
4540
logger$info("connection type: stdio")
@@ -58,34 +53,23 @@ LanguageServer <- R6::R6Class("LanguageServer",
5853
self$inputcon <- inputcon
5954
self$outputcon <- outputcon
6055

61-
cpus <- parallel::detectCores()
62-
# Performance optimization: Allow more workers and scale with CPU count
63-
# Old default: min(max(floor(cpus / 2), 1), 3) - capped at 3
64-
# New default: min(max(cpus - 1, 2), 8) - scale up to 8 workers
65-
default_pool_size <- min(max(cpus - 1, 2), 8)
66-
pool_size <- as.integer(
67-
Sys.getenv("R_LANGSVR_POOL_SIZE", default_pool_size))
68-
69-
# parse pool - increase size for better throughput
70-
# Parse operations are CPU-bound and can benefit from parallelism
71-
parse_pool <- if (pool_size > 0) SessionPool$new(pool_size, "parse") else NULL
72-
# diagnostics is slower, so use a separate pool
73-
# Diagnostics can use slightly fewer workers since they're I/O heavy
74-
diagnostics_pool_size <- min(max(floor(pool_size * 0.75), 1), pool_size)
75-
diagnostics_pool <- if (pool_size > 0) SessionPool$new(diagnostics_pool_size, "diagnostics") else NULL
76-
77-
self$parse_task_manager <- TaskManager$new("parse", parse_pool)
78-
self$diagnostics_task_manager <- TaskManager$new("diagnostics", diagnostics_pool)
56+
self$parse_task_manager <- TaskManager$new(
57+
"parse",
58+
use_session = TRUE, process_recent_first = TRUE,
59+
)
60+
self$diagnostics_task_manager <- TaskManager$new(
61+
"diagnostics",
62+
use_session = TRUE, process_recent_first = TRUE
63+
)
7964

8065
# no pool for resolve task
8166
# resolve task require a new session for every task
82-
self$resolve_task_manager <- TaskManager$new("resolve", NULL)
67+
self$resolve_task_manager <- TaskManager$new("resolve")
8368

8469
self$pending_replies <- collections::dict()
8570

8671
super$initialize()
8772
},
88-
8973
process_events = function() {
9074
self$diagnostics_task_manager$run_tasks()
9175
self$diagnostics_task_manager$check_tasks()
@@ -97,11 +81,8 @@ LanguageServer <- R6::R6Class("LanguageServer",
9781
self$workspace$poll_namespace_file()
9882
}
9983
},
100-
101-
text_sync = function(
102-
# TODO: move it to Workspace!?
103-
uri, document, run_lintr = FALSE, parse = FALSE, delay = 0) {
104-
84+
text_sync = function( # TODO: move it to Workspace!?
85+
uri, document, run_lintr = FALSE, parse = FALSE, delay = 0) {
10586
if (!self$pending_replies$has(uri)) {
10687
self$pending_replies$set(uri, list(
10788
`textDocument/documentSymbol` = collections::queue(),
@@ -131,7 +112,6 @@ LanguageServer <- R6::R6Class("LanguageServer",
131112
)
132113
}
133114
},
134-
135115
check_connection = function() {
136116
if (!isOpen(self$inputcon)) {
137117
self$exit_flag <- TRUE
@@ -142,13 +122,11 @@ LanguageServer <- R6::R6Class("LanguageServer",
142122
self$exit_flag <- TRUE
143123
}
144124
},
145-
146125
write_text = function(text) {
147126
# we have made effort to ensure that text is utf-8
148127
# so text is printed as is
149128
writeLines(text, self$outputcon, sep = "", useBytes = TRUE)
150129
},
151-
152130
read_line = function() {
153131
if (self$tcp) {
154132
if (socketSelect(list(self$inputcon), timeout = 0)) {
@@ -160,7 +138,6 @@ LanguageServer <- R6::R6Class("LanguageServer",
160138
stdin_read_line()
161139
}
162140
},
163-
164141
read_char = function(n) {
165142
if (self$tcp) {
166143
out <- readChar(self$inputcon, n, useBytes = TRUE)
@@ -170,24 +147,34 @@ LanguageServer <- R6::R6Class("LanguageServer",
170147
stdin_read_char(n)
171148
}
172149
},
173-
174150
run = function() {
151+
on.exit(
152+
{
153+
if (!is.null(self$parse_task_manager)) self$parse_task_manager$stop()
154+
if (!is.null(self$diagnostics_task_manager)) self$diagnostics_task_manager$stop()
155+
if (!is.null(self$resolve_task_manager)) self$resolve_task_manager$stop()
156+
},
157+
add = TRUE
158+
)
175159
while (TRUE) {
176-
ret <- tryCatchStack({
177-
if (isTRUE(self$exit_flag)) {
178-
logger$info("exiting")
179-
break
180-
}
160+
ret <- tryCatchStack(
161+
{
162+
if (isTRUE(self$exit_flag)) {
163+
logger$info("exiting")
164+
break
165+
}
181166

182-
self$process_events()
167+
self$process_events()
183168

184-
data <- self$fetch(blocking = FALSE)
185-
if (is.null(data)) {
186-
Sys.sleep(0.1)
187-
next
188-
}
189-
self$handle_raw(data)
190-
}, error = function(e) e)
169+
data <- self$fetch(blocking = FALSE)
170+
if (is.null(data)) {
171+
Sys.sleep(0.1)
172+
next
173+
}
174+
self$handle_raw(data)
175+
},
176+
error = function(e) e
177+
)
191178
if (inherits(ret, "error")) {
192179
logger$error(ret)
193180
logger$error("exiting")

0 commit comments

Comments
 (0)