From 8fcf8986c0a26718e9aadc50ee1a43b8b0dda39f Mon Sep 17 00:00:00 2001 From: lete114 Date: Sat, 23 May 2026 14:15:07 +0800 Subject: [PATCH] feat: route messages by channel in ScriptEvent ID for fast pre-filter MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Embed channel in ScriptEvent event ID (ipc::) so receivers can discard irrelevant messages via string comparison on event.id before JSON.parse. @response channel always passes through to preserve invoke semantics. Transport: - send(channel, payload) — event ID becomes ipc:: - onReceive callback now returns (channel, payload) - Filter changed from === exact match to startsWith(prefix) IPC: - #handleReceive pre-filters by channel (except @response) - #sendPacket propagates packet.e on every send (incl. chunks) --- src/ipc.ts | 16 +++++-- src/transport.ts | 17 ++++--- .../tsnapi/@mcbe-mods/ipc/index.snapshot.d.ts | 4 +- .../tsnapi/@mcbe-mods/ipc/index.snapshot.js | 4 +- test/ipc.bench.ts | 11 +++-- test/ipc.test.ts | 48 ++++++++++--------- 6 files changed, 56 insertions(+), 44 deletions(-) diff --git a/src/ipc.ts b/src/ipc.ts index 3c6604b..0c00304 100644 --- a/src/ipc.ts +++ b/src/ipc.ts @@ -72,9 +72,9 @@ export class IPC { this.#compressor = new Compressor(this.#options.compressThreshold) this.#chunker = new Chunker(this.#options.chunkSize) - this.#transportUnsubscribe = this.#transport.onReceive((payload) => { + this.#transportUnsubscribe = this.#transport.onReceive((channel, payload) => { try { - this.#handleReceive(payload) + this.#handleReceive(channel, payload) } catch (e) { this.events.emit(IPC_SYSTEM_EVENTS.ERROR, e as Error) @@ -310,17 +310,23 @@ export class IPC { const { value, compressed } = this.#compressor.compress(raw) if (value.length <= this.#options.chunkSize && !compressed) { - this.#transport.send(value) + this.#transport.send(packet.e, value) return } const chunks = this.#chunker.split(packet.id, value, compressed) for (const chunk of chunks) { - this.#transport.send(JSON.stringify(chunk)) + this.#transport.send(packet.e, JSON.stringify(chunk)) } } - #handleReceive(payload: string): void { + #handleReceive(channel: string, payload: string): void { + if (channel !== RESPONSE_ENDPOINT + && !this.#onHandlers.has(channel) + && !this.#handleHandlers.has(channel)) { + return + } + const parsed = JSON.parse(payload) as Packet | Chunk if ('v' in parsed) { diff --git a/src/transport.ts b/src/transport.ts index 8a1bb84..0f9c9bf 100644 --- a/src/transport.ts +++ b/src/transport.ts @@ -17,27 +17,30 @@ export class Transport { } /** - * Broadcast a raw string payload to all addons listening on the same namespace. + * Broadcast a raw string payload to all addons listening on the same namespace and channel. + * @param channel - The channel name to send on (appended to event ID for fast routing) * @param payload - The raw string to send (usually a serialized packet) */ - send(payload: string): void { - system.sendScriptEvent(this.#id, payload) + send(channel: string, payload: string): void { + system.sendScriptEvent(`${this.#id}:${channel}`, payload) } /** * Subscribe to incoming messages from other addons. - * @param handler - Called with each incoming message + * @param handler - Called with each incoming message, pre-routed by channel * @returns A function that unsubscribes this handler */ - onReceive(handler: (payload: string) => void): () => void { + onReceive(handler: (channel: string, payload: string) => void): () => void { + const prefix = `${this.#id}:` const callback = (event: { id: string, message: string, sourceType: ScriptEventSource }): void => { if (event.sourceType !== ScriptEventSource.Server) { return } - if (event.id !== this.#id) { + if (!event.id.startsWith(prefix)) { return } - handler(event.message) + const channel = event.id.slice(prefix.length) + handler(channel, event.message) } system.afterEvents.scriptEventReceive.subscribe(callback) diff --git a/test/__snapshots__/tsnapi/@mcbe-mods/ipc/index.snapshot.d.ts b/test/__snapshots__/tsnapi/@mcbe-mods/ipc/index.snapshot.d.ts index 414bdce..9564bdc 100644 --- a/test/__snapshots__/tsnapi/@mcbe-mods/ipc/index.snapshot.d.ts +++ b/test/__snapshots__/tsnapi/@mcbe-mods/ipc/index.snapshot.d.ts @@ -84,8 +84,8 @@ export declare class IPC { export declare class Transport { #private; constructor(_: string); - send(_: string): void; - onReceive(_: (_: string) => void): () => void; + send(_: string, _: string): void; + onReceive(_: (_: string, _: string) => void): () => void; } // #endregion diff --git a/test/__snapshots__/tsnapi/@mcbe-mods/ipc/index.snapshot.js b/test/__snapshots__/tsnapi/@mcbe-mods/ipc/index.snapshot.js index 685acc5..2a2515c 100644 --- a/test/__snapshots__/tsnapi/@mcbe-mods/ipc/index.snapshot.js +++ b/test/__snapshots__/tsnapi/@mcbe-mods/ipc/index.snapshot.js @@ -34,7 +34,7 @@ export class IPC { handle(_, _) {} invokeImpl(_, _, _, _) {} sendPacket(_) {} - handleReceive(_) {} + handleReceive(_, _) {} handleDirectPacket(_) {} handleChunk(_) {} sendResponse(_, _) {} @@ -42,7 +42,7 @@ export class IPC { export class Transport { id constructor(_) {} - send(_) {} + send(_, _) {} onReceive(_) {} } // #endregion diff --git a/test/ipc.bench.ts b/test/ipc.bench.ts index 03066c4..9100276 100644 --- a/test/ipc.bench.ts +++ b/test/ipc.bench.ts @@ -30,12 +30,13 @@ describe('IPC.send — fire-and-forget', () => { describe('IPC.send + on — full fire-and-forget cycle', () => { const ipc = new IPC({ namespace: 'cycle' }) + ipc.on('e', () => {}) // ensure pre-filter passes through bench('small — send then simulate receive', () => { mockTransport.send.mockClear() ipc.send('e', SMALL) const payload = mockTransport.send.mock.calls[0][1] - mockTransport.simulateReceive(`${IPC_NAMESPACE}:cycle`, payload) + mockTransport.simulateReceive(`${IPC_NAMESPACE}:cycle:e`, payload) }) bench('large — send (chunked) then simulate all chunks', () => { @@ -43,7 +44,7 @@ describe('IPC.send + on — full fire-and-forget cycle', () => { ipc.send('e', LARGE) const calls = mockTransport.send.mock.calls for (const [, payload] of calls) { - mockTransport.simulateReceive(`${IPC_NAMESPACE}:cycle`, payload) + mockTransport.simulateReceive(`${IPC_NAMESPACE}:cycle:e`, payload) } }) }) @@ -63,7 +64,7 @@ describe('IPC.invoke + handle — RPC round-trip', () => { const p = ipc.invoke('echo', SMALL) const id = invokeId(mockTransport.send.mock.calls[0][1]) const resp = JSON.stringify({ v: PROTOCOL_VERSION, id, e: RESPONSE_ENDPOINT, d: { ok: true, data: SMALL } }) - mockTransport.simulateReceive(`${IPC_NAMESPACE}:rpc`, resp) + mockTransport.simulateReceive(`${IPC_NAMESPACE}:rpc:@response`, resp) await p }) @@ -72,7 +73,7 @@ describe('IPC.invoke + handle — RPC round-trip', () => { const p = ipc.invoke('echo', MEDIUM) const id = invokeId(mockTransport.send.mock.calls[0][1]) const resp = JSON.stringify({ v: PROTOCOL_VERSION, id, e: RESPONSE_ENDPOINT, d: { ok: true, data: MEDIUM } }) - mockTransport.simulateReceive(`${IPC_NAMESPACE}:rpc`, resp) + mockTransport.simulateReceive(`${IPC_NAMESPACE}:rpc:@response`, resp) await p }) @@ -81,7 +82,7 @@ describe('IPC.invoke + handle — RPC round-trip', () => { const p = ipc.invoke('echo', LARGE) const id = invokeId(mockTransport.send.mock.calls[0][1]) const resp = JSON.stringify({ v: PROTOCOL_VERSION, id, e: RESPONSE_ENDPOINT, d: { ok: true, data: LARGE } }) - mockTransport.simulateReceive(`${IPC_NAMESPACE}:rpc`, resp) + mockTransport.simulateReceive(`${IPC_NAMESPACE}:rpc:@response`, resp) await p }) }) diff --git a/test/ipc.test.ts b/test/ipc.test.ts index bc4b56c..56c99f0 100644 --- a/test/ipc.test.ts +++ b/test/ipc.test.ts @@ -21,7 +21,7 @@ describe('IPC', () => { expect(mockTransport.send).toHaveBeenCalledTimes(1) const [id, payload] = mockTransport.send.mock.calls[0] - expect(id).toBe(`${IPC_NAMESPACE}:test`) + expect(id).toBe(`${IPC_NAMESPACE}:test:ping`) const parsed = JSON.parse(payload) expect(parsed.v).toBe(1) @@ -34,7 +34,7 @@ describe('IPC', () => { ipc.on<{ msg: string }>('ping', handler) const packet = JSON.stringify({ v: PROTOCOL_VERSION, id: 'ABC123', e: 'ping', d: { msg: 'hello' } }) - mockTransport.simulateReceive(`${IPC_NAMESPACE}:test`, packet) + mockTransport.simulateReceive(`${IPC_NAMESPACE}:test:ping`, packet) expect(handler).toHaveBeenCalledTimes(1) expect(handler).toHaveBeenCalledWith({ msg: 'hello' }) @@ -46,7 +46,7 @@ describe('IPC', () => { ipc.on('test', h1) ipc.on('test', h2) - mockTransport.simulateReceive(`${IPC_NAMESPACE}:test`, JSON.stringify({ v: PROTOCOL_VERSION, id: 'X', e: 'test', d: 42 })) + mockTransport.simulateReceive(`${IPC_NAMESPACE}:test:test`, JSON.stringify({ v: PROTOCOL_VERSION, id: 'X', e: 'test', d: 42 })) expect(h1).toHaveBeenCalledWith(42) expect(h2).toHaveBeenCalledWith(42) @@ -57,7 +57,7 @@ describe('IPC', () => { const off = ipc.on('test', handler) off() - mockTransport.simulateReceive(`${IPC_NAMESPACE}:test`, JSON.stringify({ v: PROTOCOL_VERSION, id: 'X', e: 'test', d: 42 })) + mockTransport.simulateReceive(`${IPC_NAMESPACE}:test:test`, JSON.stringify({ v: PROTOCOL_VERSION, id: 'X', e: 'test', d: 42 })) expect(handler).not.toHaveBeenCalled() }) @@ -79,7 +79,7 @@ describe('IPC', () => { e: RESPONSE_ENDPOINT, d: { ok: true, data: { y: '42' } }, }) - mockTransport.simulateReceive(`${IPC_NAMESPACE}:test`, responsePacket) + mockTransport.simulateReceive(`${IPC_NAMESPACE}:test:@response`, responsePacket) const result = await promise expect(result).toEqual({ y: '42' }) @@ -92,7 +92,7 @@ describe('IPC', () => { // Simulate incoming invoke request const reqPacket = JSON.stringify({ v: PROTOCOL_VERSION, id: 'REQ1', e: 'fail', d: {} }) - mockTransport.simulateReceive(`${IPC_NAMESPACE}:test`, reqPacket) + mockTransport.simulateReceive(`${IPC_NAMESPACE}:test:fail`, reqPacket) // Let microtasks settle await vi.runAllTimersAsync() @@ -119,9 +119,9 @@ describe('IPC', () => { // Should have sent multiple scriptEvents expect(mockTransport.send.mock.calls.length).toBeGreaterThan(1) - // All should use the ipc:test ID + // All should use the ipc:test:big ID for (const [id] of mockTransport.send.mock.calls) { - expect(id).toBe(`${IPC_NAMESPACE}:test`) + expect(id).toBe(`${IPC_NAMESPACE}:test:big`) } // First call should be a chunk (has 'i' field) @@ -147,7 +147,7 @@ describe('IPC', () => { // Send chunks for (let i = 0; i < chunks.length; i++) { const chunk = JSON.stringify({ i: 'CHUNKID', s: i, t: chunks.length, d: chunks[i] }) - mockTransport.simulateReceive(`${IPC_NAMESPACE}:test`, chunk) + mockTransport.simulateReceive(`${IPC_NAMESPACE}:test:big`, chunk) } expect(handler).toHaveBeenCalledTimes(1) @@ -157,9 +157,10 @@ describe('IPC', () => { it('emits error on malformed chunk reassembly', () => { const errorHandler = vi.fn() ipc.events.on('error', errorHandler) + ipc.on('dummy', () => {}) // register listener so pre-filter passes const chunk = JSON.stringify({ i: 'BADID', s: 0, t: 1, d: 'not-json!!' }) - mockTransport.simulateReceive(`${IPC_NAMESPACE}:test`, chunk) + mockTransport.simulateReceive(`${IPC_NAMESPACE}:test:dummy`, chunk) expect(errorHandler).toHaveBeenCalled() }) @@ -183,7 +184,7 @@ describe('IPC', () => { ipc.on('custom', customDeserializer, handler) const packet = JSON.stringify({ v: PROTOCOL_VERSION, id: 'X', e: 'custom', d: 'num:42' }) - mockTransport.simulateReceive(`${IPC_NAMESPACE}:test`, packet) + mockTransport.simulateReceive(`${IPC_NAMESPACE}:test:custom`, packet) expect(handler).toHaveBeenCalledWith(42) }) @@ -213,7 +214,7 @@ describe('IPC', () => { e: RESPONSE_ENDPOINT, d: { ok: false, err: 'No handler registered for "ghost"' }, }) - mockTransport.simulateReceive(`${IPC_NAMESPACE}:test`, responsePacket) + mockTransport.simulateReceive(`${IPC_NAMESPACE}:test:@response`, responsePacket) await expect(promise).rejects.toThrow('No handler registered for "ghost"') }) @@ -227,13 +228,13 @@ describe('IPC', () => { ipc.send('ping', { msg: 'hello' }) const sentPayload = mockTransport.send.mock.lastCall?.[1] - // Simulate packet arriving on ipc:test (sender's namespace) + // Simulate packet arriving on ipc:test:ping (sender's namespace) // ipc2 listens on ipc:ns2, so it should NOT receive this - mockTransport.simulateReceive(`${IPC_NAMESPACE}:test`, sentPayload) + mockTransport.simulateReceive(`${IPC_NAMESPACE}:test:ping`, sentPayload) expect(handler).not.toHaveBeenCalled() - // Simulate packet arriving on ipc:ns2 — ipc2 SHOULD receive it - mockTransport.simulateReceive(`${IPC_NAMESPACE}:ns2`, sentPayload) + // Simulate packet arriving on ipc:ns2:ping — ipc2 SHOULD receive it + mockTransport.simulateReceive(`${IPC_NAMESPACE}:ns2:ping`, sentPayload) expect(handler).toHaveBeenCalledTimes(1) expect(handler).toHaveBeenCalledWith({ msg: 'hello' }) }) @@ -249,7 +250,7 @@ describe('IPC', () => { const sentPayload = mockTransport.send.mock.lastCall?.[1] // Simulate on ns2's namespace — ipc2 should NOT handle - mockTransport.simulateReceive(`${IPC_NAMESPACE}:ns2`, sentPayload) + mockTransport.simulateReceive(`${IPC_NAMESPACE}:ns2:ping`, sentPayload) expect(handler).not.toHaveBeenCalled() // No RESPONSE_ENDPOINT should have been sent from ipc2 back @@ -268,7 +269,7 @@ describe('IPC', () => { const sentPacket = JSON.parse(sentPayload) // Simulate loopback: invoke packet returns to sender - mockTransport.simulateReceive(`${IPC_NAMESPACE}:test`, JSON.stringify(sentPacket)) + mockTransport.simulateReceive(`${IPC_NAMESPACE}:test:echo`, JSON.stringify(sentPacket)) // Simulate normal response from the other side const responsePacket = JSON.stringify({ @@ -277,7 +278,7 @@ describe('IPC', () => { e: RESPONSE_ENDPOINT, d: { ok: true, data: 'echo:hello' }, }) - mockTransport.simulateReceive(`${IPC_NAMESPACE}:test`, responsePacket) + mockTransport.simulateReceive(`${IPC_NAMESPACE}:test:@response`, responsePacket) await expect(promise).resolves.toBe('echo:hello') }) @@ -292,7 +293,7 @@ describe('IPC', () => { const sentPacket = JSON.parse(sentPayload) // Simulate loopback — handle() should NOT be triggered - mockTransport.simulateReceive(`${IPC_NAMESPACE}:test`, JSON.stringify(sentPacket)) + mockTransport.simulateReceive(`${IPC_NAMESPACE}:test:test`, JSON.stringify(sentPacket)) expect(handler).not.toHaveBeenCalled() // Resolve with a response from "the other side" @@ -302,7 +303,7 @@ describe('IPC', () => { e: RESPONSE_ENDPOINT, d: { ok: true, data: 'ok' }, }) - mockTransport.simulateReceive(`${IPC_NAMESPACE}:test`, responsePacket) + mockTransport.simulateReceive(`${IPC_NAMESPACE}:test:@response`, responsePacket) await expect(promise).resolves.toBe('ok') }) @@ -311,7 +312,7 @@ describe('IPC', () => { ipc.on('test', handler) ipc.dispose() - mockTransport.simulateReceive(`${IPC_NAMESPACE}:test`, JSON.stringify({ v: PROTOCOL_VERSION, id: 'X', e: 'test', d: 42 })) + mockTransport.simulateReceive(`${IPC_NAMESPACE}:test:test`, JSON.stringify({ v: PROTOCOL_VERSION, id: 'X', e: 'test', d: 42 })) expect(handler).not.toHaveBeenCalled() }) @@ -319,8 +320,9 @@ describe('IPC', () => { it('emits invalid-packet event for unrecognized payloads', () => { const handler = vi.fn() ipc.events.on(IPC_SYSTEM_EVENTS.INVALID_PACKET, handler) + ipc.on('dummy', () => {}) // register listener so pre-filter passes - mockTransport.simulateReceive(`${IPC_NAMESPACE}:test`, JSON.stringify({ foo: 'bar' })) + mockTransport.simulateReceive(`${IPC_NAMESPACE}:test:dummy`, JSON.stringify({ foo: 'bar' })) expect(handler).toHaveBeenCalledTimes(1) expect(handler).toHaveBeenCalledWith({ payload: JSON.stringify({ foo: 'bar' }) })