Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,12 @@ interface IPCOptions {
* @default 30_000
*/
invokeTimeout?: number
/**
* Max time (ms) a partially-assembled chunked packet is kept
* in the buffer. 0 disables timeout.
* @default 30_000
*/
chunkTimeout?: number
}
```

Expand Down
13 changes: 13 additions & 0 deletions src/chunk.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ interface PendingPacket {
received: number
total: number
compressed: boolean
timestamp: number
}

/**
Expand Down Expand Up @@ -53,14 +54,25 @@ export class Chunker {
* Feed a received chunk to the reassembly buffer.
* Returns `{ done: true }` with the full data once all fragments have arrived.
* @param chunk - The incoming chunk fragment
* @param maxAge - Discard pending packets older than this many ms (0 = no timeout)
*/
assemble(
chunk: Chunk,
maxAge = 0,
): { done: false } | { done: true, data: string, compressed: boolean } {
if (chunk.total <= 0) {
return { done: false }
}

if (maxAge > 0) {
const cutoff = Date.now() - maxAge
for (const [key, p] of this.#buffer) {
if (p.timestamp < cutoff) {
this.#buffer.delete(key)
}
}
}

let pending = this.#buffer.get(chunk.id)

if (!pending) {
Expand All @@ -69,6 +81,7 @@ export class Chunker {
received: 0,
total: chunk.total,
compressed: chunk.compressed === true,
timestamp: Date.now(),
}
this.#buffer.set(chunk.id, pending)
}
Expand Down
25 changes: 22 additions & 3 deletions src/ipc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ const DEFAULT_OPTIONS: Required<IPCOptions> = {
compressThreshold: 800,
maxPacketSize: 1_000_000,
invokeTimeout: 30_000,
chunkTimeout: 30_000,
}

/**
Expand Down Expand Up @@ -213,7 +214,7 @@ export class IPC {
options?: InvokeOptions<T, R>,
): Promise<R> {
if (dataOrOptions === undefined) {
return this.#invokeImpl(channel)
return this.#invokeImpl(channel, undefined, options)
}

if (isInvokeOptions(dataOrOptions)) {
Expand Down Expand Up @@ -350,7 +351,10 @@ export class IPC {
if (systemDomain === SYSTEM_DOMAINS.RESPONSE) {
const parsed = JSON.parse(payload) as Packet | Chunk
if ('version' in parsed) {
this.#responses.emit(`${SYSTEM_EVENTS.INVOKE_RESPONSE}:${route}`, (parsed as Packet).data)
const packet = parsed as Packet
if (!this.#checkVersion(packet))
return
this.#responses.emit(`${SYSTEM_EVENTS.INVOKE_RESPONSE}:${route}`, packet.data)
}
else if ('seq' in parsed) {
this.#handleChunk(parsed as Chunk, systemDomain, route)
Expand Down Expand Up @@ -379,7 +383,20 @@ export class IPC {
}
}

#checkVersion(packet: Packet): boolean {
if (packet.version !== PROTOCOL_VERSION) {
this.events.emit(EVENTS.ERROR, new Error(
`Protocol version mismatch: expected ${PROTOCOL_VERSION}, got ${packet.version} [channel: ${packet.channel}]`,
))
return false
}
return true
}

#handleDirectPacket(packet: Packet): void {
if (!this.#checkVersion(packet))
return

const { channel, data, id } = packet

// Packet was sent by this instance itself (loopback via ScriptEvent)
Expand Down Expand Up @@ -422,7 +439,7 @@ export class IPC {
}

#handleChunk(chunk: Chunk, systemDomain: string, route: string): void {
const result = this.#chunker.assemble(chunk)
const result = this.#chunker.assemble(chunk, this.#options.chunkTimeout)

if (result.done) {
const raw = this.#compressor.decompress(result.data, result.compressed)
Expand All @@ -435,6 +452,8 @@ export class IPC {
return
}
if (systemDomain === SYSTEM_DOMAINS.RESPONSE) {
if (!this.#checkVersion(packet))
return
this.#responses.emit(`${SYSTEM_EVENTS.INVOKE_RESPONSE}:${route}`, packet.data)
}
else {
Expand Down
5 changes: 5 additions & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ export interface IPCOptions {
maxPacketSize?: number
/** Default timeout for {@link IPC.invoke} in milliseconds. 0 disables timeout. @default 30_000 */
invokeTimeout?: number
/**
* Maximum time (ms) a partially-assembled chunked packet is kept in the buffer
* before being discarded. 0 disables timeout. @default 30_000
*/
chunkTimeout?: number
}

/**
Expand Down
25 changes: 22 additions & 3 deletions src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,27 @@ export function generateId(): string {
return r + c
}

/** Type guard: checks whether an unknown value is an {@link InvokeOptions} object. */
/**
* Type guard: checks whether an unknown value is an {@link InvokeOptions} object.
* Uses a stricter check — requires `serializer` or `deserializer` to have
* an actual `serialize` / `deserialize` method. The plain `timeout` key is
* no longer sufficient on its own to avoid false-positives when a data
* payload happens to contain that property name.
*
* If you only need to pass a timeout without data, use the three-argument form:
* `invoke(channel, undefined, { timeout })`.
*/
export function isInvokeOptions(obj: unknown): obj is InvokeOptions {
return typeof obj === 'object' && obj !== null
&& ('timeout' in obj || 'serializer' in obj || 'deserializer' in obj)
if (typeof obj !== 'object' || obj === null)
return false
const o = obj as Record<string, unknown>
if ('serializer' in o && typeof o.serializer === 'object' && o.serializer !== null) {
if (typeof (o.serializer as Record<string, unknown>).serialize === 'function')
return true
}
if ('deserializer' in o && typeof o.deserializer === 'object' && o.deserializer !== null) {
if (typeof (o.deserializer as Record<string, unknown>).deserialize === 'function')
return true
}
return false
}
3 changes: 2 additions & 1 deletion test/__snapshots__/tsnapi/@mcbe-mods/ipc/index.snapshot.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ export interface IPCOptions {
compressThreshold?: number;
maxPacketSize?: number;
invokeTimeout?: number;
chunkTimeout?: number;
}
export interface OnOptions<T = never> {
deserializer?: Deserializer<T>;
Expand Down Expand Up @@ -63,7 +64,7 @@ export declare class Chunker {
#private;
constructor(_: number);
split(_: string, _: string, _: boolean): Chunk[];
assemble(_: Chunk): {
assemble(_: Chunk, _?: number): {
done: false;
} | {
done: true;
Expand Down
3 changes: 2 additions & 1 deletion test/__snapshots__/tsnapi/@mcbe-mods/ipc/index.snapshot.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ export class Chunker {
buffer
constructor(_) {}
split(_, _, _) {}
assemble(_) {}
assemble(_, _) {}
get pendingCount() {}
}
export class Compressor {
Expand Down Expand Up @@ -35,6 +35,7 @@ export class IPC {
invokeImpl(_, _, _) {}
sendPacket(_, _) {}
handleReceive(_, _, _) {}
checkVersion(_) {}
handleDirectPacket(_) {}
handleChunk(_, _, _) {}
sendResponse(_, _) {}
Expand Down
4 changes: 2 additions & 2 deletions test/ipc.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ describe('IPC', () => {
})

it('invoke times out with per-call timeout', async () => {
const promise = ipc.invoke('ghost', { timeout: 100 })
const promise = ipc.invoke('ghost', undefined, { timeout: 100 })
const rejection = promise.catch(e => e)
await vi.advanceTimersByTimeAsync(200)
const err = await rejection
Expand All @@ -449,7 +449,7 @@ describe('IPC', () => {

it('invoke resolves before timeout when response arrives in time', async () => {
ipc.handle('fast', () => 'pong')
const promise = ipc.invoke('fast', { timeout: 5000 })
const promise = ipc.invoke('fast', undefined, { timeout: 5000 })

const sent = JSON.parse(mockTransport.send.mock.calls[0][1])
mockTransport.simulateReceive(`${SYSTEM_DOMAINS.PREFIX}:${SYSTEM_DOMAINS.RESPONSE}:test:${sent.id}`, JSON.stringify({
Expand Down
Loading