Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/orchestrator/src/cook-cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,9 @@ export async function runCook(opts: CookOptions): Promise<void> {
console.error(
` ${ok ? '✓' : '✗'} ${result.status}${result.reason ? ` — ${result.reason}` : ''} (${duration})`,
);
for (const warning of result.warnings) {
console.error(` ! ${warning}`);
}
console.error('');

for (const e of result.epics) {
Expand Down
97 changes: 96 additions & 1 deletion src/orchestrator/src/engine-contract.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,13 @@ import { join } from 'node:path';
import { describe, expect, it } from 'vitest';

import { createOrchestrator } from './engine.js';
import { compilePlan, compileTopology } from './net-compiler.js';
import { compilePlan, compileTopology, wireHandlers } from './net-compiler.js';
import type { NetEvent } from './petri-net.js';
import {
createPetrinautEventStream,
type PetrinautEvent,
type PetrinautTransitionFiredEvent,
} from './petrinaut-events.js';
import { InMemoryReportSink } from './report-sink.js';
import type { ActionContext, ActionHandlers, OrchestratorInput, Plan, RunCtx, TestRunner } from './types.js';

Expand Down Expand Up @@ -906,6 +911,96 @@ describe('Adapter: §7 event vocabulary', () => {
});
});

// ---------------------------------------------------------------------------
// FE-763 — Petrinaut event stream end-to-end on the orchestrator
// ---------------------------------------------------------------------------

describe('FE-763: Petrinaut event stream on a real run', () => {
it('emits initial_marking + transition_fired (with token payload) for simplePlan happy path', async () => {
const fakes = createFakes();
const ctx: RunCtx = {
reportIds: [],
sliceOutcomes: new Map(),
epicOutcomes: new Map(),
};
const input: OrchestratorInput = {
plan: simplePlan,
sandboxDir: '/tmp/fake',
actions: fakes.actions,
reports: fakes.reports,
testRunner: fakes.testRunner,
policy: { maxRetries: 3 },
};

const blueprint = compileTopology(input.plan, input.policy);
const net = wireHandlers(blueprint, input, ctx);

const events: PetrinautEvent[] = [];
const stream = createPetrinautEventStream({
runId: 'run-e2e',
onEvent: (e) => events.push(e),
});
stream.emitInitialMarking(blueprint);
await net.run('serial', () => net.hasHaltToken(), stream.sink);

// 1. initial_marking is first.
expect(events[0]!.kind).toBe('initial_marking');

// 2. every event carries the runId.
expect(events.every((e) => 'runId' in e && e.runId === 'run-e2e')).toBe(true);

// 3. transition_fired events expose the FE-761 Slice 4 dispatch/complete
// topology directly in Petrinaut's wire format.
const fired = events.filter((e): e is PetrinautTransitionFiredEvent => e.kind === 'transition_fired');
const names = fired.map((e) => e.transitionName);
expect(names).toContain('slice-1:evaluate:dispatch');
expect(names).toContain('slice-1:evaluate:complete');
expect(names).toContain('slice-1:assess-semantic:dispatch');
expect(names).toContain('slice-1:assess-semantic:complete');

// 4. each transition_fired carries per-place token data with a UUID
// (cross-team-agreed shape: { id: <UUID>, ...payload }).
for (const e of fired) {
for (const tokens of Object.values(e.input)) {
for (const tok of tokens) expect(typeof tok.id).toBe('string');
}
for (const tokens of Object.values(e.output)) {
for (const tok of tokens) expect(typeof tok.id).toBe('string');
}
}

// 5. happy path: no net_halted / net_deadlocked emitted (engine exits
// the loop cleanly when nothing remains enabled). When the cook
// fails — retry exhaustion etc. — Petrinaut sees the halt token
// travel through the topology as a transition_fired event landing
// in `slice:<sid>:halted`, plus the engine emits net_halted.
expect(events.filter((e) => e.kind === 'net_halted')).toHaveLength(0);
expect(events.filter((e) => e.kind === 'net_deadlocked')).toHaveLength(0);
});

it('surfaces Petrinaut integration failures as warnings without halting the run', async () => {
const fakes = createFakes();
const result = await createOrchestrator('serial').run({
plan: simplePlan,
sandboxDir: '/tmp/fake',
actions: fakes.actions,
reports: fakes.reports,
testRunner: fakes.testRunner,
policy: { maxRetries: 3 },
runId: 'run-warnings',
runDir: join(tmpdir(), 'brunch-missing-run-dir', 'child'),
});

expect(result.status).toBe('completed');
expect(result.warnings).toEqual(
expect.arrayContaining([
expect.stringContaining('Petrinaut net export disabled:'),
expect.stringContaining('Petrinaut event stream disabled:'),
]),
);
});
});

