From 8f92a1d5e35ae91d775a468faf70e1447d07179b Mon Sep 17 00:00:00 2001 From: lete114 Date: Tue, 26 May 2026 21:24:55 +0800 Subject: [PATCH] fix: strengthen isInvokeOptions guard, add chunk timeout and version validation --- README.md | 6 +++++ src/chunk.ts | 13 ++++++++++ src/ipc.ts | 25 ++++++++++++++++--- src/types.ts | 5 ++++ src/utils.ts | 25 ++++++++++++++++--- .../tsnapi/@mcbe-mods/ipc/index.snapshot.d.ts | 3 ++- .../tsnapi/@mcbe-mods/ipc/index.snapshot.js | 3 ++- test/ipc.test.ts | 4 +-- 8 files changed, 74 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index eb7f4c5..97248d4 100644 --- a/README.md +++ b/README.md @@ -138,6 +138,12 @@ interface IPCOptions { * @default 30_000 */ invokeTimeout?: number + /** + * Max time (ms) a partially-assembled chunked packet is kept + * in the buffer. 0 disables timeout. + * @default 30_000 + */ + chunkTimeout?: number } ``` diff --git a/src/chunk.ts b/src/chunk.ts index 5f9270c..07943f8 100644 --- a/src/chunk.ts +++ b/src/chunk.ts @@ -5,6 +5,7 @@ interface PendingPacket { received: number total: number compressed: boolean + timestamp: number } /** @@ -53,14 +54,25 @@ export class Chunker { * Feed a received chunk to the reassembly buffer. * Returns `{ done: true }` with the full data once all fragments have arrived. * @param chunk - The incoming chunk fragment + * @param maxAge - Discard pending packets older than this many ms (0 = no timeout) */ assemble( chunk: Chunk, + maxAge = 0, ): { done: false } | { done: true, data: string, compressed: boolean } { if (chunk.total <= 0) { return { done: false } } + if (maxAge > 0) { + const cutoff = Date.now() - maxAge + for (const [key, p] of this.#buffer) { + if (p.timestamp < cutoff) { + this.#buffer.delete(key) + } + } + } + let pending = this.#buffer.get(chunk.id) if (!pending) { @@ -69,6 +81,7 @@ export class Chunker { received: 0, total: chunk.total, compressed: chunk.compressed === true, + timestamp: Date.now(), } this.#buffer.set(chunk.id, pending) } diff --git a/src/ipc.ts b/src/ipc.ts index 1061aa9..33f32bd 100644 --- a/src/ipc.ts +++ b/src/ipc.ts @@ -26,6 +26,7 @@ const DEFAULT_OPTIONS: Required = { compressThreshold: 800, maxPacketSize: 1_000_000, invokeTimeout: 30_000, + chunkTimeout: 30_000, } /** @@ -213,7 +214,7 @@ export class IPC { options?: InvokeOptions, ): Promise { if (dataOrOptions === undefined) { - return this.#invokeImpl(channel) + return this.#invokeImpl(channel, undefined, options) } if (isInvokeOptions(dataOrOptions)) { @@ -350,7 +351,10 @@ export class IPC { if (systemDomain === SYSTEM_DOMAINS.RESPONSE) { const parsed = JSON.parse(payload) as Packet | Chunk if ('version' in parsed) { - this.#responses.emit(`${SYSTEM_EVENTS.INVOKE_RESPONSE}:${route}`, (parsed as Packet).data) + const packet = parsed as Packet + if (!this.#checkVersion(packet)) + return + this.#responses.emit(`${SYSTEM_EVENTS.INVOKE_RESPONSE}:${route}`, packet.data) } else if ('seq' in parsed) { this.#handleChunk(parsed as Chunk, systemDomain, route) @@ -379,7 +383,20 @@ export class IPC { } } + #checkVersion(packet: Packet): boolean { + if (packet.version !== PROTOCOL_VERSION) { + this.events.emit(EVENTS.ERROR, new Error( + `Protocol version mismatch: expected ${PROTOCOL_VERSION}, got ${packet.version} [channel: ${packet.channel}]`, + )) + return false + } + return true + } + #handleDirectPacket(packet: Packet): void { + if (!this.#checkVersion(packet)) + return + const { channel, data, id } = packet // Packet was sent by this instance itself (loopback via ScriptEvent) @@ -422,7 +439,7 @@ export class IPC { } #handleChunk(chunk: Chunk, systemDomain: string, route: string): void { - const result = this.#chunker.assemble(chunk) + const result = this.#chunker.assemble(chunk, this.#options.chunkTimeout) if (result.done) { const raw = this.#compressor.decompress(result.data, result.compressed) @@ -435,6 +452,8 @@ export class IPC { return } if (systemDomain === SYSTEM_DOMAINS.RESPONSE) { + if (!this.#checkVersion(packet)) + return this.#responses.emit(`${SYSTEM_EVENTS.INVOKE_RESPONSE}:${route}`, packet.data) } else { diff --git a/src/types.ts b/src/types.ts index d419565..44c7e0f 100644 --- a/src/types.ts +++ b/src/types.ts @@ -66,6 +66,11 @@ export interface IPCOptions { maxPacketSize?: number /** Default timeout for {@link IPC.invoke} in milliseconds. 0 disables timeout. @default 30_000 */ invokeTimeout?: number + /** + * Maximum time (ms) a partially-assembled chunked packet is kept in the buffer + * before being discarded. 0 disables timeout. @default 30_000 + */ + chunkTimeout?: number } /** diff --git a/src/utils.ts b/src/utils.ts index 2bf3e91..b3678c2 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -13,8 +13,27 @@ export function generateId(): string { return r + c } -/** Type guard: checks whether an unknown value is an {@link InvokeOptions} object. */ +/** + * Type guard: checks whether an unknown value is an {@link InvokeOptions} object. + * Uses a stricter check — requires `serializer` or `deserializer` to have + * an actual `serialize` / `deserialize` method. The plain `timeout` key is + * no longer sufficient on its own to avoid false-positives when a data + * payload happens to contain that property name. + * + * If you only need to pass a timeout without data, use the three-argument form: + * `invoke(channel, undefined, { timeout })`. + */ export function isInvokeOptions(obj: unknown): obj is InvokeOptions { - return typeof obj === 'object' && obj !== null - && ('timeout' in obj || 'serializer' in obj || 'deserializer' in obj) + if (typeof obj !== 'object' || obj === null) + return false + const o = obj as Record + if ('serializer' in o && typeof o.serializer === 'object' && o.serializer !== null) { + if (typeof (o.serializer as Record).serialize === 'function') + return true + } + if ('deserializer' in o && typeof o.deserializer === 'object' && o.deserializer !== null) { + if (typeof (o.deserializer as Record).deserialize === 'function') + return true + } + return false } diff --git a/test/__snapshots__/tsnapi/@mcbe-mods/ipc/index.snapshot.d.ts b/test/__snapshots__/tsnapi/@mcbe-mods/ipc/index.snapshot.d.ts index 44fa10d..6903b13 100644 --- a/test/__snapshots__/tsnapi/@mcbe-mods/ipc/index.snapshot.d.ts +++ b/test/__snapshots__/tsnapi/@mcbe-mods/ipc/index.snapshot.d.ts @@ -36,6 +36,7 @@ export interface IPCOptions { compressThreshold?: number; maxPacketSize?: number; invokeTimeout?: number; + chunkTimeout?: number; } export interface OnOptions { deserializer?: Deserializer; @@ -63,7 +64,7 @@ export declare class Chunker { #private; constructor(_: number); split(_: string, _: string, _: boolean): Chunk[]; - assemble(_: Chunk): { + assemble(_: Chunk, _?: number): { done: false; } | { done: true; diff --git a/test/__snapshots__/tsnapi/@mcbe-mods/ipc/index.snapshot.js b/test/__snapshots__/tsnapi/@mcbe-mods/ipc/index.snapshot.js index c22851f..2404e9f 100644 --- a/test/__snapshots__/tsnapi/@mcbe-mods/ipc/index.snapshot.js +++ b/test/__snapshots__/tsnapi/@mcbe-mods/ipc/index.snapshot.js @@ -7,7 +7,7 @@ export class Chunker { buffer constructor(_) {} split(_, _, _) {} - assemble(_) {} + assemble(_, _) {} get pendingCount() {} } export class Compressor { @@ -35,6 +35,7 @@ export class IPC { invokeImpl(_, _, _) {} sendPacket(_, _) {} handleReceive(_, _, _) {} + checkVersion(_) {} handleDirectPacket(_) {} handleChunk(_, _, _) {} sendResponse(_, _) {} diff --git a/test/ipc.test.ts b/test/ipc.test.ts index 779f250..38d6243 100644 --- a/test/ipc.test.ts +++ b/test/ipc.test.ts @@ -440,7 +440,7 @@ describe('IPC', () => { }) it('invoke times out with per-call timeout', async () => { - const promise = ipc.invoke('ghost', { timeout: 100 }) + const promise = ipc.invoke('ghost', undefined, { timeout: 100 }) const rejection = promise.catch(e => e) await vi.advanceTimersByTimeAsync(200) const err = await rejection @@ -449,7 +449,7 @@ describe('IPC', () => { it('invoke resolves before timeout when response arrives in time', async () => { ipc.handle('fast', () => 'pong') - const promise = ipc.invoke('fast', { timeout: 5000 }) + const promise = ipc.invoke('fast', undefined, { timeout: 5000 }) const sent = JSON.parse(mockTransport.send.mock.calls[0][1]) mockTransport.simulateReceive(`${SYSTEM_DOMAINS.PREFIX}:${SYSTEM_DOMAINS.RESPONSE}:test:${sent.id}`, JSON.stringify({