From 44b239133d67704f4e324319ebcf1fe55a25786d Mon Sep 17 00:00:00 2001 From: Kostandin Angjellari Date: Thu, 28 May 2026 01:11:04 +0200 Subject: [PATCH 1/4] =?UTF-8?q?FE-763:=20Petrinaut=20event=20stream=20?= =?UTF-8?q?=E2=80=94=20initial=20markings=20+=20transition=20firings?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Emits the runtime events Petrinaut needs to visualize a live cook run, in the cross-team-agreed payload shape (2026-05-26 alignment): initial_marking: { kind, ts, runId, marking: { : [{id, ...payload}] } } transition_fired: { kind, ts, runId, transitionName, input: { : [{id, ...payload}] }, output: { : [{id, ...payload}] } } net_halted / net_deadlocked: { kind, ts, runId } What landed: - New module src/orchestrator/src/petrinaut-events.ts: pure adapter createPetrinautEventStream({ runId, filePath?, tokenIdFn?, onEvent? }) returning { sink: NetEventSink, emitInitialMarking(blueprint) }. Writes one JSONL object per line to filePath when set and fans out to onEvent for in-process consumers (tests, future sync-server forwarder). - petri-net.ts: NetEvent gains parallel optional consumedTokens?: Token[][] and producedTokens?: Token[][] (one entry per arc, same indexing as the existing consumed/produced place-name lists). These are populated for transition_fired events so the adapter can render the per-place { id, ...payload } shape Petrinaut expects without re-reading the net. scheduleDeferred gains a consumedTokens parameter alongside the existing consumedPlaces so async fires emit the same shape as sync fires. - net-compiler.ts: all four scheduleDeferred call sites pass the captured consumed tokens through to the deferred event. - engine.ts: when input.runDir is present, opens a Petrinaut event stream writing to /petrinaut-events.jsonl, emits initial_marking from the compiled blueprint up-front, then passes the sink to net.run(). The same gate that drives FE-762's /net.json write — library callers without a runDir get the existing no-op behavior. Halt outcomes (FE-761 Slice 2b halted-as-place): - Petrinaut sees halt as a halt token traveling through the topology via transition_fired events landing on slice::halted / epic::halted, plus a terminal net_halted event from the engine. Open coordination item: token UUID lifecycle across consume->emit (lineage tracing). v1 generates fresh UUIDs per emission. When Petrinaut decides whether identity should persist, this module is the seam to evolve. Tests: - 4 unit tests in petrinaut-events.test.ts covering initial_marking, transition_fired adapter shape, terminal events, and JSONL file roundtrip via mkdtempSync. - 1 end-to-end test in engine-contract.test.ts running simplePlan happy path with the Petrinaut sink — asserts initial_marking first, runId on every event, the FE-761 Slice 4 dispatch + complete transition names both appear, every token carries an id, and happy paths emit no net_halted / net_deadlocked. All 130 orchestrator tests pass; npm run fix + check + build all green. Co-authored-by: Amp --- src/orchestrator/src/engine-contract.test.ts | 75 ++++++- src/orchestrator/src/engine.ts | 19 +- src/orchestrator/src/net-compiler.ts | 8 +- src/orchestrator/src/petri-net.ts | 28 ++- src/orchestrator/src/petrinaut-events.test.ts | 174 ++++++++++++++++ src/orchestrator/src/petrinaut-events.ts | 189 ++++++++++++++++++ 6 files changed, 483 insertions(+), 10 deletions(-) create mode 100644 src/orchestrator/src/petrinaut-events.test.ts create mode 100644 src/orchestrator/src/petrinaut-events.ts diff --git a/src/orchestrator/src/engine-contract.test.ts b/src/orchestrator/src/engine-contract.test.ts index 291815f28..842bf2b67 100644 --- a/src/orchestrator/src/engine-contract.test.ts +++ b/src/orchestrator/src/engine-contract.test.ts @@ -5,8 +5,13 @@ import { join } from 'node:path'; import { describe, expect, it } from 'vitest'; import { createOrchestrator } from './engine.js'; -import { compilePlan, compileTopology } from './net-compiler.js'; +import { compilePlan, compileTopology, wireHandlers } from './net-compiler.js'; import type { NetEvent } from './petri-net.js'; +import { + createPetrinautEventStream, + type PetrinautEvent, + type PetrinautTransitionFiredEvent, +} from './petrinaut-events.js'; import { InMemoryReportSink } from './report-sink.js'; import type { ActionContext, ActionHandlers, OrchestratorInput, Plan, RunCtx, TestRunner } from './types.js'; @@ -906,6 +911,74 @@ describe('Adapter: §7 event vocabulary', () => { }); }); +// --------------------------------------------------------------------------- +// FE-763 — Petrinaut event stream end-to-end on the orchestrator +// --------------------------------------------------------------------------- + +describe('FE-763: Petrinaut event stream on a real run', () => { + it('emits initial_marking + transition_fired (with token payload) + net_halted for simplePlan happy path', async () => { + const fakes = createFakes(); + const ctx: RunCtx = { + reportIds: [], + sliceOutcomes: new Map(), + epicOutcomes: new Map(), + }; + const input: OrchestratorInput = { + plan: simplePlan, + sandboxDir: '/tmp/fake', + actions: fakes.actions, + reports: fakes.reports, + testRunner: fakes.testRunner, + policy: { maxRetries: 3 }, + }; + + const blueprint = compileTopology(input.plan, input.policy); + const net = wireHandlers(blueprint, input, ctx); + + const events: PetrinautEvent[] = []; + const stream = createPetrinautEventStream({ + runId: 'run-e2e', + onEvent: (e) => events.push(e), + }); + stream.emitInitialMarking(blueprint); + await net.run('serial', () => net.hasHaltToken(), stream.sink); + + // 1. initial_marking is first. + expect(events[0]!.kind).toBe('initial_marking'); + + // 2. every event carries the runId. + expect(events.every((e) => 'runId' in e && e.runId === 'run-e2e')).toBe(true); + + // 3. transition_fired events expose the FE-761 Slice 4 dispatch/complete + // topology directly in Petrinaut's wire format. + const fired = events.filter((e): e is PetrinautTransitionFiredEvent => e.kind === 'transition_fired'); + const names = fired.map((e) => e.transitionName); + expect(names).toContain('slice-1:evaluate:dispatch'); + expect(names).toContain('slice-1:evaluate:complete'); + expect(names).toContain('slice-1:assess-semantic:dispatch'); + expect(names).toContain('slice-1:assess-semantic:complete'); + + // 4. each transition_fired carries per-place token data with a UUID + // (cross-team-agreed shape: { id: , ...payload }). + for (const e of fired) { + for (const tokens of Object.values(e.input)) { + for (const tok of tokens) expect(typeof tok.id).toBe('string'); + } + for (const tokens of Object.values(e.output)) { + for (const tok of tokens) expect(typeof tok.id).toBe('string'); + } + } + + // 5. happy path: no net_halted / net_deadlocked emitted (engine exits + // the loop cleanly when nothing remains enabled). When the cook + // fails — retry exhaustion etc. — Petrinaut sees the halt token + // travel through the topology as a transition_fired event landing + // in `slice::halted`, plus the engine emits net_halted. + expect(events.filter((e) => e.kind === 'net_halted')).toHaveLength(0); + expect(events.filter((e) => e.kind === 'net_deadlocked')).toHaveLength(0); + }); +}); + // --------------------------------------------------------------------------- // Contract test #12 — parallel fires concurrently // --------------------------------------------------------------------------- diff --git a/src/orchestrator/src/engine.ts b/src/orchestrator/src/engine.ts index fa81bff12..ee293a407 100644 --- a/src/orchestrator/src/engine.ts +++ b/src/orchestrator/src/engine.ts @@ -2,7 +2,8 @@ import { writeFileSync } from 'node:fs'; import { join } from 'node:path'; import { compileTopology, wireHandlers } from './net-compiler.js'; -import type { FiringPolicy } from './petri-net.js'; +import type { FiringPolicy, NetEventSink } from './petri-net.js'; +import { createPetrinautEventStream } from './petrinaut-events.js'; import { serializeBlueprint } from './petrinaut-export.js'; import { toSdcpnFile } from './petrinaut-sdcpn.js'; import type { Orchestrator, OrchestratorInput, OrchestratorResult, RunCtx } from './types.js'; @@ -49,8 +50,22 @@ export function createOrchestrator(firingPolicy: FiringPolicy): Orchestrator { } } + // FE-763: open a Petrinaut event stream when runDir is present. + // Emits an initial_marking event up-front, then transition_fired / + // net_halted / net_deadlocked events as the net runs. Library + // callers without a runDir get the existing no-op behavior. + let eventSink: NetEventSink | undefined; + if (input.runDir) { + const stream = createPetrinautEventStream({ + runId: input.runId ?? 'unknown', + filePath: join(input.runDir, 'petrinaut-events.jsonl'), + }); + stream.emitInitialMarking(blueprint); + eventSink = stream.sink; + } + const net = wireHandlers(blueprint, input, ctx); - await net.run(firingPolicy, () => net.hasHaltToken()); + await net.run(firingPolicy, () => net.hasHaltToken(), eventSink); hasStructuralHalt = net.hasHaltToken(); // Derive halt reason from any halt token deposited during the run. diff --git a/src/orchestrator/src/net-compiler.ts b/src/orchestrator/src/net-compiler.ts index 4677c624d..eddec9d2e 100644 --- a/src/orchestrator/src/net-compiler.ts +++ b/src/orchestrator/src/net-compiler.ts @@ -643,7 +643,7 @@ export function wireHandlers(blueprint: NetBlueprint, input: OrchestratorInput, } return out; })(); - net.scheduleDeferred(skel.id, skel.contract, skel.inputs, deferred); + net.scheduleDeferred(skel.id, skel.contract, skel.inputs, consumed, deferred); return []; }; break; @@ -740,7 +740,7 @@ export function wireHandlers(blueprint: NetBlueprint, input: OrchestratorInput, { place: budgetPlace, token: { ...baseToken, retryCount: retryCount + 1 } }, ]; })(); - net.scheduleDeferred(skel.id, skel.contract, skel.inputs, deferred); + net.scheduleDeferred(skel.id, skel.contract, skel.inputs, consumed, deferred); return []; }; break; @@ -795,7 +795,7 @@ export function wireHandlers(blueprint: NetBlueprint, input: OrchestratorInput, { place: budgetPlace, token: { ...baseToken, reworkCount: reworkCount + 1 } }, ]; })(); - net.scheduleDeferred(skel.id, skel.contract, skel.inputs, deferred); + net.scheduleDeferred(skel.id, skel.contract, skel.inputs, consumed, deferred); return []; }; break; @@ -877,7 +877,7 @@ export function wireHandlers(blueprint: NetBlueprint, input: OrchestratorInput, // happens in sibling-passthrough transitions downstream. return [{ place: intermediatePlace, token: { ...inputToken, reportId } }]; })(); - net.scheduleDeferred(skel.id, skel.contract, skel.inputs, deferred); + net.scheduleDeferred(skel.id, skel.contract, skel.inputs, consumed, deferred); return []; }; break; diff --git a/src/orchestrator/src/petri-net.ts b/src/orchestrator/src/petri-net.ts index 661aad16d..d6649f8b0 100644 --- a/src/orchestrator/src/petri-net.ts +++ b/src/orchestrator/src/petri-net.ts @@ -65,14 +65,24 @@ export type FiringPolicy = 'serial' | 'parallel'; /** Event kinds aligned with spec doc §7. */ export type NetEventKind = 'transition_fired' | 'net_deadlocked' | 'net_halted'; -/** Structured event emitted during net execution. */ +/** + * Structured event emitted during net execution. + * + * `consumed` / `produced` are place-name lists (one entry per arc). The + * parallel `consumedTokens` / `producedTokens` carry the actual tokens + * that traversed each arc, indexed the same way — they are populated for + * `transition_fired` events so downstream adapters (e.g. the FE-763 + * Petrinaut event stream) can include token payload in the wire format. + */ export type NetEvent = { kind: NetEventKind; ts: string; transitionId?: string; contract?: TransitionContract; consumed?: string[]; + consumedTokens?: Token[][]; produced?: string[]; + producedTokens?: Token[][]; }; /** Sink for structured net events. Optional — defaults to no-op. */ @@ -142,11 +152,14 @@ export class PetriNet { transitionId: string, contract: TransitionContract | undefined, consumedPlaces: string[], + consumedTokens: Token[], work: Promise<{ place: string; token: Token }[]>, ): void { this.pendingDeferred++; work - .then((outputs) => this.completeDeferred(transitionId, contract, consumedPlaces, outputs)) + .then((outputs) => + this.completeDeferred(transitionId, contract, consumedPlaces, consumedTokens, outputs), + ) .catch((err) => { this.deferredError ??= err; this.pendingDeferred--; @@ -158,12 +171,15 @@ export class PetriNet { transitionId: string, contract: TransitionContract | undefined, consumedPlaces: string[], + consumedTokens: Token[], outputs: { place: string; token: Token }[], ): void { const producedPlaces: string[] = []; + const producedTokens: Token[][] = []; for (const { place, token } of outputs) { this.addToken(place, token); producedPlaces.push(place); + producedTokens.push([token]); } this.deferredEventSink?.emit({ kind: 'transition_fired', @@ -171,7 +187,9 @@ export class PetriNet { transitionId, contract, consumed: consumedPlaces, + consumedTokens: consumedTokens.map((t) => [t]), produced: producedPlaces, + producedTokens, }); this.pendingDeferred--; this.wakeOneWaiter(); @@ -277,14 +295,16 @@ export class PetriNet { } private depositClaim( - { transition, consumed: _consumed }: TransitionClaim, + { transition, consumed }: TransitionClaim, outputs: { place: string; token: Token }[], eventSink?: NetEventSink, ): void { const producedPlaces: string[] = []; + const producedTokens: Token[][] = []; for (const { place, token } of outputs) { this.addToken(place, token); producedPlaces.push(place); + producedTokens.push([token]); } // Deferred handlers return [] synchronously; their transition_fired // event is emitted once from completeDeferred when outputs land. @@ -295,7 +315,9 @@ export class PetriNet { transitionId: transition.id, contract: transition.contract, consumed: transition.inputs, + consumedTokens: consumed.map((t) => [t]), produced: producedPlaces, + producedTokens, }); } diff --git a/src/orchestrator/src/petrinaut-events.test.ts b/src/orchestrator/src/petrinaut-events.test.ts new file mode 100644 index 000000000..eef00cd16 --- /dev/null +++ b/src/orchestrator/src/petrinaut-events.test.ts @@ -0,0 +1,174 @@ +import { mkdtempSync, readFileSync } from 'node:fs'; +import { tmpdir } from 'node:os'; +import { join } from 'node:path'; + +import { describe, expect, it } from 'vitest'; + +import { compileTopology } from './net-compiler.js'; +import { + createPetrinautEventStream, + type PetrinautEvent, + type PetrinautTransitionFiredEvent, +} from './petrinaut-events.js'; +import type { Plan } from './types.js'; + +const simplePlan: Plan = { + epics: [{ id: 'epic-1', summary: 'E', depends_on: [], verification: [] }], + slices: [ + { + id: 'slice-1', + epic_id: 'epic-1', + definition: 'D', + depends_on: [], + verification: [{ kind: 'unit-test', target: 't' }], + }, + ], +}; + +/** Deterministic UUID stub for stable event snapshots. */ +function deterministicTokenId(): () => string { + let n = 0; + return () => `tok-${++n}`; +} + +// --------------------------------------------------------------------------- +// Unit tests — createPetrinautEventStream as a NetEventSink adapter +// --------------------------------------------------------------------------- + +describe('createPetrinautEventStream — initial_marking', () => { + it('emits one initial_marking event grouping every initial token by place', () => { + const blueprint = compileTopology(simplePlan, { maxRetries: 3 }); + const events: PetrinautEvent[] = []; + const stream = createPetrinautEventStream({ + runId: 'run-1', + tokenIdFn: deterministicTokenId(), + onEvent: (e) => events.push(e), + }); + stream.emitInitialMarking(blueprint); + + expect(events).toHaveLength(1); + const ev = events[0]!; + expect(ev.kind).toBe('initial_marking'); + if (ev.kind !== 'initial_marking') return; // narrow + + expect(ev.runId).toBe('run-1'); + expect(Object.keys(ev.marking).sort()).toEqual( + [ + 'pool:code-agent', + 'pool:test-agent', + 'slice:slice-1:eligible', + 'slice:slice-1:retry-budget', + 'slice:slice-1:semantic-budget', + ].sort(), + ); + + // Every token carries an id; semantic payloads preserved. + const retry = ev.marking['slice:slice-1:retry-budget']!; + expect(retry).toHaveLength(1); + expect(retry[0]!.id).toBeDefined(); + expect(retry[0]!.retryCount).toBe(0); + expect(retry[0]!.sliceId).toBe('slice-1'); + }); +}); + +describe('createPetrinautEventStream — transition_fired adapter', () => { + it('translates a NetEvent into the cross-team-agreed transition_fired shape', () => { + const events: PetrinautEvent[] = []; + const stream = createPetrinautEventStream({ + runId: 'run-1', + tokenIdFn: deterministicTokenId(), + onEvent: (e) => events.push(e), + }); + stream.sink.emit({ + kind: 'transition_fired', + ts: '2026-05-27T00:00:00.000Z', + transitionId: 'slice-1:evaluate:dispatch', + consumed: ['slice:slice-1:spec-ready', 'pool:test-agent'], + consumedTokens: [[{ sliceId: 'slice-1', epicId: 'epic-1' }], [{ sliceId: '', epicId: '' }]], + produced: ['slice:slice-1:evaluate:running'], + producedTokens: [[{ sliceId: 'slice-1', epicId: 'epic-1' }]], + }); + + expect(events).toHaveLength(1); + const ev = events[0]! as PetrinautTransitionFiredEvent; + expect(ev.kind).toBe('transition_fired'); + expect(ev.runId).toBe('run-1'); + expect(ev.transitionName).toBe('slice-1:evaluate:dispatch'); + expect(Object.keys(ev.input).sort()).toEqual(['pool:test-agent', 'slice:slice-1:spec-ready']); + expect(ev.input['slice:slice-1:spec-ready']).toHaveLength(1); + expect(ev.input['slice:slice-1:spec-ready']![0]!.sliceId).toBe('slice-1'); + expect(Object.keys(ev.output)).toEqual(['slice:slice-1:evaluate:running']); + expect(ev.output['slice:slice-1:evaluate:running']![0]!.id).toBeDefined(); + }); + + it('forwards net_halted and net_deadlocked as terminal events', () => { + const events: PetrinautEvent[] = []; + const stream = createPetrinautEventStream({ + runId: 'run-1', + tokenIdFn: deterministicTokenId(), + onEvent: (e) => events.push(e), + }); + stream.sink.emit({ kind: 'net_halted', ts: '2026-05-27T00:00:00.000Z' }); + stream.sink.emit({ kind: 'net_deadlocked', ts: '2026-05-27T00:00:01.000Z' }); + + expect(events.map((e) => e.kind)).toEqual(['net_halted', 'net_deadlocked']); + expect(events.every((e) => 'runId' in e && e.runId === 'run-1')).toBe(true); + }); +}); + +// --------------------------------------------------------------------------- +// File output — JSONL roundtrip +// --------------------------------------------------------------------------- + +describe('createPetrinautEventStream — JSONL file output', () => { + it('appends one event per line and reloads as parsed events', () => { + const dir = mkdtempSync(join(tmpdir(), 'brunch-petrinaut-events-')); + const filePath = join(dir, 'petrinaut-events.jsonl'); + const blueprint = compileTopology(simplePlan, { maxRetries: 3 }); + + const stream = createPetrinautEventStream({ + runId: 'run-jsonl', + filePath, + tokenIdFn: deterministicTokenId(), + }); + + // Up-front initial marking. + stream.emitInitialMarking(blueprint); + + // A synthetic transition_fired (the production path goes through the + // NetEventSink during PetriNet.run; here we exercise the same adapter + // directly to avoid coupling this test to the heavy orchestrator path). + stream.sink.emit({ + kind: 'transition_fired', + ts: '2026-05-27T00:00:00.000Z', + transitionId: 'slice-1:evaluate:dispatch', + consumed: ['slice:slice-1:spec-ready', 'pool:test-agent'], + consumedTokens: [[{ sliceId: 'slice-1', epicId: 'epic-1' }], [{ sliceId: '', epicId: '' }]], + produced: ['slice:slice-1:evaluate:running'], + producedTokens: [[{ sliceId: 'slice-1', epicId: 'epic-1' }]], + }); + + // Terminal halt. + stream.sink.emit({ kind: 'net_halted', ts: '2026-05-27T00:00:01.000Z' }); + + const raw = readFileSync(filePath, 'utf8'); + const lines = raw + .trim() + .split('\n') + .map((l) => JSON.parse(l) as PetrinautEvent); + + expect(lines).toHaveLength(3); + expect(lines[0]!.kind).toBe('initial_marking'); + expect(lines[1]!.kind).toBe('transition_fired'); + expect(lines[2]!.kind).toBe('net_halted'); + + // Every event carries runId for cross-run isolation. + expect(lines.every((e) => 'runId' in e && e.runId === 'run-jsonl')).toBe(true); + + // Transition_fired arcs carry tokens with payload. + const fired = lines[1] as PetrinautTransitionFiredEvent; + expect(fired.transitionName).toBe('slice-1:evaluate:dispatch'); + expect(fired.input['slice:slice-1:spec-ready']![0]!.sliceId).toBe('slice-1'); + expect(fired.output['slice:slice-1:evaluate:running']![0]!.id).toBeDefined(); + }); +}); diff --git a/src/orchestrator/src/petrinaut-events.ts b/src/orchestrator/src/petrinaut-events.ts new file mode 100644 index 000000000..aa4024c07 --- /dev/null +++ b/src/orchestrator/src/petrinaut-events.ts @@ -0,0 +1,189 @@ +// --------------------------------------------------------------------------- +// FE-763 — Petrinaut event stream for a live cook run. +// +// Adapts the orchestrator's internal NetEvent stream into the cross-team- +// agreed Petrinaut event format, plus an `initial_marking` event emitted +// once at run start from the compiled blueprint. +// +// Wire format (2026-05-26 alignment): +// +// transition_fired: +// { kind, ts, runId, transitionName, +// input: { : [{ id: , ...payload }] }, +// output: { : [{ id: , ...payload }] } } +// +// initial_marking: +// { kind, ts, runId, +// marking: { : [{ id: , ...payload }] } } +// +// net_halted / net_deadlocked: +// { kind, ts, runId } +// +// Halt outcomes appear in two complementary forms: +// 1. structurally — as halt tokens on `slice::halted` / `epic::halted` +// places (deposited by the FE-761 Slice 2b halted-as-place refactor). +// These flow naturally through `transition_fired` events as token payload. +// 2. as a terminal `net_halted` event marking the run's end state. +// +// Open coordination item (tracked on FE-763): token UUID lifecycle — +// today every emission generates fresh UUIDs (no lineage across +// consume→emit). When Petrinaut decides whether to persist token +// identity across firings this module is the seam to evolve. +// --------------------------------------------------------------------------- + +import { randomUUID } from 'node:crypto'; +import { appendFileSync, writeFileSync } from 'node:fs'; + +import type { NetBlueprint, TokenSeed } from './net-blueprint.js'; +import type { NetEvent, NetEventSink, Token } from './petri-net.js'; + +export type PetrinautToken = { + id: string; + sliceId?: string; + epicId?: string; + retryCount?: number; + reworkCount?: number; + /** Halt reason carried by halt tokens (FE-761 Slice 2b). */ + haltReason?: string; +}; + +export type PetrinautInitialMarkingEvent = { + kind: 'initial_marking'; + ts: string; + runId: string; + marking: Record; +}; + +export type PetrinautTransitionFiredEvent = { + kind: 'transition_fired'; + ts: string; + runId: string; + transitionName: string; + input: Record; + output: Record; +}; + +export type PetrinautTerminalEvent = { + kind: 'net_halted' | 'net_deadlocked'; + ts: string; + runId: string; +}; + +export type PetrinautEvent = + | PetrinautInitialMarkingEvent + | PetrinautTransitionFiredEvent + | PetrinautTerminalEvent; + +export type CreatePetrinautEventStreamOpts = { + runId: string; + /** When set, every event is appended as one JSON object per line. */ + filePath?: string; + /** Override the per-token UUID generator (tests). */ + tokenIdFn?: () => string; + /** Fan-out for in-memory consumers (tests, sync-server forwarder). */ + onEvent?: (event: PetrinautEvent) => void; +}; + +export type PetrinautEventStream = { + /** NetEventSink to pass into `PetriNet.run()`. */ + sink: NetEventSink; + /** Emit the initial marking event from a compiled blueprint. Call once before `net.run()`. */ + emitInitialMarking(blueprint: NetBlueprint): void; +}; + +/** + * Create a Petrinaut-shaped event stream. Returns a NetEventSink adapter and + * a helper to emit the initial marking up-front. The stream writes one JSON + * object per line to `filePath` when provided, and also fans out to + * `onEvent` so in-process consumers (tests, the sync server) can subscribe + * without re-reading the file. + */ +export function createPetrinautEventStream(opts: CreatePetrinautEventStreamOpts): PetrinautEventStream { + const { runId, filePath, onEvent } = opts; + const tokenId = opts.tokenIdFn ?? randomUUID; + + // Initialize the file as empty so the first append produces a well-formed JSONL file. + if (filePath) writeFileSync(filePath, ''); + + function publish(event: PetrinautEvent): void { + if (filePath) appendFileSync(filePath, `${JSON.stringify(event)}\n`); + onEvent?.(event); + } + + function groupTokens( + places: string[] | undefined, + tokens: Token[][] | undefined, + ): Record { + const out: Record = {}; + if (!places || !tokens) return out; + for (let i = 0; i < places.length; i++) { + const place = places[i]!; + const placeTokens = tokens[i] ?? []; + const list = out[place] ?? []; + for (const t of placeTokens) list.push(tokenToPetrinaut(t, tokenId)); + out[place] = list; + } + return out; + } + + const sink: NetEventSink = { + emit(event: NetEvent): void { + switch (event.kind) { + case 'transition_fired': { + publish({ + kind: 'transition_fired', + ts: event.ts, + runId, + transitionName: event.transitionId ?? '', + input: groupTokens(event.consumed, event.consumedTokens), + output: groupTokens(event.produced, event.producedTokens), + }); + return; + } + case 'net_halted': + case 'net_deadlocked': { + publish({ kind: event.kind, ts: event.ts, runId }); + return; + } + } + }, + }; + + function emitInitialMarking(blueprint: NetBlueprint): void { + const marking: Record = {}; + for (const { place, token } of blueprint.initialTokens) { + const list = marking[place] ?? []; + list.push(seedToPetrinaut(token, tokenId())); + marking[place] = list; + } + publish({ + kind: 'initial_marking', + ts: new Date().toISOString(), + runId, + marking, + }); + } + + return { sink, emitInitialMarking }; +} + +function tokenToPetrinaut(token: Token, idFn: () => string): PetrinautToken { + return { + id: idFn(), + ...(token.sliceId ? { sliceId: token.sliceId } : {}), + ...(token.epicId ? { epicId: token.epicId } : {}), + ...(token.retryCount !== undefined ? { retryCount: token.retryCount } : {}), + ...(token.reworkCount !== undefined ? { reworkCount: token.reworkCount } : {}), + ...(token.haltReason !== undefined ? { haltReason: token.haltReason } : {}), + }; +} + +function seedToPetrinaut(seed: TokenSeed, id: string): PetrinautToken { + return { + id, + ...(seed.sliceId ? { sliceId: seed.sliceId } : {}), + ...(seed.epicId ? { epicId: seed.epicId } : {}), + ...(seed.retryCount !== undefined ? { retryCount: seed.retryCount } : {}), + ...(seed.reworkCount !== undefined ? { reworkCount: seed.reworkCount } : {}), + }; +} From d9813bacc8c403b94ad0f4014de8f2481a4b13ec Mon Sep 17 00:00:00 2001 From: Kostandin Angjellari Date: Thu, 28 May 2026 12:33:16 +0200 Subject: [PATCH 2/4] fix: harden Petrinaut event stream adapter Fail fast on malformed transition_fired events, preserve per-place empty arrays when arc metadata is partial, and align happy-path test naming. --- src/orchestrator/src/engine-contract.test.ts | 2 +- src/orchestrator/src/petrinaut-events.test.ts | 39 +++++++++++++++++++ src/orchestrator/src/petrinaut-events.ts | 11 +++++- 3 files changed, 49 insertions(+), 3 deletions(-) diff --git a/src/orchestrator/src/engine-contract.test.ts b/src/orchestrator/src/engine-contract.test.ts index 842bf2b67..07d5bb189 100644 --- a/src/orchestrator/src/engine-contract.test.ts +++ b/src/orchestrator/src/engine-contract.test.ts @@ -916,7 +916,7 @@ describe('Adapter: §7 event vocabulary', () => { // --------------------------------------------------------------------------- describe('FE-763: Petrinaut event stream on a real run', () => { - it('emits initial_marking + transition_fired (with token payload) + net_halted for simplePlan happy path', async () => { + it('emits initial_marking + transition_fired (with token payload) for simplePlan happy path', async () => { const fakes = createFakes(); const ctx: RunCtx = { reportIds: [], diff --git a/src/orchestrator/src/petrinaut-events.test.ts b/src/orchestrator/src/petrinaut-events.test.ts index eef00cd16..41ead47de 100644 --- a/src/orchestrator/src/petrinaut-events.test.ts +++ b/src/orchestrator/src/petrinaut-events.test.ts @@ -101,6 +101,45 @@ describe('createPetrinautEventStream — transition_fired adapter', () => { expect(ev.output['slice:slice-1:evaluate:running']![0]!.id).toBeDefined(); }); + it('throws when transition_fired is missing transitionId', () => { + const stream = createPetrinautEventStream({ + runId: 'run-1', + tokenIdFn: deterministicTokenId(), + }); + expect(() => + stream.sink.emit({ + kind: 'transition_fired', + ts: '2026-05-27T00:00:00.000Z', + consumed: ['slice:slice-1:spec-ready'], + consumedTokens: [[{ sliceId: 'slice-1', epicId: 'epic-1' }]], + produced: [], + producedTokens: [], + }), + ).toThrow(/missing transitionId/); + }); + + it('emits empty token arrays per place when places are present without tokens', () => { + const events: PetrinautEvent[] = []; + const stream = createPetrinautEventStream({ + runId: 'run-1', + tokenIdFn: deterministicTokenId(), + onEvent: (e) => events.push(e), + }); + stream.sink.emit({ + kind: 'transition_fired', + ts: '2026-05-27T00:00:00.000Z', + transitionId: 'slice-1:evaluate:dispatch', + consumed: ['slice:slice-1:spec-ready', 'pool:test-agent'], + produced: ['slice:slice-1:evaluate:running'], + }); + + const ev = events[0]! as PetrinautTransitionFiredEvent; + expect(Object.keys(ev.input).sort()).toEqual(['pool:test-agent', 'slice:slice-1:spec-ready']); + expect(ev.input['slice:slice-1:spec-ready']).toEqual([]); + expect(ev.input['pool:test-agent']).toEqual([]); + expect(ev.output['slice:slice-1:evaluate:running']).toEqual([]); + }); + it('forwards net_halted and net_deadlocked as terminal events', () => { const events: PetrinautEvent[] = []; const stream = createPetrinautEventStream({ diff --git a/src/orchestrator/src/petrinaut-events.ts b/src/orchestrator/src/petrinaut-events.ts index aa4024c07..a21c839b5 100644 --- a/src/orchestrator/src/petrinaut-events.ts +++ b/src/orchestrator/src/petrinaut-events.ts @@ -115,7 +115,11 @@ export function createPetrinautEventStream(opts: CreatePetrinautEventStreamOpts) tokens: Token[][] | undefined, ): Record { const out: Record = {}; - if (!places || !tokens) return out; + if (!places) return out; + if (!tokens) { + for (const place of places) out[place] = []; + return out; + } for (let i = 0; i < places.length; i++) { const place = places[i]!; const placeTokens = tokens[i] ?? []; @@ -130,11 +134,14 @@ export function createPetrinautEventStream(opts: CreatePetrinautEventStreamOpts) emit(event: NetEvent): void { switch (event.kind) { case 'transition_fired': { + if (!event.transitionId) { + throw new Error('transition_fired NetEvent missing transitionId'); + } publish({ kind: 'transition_fired', ts: event.ts, runId, - transitionName: event.transitionId ?? '', + transitionName: event.transitionId, input: groupTokens(event.consumed, event.consumedTokens), output: groupTokens(event.produced, event.producedTokens), }); From e9e9d6601257a8b109db9374b5fb3b02f9d7a8aa Mon Sep 17 00:00:00 2001 From: Kostandin Angjellari Date: Fri, 29 May 2026 14:27:22 +0200 Subject: [PATCH 3/4] Make Petrinaut event stream best-effort Wrap stream creation in try/catch so disk failures match net.json export semantics and cannot halt an otherwise valid cook run. Co-authored-by: Cursor --- src/orchestrator/src/engine.ts | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/src/orchestrator/src/engine.ts b/src/orchestrator/src/engine.ts index ee293a407..1874942aa 100644 --- a/src/orchestrator/src/engine.ts +++ b/src/orchestrator/src/engine.ts @@ -56,12 +56,16 @@ export function createOrchestrator(firingPolicy: FiringPolicy): Orchestrator { // callers without a runDir get the existing no-op behavior. let eventSink: NetEventSink | undefined; if (input.runDir) { - const stream = createPetrinautEventStream({ - runId: input.runId ?? 'unknown', - filePath: join(input.runDir, 'petrinaut-events.jsonl'), - }); - stream.emitInitialMarking(blueprint); - eventSink = stream.sink; + try { + const stream = createPetrinautEventStream({ + runId: input.runId ?? 'unknown', + filePath: join(input.runDir, 'petrinaut-events.jsonl'), + }); + stream.emitInitialMarking(blueprint); + eventSink = stream.sink; + } catch { + // Best-effort integration output — don't fail the cook run. + } } const net = wireHandlers(blueprint, input, ctx); From 41779a0746f476df63bad0cb0d4481d80509ceb0 Mon Sep 17 00:00:00 2001 From: Kostandin Angjellari Date: Tue, 2 Jun 2026 11:00:15 +0200 Subject: [PATCH 4/4] Surface Petrinaut stream integration failures. Co-authored-by: Cursor --- src/orchestrator/src/cook-cli.ts | 3 ++ src/orchestrator/src/engine-contract.test.ts | 22 +++++++++++ src/orchestrator/src/engine.ts | 16 ++++++-- src/orchestrator/src/net-compiler.ts | 8 ++-- src/orchestrator/src/petri-net.ts | 39 +++++++++---------- src/orchestrator/src/petrinaut-events.test.ts | 39 ++++++++++++++++--- src/orchestrator/src/petrinaut-events.ts | 28 ++++++++----- src/orchestrator/src/types.ts | 2 + 8 files changed, 114 insertions(+), 43 deletions(-) diff --git a/src/orchestrator/src/cook-cli.ts b/src/orchestrator/src/cook-cli.ts index a630f089d..04e08409d 100644 --- a/src/orchestrator/src/cook-cli.ts +++ b/src/orchestrator/src/cook-cli.ts @@ -175,6 +175,9 @@ export async function runCook(opts: CookOptions): Promise { console.error( ` ${ok ? '✓' : '✗'} ${result.status}${result.reason ? ` — ${result.reason}` : ''} (${duration})`, ); + for (const warning of result.warnings) { + console.error(` ! ${warning}`); + } console.error(''); for (const e of result.epics) { diff --git a/src/orchestrator/src/engine-contract.test.ts b/src/orchestrator/src/engine-contract.test.ts index 07d5bb189..229cf6e60 100644 --- a/src/orchestrator/src/engine-contract.test.ts +++ b/src/orchestrator/src/engine-contract.test.ts @@ -977,6 +977,28 @@ describe('FE-763: Petrinaut event stream on a real run', () => { expect(events.filter((e) => e.kind === 'net_halted')).toHaveLength(0); expect(events.filter((e) => e.kind === 'net_deadlocked')).toHaveLength(0); }); + + it('surfaces Petrinaut integration failures as warnings without halting the run', async () => { + const fakes = createFakes(); + const result = await createOrchestrator('serial').run({ + plan: simplePlan, + sandboxDir: '/tmp/fake', + actions: fakes.actions, + reports: fakes.reports, + testRunner: fakes.testRunner, + policy: { maxRetries: 3 }, + runId: 'run-warnings', + runDir: join(tmpdir(), 'brunch-missing-run-dir', 'child'), + }); + + expect(result.status).toBe('completed'); + expect(result.warnings).toEqual( + expect.arrayContaining([ + expect.stringContaining('Petrinaut net export disabled:'), + expect.stringContaining('Petrinaut event stream disabled:'), + ]), + ); + }); }); // --------------------------------------------------------------------------- diff --git a/src/orchestrator/src/engine.ts b/src/orchestrator/src/engine.ts index 1874942aa..d5f83a251 100644 --- a/src/orchestrator/src/engine.ts +++ b/src/orchestrator/src/engine.ts @@ -18,6 +18,10 @@ import type { Orchestrator, OrchestratorInput, OrchestratorResult, RunCtx } from // comes from the halt token itself (`token.haltReason`). // --------------------------------------------------------------------------- +function errorMessage(err: unknown): string { + return err instanceof Error ? err.message : String(err); +} + export function createOrchestrator(firingPolicy: FiringPolicy): Orchestrator { return { async run(input: OrchestratorInput): Promise { @@ -25,6 +29,7 @@ export function createOrchestrator(firingPolicy: FiringPolicy): Orchestrator { reportIds: [], sliceOutcomes: new Map(), epicOutcomes: new Map(), + warnings: [], }; let haltReason: string | undefined; @@ -45,8 +50,9 @@ export function createOrchestrator(firingPolicy: FiringPolicy): Orchestrator { // straight into the Petrinaut editor's file-picker import. const sdcpn = toSdcpnFile(serialized, {}); writeFileSync(join(input.runDir, 'net.sdcpn.json'), `${JSON.stringify(sdcpn, null, 2)}\n`); - } catch { + } catch (err) { // Best-effort integration output — don't fail the cook run. + ctx.warnings?.push(`Petrinaut net export disabled: ${errorMessage(err)}`); } } @@ -60,11 +66,13 @@ export function createOrchestrator(firingPolicy: FiringPolicy): Orchestrator { const stream = createPetrinautEventStream({ runId: input.runId ?? 'unknown', filePath: join(input.runDir, 'petrinaut-events.jsonl'), + onError: (message) => ctx.warnings?.push(message), }); stream.emitInitialMarking(blueprint); eventSink = stream.sink; - } catch { + } catch (err) { // Best-effort integration output — don't fail the cook run. + ctx.warnings?.push(`Petrinaut event stream disabled: ${errorMessage(err)}`); } } @@ -83,7 +91,8 @@ export function createOrchestrator(firingPolicy: FiringPolicy): Orchestrator { } catch (err) { return { status: 'halted', - reason: err instanceof Error ? err.message : String(err), + reason: errorMessage(err), + warnings: ctx.warnings ?? [], reports: [...ctx.reportIds], epics: input.plan.epics.map( (e) => ctx.epicOutcomes.get(e.id) ?? { epicId: e.id, status: 'halted' as const }, @@ -117,6 +126,7 @@ export function createOrchestrator(firingPolicy: FiringPolicy): Orchestrator { return { status: halted ? 'halted' : 'completed', reason: haltReason, + warnings: ctx.warnings ?? [], reports: [...ctx.reportIds], epics: input.plan.epics.map((e) => ctx.epicOutcomes.get(e.id)!), slices: input.plan.slices.map((s) => ctx.sliceOutcomes.get(s.id)!), diff --git a/src/orchestrator/src/net-compiler.ts b/src/orchestrator/src/net-compiler.ts index eddec9d2e..b8d84b253 100644 --- a/src/orchestrator/src/net-compiler.ts +++ b/src/orchestrator/src/net-compiler.ts @@ -643,7 +643,7 @@ export function wireHandlers(blueprint: NetBlueprint, input: OrchestratorInput, } return out; })(); - net.scheduleDeferred(skel.id, skel.contract, skel.inputs, consumed, deferred); + net.scheduleDeferred(skel.id, skel.contract, { places: skel.inputs, tokens: consumed }, deferred); return []; }; break; @@ -740,7 +740,7 @@ export function wireHandlers(blueprint: NetBlueprint, input: OrchestratorInput, { place: budgetPlace, token: { ...baseToken, retryCount: retryCount + 1 } }, ]; })(); - net.scheduleDeferred(skel.id, skel.contract, skel.inputs, consumed, deferred); + net.scheduleDeferred(skel.id, skel.contract, { places: skel.inputs, tokens: consumed }, deferred); return []; }; break; @@ -795,7 +795,7 @@ export function wireHandlers(blueprint: NetBlueprint, input: OrchestratorInput, { place: budgetPlace, token: { ...baseToken, reworkCount: reworkCount + 1 } }, ]; })(); - net.scheduleDeferred(skel.id, skel.contract, skel.inputs, consumed, deferred); + net.scheduleDeferred(skel.id, skel.contract, { places: skel.inputs, tokens: consumed }, deferred); return []; }; break; @@ -877,7 +877,7 @@ export function wireHandlers(blueprint: NetBlueprint, input: OrchestratorInput, // happens in sibling-passthrough transitions downstream. return [{ place: intermediatePlace, token: { ...inputToken, reportId } }]; })(); - net.scheduleDeferred(skel.id, skel.contract, skel.inputs, consumed, deferred); + net.scheduleDeferred(skel.id, skel.contract, { places: skel.inputs, tokens: consumed }, deferred); return []; }; break; diff --git a/src/orchestrator/src/petri-net.ts b/src/orchestrator/src/petri-net.ts index d6649f8b0..74dacfef3 100644 --- a/src/orchestrator/src/petri-net.ts +++ b/src/orchestrator/src/petri-net.ts @@ -68,11 +68,11 @@ export type NetEventKind = 'transition_fired' | 'net_deadlocked' | 'net_halted'; /** * Structured event emitted during net execution. * - * `consumed` / `produced` are place-name lists (one entry per arc). The - * parallel `consumedTokens` / `producedTokens` carry the actual tokens - * that traversed each arc, indexed the same way — they are populated for - * `transition_fired` events so downstream adapters (e.g. the FE-763 - * Petrinaut event stream) can include token payload in the wire format. + * `consumed` / `produced` are place-name lists (one entry per arc). + * `consumedTokens` / `producedTokens` carry the single token that traversed + * each corresponding arc. Brunch currently models one-token-per-arc firing; + * if multi-token arcs become real, this contract should change deliberately + * with tests that exercise that capability. */ export type NetEvent = { kind: NetEventKind; @@ -80,9 +80,9 @@ export type NetEvent = { transitionId?: string; contract?: TransitionContract; consumed?: string[]; - consumedTokens?: Token[][]; + consumedTokens?: Token[]; produced?: string[]; - producedTokens?: Token[][]; + producedTokens?: Token[]; }; /** Sink for structured net events. Optional — defaults to no-op. */ @@ -111,6 +111,7 @@ export function placeName(placeId: string): string { } type TransitionClaim = { transition: TransitionDef; consumed: Token[] }; +export type ConsumedClaim = { places: string[]; tokens: Token[] }; export class PetriNet { private places = new Map(); @@ -151,15 +152,12 @@ export class PetriNet { scheduleDeferred( transitionId: string, contract: TransitionContract | undefined, - consumedPlaces: string[], - consumedTokens: Token[], + consumed: ConsumedClaim, work: Promise<{ place: string; token: Token }[]>, ): void { this.pendingDeferred++; work - .then((outputs) => - this.completeDeferred(transitionId, contract, consumedPlaces, consumedTokens, outputs), - ) + .then((outputs) => this.completeDeferred(transitionId, contract, consumed, outputs)) .catch((err) => { this.deferredError ??= err; this.pendingDeferred--; @@ -170,24 +168,23 @@ export class PetriNet { private completeDeferred( transitionId: string, contract: TransitionContract | undefined, - consumedPlaces: string[], - consumedTokens: Token[], + consumed: ConsumedClaim, outputs: { place: string; token: Token }[], ): void { const producedPlaces: string[] = []; - const producedTokens: Token[][] = []; + const producedTokens: Token[] = []; for (const { place, token } of outputs) { this.addToken(place, token); producedPlaces.push(place); - producedTokens.push([token]); + producedTokens.push(token); } this.deferredEventSink?.emit({ kind: 'transition_fired', ts: new Date().toISOString(), transitionId, contract, - consumed: consumedPlaces, - consumedTokens: consumedTokens.map((t) => [t]), + consumed: consumed.places, + consumedTokens: consumed.tokens, produced: producedPlaces, producedTokens, }); @@ -300,11 +297,11 @@ export class PetriNet { eventSink?: NetEventSink, ): void { const producedPlaces: string[] = []; - const producedTokens: Token[][] = []; + const producedTokens: Token[] = []; for (const { place, token } of outputs) { this.addToken(place, token); producedPlaces.push(place); - producedTokens.push([token]); + producedTokens.push(token); } // Deferred handlers return [] synchronously; their transition_fired // event is emitted once from completeDeferred when outputs land. @@ -315,7 +312,7 @@ export class PetriNet { transitionId: transition.id, contract: transition.contract, consumed: transition.inputs, - consumedTokens: consumed.map((t) => [t]), + consumedTokens: consumed, produced: producedPlaces, producedTokens, }); diff --git a/src/orchestrator/src/petrinaut-events.test.ts b/src/orchestrator/src/petrinaut-events.test.ts index 41ead47de..fda8b787b 100644 --- a/src/orchestrator/src/petrinaut-events.test.ts +++ b/src/orchestrator/src/petrinaut-events.test.ts @@ -1,4 +1,4 @@ -import { mkdtempSync, readFileSync } from 'node:fs'; +import { chmodSync, mkdtempSync, readFileSync } from 'node:fs'; import { tmpdir } from 'node:os'; import { join } from 'node:path'; @@ -84,9 +84,12 @@ describe('createPetrinautEventStream — transition_fired adapter', () => { ts: '2026-05-27T00:00:00.000Z', transitionId: 'slice-1:evaluate:dispatch', consumed: ['slice:slice-1:spec-ready', 'pool:test-agent'], - consumedTokens: [[{ sliceId: 'slice-1', epicId: 'epic-1' }], [{ sliceId: '', epicId: '' }]], + consumedTokens: [ + { sliceId: 'slice-1', epicId: 'epic-1' }, + { sliceId: '', epicId: '' }, + ], produced: ['slice:slice-1:evaluate:running'], - producedTokens: [[{ sliceId: 'slice-1', epicId: 'epic-1' }]], + producedTokens: [{ sliceId: 'slice-1', epicId: 'epic-1' }], }); expect(events).toHaveLength(1); @@ -111,7 +114,7 @@ describe('createPetrinautEventStream — transition_fired adapter', () => { kind: 'transition_fired', ts: '2026-05-27T00:00:00.000Z', consumed: ['slice:slice-1:spec-ready'], - consumedTokens: [[{ sliceId: 'slice-1', epicId: 'epic-1' }]], + consumedTokens: [{ sliceId: 'slice-1', epicId: 'epic-1' }], produced: [], producedTokens: [], }), @@ -182,9 +185,12 @@ describe('createPetrinautEventStream — JSONL file output', () => { ts: '2026-05-27T00:00:00.000Z', transitionId: 'slice-1:evaluate:dispatch', consumed: ['slice:slice-1:spec-ready', 'pool:test-agent'], - consumedTokens: [[{ sliceId: 'slice-1', epicId: 'epic-1' }], [{ sliceId: '', epicId: '' }]], + consumedTokens: [ + { sliceId: 'slice-1', epicId: 'epic-1' }, + { sliceId: '', epicId: '' }, + ], produced: ['slice:slice-1:evaluate:running'], - producedTokens: [[{ sliceId: 'slice-1', epicId: 'epic-1' }]], + producedTokens: [{ sliceId: 'slice-1', epicId: 'epic-1' }], }); // Terminal halt. @@ -210,4 +216,25 @@ describe('createPetrinautEventStream — JSONL file output', () => { expect(fired.input['slice:slice-1:spec-ready']![0]!.sliceId).toBe('slice-1'); expect(fired.output['slice:slice-1:evaluate:running']![0]!.id).toBeDefined(); }); + + it('disables file output after an append failure without throwing', () => { + const dir = mkdtempSync(join(tmpdir(), 'brunch-petrinaut-events-')); + const filePath = join(dir, 'petrinaut-events.jsonl'); + const events: PetrinautEvent[] = []; + const warnings: string[] = []; + + const stream = createPetrinautEventStream({ + runId: 'run-jsonl', + filePath, + tokenIdFn: deterministicTokenId(), + onEvent: (e) => events.push(e), + onError: (message) => warnings.push(message), + }); + chmodSync(filePath, 0o444); + + expect(() => stream.sink.emit({ kind: 'net_halted', ts: '2026-05-27T00:00:01.000Z' })).not.toThrow(); + expect(events.map((e) => e.kind)).toEqual(['net_halted']); + expect(warnings).toHaveLength(1); + expect(warnings[0]).toContain('Petrinaut event stream disabled:'); + }); }); diff --git a/src/orchestrator/src/petrinaut-events.ts b/src/orchestrator/src/petrinaut-events.ts index a21c839b5..953c50742 100644 --- a/src/orchestrator/src/petrinaut-events.ts +++ b/src/orchestrator/src/petrinaut-events.ts @@ -25,10 +25,10 @@ // These flow naturally through `transition_fired` events as token payload. // 2. as a terminal `net_halted` event marking the run's end state. // -// Open coordination item (tracked on FE-763): token UUID lifecycle — -// today every emission generates fresh UUIDs (no lineage across -// consume→emit). When Petrinaut decides whether to persist token -// identity across firings this module is the seam to evolve. +// Decision needed with Petrinaut before treating token ids as durable +// identities: today every emission generates fresh UUIDs (no lineage across +// consume→emit). This module is the seam to evolve once identity semantics are +// settled. // --------------------------------------------------------------------------- import { randomUUID } from 'node:crypto'; @@ -82,6 +82,8 @@ export type CreatePetrinautEventStreamOpts = { tokenIdFn?: () => string; /** Fan-out for in-memory consumers (tests, sync-server forwarder). */ onEvent?: (event: PetrinautEvent) => void; + /** Receives best-effort file-output failures without failing the cook run. */ + onError?: (message: string) => void; }; export type PetrinautEventStream = { @@ -99,20 +101,28 @@ export type PetrinautEventStream = { * without re-reading the file. */ export function createPetrinautEventStream(opts: CreatePetrinautEventStreamOpts): PetrinautEventStream { - const { runId, filePath, onEvent } = opts; + const { runId, filePath, onEvent, onError } = opts; const tokenId = opts.tokenIdFn ?? randomUUID; + let fileOutputDisabled = false; // Initialize the file as empty so the first append produces a well-formed JSONL file. if (filePath) writeFileSync(filePath, ''); function publish(event: PetrinautEvent): void { - if (filePath) appendFileSync(filePath, `${JSON.stringify(event)}\n`); + if (filePath && !fileOutputDisabled) { + try { + appendFileSync(filePath, `${JSON.stringify(event)}\n`); + } catch (err) { + fileOutputDisabled = true; + onError?.(`Petrinaut event stream disabled: ${err instanceof Error ? err.message : String(err)}`); + } + } onEvent?.(event); } function groupTokens( places: string[] | undefined, - tokens: Token[][] | undefined, + tokens: Token[] | undefined, ): Record { const out: Record = {}; if (!places) return out; @@ -122,9 +132,9 @@ export function createPetrinautEventStream(opts: CreatePetrinautEventStreamOpts) } for (let i = 0; i < places.length; i++) { const place = places[i]!; - const placeTokens = tokens[i] ?? []; const list = out[place] ?? []; - for (const t of placeTokens) list.push(tokenToPetrinaut(t, tokenId)); + const token = tokens[i]; + if (token) list.push(tokenToPetrinaut(token, tokenId)); out[place] = list; } return out; diff --git a/src/orchestrator/src/types.ts b/src/orchestrator/src/types.ts index e1be280d3..d2acda31b 100644 --- a/src/orchestrator/src/types.ts +++ b/src/orchestrator/src/types.ts @@ -131,6 +131,7 @@ export type SliceOutcome = { export type OrchestratorResult = { status: 'completed' | 'halted'; reason?: string; + warnings: string[]; reports: string[]; epics: EpicOutcome[]; slices: SliceOutcome[]; @@ -155,4 +156,5 @@ export type RunCtx = { reportIds: string[]; sliceOutcomes: Map; epicOutcomes: Map; + warnings?: string[]; };