Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions src/ipc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@ export class IPC {
this.#compressor = new Compressor(this.#options.compressThreshold)
this.#chunker = new Chunker(this.#options.chunkSize)

this.#transportUnsubscribe = this.#transport.onReceive((payload) => {
this.#transportUnsubscribe = this.#transport.onReceive((channel, payload) => {
try {
this.#handleReceive(payload)
this.#handleReceive(channel, payload)
}
catch (e) {
this.events.emit(IPC_SYSTEM_EVENTS.ERROR, e as Error)
Expand Down Expand Up @@ -310,17 +310,23 @@ export class IPC {
const { value, compressed } = this.#compressor.compress(raw)

if (value.length <= this.#options.chunkSize && !compressed) {
this.#transport.send(value)
this.#transport.send(packet.e, value)
return
}

const chunks = this.#chunker.split(packet.id, value, compressed)
for (const chunk of chunks) {
this.#transport.send(JSON.stringify(chunk))
this.#transport.send(packet.e, JSON.stringify(chunk))
}
}

#handleReceive(payload: string): void {
#handleReceive(channel: string, payload: string): void {
if (channel !== RESPONSE_ENDPOINT
&& !this.#onHandlers.has(channel)
&& !this.#handleHandlers.has(channel)) {
return
}

const parsed = JSON.parse(payload) as Packet | Chunk

if ('v' in parsed) {
Expand Down
17 changes: 10 additions & 7 deletions src/transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,30 @@ export class Transport {
}

/**
* Broadcast a raw string payload to all addons listening on the same namespace.
* Broadcast a raw string payload to all addons listening on the same namespace and channel.
* @param channel - The channel name to send on (appended to event ID for fast routing)
* @param payload - The raw string to send (usually a serialized packet)
*/
send(payload: string): void {
system.sendScriptEvent(this.#id, payload)
send(channel: string, payload: string): void {
system.sendScriptEvent(`${this.#id}:${channel}`, payload)
}

/**
* Subscribe to incoming messages from other addons.
* @param handler - Called with each incoming message
* @param handler - Called with each incoming message, pre-routed by channel
* @returns A function that unsubscribes this handler
*/
onReceive(handler: (payload: string) => void): () => void {
onReceive(handler: (channel: string, payload: string) => void): () => void {
const prefix = `${this.#id}:`
const callback = (event: { id: string, message: string, sourceType: ScriptEventSource }): void => {
if (event.sourceType !== ScriptEventSource.Server) {
return
}
if (event.id !== this.#id) {
if (!event.id.startsWith(prefix)) {
return
}
handler(event.message)
const channel = event.id.slice(prefix.length)
handler(channel, event.message)
}

system.afterEvents.scriptEventReceive.subscribe(callback)
Expand Down
4 changes: 2 additions & 2 deletions test/__snapshots__/tsnapi/@mcbe-mods/ipc/index.snapshot.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ export declare class IPC {
export declare class Transport {
#private;
constructor(_: string);
send(_: string): void;
onReceive(_: (_: string) => void): () => void;
send(_: string, _: string): void;
onReceive(_: (_: string, _: string) => void): () => void;
}
// #endregion

Expand Down
4 changes: 2 additions & 2 deletions test/__snapshots__/tsnapi/@mcbe-mods/ipc/index.snapshot.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,15 @@ export class IPC {
handle(_, _) {}
invokeImpl(_, _, _, _) {}
sendPacket(_) {}
handleReceive(_) {}
handleReceive(_, _) {}
handleDirectPacket(_) {}
handleChunk(_) {}
sendResponse(_, _) {}
}
export class Transport {
id
constructor(_) {}
send(_) {}
send(_, _) {}
onReceive(_) {}
}
// #endregion
Expand Down
11 changes: 6 additions & 5 deletions test/ipc.bench.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,20 +30,21 @@ describe('IPC.send — fire-and-forget', () => {

describe('IPC.send + on — full fire-and-forget cycle', () => {
const ipc = new IPC({ namespace: 'cycle' })
ipc.on('e', () => {}) // ensure pre-filter passes through

bench('small — send then simulate receive', () => {
mockTransport.send.mockClear()
ipc.send('e', SMALL)
const payload = mockTransport.send.mock.calls[0][1]
mockTransport.simulateReceive(`${IPC_NAMESPACE}:cycle`, payload)
mockTransport.simulateReceive(`${IPC_NAMESPACE}:cycle:e`, payload)
})

bench('large — send (chunked) then simulate all chunks', () => {
mockTransport.send.mockClear()
ipc.send('e', LARGE)
const calls = mockTransport.send.mock.calls
for (const [, payload] of calls) {
mockTransport.simulateReceive(`${IPC_NAMESPACE}:cycle`, payload)
mockTransport.simulateReceive(`${IPC_NAMESPACE}:cycle:e`, payload)
}
})
})
Expand All @@ -63,7 +64,7 @@ describe('IPC.invoke + handle — RPC round-trip', () => {
const p = ipc.invoke<string, string>('echo', SMALL)
const id = invokeId(mockTransport.send.mock.calls[0][1])
const resp = JSON.stringify({ v: PROTOCOL_VERSION, id, e: RESPONSE_ENDPOINT, d: { ok: true, data: SMALL } })
mockTransport.simulateReceive(`${IPC_NAMESPACE}:rpc`, resp)
mockTransport.simulateReceive(`${IPC_NAMESPACE}:rpc:@response`, resp)
await p
})

Expand All @@ -72,7 +73,7 @@ describe('IPC.invoke + handle — RPC round-trip', () => {
const p = ipc.invoke<string, string>('echo', MEDIUM)
const id = invokeId(mockTransport.send.mock.calls[0][1])
const resp = JSON.stringify({ v: PROTOCOL_VERSION, id, e: RESPONSE_ENDPOINT, d: { ok: true, data: MEDIUM } })
mockTransport.simulateReceive(`${IPC_NAMESPACE}:rpc`, resp)
mockTransport.simulateReceive(`${IPC_NAMESPACE}:rpc:@response`, resp)
await p
})

Expand All @@ -81,7 +82,7 @@ describe('IPC.invoke + handle — RPC round-trip', () => {
const p = ipc.invoke<string, string>('echo', LARGE)
const id = invokeId(mockTransport.send.mock.calls[0][1])
const resp = JSON.stringify({ v: PROTOCOL_VERSION, id, e: RESPONSE_ENDPOINT, d: { ok: true, data: LARGE } })
mockTransport.simulateReceive(`${IPC_NAMESPACE}:rpc`, resp)
mockTransport.simulateReceive(`${IPC_NAMESPACE}:rpc:@response`, resp)
await p
})
})
Expand Down
48 changes: 25 additions & 23 deletions test/ipc.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ describe('IPC', () => {

expect(mockTransport.send).toHaveBeenCalledTimes(1)
const [id, payload] = mockTransport.send.mock.calls[0]
expect(id).toBe(`${IPC_NAMESPACE}:test`)
expect(id).toBe(`${IPC_NAMESPACE}:test:ping`)

const parsed = JSON.parse(payload)
expect(parsed.v).toBe(1)
Expand All @@ -34,7 +34,7 @@ describe('IPC', () => {
ipc.on<{ msg: string }>('ping', handler)

const packet = JSON.stringify({ v: PROTOCOL_VERSION, id: 'ABC123', e: 'ping', d: { msg: 'hello' } })
mockTransport.simulateReceive(`${IPC_NAMESPACE}:test`, packet)
mockTransport.simulateReceive(`${IPC_NAMESPACE}:test:ping`, packet)

expect(handler).toHaveBeenCalledTimes(1)
expect(handler).toHaveBeenCalledWith({ msg: 'hello' })
Expand All @@ -46,7 +46,7 @@ describe('IPC', () => {
ipc.on('test', h1)
ipc.on('test', h2)

mockTransport.simulateReceive(`${IPC_NAMESPACE}:test`, JSON.stringify({ v: PROTOCOL_VERSION, id: 'X', e: 'test', d: 42 }))
mockTransport.simulateReceive(`${IPC_NAMESPACE}:test:test`, JSON.stringify({ v: PROTOCOL_VERSION, id: 'X', e: 'test', d: 42 }))

expect(h1).toHaveBeenCalledWith(42)
expect(h2).toHaveBeenCalledWith(42)
Expand All @@ -57,7 +57,7 @@ describe('IPC', () => {
const off = ipc.on('test', handler)
off()

mockTransport.simulateReceive(`${IPC_NAMESPACE}:test`, JSON.stringify({ v: PROTOCOL_VERSION, id: 'X', e: 'test', d: 42 }))
mockTransport.simulateReceive(`${IPC_NAMESPACE}:test:test`, JSON.stringify({ v: PROTOCOL_VERSION, id: 'X', e: 'test', d: 42 }))
expect(handler).not.toHaveBeenCalled()
})

Expand All @@ -79,7 +79,7 @@ describe('IPC', () => {
e: RESPONSE_ENDPOINT,
d: { ok: true, data: { y: '42' } },
})
mockTransport.simulateReceive(`${IPC_NAMESPACE}:test`, responsePacket)
mockTransport.simulateReceive(`${IPC_NAMESPACE}:test:@response`, responsePacket)

const result = await promise
expect(result).toEqual({ y: '42' })
Expand All @@ -92,7 +92,7 @@ describe('IPC', () => {

// Simulate incoming invoke request
const reqPacket = JSON.stringify({ v: PROTOCOL_VERSION, id: 'REQ1', e: 'fail', d: {} })
mockTransport.simulateReceive(`${IPC_NAMESPACE}:test`, reqPacket)
mockTransport.simulateReceive(`${IPC_NAMESPACE}:test:fail`, reqPacket)

// Let microtasks settle
await vi.runAllTimersAsync()
Expand All @@ -119,9 +119,9 @@ describe('IPC', () => {
// Should have sent multiple scriptEvents
expect(mockTransport.send.mock.calls.length).toBeGreaterThan(1)

// All should use the ipc:test ID
// All should use the ipc:test:big ID
for (const [id] of mockTransport.send.mock.calls) {
expect(id).toBe(`${IPC_NAMESPACE}:test`)
expect(id).toBe(`${IPC_NAMESPACE}:test:big`)
}

// First call should be a chunk (has 'i' field)
Expand All @@ -147,7 +147,7 @@ describe('IPC', () => {
// Send chunks
for (let i = 0; i < chunks.length; i++) {
const chunk = JSON.stringify({ i: 'CHUNKID', s: i, t: chunks.length, d: chunks[i] })
mockTransport.simulateReceive(`${IPC_NAMESPACE}:test`, chunk)
mockTransport.simulateReceive(`${IPC_NAMESPACE}:test:big`, chunk)
}

expect(handler).toHaveBeenCalledTimes(1)
Expand All @@ -157,9 +157,10 @@ describe('IPC', () => {
it('emits error on malformed chunk reassembly', () => {
const errorHandler = vi.fn()
ipc.events.on('error', errorHandler)
ipc.on('dummy', () => {}) // register listener so pre-filter passes

const chunk = JSON.stringify({ i: 'BADID', s: 0, t: 1, d: 'not-json!!' })
mockTransport.simulateReceive(`${IPC_NAMESPACE}:test`, chunk)
mockTransport.simulateReceive(`${IPC_NAMESPACE}:test:dummy`, chunk)

expect(errorHandler).toHaveBeenCalled()
})
Expand All @@ -183,7 +184,7 @@ describe('IPC', () => {
ipc.on('custom', customDeserializer, handler)

const packet = JSON.stringify({ v: PROTOCOL_VERSION, id: 'X', e: 'custom', d: 'num:42' })
mockTransport.simulateReceive(`${IPC_NAMESPACE}:test`, packet)
mockTransport.simulateReceive(`${IPC_NAMESPACE}:test:custom`, packet)

expect(handler).toHaveBeenCalledWith(42)
})
Expand Down Expand Up @@ -213,7 +214,7 @@ describe('IPC', () => {
e: RESPONSE_ENDPOINT,
d: { ok: false, err: 'No handler registered for "ghost"' },
})
mockTransport.simulateReceive(`${IPC_NAMESPACE}:test`, responsePacket)
mockTransport.simulateReceive(`${IPC_NAMESPACE}:test:@response`, responsePacket)

await expect(promise).rejects.toThrow('No handler registered for "ghost"')
})
Expand All @@ -227,13 +228,13 @@ describe('IPC', () => {
ipc.send('ping', { msg: 'hello' })
const sentPayload = mockTransport.send.mock.lastCall?.[1]

// Simulate packet arriving on ipc:test (sender's namespace)
// Simulate packet arriving on ipc:test:ping (sender's namespace)
// ipc2 listens on ipc:ns2, so it should NOT receive this
mockTransport.simulateReceive(`${IPC_NAMESPACE}:test`, sentPayload)
mockTransport.simulateReceive(`${IPC_NAMESPACE}:test:ping`, sentPayload)
expect(handler).not.toHaveBeenCalled()

// Simulate packet arriving on ipc:ns2 — ipc2 SHOULD receive it
mockTransport.simulateReceive(`${IPC_NAMESPACE}:ns2`, sentPayload)
// Simulate packet arriving on ipc:ns2:ping — ipc2 SHOULD receive it
mockTransport.simulateReceive(`${IPC_NAMESPACE}:ns2:ping`, sentPayload)
expect(handler).toHaveBeenCalledTimes(1)
expect(handler).toHaveBeenCalledWith({ msg: 'hello' })
})
Expand All @@ -249,7 +250,7 @@ describe('IPC', () => {
const sentPayload = mockTransport.send.mock.lastCall?.[1]

// Simulate on ns2's namespace — ipc2 should NOT handle
mockTransport.simulateReceive(`${IPC_NAMESPACE}:ns2`, sentPayload)
mockTransport.simulateReceive(`${IPC_NAMESPACE}:ns2:ping`, sentPayload)
expect(handler).not.toHaveBeenCalled()

// No RESPONSE_ENDPOINT should have been sent from ipc2 back
Expand All @@ -268,7 +269,7 @@ describe('IPC', () => {
const sentPacket = JSON.parse(sentPayload)

// Simulate loopback: invoke packet returns to sender
mockTransport.simulateReceive(`${IPC_NAMESPACE}:test`, JSON.stringify(sentPacket))
mockTransport.simulateReceive(`${IPC_NAMESPACE}:test:echo`, JSON.stringify(sentPacket))

// Simulate normal response from the other side
const responsePacket = JSON.stringify({
Expand All @@ -277,7 +278,7 @@ describe('IPC', () => {
e: RESPONSE_ENDPOINT,
d: { ok: true, data: 'echo:hello' },
})
mockTransport.simulateReceive(`${IPC_NAMESPACE}:test`, responsePacket)
mockTransport.simulateReceive(`${IPC_NAMESPACE}:test:@response`, responsePacket)

await expect(promise).resolves.toBe('echo:hello')
})
Expand All @@ -292,7 +293,7 @@ describe('IPC', () => {
const sentPacket = JSON.parse(sentPayload)

// Simulate loopback — handle() should NOT be triggered
mockTransport.simulateReceive(`${IPC_NAMESPACE}:test`, JSON.stringify(sentPacket))
mockTransport.simulateReceive(`${IPC_NAMESPACE}:test:test`, JSON.stringify(sentPacket))
expect(handler).not.toHaveBeenCalled()

// Resolve with a response from "the other side"
Expand All @@ -302,7 +303,7 @@ describe('IPC', () => {
e: RESPONSE_ENDPOINT,
d: { ok: true, data: 'ok' },
})
mockTransport.simulateReceive(`${IPC_NAMESPACE}:test`, responsePacket)
mockTransport.simulateReceive(`${IPC_NAMESPACE}:test:@response`, responsePacket)
await expect(promise).resolves.toBe('ok')
})

Expand All @@ -311,16 +312,17 @@ describe('IPC', () => {
ipc.on('test', handler)
ipc.dispose()

mockTransport.simulateReceive(`${IPC_NAMESPACE}:test`, JSON.stringify({ v: PROTOCOL_VERSION, id: 'X', e: 'test', d: 42 }))
mockTransport.simulateReceive(`${IPC_NAMESPACE}:test:test`, JSON.stringify({ v: PROTOCOL_VERSION, id: 'X', e: 'test', d: 42 }))

expect(handler).not.toHaveBeenCalled()
})

it('emits invalid-packet event for unrecognized payloads', () => {
const handler = vi.fn()
ipc.events.on(IPC_SYSTEM_EVENTS.INVALID_PACKET, handler)
ipc.on('dummy', () => {}) // register listener so pre-filter passes

mockTransport.simulateReceive(`${IPC_NAMESPACE}:test`, JSON.stringify({ foo: 'bar' }))
mockTransport.simulateReceive(`${IPC_NAMESPACE}:test:dummy`, JSON.stringify({ foo: 'bar' }))

expect(handler).toHaveBeenCalledTimes(1)
expect(handler).toHaveBeenCalledWith({ payload: JSON.stringify({ foo: 'bar' }) })
Expand Down
Loading