From 0becf762fcd366df3671fb27f7904a47b2656130 Mon Sep 17 00:00:00 2001 From: James M Snell Date: Mon, 23 Mar 2026 11:33:54 -0700 Subject: [PATCH 1/4] stream: make stream.Readable implement the toAsyncStreamable protocol Signed-off-by: James M Snell Assisted-by: Opencode/Opus 4.6 --- doc/api/errors.md | 7 + lib/internal/errors.js | 2 + lib/internal/streams/iter/from.js | 21 +- lib/internal/streams/iter/types.js | 10 + lib/internal/streams/readable.js | 145 ++++ ...t-stream-iter-readable-interop-disabled.js | 44 ++ .../test-stream-iter-readable-interop.js | 640 ++++++++++++++++++ 7 files changed, 867 insertions(+), 2 deletions(-) create mode 100644 test/parallel/test-stream-iter-readable-interop-disabled.js create mode 100644 test/parallel/test-stream-iter-readable-interop.js diff --git a/doc/api/errors.md b/doc/api/errors.md index 65ef2ce7bf5d01..98073d49d62098 100644 --- a/doc/api/errors.md +++ b/doc/api/errors.md @@ -2882,6 +2882,13 @@ An attempt was made to call [`stream.pipe()`][] on a [`Writable`][] stream. A stream method was called that cannot complete because the stream was destroyed using `stream.destroy()`. + + +### `ERR_STREAM_ITER_MISSING_FLAG` + +A stream/iter API was used without the `--experimental-stream-iter` CLI flag +enabled. + ### `ERR_STREAM_NULL_VALUES` diff --git a/lib/internal/errors.js b/lib/internal/errors.js index 7c4728627731fe..206e2a24716022 100644 --- a/lib/internal/errors.js +++ b/lib/internal/errors.js @@ -1770,6 +1770,8 @@ E('ERR_STREAM_ALREADY_FINISHED', Error); E('ERR_STREAM_CANNOT_PIPE', 'Cannot pipe, not readable', Error); E('ERR_STREAM_DESTROYED', 'Cannot call %s after a stream was destroyed', Error); +E('ERR_STREAM_ITER_MISSING_FLAG', + 'The stream/iter API requires the --experimental-stream-iter flag', TypeError); E('ERR_STREAM_NULL_VALUES', 'May not write null values to stream', TypeError); E('ERR_STREAM_PREMATURE_CLOSE', 'Premature close', Error); E('ERR_STREAM_PUSH_AFTER_EOF', 'stream.push() after EOF', Error); diff --git a/lib/internal/streams/iter/from.js b/lib/internal/streams/iter/from.js index d76f430ab0d51e..f117808fa7d3b7 100644 --- a/lib/internal/streams/iter/from.js +++ b/lib/internal/streams/iter/from.js @@ -37,6 +37,7 @@ const { } = require('internal/util/types'); const { + kTrustedSource, toStreamable, toAsyncStreamable, } = require('internal/streams/iter/types'); @@ -483,6 +484,11 @@ function from(input) { throw new ERR_INVALID_ARG_TYPE('input', 'a non-null value', input); } + // Fast path: trusted source already yields valid Uint8Array[] batches + if (input[kTrustedSource]) { + return input; + } + // Check for primitives first (ByteInput) if (isPrimitiveChunk(input)) { const chunk = primitiveToUint8Array(input); @@ -531,11 +537,22 @@ function from(input) { // Check toAsyncStreamable protocol (takes precedence over toStreamable and // iteration protocols) if (typeof input[toAsyncStreamable] === 'function') { + const result = input[toAsyncStreamable](); + // Synchronous trusted source (e.g. Readable batched iterator) + if (result?.[kTrustedSource]) { + return result; + } return { __proto__: null, async *[SymbolAsyncIterator]() { - const result = await input[toAsyncStreamable](); - yield* from(result)[SymbolAsyncIterator](); + // The result may be a Promise. Check trusted on both the Promise + // itself (if tagged) and the resolved value. + const resolved = await result; + if (resolved?.[kTrustedSource]) { + yield* resolved[SymbolAsyncIterator](); + return; + } + yield* from(resolved)[SymbolAsyncIterator](); }, }; } diff --git a/lib/internal/streams/iter/types.js b/lib/internal/streams/iter/types.js index c205db00e3782a..9e528647ca9110 100644 --- a/lib/internal/streams/iter/types.js +++ b/lib/internal/streams/iter/types.js @@ -55,9 +55,19 @@ const drainableProtocol = SymbolFor('Stream.drainableProtocol'); */ const kTrustedTransform = Symbol('kTrustedTransform'); +/** + * Internal sentinel for trusted sources. An async iterable with + * [kTrustedSource] = true signals that it already yields valid + * Uint8Array[] batches - no normalizeAsyncSource wrapper needed. + * from() will return such sources directly, skipping all normalization. + * This is NOT a public protocol symbol - it uses Symbol() not Symbol.for(). + */ +const kTrustedSource = Symbol('kTrustedSource'); + module.exports = { broadcastProtocol, drainableProtocol, + kTrustedSource, kTrustedTransform, shareProtocol, shareSyncProtocol, diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index 919c527a2be6f8..d3680743328419 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -35,6 +35,7 @@ const { Symbol, SymbolAsyncDispose, SymbolAsyncIterator, + SymbolFor, SymbolSpecies, TypedArrayPrototypeSet, } = primordials; @@ -52,6 +53,8 @@ const { } = require('internal/streams/add-abort-signal'); const { eos } = require('internal/streams/end-of-stream'); +const { getOptionValue } = require('internal/options'); + let debug = require('internal/util/debuglog').debuglog('stream', (fn) => { debug = fn; }); @@ -82,6 +85,7 @@ const { ERR_INVALID_ARG_TYPE, ERR_METHOD_NOT_IMPLEMENTED, ERR_OUT_OF_RANGE, + ERR_STREAM_ITER_MISSING_FLAG, ERR_STREAM_PUSH_AFTER_EOF, ERR_STREAM_UNSHIFT_AFTER_END_EVENT, ERR_UNKNOWN_ENCODING, @@ -1796,3 +1800,144 @@ Readable.wrap = function(src, options) { }, }).wrap(src); }; + +// Efficient interop with the stream/iter API via toAsyncStreamable protocol. +// Provides a batched async iterator that drains the internal buffer into +// Uint8Array[] batches, avoiding the per-chunk Promise overhead of the +// standard Symbol.asyncIterator path. +// +// The flag cannot be checked at module load time (readable.js loads during +// bootstrap before options are available). Instead, toAsyncStreamable is +// always defined but lazily initializes on first call - throwing if the +// flag is not set, or installing the real implementation if it is. +{ + const toAsyncStreamable = SymbolFor('Stream.toAsyncStreamable'); + let kTrustedSource; + let normalizeAsyncValue; + let isU8; + + // Maximum chunks to drain into a single batch. Bounds peak memory when + // _read() synchronously pushes many chunks into the buffer. + const MAX_DRAIN_BATCH = 128; + + function lazyInit() { + if (kTrustedSource !== undefined) return; + if (!getOptionValue('--experimental-stream-iter')) { + throw new ERR_STREAM_ITER_MISSING_FLAG(); + } + ({ kTrustedSource } = require('internal/streams/iter/types')); + ({ normalizeAsyncValue } = require('internal/streams/iter/from')); + ({ isUint8Array: isU8 } = require('internal/util/types')); + } + + // Normalize a batch of raw chunks from an object-mode or encoded + // Readable into Uint8Array values. Returns the normalized batch, + // or null if normalization produced no output. + async function normalizeBatch(raw) { + const batch = []; + for (let i = 0; i < raw.length; i++) { + const value = raw[i]; + if (isU8(value)) { + batch.push(value); + } else { + // normalizeAsyncValue may await for async protocols (e.g. + // toAsyncStreamable on yielded objects). Stream events during + // the suspension are queued, not lost - errors will surface + // on the next loop iteration after this yield completes. + for await (const normalized of normalizeAsyncValue(value)) { + batch.push(normalized); + } + } + } + return batch.length > 0 ? batch : null; + } + + // Batched async iterator for Readable streams. Same mechanism as + // createAsyncIterator (same event setup, same stream.read() to + // trigger _read(), same teardown) but drains all currently buffered + // chunks into a single Uint8Array[] batch per yield, amortizing the + // Promise/microtask cost across multiple chunks. + // + // When normalize is provided (object-mode / encoded streams), each + // drained batch is passed through it to convert chunks to Uint8Array. + // When normalize is null (byte-mode), chunks are already Buffers + // (Uint8Array subclass) and are yielded directly. + async function* createBatchedAsyncIterator(stream, normalize) { + let callback = nop; + + function next(resolve) { + if (this === stream) { + callback(); + callback = nop; + } else { + callback = resolve; + } + } + + stream.on('readable', next); + + let error; + const cleanup = eos(stream, { writable: false }, (err) => { + error = err ? aggregateTwoErrors(error, err) : null; + callback(); + callback = nop; + }); + + try { + while (true) { + const chunk = stream.destroyed ? null : stream.read(); + if (chunk !== null) { + // Drain any additional already-buffered chunks into the same + // batch. The first read() may trigger _read() which + // synchronously pushes more data into the buffer. We drain + // that buffered data without issuing unbounded _read() calls - + // once state.length hits 0 or MAX_DRAIN_BATCH is reached, we + // stop and yield what we have. + const batch = [chunk]; + while (batch.length < MAX_DRAIN_BATCH && + stream._readableState.length > 0) { + const c = stream.read(); + if (c === null) break; + batch.push(c); + } + if (normalize !== null) { + const result = await normalize(batch); + if (result !== null) { + yield result; + } + } else { + yield batch; + } + } else if (error) { + throw error; + } else if (error === null) { + return; + } else { + await new Promise(next); + } + } + } catch (err) { + error = aggregateTwoErrors(error, err); + throw error; + } finally { + if (error === undefined || stream._readableState.autoDestroy) { + destroyImpl.destroyer(stream, null); + } else { + stream.off('readable', next); + cleanup(); + } + } + } + + Readable.prototype[toAsyncStreamable] = function() { + lazyInit(); + const state = this._readableState; + const normalize = (state.objectMode || state.encoding) ? + normalizeBatch : + null; + const iter = createBatchedAsyncIterator(this, normalize); + iter[kTrustedSource] = true; + iter.stream = this; + return iter; + }; +} diff --git a/test/parallel/test-stream-iter-readable-interop-disabled.js b/test/parallel/test-stream-iter-readable-interop-disabled.js new file mode 100644 index 00000000000000..794bf2842cfbfa --- /dev/null +++ b/test/parallel/test-stream-iter-readable-interop-disabled.js @@ -0,0 +1,44 @@ +'use strict'; + +// Tests that toAsyncStreamable throws ERR_STREAM_ITER_MISSING_FLAG +// when --experimental-stream-iter is not enabled. + +const common = require('../common'); +const assert = require('assert'); +const { spawnPromisified } = common; + +async function testToAsyncStreamableWithoutFlag() { + const { stderr, code } = await spawnPromisified(process.execPath, [ + '-e', + ` + const { Readable } = require('stream'); + const r = new Readable({ read() {} }); + r[Symbol.for('Stream.toAsyncStreamable')](); + `, + ]); + assert.notStrictEqual(code, 0); + assert.match(stderr, /ERR_STREAM_ITER_MISSING_FLAG/); +} + +async function testToAsyncStreamableWithFlag() { + const { code } = await spawnPromisified(process.execPath, [ + '--experimental-stream-iter', + '-e', + ` + const { Readable } = require('stream'); + const r = new Readable({ + read() { this.push(Buffer.from('ok')); this.push(null); } + }); + const sym = Symbol.for('Stream.toAsyncStreamable'); + const iter = r[sym](); + // Should not throw, and should have stream property + if (!iter.stream) process.exit(1); + `, + ]); + assert.strictEqual(code, 0); +} + +Promise.all([ + testToAsyncStreamableWithoutFlag(), + testToAsyncStreamableWithFlag(), +]).then(common.mustCall()); diff --git a/test/parallel/test-stream-iter-readable-interop.js b/test/parallel/test-stream-iter-readable-interop.js new file mode 100644 index 00000000000000..f201dab4f5642a --- /dev/null +++ b/test/parallel/test-stream-iter-readable-interop.js @@ -0,0 +1,640 @@ +// Flags: --experimental-stream-iter +'use strict'; + +// Tests for classic Readable stream interop with the stream/iter API +// via the toAsyncStreamable protocol and kTrustedSource optimization. + +const common = require('../common'); +const assert = require('assert'); +const { Readable } = require('stream'); +const { + from, + pull, + bytes, + text, +} = require('stream/iter'); + +const toAsyncStreamable = Symbol.for('Stream.toAsyncStreamable'); + +// ============================================================================= +// toAsyncStreamable protocol is present on Readable.prototype +// ============================================================================= + +function testProtocolExists() { + assert.strictEqual(typeof Readable.prototype[toAsyncStreamable], 'function'); + + const readable = new Readable({ read() {} }); + assert.strictEqual(typeof readable[toAsyncStreamable], 'function'); +} + +// ============================================================================= +// Byte-mode Readable: basic round-trip through from() +// ============================================================================= + +async function testByteModeThroughFrom() { + const readable = new Readable({ + read() { + this.push(Buffer.from('hello')); + this.push(Buffer.from(' world')); + this.push(null); + }, + }); + + const result = await text(from(readable)); + assert.strictEqual(result, 'hello world'); +} + +// ============================================================================= +// Byte-mode Readable: basic round-trip through pull() +// ============================================================================= + +async function testByteModeThroughPull() { + const readable = new Readable({ + read() { + this.push(Buffer.from('pull ')); + this.push(Buffer.from('test')); + this.push(null); + }, + }); + + const result = await text(pull(readable)); + assert.strictEqual(result, 'pull test'); +} + +// ============================================================================= +// Byte-mode Readable: bytes consumer +// ============================================================================= + +async function testByteModeBytes() { + const data = Buffer.from('binary data here'); + const readable = new Readable({ + read() { + this.push(data); + this.push(null); + }, + }); + + const result = await bytes(from(readable)); + assert.deepStrictEqual(result, new Uint8Array(data)); +} + +// ============================================================================= +// Byte-mode Readable: batching - multiple buffered chunks yield as one batch +// ============================================================================= + +async function testBatchingBehavior() { + const readable = new Readable({ + read() { + // Push multiple chunks synchronously so they all buffer + for (let i = 0; i < 10; i++) { + this.push(Buffer.from(`chunk${i}`)); + } + this.push(null); + }, + }); + + const source = from(readable); + const batches = []; + for await (const batch of source) { + batches.push(batch); + } + + // All chunks were buffered synchronously, so they should come out + // as fewer batches than individual chunks (ideally one batch). + assert.ok(batches.length < 10, + `Expected fewer batches than chunks, got ${batches.length}`); + + // Total data should be correct + const allChunks = batches.flat(); + const combined = Buffer.concat(allChunks); + let expected = ''; + for (let i = 0; i < 10; i++) { + expected += `chunk${i}`; + } + assert.strictEqual(combined.toString(), expected); +} + +// ============================================================================= +// Byte-mode Readable: kTrustedSource is set +// ============================================================================= + +function testTrustedSourceByteMode() { + const readable = new Readable({ read() {} }); + const result = readable[toAsyncStreamable](); + // kTrustedSource is a private symbol, but we can verify the result + // is used directly by from() without wrapping by checking it has + // Symbol.asyncIterator + assert.strictEqual(typeof result[Symbol.asyncIterator], 'function'); + assert.strictEqual(result.stream, readable); +} + +// ============================================================================= +// Byte-mode Readable: multi-read with delayed pushes +// ============================================================================= + +async function testDelayedPushes() { + let pushCount = 0; + const readable = new Readable({ + read() { + if (pushCount < 3) { + setTimeout(() => { + this.push(Buffer.from(`delayed${pushCount}`)); + pushCount++; + if (pushCount === 3) { + this.push(null); + } + }, 10); + } + }, + }); + + const result = await text(from(readable)); + assert.strictEqual(result, 'delayed0delayed1delayed2'); +} + +// ============================================================================= +// Byte-mode Readable: empty stream +// ============================================================================= + +async function testEmptyStream() { + const readable = new Readable({ + read() { + this.push(null); + }, + }); + + const result = await text(from(readable)); + assert.strictEqual(result, ''); +} + +// ============================================================================= +// Byte-mode Readable: error propagation +// ============================================================================= + +async function testErrorPropagation() { + const readable = new Readable({ + read() { + process.nextTick(() => this.destroy(new Error('test error'))); + }, + }); + + await assert.rejects( + text(from(readable)), + (err) => err.message === 'test error', + ); +} + +// ============================================================================= +// Byte-mode Readable: with transforms +// ============================================================================= + +async function testWithTransform() { + const readable = new Readable({ + read() { + this.push(Buffer.from('hello')); + this.push(null); + }, + }); + + // Uppercase transform + function uppercase(chunks) { + if (chunks === null) return null; + return chunks.map((c) => { + const buf = Buffer.from(c); + for (let i = 0; i < buf.length; i++) { + if (buf[i] >= 97 && buf[i] <= 122) buf[i] -= 32; + } + return buf; + }); + } + + const result = await text(pull(readable, uppercase)); + assert.strictEqual(result, 'HELLO'); +} + +// ============================================================================= +// Object-mode Readable: strings are normalized to Uint8Array +// ============================================================================= + +async function testObjectModeStrings() { + const readable = new Readable({ + objectMode: true, + read() { + this.push('hello'); + this.push(' object'); + this.push(' mode'); + this.push(null); + }, + }); + + const result = await text(from(readable)); + assert.strictEqual(result, 'hello object mode'); +} + +// ============================================================================= +// Object-mode Readable: Uint8Array chunks pass through +// ============================================================================= + +async function testObjectModeUint8Array() { + const readable = new Readable({ + objectMode: true, + read() { + this.push(new Uint8Array([72, 73])); // "HI" + this.push(null); + }, + }); + + const result = await text(from(readable)); + assert.strictEqual(result, 'HI'); +} + +// ============================================================================= +// Object-mode Readable: mixed types (strings + Uint8Array) +// ============================================================================= + +async function testObjectModeMixed() { + const readable = new Readable({ + objectMode: true, + read() { + this.push('hello'); + this.push(Buffer.from(' ')); + this.push('world'); + this.push(null); + }, + }); + + const result = await text(from(readable)); + assert.strictEqual(result, 'hello world'); +} + +// ============================================================================= +// Object-mode Readable: toStreamable protocol objects +// ============================================================================= + +async function testObjectModeToStreamable() { + const toStreamableSym = Symbol.for('Stream.toStreamable'); + const readable = new Readable({ + objectMode: true, + read() { + this.push({ + [toStreamableSym]() { + return 'from-protocol'; + }, + }); + this.push(null); + }, + }); + + const result = await text(from(readable)); + assert.strictEqual(result, 'from-protocol'); +} + +// ============================================================================= +// Object-mode Readable: kTrustedSource is set +// ============================================================================= + +function testTrustedSourceObjectMode() { + const readable = new Readable({ objectMode: true, read() {} }); + const result = readable[toAsyncStreamable](); + assert.strictEqual(typeof result[Symbol.asyncIterator], 'function'); + assert.strictEqual(result.stream, readable); +} + +// ============================================================================= +// Encoded Readable: strings are re-encoded to Uint8Array +// ============================================================================= + +async function testEncodedReadable() { + const readable = new Readable({ + encoding: 'utf8', + read() { + this.push(Buffer.from('encoded')); + this.push(null); + }, + }); + + const result = await text(from(readable)); + assert.strictEqual(result, 'encoded'); +} + +// ============================================================================= +// Readable.from() source: verify interop with Readable.from() +// ============================================================================= + +async function testReadableFrom() { + const readable = Readable.from(['chunk1', 'chunk2', 'chunk3']); + + const result = await text(from(readable)); + assert.strictEqual(result, 'chunk1chunk2chunk3'); +} + +// ============================================================================= +// Byte-mode Readable: large data +// ============================================================================= + +async function testLargeData() { + const totalSize = 1024 * 1024; // 1 MB + const chunkSize = 16384; + let pushed = 0; + + const readable = new Readable({ + read() { + if (pushed < totalSize) { + const size = Math.min(chunkSize, totalSize - pushed); + const buf = Buffer.alloc(size, 0x41); // Fill with 'A' + this.push(buf); + pushed += size; + } else { + this.push(null); + } + }, + }); + + const result = await bytes(from(readable)); + assert.strictEqual(result.length, totalSize); + assert.strictEqual(result[0], 0x41); + assert.strictEqual(result[totalSize - 1], 0x41); +} + +// ============================================================================= +// Byte-mode Readable: consumer return (early termination) +// ============================================================================= + +async function testEarlyTermination() { + let pushCount = 0; + const readable = new Readable({ + read() { + this.push(Buffer.from(`chunk${pushCount++}`)); + // Never pushes null - infinite stream + }, + }); + + // Take only the first batch + const source = from(readable); + const batches = []; + for await (const batch of source) { + batches.push(batch); + break; // Stop after first batch + } + + assert.ok(batches.length >= 1); + // Stream should be destroyed after consumer return + // Give it a tick to clean up + await new Promise((resolve) => setTimeout(resolve, 50)); + assert.ok(readable.destroyed); +} + +// ============================================================================= +// Byte-mode Readable: pull() with compression transform +// ============================================================================= + +async function testWithCompression() { + const { + compressGzip, + decompressGzip, + } = require('zlib/iter'); + + const readable = new Readable({ + read() { + this.push(Buffer.from('compress me via classic Readable')); + this.push(null); + }, + }); + + const compressed = pull(readable, compressGzip()); + const result = await text(pull(compressed, decompressGzip())); + assert.strictEqual(result, 'compress me via classic Readable'); +} + +// ============================================================================= +// Object-mode Readable: error propagation +// ============================================================================= + +async function testObjectModeError() { + const readable = new Readable({ + objectMode: true, + read() { + process.nextTick(() => this.destroy(new Error('object error'))); + }, + }); + + await assert.rejects( + text(from(readable)), + (err) => err.message === 'object error', + ); +} + +// ============================================================================= +// Stream destroyed mid-iteration +// ============================================================================= + +async function testDestroyMidIteration() { + let pushCount = 0; + const readable = new Readable({ + read() { + this.push(Buffer.from(`chunk${pushCount++}`)); + // Never pushes null - infinite stream + }, + }); + + const chunks = []; + await assert.rejects(async () => { + for await (const batch of from(readable)) { + chunks.push(...batch); + if (chunks.length >= 3) { + readable.destroy(); + } + } + }, { code: 'ERR_STREAM_PREMATURE_CLOSE' }); + assert.ok(readable.destroyed); + assert.ok(chunks.length >= 3); +} + +// ============================================================================= +// Error after partial data +// ============================================================================= + +async function testErrorAfterPartialData() { + let count = 0; + const readable = new Readable({ + read() { + if (count < 3) { + this.push(Buffer.from(`ok${count++}`)); + } else { + this.destroy(new Error('late error')); + } + }, + }); + + const chunks = []; + await assert.rejects(async () => { + for await (const batch of from(readable)) { + chunks.push(...batch); + } + }, { message: 'late error' }); + assert.ok(chunks.length > 0, 'Should have received partial data'); +} + +// ============================================================================= +// Multiple consumers (second iteration yields empty) +// ============================================================================= + +async function testMultipleConsumers() { + const readable = new Readable({ + read() { + this.push(Buffer.from('data')); + this.push(null); + }, + }); + + const first = await text(from(readable)); + assert.strictEqual(first, 'data'); + + // Second consumption - stream is already consumed/destroyed + const second = await text(from(readable)); + assert.strictEqual(second, ''); +} + +// ============================================================================= +// highWaterMark: 0 - each chunk becomes its own batch +// ============================================================================= + +async function testHighWaterMarkZero() { + let pushCount = 0; + const readable = new Readable({ + highWaterMark: 0, + read() { + if (pushCount < 3) { + this.push(Buffer.from(`hwm0-${pushCount++}`)); + } else { + this.push(null); + } + }, + }); + + const batches = []; + for await (const batch of from(readable)) { + batches.push(batch); + } + + // With HWM=0, buffer is always empty so drain loop never fires. + // Each chunk should be its own batch. + assert.strictEqual(batches.length, 3); + const combined = Buffer.concat(batches.flat()); + assert.strictEqual(combined.toString(), 'hwm0-0hwm0-1hwm0-2'); +} + +// ============================================================================= +// Duplex stream (Duplex extends Readable, toAsyncStreamable should work) +// ============================================================================= + +async function testDuplexStream() { + const { Duplex } = require('stream'); + + const duplex = new Duplex({ + read() { + this.push(Buffer.from('duplex-data')); + this.push(null); + }, + write(chunk, enc, cb) { cb(); }, + }); + + const result = await text(from(duplex)); + assert.strictEqual(result, 'duplex-data'); +} + +// ============================================================================= +// setEncoding called dynamically after construction +// ============================================================================= + +async function testSetEncodingDynamic() { + const readable = new Readable({ + read() { + this.push(Buffer.from('dynamic-enc')); + this.push(null); + }, + }); + + readable.setEncoding('utf8'); + + const result = await text(from(readable)); + assert.strictEqual(result, 'dynamic-enc'); +} + +// ============================================================================= +// AbortSignal cancellation +// ============================================================================= + +async function testAbortSignal() { + let pushCount = 0; + const readable = new Readable({ + read() { + this.push(Buffer.from(`sig${pushCount++}`)); + // Never pushes null - infinite stream + }, + }); + + const ac = new AbortController(); + const chunks = []; + + await assert.rejects(async () => { + for await (const batch of pull(readable, { signal: ac.signal })) { + chunks.push(...batch); + if (chunks.length >= 2) { + ac.abort(); + } + } + }, { name: 'AbortError' }); + assert.ok(chunks.length >= 2); +} + +// ============================================================================= +// kTrustedSource identity - from() returns same object for trusted sources +// ============================================================================= + +function testTrustedSourceIdentity() { + const readable = new Readable({ read() {} }); + const iter = readable[toAsyncStreamable](); + + // from() should return the trusted iterator directly (same reference), + // not wrap it in another generator + const result = from(iter); + assert.strictEqual(result, iter); +} + +// ============================================================================= +// Run all tests +// ============================================================================= + +testProtocolExists(); +testTrustedSourceByteMode(); +testTrustedSourceObjectMode(); +testTrustedSourceIdentity(); + +Promise.all([ + testByteModeThroughFrom(), + testByteModeThroughPull(), + testByteModeBytes(), + testBatchingBehavior(), + testDelayedPushes(), + testEmptyStream(), + testErrorPropagation(), + testWithTransform(), + testObjectModeStrings(), + testObjectModeUint8Array(), + testObjectModeMixed(), + testObjectModeToStreamable(), + testEncodedReadable(), + testReadableFrom(), + testLargeData(), + testEarlyTermination(), + testWithCompression(), + testObjectModeError(), + testDestroyMidIteration(), + testErrorAfterPartialData(), + testMultipleConsumers(), + testHighWaterMarkZero(), + testDuplexStream(), + testSetEncodingDynamic(), + testAbortSignal(), +]).then(common.mustCall()); From 8d1adefb74b081d2ae0fd2e2536c30dc9bfa29b2 Mon Sep 17 00:00:00 2001 From: James M Snell Date: Mon, 23 Mar 2026 13:51:35 -0700 Subject: [PATCH 2/4] stream: add fromStreamIter/fromStreamIterSync to stream.Readable Signed-off-by: James M Snell Assisted-by: Opencode/Opus 4.6 --- doc/api/stream.md | 157 +++++ lib/internal/streams/readable.js | 137 ++++ ...t-stream-iter-readable-interop-disabled.js | 28 + test/parallel/test-stream-iter-to-readable.js | 618 ++++++++++++++++++ 4 files changed, 940 insertions(+) create mode 100644 test/parallel/test-stream-iter-to-readable.js diff --git a/doc/api/stream.md b/doc/api/stream.md index 65afeaad6306e0..6f046137f4ab75 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -1998,6 +1998,61 @@ option. In the code example above, data will be in a single chunk if the file has less then 64 KiB of data because no `highWaterMark` option is provided to [`fs.createReadStream()`][]. +##### `readable[Symbol.for('Stream.toAsyncStreamable')]()` + + + +> Stability: 1 - Experimental + +* Returns: {AsyncIterable} An `AsyncIterable` that yields + batched chunks from the stream. + +When the `--experimental-stream-iter` flag is enabled, `Readable` streams +implement the [`Stream.toAsyncStreamable`][] protocol, enabling efficient +consumption by the [`stream/iter`][] API. + +This provides a batched async iterator that drains the stream's internal +buffer into `Uint8Array[]` batches, amortizing the per-chunk Promise overhead +of the standard `Symbol.asyncIterator` path. For byte-mode streams, chunks +are yielded directly as `Buffer` instances (which are `Uint8Array` subclasses). +For object-mode or encoded streams, each chunk is normalized to `Uint8Array` +before batching. + +The returned iterator is tagged as a trusted source, so [`from()`][stream-iter-from] +passes it through without additional normalization. + +```mjs +import { Readable } from 'node:stream'; +import { text, from } from 'node:stream/iter'; + +const readable = new Readable({ + read() { this.push('hello'); this.push(null); }, +}); + +// Readable is automatically consumed via toAsyncStreamable +console.log(await text(from(readable))); // 'hello' +``` + +```cjs +const { Readable } = require('node:stream'); +const { text, from } = require('node:stream/iter'); + +async function run() { + const readable = new Readable({ + read() { this.push('hello'); this.push(null); }, + }); + + console.log(await text(from(readable))); // 'hello' +} + +run().catch(console.error); +``` + +Without the `--experimental-stream-iter` flag, calling this method throws +[`ERR_STREAM_ITER_MISSING_FLAG`][]. + ##### `readable[Symbol.asyncDispose]()` + +> Stability: 1 - Experimental + +* `source` {AsyncIterable} An `AsyncIterable` source, such as + the return value of [`pull()`][] or [`from()`][stream-iter-from]. +* `options` {Object} + * `highWaterMark` {number} The internal buffer size in bytes before + backpressure is applied. **Default:** `65536` (64 KB). + * `signal` {AbortSignal} An optional signal that can be used to abort + the readable, destroying the stream and cleaning up the source iterator. +* Returns: {stream.Readable} + +Creates a byte-mode {stream.Readable} from an `AsyncIterable` +(the native batch format used by the [`stream/iter`][] API). Each +`Uint8Array` in a yielded batch is pushed as a separate chunk into the +Readable. + +This method requires the `--experimental-stream-iter` CLI flag. + +```mjs +import { Readable } from 'node:stream'; +import { createWriteStream } from 'node:fs'; +import { from, pull } from 'node:stream/iter'; +import { compressGzip } from 'node:zlib/iter'; + +// Bridge a stream/iter pipeline to a classic Readable +const source = pull(from('hello world'), compressGzip()); +const readable = Readable.fromStreamIter(source); + +readable.pipe(createWriteStream('output.gz')); +``` + +```cjs +const { Readable } = require('node:stream'); +const { createWriteStream } = require('node:fs'); +const { from, pull } = require('node:stream/iter'); +const { compressGzip } = require('node:zlib/iter'); + +const source = pull(from('hello world'), compressGzip()); +const readable = Readable.fromStreamIter(source); + +readable.pipe(createWriteStream('output.gz')); +``` + +### `stream.Readable.fromStreamIterSync(source[, options])` + + + +> Stability: 1 - Experimental + +* `source` {Iterable} An `Iterable` source, such as the + return value of [`pullSync()`][] or [`fromSync()`][]. +* `options` {Object} + * `highWaterMark` {number} The internal buffer size in bytes before + backpressure is applied. **Default:** `65536` (64 KB). +* Returns: {stream.Readable} + +Creates a byte-mode {stream.Readable} from a synchronous +`Iterable` (the native batch format used by the +[`stream/iter`][] sync API). Each `Uint8Array` in a yielded batch is +pushed as a separate chunk into the Readable. + +The `_read()` method pulls from the iterator synchronously, so data is +available immediately via `readable.read()` without waiting for async +callbacks. + +This method requires the `--experimental-stream-iter` CLI flag. + +```mjs +import { Readable } from 'node:stream'; +import { fromSync } from 'node:stream/iter'; + +const source = fromSync('hello world'); +const readable = Readable.fromStreamIterSync(source); + +console.log(readable.read().toString()); // 'hello world' +``` + +```cjs +const { Readable } = require('node:stream'); +const { fromSync } = require('node:stream/iter'); + +const source = fromSync('hello world'); +const readable = Readable.fromStreamIterSync(source); + +console.log(readable.read().toString()); // 'hello world' +``` + ### `stream.Readable.fromWeb(readableStream[, options])` + +> Stability: 1 - Experimental + +* `options` {Object} + * `backpressure` {string} Backpressure policy. One of `'strict'` (default), + `'block'`, or `'drop-newest'`. See below for details. +* Returns: {Object} A [`stream/iter` Writer][stream-iter-writer] adapter. + +When the `--experimental-stream-iter` flag is enabled, returns an adapter +object that conforms to the [`stream/iter`][] Writer interface, allowing the +`Writable` to be used as a destination in the iterable streams API. + +Since all writes on a classic `stream.Writable` are fundamentally +asynchronous, the synchronous methods (`writeSync`, `writevSync`, `endSync`) +always return `false` or `-1`, deferring to the async path. The per-write +`options.signal` parameter from the Writer interface is also ignored; classic +`stream.Writable` has no per-write abort signal support, so cancellation +should be handled at the pipeline level. + +**Backpressure policies:** + +* `'strict'` (default) — writes are rejected with `ERR_INVALID_STATE` when + the buffer is full (`writableLength >= writableHighWaterMark`). This catches + callers that ignore backpressure. +* `'block'` — writes wait for the `'drain'` event when the buffer is full. + This matches classic `stream.Writable` behavior and is the recommended + policy when using [`pipeTo()`][stream-iter-pipeto]. +* `'drop-newest'` — writes are silently discarded when the buffer is full. + The data is not written to the underlying resource, but `writer.end()` + still reports the total bytes (including dropped bytes) for consistency + with the Writer spec. +* `'drop-oldest'` — **not supported**. Classic `stream.Writable` does not + provide an API to evict already-buffered data without risking partial + eviction of atomic `writev()` batches. Passing this value throws + `ERR_INVALID_ARG_VALUE`. + +The adapter maps: + +* `writer.write(chunk)` — calls `writable.write(chunk)`, subject to the + backpressure policy. +* `writer.writev(chunks)` — corks the writable, writes all chunks, then + uncorks. Subject to the backpressure policy. +* `writer.end()` — calls `writable.end()` and resolves with total bytes + written when the `'finish'` event fires. +* `writer.fail(reason)` — calls `writable.destroy(reason)`. +* `writer.desiredSize` — returns the available buffer space + (`writableHighWaterMark - writableLength`), or `null` if the stream + is destroyed or finished. + +```mjs +import { Writable } from 'node:stream'; +import { from, pipeTo } from 'node:stream/iter'; + +const chunks = []; +const writable = new Writable({ + write(chunk, encoding, cb) { chunks.push(chunk); cb(); }, +}); + +// Use 'block' policy with pipeTo for classic backpressure behavior +await pipeTo(from('hello world'), + writable.toStreamIterWriter({ backpressure: 'block' })); +``` + +```cjs +const { Writable } = require('node:stream'); +const { from, pipeTo } = require('node:stream/iter'); + +async function run() { + const chunks = []; + const writable = new Writable({ + write(chunk, encoding, cb) { chunks.push(chunk); cb(); }, + }); + + await pipeTo(from('hello world'), + writable.toStreamIterWriter({ backpressure: 'block' })); +} + +run().catch(console.error); +``` + +Without the `--experimental-stream-iter` flag, calling this method throws +[`ERR_STREAM_ITER_MISSING_FLAG`][]. + ##### `writable[Symbol.asyncDispose]()` + +> Stability: 1 - Experimental + +* `writer` {Object} A [`stream/iter`][] Writer. Only the `write()` method is + required; `end()`, `fail()`, `writeSync()`, `writevSync()`, `endSync()`, + and `writev()` are optional. +* Returns: {stream.Writable} + +When the `--experimental-stream-iter` flag is enabled, creates a classic +`stream.Writable` backed by a [`stream/iter` Writer][stream-iter-writer]. + +Each `_write()` / `_writev()` call attempts the Writer's synchronous method +first (`writeSync` / `writevSync`), falling back to the async method if the +sync path returns `false`. Similarly, `_final()` tries `endSync()` before +`end()`. When the sync path succeeds, the callback is deferred via +`queueMicrotask` to preserve the async resolution contract that Writable +internals expect. + +* `_write(chunk, encoding, cb)` — tries `writer.writeSync(bytes)`, falls + back to `await writer.write(bytes)`. +* `_writev(entries, cb)` — tries `writer.writevSync(chunks)`, falls + back to `await writer.writev(chunks)`. Only defined if `writer.writev` + exists. +* `_final(cb)` — tries `writer.endSync()`, falls back to + `await writer.end()`. +* `_destroy(err, cb)` — calls `writer.fail(err)`. + +```mjs +import { Writable } from 'node:stream'; +import { push, from, pipeTo } from 'node:stream/iter'; + +const { writer, readable } = push(); +const writable = Writable.fromStreamIter(writer); + +writable.write('hello'); +writable.end(); +``` + +```cjs +const { Writable } = require('node:stream'); +const { push, from, pipeTo } = require('node:stream/iter'); + +const { writer, readable } = push(); +const writable = Writable.fromStreamIter(writer); + +writable.write('hello'); +writable.end(); +``` + +This method requires the `--experimental-stream-iter` CLI flag. + ### `stream.Writable.fromWeb(writableStream[, options])` - -> Stability: 1 - Experimental - -* `options` {Object} - * `backpressure` {string} Backpressure policy. One of `'strict'` (default), - `'block'`, or `'drop-newest'`. See below for details. -* Returns: {Object} A [`stream/iter` Writer][stream-iter-writer] adapter. - -When the `--experimental-stream-iter` flag is enabled, returns an adapter -object that conforms to the [`stream/iter`][] Writer interface, allowing the -`Writable` to be used as a destination in the iterable streams API. - -Since all writes on a classic `stream.Writable` are fundamentally -asynchronous, the synchronous methods (`writeSync`, `writevSync`, `endSync`) -always return `false` or `-1`, deferring to the async path. The per-write -`options.signal` parameter from the Writer interface is also ignored; classic -`stream.Writable` has no per-write abort signal support, so cancellation -should be handled at the pipeline level. - -**Backpressure policies:** - -* `'strict'` (default) — writes are rejected with `ERR_INVALID_STATE` when - the buffer is full (`writableLength >= writableHighWaterMark`). This catches - callers that ignore backpressure. -* `'block'` — writes wait for the `'drain'` event when the buffer is full. - This matches classic `stream.Writable` behavior and is the recommended - policy when using [`pipeTo()`][stream-iter-pipeto]. -* `'drop-newest'` — writes are silently discarded when the buffer is full. - The data is not written to the underlying resource, but `writer.end()` - still reports the total bytes (including dropped bytes) for consistency - with the Writer spec. -* `'drop-oldest'` — **not supported**. Classic `stream.Writable` does not - provide an API to evict already-buffered data without risking partial - eviction of atomic `writev()` batches. Passing this value throws - `ERR_INVALID_ARG_VALUE`. - -The adapter maps: - -* `writer.write(chunk)` — calls `writable.write(chunk)`, subject to the - backpressure policy. -* `writer.writev(chunks)` — corks the writable, writes all chunks, then - uncorks. Subject to the backpressure policy. -* `writer.end()` — calls `writable.end()` and resolves with total bytes - written when the `'finish'` event fires. -* `writer.fail(reason)` — calls `writable.destroy(reason)`. -* `writer.desiredSize` — returns the available buffer space - (`writableHighWaterMark - writableLength`), or `null` if the stream - is destroyed or finished. - -```mjs -import { Writable } from 'node:stream'; -import { from, pipeTo } from 'node:stream/iter'; - -const chunks = []; -const writable = new Writable({ - write(chunk, encoding, cb) { chunks.push(chunk); cb(); }, -}); - -// Use 'block' policy with pipeTo for classic backpressure behavior -await pipeTo(from('hello world'), - writable.toStreamIterWriter({ backpressure: 'block' })); -``` - -```cjs -const { Writable } = require('node:stream'); -const { from, pipeTo } = require('node:stream/iter'); - -async function run() { - const chunks = []; - const writable = new Writable({ - write(chunk, encoding, cb) { chunks.push(chunk); cb(); }, - }); - - await pipeTo(from('hello world'), - writable.toStreamIterWriter({ backpressure: 'block' })); -} - -run().catch(console.error); -``` - -Without the `--experimental-stream-iter` flag, calling this method throws -[`ERR_STREAM_ITER_MISSING_FLAG`][]. - ##### `writable[Symbol.asyncDispose]()` - -> Stability: 1 - Experimental - -* `source` {AsyncIterable} An `AsyncIterable` source, such as - the return value of [`pull()`][] or [`from()`][stream-iter-from]. -* `options` {Object} - * `highWaterMark` {number} The internal buffer size in bytes before - backpressure is applied. **Default:** `65536` (64 KB). - * `signal` {AbortSignal} An optional signal that can be used to abort - the readable, destroying the stream and cleaning up the source iterator. -* Returns: {stream.Readable} - -Creates a byte-mode {stream.Readable} from an `AsyncIterable` -(the native batch format used by the [`stream/iter`][] API). Each -`Uint8Array` in a yielded batch is pushed as a separate chunk into the -Readable. - -This method requires the `--experimental-stream-iter` CLI flag. - -```mjs -import { Readable } from 'node:stream'; -import { createWriteStream } from 'node:fs'; -import { from, pull } from 'node:stream/iter'; -import { compressGzip } from 'node:zlib/iter'; - -// Bridge a stream/iter pipeline to a classic Readable -const source = pull(from('hello world'), compressGzip()); -const readable = Readable.fromStreamIter(source); - -readable.pipe(createWriteStream('output.gz')); -``` - -```cjs -const { Readable } = require('node:stream'); -const { createWriteStream } = require('node:fs'); -const { from, pull } = require('node:stream/iter'); -const { compressGzip } = require('node:zlib/iter'); - -const source = pull(from('hello world'), compressGzip()); -const readable = Readable.fromStreamIter(source); - -readable.pipe(createWriteStream('output.gz')); -``` - -### `stream.Readable.fromStreamIterSync(source[, options])` - - - -> Stability: 1 - Experimental - -* `source` {Iterable} An `Iterable` source, such as the - return value of [`pullSync()`][] or [`fromSync()`][]. -* `options` {Object} - * `highWaterMark` {number} The internal buffer size in bytes before - backpressure is applied. **Default:** `65536` (64 KB). -* Returns: {stream.Readable} - -Creates a byte-mode {stream.Readable} from a synchronous -`Iterable` (the native batch format used by the -[`stream/iter`][] sync API). Each `Uint8Array` in a yielded batch is -pushed as a separate chunk into the Readable. - -The `_read()` method pulls from the iterator synchronously, so data is -available immediately via `readable.read()` without waiting for async -callbacks. - -This method requires the `--experimental-stream-iter` CLI flag. - -```mjs -import { Readable } from 'node:stream'; -import { fromSync } from 'node:stream/iter'; - -const source = fromSync('hello world'); -const readable = Readable.fromStreamIterSync(source); - -console.log(readable.read().toString()); // 'hello world' -``` - -```cjs -const { Readable } = require('node:stream'); -const { fromSync } = require('node:stream/iter'); - -const source = fromSync('hello world'); -const readable = Readable.fromStreamIterSync(source); - -console.log(readable.read().toString()); // 'hello world' -``` - ### `stream.Readable.fromWeb(readableStream[, options])` - -> Stability: 1 - Experimental - -* `writer` {Object} A [`stream/iter`][] Writer. Only the `write()` method is - required; `end()`, `fail()`, `writeSync()`, `writevSync()`, `endSync()`, - and `writev()` are optional. -* Returns: {stream.Writable} - -When the `--experimental-stream-iter` flag is enabled, creates a classic -`stream.Writable` backed by a [`stream/iter` Writer][stream-iter-writer]. - -Each `_write()` / `_writev()` call attempts the Writer's synchronous method -first (`writeSync` / `writevSync`), falling back to the async method if the -sync path returns `false`. Similarly, `_final()` tries `endSync()` before -`end()`. When the sync path succeeds, the callback is deferred via -`queueMicrotask` to preserve the async resolution contract that Writable -internals expect. - -* `_write(chunk, encoding, cb)` — tries `writer.writeSync(bytes)`, falls - back to `await writer.write(bytes)`. -* `_writev(entries, cb)` — tries `writer.writevSync(chunks)`, falls - back to `await writer.writev(chunks)`. Only defined if `writer.writev` - exists. -* `_final(cb)` — tries `writer.endSync()`, falls back to - `await writer.end()`. -* `_destroy(err, cb)` — calls `writer.fail(err)`. - -```mjs -import { Writable } from 'node:stream'; -import { push, from, pipeTo } from 'node:stream/iter'; - -const { writer, readable } = push(); -const writable = Writable.fromStreamIter(writer); - -writable.write('hello'); -writable.end(); -``` - -```cjs -const { Writable } = require('node:stream'); -const { push, from, pipeTo } = require('node:stream/iter'); - -const { writer, readable } = push(); -const writable = Writable.fromStreamIter(writer); - -writable.write('hello'); -writable.end(); -``` - -This method requires the `--experimental-stream-iter` CLI flag. - ### `stream.Writable.fromWeb(writableStream[, options])` + +> Stability: 1 - Experimental + +* `readable` {stream.Readable|Object} A classic Readable stream or any object + with `read()` and `on()` methods. +* Returns: {AsyncIterable\} A stream/iter async iterable source. + +Converts a classic Readable stream (or duck-typed equivalent) into a +stream/iter async iterable source that can be passed to [`from()`][], +[`pull()`][], [`text()`][], etc. + +If the object implements the [`toAsyncStreamable`][] protocol (as +`stream.Readable` does), that protocol is used. Otherwise, the function +duck-types on `read()` and `on()` (EventEmitter) and wraps the stream with +a batched async iterator. + +The result is cached per instance -- calling `fromReadable()` twice with the +same stream returns the same iterable. + +For object-mode or encoded Readable streams, chunks are automatically +normalized to `Uint8Array`. + +```mjs +import { Readable } from 'node:stream'; +import { fromReadable, text } from 'node:stream/iter'; + +const readable = new Readable({ + read() { this.push('hello world'); this.push(null); }, +}); + +const result = await text(fromReadable(readable)); +console.log(result); // 'hello world' +``` + +```cjs +const { Readable } = require('node:stream'); +const { fromReadable, text } = require('node:stream/iter'); + +const readable = new Readable({ + read() { this.push('hello world'); this.push(null); }, +}); + +async function run() { + const result = await text(fromReadable(readable)); + console.log(result); // 'hello world' +} +run(); +``` + +### `fromWritable(writable[, options])` + + + +> Stability: 1 - Experimental + +* `writable` {stream.Writable|Object} A classic Writable stream or any object + with `write()` and `on()` methods. +* `options` {Object} + * `backpressure` {string} Backpressure policy. **Default:** `'strict'`. + * `'strict'` -- writes are rejected when the buffer is full. Catches + callers that ignore backpressure. + * `'block'` -- writes wait for drain when the buffer is full. Recommended + for use with [`pipeTo()`][]. + * `'drop-newest'` -- writes are silently discarded when the buffer is full. + * `'drop-oldest'` -- **not supported**. Throws `ERR_INVALID_ARG_VALUE`. +* Returns: {Object} A stream/iter Writer adapter. + +Creates a stream/iter Writer adapter from a classic Writable stream (or +duck-typed equivalent). The adapter can be passed to [`pipeTo()`][] as a +destination. + +Since all writes on a classic Writable are fundamentally asynchronous, +the synchronous Writer methods (`writeSync`, `writevSync`, `endSync`) always +return `false` or `-1`, deferring to the async path. The per-write +`options.signal` parameter from the Writer interface is also ignored. + +The result is cached per instance -- calling `fromWritable()` twice with the +same stream returns the same Writer. + +For duck-typed streams that do not expose `writableHighWaterMark`, +`writableLength`, or similar properties, sensible defaults are used. +Object-mode writables (if detectable) are rejected since the Writer +interface is bytes-only. + +```mjs +import { Writable } from 'node:stream'; +import { from, fromWritable, pipeTo } from 'node:stream/iter'; + +const writable = new Writable({ + write(chunk, encoding, cb) { console.log(chunk.toString()); cb(); }, +}); + +await pipeTo(from('hello world'), + fromWritable(writable, { backpressure: 'block' })); +``` + +```cjs +const { Writable } = require('node:stream'); +const { from, fromWritable, pipeTo } = require('node:stream/iter'); + +async function run() { + const writable = new Writable({ + write(chunk, encoding, cb) { console.log(chunk.toString()); cb(); }, + }); + + await pipeTo(from('hello world'), + fromWritable(writable, { backpressure: 'block' })); +} +run(); +``` + +### `toReadable(source[, options])` + + + +> Stability: 1 - Experimental + +* `source` {AsyncIterable} An `AsyncIterable` source, such as + the return value of [`pull()`][] or [`from()`][]. +* `options` {Object} + * `highWaterMark` {number} The internal buffer size in bytes before + backpressure is applied. **Default:** `65536` (64 KB). + * `signal` {AbortSignal} An optional signal to abort the readable. +* Returns: {stream.Readable} + +Creates a byte-mode [`stream.Readable`][] from an `AsyncIterable` +(the native batch format used by the stream/iter API). Each `Uint8Array` in a +yielded batch is pushed as a separate chunk into the Readable. + +```mjs +import { createWriteStream } from 'node:fs'; +import { from, pull, toReadable } from 'node:stream/iter'; +import { compressGzip } from 'node:zlib/iter'; + +const source = pull(from('hello world'), compressGzip()); +const readable = toReadable(source); + +readable.pipe(createWriteStream('output.gz')); +``` + +```cjs +const { createWriteStream } = require('node:fs'); +const { from, pull, toReadable } = require('node:stream/iter'); +const { compressGzip } = require('node:zlib/iter'); + +const source = pull(from('hello world'), compressGzip()); +const readable = toReadable(source); + +readable.pipe(createWriteStream('output.gz')); +``` + +### `toReadableSync(source[, options])` + + + +> Stability: 1 - Experimental + +* `source` {Iterable} An `Iterable` source, such as the + return value of [`pullSync()`][] or [`fromSync()`][]. +* `options` {Object} + * `highWaterMark` {number} The internal buffer size in bytes before + backpressure is applied. **Default:** `65536` (64 KB). +* Returns: {stream.Readable} + +Creates a byte-mode [`stream.Readable`][] from a synchronous +`Iterable`. The `_read()` method pulls from the iterator +synchronously, so data is available immediately via `readable.read()`. + +```mjs +import { fromSync, toReadableSync } from 'node:stream/iter'; + +const source = fromSync('hello world'); +const readable = toReadableSync(source); + +console.log(readable.read().toString()); // 'hello world' +``` + +```cjs +const { fromSync, toReadableSync } = require('node:stream/iter'); + +const source = fromSync('hello world'); +const readable = toReadableSync(source); + +console.log(readable.read().toString()); // 'hello world' +``` + +### `toWritable(writer)` + + + +> Stability: 1 - Experimental + +* `writer` {Object} A stream/iter Writer. Only the `write()` method is + required; `end()`, `fail()`, `writeSync()`, `writevSync()`, `endSync()`, + and `writev()` are optional. +* Returns: {stream.Writable} + +Creates a classic [`stream.Writable`][] backed by a stream/iter Writer. + +Each `_write()` / `_writev()` call attempts the Writer's synchronous method +first (`writeSync` / `writevSync`), falling back to the async method if the +sync path returns `false` or throws. Similarly, `_final()` tries `endSync()` +before `end()`. When the sync path succeeds, the callback is deferred via +`queueMicrotask` to preserve the async resolution contract. + +The Writable's `highWaterMark` is set to `Number.MAX_SAFE_INTEGER` to +effectively disable its internal buffering, allowing the underlying Writer +to manage backpressure directly. + +```mjs +import { push, toWritable } from 'node:stream/iter'; + +const { writer, readable } = push(); +const writable = toWritable(writer); + +writable.write('hello'); +writable.end(); +``` + +```cjs +const { push, toWritable } = require('node:stream/iter'); + +const { writer, readable } = push(); +const writable = toWritable(writer); + +writable.write('hello'); +writable.end(); +``` + ## Protocol symbols These well-known symbols allow third-party objects to participate in the @@ -1816,10 +2068,15 @@ console.log(textSync(stream)); // 'hello world' [`arrayBuffer()`]: #arraybuffersource-options [`bytes()`]: #bytessource-options [`from()`]: #frominput +[`fromSync()`]: #fromsyncinput [`node:zlib/iter`]: zlib_iter.md [`node:zlib/iter` documentation]: zlib_iter.md [`pipeTo()`]: #pipetosource-transforms-writer-options [`pull()`]: #pullsource-transforms-options +[`pullSync()`]: #pullsyncsource-transforms-options [`share()`]: #sharesource-options +[`stream.Readable`]: stream.md#class-streamreadable +[`stream.Writable`]: stream.md#class-streamwritable [`tap()`]: #tapcallback [`text()`]: #textsource-options +[`toAsyncStreamable`]: #streamtoasyncstreamable diff --git a/lib/internal/streams/iter/classic.js b/lib/internal/streams/iter/classic.js new file mode 100644 index 00000000000000..17aa43f0969202 --- /dev/null +++ b/lib/internal/streams/iter/classic.js @@ -0,0 +1,844 @@ +'use strict'; + +// Interop utilities between classic Node.js streams and the stream/iter API. +// +// These are Node.js-specific (not part of the stream/iter spec) and are +// exported from 'stream/iter' as top-level utility functions: +// +// fromReadable(readable) -- classic Readable (or duck-type) -> stream/iter source +// fromWritable(writable, opts) -- classic Writable (or duck-type) -> stream/iter Writer +// toReadable(source, opts) -- stream/iter source -> classic Readable +// toReadableSync(source, opts) -- stream/iter source (sync) -> classic Readable +// toWritable(writer) -- stream/iter Writer -> classic Writable + +const { + ArrayIsArray, + MathMax, + NumberMAX_SAFE_INTEGER, + Promise, + PromisePrototypeThen, + PromiseReject, + PromiseResolve, + PromiseWithResolvers, + SafeWeakMap, + SymbolAsyncDispose, + SymbolAsyncIterator, + SymbolDispose, + SymbolIterator, + TypedArrayPrototypeGetByteLength, +} = primordials; + +const { + AbortError, + aggregateTwoErrors, + codes: { + ERR_INVALID_ARG_TYPE, + ERR_INVALID_ARG_VALUE, + ERR_INVALID_STATE, + ERR_STREAM_WRITE_AFTER_END, + }, +} = require('internal/errors'); + +const { + validateInteger, + validateObject, +} = require('internal/validators'); + +const { eos } = require('internal/streams/end-of-stream'); +const { + addAbortSignal: addAbortSignalNoValidate, +} = require('internal/streams/add-abort-signal'); +const { + queueMicrotask, +} = require('internal/process/task_queues'); + +const { + toAsyncStreamable: kToAsyncStreamable, + kValidatedSource, + drainableProtocol, +} = require('internal/streams/iter/types'); + +const { + validateBackpressure, + toUint8Array, +} = require('internal/streams/iter/utils'); + +const { Buffer } = require('buffer'); +const destroyImpl = require('internal/streams/destroy'); + +// Lazy-loaded to avoid circular dependencies. Readable and Writable +// both require this module's parent, so we defer the require. +let Readable; +let Writable; + +function lazyReadable() { + if (Readable === undefined) { + Readable = require('internal/streams/readable'); + } + return Readable; +} + +function lazyWritable() { + if (Writable === undefined) { + Writable = require('internal/streams/writable'); + } + return Writable; +} + +// ============================================================================ +// fromReadable(readable) -- classic Readable -> stream/iter async iterable +// ============================================================================ + +// Cache: one stream/iter source per Readable instance. +const fromReadableCache = new SafeWeakMap(); + +// Maximum chunks to drain into a single batch. Bounds peak memory when +// _read() synchronously pushes many chunks into the buffer. +const MAX_DRAIN_BATCH = 128; + +const { normalizeAsyncValue } = require('internal/streams/iter/from'); +const { isUint8Array } = require('internal/util/types'); + +// Normalize a batch of raw chunks from an object-mode or encoded +// Readable into Uint8Array values. Returns the normalized batch, +// or null if normalization produced no output. +async function normalizeBatch(raw) { + const batch = []; + for (let i = 0; i < raw.length; i++) { + const value = raw[i]; + if (isUint8Array(value)) { + batch.push(value); + } else { + // normalizeAsyncValue may await for async protocols (e.g. + // toAsyncStreamable on yielded objects). Stream events during + // the suspension are queued, not lost -- errors will surface + // on the next loop iteration after this yield completes. + for await (const normalized of normalizeAsyncValue(value)) { + batch.push(normalized); + } + } + } + return batch.length > 0 ? batch : null; +} + +// Batched async iterator for Readable streams. Same mechanism as +// createAsyncIterator (same event setup, same stream.read() to +// trigger _read(), same teardown) but drains all currently buffered +// chunks into a single Uint8Array[] batch per yield, amortizing the +// Promise/microtask cost across multiple chunks. +// +// When normalize is provided (object-mode / encoded streams), each +// drained batch is passed through it to convert chunks to Uint8Array. +// When normalize is null (byte-mode), chunks are already Buffers +// (Uint8Array subclass) and are yielded directly. +const nop = () => {}; + +async function* createBatchedAsyncIterator(stream, normalize) { + let callback = nop; + + function next(resolve) { + if (this === stream) { + callback(); + callback = nop; + } else { + callback = resolve; + } + } + + stream.on('readable', next); + + let error; + const cleanup = eos(stream, { writable: false }, (err) => { + error = err ? aggregateTwoErrors(error, err) : null; + callback(); + callback = nop; + }); + + try { + while (true) { + const chunk = stream.destroyed ? null : stream.read(); + if (chunk !== null) { + const batch = [chunk]; + while (batch.length < MAX_DRAIN_BATCH && + stream._readableState?.length > 0) { + const c = stream.read(); + if (c === null) break; + batch.push(c); + } + if (normalize !== null) { + const result = await normalize(batch); + if (result !== null) { + yield result; + } + } else { + yield batch; + } + } else if (error) { + throw error; + } else if (error === null) { + return; + } else { + await new Promise(next); + } + } + } catch (err) { + error = aggregateTwoErrors(error, err); + throw error; + } finally { + if (error === undefined || + (stream._readableState?.autoDestroy)) { + destroyImpl.destroyer(stream, null); + } else { + stream.off('readable', next); + cleanup(); + } + } +} + +/** + * Convert a classic Readable (or duck-type) to a stream/iter async iterable. + * + * If the object implements the toAsyncStreamable protocol, delegates to it. + * Otherwise, duck-type checks for read() + EventEmitter (on/off) and + * wraps with a batched async iterator. + * @param {object} readable - A classic Readable or duck-type with + * read() and on()/off() methods. + * @returns {AsyncIterable} A stream/iter async iterable source. + */ +function fromReadable(readable) { + if (readable == null || typeof readable !== 'object') { + throw new ERR_INVALID_ARG_TYPE('readable', 'Readable', readable); + } + + // Check cache first. + const cached = fromReadableCache.get(readable); + if (cached !== undefined) return cached; + + // Protocol path: object implements toAsyncStreamable. + if (typeof readable[kToAsyncStreamable] === 'function') { + const result = readable[kToAsyncStreamable](); + fromReadableCache.set(readable, result); + return result; + } + + // Duck-type path: object has read() and EventEmitter methods. + if (typeof readable.read !== 'function' || + typeof readable.on !== 'function') { + throw new ERR_INVALID_ARG_TYPE('readable', 'Readable', readable); + } + + // Determine normalization. If the stream has _readableState, use it + // to detect object-mode / encoding. Otherwise assume byte-mode. + const state = readable._readableState; + const normalize = (state && (state.objectMode || state.encoding)) ? + normalizeBatch : null; + + const iter = createBatchedAsyncIterator(readable, normalize); + iter[kValidatedSource] = true; + iter.stream = readable; + + fromReadableCache.set(readable, iter); + return iter; +} + + +// ============================================================================ +// toReadable(source, options) -- stream/iter source -> classic Readable +// ============================================================================ + +const kNullPrototype = { __proto__: null }; + +/** + * Create a byte-mode Readable from an AsyncIterable. + * The source must yield Uint8Array[] batches (the stream/iter native + * format). Each Uint8Array in a batch is pushed as a separate chunk. + * @param {AsyncIterable} source + * @param {object} [options] + * @param {number} [options.highWaterMark] + * @param {AbortSignal} [options.signal] + * @returns {stream.Readable} + */ +function toReadable(source, options = kNullPrototype) { + if (typeof source?.[SymbolAsyncIterator] !== 'function') { + throw new ERR_INVALID_ARG_TYPE('source', 'AsyncIterable', source); + } + + validateObject(options, 'options'); + const { + highWaterMark = 64 * 1024, + signal, + } = options; + validateInteger(highWaterMark, 'options.highWaterMark', 0); + + const ReadableCtor = lazyReadable(); + const iterator = source[SymbolAsyncIterator](); + let backpressure; + let pumping = false; + let done = false; + + const readable = new ReadableCtor({ + __proto__: null, + highWaterMark, + read() { + if (backpressure) { + const { resolve } = backpressure; + backpressure = null; + resolve(); + } else if (!pumping && !done) { + pumping = true; + pump(); + } + }, + destroy(err, cb) { + done = true; + // Wake up the pump if it's waiting on backpressure so it + // can see done === true and exit cleanly. + if (backpressure) { + backpressure.resolve(); + backpressure = null; + } + if (typeof iterator.return === 'function') { + PromisePrototypeThen(iterator.return(), + () => cb(err), (e) => cb(e || err)); + } else { + cb(err); + } + }, + }); + + if (signal) { + addAbortSignalNoValidate(signal, readable); + } + + async function pump() { + try { + while (!done) { + const { value: batch, done: iterDone } = await iterator.next(); + if (iterDone) { + done = true; + readable.push(null); + return; + } + for (let i = 0; i < batch.length; i++) { + if (!readable.push(batch[i])) { + backpressure = PromiseWithResolvers(); + await backpressure.promise; + if (done) return; + } + } + } + } catch (err) { + done = true; + readable.destroy(err); + } + } + + return readable; +} + + +// ============================================================================ +// toReadableSync(source, options) -- stream/iter source (sync) -> Readable +// ============================================================================ + +/** + * Create a byte-mode Readable from an Iterable. + * Fully synchronous -- _read() pulls from the iterator directly. + * @param {Iterable} source + * @param {object} [options] + * @param {number} [options.highWaterMark] + * @returns {stream.Readable} + */ +function toReadableSync(source, options = kNullPrototype) { + if (typeof source?.[SymbolIterator] !== 'function') { + throw new ERR_INVALID_ARG_TYPE('source', 'Iterable', source); + } + + validateObject(options, 'options'); + const { + highWaterMark = 64 * 1024, + } = options; + validateInteger(highWaterMark, 'options.highWaterMark', 0); + + const ReadableCtor = lazyReadable(); + const iterator = source[SymbolIterator](); + + return new ReadableCtor({ + __proto__: null, + highWaterMark, + read() { + for (;;) { + const { value: batch, done } = iterator.next(); + if (done) { + this.push(null); + return; + } + for (let i = 0; i < batch.length; i++) { + if (!this.push(batch[i])) return; + } + } + }, + destroy(err, cb) { + if (typeof iterator.return === 'function') iterator.return(); + cb(err); + }, + }); +} + + +// ============================================================================ +// fromWritable(writable, options) -- classic Writable -> stream/iter Writer +// ============================================================================ + +// Cache: one Writer adapter per Writable instance. +const fromWritableCache = new SafeWeakMap(); + +/** + * Create a stream/iter Writer adapter from a classic Writable (or duck-type). + * + * Duck-type requirements: write() and on()/off() methods. + * Falls back to sensible defaults for missing properties like + * writableHighWaterMark, writableLength, writableObjectMode. + * @param {object} writable - A classic Writable or duck-type. + * @param {object} [options] + * @param {string} [options.backpressure] - 'strict', 'block', + * 'drop-newest'. 'drop-oldest' is not supported. + * @returns {object} A stream/iter Writer adapter. + */ +function fromWritable(writable, options = kNullPrototype) { + if (writable == null || + typeof writable.write !== 'function' || + typeof writable.on !== 'function') { + throw new ERR_INVALID_ARG_TYPE('writable', 'Writable', writable); + } + + // Return cached adapter if available. + const cached = fromWritableCache.get(writable); + if (cached !== undefined) return cached; + + validateObject(options, 'options'); + const { + backpressure = 'strict', + } = options; + validateBackpressure(backpressure); + + // The Writer interface is bytes-only. Object-mode Writables expect + // arbitrary JS values, which is incompatible. + if (writable.writableObjectMode) { + throw new ERR_INVALID_STATE( + 'Cannot create a stream/iter Writer from an object-mode Writable'); + } + + // drop-oldest is not supported for classic stream.Writable. The + // Writable's internal buffer stores individual { chunk, encoding, + // callback } entries with no concept of batch boundaries. A writev() + // call fans out into N separate buffer entries, so a subsequent + // drop-oldest eviction could partially tear apart an earlier atomic + // writev batch. The PushWriter avoids this because writev occupies a + // single slot. Supporting drop-oldest here would require either + // accepting partial writev eviction or adding batch tracking to the + // buffer -- neither is acceptable without a deeper rework of Writable + // internals. + if (backpressure === 'drop-oldest') { + throw new ERR_INVALID_ARG_VALUE('options.backpressure', backpressure, + 'drop-oldest is not supported for classic stream.Writable'); + } + + // Fall back to sensible defaults for duck-typed streams that may not + // expose the full stream.Writable property set. + const hwm = writable.writableHighWaterMark ?? 16384; + let totalBytes = 0; + + // Waiters pending on backpressure resolution (block policy only). + // Multiple un-awaited writes can each add a waiter, so this must be + // a list. A single persistent 'drain' listener and 'error' listener + // (installed once lazily) resolve or reject all waiters to avoid + // accumulating per-write listeners on the stream. + let waiters = []; + let listenersInstalled = false; + let onDrain; + let onError; + + function installListeners() { + if (listenersInstalled) return; + listenersInstalled = true; + onDrain = () => { + const pending = waiters; + waiters = []; + for (let i = 0; i < pending.length; i++) { + pending[i].resolve(); + } + }; + onError = (err) => { + const pending = waiters; + waiters = []; + for (let i = 0; i < pending.length; i++) { + pending[i].reject(err); + } + }; + writable.on('drain', onDrain); + writable.on('error', onError); + } + + // Reject all pending waiters and remove the drain/error listeners. + function cleanup(err) { + const pending = waiters; + waiters = []; + for (let i = 0; i < pending.length; i++) { + pending[i].reject(err ?? new AbortError()); + } + if (!listenersInstalled) return; + listenersInstalled = false; + writable.removeListener('drain', onDrain); + writable.removeListener('error', onError); + } + + function waitForDrain() { + const { promise, resolve, reject } = PromiseWithResolvers(); + waiters.push({ __proto__: null, resolve, reject }); + installListeners(); + return promise; + } + + function isWritable() { + // Duck-typed streams may not have these properties -- treat missing + // as false (i.e., writable is still open). + return !(writable.destroyed ?? false) && + !(writable.writableFinished ?? false) && + !(writable.writableEnded ?? false); + } + + function isFull() { + return (writable.writableLength ?? 0) >= hwm; + } + + const writer = { + __proto__: null, + + get desiredSize() { + if (!isWritable()) return null; + return MathMax(0, hwm - (writable.writableLength ?? 0)); + }, + + writeSync(chunk) { + return false; + }, + + writevSync(chunks) { + return false; + }, + + // Backpressure semantics: write() resolves when the data is accepted + // into the Writable's internal buffer, NOT when _write() has flushed + // it to the underlying resource. This matches the Writer spec -- the + // PushWriter resolves on buffer acceptance too. Classic Writable flow + // control works the same way: write rapidly until write() returns + // false, then wait for 'drain'. The _write callback is involved in + // backpressure indirectly -- 'drain' fires after callbacks drain the + // buffer below highWaterMark. Per-write errors from _write surface + // as 'error' events caught by our generic error handler, rejecting + // the next pending operation rather than the already-resolved one. + // + // The options.signal parameter from the Writer interface is ignored. + // Classic stream.Writable has no per-write abort signal support; + // cancellation should be handled at the pipeline level instead. + write(chunk) { + if (!isWritable()) { + return PromiseReject(new ERR_STREAM_WRITE_AFTER_END()); + } + + let bytes; + try { + bytes = toUint8Array(chunk); + } catch (err) { + return PromiseReject(err); + } + + if (backpressure === 'strict' && isFull()) { + return PromiseReject(new ERR_INVALID_STATE.RangeError( + 'Backpressure violation: buffer is full. ' + + 'Await each write() call to respect backpressure.')); + } + + if (backpressure === 'drop-newest' && isFull()) { + // Silently discard. Still count bytes for consistency with + // PushWriter, which counts dropped bytes in totalBytes. + totalBytes += TypedArrayPrototypeGetByteLength(bytes); + return PromiseResolve(); + } + + totalBytes += TypedArrayPrototypeGetByteLength(bytes); + const ok = writable.write(bytes); + if (ok) return PromiseResolve(); + + // backpressure === 'block' (or strict with room that filled on + // this write -- writable.write() accepted the data but returned + // false indicating the buffer is now at/over hwm). + if (backpressure === 'block') { + return waitForDrain(); + } + + // strict: the write was accepted (there was room before writing) + // but the buffer is now full. Resolve -- the *next* write will + // be rejected if the caller ignores backpressure. + return PromiseResolve(); + }, + + writev(chunks) { + if (!ArrayIsArray(chunks)) { + throw new ERR_INVALID_ARG_TYPE('chunks', 'Array', chunks); + } + if (!isWritable()) { + return PromiseReject(new ERR_STREAM_WRITE_AFTER_END()); + } + + if (backpressure === 'strict' && isFull()) { + return PromiseReject(new ERR_INVALID_STATE.RangeError( + 'Backpressure violation: buffer is full. ' + + 'Await each write() call to respect backpressure.')); + } + + if (backpressure === 'drop-newest' && isFull()) { + // Discard entire batch. + for (let i = 0; i < chunks.length; i++) { + totalBytes += + TypedArrayPrototypeGetByteLength(toUint8Array(chunks[i])); + } + return PromiseResolve(); + } + + if (typeof writable.cork === 'function') writable.cork(); + let ok = true; + for (let i = 0; i < chunks.length; i++) { + const bytes = toUint8Array(chunks[i]); + totalBytes += TypedArrayPrototypeGetByteLength(bytes); + ok = writable.write(bytes); + } + if (typeof writable.uncork === 'function') writable.uncork(); + + if (ok) return PromiseResolve(); + + if (backpressure === 'block') { + return waitForDrain(); + } + + return PromiseResolve(); + }, + + endSync() { + return -1; + }, + + // options.signal is ignored for the same reason as write(). + end() { + if ((writable.writableFinished ?? false) || + (writable.destroyed ?? false)) { + cleanup(); + return PromiseResolve(totalBytes); + } + + const { promise, resolve, reject } = PromiseWithResolvers(); + + if (!(writable.writableEnded ?? false)) { + writable.end(); + } + + eos(writable, { writable: true, readable: false }, (err) => { + cleanup(err); + if (err) reject(err); + else resolve(totalBytes); + }); + + return promise; + }, + + fail(reason) { + cleanup(reason); + if (typeof writable.destroy === 'function') { + writable.destroy(reason); + } + }, + + [SymbolAsyncDispose]() { + if (isWritable()) { + cleanup(); + if (typeof writable.destroy === 'function') { + writable.destroy(); + } + } + return PromiseResolve(); + }, + + [SymbolDispose]() { + if (isWritable()) { + cleanup(); + if (typeof writable.destroy === 'function') { + writable.destroy(); + } + } + }, + }; + + // drainableProtocol + writer[drainableProtocol] = function() { + if (!isWritable()) return null; + if ((writable.writableLength ?? 0) < hwm) { + return PromiseResolve(true); + } + const { promise, resolve } = PromiseWithResolvers(); + waiters.push({ + __proto__: null, + resolve() { resolve(true); }, + reject() { resolve(false); }, + }); + installListeners(); + return promise; + }; + + fromWritableCache.set(writable, writer); + return writer; +} + + +// ============================================================================ +// toWritable(writer) -- stream/iter Writer -> classic Writable +// ============================================================================ + +/** + * Create a classic stream.Writable backed by a stream/iter Writer. + * Each _write/_writev call delegates to the Writer's methods, + * attempting the sync path first (writeSync/writevSync/endSync) and + * falling back to async if the sync path returns false or throws. + * @param {object} writer - A stream/iter Writer (only write() is required). + * @returns {stream.Writable} + */ +function toWritable(writer) { + if (typeof writer?.write !== 'function') { + throw new ERR_INVALID_ARG_TYPE('writer', 'Writer', writer); + } + + const WritableCtor = lazyWritable(); + + const hasWriteSync = typeof writer.writeSync === 'function'; + const hasWritev = typeof writer.writev === 'function'; + const hasWritevSync = hasWritev && + typeof writer.writevSync === 'function'; + const hasEnd = typeof writer.end === 'function'; + const hasEndSync = hasEnd && + typeof writer.endSync === 'function'; + const hasFail = typeof writer.fail === 'function'; + + // Try-sync-first pattern: attempt the synchronous method and + // fall back to the async method if it returns false (indicating + // the sync path was not accepted) or throws. When the sync path + // succeeds, the callback is deferred via queueMicrotask to + // preserve the async resolution contract that Writable internals + // expect from _write/_writev/_final callbacks. + + function _write(chunk, encoding, cb) { + const bytes = typeof chunk === 'string' ? + Buffer.from(chunk, encoding) : chunk; + if (hasWriteSync) { + try { + if (writer.writeSync(bytes)) { + queueMicrotask(cb); + return; + } + } catch { + // Sync path threw -- fall through to async. + } + } + try { + PromisePrototypeThen(writer.write(bytes), () => cb(), cb); + } catch (err) { + cb(err); + } + } + + function _writev(entries, cb) { + const chunks = []; + for (let i = 0; i < entries.length; i++) { + const { chunk, encoding } = entries[i]; + chunks[i] = typeof chunk === 'string' ? + Buffer.from(chunk, encoding) : chunk; + } + if (hasWritevSync) { + try { + if (writer.writevSync(chunks)) { + queueMicrotask(cb); + return; + } + } catch { + // Sync path threw -- fall through to async. + } + } + try { + PromisePrototypeThen(writer.writev(chunks), () => cb(), cb); + } catch (err) { + cb(err); + } + } + + function _final(cb) { + if (!hasEnd) { + queueMicrotask(cb); + return; + } + if (hasEndSync) { + try { + const result = writer.endSync(); + if (result >= 0) { + queueMicrotask(cb); + return; + } + } catch { + // Sync path threw -- fall through to async. + } + } + try { + PromisePrototypeThen(writer.end(), () => cb(), cb); + } catch (err) { + cb(err); + } + } + + function _destroy(err, cb) { + if (err && hasFail) { + writer.fail(err); + } + cb(); + } + + const writableOptions = { + __proto__: null, + // Use MAX_SAFE_INTEGER to effectively disable the Writable's + // internal buffering. The underlying stream/iter Writer has its + // own backpressure handling; we want _write to be called + // immediately so the Writer can manage flow control directly. + highWaterMark: NumberMAX_SAFE_INTEGER, + write: _write, + final: _final, + destroy: _destroy, + }; + + if (hasWritev) { + writableOptions.writev = _writev; + } + + return new WritableCtor(writableOptions); +} + + +module.exports = { + // Shared helpers used by Readable.prototype[toAsyncStreamable] in + // readable.js to avoid duplicating the batched iterator logic. + createBatchedAsyncIterator, + normalizeBatch, + + // Public utilities exported from 'stream/iter'. + fromReadable, + fromWritable, + toReadable, + toReadableSync, + toWritable, +}; diff --git a/lib/internal/streams/iter/from.js b/lib/internal/streams/iter/from.js index f117808fa7d3b7..0533f0e3810398 100644 --- a/lib/internal/streams/iter/from.js +++ b/lib/internal/streams/iter/from.js @@ -37,7 +37,7 @@ const { } = require('internal/util/types'); const { - kTrustedSource, + kValidatedSource, toStreamable, toAsyncStreamable, } = require('internal/streams/iter/types'); @@ -484,8 +484,8 @@ function from(input) { throw new ERR_INVALID_ARG_TYPE('input', 'a non-null value', input); } - // Fast path: trusted source already yields valid Uint8Array[] batches - if (input[kTrustedSource]) { + // Fast path: validated source already yields valid Uint8Array[] batches + if (input[kValidatedSource]) { return input; } @@ -538,17 +538,17 @@ function from(input) { // iteration protocols) if (typeof input[toAsyncStreamable] === 'function') { const result = input[toAsyncStreamable](); - // Synchronous trusted source (e.g. Readable batched iterator) - if (result?.[kTrustedSource]) { + // Synchronous validated source (e.g. Readable batched iterator) + if (result?.[kValidatedSource]) { return result; } return { __proto__: null, async *[SymbolAsyncIterator]() { - // The result may be a Promise. Check trusted on both the Promise + // The result may be a Promise. Check validated on both the Promise // itself (if tagged) and the resolved value. const resolved = await result; - if (resolved?.[kTrustedSource]) { + if (resolved?.[kValidatedSource]) { yield* resolved[SymbolAsyncIterator](); return; } diff --git a/lib/internal/streams/iter/pull.js b/lib/internal/streams/iter/pull.js index 8c49941ddb54ed..df5ca2826a9ac5 100644 --- a/lib/internal/streams/iter/pull.js +++ b/lib/internal/streams/iter/pull.js @@ -51,7 +51,7 @@ const { } = require('internal/streams/iter/utils'); const { - kTrustedTransform, + kValidatedTransform, } = require('internal/streams/iter/types'); // ============================================================================= @@ -564,12 +564,12 @@ async function* applyStatefulAsyncTransform(source, transform, options) { } /** - * Fast path for trusted stateful transforms (e.g. compression). + * Fast path for validated stateful transforms (e.g. compression). * Skips withFlushAsync (transform handles done internally) and * skips isUint8ArrayBatch validation (transform guarantees valid output). * @yields {Uint8Array[]} */ -async function* applyTrustedStatefulAsyncTransform(source, transform, options) { +async function* applyValidatedStatefulAsyncTransform(source, transform, options) { const output = transform(source, options); for await (const batch of output) { if (batch.length > 0) { @@ -639,8 +639,8 @@ async function* createAsyncPipeline(source, transforms, signal) { statelessRun = []; } const opts = { __proto__: null, signal: transformSignal }; - if (transform[kTrustedTransform]) { - current = applyTrustedStatefulAsyncTransform( + if (transform[kValidatedTransform]) { + current = applyValidatedStatefulAsyncTransform( current, transform.transform, opts); } else { current = applyStatefulAsyncTransform( diff --git a/lib/internal/streams/iter/transform.js b/lib/internal/streams/iter/transform.js index 4cb417ed98ce32..9782f5f50ebf2c 100644 --- a/lib/internal/streams/iter/transform.js +++ b/lib/internal/streams/iter/transform.js @@ -37,7 +37,7 @@ const { } = require('internal/errors'); const { lazyDOMException } = require('internal/util'); const { isArrayBufferView, isAnyArrayBuffer } = require('internal/util/types'); -const { kTrustedTransform } = require('internal/streams/iter/types'); +const { kValidatedTransform } = require('internal/streams/iter/types'); const { checkRangesOrGetDefault, validateFiniteNumber, @@ -306,7 +306,7 @@ function createZstdHandle(mode, options, processCallback, onError) { function makeZlibTransform(createHandleFn, processFlag, finishFlag) { return { __proto__: null, - [kTrustedTransform]: true, + [kValidatedTransform]: true, transform: async function*(source, options) { const { signal } = options; diff --git a/lib/internal/streams/iter/types.js b/lib/internal/streams/iter/types.js index 9e528647ca9110..99ddc8fd582770 100644 --- a/lib/internal/streams/iter/types.js +++ b/lib/internal/streams/iter/types.js @@ -45,30 +45,30 @@ const shareSyncProtocol = SymbolFor('Stream.shareSyncProtocol'); const drainableProtocol = SymbolFor('Stream.drainableProtocol'); /** - * Internal sentinel for trusted stateful transforms. A transform object - * with [kTrustedTransform] = true signals that: + * Internal sentinel for validated stateful transforms. A transform object + * with [kValidatedTransform] = true signals that: * 1. It handles source exhaustion (done) internally - no withFlushAsync * wrapper needed. * 2. It always yields valid Uint8Array[] batches - no isUint8ArrayBatch * validation needed on each yield. * This is NOT a public protocol symbol - it uses Symbol() not Symbol.for(). */ -const kTrustedTransform = Symbol('kTrustedTransform'); +const kValidatedTransform = Symbol('kValidatedTransform'); /** - * Internal sentinel for trusted sources. An async iterable with - * [kTrustedSource] = true signals that it already yields valid + * Internal sentinel for validated sources. An async iterable with + * [kValidatedSource] = true signals that it already yields valid * Uint8Array[] batches - no normalizeAsyncSource wrapper needed. * from() will return such sources directly, skipping all normalization. * This is NOT a public protocol symbol - it uses Symbol() not Symbol.for(). */ -const kTrustedSource = Symbol('kTrustedSource'); +const kValidatedSource = Symbol('kValidatedSource'); module.exports = { broadcastProtocol, drainableProtocol, - kTrustedSource, - kTrustedTransform, + kValidatedSource, + kValidatedTransform, shareProtocol, shareSyncProtocol, toAsyncStreamable, diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index d9708ccdf0b902..8ee754edcad14b 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -30,15 +30,12 @@ const { ObjectKeys, ObjectSetPrototypeOf, Promise, - PromisePrototypeThen, - PromiseWithResolvers, ReflectApply, SafeSet, Symbol, SymbolAsyncDispose, SymbolAsyncIterator, SymbolFor, - SymbolIterator, SymbolSpecies, TypedArrayPrototypeSet, } = primordials; @@ -96,7 +93,6 @@ const { } = require('internal/errors'); const { validateAbortSignal, - validateInteger, validateObject, } = require('internal/validators'); @@ -1805,276 +1801,41 @@ Readable.wrap = function(src, options) { }).wrap(src); }; -// Efficient interop with the stream/iter API via toAsyncStreamable protocol. -// Provides a batched async iterator that drains the internal buffer into -// Uint8Array[] batches, avoiding the per-chunk Promise overhead of the -// standard Symbol.asyncIterator path. +// Interop with the stream/iter API via the toAsyncStreamable protocol. +// +// The batched iterator logic lives in classic.js (shared with the +// fromReadable() utility for duck-typed streams). This prototype method +// calls createBatchedAsyncIterator directly -- it must NOT call +// fromReadable() since fromReadable() checks for toAsyncStreamable, +// which would create infinite recursion. // // The flag cannot be checked at module load time (readable.js loads during // bootstrap before options are available). Instead, toAsyncStreamable is -// always defined but lazily initializes on first call - throwing if the -// flag is not set, or installing the real implementation if it is. +// always defined but lazily initializes on first call -- throwing if the +// flag is not set. { - const kNullPrototype = { __proto__: null }; const toAsyncStreamable = SymbolFor('Stream.toAsyncStreamable'); - let kTrustedSource; - let normalizeAsyncValue; - let isU8; - - // Maximum chunks to drain into a single batch. Bounds peak memory when - // _read() synchronously pushes many chunks into the buffer. - const MAX_DRAIN_BATCH = 128; - - function lazyInit() { - if (kTrustedSource !== undefined) return; - if (!getOptionValue('--experimental-stream-iter')) { - throw new ERR_STREAM_ITER_MISSING_FLAG(); - } - ({ kTrustedSource } = require('internal/streams/iter/types')); - ({ normalizeAsyncValue } = require('internal/streams/iter/from')); - ({ isUint8Array: isU8 } = require('internal/util/types')); - } - - // Normalize a batch of raw chunks from an object-mode or encoded - // Readable into Uint8Array values. Returns the normalized batch, - // or null if normalization produced no output. - async function normalizeBatch(raw) { - const batch = []; - for (let i = 0; i < raw.length; i++) { - const value = raw[i]; - if (isU8(value)) { - batch.push(value); - } else { - // normalizeAsyncValue may await for async protocols (e.g. - // toAsyncStreamable on yielded objects). Stream events during - // the suspension are queued, not lost - errors will surface - // on the next loop iteration after this yield completes. - for await (const normalized of normalizeAsyncValue(value)) { - batch.push(normalized); - } - } - } - return batch.length > 0 ? batch : null; - } + let createBatchedAsyncIterator; + let normalizeBatch; + let kValidatedSource; - // Batched async iterator for Readable streams. Same mechanism as - // createAsyncIterator (same event setup, same stream.read() to - // trigger _read(), same teardown) but drains all currently buffered - // chunks into a single Uint8Array[] batch per yield, amortizing the - // Promise/microtask cost across multiple chunks. - // - // When normalize is provided (object-mode / encoded streams), each - // drained batch is passed through it to convert chunks to Uint8Array. - // When normalize is null (byte-mode), chunks are already Buffers - // (Uint8Array subclass) and are yielded directly. - async function* createBatchedAsyncIterator(stream, normalize) { - let callback = nop; - - function next(resolve) { - if (this === stream) { - callback(); - callback = nop; - } else { - callback = resolve; - } - } - - stream.on('readable', next); - - let error; - const cleanup = eos(stream, { writable: false }, (err) => { - error = err ? aggregateTwoErrors(error, err) : null; - callback(); - callback = nop; - }); - - try { - while (true) { - const chunk = stream.destroyed ? null : stream.read(); - if (chunk !== null) { - // Drain any additional already-buffered chunks into the same - // batch. The first read() may trigger _read() which - // synchronously pushes more data into the buffer. We drain - // that buffered data without issuing unbounded _read() calls - - // once state.length hits 0 or MAX_DRAIN_BATCH is reached, we - // stop and yield what we have. - const batch = [chunk]; - while (batch.length < MAX_DRAIN_BATCH && - stream._readableState.length > 0) { - const c = stream.read(); - if (c === null) break; - batch.push(c); - } - if (normalize !== null) { - const result = await normalize(batch); - if (result !== null) { - yield result; - } - } else { - yield batch; - } - } else if (error) { - throw error; - } else if (error === null) { - return; - } else { - await new Promise(next); - } - } - } catch (err) { - error = aggregateTwoErrors(error, err); - throw error; - } finally { - if (error === undefined || stream._readableState.autoDestroy) { - destroyImpl.destroyer(stream, null); - } else { - stream.off('readable', next); - cleanup(); + Readable.prototype[toAsyncStreamable] = function() { + if (createBatchedAsyncIterator === undefined) { + if (!getOptionValue('--experimental-stream-iter')) { + throw new ERR_STREAM_ITER_MISSING_FLAG(); } + ({ + createBatchedAsyncIterator, + normalizeBatch, + } = require('internal/streams/iter/classic')); + ({ kValidatedSource } = require('internal/streams/iter/types')); } - } - - Readable.prototype[toAsyncStreamable] = function() { - lazyInit(); const state = this._readableState; const normalize = (state.objectMode || state.encoding) ? - normalizeBatch : - null; + normalizeBatch : null; const iter = createBatchedAsyncIterator(this, normalize); - iter[kTrustedSource] = true; + iter[kValidatedSource] = true; iter.stream = this; return iter; }; - - // Create a byte-mode Readable from an AsyncIterable. - // The source must yield Uint8Array[] batches (the stream/iter native - // format). Each Uint8Array in a batch is pushed as a separate chunk. - Readable.fromStreamIter = function fromStreamIter(source, options = kNullPrototype) { - lazyInit(); - if (typeof source?.[SymbolAsyncIterator] !== 'function') { - throw new ERR_INVALID_ARG_TYPE('source', 'AsyncIterable', source); - } - - validateObject(options, 'options'); - const { - highWaterMark = 64 * 1024, - signal, - } = options; - validateInteger(highWaterMark, 'options.highWaterMark', 0); - if (signal !== undefined) { - validateAbortSignal(signal, 'options.signal'); - } - - const iterator = source[SymbolAsyncIterator](); - let backpressure; - let pumping = false; - let done = false; - - const readable = new Readable({ - __proto__: null, - highWaterMark, - read() { - if (backpressure) { - const { resolve } = backpressure; - backpressure = null; - resolve(); - } else if (!pumping && !done) { - pumping = true; - pump(); - } - }, - destroy(err, cb) { - done = true; - // Wake up the pump if it's waiting on backpressure so it - // can see done === true and exit cleanly. - if (backpressure) { - backpressure.resolve(); - backpressure = null; - } - if (typeof iterator.return === 'function') { - PromisePrototypeThen(iterator.return(), - () => cb(err), (e) => cb(e || err)); - } else { - cb(err); - } - }, - }); - - if (signal) { - addAbortSignalNoValidate(signal, readable); - } - - async function pump() { - try { - while (!done) { - const { value: batch, done: iterDone } = await iterator.next(); - if (iterDone) { - done = true; - readable.push(null); - return; - } - // Individual chunks are not validated as Uint8Array here. - // The caller is responsible for providing a well-formed - // AsyncIterable source. If a non-Buffer chunk - // is pushed, the byte-mode Readable will throw internally. - for (let i = 0; i < batch.length; i++) { - if (!readable.push(batch[i])) { - // Backpressure: wait for next _read() call - backpressure = PromiseWithResolvers(); - await backpressure.promise; - if (done) return; // Destroyed while waiting - } - } - } - } catch (err) { - done = true; - readable.destroy(err); - } - } - - return readable; - }; - - // Create a byte-mode Readable from an Iterable. - // Fully synchronous - _read() pulls from the iterator directly. - // The source must yield Uint8Array[] batches. - Readable.fromStreamIterSync = function fromStreamIterSync(source, options = kNullPrototype) { - lazyInit(); - if (typeof source?.[SymbolIterator] !== 'function') { - throw new ERR_INVALID_ARG_TYPE('source', 'Iterable', source); - } - - validateObject(options, 'options'); - const { - highWaterMark = 64 * 1024, - } = options; - validateInteger(highWaterMark, 'options.highWaterMark', 0); - - const iterator = source[SymbolIterator](); - - return new Readable({ - __proto__: null, - highWaterMark, - read() { - for (;;) { - const { value: batch, done } = iterator.next(); - if (done) { - this.push(null); - return; - } - // Individual chunks are not validated as Uint8Array here. - // The caller is responsible for providing a well-formed - // Iterable source. If a non-Buffer chunk - // is pushed, the byte-mode Readable will throw internally. - for (let i = 0; i < batch.length; i++) { - if (!this.push(batch[i])) return; - } - } - }, - destroy(err, cb) { - if (typeof iterator.return === 'function') iterator.return(); - cb(err); - }, - }); - }; } diff --git a/lib/internal/streams/writable.js b/lib/internal/streams/writable.js index 5850a256fc48c1..89d384e9f2fb8f 100644 --- a/lib/internal/streams/writable.js +++ b/lib/internal/streams/writable.js @@ -26,26 +26,17 @@ 'use strict'; const { - ArrayIsArray, ArrayPrototypeSlice, Error, FunctionPrototypeSymbolHasInstance, - MathMax, - NumberMAX_SAFE_INTEGER, ObjectDefineProperties, ObjectDefineProperty, ObjectSetPrototypeOf, Promise, - PromisePrototypeThen, - PromiseReject, - PromiseResolve, - PromiseWithResolvers, StringPrototypeToLowerCase, Symbol, SymbolAsyncDispose, - SymbolDispose, SymbolHasInstance, - TypedArrayPrototypeGetByteLength, } = primordials; module.exports = Writable; @@ -56,18 +47,10 @@ const Stream = require('internal/streams/legacy').Stream; const { Buffer } = require('buffer'); const destroyImpl = require('internal/streams/destroy'); const { eos } = require('internal/streams/end-of-stream'); -const { - validateObject, -} = require('internal/validators'); - const { addAbortSignal, } = require('internal/streams/add-abort-signal'); -const { - queueMicrotask, -} = require('internal/process/task_queues'); - const { getHighWaterMark, getDefaultHighWaterMark, @@ -76,20 +59,16 @@ const { AbortError, codes: { ERR_INVALID_ARG_TYPE, - ERR_INVALID_ARG_VALUE, - ERR_INVALID_STATE, ERR_METHOD_NOT_IMPLEMENTED, ERR_MULTIPLE_CALLBACK, ERR_STREAM_ALREADY_FINISHED, ERR_STREAM_CANNOT_PIPE, ERR_STREAM_DESTROYED, - ERR_STREAM_ITER_MISSING_FLAG, ERR_STREAM_NULL_VALUES, ERR_STREAM_WRITE_AFTER_END, ERR_UNKNOWN_ENCODING, }, } = require('internal/errors'); -const { getOptionValue } = require('internal/options'); const { kState, // bitfields @@ -1179,438 +1158,3 @@ Writable.prototype[SymbolAsyncDispose] = async function() { eos(this, (err) => (err && err.name !== 'AbortError' ? reject(err) : resolve(null))), ); }; - -// Interop with the stream/iter API. -// -// toStreamIterWriter() returns a Writer-shaped adapter that bridges a classic -// stream.Writable to the stream/iter Writer interface. Since all writes on -// stream.Writable are fundamentally async, the sync methods (writeSync, -// writevSync, endSync) return false / -1 to indicate they are not supported, -// deferring to the async path. -// -// Like the Readable interop, the flag cannot be checked at module load time. -// toStreamIterWriter is always defined but lazily initializes on first call. -{ - let drainableProtocol; - let validateBackpressure; - let toUint8Array; - - // Only one Writer adapter per Writable instance. The adapter holds - // mutable state (totalBytes, waiters, listeners) that would conflict - // if two adapters existed for the same underlying stream. - const kStreamIterWriter = Symbol('kStreamIterWriter'); - - function lazyInit() { - if (drainableProtocol !== undefined) return; - if (!getOptionValue('--experimental-stream-iter')) { - throw new ERR_STREAM_ITER_MISSING_FLAG(); - } - ({ drainableProtocol } = require('internal/streams/iter/types')); - ({ - validateBackpressure, - toUint8Array, - } = require('internal/streams/iter/utils')); - } - - Writable.prototype.toStreamIterWriter = - function toStreamIterWriter(options = { __proto__: null }) { - lazyInit(); - - if (this[kStreamIterWriter]) { - return this[kStreamIterWriter]; - } - - // The Writer interface is bytes-only. Object-mode Writables expect - // arbitrary JS values, which is incompatible. - if (this.writableObjectMode) { - throw new ERR_INVALID_STATE( - 'Cannot create a stream/iter Writer from an object-mode Writable'); - } - - validateObject(options, 'options'); - const { - backpressure = 'strict', - } = options; - validateBackpressure(backpressure); - - // drop-oldest is not supported for classic stream.Writable. The - // Writable's internal buffer stores individual { chunk, encoding, - // callback } entries with no concept of batch boundaries. A writev() - // call fans out into N separate buffer entries, so a subsequent - // drop-oldest eviction could partially tear apart an earlier atomic - // writev batch. The PushWriter avoids this because writev occupies a - // single slot. Supporting drop-oldest here would require either - // accepting partial writev eviction or adding batch tracking to the - // buffer -- neither is acceptable without a deeper rework of Writable - // internals. - if (backpressure === 'drop-oldest') { - throw new ERR_INVALID_ARG_VALUE('options.backpressure', backpressure, - 'drop-oldest is not supported for classic stream.Writable'); - } - - const writable = this; - const hwm = writable.writableHighWaterMark; - let totalBytes = 0; - - // Waiters pending on backpressure resolution (block policy only). - // Multiple un-awaited writes can each add a waiter, so this must be - // a list. A single persistent 'drain' listener and 'error' listener - // (installed once lazily) resolve or reject all waiters to avoid - // accumulating per-write listeners on the stream. - let waiters = []; - let listenersInstalled = false; - let onDrain; - let onError; - - function installListeners() { - if (listenersInstalled) return; - listenersInstalled = true; - onDrain = () => { - const pending = waiters; - waiters = []; - for (let i = 0; i < pending.length; i++) { - pending[i].resolve(); - } - }; - onError = (err) => { - const pending = waiters; - waiters = []; - for (let i = 0; i < pending.length; i++) { - pending[i].reject(err); - } - }; - writable.on('drain', onDrain); - writable.on('error', onError); - } - - // Reject all pending waiters and remove the drain/error listeners. - function cleanup(err) { - const pending = waiters; - waiters = []; - for (let i = 0; i < pending.length; i++) { - pending[i].reject(err ?? new AbortError()); - } - if (!listenersInstalled) return; - listenersInstalled = false; - writable.removeListener('drain', onDrain); - writable.removeListener('error', onError); - } - - function waitForDrain() { - const { promise, resolve, reject } = PromiseWithResolvers(); - waiters.push({ __proto__: null, resolve, reject }); - installListeners(); - return promise; - } - - function isWritable() { - return !writable.destroyed && - !writable.writableFinished && - !writable.writableEnded; - } - - function isFull() { - return writable.writableLength >= hwm; - } - - const writer = { - __proto__: null, - - get desiredSize() { - if (!isWritable()) return null; - return MathMax(0, hwm - writable.writableLength); - }, - - writeSync(chunk) { - return false; - }, - - writevSync(chunks) { - return false; - }, - - // Backpressure semantics: write() resolves when the data is accepted - // into the Writable's internal buffer, NOT when _write() has flushed - // it to the underlying resource. This matches the Writer spec -- the - // PushWriter resolves on buffer acceptance too. Classic Writable flow - // control works the same way: write rapidly until write() returns - // false, then wait for 'drain'. The _write callback is involved in - // backpressure indirectly -- 'drain' fires after callbacks drain the - // buffer below highWaterMark. Per-write errors from _write surface - // as 'error' events caught by our generic error handler, rejecting - // the next pending operation rather than the already-resolved one. - // - // The options.signal parameter from the Writer interface is ignored. - // Classic stream.Writable has no per-write abort signal support; - // cancellation should be handled at the pipeline level instead. - write(chunk) { - if (!isWritable()) { - return PromiseReject(new ERR_STREAM_WRITE_AFTER_END()); - } - - let bytes; - try { - bytes = toUint8Array(chunk); - } catch (err) { - return PromiseReject(err); - } - - if (backpressure === 'strict' && isFull()) { - return PromiseReject(new ERR_INVALID_STATE.RangeError( - 'Backpressure violation: buffer is full. ' + - 'Await each write() call to respect backpressure.')); - } - - if (backpressure === 'drop-newest' && isFull()) { - // Silently discard. Still count bytes for consistency with - // PushWriter, which counts dropped bytes in totalBytes. - totalBytes += TypedArrayPrototypeGetByteLength(bytes); - return PromiseResolve(); - } - - totalBytes += TypedArrayPrototypeGetByteLength(bytes); - const ok = writable.write(bytes); - if (ok) return PromiseResolve(); - - // backpressure === 'block' (or strict with room that filled on - // this write -- writable.write() accepted the data but returned - // false indicating the buffer is now at/over hwm). - if (backpressure === 'block') { - return waitForDrain(); - } - - // strict: the write was accepted (there was room before writing) - // but the buffer is now full. Resolve -- the *next* write will - // be rejected if the caller ignores backpressure. - return PromiseResolve(); - }, - - writev(chunks) { - if (!ArrayIsArray(chunks)) { - throw new ERR_INVALID_ARG_TYPE('chunks', 'Array', chunks); - } - if (!isWritable()) { - return PromiseReject(new ERR_STREAM_WRITE_AFTER_END()); - } - - if (backpressure === 'strict' && isFull()) { - return PromiseReject(new ERR_INVALID_STATE.RangeError( - 'Backpressure violation: buffer is full. ' + - 'Await each write() call to respect backpressure.')); - } - - if (backpressure === 'drop-newest' && isFull()) { - // Discard entire batch. - for (let i = 0; i < chunks.length; i++) { - totalBytes += - TypedArrayPrototypeGetByteLength(toUint8Array(chunks[i])); - } - return PromiseResolve(); - } - - writable.cork(); - let ok = true; - for (let i = 0; i < chunks.length; i++) { - const bytes = toUint8Array(chunks[i]); - totalBytes += TypedArrayPrototypeGetByteLength(bytes); - ok = writable.write(bytes); - } - writable.uncork(); - - if (ok) return PromiseResolve(); - - if (backpressure === 'block') { - return waitForDrain(); - } - - return PromiseResolve(); - }, - - endSync() { - return -1; - }, - - // options.signal is ignored for the same reason as write(). - end() { - if (writable.writableFinished || writable.destroyed) { - cleanup(); - return PromiseResolve(totalBytes); - } - - const { promise, resolve, reject } = PromiseWithResolvers(); - - if (!writable.writableEnded) { - writable.end(); - } - - eos(writable, { writable: true, readable: false }, (err) => { - cleanup(err); - if (err) reject(err); - else resolve(totalBytes); - }); - - return promise; - }, - - fail(reason) { - cleanup(reason); - writable.destroy(reason); - }, - - [SymbolAsyncDispose]() { - if (isWritable()) { - cleanup(); - writable.destroy(); - } - return PromiseResolve(); - }, - - [SymbolDispose]() { - if (isWritable()) { - cleanup(); - writable.destroy(); - } - }, - }; - - // drainableProtocol is set after lazyInit - writer[drainableProtocol] = function() { - if (!isWritable()) return null; - if (writable.writableLength < hwm) { - return PromiseResolve(true); - } - const { promise, resolve } = PromiseWithResolvers(); - waiters.push({ - __proto__: null, - resolve() { resolve(true); }, - reject() { resolve(false); }, - }); - installListeners(); - return promise; - }; - - this[kStreamIterWriter] = writer; - return writer; - }; - - // Create a classic stream.Writable backed by a stream/iter Writer. - // Each _write/_writev call delegates to the Writer's async methods, - // attempting the sync path first (writeSync/writevSync/endSync) and - // falling back to async if the sync path returns false or throws. - // _final calls writer.end(), _destroy calls writer.fail(). - Writable.fromStreamIter = function fromStreamIter(writer) { - lazyInit(); - - if (typeof writer?.write !== 'function') { - throw new ERR_INVALID_ARG_TYPE('writer', 'Writer', writer); - } - - const hasWriteSync = typeof writer.writeSync === 'function'; - const hasWritev = typeof writer.writev === 'function'; - const hasWritevSync = hasWritev && - typeof writer.writevSync === 'function'; - const hasEnd = typeof writer.end === 'function'; - const hasEndSync = hasEnd && - typeof writer.endSync === 'function'; - const hasFail = typeof writer.fail === 'function'; - - // Try-sync-first pattern: attempt the synchronous method and - // fall back to the async method if it returns false (indicating - // the sync path was not accepted) or throws. When the sync path - // succeeds, the callback is deferred via queueMicrotask to - // preserve the async resolution contract that Writable internals - // expect from _write/_writev/_final callbacks. - - function _write(chunk, encoding, cb) { - const bytes = typeof chunk === 'string' ? - Buffer.from(chunk, encoding) : chunk; - if (hasWriteSync) { - try { - if (writer.writeSync(bytes)) { - queueMicrotask(cb); - return; - } - } catch { - // Sync path threw -- fall through to async. - } - } - try { - PromisePrototypeThen(writer.write(bytes), () => cb(), cb); - } catch (err) { - cb(err); - } - } - - function _writev(entries, cb) { - const chunks = []; - for (let i = 0; i < entries.length; i++) { - const { chunk, encoding } = entries[i]; - chunks[i] = typeof chunk === 'string' ? - Buffer.from(chunk, encoding) : chunk; - } - if (hasWritevSync) { - try { - if (writer.writevSync(chunks)) { - queueMicrotask(cb); - return; - } - } catch { - // Sync path threw -- fall through to async. - } - } - try { - PromisePrototypeThen(writer.writev(chunks), () => cb(), cb); - } catch (err) { - cb(err); - } - } - - function _final(cb) { - if (!hasEnd) { - queueMicrotask(cb); - return; - } - if (hasEndSync) { - try { - const result = writer.endSync(); - if (result >= 0) { - queueMicrotask(cb); - return; - } - } catch { - // Sync path threw -- fall through to async. - } - } - try { - PromisePrototypeThen(writer.end(), () => cb(), cb); - } catch (err) { - cb(err); - } - } - - function _destroy(err, cb) { - if (err && hasFail) { - writer.fail(err); - } - cb(); - } - - const writableOptions = { - __proto__: null, - // Use MAX_SAFE_INTEGER to effectively disable the Writable's - // internal buffering. The underlying stream/iter Writer has its - // own backpressure handling; we want _write to be called - // immediately so the Writer can manage flow control directly. - highWaterMark: NumberMAX_SAFE_INTEGER, - write: _write, - final: _final, - destroy: _destroy, - }; - - if (hasWritev) { - writableOptions.writev = _writev; - } - - return new Writable(writableOptions); - }; -} diff --git a/lib/stream/iter.js b/lib/stream/iter.js index e77e485a7f2bd5..4cb499bb161694 100644 --- a/lib/stream/iter.js +++ b/lib/stream/iter.js @@ -50,6 +50,15 @@ const { ondrain, } = require('internal/streams/iter/consumers'); +// Classic stream interop (Node.js-specific, not part of the spec) +const { + fromReadable, + fromWritable, + toReadable, + toReadableSync, + toWritable, +} = require('internal/streams/iter/classic'); + // Multi-consumer const { broadcast, Broadcast } = require('internal/streams/iter/broadcast'); const { @@ -177,4 +186,11 @@ module.exports = { tap, tapSync, ondrain, + + // Classic stream interop + fromReadable, + fromWritable, + toReadable, + toReadableSync, + toWritable, }; diff --git a/test/parallel/test-stream-iter-readable-interop-disabled.js b/test/parallel/test-stream-iter-readable-interop-disabled.js index b2dbcc2df67874..86c8cc141387cd 100644 --- a/test/parallel/test-stream-iter-readable-interop-disabled.js +++ b/test/parallel/test-stream-iter-readable-interop-disabled.js @@ -38,35 +38,20 @@ async function testToAsyncStreamableWithFlag() { assert.strictEqual(code, 0); } -async function testFromStreamIterWithoutFlag() { - const { stderr, code } = await spawnPromisified(process.execPath, [ - '-e', - ` - const { Readable } = require('stream'); - async function* gen() { yield [Buffer.from('x')]; } - Readable.fromStreamIter(gen()); - `, - ]); - assert.notStrictEqual(code, 0); - assert.match(stderr, /ERR_STREAM_ITER_MISSING_FLAG/); -} - -async function testFromStreamIterSyncWithoutFlag() { - const { stderr, code } = await spawnPromisified(process.execPath, [ +async function testStreamIterModuleWithoutFlag() { + // Requiring 'stream/iter' without the flag should not be possible + // since the module is gated behind --experimental-stream-iter. + const { code } = await spawnPromisified(process.execPath, [ '-e', ` - const { Readable } = require('stream'); - function* gen() { yield [Buffer.from('x')]; } - Readable.fromStreamIterSync(gen()); + require('stream/iter'); `, ]); assert.notStrictEqual(code, 0); - assert.match(stderr, /ERR_STREAM_ITER_MISSING_FLAG/); } Promise.all([ testToAsyncStreamableWithoutFlag(), testToAsyncStreamableWithFlag(), - testFromStreamIterWithoutFlag(), - testFromStreamIterSyncWithoutFlag(), + testStreamIterModuleWithoutFlag(), ]).then(common.mustCall()); diff --git a/test/parallel/test-stream-iter-readable-interop.js b/test/parallel/test-stream-iter-readable-interop.js index f201dab4f5642a..8100b54168be87 100644 --- a/test/parallel/test-stream-iter-readable-interop.js +++ b/test/parallel/test-stream-iter-readable-interop.js @@ -2,7 +2,7 @@ 'use strict'; // Tests for classic Readable stream interop with the stream/iter API -// via the toAsyncStreamable protocol and kTrustedSource optimization. +// via the toAsyncStreamable protocol and kValidatedSource optimization. const common = require('../common'); const assert = require('assert'); @@ -115,13 +115,13 @@ async function testBatchingBehavior() { } // ============================================================================= -// Byte-mode Readable: kTrustedSource is set +// Byte-mode Readable: kValidatedSource is set // ============================================================================= function testTrustedSourceByteMode() { const readable = new Readable({ read() {} }); const result = readable[toAsyncStreamable](); - // kTrustedSource is a private symbol, but we can verify the result + // kValidatedSource is a private symbol, but we can verify the result // is used directly by from() without wrapping by checking it has // Symbol.asyncIterator assert.strictEqual(typeof result[Symbol.asyncIterator], 'function'); @@ -290,7 +290,7 @@ async function testObjectModeToStreamable() { } // ============================================================================= -// Object-mode Readable: kTrustedSource is set +// Object-mode Readable: kValidatedSource is set // ============================================================================= function testTrustedSourceObjectMode() { @@ -589,14 +589,14 @@ async function testAbortSignal() { } // ============================================================================= -// kTrustedSource identity - from() returns same object for trusted sources +// kValidatedSource identity - from() returns same object for validated sources // ============================================================================= function testTrustedSourceIdentity() { const readable = new Readable({ read() {} }); const iter = readable[toAsyncStreamable](); - // from() should return the trusted iterator directly (same reference), + // from() should return the validated iterator directly (same reference), // not wrap it in another generator const result = from(iter); assert.strictEqual(result, iter); diff --git a/test/parallel/test-stream-iter-to-readable.js b/test/parallel/test-stream-iter-to-readable.js index a531df0d6a6852..4cb5600e3ba424 100644 --- a/test/parallel/test-stream-iter-to-readable.js +++ b/test/parallel/test-stream-iter-to-readable.js @@ -1,17 +1,19 @@ // Flags: --experimental-stream-iter 'use strict'; -// Tests for Readable.fromStreamIter() and Readable.fromStreamIterSync() -// which create byte-mode Readable streams from stream/iter sources. +// Tests for toReadable() and toReadableSync() which create byte-mode +// Readable streams from stream/iter sources. const common = require('../common'); const assert = require('assert'); -const { Readable, Writable } = require('stream'); +const { Writable } = require('stream'); const { from, fromSync, pull, text, + toReadable, + toReadableSync, } = require('stream/iter'); function collect(readable) { @@ -29,7 +31,7 @@ function collect(readable) { async function testBasicAsync() { const source = from('hello world'); - const readable = Readable.fromStreamIter(source); + const readable = toReadable(source); assert.strictEqual(readable.readableObjectMode, false); @@ -52,7 +54,7 @@ async function testMultiBatchAsync() { } } - const readable = Readable.fromStreamIter(gen()); + const readable = toReadable(gen()); const result = await collect(readable); assert.strictEqual(result.toString(), '0-0 0-1 0-2 1-0 1-1 1-2 2-0 2-1 2-2 3-0 3-1 3-2 4-0 4-1 4-2 '); @@ -71,7 +73,7 @@ async function testBackpressureAsync() { } } - const readable = Readable.fromStreamIter(gen(), { highWaterMark: 1 }); + const readable = toReadable(gen(), { highWaterMark: 1 }); // Read one chunk at a time with delays to exercise backpressure const chunks = []; @@ -93,7 +95,7 @@ async function testErrorAsync() { throw new Error('source failed'); } - const readable = Readable.fromStreamIter(gen()); + const readable = toReadable(gen()); await assert.rejects(async () => { // eslint-disable-next-line no-unused-vars @@ -112,7 +114,7 @@ async function testEmptyAsync() { // yields nothing } - const readable = Readable.fromStreamIter(gen()); + const readable = toReadable(gen()); const result = await collect(readable); assert.strictEqual(result.length, 0); } @@ -128,7 +130,7 @@ async function testEmptyBatchAsync() { yield []; } - const readable = Readable.fromStreamIter(gen()); + const readable = toReadable(gen()); const result = await collect(readable); assert.strictEqual(result.toString(), 'real data'); } @@ -149,7 +151,7 @@ async function testDestroyAsync() { } } - const readable = Readable.fromStreamIter(gen()); + const readable = toReadable(gen()); // Read a couple chunks then destroy const chunks = []; @@ -185,7 +187,7 @@ async function testDestroyDuringBackpressure() { } } - const readable = Readable.fromStreamIter(gen(), { highWaterMark: 1 }); + const readable = toReadable(gen(), { highWaterMark: 1 }); // Read one chunk to start the pump, then destroy while it's waiting const chunk = await new Promise((resolve) => { @@ -218,7 +220,7 @@ async function testLargeDataAsync() { } } - const readable = Readable.fromStreamIter(gen()); + const readable = toReadable(gen()); const result = await collect(readable); assert.strictEqual(result.length, totalSize); assert.strictEqual(result[0], 0x42); @@ -231,7 +233,7 @@ async function testLargeDataAsync() { async function testPipeAsync() { const source = from('pipe test data'); - const readable = Readable.fromStreamIter(source); + const readable = toReadable(source); const chunks = []; const writable = new Writable({ @@ -256,7 +258,7 @@ async function testPipeAsync() { function testNotObjectMode() { async function* gen() { yield [Buffer.from('x')]; } - const readable = Readable.fromStreamIter(gen()); + const readable = toReadable(gen()); assert.strictEqual(readable.readableObjectMode, false); } @@ -265,11 +267,11 @@ function testNotObjectMode() { // ============================================================================= function testInvalidSourceAsync() { - assert.throws(() => Readable.fromStreamIter(42), + assert.throws(() => toReadable(42), { code: 'ERR_INVALID_ARG_TYPE' }); - assert.throws(() => Readable.fromStreamIter('not iterable'), + assert.throws(() => toReadable('not iterable'), { code: 'ERR_INVALID_ARG_TYPE' }); - assert.throws(() => Readable.fromStreamIter(null), + assert.throws(() => toReadable(null), { code: 'ERR_INVALID_ARG_TYPE' }); } @@ -285,7 +287,7 @@ async function testSignalAsync() { } const ac = new AbortController(); - const readable = Readable.fromStreamIter(gen(), { signal: ac.signal }); + const readable = toReadable(gen(), { signal: ac.signal }); const chunks = []; await assert.rejects(async () => { @@ -311,7 +313,7 @@ async function testSignalAlreadyAborted() { const ac = new AbortController(); ac.abort(); - const readable = Readable.fromStreamIter(gen(), { signal: ac.signal }); + const readable = toReadable(gen(), { signal: ac.signal }); await assert.rejects(async () => { // eslint-disable-next-line no-unused-vars @@ -330,37 +332,37 @@ function testOptionsValidationAsync() { async function* gen() { yield [Buffer.from('x')]; } // Options must be an object - assert.throws(() => Readable.fromStreamIter(gen(), 'bad'), + assert.throws(() => toReadable(gen(), 'bad'), { code: 'ERR_INVALID_ARG_TYPE' }); - assert.throws(() => Readable.fromStreamIter(gen(), 42), + assert.throws(() => toReadable(gen(), 42), { code: 'ERR_INVALID_ARG_TYPE' }); // highWaterMark must be a non-negative integer - assert.throws(() => Readable.fromStreamIter(gen(), { highWaterMark: -1 }), + assert.throws(() => toReadable(gen(), { highWaterMark: -1 }), { code: 'ERR_OUT_OF_RANGE' }); - assert.throws(() => Readable.fromStreamIter(gen(), { highWaterMark: 'big' }), + assert.throws(() => toReadable(gen(), { highWaterMark: 'big' }), { code: 'ERR_INVALID_ARG_TYPE' }); - assert.throws(() => Readable.fromStreamIter(gen(), { highWaterMark: 1.5 }), + assert.throws(() => toReadable(gen(), { highWaterMark: 1.5 }), { code: 'ERR_OUT_OF_RANGE' }); // Signal must be an AbortSignal - assert.throws(() => Readable.fromStreamIter(gen(), { signal: 'not a signal' }), + assert.throws(() => toReadable(gen(), { signal: 'not a signal' }), { code: 'ERR_INVALID_ARG_TYPE' }); - assert.throws(() => Readable.fromStreamIter(gen(), { signal: 42 }), + assert.throws(() => toReadable(gen(), { signal: 42 }), { code: 'ERR_INVALID_ARG_TYPE' }); // Valid options should work - const r = Readable.fromStreamIter(gen(), { highWaterMark: 0 }); + const r = toReadable(gen(), { highWaterMark: 0 }); assert.strictEqual(r.readableHighWaterMark, 0); r.destroy(); // Default highWaterMark should be 16KB - const r2 = Readable.fromStreamIter(gen()); + const r2 = toReadable(gen()); assert.strictEqual(r2.readableHighWaterMark, 64 * 1024); r2.destroy(); // Explicit undefined options should work (uses defaults) - const r3 = Readable.fromStreamIter(gen(), undefined); + const r3 = toReadable(gen(), undefined); assert.strictEqual(r3.readableHighWaterMark, 64 * 1024); r3.destroy(); } @@ -383,7 +385,7 @@ async function testWithTransformAsync() { } const source = pull(from('hello world'), upper); - const readable = Readable.fromStreamIter(source); + const readable = toReadable(source); const result = await collect(readable); assert.strictEqual(result.toString(), 'HELLO WORLD'); } @@ -394,7 +396,7 @@ async function testWithTransformAsync() { async function testBasicSync() { const source = fromSync('sync hello'); - const readable = Readable.fromStreamIterSync(source); + const readable = toReadableSync(source); assert.strictEqual(readable.readableObjectMode, false); @@ -408,7 +410,7 @@ async function testBasicSync() { function testSyncRead() { const source = fromSync('immediate'); - const readable = Readable.fromStreamIterSync(source); + const readable = toReadableSync(source); // Synchronous read should return data right away const chunk = readable.read(); @@ -427,7 +429,7 @@ async function testBackpressureSync() { } } - const readable = Readable.fromStreamIterSync(gen(), { highWaterMark: 1 }); + const readable = toReadableSync(gen(), { highWaterMark: 1 }); const chunks = []; for await (const chunk of readable) { @@ -447,7 +449,7 @@ async function testErrorSync() { throw new Error('sync source failed'); } - const readable = Readable.fromStreamIterSync(gen()); + const readable = toReadableSync(gen()); await assert.rejects(async () => { // eslint-disable-next-line no-unused-vars @@ -466,7 +468,7 @@ function testEmptySync() { // yields nothing } - const readable = Readable.fromStreamIterSync(gen()); + const readable = toReadableSync(gen()); const result = readable.read(); assert.strictEqual(result, null); } @@ -487,7 +489,7 @@ async function testDestroySync() { } } - const readable = Readable.fromStreamIterSync(gen()); + const readable = toReadableSync(gen()); readable.read(); // Start iteration readable.destroy(); @@ -501,7 +503,7 @@ async function testDestroySync() { function testNotObjectModeSync() { function* gen() { yield [Buffer.from('x')]; } - const readable = Readable.fromStreamIterSync(gen()); + const readable = toReadableSync(gen()); assert.strictEqual(readable.readableObjectMode, false); } @@ -510,9 +512,9 @@ function testNotObjectModeSync() { // ============================================================================= function testInvalidSourceSync() { - assert.throws(() => Readable.fromStreamIterSync(42), + assert.throws(() => toReadableSync(42), { code: 'ERR_INVALID_ARG_TYPE' }); - assert.throws(() => Readable.fromStreamIterSync(null), + assert.throws(() => toReadableSync(null), { code: 'ERR_INVALID_ARG_TYPE' }); } @@ -524,26 +526,26 @@ function testOptionsValidationSync() { function* gen() { yield [Buffer.from('x')]; } // Options must be an object - assert.throws(() => Readable.fromStreamIterSync(gen(), 'bad'), + assert.throws(() => toReadableSync(gen(), 'bad'), { code: 'ERR_INVALID_ARG_TYPE' }); - assert.throws(() => Readable.fromStreamIterSync(gen(), 42), + assert.throws(() => toReadableSync(gen(), 42), { code: 'ERR_INVALID_ARG_TYPE' }); // highWaterMark must be a non-negative integer - assert.throws(() => Readable.fromStreamIterSync(gen(), { highWaterMark: -1 }), + assert.throws(() => toReadableSync(gen(), { highWaterMark: -1 }), { code: 'ERR_OUT_OF_RANGE' }); - assert.throws(() => Readable.fromStreamIterSync(gen(), { highWaterMark: 'big' }), + assert.throws(() => toReadableSync(gen(), { highWaterMark: 'big' }), { code: 'ERR_INVALID_ARG_TYPE' }); - assert.throws(() => Readable.fromStreamIterSync(gen(), { highWaterMark: 1.5 }), + assert.throws(() => toReadableSync(gen(), { highWaterMark: 1.5 }), { code: 'ERR_OUT_OF_RANGE' }); // Valid options should work - const r = Readable.fromStreamIterSync(gen(), { highWaterMark: 0 }); + const r = toReadableSync(gen(), { highWaterMark: 0 }); assert.strictEqual(r.readableHighWaterMark, 0); r.destroy(); // Default highWaterMark should be 16KB - const r2 = Readable.fromStreamIterSync(gen()); + const r2 = toReadableSync(gen()); assert.strictEqual(r2.readableHighWaterMark, 64 * 1024); r2.destroy(); } @@ -557,7 +559,7 @@ async function testRoundTrip() { // stream/iter -> classic Readable const source = from(original); - const readable = Readable.fromStreamIter(source); + const readable = toReadable(source); // classic Readable -> stream/iter (via toAsyncStreamable) const result = await text(from(readable)); @@ -575,7 +577,7 @@ async function testRoundTripWithCompression() { // Compress via stream/iter, bridge to classic Readable const compressed = pull(from(original), compressGzip()); - const readable = Readable.fromStreamIter(compressed); + const readable = toReadable(compressed); // Classic Readable back to stream/iter for decompression const result = await text(pull(from(readable), decompressGzip())); diff --git a/test/parallel/test-stream-iter-writable-from.js b/test/parallel/test-stream-iter-writable-from.js index ec4d059ca3287e..fd922c5cf99537 100644 --- a/test/parallel/test-stream-iter-writable-from.js +++ b/test/parallel/test-stream-iter-writable-from.js @@ -1,15 +1,15 @@ // Flags: --experimental-stream-iter 'use strict'; -// Tests for Writable.fromStreamIter() - creating a classic stream.Writable +// Tests for toWritable() - creating a classic stream.Writable // backed by a stream/iter Writer. const common = require('../common'); const assert = require('assert'); -const { Writable } = require('stream'); const { push, text, + toWritable, } = require('stream/iter'); // ============================================================================= @@ -18,7 +18,7 @@ const { async function testBasicWrite() { const { writer, readable } = push({ backpressure: 'block' }); - const writable = Writable.fromStreamIter(writer); + const writable = toWritable(writer); writable.write('hello'); writable.write(' world'); @@ -44,7 +44,7 @@ async function testWriteDelegatesToWriter() { fail() {}, }; - const writable = Writable.fromStreamIter(writer); + const writable = toWritable(writer); await new Promise((resolve, reject) => { writable.write('hello', (err) => { @@ -77,7 +77,7 @@ async function testWritevDelegation() { fail() {}, }; - const writable = Writable.fromStreamIter(writer); + const writable = toWritable(writer); // Cork to batch writes, then uncork to trigger _writev writable.cork(); @@ -101,7 +101,7 @@ function testNoWritevWithoutWriterWritev() { write(chunk) { return Promise.resolve(); }, }; - const writable = Writable.fromStreamIter(writer); + const writable = toWritable(writer); // The _writev should be null (Writable default) when writer lacks writev assert.strictEqual(writable._writev, null); } @@ -127,7 +127,7 @@ async function testWriteSyncFirst() { fail() {}, }; - const writable = Writable.fromStreamIter(writer); + const writable = toWritable(writer); await new Promise((resolve) => { writable.write('test', resolve); @@ -158,7 +158,7 @@ async function testWriteSyncFallback() { fail() {}, }; - const writable = Writable.fromStreamIter(writer); + const writable = toWritable(writer); await new Promise((resolve) => { writable.write('test', resolve); @@ -189,7 +189,7 @@ async function testEndSyncFirst() { fail() {}, }; - const writable = Writable.fromStreamIter(writer); + const writable = toWritable(writer); await new Promise((resolve) => writable.end(resolve)); @@ -218,7 +218,7 @@ async function testEndSyncFallback() { fail() {}, }; - const writable = Writable.fromStreamIter(writer); + const writable = toWritable(writer); await new Promise((resolve) => writable.end(resolve)); @@ -241,7 +241,7 @@ async function testFinalDelegatesToEnd() { fail() {}, }; - const writable = Writable.fromStreamIter(writer); + const writable = toWritable(writer); await new Promise((resolve) => writable.end(resolve)); @@ -260,7 +260,7 @@ async function testDestroyDelegatesToFail() { fail(reason) { failReason = reason; }, }; - const writable = Writable.fromStreamIter(writer); + const writable = toWritable(writer); writable.on('error', () => {}); // Prevent unhandled const testErr = new Error('destroy test'); @@ -285,7 +285,7 @@ async function testWriteErrorPropagation() { fail() {}, }; - const writable = Writable.fromStreamIter(writer); + const writable = toWritable(writer); await assert.rejects(new Promise((resolve, reject) => { writable.write('data', (err) => { @@ -301,20 +301,20 @@ async function testWriteErrorPropagation() { function testInvalidWriterThrows() { assert.throws( - () => Writable.fromStreamIter(null), + () => toWritable(null), { code: 'ERR_INVALID_ARG_TYPE' }, ); assert.throws( - () => Writable.fromStreamIter({}), + () => toWritable({}), { code: 'ERR_INVALID_ARG_TYPE' }, ); assert.throws( - () => Writable.fromStreamIter('not a writer'), + () => toWritable('not a writer'), { code: 'ERR_INVALID_ARG_TYPE' }, ); // Object with write is valid (only write is required). // This should not throw. - Writable.fromStreamIter({ + toWritable({ write() { return Promise.resolve(); }, }); } @@ -325,7 +325,7 @@ function testInvalidWriterThrows() { async function testRoundTrip() { const { writer, readable } = push({ backpressure: 'block' }); - const writable = Writable.fromStreamIter(writer); + const writable = toWritable(writer); const data = 'round trip test data'; writable.write(data); @@ -341,7 +341,7 @@ async function testRoundTrip() { async function testSequentialWrites() { const { writer, readable } = push({ backpressure: 'block' }); - const writable = Writable.fromStreamIter(writer); + const writable = toWritable(writer); for (let i = 0; i < 10; i++) { writable.write(`chunk${i}`); @@ -374,7 +374,7 @@ async function testSyncCallbackDeferred() { fail() {}, }; - const writable = Writable.fromStreamIter(writer); + const writable = toWritable(writer); const p = new Promise((resolve) => { writable.write('test', () => { @@ -403,7 +403,7 @@ async function testMinimalWriter() { // No end, fail, writeSync, writev, etc. }; - const writable = Writable.fromStreamIter(writer); + const writable = toWritable(writer); await new Promise((resolve) => { writable.write('minimal'); @@ -424,7 +424,7 @@ async function testDestroyWithoutError() { fail() { failCalled = true; }, }; - const writable = Writable.fromStreamIter(writer); + const writable = toWritable(writer); writable.destroy(); await new Promise((resolve) => setTimeout(resolve, 10)); @@ -443,7 +443,7 @@ async function testDestroyWithError() { fail(reason) { failReason = reason; }, }; - const writable = Writable.fromStreamIter(writer); + const writable = toWritable(writer); writable.on('error', () => {}); const err = new Error('test'); @@ -464,7 +464,7 @@ async function testDestroyWithoutFail() { // No fail method }; - const writable = Writable.fromStreamIter(writer); + const writable = toWritable(writer); writable.on('error', () => {}); // Should not throw even though writer has no fail() @@ -485,7 +485,7 @@ function testHighWaterMarkIsMaxSafeInt() { // HWM is set to MAX_SAFE_INTEGER to disable Writable's internal // buffering. The underlying Writer manages backpressure directly. - const writable = Writable.fromStreamIter(writer); + const writable = toWritable(writer); assert.strictEqual(writable.writableHighWaterMark, Number.MAX_SAFE_INTEGER); } @@ -508,7 +508,7 @@ async function testWriteSyncThrowsFallback() { fail() {}, }; - const writable = Writable.fromStreamIter(writer); + const writable = toWritable(writer); await new Promise((resolve) => { writable.write('test', resolve); @@ -529,7 +529,7 @@ async function testWriteThrowsSyncPropagation() { }, }; - const writable = Writable.fromStreamIter(writer); + const writable = toWritable(writer); await assert.rejects(new Promise((resolve, reject) => { writable.write('data', (err) => { @@ -552,7 +552,7 @@ async function testEndThrowsSyncPropagation() { }, }; - const writable = Writable.fromStreamIter(writer); + const writable = toWritable(writer); writable.on('error', () => {}); await new Promise((resolve) => { diff --git a/test/parallel/test-stream-iter-writable-interop.js b/test/parallel/test-stream-iter-writable-interop.js index bdc9d37a5c88b3..8a2ead0d0ee579 100644 --- a/test/parallel/test-stream-iter-writable-interop.js +++ b/test/parallel/test-stream-iter-writable-interop.js @@ -2,27 +2,25 @@ 'use strict'; // Tests for classic Writable stream interop with the stream/iter API -// via toStreamIterWriter(). +// via fromWritable(). const common = require('../common'); const assert = require('assert'); const { Writable } = require('stream'); const { from, + fromWritable, pipeTo, text, ondrain, } = require('stream/iter'); // ============================================================================= -// toStreamIterWriter() is present on Writable.prototype +// fromWritable() is exported from stream/iter // ============================================================================= -function testMethodExists() { - assert.strictEqual(typeof Writable.prototype.toStreamIterWriter, 'function'); - - const writable = new Writable({ write(chunk, enc, cb) { cb(); } }); - assert.strictEqual(typeof writable.toStreamIterWriter, 'function'); +function testFunctionExists() { + assert.strictEqual(typeof fromWritable, 'function'); } // ============================================================================= @@ -35,7 +33,7 @@ async function testDefaultIsStrict() { write(chunk, encoding, cb) { cb(); }, }); - const writer = writable.toStreamIterWriter(); + const writer = fromWritable(writable); // Should work fine when buffer has room await writer.write('hello'); await writer.end(); @@ -54,7 +52,7 @@ async function testBasicWrite() { }, }); - const writer = writable.toStreamIterWriter({ backpressure: 'block' }); + const writer = fromWritable(writable, { backpressure: 'block' }); await pipeTo(from('hello world'), writer); assert.strictEqual(Buffer.concat(chunks).toString(), 'hello world'); @@ -74,7 +72,7 @@ async function testWriteNoDrain() { }, }); - const writer = writable.toStreamIterWriter(); + const writer = fromWritable(writable); await writer.write('hello'); await writer.write(' world'); await writer.end(); @@ -97,7 +95,7 @@ async function testBlockWaitsForDrain() { }, }); - const writer = writable.toStreamIterWriter({ backpressure: 'block' }); + const writer = fromWritable(writable, { backpressure: 'block' }); await writer.write('a'); await writer.write('b'); @@ -119,7 +117,7 @@ async function testBlockErrorRejectsPendingWrite() { }, }); - const writer = writable.toStreamIterWriter({ backpressure: 'block' }); + const writer = fromWritable(writable, { backpressure: 'block' }); // First write fills the buffer, waits for drain const writePromise = writer.write('data that will block'); @@ -142,7 +140,7 @@ async function testStrictRejectsWhenFull() { }, }); - const writer = writable.toStreamIterWriter(); + const writer = fromWritable(writable); // First write fills the buffer (5 bytes = hwm) await writer.write('12345'); @@ -166,7 +164,7 @@ async function testStrictWritevRejectsWhenFull() { }, }); - const writer = writable.toStreamIterWriter(); + const writer = fromWritable(writable); // Fill buffer await writer.write('12345'); @@ -195,7 +193,7 @@ async function testDropNewestDiscards() { }, }); - const writer = writable.toStreamIterWriter({ backpressure: 'drop-newest' }); + const writer = fromWritable(writable, { backpressure: 'drop-newest' }); // First write fills the buffer await writer.write('12345'); @@ -222,7 +220,7 @@ async function testDropNewestWritevDiscards() { }, }); - const writer = writable.toStreamIterWriter({ backpressure: 'drop-newest' }); + const writer = fromWritable(writable, { backpressure: 'drop-newest' }); // Fill buffer await writer.write('12345'); @@ -248,7 +246,7 @@ async function testDropNewestCountsBytes() { }, }); - const writer = writable.toStreamIterWriter({ backpressure: 'drop-newest' }); + const writer = fromWritable(writable, { backpressure: 'drop-newest' }); await writer.write('12345'); // 5 bytes, accepted await writer.write('67890'); // 5 bytes, dropped @@ -264,7 +262,7 @@ async function testDropNewestCountsBytes() { function testDropOldestThrows() { const writable = new Writable({ write(chunk, enc, cb) { cb(); } }); assert.throws( - () => writable.toStreamIterWriter({ backpressure: 'drop-oldest' }), + () => fromWritable(writable, { backpressure: 'drop-oldest' }), { code: 'ERR_INVALID_ARG_VALUE' }, ); } @@ -276,7 +274,7 @@ function testDropOldestThrows() { function testInvalidBackpressureThrows() { const writable = new Writable({ write(chunk, enc, cb) { cb(); } }); assert.throws( - () => writable.toStreamIterWriter({ backpressure: 'invalid' }), + () => fromWritable(writable, { backpressure: 'invalid' }), { code: 'ERR_INVALID_ARG_VALUE' }, ); } @@ -301,7 +299,7 @@ async function testWritev() { }, }); - const writer = writable.toStreamIterWriter({ backpressure: 'block' }); + const writer = fromWritable(writable, { backpressure: 'block' }); await writer.writev([ new TextEncoder().encode('hello'), new TextEncoder().encode(' '), @@ -318,7 +316,7 @@ async function testWritev() { function testSyncMethodsReturnFalse() { const writable = new Writable({ write(chunk, enc, cb) { cb(); } }); - const writer = writable.toStreamIterWriter(); + const writer = fromWritable(writable); assert.strictEqual(writer.writeSync(new Uint8Array(1)), false); assert.strictEqual(writer.writevSync([new Uint8Array(1)]), false); @@ -330,7 +328,7 @@ function testSyncMethodsReturnFalse() { function testEndSyncReturnsNegativeOne() { const writable = new Writable({ write(chunk, enc, cb) { cb(); } }); - const writer = writable.toStreamIterWriter(); + const writer = fromWritable(writable); assert.strictEqual(writer.endSync(), -1); } @@ -344,7 +342,7 @@ async function testEndReturnsByteCount() { write(chunk, encoding, cb) { cb(); }, }); - const writer = writable.toStreamIterWriter(); + const writer = fromWritable(writable); await writer.write('hello'); // 5 bytes await writer.write(' world'); // 6 bytes const total = await writer.end(); @@ -359,7 +357,7 @@ async function testEndReturnsByteCount() { async function testFail() { const writable = new Writable({ write(chunk, enc, cb) { cb(); } }); writable.on('error', () => {}); // Prevent unhandled error - const writer = writable.toStreamIterWriter(); + const writer = fromWritable(writable); writer.fail(new Error('test fail')); @@ -378,7 +376,7 @@ function testDesiredSize() { }, }); - const writer = writable.toStreamIterWriter(); + const writer = fromWritable(writable); assert.strictEqual(writer.desiredSize, 100); } @@ -388,7 +386,7 @@ function testDesiredSize() { function testDesiredSizeNull() { const writable = new Writable({ write(chunk, enc, cb) { cb(); } }); - const writer = writable.toStreamIterWriter(); + const writer = fromWritable(writable); writable.destroy(); assert.strictEqual(writer.desiredSize, null); @@ -404,7 +402,7 @@ async function testDrainableNoPressure() { write(chunk, enc, cb) { cb(); }, }); - const writer = writable.toStreamIterWriter(); + const writer = fromWritable(writable); const result = await ondrain(writer); assert.strictEqual(result, true); } @@ -415,7 +413,7 @@ async function testDrainableNoPressure() { function testDrainableNull() { const writable = new Writable({ write(chunk, enc, cb) { cb(); } }); - const writer = writable.toStreamIterWriter(); + const writer = fromWritable(writable); writable.destroy(); assert.strictEqual(ondrain(writer), null); @@ -427,7 +425,7 @@ function testDrainableNull() { async function testWriteAfterEnd() { const writable = new Writable({ write(chunk, enc, cb) { cb(); } }); - const writer = writable.toStreamIterWriter(); + const writer = fromWritable(writable); await writer.end(); @@ -450,7 +448,7 @@ async function testSequentialWrites() { }, }); - const writer = writable.toStreamIterWriter(); + const writer = fromWritable(writable); for (let i = 0; i < 10; i++) { await writer.write(`chunk${i}`); @@ -483,7 +481,7 @@ async function testPipeToWithTransform() { }, }); - const writer = writable.toStreamIterWriter({ backpressure: 'block' }); + const writer = fromWritable(writable, { backpressure: 'block' }); await pipeTo(from('hello via transform'), compressGzip(), writer); const decompressed = await text( @@ -498,7 +496,7 @@ async function testPipeToWithTransform() { async function testDispose() { const writable = new Writable({ write(chunk, enc, cb) { cb(); } }); - const writer = writable.toStreamIterWriter(); + const writer = fromWritable(writable); writer[Symbol.dispose](); assert.ok(writable.destroyed); @@ -506,7 +504,7 @@ async function testDispose() { async function testAsyncDispose() { const writable = new Writable({ write(chunk, enc, cb) { cb(); } }); - const writer = writable.toStreamIterWriter(); + const writer = fromWritable(writable); await writer[Symbol.asyncDispose](); assert.ok(writable.destroyed); @@ -518,7 +516,7 @@ async function testAsyncDispose() { async function testWriteInvalidChunkType() { const writable = new Writable({ write(chunk, enc, cb) { cb(); } }); - const writer = writable.toStreamIterWriter(); + const writer = fromWritable(writable); await assert.rejects( writer.write(42), @@ -540,7 +538,7 @@ async function testWriteInvalidChunkType() { function testWritevInvalidChunksType() { const writable = new Writable({ write(chunk, enc, cb) { cb(); } }); - const writer = writable.toStreamIterWriter(); + const writer = fromWritable(writable); assert.throws( () => writer.writev('not an array'), @@ -558,8 +556,8 @@ function testWritevInvalidChunksType() { function testCachedWriter() { const writable = new Writable({ write(chunk, enc, cb) { cb(); } }); - const writer1 = writable.toStreamIterWriter(); - const writer2 = writable.toStreamIterWriter(); + const writer1 = fromWritable(writable); + const writer2 = fromWritable(writable); assert.strictEqual(writer1, writer2); } @@ -577,7 +575,7 @@ async function testFailRejectsPendingWaiters() { }); writable.on('error', () => {}); // Prevent unhandled error - const writer = writable.toStreamIterWriter({ backpressure: 'block' }); + const writer = fromWritable(writable, { backpressure: 'block' }); // This write will block on drain const writePromise = writer.write('blocked data'); @@ -600,7 +598,7 @@ async function testDisposeRejectsPendingWaiters() { }, }); - const writer = writable.toStreamIterWriter({ backpressure: 'block' }); + const writer = fromWritable(writable, { backpressure: 'block' }); // This write will block on drain const writePromise = writer.write('blocked data'); @@ -614,7 +612,7 @@ async function testDisposeRejectsPendingWaiters() { // Run all tests // ============================================================================= -testMethodExists(); +testFunctionExists(); testSyncMethodsReturnFalse(); // ============================================================================= // Object-mode Writable throws @@ -626,12 +624,12 @@ function testObjectModeThrows() { write(chunk, enc, cb) { cb(); }, }); assert.throws( - () => writable.toStreamIterWriter(), + () => fromWritable(writable), { code: 'ERR_INVALID_STATE' }, ); } -testMethodExists(); +testFunctionExists(); testSyncMethodsReturnFalse(); testEndSyncReturnsNegativeOne(); testDesiredSize();