// ---------------------------------------------------------------------------
// Contract test #12 — parallel fires concurrently
// ---------------------------------------------------------------------------
Expand Down
37 changes: 33 additions & 4 deletions src/orchestrator/src/engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ import { writeFileSync } from 'node:fs';
import { join } from 'node:path';

import { compileTopology, wireHandlers } from './net-compiler.js';
import type { FiringPolicy } from './petri-net.js';
import type { FiringPolicy, NetEventSink } from './petri-net.js';
import { createPetrinautEventStream } from './petrinaut-events.js';
import { serializeBlueprint } from './petrinaut-export.js';
import { toSdcpnFile } from './petrinaut-sdcpn.js';
import type { Orchestrator, OrchestratorInput, OrchestratorResult, RunCtx } from './types.js';
Expand All @@ -17,13 +18,18 @@ import type { Orchestrator, OrchestratorInput, OrchestratorResult, RunCtx } from
// comes from the halt token itself (`token.haltReason`).
// ---------------------------------------------------------------------------

function errorMessage(err: unknown): string {
return err instanceof Error ? err.message : String(err);
}

export function createOrchestrator(firingPolicy: FiringPolicy): Orchestrator {
return {
async run(input: OrchestratorInput): Promise<OrchestratorResult> {
const ctx: RunCtx = {
reportIds: [],
sliceOutcomes: new Map(),
epicOutcomes: new Map(),
warnings: [],
};

let haltReason: string | undefined;
Expand All @@ -44,13 +50,34 @@ export function createOrchestrator(firingPolicy: FiringPolicy): Orchestrator {
// straight into the Petrinaut editor's file-picker import.
const sdcpn = toSdcpnFile(serialized, {});
writeFileSync(join(input.runDir, 'net.sdcpn.json'), `${JSON.stringify(sdcpn, null, 2)}\n`);
} catch {
} catch (err) {
// Best-effort integration output — don't fail the cook run.
ctx.warnings?.push(`Petrinaut net export disabled: ${errorMessage(err)}`);
}
}

// FE-763: open a Petrinaut event stream when runDir is present.
// Emits an initial_marking event up-front, then transition_fired /
// net_halted / net_deadlocked events as the net runs. Library
// callers without a runDir get the existing no-op behavior.
let eventSink: NetEventSink | undefined;
if (input.runDir) {
try {
const stream = createPetrinautEventStream({
runId: input.runId ?? 'unknown',
filePath: join(input.runDir, 'petrinaut-events.jsonl'),
onError: (message) => ctx.warnings?.push(message),
});
stream.emitInitialMarking(blueprint);
eventSink = stream.sink;
} catch (err) {
// Best-effort integration output — don't fail the cook run.
ctx.warnings?.push(`Petrinaut event stream disabled: ${errorMessage(err)}`);
}
}
Comment thread
kostandinang marked this conversation as resolved.

const net = wireHandlers(blueprint, input, ctx);
await net.run(firingPolicy, () => net.hasHaltToken());
await net.run(firingPolicy, () => net.hasHaltToken(), eventSink);

hasStructuralHalt = net.hasHaltToken();
// Derive halt reason from any halt token deposited during the run.
Expand All @@ -64,7 +91,8 @@ export function createOrchestrator(firingPolicy: FiringPolicy): Orchestrator {
} catch (err) {
return {
status: 'halted',
reason: err instanceof Error ? err.message : String(err),
reason: errorMessage(err),
warnings: ctx.warnings ?? [],
reports: [...ctx.reportIds],
epics: input.plan.epics.map(
(e) => ctx.epicOutcomes.get(e.id) ?? { epicId: e.id, status: 'halted' as const },
Expand Down Expand Up @@ -98,6 +126,7 @@ export function createOrchestrator(firingPolicy: FiringPolicy): Orchestrator {
return {
status: halted ? 'halted' : 'completed',
reason: haltReason,
warnings: ctx.warnings ?? [],
reports: [...ctx.reportIds],
epics: input.plan.epics.map((e) => ctx.epicOutcomes.get(e.id)!),
slices: input.plan.slices.map((s) => ctx.sliceOutcomes.get(s.id)!),
Expand Down
8 changes: 4 additions & 4 deletions src/orchestrator/src/net-compiler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -643,7 +643,7 @@ export function wireHandlers(blueprint: NetBlueprint, input: OrchestratorInput,
}
return out;
})();
net.scheduleDeferred(skel.id, skel.contract, skel.inputs, deferred);
net.scheduleDeferred(skel.id, skel.contract, { places: skel.inputs, tokens: consumed }, deferred);
return [];
};
break;
Expand Down Expand Up @@ -740,7 +740,7 @@ export function wireHandlers(blueprint: NetBlueprint, input: OrchestratorInput,
{ place: budgetPlace, token: { ...baseToken, retryCount: retryCount + 1 } },
];
})();
net.scheduleDeferred(skel.id, skel.contract, skel.inputs, deferred);
net.scheduleDeferred(skel.id, skel.contract, { places: skel.inputs, tokens: consumed }, deferred);
return [];
};
break;
Expand Down Expand Up @@ -795,7 +795,7 @@ export function wireHandlers(blueprint: NetBlueprint, input: OrchestratorInput,
{ place: budgetPlace, token: { ...baseToken, reworkCount: reworkCount + 1 } },
];
})();
net.scheduleDeferred(skel.id, skel.contract, skel.inputs, deferred);
net.scheduleDeferred(skel.id, skel.contract, { places: skel.inputs, tokens: consumed }, deferred);
return [];
};
break;
Expand Down Expand Up @@ -877,7 +877,7 @@ export function wireHandlers(blueprint: NetBlueprint, input: OrchestratorInput,
// happens in sibling-passthrough transitions downstream.
return [{ place: intermediatePlace, token: { ...inputToken, reportId } }];
})();
net.scheduleDeferred(skel.id, skel.contract, skel.inputs, deferred);
net.scheduleDeferred(skel.id, skel.contract, { places: skel.inputs, tokens: consumed }, deferred);
return [];
};
break;
Expand Down
31 changes: 25 additions & 6 deletions src/orchestrator/src/petri-net.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,24 @@ export type FiringPolicy = 'serial' | 'parallel';
/** Event kinds aligned with spec doc §7. */
export type NetEventKind = 'transition_fired' | 'net_deadlocked' | 'net_halted';

