Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
55e8f8e
Expand JsArrayBuffer/Uint8Array/BufferSource APIs
jasnell Apr 27, 2026
03ca3b8
No jsg::BufferSource in vfs
jasnell Apr 27, 2026
a211fa5
No jsg::BufferSource in modules-new
jasnell Apr 27, 2026
11c9761
No jsg::Buffersource in readAllBytes
jasnell Apr 27, 2026
183d16f
Move BYOB views away from using jsg::BufferSource
jasnell Apr 27, 2026
047285a
Moving writable away from jsg::BufferSource
jasnell Apr 27, 2026
aeabc8a
No BufferSource in writable-sink-adapter
jasnell Apr 27, 2026
6af6ecf
No jsg::BufferSource in readable-source-adapter
jasnell Apr 28, 2026
924f344
No jsg::BufferSource in encoding.c++
jasnell Apr 28, 2026
df79b3b
Move streams away from jsg::BufferSource and use jsg::Js* types
jasnell Apr 28, 2026
c7786ad
Apply a couple fixups
jasnell Apr 29, 2026
6d11646
Replace WritableImpl::Algorithms with UnderlyingSinkImpl
jasnell Apr 29, 2026
e73cda0
Restructure WritableStreamJsController setup
jasnell Apr 29, 2026
f25e026
Modify the WritableStreamJsController setup to accept sink impl
jasnell Apr 29, 2026
ce9af11
Add alternative internal writable stream implementation
jasnell Apr 29, 2026
0c2c055
Add PersistentContinuation mechanism to jsg/promise.h
jasnell Apr 30, 2026
8aa2a50
Use persistent promise continuations in standard.c++
jasnell Apr 30, 2026
36b491b
Implement a simple write fast path
jasnell Apr 30, 2026
2172a3b
Add writev optimization for js-backed streams
jasnell Apr 30, 2026
8f5376d
Implement writev optimization for WritableStreamJsController
jasnell Apr 30, 2026
2b8172b
Implement flush in WritableStreamJsController
jasnell Apr 30, 2026
6e347e7
Additional minor cleanups and improvements
jasnell Apr 30, 2026
9c1d805
Add UnderlyingSourceImpl
jasnell Apr 30, 2026
c04fb05
Implement InternalUnderlyingSourceImpl
jasnell Apr 30, 2026
e9777dc
Use the persistent continuation mechanism for pull
jasnell Apr 30, 2026
96eda57
Introduce TransformerImpl
jasnell Apr 30, 2026
317b0e7
Update transforms to use persistent continuations
jasnell Apr 30, 2026
0a5f738
Minor flags optimization in TransformStreamDefaultController
jasnell Apr 30, 2026
6868075
Add transformv optimization path
jasnell Apr 30, 2026
6cf81de
Remove the enable draining read autogate and legacy path
jasnell Apr 30, 2026
6b2602c
Use persistent continuations in writable pipeloop
jasnell Apr 30, 2026
74f4465
Fixup tests and asan failures with the new optimizations
jasnell May 1, 2026
5a90ea4
Fixup linting issues
jasnell May 1, 2026
fe2e1b1
Have draining read using persistent continuations
jasnell May 1, 2026
a2b7191
Update standard AllReader to use persistent continuations
jasnell May 1, 2026
3a1de14
Update drainingReads to use persistent continuations
jasnell May 1, 2026
49c7b6b
Replace bool args with WD_STRONG_BOOLS in streams
jasnell May 1, 2026
d884030
Update some out of date code comments
jasnell May 1, 2026
899cad6
Implement multiple fast-paths for internal-backed streams
jasnell May 1, 2026
e7be82c
Finish closing gaps between standard/internal stream impls
jasnell May 1, 2026
054fa2c
Switch some of the bool fields to packed structs
jasnell May 1, 2026
4534fe1
Address multiple review findings
jasnell May 1, 2026
08e1159
Apply additional cleanups
jasnell May 1, 2026
0dd2822
Add transformv algorithm to text encoder and decoder streams
jasnell May 2, 2026
aea8f4b
Additioanl streams cleanups/improvements
jasnell May 2, 2026
b3607fa
Improve ReadableStream.from and async generator support
jasnell May 2, 2026
3c82aec
Switch from js.tryCatch to JSG_TRY/JSG_CATCH
jasnell May 2, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions src/workerd/api/container.c++
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,8 @@ jsg::Promise<jsg::Ref<ExecOutput>> ExecProcess::output(jsg::Lock& js) {
stdoutPromise =
stream->getController()
.readAllBytes(js, IoContext::current().getLimitEnforcer().getBufferingLimit())
.then(js, [](jsg::Lock&, jsg::BufferSource bytes) {
return kj::heapArray(bytes.asArrayPtr());
.then(js, [](jsg::Lock& js, jsg::JsRef<jsg::JsArrayBuffer> bytes) {
return bytes.getHandle(js).copy();
});
}

Expand All @@ -165,8 +165,8 @@ jsg::Promise<jsg::Ref<ExecOutput>> ExecProcess::output(jsg::Lock& js) {
"Cannot call output() after stderr has started being consumed.");
stderrPromise = stream->getController()
.readAllBytes(js, kj::maxValue)
.then(js, [](jsg::Lock&, jsg::BufferSource bytes) {
return kj::heapArray(bytes.asArrayPtr());
.then(js, [](jsg::Lock& js, jsg::JsRef<jsg::JsArrayBuffer> bytes) {
return bytes.getHandle(js).copy();
});
}

Expand Down Expand Up @@ -525,7 +525,7 @@ jsg::Promise<jsg::Ref<ExecProcess>> Container::exec(
KJ_CASE_ONEOF(readable, jsg::Ref<ReadableStream>) {
auto sink = newSystemStream(kj::mv(stdinWriter), StreamEncoding::IDENTITY, ioContext);
auto pipePromise =
(ioContext.waitForDeferredProxy(readable->pumpTo(js, kj::mv(sink), true)));
(ioContext.waitForDeferredProxy(readable->pumpTo(js, kj::mv(sink), End::YES)));
ioContext.addTask(pipePromise.attach(readable.addRef()));
}
// user sets "pipe"... they want to consume the API with the stdin WritableStream
Expand Down
10 changes: 6 additions & 4 deletions src/workerd/api/crypto/crypto.c++
Original file line number Diff line number Diff line change
Expand Up @@ -800,7 +800,7 @@ void DigestStream::dispose(jsg::Lock& js) {
KJ_IF_SOME(ready, state.tryGet<Ready>()) {
auto reason = js.typeError("The DigestStream was disposed.");
ready.resolver.reject(js, reason);
state.init<StreamStates::Errored>(js.v8Ref<v8::Value>(reason));
state.init<StreamStates::Errored>(reason.addRef(js));
}
}
JSG_CATCH(exception) {
Expand Down Expand Up @@ -859,7 +859,7 @@ void DigestStream::abort(jsg::Lock& js, jsg::JsValue reason) {
// If the state is already closed or errored, then this is a non-op
KJ_IF_SOME(ready, state.tryGet<Ready>()) {
ready.resolver.reject(js, reason);
state.init<StreamStates::Errored>(js.v8Ref<v8::Value>(reason));
state.init<StreamStates::Errored>(reason.addRef(js));
}
}

Expand All @@ -870,7 +870,7 @@ jsg::Ref<DigestStream> DigestStream::constructor(jsg::Lock& js, Algorithm algori
interpretAlgorithmParam(kj::mv(algorithm)), kj::mv(paf.resolver), kj::mv(paf.promise));

// clang-format off
stream->getController().setup(js, UnderlyingSink{
auto sink = kj::heap<UnderlyingSinkImpl>(js, UnderlyingSink{
.write = [&stream = *stream](jsg::Lock& js, v8::Local<v8::Value> chunk, auto c) mutable {
return js.tryCatch([&] {
// Make sure what we got can be interpreted as bytes...
Expand Down Expand Up @@ -916,9 +916,11 @@ jsg::Ref<DigestStream> DigestStream::constructor(jsg::Lock& js, Algorithm algori
return js.resolvedPromise();
}, [&](jsg::Value exception) { return js.rejectedPromise<void>(kj::mv(exception)); });
}
}, kj::none);
}, StreamQueuingStrategy {});
// clang-format on

stream->getController().setup(js, kj::mv(sink));

return kj::mv(stream);
}

Expand Down
4 changes: 2 additions & 2 deletions src/workerd/api/eventsource.c++
Original file line number Diff line number Diff line change
Expand Up @@ -474,8 +474,8 @@ void EventSource::run(jsg::Lock& js,
// pumping the body into an EventSourceSink until the body is closed, canceled,
// or errored.
context
.awaitIo(
js, processBody(context, readable->pumpTo(js, kj::heap<EventSourceSink>(*this), true)))
.awaitIo(js,
processBody(context, readable->pumpTo(js, kj::heap<EventSourceSink>(*this), End::YES)))
.then(js, kj::mv(onSuccess), kj::mv(onFailed));
}

Expand Down
58 changes: 30 additions & 28 deletions src/workerd/api/filesystem.c++
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,7 @@ void FileSystemModule::close(jsg::Lock& js, int fd) {
}

uint32_t FileSystemModule::write(
jsg::Lock& js, int fd, kj::Array<jsg::BufferSource> data, WriteOptions options) {
jsg::Lock& js, int fd, kj::Array<jsg::JsRef<jsg::JsBufferSource>> data, WriteOptions options) {
auto& vfs = workerd::VirtualFileSystem::current(js);

KJ_IF_SOME(opened, vfs.tryGetFd(js, fd)) {
Expand All @@ -513,7 +513,7 @@ uint32_t FileSystemModule::write(
auto pos = getPosition(js, opened.addRef(), file.addRef(), options);
uint32_t total = 0;
for (auto& buffer: data) {
KJ_SWITCH_ONEOF(file->write(js, pos, buffer)) {
KJ_SWITCH_ONEOF(file->write(js, pos, buffer.getHandle(js).asArrayPtr())) {
KJ_CASE_ONEOF(written, uint32_t) {
pos += written;
total += written;
Expand Down Expand Up @@ -546,7 +546,7 @@ uint32_t FileSystemModule::write(
}

uint32_t FileSystemModule::read(
jsg::Lock& js, int fd, kj::Array<jsg::BufferSource> data, WriteOptions options) {
jsg::Lock& js, int fd, kj::Array<jsg::JsRef<jsg::JsUint8Array>> data, WriteOptions options) {
auto& vfs = workerd::VirtualFileSystem::current(js);
KJ_IF_SOME(opened, vfs.tryGetFd(js, fd)) {
if (!opened->read) {
Expand All @@ -561,11 +561,12 @@ uint32_t FileSystemModule::read(
}
uint32_t total = 0;
for (auto& buffer: data) {
auto read = file->read(js, pos, buffer);
auto handle = buffer.getHandle(js);
auto read = file->read(js, pos, handle.asArrayPtr());
// if read is less than the size of the buffer, we are at EOF.
pos += read;
total += read;
if (read < buffer.size()) break;
if (read < handle.size()) break;
}
// We only update the position if the options.position is not set.
if (options.position == kj::none) {
Expand All @@ -588,7 +589,7 @@ uint32_t FileSystemModule::read(
}
}

jsg::BufferSource FileSystemModule::readAll(jsg::Lock& js, kj::OneOf<int, FilePath> pathOrFd) {
jsg::JsUint8Array FileSystemModule::readAll(jsg::Lock& js, kj::OneOf<int, FilePath> pathOrFd) {
auto& vfs = workerd::VirtualFileSystem::current(js);
KJ_SWITCH_ONEOF(pathOrFd) {
KJ_CASE_ONEOF(path, FilePath) {
Expand All @@ -597,8 +598,8 @@ jsg::BufferSource FileSystemModule::readAll(jsg::Lock& js, kj::OneOf<int, FilePa
KJ_SWITCH_ONEOF(node) {
KJ_CASE_ONEOF(file, kj::Rc<workerd::File>) {
KJ_SWITCH_ONEOF(file->readAllBytes(js)) {
KJ_CASE_ONEOF(data, jsg::BufferSource) {
return kj::mv(data);
KJ_CASE_ONEOF(data, jsg::JsUint8Array) {
return data;
}
KJ_CASE_ONEOF(err, workerd::FsError) {
throwFsError(js, err, "readAll"_kj);
Expand Down Expand Up @@ -635,8 +636,8 @@ jsg::BufferSource FileSystemModule::readAll(jsg::Lock& js, kj::OneOf<int, FilePa
});

KJ_SWITCH_ONEOF(file->readAllBytes(js)) {
KJ_CASE_ONEOF(data, jsg::BufferSource) {
return kj::mv(data);
KJ_CASE_ONEOF(data, jsg::JsUint8Array) {
return data;
}
KJ_CASE_ONEOF(err, workerd::FsError) {
throwFsError(js, err, "freadAll"_kj);
Expand All @@ -656,7 +657,7 @@ jsg::BufferSource FileSystemModule::readAll(jsg::Lock& js, kj::OneOf<int, FilePa

uint32_t FileSystemModule::writeAll(jsg::Lock& js,
kj::OneOf<int, FilePath> pathOrFd,
jsg::BufferSource data,
jsg::JsBufferSource data,
WriteAllOptions options) {
auto& vfs = workerd::VirtualFileSystem::current(js);

Expand Down Expand Up @@ -684,7 +685,7 @@ uint32_t FileSystemModule::writeAll(jsg::Lock& js,
// If the append option is set, we will write to the end of the file
// instead of overwriting it.
if (options.append) {
KJ_SWITCH_ONEOF(file->write(js, stat.size, data)) {
KJ_SWITCH_ONEOF(file->write(js, stat.size, data.asArrayPtr())) {
KJ_CASE_ONEOF(written, uint32_t) {
return written;
}
Expand All @@ -696,7 +697,7 @@ uint32_t FileSystemModule::writeAll(jsg::Lock& js,
}

// Otherwise, we overwrite the entire file.
KJ_SWITCH_ONEOF(file->writeAll(js, data)) {
KJ_SWITCH_ONEOF(file->writeAll(js, data.asArrayPtr())) {
KJ_CASE_ONEOF(written, uint32_t) {
return written;
}
Expand Down Expand Up @@ -737,7 +738,7 @@ uint32_t FileSystemModule::writeAll(jsg::Lock& js,
node::THROW_ERR_UV_EPERM(js, "writeAll"_kj);
}
auto file = workerd::File::newWritable(js, static_cast<uint32_t>(data.size()));
KJ_SWITCH_ONEOF(file->writeAll(js, data)) {
KJ_SWITCH_ONEOF(file->writeAll(js, data.asArrayPtr())) {
KJ_CASE_ONEOF(written, uint32_t) {
KJ_IF_SOME(err, dir->add(js, relative.name, kj::mv(file))) {
throwFsError(js, err, "writeAll"_kj);
Expand Down Expand Up @@ -788,14 +789,14 @@ uint32_t FileSystemModule::writeAll(jsg::Lock& js,
// If the file descriptor was opened in append mode, or if the append option
// is set, then we'll use write instead to append to the end of the file.
if (opened->append || options.append) {
return write(js, fd, kj::arr(kj::mv(data)),
return write(js, fd, kj::arr(data.addRef(js)),
{
.position = stat.size,
});
}

// Otherwise, we overwrite the entire file.
KJ_SWITCH_ONEOF(file->writeAll(js, data)) {
KJ_SWITCH_ONEOF(file->writeAll(js, data.asArrayPtr())) {
KJ_CASE_ONEOF(written, uint32_t) {
return written;
}
Expand Down Expand Up @@ -1890,9 +1891,8 @@ jsg::Ref<Blob> FileSystemModule::openAsBlob(
}
KJ_CASE_ONEOF(file, kj::Rc<workerd::File>) {
KJ_SWITCH_ONEOF(file->readAllBytes(js)) {
KJ_CASE_ONEOF(bytes, jsg::BufferSource) {
return js.alloc<Blob>(
js, bytes.getJsHandle(js), kj::mv(options.type).orDefault(kj::String()));
KJ_CASE_ONEOF(bytes, jsg::JsUint8Array) {
return js.alloc<Blob>(js, bytes, kj::mv(options.type).orDefault(kj::String()));
}
KJ_CASE_ONEOF(err, workerd::FsError) {
throwFsError(js, err, "open"_kj);
Expand Down Expand Up @@ -2557,10 +2557,10 @@ jsg::Promise<jsg::Ref<File>> FileSystemFileHandle::getFile(
KJ_CASE_ONEOF(file, kj::Rc<workerd::File>) {
auto stat = file->stat(js);
KJ_SWITCH_ONEOF(file->readAllBytes(js)) {
KJ_CASE_ONEOF(bytes, jsg::BufferSource) {
KJ_CASE_ONEOF(bytes, jsg::JsUint8Array) {
return js.resolvedPromise(
js.alloc<File>(js, bytes.getJsHandle(js), jsg::USVString(kj::str(getName(js))),
kj::String(), (stat.lastModified - kj::UNIX_EPOCH) / kj::MILLISECONDS));
js.alloc<File>(js, bytes, jsg::USVString(kj::str(getName(js))), kj::String(),
(stat.lastModified - kj::UNIX_EPOCH) / kj::MILLISECONDS));
}
KJ_CASE_ONEOF(err, workerd::FsError) {
return js.rejectedPromise<jsg::Ref<File>>(
Expand Down Expand Up @@ -2713,7 +2713,9 @@ jsg::Promise<jsg::Ref<FileSystemWritableFileStream>> FileSystemFileHandle::creat
return js.resolvedPromise();
}, [&](jsg::Value exception) { return js.rejectedPromise<void>(kj::mv(exception)); });
};
stream->getController().setup(js, kj::mv(sink), kj::none);

stream->getController().setup(
js, kj::heap<UnderlyingSinkImpl>(js, kj::mv(sink), StreamQueuingStrategy{}));

return js.resolvedPromise(kj::mv(stream));
}
Expand All @@ -2724,7 +2726,7 @@ FileSystemWritableFileStream::FileSystemWritableFileStream(
sharedState(kj::mv(sharedState)) {}

jsg::Promise<void> FileSystemWritableFileStream::write(jsg::Lock& js,
kj::OneOf<jsg::Ref<Blob>, jsg::BufferSource, kj::String, WriteParams> data,
kj::OneOf<jsg::Ref<Blob>, jsg::JsBufferSource, kj::String, WriteParams> data,
const jsg::TypeHandler<jsg::Ref<jsg::DOMException>>& deHandler) {
JSG_REQUIRE(!getController().isLockedToWriter(), TypeError,
"Cannot write to a stream that is locked to a reader");
Expand All @@ -2750,8 +2752,8 @@ jsg::Promise<void> FileSystemWritableFileStream::writeImpl(jsg::Lock& js,
}
}
}
KJ_CASE_ONEOF(buffer, jsg::BufferSource) {
KJ_SWITCH_ONEOF(inner->write(js, state.position, buffer)) {
KJ_CASE_ONEOF(buffer, jsg::JsBufferSource) {
KJ_SWITCH_ONEOF(inner->write(js, state.position, buffer.asArrayPtr())) {
KJ_CASE_ONEOF(written, uint32_t) {
state.position += written;
}
Expand Down Expand Up @@ -2799,8 +2801,8 @@ jsg::Promise<void> FileSystemWritableFileStream::writeImpl(jsg::Lock& js,
}
KJ_UNREACHABLE;
}
KJ_CASE_ONEOF(buffer, jsg::BufferSource) {
KJ_SWITCH_ONEOF(inner->write(js, offset, buffer)) {
KJ_CASE_ONEOF(buffer, jsg::JsRef<jsg::JsBufferSource>) {
KJ_SWITCH_ONEOF(inner->write(js, offset, buffer.getHandle(js).asArrayPtr())) {
KJ_CASE_ONEOF(written, uint32_t) {
state.position = offset + written;
return js.resolvedPromise();
Expand Down
12 changes: 6 additions & 6 deletions src/workerd/api/filesystem.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,10 @@ class FileSystemModule final: public jsg::Object {
JSG_STRUCT(position);
};

uint32_t write(jsg::Lock& js, int fd, kj::Array<jsg::BufferSource> data, WriteOptions options);
uint32_t read(jsg::Lock& js, int fd, kj::Array<jsg::BufferSource> data, WriteOptions options);
uint32_t write(jsg::Lock& js, int fd, kj::Array<jsg::JsRef<jsg::JsBufferSource>> data, WriteOptions options);
uint32_t read(jsg::Lock& js, int fd, kj::Array<jsg::JsRef<jsg::JsUint8Array>> data, WriteOptions options);

jsg::BufferSource readAll(jsg::Lock& js, kj::OneOf<int, FilePath> pathOrFd);
jsg::JsUint8Array readAll(jsg::Lock& js, kj::OneOf<int, FilePath> pathOrFd);

struct WriteAllOptions {
bool exclusive;
Expand All @@ -116,7 +116,7 @@ class FileSystemModule final: public jsg::Object {

uint32_t writeAll(jsg::Lock& js,
kj::OneOf<int, FilePath> pathOrFd,
jsg::BufferSource data,
jsg::JsBufferSource data,
WriteAllOptions options);

struct RenameOrCopyOptions {
Expand Down Expand Up @@ -298,12 +298,12 @@ struct FileSystemFileWriteParams {
jsg::Optional<uint32_t> position;
// Yes, wrapping the kj::Maybe with a jsg::Optional is intentional here. We need to
// be able to accept null or undefined values and handle them per the spec.
jsg::Optional<kj::Maybe<kj::OneOf<jsg::Ref<Blob>, jsg::BufferSource, kj::String>>> data;
jsg::Optional<kj::Maybe<kj::OneOf<jsg::Ref<Blob>, jsg::JsRef<jsg::JsBufferSource>, kj::String>>> data;
JSG_STRUCT(type, size, position, data);
};

using FileSystemWritableData =
kj::OneOf<jsg::Ref<Blob>, jsg::BufferSource, kj::String, FileSystemFileWriteParams>;
kj::OneOf<jsg::Ref<Blob>, jsg::JsBufferSource, kj::String, FileSystemFileWriteParams>;

class FileSystemFileHandle final: public FileSystemHandle {
public:
Expand Down
10 changes: 5 additions & 5 deletions src/workerd/api/html-rewriter.c++
Original file line number Diff line number Diff line change
Expand Up @@ -727,7 +727,7 @@ kj::Promise<void> Rewriter::replacerThunkPromise(

auto streamSink = kj::heap<ReplacerStreamSink>(sink, registration.isHtml);
return ioContext.waitForDeferredProxy(
registration.stream->pumpTo(lock, kj::mv(streamSink), true));
registration.stream->pumpTo(lock, kj::mv(streamSink), End::YES));
});
}

Expand Down Expand Up @@ -1294,10 +1294,10 @@ jsg::Ref<Response> HTMLRewriter::transform(jsg::Lock& js, jsg::Ref<Response> res
// after we know that nothing else (like invalid encoding) could cause an exception.

// Drive and flush the parser asynchronously.
ioContext.addTask(
ioContext
.waitForDeferredProxy(KJ_ASSERT_NONNULL(maybeInput)->pumpTo(js, kj::mv(rewriter), true))
.catch_([](kj::Exception&& e) {
ioContext.addTask(ioContext
.waitForDeferredProxy(
KJ_ASSERT_NONNULL(maybeInput)->pumpTo(js, kj::mv(rewriter), End::YES))
.catch_([](kj::Exception&& e) {
// Errors in pumpTo() are already propagated to the destination stream. We don't want to
// throw them from here since it'll cause an uncaught exception to be reported via taskFailed(),
// which would poison the IoContext even though the application may have handled the error.
Expand Down
Loading
Loading