Skip to content

Commit bd884e0

Browse files
committed
feat(graalvm): readable web streams implementation
Signed-off-by: Dario Valdespino <dvaldespino00@gmail.com>
1 parent 157ae71 commit bd884e0

5 files changed

Lines changed: 462 additions & 0 deletions

File tree

packages/graalvm/src/main/kotlin/elide/runtime/gvm/internals/intrinsics/js/webstreams/WebStreamsIntrinsic.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import elide.core.api.Symbolic.Unresolved
2323
import elide.runtime.core.DelicateElideApi
2424
import elide.runtime.gvm.api.Intrinsic
2525
import elide.runtime.gvm.internals.intrinsics.js.AbstractJsIntrinsic
26+
import elide.runtime.gvm.internals.intrinsics.js.webstreams.readable.ReadableStreamSource
2627
import elide.runtime.gvm.js.JsError
2728
import elide.runtime.gvm.js.JsSymbol.JsSymbols.asPublicJsSymbol
2829
import elide.runtime.intrinsics.GuestIntrinsic
Lines changed: 288 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,288 @@
1+
package elide.runtime.gvm.internals.intrinsics.js.webstreams.readable
2+
3+
import com.google.common.util.concurrent.AtomicDouble
4+
import java.util.concurrent.ConcurrentLinkedQueue
5+
import java.util.concurrent.atomic.AtomicInteger
6+
import java.util.concurrent.atomic.AtomicReference
7+
import elide.runtime.gvm.internals.intrinsics.js.webstreams.readable.ReadableDefaultStream.ReadResult
8+
import elide.runtime.intrinsics.js.CompletableJsPromise
9+
import elide.runtime.intrinsics.js.JsPromise
10+
import elide.runtime.intrinsics.js.err.TypeError
11+
import elide.vm.annotations.Polyglot
12+
13+
/** Shorthand for a completable promise used to define read queues. */
14+
private typealias DefaultReadRequest = CompletableJsPromise<ReadResult>
15+
16+
/**
17+
* Specialized implementation for a 'default' stream, i.e. a [ReadableStream] with a `ReadableStreamDefaultController`
18+
* and `ReadableStreamDefaultReader`.
19+
*/
20+
internal class ReadableDefaultStream internal constructor(
21+
override val source: ReadableStreamSource,
22+
override val strategy: ReadableStreamQueuingStrategy,
23+
) : ReadableStream() {
24+
/**
25+
* Encapsulates the result of a read; [done] indicates whether the [chunk] is the final value that will be available
26+
* from the source.
27+
*/
28+
data class ReadResult(val chunk: Any?, val done: Boolean)
29+
30+
/** An arbitrary [chunk] value with an attached [size], as calculated by the stream's queueing strategy. */
31+
data class SizedChunk(val chunk: Any?, val size: Double)
32+
33+
/**
34+
* [`ReadableStreamDefaultController`](https://streams.spec.whatwg.org/#rs-default-controller-class) spec
35+
* implementation, including all the exposed fields and methods in the spec.
36+
*/
37+
private inner class ReadableStreamDefaultController : ReadableStreamController {
38+
/** Atomic controller state flag. */
39+
private val controllerState = AtomicInteger(CONTROLLER_STARTED)
40+
41+
/** Desired total size for the inbound queue, used for backpressure control. */
42+
val desiredSize: Double?
43+
@Polyglot get() = when (streamState.get()) {
44+
STREAM_CLOSED -> 0.0
45+
STREAM_READABLE -> strategy.highWaterMark() - sourceChunksSize.get()
46+
else -> null
47+
}
48+
49+
init {
50+
// setup the source
51+
runCatching { source.start(this) }
52+
.onFailure(::error)
53+
.onSuccess {
54+
controllerState.set(CONTROLLER_STARTED)
55+
if (shouldPull()) pull()
56+
}
57+
}
58+
59+
/** Whether the controller should request new chunks from the underlying [source]. */
60+
private fun shouldPull(): Boolean {
61+
val state = controllerState.get()
62+
return when {
63+
state == CONTROLLER_CLOSING || streamState.get() != STREAM_READABLE -> false
64+
state == CONTROLLER_UNINITIALIZED -> false
65+
locked && readQueue.isNotEmpty() -> true
66+
(desiredSize ?: 0.0) > 0.0 -> true
67+
else -> false
68+
}
69+
}
70+
71+
/** Cancel the controller and the underlying source. Called by the stream during [ReadableDefaultStream.cancel]. */
72+
fun cancel(reason: Any? = null): JsPromise<Unit> {
73+
sourceChunks.clear()
74+
sourceChunksSize.set(0.0)
75+
return source.cancel(reason)
76+
}
77+
78+
/**
79+
* Close the controller and the associated stream. If there are undelivered chunks, the stream will not be closed
80+
* until they are claimed.
81+
*/
82+
@Polyglot fun close() {
83+
if (controllerState.getAndSet(CONTROLLER_CLOSING) == CONTROLLER_CLOSING) return
84+
// don't close the stream if there are undelivered elements
85+
if (sourceChunks.isEmpty()) this@ReadableDefaultStream.close()
86+
}
87+
88+
/**
89+
* Shut down the controller and associated stream with the given error [reason]. Unlike [close], this method does
90+
* not wait for undelivered elements to be claimed, all unread data will be lost.
91+
*/
92+
@Polyglot fun error(reason: Any? = null) {
93+
if (streamState.get() != STREAM_READABLE) return
94+
sourceChunks.clear()
95+
sourceChunksSize.set(0.0)
96+
this@ReadableDefaultStream.error(reason)
97+
}
98+
99+
/**
100+
* Enqueue a new chunk, making it available immediately for readers. If any pending read requests are found, the
101+
* chunk will be delivered directly without using the queue.
102+
*/
103+
@Polyglot fun enqueue(chunk: Any? = null) {
104+
// check the controller isn't closing or closed
105+
if (controllerState.get() == CONTROLLER_CLOSING || streamState.get() != STREAM_READABLE) {
106+
throw TypeError.create("Controller is closing or stream is not readable")
107+
}
108+
109+
// consume queued reads if possible
110+
readQueue.poll()?.let {
111+
it.resolve(ReadResult(chunk, false))
112+
return
113+
}
114+
115+
try {
116+
// enqueue the sized unread chunk
117+
val size = strategy.size(chunk)
118+
sourceChunks.add(SizedChunk(chunk, size))
119+
sourceChunksSize.addAndGet(size)
120+
if (shouldPull()) pull()
121+
} catch (cause: Throwable) {
122+
error(cause)
123+
}
124+
}
125+
126+
/** Pull from the underlying source if the backpressure controls allow it. */
127+
fun pullIfNeeded() {
128+
if (shouldPull()) pull()
129+
}
130+
131+
/**
132+
* Pull from the underlying source to obtain new chunks. If a pull is already in progress, a 'pull again' flag is
133+
* set to trigger a new pull after the current one.
134+
*/
135+
fun pull() {
136+
// if pulling, flag for retry, the queued read will be fulfilled
137+
if (controllerState.compareAndSet(CONTROLLER_PULLING, CONTROLLER_PULL_AGAIN)) return
138+
139+
// if idle, flag as pulling and pull from the source
140+
if (controllerState.compareAndSet(CONTROLLER_STARTED, CONTROLLER_PULLING)) {
141+
source.pull(this).then(
142+
onFulfilled = {
143+
if (controllerState.compareAndSet(CONTROLLER_PULL_AGAIN, CONTROLLER_PULLING)) pullIfNeeded()
144+
else controllerState.compareAndSet(CONTROLLER_PULLING, CONTROLLER_STARTED)
145+
},
146+
onCatch = ::error,
147+
)
148+
return
149+
}
150+
}
151+
152+
/**
153+
* Poll the inbound queue for available chunks, returning `null` if the queue is empty.
154+
*
155+
* If the controller is currently closing and the queue is emptied as a result of this operation, the controller
156+
* stream will be closed.
157+
*/
158+
fun poll(): SizedChunk? {
159+
val chunk = sourceChunks.poll()?.also { sourceChunksSize.addAndGet(-it.size) }
160+
161+
// finish closing if we removed the last chunk from the queue
162+
if (sourceChunks.isEmpty() && controllerState.compareAndSet(CONTROLLER_CLOSING, CONTROLLER_CLOSED))
163+
this@ReadableDefaultStream.close()
164+
165+
return chunk
166+
}
167+
}
168+
169+
170+
/**
171+
* [`ReadableStreamDefaultReader`](https://streams.spec.whatwg.org/#default-reader-class) spec
172+
* implementation, including all the exposed fields and methods in the spec.
173+
*/
174+
private inner class ReadableStreamDefaultReader : ReadableStreamReader {
175+
/** A promised that completes when the reader is closed, and rejects when the reader errors. */
176+
@Polyglot val closed: CompletableJsPromise<Unit> = CompletableJsPromise()
177+
178+
init {
179+
// early closure
180+
when (streamState.get()) {
181+
STREAM_CLOSED -> closed.resolve(Unit)
182+
STREAM_ERRORED -> closed.reject(errorCause.get())
183+
else -> Unit
184+
}
185+
}
186+
187+
/** Read a chunk from the stream, returning a promise that is fulfilled with the result. */
188+
@Polyglot fun read(): JsPromise<ReadResult> {
189+
if (closed.isDone) throw TypeError.create("Reader has been closed")
190+
return when (streamState.get()) {
191+
STREAM_READABLE -> {
192+
// consume queued chunks if available; if the stream was closed after polling, mark as 'done'
193+
controller.poll()?.let { return JsPromise.resolved(ReadResult(it, streamState.get() != STREAM_READABLE)) }
194+
195+
// enqueue read request and pull
196+
CompletableJsPromise<ReadResult>().also { request ->
197+
readQueue.add(request)
198+
controller.pullIfNeeded()
199+
}
200+
}
201+
202+
STREAM_ERRORED -> JsPromise.rejected(TypeError.create(errorCause.get().toString()))
203+
else -> JsPromise.resolved(ReadResult(null, true))
204+
}
205+
}
206+
207+
/**
208+
* Release this reader's lock on the stream, allowing a new reader to be acquired and invalidating this instance.
209+
* Pending reads will still complete normally.
210+
*/
211+
@Polyglot fun releaseLock() {
212+
if (closed.isDone) throw TypeError.create("Reader has already been released")
213+
closed.reject(TypeError.create("Reader lock was released"))
214+
reader.set(null)
215+
}
216+
217+
/** Cancel the stream for this reader. */
218+
@Polyglot fun cancel() {
219+
this@ReadableDefaultStream.cancel()
220+
}
221+
}
222+
223+
/**
224+
* Chunks are pushed by the source using the controller, and pulled by consumers using the reader; the queue can be
225+
* skipped if there are pending reads when a new chunk is pushed.
226+
*/
227+
private val sourceChunks: ConcurrentLinkedQueue<SizedChunk> = ConcurrentLinkedQueue()
228+
229+
/** Total size of the chunks in [sourceChunks], as calculated by the stream's queueing [strategy]. */
230+
private val sourceChunksSize = AtomicDouble(0.0)
231+
232+
/**
233+
* Reads are enqueued by consumers using the reader, and fulfilled by the source by pushing chunks with the
234+
* controller; the queue can be skipped if there are queued chunks when a read is requested.
235+
*/
236+
private val readQueue: ConcurrentLinkedQueue<DefaultReadRequest> = ConcurrentLinkedQueue()
237+
238+
/** Handle used by the [source] to push new chunks and control the stream. */
239+
private val controller = ReadableStreamDefaultController()
240+
241+
/**
242+
* Handle used by consumers to read chunks; if not `null`, the stream is considered 'locked' and must be released
243+
* before a new reader can be acquired.
244+
*/
245+
private val reader = AtomicReference<ReadableStreamDefaultReader>()
246+
247+
/** Stored cause for the stream's failure. */
248+
private val errorCause = AtomicReference<Any>()
249+
250+
@get:Polyglot override val locked: Boolean get() = reader.get() != null
251+
252+
/** Close the stream, preventing new chunks from being enqueued by the source or read by consumers. */
253+
private fun close() {
254+
if (!streamState.compareAndSet(STREAM_READABLE, STREAM_CLOSED)) throw TypeError.create("Stream is not readable")
255+
reader.getAndSet(null)?.closed?.resolve(Unit) ?: return
256+
257+
// close unfulfilled requests
258+
while (true) readQueue.poll()?.resolve(ReadResult(null, true)) ?: break
259+
}
260+
261+
/**
262+
* Cancel the stream with an error; if locked, the reader will also be closed with the same cause. All pending read
263+
* requests will be rejected with [reason].
264+
*/
265+
private fun error(reason: Any? = null) {
266+
if (!streamState.compareAndSet(STREAM_READABLE, STREAM_ERRORED)) throw TypeError.create("Stream is not readable")
267+
errorCause.set(reason)
268+
269+
reader.get()?.closed?.reject(reason) ?: return
270+
while (true) readQueue.poll()?.reject(reason) ?: break
271+
}
272+
273+
@Polyglot override fun cancel(reason: Any?): JsPromise<Unit> {
274+
return when (streamState.get()) {
275+
STREAM_CLOSED -> JsPromise.resolved(Unit)
276+
STREAM_ERRORED -> JsPromise.rejected(TypeError.create(errorCause.get().toString()))
277+
else -> {
278+
close()
279+
controller.cancel(reason)
280+
}
281+
}
282+
}
283+
284+
@Polyglot override fun getReader(options: Any?): ReadableStreamReader {
285+
if (reader.getAndUpdate { it ?: ReadableStreamDefaultReader() } != null) throw TypeError.create("Stream is locked")
286+
return reader.get()
287+
}
288+
}

0 commit comments

Comments
 (0)