/** Structured event emitted during net execution. */
/**
* Structured event emitted during net execution.
*
* `consumed` / `produced` are place-name lists (one entry per arc).
* `consumedTokens` / `producedTokens` carry the single token that traversed
* each corresponding arc. Brunch currently models one-token-per-arc firing;
* if multi-token arcs become real, this contract should change deliberately
* with tests that exercise that capability.
*/
export type NetEvent = {
kind: NetEventKind;
ts: string;
transitionId?: string;
contract?: TransitionContract;
consumed?: string[];
consumedTokens?: Token[];
produced?: string[];
producedTokens?: Token[];
};

/** Sink for structured net events. Optional — defaults to no-op. */
Expand Down Expand Up @@ -101,6 +111,7 @@ export function placeName(placeId: string): string {
}

type TransitionClaim = { transition: TransitionDef; consumed: Token[] };
export type ConsumedClaim = { places: string[]; tokens: Token[] };

export class PetriNet {
private places = new Map<string, Token[]>();
Expand Down Expand Up @@ -141,12 +152,12 @@ export class PetriNet {
scheduleDeferred(
transitionId: string,
contract: TransitionContract | undefined,
consumedPlaces: string[],
consumed: ConsumedClaim,
work: Promise<{ place: string; token: Token }[]>,
): void {
this.pendingDeferred++;
work
.then((outputs) => this.completeDeferred(transitionId, contract, consumedPlaces, outputs))
.then((outputs) => this.completeDeferred(transitionId, contract, consumed, outputs))
.catch((err) => {
this.deferredError ??= err;
this.pendingDeferred--;
Expand All @@ -157,21 +168,25 @@ export class PetriNet {
private completeDeferred(
transitionId: string,
contract: TransitionContract | undefined,
consumedPlaces: string[],
consumed: ConsumedClaim,
outputs: { place: string; token: Token }[],
): void {
const producedPlaces: string[] = [];
const producedTokens: Token[] = [];
for (const { place, token } of outputs) {
this.addToken(place, token);
producedPlaces.push(place);
producedTokens.push(token);
}
this.deferredEventSink?.emit({
kind: 'transition_fired',
ts: new Date().toISOString(),
transitionId,
contract,
consumed: consumedPlaces,
consumed: consumed.places,
consumedTokens: consumed.tokens,
produced: producedPlaces,
producedTokens,
});
this.pendingDeferred--;
this.wakeOneWaiter();
Expand Down Expand Up @@ -277,14 +292,16 @@ export class PetriNet {
}

private depositClaim(
{ transition, consumed: _consumed }: TransitionClaim,
{ transition, consumed }: TransitionClaim,
outputs: { place: string; token: Token }[],
eventSink?: NetEventSink,
): void {
const producedPlaces: string[] = [];
const producedTokens: Token[] = [];
for (const { place, token } of outputs) {
this.addToken(place, token);
producedPlaces.push(place);
producedTokens.push(token);
}
// Deferred handlers return [] synchronously; their transition_fired
// event is emitted once from completeDeferred when outputs land.
Expand All @@ -295,7 +312,9 @@ export class PetriNet {
transitionId: transition.id,
contract: transition.contract,
consumed: transition.inputs,
consumedTokens: consumed,
produced: producedPlaces,
producedTokens,
Comment thread
cursor[bot] marked this conversation as resolved.
});
}

Expand Down
Loading
Loading