diff --git a/src/orchestrator/src/cook-cli.ts b/src/orchestrator/src/cook-cli.ts index a630f089d..04e08409d 100644 --- a/src/orchestrator/src/cook-cli.ts +++ b/src/orchestrator/src/cook-cli.ts @@ -175,6 +175,9 @@ export async function runCook(opts: CookOptions): Promise { console.error( ` ${ok ? '✓' : '✗'} ${result.status}${result.reason ? ` — ${result.reason}` : ''} (${duration})`, ); + for (const warning of result.warnings) { + console.error(` ! ${warning}`); + } console.error(''); for (const e of result.epics) { diff --git a/src/orchestrator/src/engine-contract.test.ts b/src/orchestrator/src/engine-contract.test.ts index 291815f28..229cf6e60 100644 --- a/src/orchestrator/src/engine-contract.test.ts +++ b/src/orchestrator/src/engine-contract.test.ts @@ -5,8 +5,13 @@ import { join } from 'node:path'; import { describe, expect, it } from 'vitest'; import { createOrchestrator } from './engine.js'; -import { compilePlan, compileTopology } from './net-compiler.js'; +import { compilePlan, compileTopology, wireHandlers } from './net-compiler.js'; import type { NetEvent } from './petri-net.js'; +import { + createPetrinautEventStream, + type PetrinautEvent, + type PetrinautTransitionFiredEvent, +} from './petrinaut-events.js'; import { InMemoryReportSink } from './report-sink.js'; import type { ActionContext, ActionHandlers, OrchestratorInput, Plan, RunCtx, TestRunner } from './types.js'; @@ -906,6 +911,96 @@ describe('Adapter: §7 event vocabulary', () => { }); }); +// --------------------------------------------------------------------------- +// FE-763 — Petrinaut event stream end-to-end on the orchestrator +// --------------------------------------------------------------------------- + +describe('FE-763: Petrinaut event stream on a real run', () => { + it('emits initial_marking + transition_fired (with token payload) for simplePlan happy path', async () => { + const fakes = createFakes(); + const ctx: RunCtx = { + reportIds: [], + sliceOutcomes: new Map(), + epicOutcomes: new Map(), + }; + const input: OrchestratorInput = { + plan: simplePlan, + sandboxDir: '/tmp/fake', + actions: fakes.actions, + reports: fakes.reports, + testRunner: fakes.testRunner, + policy: { maxRetries: 3 }, + }; + + const blueprint = compileTopology(input.plan, input.policy); + const net = wireHandlers(blueprint, input, ctx); + + const events: PetrinautEvent[] = []; + const stream = createPetrinautEventStream({ + runId: 'run-e2e', + onEvent: (e) => events.push(e), + }); + stream.emitInitialMarking(blueprint); + await net.run('serial', () => net.hasHaltToken(), stream.sink); + + // 1. initial_marking is first. + expect(events[0]!.kind).toBe('initial_marking'); + + // 2. every event carries the runId. + expect(events.every((e) => 'runId' in e && e.runId === 'run-e2e')).toBe(true); + + // 3. transition_fired events expose the FE-761 Slice 4 dispatch/complete + // topology directly in Petrinaut's wire format. + const fired = events.filter((e): e is PetrinautTransitionFiredEvent => e.kind === 'transition_fired'); + const names = fired.map((e) => e.transitionName); + expect(names).toContain('slice-1:evaluate:dispatch'); + expect(names).toContain('slice-1:evaluate:complete'); + expect(names).toContain('slice-1:assess-semantic:dispatch'); + expect(names).toContain('slice-1:assess-semantic:complete'); + + // 4. each transition_fired carries per-place token data with a UUID + // (cross-team-agreed shape: { id: , ...payload }). + for (const e of fired) { + for (const tokens of Object.values(e.input)) { + for (const tok of tokens) expect(typeof tok.id).toBe('string'); + } + for (const tokens of Object.values(e.output)) { + for (const tok of tokens) expect(typeof tok.id).toBe('string'); + } + } + + // 5. happy path: no net_halted / net_deadlocked emitted (engine exits + // the loop cleanly when nothing remains enabled). When the cook + // fails — retry exhaustion etc. — Petrinaut sees the halt token + // travel through the topology as a transition_fired event landing + // in `slice::halted`, plus the engine emits net_halted. + expect(events.filter((e) => e.kind === 'net_halted')).toHaveLength(0); + expect(events.filter((e) => e.kind === 'net_deadlocked')).toHaveLength(0); + }); + + it('surfaces Petrinaut integration failures as warnings without halting the run', async () => { + const fakes = createFakes(); + const result = await createOrchestrator('serial').run({ + plan: simplePlan, + sandboxDir: '/tmp/fake', + actions: fakes.actions, + reports: fakes.reports, + testRunner: fakes.testRunner, + policy: { maxRetries: 3 }, + runId: 'run-warnings', + runDir: join(tmpdir(), 'brunch-missing-run-dir', 'child'), + }); + + expect(result.status).toBe('completed'); + expect(result.warnings).toEqual( + expect.arrayContaining([ + expect.stringContaining('Petrinaut net export disabled:'), + expect.stringContaining('Petrinaut event stream disabled:'), + ]), + ); + }); +}); + // --------------------------------------------------------------------------- // Contract test #12 — parallel fires concurrently // --------------------------------------------------------------------------- diff --git a/src/orchestrator/src/engine.ts b/src/orchestrator/src/engine.ts index fa81bff12..d5f83a251 100644 --- a/src/orchestrator/src/engine.ts +++ b/src/orchestrator/src/engine.ts @@ -2,7 +2,8 @@ import { writeFileSync } from 'node:fs'; import { join } from 'node:path'; import { compileTopology, wireHandlers } from './net-compiler.js'; -import type { FiringPolicy } from './petri-net.js'; +import type { FiringPolicy, NetEventSink } from './petri-net.js'; +import { createPetrinautEventStream } from './petrinaut-events.js'; import { serializeBlueprint } from './petrinaut-export.js'; import { toSdcpnFile } from './petrinaut-sdcpn.js'; import type { Orchestrator, OrchestratorInput, OrchestratorResult, RunCtx } from './types.js'; @@ -17,6 +18,10 @@ import type { Orchestrator, OrchestratorInput, OrchestratorResult, RunCtx } from // comes from the halt token itself (`token.haltReason`). // --------------------------------------------------------------------------- +function errorMessage(err: unknown): string { + return err instanceof Error ? err.message : String(err); +} + export function createOrchestrator(firingPolicy: FiringPolicy): Orchestrator { return { async run(input: OrchestratorInput): Promise { @@ -24,6 +29,7 @@ export function createOrchestrator(firingPolicy: FiringPolicy): Orchestrator { reportIds: [], sliceOutcomes: new Map(), epicOutcomes: new Map(), + warnings: [], }; let haltReason: string | undefined; @@ -44,13 +50,34 @@ export function createOrchestrator(firingPolicy: FiringPolicy): Orchestrator { // straight into the Petrinaut editor's file-picker import. const sdcpn = toSdcpnFile(serialized, {}); writeFileSync(join(input.runDir, 'net.sdcpn.json'), `${JSON.stringify(sdcpn, null, 2)}\n`); - } catch { + } catch (err) { + // Best-effort integration output — don't fail the cook run. + ctx.warnings?.push(`Petrinaut net export disabled: ${errorMessage(err)}`); + } + } + + // FE-763: open a Petrinaut event stream when runDir is present. + // Emits an initial_marking event up-front, then transition_fired / + // net_halted / net_deadlocked events as the net runs. Library + // callers without a runDir get the existing no-op behavior. + let eventSink: NetEventSink | undefined; + if (input.runDir) { + try { + const stream = createPetrinautEventStream({ + runId: input.runId ?? 'unknown', + filePath: join(input.runDir, 'petrinaut-events.jsonl'), + onError: (message) => ctx.warnings?.push(message), + }); + stream.emitInitialMarking(blueprint); + eventSink = stream.sink; + } catch (err) { // Best-effort integration output — don't fail the cook run. + ctx.warnings?.push(`Petrinaut event stream disabled: ${errorMessage(err)}`); } } const net = wireHandlers(blueprint, input, ctx); - await net.run(firingPolicy, () => net.hasHaltToken()); + await net.run(firingPolicy, () => net.hasHaltToken(), eventSink); hasStructuralHalt = net.hasHaltToken(); // Derive halt reason from any halt token deposited during the run. @@ -64,7 +91,8 @@ export function createOrchestrator(firingPolicy: FiringPolicy): Orchestrator { } catch (err) { return { status: 'halted', - reason: err instanceof Error ? err.message : String(err), + reason: errorMessage(err), + warnings: ctx.warnings ?? [], reports: [...ctx.reportIds], epics: input.plan.epics.map( (e) => ctx.epicOutcomes.get(e.id) ?? { epicId: e.id, status: 'halted' as const }, @@ -98,6 +126,7 @@ export function createOrchestrator(firingPolicy: FiringPolicy): Orchestrator { return { status: halted ? 'halted' : 'completed', reason: haltReason, + warnings: ctx.warnings ?? [], reports: [...ctx.reportIds], epics: input.plan.epics.map((e) => ctx.epicOutcomes.get(e.id)!), slices: input.plan.slices.map((s) => ctx.sliceOutcomes.get(s.id)!), diff --git a/src/orchestrator/src/net-compiler.ts b/src/orchestrator/src/net-compiler.ts index 4677c624d..b8d84b253 100644 --- a/src/orchestrator/src/net-compiler.ts +++ b/src/orchestrator/src/net-compiler.ts @@ -643,7 +643,7 @@ export function wireHandlers(blueprint: NetBlueprint, input: OrchestratorInput, } return out; })(); - net.scheduleDeferred(skel.id, skel.contract, skel.inputs, deferred); + net.scheduleDeferred(skel.id, skel.contract, { places: skel.inputs, tokens: consumed }, deferred); return []; }; break; @@ -740,7 +740,7 @@ export function wireHandlers(blueprint: NetBlueprint, input: OrchestratorInput, { place: budgetPlace, token: { ...baseToken, retryCount: retryCount + 1 } }, ]; })(); - net.scheduleDeferred(skel.id, skel.contract, skel.inputs, deferred); + net.scheduleDeferred(skel.id, skel.contract, { places: skel.inputs, tokens: consumed }, deferred); return []; }; break; @@ -795,7 +795,7 @@ export function wireHandlers(blueprint: NetBlueprint, input: OrchestratorInput, { place: budgetPlace, token: { ...baseToken, reworkCount: reworkCount + 1 } }, ]; })(); - net.scheduleDeferred(skel.id, skel.contract, skel.inputs, deferred); + net.scheduleDeferred(skel.id, skel.contract, { places: skel.inputs, tokens: consumed }, deferred); return []; }; break; @@ -877,7 +877,7 @@ export function wireHandlers(blueprint: NetBlueprint, input: OrchestratorInput, // happens in sibling-passthrough transitions downstream. return [{ place: intermediatePlace, token: { ...inputToken, reportId } }]; })(); - net.scheduleDeferred(skel.id, skel.contract, skel.inputs, deferred); + net.scheduleDeferred(skel.id, skel.contract, { places: skel.inputs, tokens: consumed }, deferred); return []; }; break; diff --git a/src/orchestrator/src/petri-net.ts b/src/orchestrator/src/petri-net.ts index 661aad16d..74dacfef3 100644 --- a/src/orchestrator/src/petri-net.ts +++ b/src/orchestrator/src/petri-net.ts @@ -65,14 +65,24 @@ export type FiringPolicy = 'serial' | 'parallel'; /** Event kinds aligned with spec doc §7. */ export type NetEventKind = 'transition_fired' | 'net_deadlocked' | 'net_halted'; -/** Structured event emitted during net execution. */ +/** + * Structured event emitted during net execution. + * + * `consumed` / `produced` are place-name lists (one entry per arc). + * `consumedTokens` / `producedTokens` carry the single token that traversed + * each corresponding arc. Brunch currently models one-token-per-arc firing; + * if multi-token arcs become real, this contract should change deliberately + * with tests that exercise that capability. + */ export type NetEvent = { kind: NetEventKind; ts: string; transitionId?: string; contract?: TransitionContract; consumed?: string[]; + consumedTokens?: Token[]; produced?: string[]; + producedTokens?: Token[]; }; /** Sink for structured net events. Optional — defaults to no-op. */ @@ -101,6 +111,7 @@ export function placeName(placeId: string): string { } type TransitionClaim = { transition: TransitionDef; consumed: Token[] }; +export type ConsumedClaim = { places: string[]; tokens: Token[] }; export class PetriNet { private places = new Map(); @@ -141,12 +152,12 @@ export class PetriNet { scheduleDeferred( transitionId: string, contract: TransitionContract | undefined, - consumedPlaces: string[], + consumed: ConsumedClaim, work: Promise<{ place: string; token: Token }[]>, ): void { this.pendingDeferred++; work - .then((outputs) => this.completeDeferred(transitionId, contract, consumedPlaces, outputs)) + .then((outputs) => this.completeDeferred(transitionId, contract, consumed, outputs)) .catch((err) => { this.deferredError ??= err; this.pendingDeferred--; @@ -157,21 +168,25 @@ export class PetriNet { private completeDeferred( transitionId: string, contract: TransitionContract | undefined, - consumedPlaces: string[], + consumed: ConsumedClaim, outputs: { place: string; token: Token }[], ): void { const producedPlaces: string[] = []; + const producedTokens: Token[] = []; for (const { place, token } of outputs) { this.addToken(place, token); producedPlaces.push(place); + producedTokens.push(token); } this.deferredEventSink?.emit({ kind: 'transition_fired', ts: new Date().toISOString(), transitionId, contract, - consumed: consumedPlaces, + consumed: consumed.places, + consumedTokens: consumed.tokens, produced: producedPlaces, + producedTokens, }); this.pendingDeferred--; this.wakeOneWaiter(); @@ -277,14 +292,16 @@ export class PetriNet { } private depositClaim( - { transition, consumed: _consumed }: TransitionClaim, + { transition, consumed }: TransitionClaim, outputs: { place: string; token: Token }[], eventSink?: NetEventSink, ): void { const producedPlaces: string[] = []; + const producedTokens: Token[] = []; for (const { place, token } of outputs) { this.addToken(place, token); producedPlaces.push(place); + producedTokens.push(token); } // Deferred handlers return [] synchronously; their transition_fired // event is emitted once from completeDeferred when outputs land. @@ -295,7 +312,9 @@ export class PetriNet { transitionId: transition.id, contract: transition.contract, consumed: transition.inputs, + consumedTokens: consumed, produced: producedPlaces, + producedTokens, }); } diff --git a/src/orchestrator/src/petrinaut-events.test.ts b/src/orchestrator/src/petrinaut-events.test.ts new file mode 100644 index 000000000..fda8b787b --- /dev/null +++ b/src/orchestrator/src/petrinaut-events.test.ts @@ -0,0 +1,240 @@ +import { chmodSync, mkdtempSync, readFileSync } from 'node:fs'; +import { tmpdir } from 'node:os'; +import { join } from 'node:path'; + +import { describe, expect, it } from 'vitest'; + +import { compileTopology } from './net-compiler.js'; +import { + createPetrinautEventStream, + type PetrinautEvent, + type PetrinautTransitionFiredEvent, +} from './petrinaut-events.js'; +import type { Plan } from './types.js'; + +const simplePlan: Plan = { + epics: [{ id: 'epic-1', summary: 'E', depends_on: [], verification: [] }], + slices: [ + { + id: 'slice-1', + epic_id: 'epic-1', + definition: 'D', + depends_on: [], + verification: [{ kind: 'unit-test', target: 't' }], + }, + ], +}; + +/** Deterministic UUID stub for stable event snapshots. */ +function deterministicTokenId(): () => string { + let n = 0; + return () => `tok-${++n}`; +} + +// --------------------------------------------------------------------------- +// Unit tests — createPetrinautEventStream as a NetEventSink adapter +// --------------------------------------------------------------------------- + +describe('createPetrinautEventStream — initial_marking', () => { + it('emits one initial_marking event grouping every initial token by place', () => { + const blueprint = compileTopology(simplePlan, { maxRetries: 3 }); + const events: PetrinautEvent[] = []; + const stream = createPetrinautEventStream({ + runId: 'run-1', + tokenIdFn: deterministicTokenId(), + onEvent: (e) => events.push(e), + }); + stream.emitInitialMarking(blueprint); + + expect(events).toHaveLength(1); + const ev = events[0]!; + expect(ev.kind).toBe('initial_marking'); + if (ev.kind !== 'initial_marking') return; // narrow + + expect(ev.runId).toBe('run-1'); + expect(Object.keys(ev.marking).sort()).toEqual( + [ + 'pool:code-agent', + 'pool:test-agent', + 'slice:slice-1:eligible', + 'slice:slice-1:retry-budget', + 'slice:slice-1:semantic-budget', + ].sort(), + ); + + // Every token carries an id; semantic payloads preserved. + const retry = ev.marking['slice:slice-1:retry-budget']!; + expect(retry).toHaveLength(1); + expect(retry[0]!.id).toBeDefined(); + expect(retry[0]!.retryCount).toBe(0); + expect(retry[0]!.sliceId).toBe('slice-1'); + }); +}); + +describe('createPetrinautEventStream — transition_fired adapter', () => { + it('translates a NetEvent into the cross-team-agreed transition_fired shape', () => { + const events: PetrinautEvent[] = []; + const stream = createPetrinautEventStream({ + runId: 'run-1', + tokenIdFn: deterministicTokenId(), + onEvent: (e) => events.push(e), + }); + stream.sink.emit({ + kind: 'transition_fired', + ts: '2026-05-27T00:00:00.000Z', + transitionId: 'slice-1:evaluate:dispatch', + consumed: ['slice:slice-1:spec-ready', 'pool:test-agent'], + consumedTokens: [ + { sliceId: 'slice-1', epicId: 'epic-1' }, + { sliceId: '', epicId: '' }, + ], + produced: ['slice:slice-1:evaluate:running'], + producedTokens: [{ sliceId: 'slice-1', epicId: 'epic-1' }], + }); + + expect(events).toHaveLength(1); + const ev = events[0]! as PetrinautTransitionFiredEvent; + expect(ev.kind).toBe('transition_fired'); + expect(ev.runId).toBe('run-1'); + expect(ev.transitionName).toBe('slice-1:evaluate:dispatch'); + expect(Object.keys(ev.input).sort()).toEqual(['pool:test-agent', 'slice:slice-1:spec-ready']); + expect(ev.input['slice:slice-1:spec-ready']).toHaveLength(1); + expect(ev.input['slice:slice-1:spec-ready']![0]!.sliceId).toBe('slice-1'); + expect(Object.keys(ev.output)).toEqual(['slice:slice-1:evaluate:running']); + expect(ev.output['slice:slice-1:evaluate:running']![0]!.id).toBeDefined(); + }); + + it('throws when transition_fired is missing transitionId', () => { + const stream = createPetrinautEventStream({ + runId: 'run-1', + tokenIdFn: deterministicTokenId(), + }); + expect(() => + stream.sink.emit({ + kind: 'transition_fired', + ts: '2026-05-27T00:00:00.000Z', + consumed: ['slice:slice-1:spec-ready'], + consumedTokens: [{ sliceId: 'slice-1', epicId: 'epic-1' }], + produced: [], + producedTokens: [], + }), + ).toThrow(/missing transitionId/); + }); + + it('emits empty token arrays per place when places are present without tokens', () => { + const events: PetrinautEvent[] = []; + const stream = createPetrinautEventStream({ + runId: 'run-1', + tokenIdFn: deterministicTokenId(), + onEvent: (e) => events.push(e), + }); + stream.sink.emit({ + kind: 'transition_fired', + ts: '2026-05-27T00:00:00.000Z', + transitionId: 'slice-1:evaluate:dispatch', + consumed: ['slice:slice-1:spec-ready', 'pool:test-agent'], + produced: ['slice:slice-1:evaluate:running'], + }); + + const ev = events[0]! as PetrinautTransitionFiredEvent; + expect(Object.keys(ev.input).sort()).toEqual(['pool:test-agent', 'slice:slice-1:spec-ready']); + expect(ev.input['slice:slice-1:spec-ready']).toEqual([]); + expect(ev.input['pool:test-agent']).toEqual([]); + expect(ev.output['slice:slice-1:evaluate:running']).toEqual([]); + }); + + it('forwards net_halted and net_deadlocked as terminal events', () => { + const events: PetrinautEvent[] = []; + const stream = createPetrinautEventStream({ + runId: 'run-1', + tokenIdFn: deterministicTokenId(), + onEvent: (e) => events.push(e), + }); + stream.sink.emit({ kind: 'net_halted', ts: '2026-05-27T00:00:00.000Z' }); + stream.sink.emit({ kind: 'net_deadlocked', ts: '2026-05-27T00:00:01.000Z' }); + + expect(events.map((e) => e.kind)).toEqual(['net_halted', 'net_deadlocked']); + expect(events.every((e) => 'runId' in e && e.runId === 'run-1')).toBe(true); + }); +}); + +// --------------------------------------------------------------------------- +// File output — JSONL roundtrip +// --------------------------------------------------------------------------- + +describe('createPetrinautEventStream — JSONL file output', () => { + it('appends one event per line and reloads as parsed events', () => { + const dir = mkdtempSync(join(tmpdir(), 'brunch-petrinaut-events-')); + const filePath = join(dir, 'petrinaut-events.jsonl'); + const blueprint = compileTopology(simplePlan, { maxRetries: 3 }); + + const stream = createPetrinautEventStream({ + runId: 'run-jsonl', + filePath, + tokenIdFn: deterministicTokenId(), + }); + + // Up-front initial marking. + stream.emitInitialMarking(blueprint); + + // A synthetic transition_fired (the production path goes through the + // NetEventSink during PetriNet.run; here we exercise the same adapter + // directly to avoid coupling this test to the heavy orchestrator path). + stream.sink.emit({ + kind: 'transition_fired', + ts: '2026-05-27T00:00:00.000Z', + transitionId: 'slice-1:evaluate:dispatch', + consumed: ['slice:slice-1:spec-ready', 'pool:test-agent'], + consumedTokens: [ + { sliceId: 'slice-1', epicId: 'epic-1' }, + { sliceId: '', epicId: '' }, + ], + produced: ['slice:slice-1:evaluate:running'], + producedTokens: [{ sliceId: 'slice-1', epicId: 'epic-1' }], + }); + + // Terminal halt. + stream.sink.emit({ kind: 'net_halted', ts: '2026-05-27T00:00:01.000Z' }); + + const raw = readFileSync(filePath, 'utf8'); + const lines = raw + .trim() + .split('\n') + .map((l) => JSON.parse(l) as PetrinautEvent); + + expect(lines).toHaveLength(3); + expect(lines[0]!.kind).toBe('initial_marking'); + expect(lines[1]!.kind).toBe('transition_fired'); + expect(lines[2]!.kind).toBe('net_halted'); + + // Every event carries runId for cross-run isolation. + expect(lines.every((e) => 'runId' in e && e.runId === 'run-jsonl')).toBe(true); + + // Transition_fired arcs carry tokens with payload. + const fired = lines[1] as PetrinautTransitionFiredEvent; + expect(fired.transitionName).toBe('slice-1:evaluate:dispatch'); + expect(fired.input['slice:slice-1:spec-ready']![0]!.sliceId).toBe('slice-1'); + expect(fired.output['slice:slice-1:evaluate:running']![0]!.id).toBeDefined(); + }); + + it('disables file output after an append failure without throwing', () => { + const dir = mkdtempSync(join(tmpdir(), 'brunch-petrinaut-events-')); + const filePath = join(dir, 'petrinaut-events.jsonl'); + const events: PetrinautEvent[] = []; + const warnings: string[] = []; + + const stream = createPetrinautEventStream({ + runId: 'run-jsonl', + filePath, + tokenIdFn: deterministicTokenId(), + onEvent: (e) => events.push(e), + onError: (message) => warnings.push(message), + }); + chmodSync(filePath, 0o444); + + expect(() => stream.sink.emit({ kind: 'net_halted', ts: '2026-05-27T00:00:01.000Z' })).not.toThrow(); + expect(events.map((e) => e.kind)).toEqual(['net_halted']); + expect(warnings).toHaveLength(1); + expect(warnings[0]).toContain('Petrinaut event stream disabled:'); + }); +}); diff --git a/src/orchestrator/src/petrinaut-events.ts b/src/orchestrator/src/petrinaut-events.ts new file mode 100644 index 000000000..953c50742 --- /dev/null +++ b/src/orchestrator/src/petrinaut-events.ts @@ -0,0 +1,206 @@ +// --------------------------------------------------------------------------- +// FE-763 — Petrinaut event stream for a live cook run. +// +// Adapts the orchestrator's internal NetEvent stream into the cross-team- +// agreed Petrinaut event format, plus an `initial_marking` event emitted +// once at run start from the compiled blueprint. +// +// Wire format (2026-05-26 alignment): +// +// transition_fired: +// { kind, ts, runId, transitionName, +// input: { : [{ id: , ...payload }] }, +// output: { : [{ id: , ...payload }] } } +// +// initial_marking: +// { kind, ts, runId, +// marking: { : [{ id: , ...payload }] } } +// +// net_halted / net_deadlocked: +// { kind, ts, runId } +// +// Halt outcomes appear in two complementary forms: +// 1. structurally — as halt tokens on `slice::halted` / `epic::halted` +// places (deposited by the FE-761 Slice 2b halted-as-place refactor). +// These flow naturally through `transition_fired` events as token payload. +// 2. as a terminal `net_halted` event marking the run's end state. +// +// Decision needed with Petrinaut before treating token ids as durable +// identities: today every emission generates fresh UUIDs (no lineage across +// consume→emit). This module is the seam to evolve once identity semantics are +// settled. +// --------------------------------------------------------------------------- + +import { randomUUID } from 'node:crypto'; +import { appendFileSync, writeFileSync } from 'node:fs'; + +import type { NetBlueprint, TokenSeed } from './net-blueprint.js'; +import type { NetEvent, NetEventSink, Token } from './petri-net.js'; + +export type PetrinautToken = { + id: string; + sliceId?: string; + epicId?: string; + retryCount?: number; + reworkCount?: number; + /** Halt reason carried by halt tokens (FE-761 Slice 2b). */ + haltReason?: string; +}; + +export type PetrinautInitialMarkingEvent = { + kind: 'initial_marking'; + ts: string; + runId: string; + marking: Record; +}; + +export type PetrinautTransitionFiredEvent = { + kind: 'transition_fired'; + ts: string; + runId: string; + transitionName: string; + input: Record; + output: Record; +}; + +export type PetrinautTerminalEvent = { + kind: 'net_halted' | 'net_deadlocked'; + ts: string; + runId: string; +}; + +export type PetrinautEvent = + | PetrinautInitialMarkingEvent + | PetrinautTransitionFiredEvent + | PetrinautTerminalEvent; + +export type CreatePetrinautEventStreamOpts = { + runId: string; + /** When set, every event is appended as one JSON object per line. */ + filePath?: string; + /** Override the per-token UUID generator (tests). */ + tokenIdFn?: () => string; + /** Fan-out for in-memory consumers (tests, sync-server forwarder). */ + onEvent?: (event: PetrinautEvent) => void; + /** Receives best-effort file-output failures without failing the cook run. */ + onError?: (message: string) => void; +}; + +export type PetrinautEventStream = { + /** NetEventSink to pass into `PetriNet.run()`. */ + sink: NetEventSink; + /** Emit the initial marking event from a compiled blueprint. Call once before `net.run()`. */ + emitInitialMarking(blueprint: NetBlueprint): void; +}; + +/** + * Create a Petrinaut-shaped event stream. Returns a NetEventSink adapter and + * a helper to emit the initial marking up-front. The stream writes one JSON + * object per line to `filePath` when provided, and also fans out to + * `onEvent` so in-process consumers (tests, the sync server) can subscribe + * without re-reading the file. + */ +export function createPetrinautEventStream(opts: CreatePetrinautEventStreamOpts): PetrinautEventStream { + const { runId, filePath, onEvent, onError } = opts; + const tokenId = opts.tokenIdFn ?? randomUUID; + let fileOutputDisabled = false; + + // Initialize the file as empty so the first append produces a well-formed JSONL file. + if (filePath) writeFileSync(filePath, ''); + + function publish(event: PetrinautEvent): void { + if (filePath && !fileOutputDisabled) { + try { + appendFileSync(filePath, `${JSON.stringify(event)}\n`); + } catch (err) { + fileOutputDisabled = true; + onError?.(`Petrinaut event stream disabled: ${err instanceof Error ? err.message : String(err)}`); + } + } + onEvent?.(event); + } + + function groupTokens( + places: string[] | undefined, + tokens: Token[] | undefined, + ): Record { + const out: Record = {}; + if (!places) return out; + if (!tokens) { + for (const place of places) out[place] = []; + return out; + } + for (let i = 0; i < places.length; i++) { + const place = places[i]!; + const list = out[place] ?? []; + const token = tokens[i]; + if (token) list.push(tokenToPetrinaut(token, tokenId)); + out[place] = list; + } + return out; + } + + const sink: NetEventSink = { + emit(event: NetEvent): void { + switch (event.kind) { + case 'transition_fired': { + if (!event.transitionId) { + throw new Error('transition_fired NetEvent missing transitionId'); + } + publish({ + kind: 'transition_fired', + ts: event.ts, + runId, + transitionName: event.transitionId, + input: groupTokens(event.consumed, event.consumedTokens), + output: groupTokens(event.produced, event.producedTokens), + }); + return; + } + case 'net_halted': + case 'net_deadlocked': { + publish({ kind: event.kind, ts: event.ts, runId }); + return; + } + } + }, + }; + + function emitInitialMarking(blueprint: NetBlueprint): void { + const marking: Record = {}; + for (const { place, token } of blueprint.initialTokens) { + const list = marking[place] ?? []; + list.push(seedToPetrinaut(token, tokenId())); + marking[place] = list; + } + publish({ + kind: 'initial_marking', + ts: new Date().toISOString(), + runId, + marking, + }); + } + + return { sink, emitInitialMarking }; +} + +function tokenToPetrinaut(token: Token, idFn: () => string): PetrinautToken { + return { + id: idFn(), + ...(token.sliceId ? { sliceId: token.sliceId } : {}), + ...(token.epicId ? { epicId: token.epicId } : {}), + ...(token.retryCount !== undefined ? { retryCount: token.retryCount } : {}), + ...(token.reworkCount !== undefined ? { reworkCount: token.reworkCount } : {}), + ...(token.haltReason !== undefined ? { haltReason: token.haltReason } : {}), + }; +} + +function seedToPetrinaut(seed: TokenSeed, id: string): PetrinautToken { + return { + id, + ...(seed.sliceId ? { sliceId: seed.sliceId } : {}), + ...(seed.epicId ? { epicId: seed.epicId } : {}), + ...(seed.retryCount !== undefined ? { retryCount: seed.retryCount } : {}), + ...(seed.reworkCount !== undefined ? { reworkCount: seed.reworkCount } : {}), + }; +} diff --git a/src/orchestrator/src/types.ts b/src/orchestrator/src/types.ts index e1be280d3..d2acda31b 100644 --- a/src/orchestrator/src/types.ts +++ b/src/orchestrator/src/types.ts @@ -131,6 +131,7 @@ export type SliceOutcome = { export type OrchestratorResult = { status: 'completed' | 'halted'; reason?: string; + warnings: string[]; reports: string[]; epics: EpicOutcome[]; slices: SliceOutcome[]; @@ -155,4 +156,5 @@ export type RunCtx = { reportIds: string[]; sliceOutcomes: Map; epicOutcomes: Map; + warnings?: string[]; };