Skip to content

feat(webapp): mollifier API mutations on buffered runs#3756

Open
d-cs wants to merge 9 commits into
mollifier-phase-3-readsfrom
mollifier-phase-3-mutations
Open

feat(webapp): mollifier API mutations on buffered runs#3756
d-cs wants to merge 9 commits into
mollifier-phase-3-readsfrom
mollifier-phase-3-mutations

Conversation

@d-cs
Copy link
Copy Markdown
Collaborator

@d-cs d-cs commented May 26, 2026

Summary

Cancel, replay, reschedule, metadata, tags, and idempotency-key-reset now succeed against a run that's still in the mollifier buffer. Mutations are applied to the buffered snapshot via Lua CAS; the drainer carries the mutation forward when it replays.

Primitives added:

  • mutateWithFallback — PG-first / buffer-fallback resolver with bounded-wait safety net for entries that transition mid-mutation.
  • applyMetadataMutation — buffered metadata PUT mirroring the PG-side retry loop with CAS atomicity.
  • resolveRunForMutation — discriminated-union resolver used by route findResource so the route builder's pre-action 404 check sees buffered runs.

Routes wired (whole files, no GET/POST splits):

  • api.v2.runs.\$runParam.cancel.ts
  • api.v1.runs.\$runParam.replay.ts
  • api.v1.runs.\$runParam.reschedule.ts
  • api.v1.runs.\$runId.metadata.ts
  • api.v1.runs.\$runId.tags.ts
  • resetIdempotencyKey.server.ts

Stacked on the reads PR.

Test plan

  • `pnpm run typecheck --filter webapp` passes
  • `pnpm run test --filter webapp test/mollifierMutateWithFallback.test.ts` passes
  • `pnpm run test --filter webapp test/mollifierApplyMetadataMutation.test.ts` passes
  • `pnpm run test --filter webapp test/mollifierResolveRunForMutation.test.ts` passes

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented May 26, 2026

Important

Review skipped

Auto reviews are disabled on base/target branches other than the default branch.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: bfa5943e-9eaf-4d88-bef6-e4d163e9ad3a

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Use the checkbox below for a quick retry:

  • 🔍 Trigger review
✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch mollifier-phase-3-mutations

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@changeset-bot
Copy link
Copy Markdown

changeset-bot Bot commented May 26, 2026

🦋 Changeset detected

Latest commit: 63ae447

The changes in this PR will be included in the next version bump.

This PR includes changesets to release 32 packages
Name Type
@trigger.dev/redis-worker Patch
@internal/run-engine Patch
@internal/schedule-engine Patch
@trigger.dev/build Patch
@trigger.dev/core Patch
@trigger.dev/plugins Patch
@trigger.dev/python Patch
@trigger.dev/react-hooks Patch
@trigger.dev/rsc Patch
@trigger.dev/schema-to-json Patch
@trigger.dev/sdk Patch
@trigger.dev/database Patch
@trigger.dev/otlp-importer Patch
@trigger.dev/rbac Patch
trigger.dev Patch
references-ai-chat Patch
d3-chat Patch
references-d3-openai-agents Patch
@internal/cache Patch
@internal/clickhouse Patch
@internal/llm-model-catalog Patch
@internal/redis Patch
@internal/replication Patch
@internal/testcontainers Patch
@internal/tracing Patch
@internal/tsql Patch
@internal/zod-worker Patch
references-nextjs-realtime Patch
references-realtime-hooks-test Patch
references-realtime-streams Patch
@internal/sdk-compat-tests Patch
references-telemetry Patch

Not sure what this means? Click here to learn what changesets are.

Click here if you're a maintainer who wants to add another changeset to this PR

Comment thread apps/webapp/app/routes/api.v1.runs.$runId.metadata.ts
Comment thread apps/webapp/app/routes/api.v1.runs.$runParam.replay.ts
Comment thread apps/webapp/app/v3/services/resetIdempotencyKey.server.ts
@d-cs d-cs self-assigned this May 26, 2026
@d-cs d-cs force-pushed the mollifier-phase-3-reads branch from 1838229 to af0aeeb Compare May 26, 2026 11:12
@d-cs d-cs force-pushed the mollifier-phase-3-mutations branch from b8ead31 to 109fbd7 Compare May 26, 2026 11:12
@d-cs d-cs force-pushed the mollifier-phase-3-reads branch from af0aeeb to 857bba3 Compare May 26, 2026 13:24
@d-cs d-cs force-pushed the mollifier-phase-3-mutations branch from 109fbd7 to c7a66bd Compare May 26, 2026 13:24
@d-cs d-cs force-pushed the mollifier-phase-3-reads branch from 857bba3 to 21babc8 Compare May 26, 2026 14:44
@d-cs d-cs force-pushed the mollifier-phase-3-mutations branch 2 times, most recently from 094d006 to 9914976 Compare May 26, 2026 15:00
@d-cs d-cs force-pushed the mollifier-phase-3-reads branch from 0919f7a to f36c576 Compare May 26, 2026 15:12
@d-cs d-cs force-pushed the mollifier-phase-3-mutations branch 2 times, most recently from f4b6064 to 0547ba9 Compare May 26, 2026 16:00
@d-cs d-cs force-pushed the mollifier-phase-3-reads branch from c8ab214 to 047b240 Compare May 26, 2026 16:20
@d-cs d-cs force-pushed the mollifier-phase-3-mutations branch from 0547ba9 to 0708ce5 Compare May 26, 2026 16:20
@d-cs d-cs force-pushed the mollifier-phase-3-reads branch from 047b240 to e57bc5e Compare May 26, 2026 16:32
@d-cs d-cs force-pushed the mollifier-phase-3-mutations branch from 0708ce5 to 396552e Compare May 26, 2026 16:32
devin-ai-integration[bot]

This comment was marked as resolved.

@d-cs d-cs force-pushed the mollifier-phase-3-mutations branch from 396552e to eb2a777 Compare May 26, 2026 16:57
@d-cs d-cs force-pushed the mollifier-phase-3-reads branch from f4131eb to d8f6cf7 Compare May 27, 2026 12:04
@d-cs d-cs force-pushed the mollifier-phase-3-mutations branch from e0a57d9 to b3d188c Compare May 27, 2026 12:04
@d-cs d-cs force-pushed the mollifier-phase-3-reads branch from d8f6cf7 to 796a2c0 Compare May 27, 2026 12:15
@d-cs d-cs force-pushed the mollifier-phase-3-mutations branch from b3d188c to c970692 Compare May 27, 2026 12:15
@d-cs d-cs force-pushed the mollifier-phase-3-reads branch from 796a2c0 to b139391 Compare May 27, 2026 12:21
@d-cs d-cs force-pushed the mollifier-phase-3-mutations branch from c970692 to ba084d8 Compare May 27, 2026 12:21
@d-cs d-cs force-pushed the mollifier-phase-3-reads branch from b139391 to d153042 Compare May 27, 2026 12:58
@d-cs d-cs force-pushed the mollifier-phase-3-mutations branch from ba084d8 to eb520ef Compare May 27, 2026 12:58
@d-cs d-cs force-pushed the mollifier-phase-3-mutations branch from f6d15b1 to fcd196d Compare May 27, 2026 16:58
@d-cs d-cs force-pushed the mollifier-phase-3-reads branch 2 times, most recently from 5a68609 to dd45a3e Compare May 28, 2026 08:49
@d-cs d-cs force-pushed the mollifier-phase-3-mutations branch from 214bd92 to c2e1c6e Compare May 28, 2026 08:49
@d-cs d-cs force-pushed the mollifier-phase-3-reads branch from dd45a3e to 597cce5 Compare May 28, 2026 09:41
@d-cs d-cs force-pushed the mollifier-phase-3-mutations branch from c2e1c6e to 0f7365d Compare May 28, 2026 09:41
@d-cs d-cs force-pushed the mollifier-phase-3-reads branch from 597cce5 to 188b8c7 Compare May 28, 2026 09:48
@d-cs d-cs force-pushed the mollifier-phase-3-mutations branch 3 times, most recently from 6fd8529 to 0de4601 Compare May 28, 2026 10:44
@d-cs d-cs force-pushed the mollifier-phase-3-reads branch from c5f92e8 to fe633cd Compare May 28, 2026 11:08
@d-cs d-cs force-pushed the mollifier-phase-3-mutations branch 4 times, most recently from b0f1a65 to 84d5445 Compare May 28, 2026 11:58
@d-cs d-cs force-pushed the mollifier-phase-3-reads branch from 5fd8611 to ffeab61 Compare May 28, 2026 13:04
@d-cs d-cs force-pushed the mollifier-phase-3-mutations branch from 84d5445 to f9327b0 Compare May 28, 2026 13:04
@d-cs d-cs marked this pull request as ready for review May 28, 2026 13:08
Copy link
Copy Markdown
Contributor

@devin-ai-integration devin-ai-integration Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Devin Review found 3 new potential issues.

View 8 additional findings in Devin Review.

Open in Devin Review

Comment on lines +99 to +103
await applyMetadataMutationToBufferedRun({
runId: targetRunId,
body: { operations },
});
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Unhandled error in routeOperationsToRun can crash the request after the primary mutation already succeeded

In the metadata route, after the primary buffered metadata mutation succeeds (line 134), the code fans out parent/root operations via routeOperationsToRun in a Promise.all at lines 168-184. Inside routeOperationsToRun, the applyMetadataMutationToBufferedRun call at line 100 is not wrapped in a try/catch. If Redis is down or throws, the error propagates through Promise.all into the action handler, causing the entire request to return a 500 — even though the caller's own metadata mutation already landed successfully. The comment at lines 88-92 explicitly describes these parent/root operations as "auxiliary" and "best-effort", but the implementation doesn't match that intent.

How to trigger
  1. A buffered run has parent/root operations in the metadata PUT body.
  2. The primary metadata mutation succeeds on the buffer.
  3. The PG service fails for the parent run (e.g., completed run), so routeOperationsToRun falls through to the buffer fallback.
  4. applyMetadataMutationToBufferedRun throws (Redis connection error, timeout, etc.).
  5. The entire request fails with 500, and the successful primary mutation result at line 186 is never returned.
Suggested change
await applyMetadataMutationToBufferedRun({
runId: targetRunId,
body: { operations },
});
}
await applyMetadataMutationToBufferedRun({
runId: targetRunId,
body: { operations },
}).catch((err) => {
logger.warn("metadata route: buffer fallback for parent/root op failed", {
targetRunId,
error: err instanceof Error ? err.message : String(err),
});
});
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

Comment on lines +169 to +170
routeOperationsToRun(bufferedEntry.parentTaskRunId, body.parentOperations, env),
// The PG service routes rootOperations to
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚩 Parent metadata routing uses internal ID which won't resolve in the buffer

In routeOperationsToRun at api.v1.runs.$runId.metadata.ts:169, bufferedEntry.parentTaskRunId is documented as an internal CUID (not a friendlyId). When the PG service fails and the code falls back to applyMetadataMutationToBufferedRun at line 100, that function calls buffer.getEntry(input.runId) which indexes by friendlyId. An internal CUID will never match, so getEntry returns null and the function returns { kind: 'not_found' }. This means parent operations can never be routed to a buffered parent via this fallback path. In practice this is likely harmless (parents are typically materialized before children reach the buffer), but it's a silent no-op that could surprise.

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

Comment on lines +1940 to 1995
redisTest(
"append_tags rejects with limit_exceeded when maxTags would be exceeded, writing nothing",
{ timeout: 20_000 },
async ({ redisContainer }) => {
const buffer = new MollifierBuffer({
redisOptions: {
host: redisContainer.getHost(),
port: redisContainer.getPort(),
password: redisContainer.getPassword(),
},
logger: new Logger("test", "log"),
});
try {
await buffer.accept({
runId: "r_cap",
envId: "env_m",
orgId: "org_1",
payload: serialiseSnapshot({ tags: ["a", "b"] }),
});

// 2 existing + 2 new = 4 deduped > cap of 3 → rejected, nothing written.
const rejected = await buffer.mutateSnapshot("r_cap", {
type: "append_tags",
tags: ["c", "d"],
maxTags: 3,
});
expect(rejected).toBe("limit_exceeded");
const afterReject = await buffer.getEntry("r_cap");
const rejPayload = JSON.parse(afterReject!.payload) as { tags: string[] };
expect(rejPayload.tags).toEqual(["a", "b"]);

// Dedup keeps the count under the cap → applied.
const applied = await buffer.mutateSnapshot("r_cap", {
type: "append_tags",
tags: ["a", "c"],
maxTags: 3,
});
expect(applied).toBe("applied_to_snapshot");
const afterApply = await buffer.getEntry("r_cap");
const appPayload = JSON.parse(afterApply!.payload) as { tags: string[] };
expect(appPayload.tags).toEqual(["a", "b", "c"]);

// Landing exactly on the cap is allowed.
const exact = await buffer.mutateSnapshot("r_cap", {
type: "append_tags",
tags: ["a", "b", "c"],
maxTags: 3,
});
expect(exact).toBe("applied_to_snapshot");
} finally {
await buffer.close();
}
},
);

redisTest(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚩 Test file buffer.test.ts has new test block inserted at wrong nesting level

In packages/redis-worker/src/mollifier/buffer.test.ts, lines 1940-1993 insert a new redisTest(...) block between the callback-closing }, (line 1939) and the ) (line 1995) that closes the previous redisTest call. This effectively nests the new test as an extra argument to the outer redisTest, with a semicolon (; from ); at line 1993) appearing inside the argument list. This is likely a syntax error that would prevent the test file from parsing. Not reporting as a bug per review guidelines (would be caught by compiler/CI), but flagging for reviewer awareness since it may block the test suite.

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

d-cs and others added 9 commits May 28, 2026 14:18
Cancel, replay, reschedule, metadata, tags, and idempotency-key-reset
now succeed against a run that's still in the mollifier buffer.
Mutations are applied to the buffered snapshot via Lua CAS; the
drainer carries the mutation forward when it replays.

Stacked on the reads PR.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- metadata route: drop the \`as unknown as Parameters<...>\` cast on
  the parent/root operations path. Widen \`routeOperationsToRun\`'s env
  parameter to \`AuthenticatedEnvironment\` so the service's typed
  signature carries through; the caller always has the full env in
  scope.
- replay route: validate the buffered fallback against a Zod
  \`BufferedReplayInputSchema\` covering the fields
  \`ReplayTaskRunService.call\` actually reads (id, friendlyId,
  runtimeEnvironmentId, taskIdentifier, payload, payloadType, queue,
  isTest, traceId, spanId, engine, runTags + nullable
  concurrencyKey/workerQueue/machinePreset/realtimeStreamsVersion).
  Schema-fail logs the issue list and 404s rather than passing a
  half-shaped object into the service.
- resetIdempotencyKey: distinguish "PG-empty + buffer-cleared-nothing"
  (genuine 404) from "PG-empty + buffer-unreachable" (partial outage —
  503 with retry hint). The previous behaviour silently returned 404
  on outage, hiding the partial failure and leaving a buffered key
  effectively un-reset. New regression test covers all four branches
  (PG-hit + buffer-throws, PG-empty + buffer-hit, PG-empty +
  buffer-clean-miss, PG-empty + buffer-outage, mollifier-disabled).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- metadata route was routing rootOperations to bufferedEntry.parentTaskRunId
  with a comment claiming PG's nil-coalesce defaults to parent. PG actually
  defaults to taskRun.id (self), so a buffered grandchild metadata.root.set()
  was silently mutating the child's metadata instead of the root's.
  SyntheticRun already carries rootTaskRunFriendlyId from the snapshot —
  use it, falling back to the run itself (matching PG) when absent.

- reschedule route's PG path delegates to RescheduleTaskRunService which
  enforces `status !== "DELAYED"` and 422s otherwise. The buffer path had
  no equivalent guard, so a customer could inject delayUntil into the
  snapshot of an undelayed buffered run and the drainer would materialise
  it with an unintended delay. Added a pre-fetch through
  findRunByIdWithMollifierFallback and 422 when the buffered run has no
  delayUntil. SyntheticRun doesn't carry a "DELAYED" status enum
  (only QUEUED|FAILED|CANCELED) so the gate reads the snapshot's
  delayUntil field directly.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The wait-and-bounce loop for mutations racing a mid-drain run polled the
PG primary on a fixed 20ms cadence with no jitter — up to ~100 reads per
request, synchronized across concurrent waiters, piling load onto the
writer exactly when mollifier is engaged to shed it.

The drainer writes the canonical PG row BEFORE it acks (sets
`materialised`) or fails (deletes the entry), so the buffer entry's own
state is an authoritative, already-in-Redis signal for "is the row in PG
yet?". Watch that (cheap Redis getEntry) instead, and touch the primary
exactly once — for the actual mutation — only after it resolves. Poll
gaps now use jittered exponential backoff (20ms → 250ms cap).

Drops the per-poll PG timeout race (DEFAULT_PG_TIMEOUT_MS / pgTimeoutMs /
findRunInPgWithTimeout), unneeded now that PG is read once rather than in
a tight loop.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Remove plan-tracking shorthand (Q2/Q3 design, _plans/) from mutations-layer mollifier comments; reword to plain English. Comment-only; no behaviour change.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
… metadata fallback errors

The tags API skipped MAX_TAGS_PER_RUN enforcement on the buffered path,
letting a buffered run exceed the cap the trigger validator applies at
creation. Enforce it atomically in the mutateSnapshot Lua: append_tags
now accepts an optional maxTags and returns "limit_exceeded" (writing
nothing) when the deduped count would overflow. mutateWithFallback gains
a symmetric rejectedResponse builder + a "rejected" outcome; the tags
route returns 422, matching the PG path.

Also stop silently swallowing PG failures in the metadata route's
parent/root op fan-out: warn (with targetRunId + error) before the
best-effort buffer fallback so a genuine PG outage is observable.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…oute comments

Two Phase labels survived the earlier sweep — "Phase A6" in
api.v1.runs.\$runId.metadata.ts (the GET-loader-added comment, mirroring
the same pattern in api.v1.runs.\$runParam.attempts.ts on reads), and
"Phase B4" in api.v1.runs.\$runParam.replay.ts (referring to where
SyntheticRun was extended). Rewritten to plain prose; comment-only, no
behaviour change.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…n comments

Two stragglers from the prior sweep: a "(Q5)" tag in the
resetIdempotencyKey buffer-side comment, and a "Phase F" reference in
the applyMetadataMutation test's regression note. Comment-only; no
behavioural change.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@d-cs d-cs force-pushed the mollifier-phase-3-reads branch from ffeab61 to fe21821 Compare May 28, 2026 13:18
@d-cs d-cs force-pushed the mollifier-phase-3-mutations branch from f9327b0 to 63ae447 Compare May 28, 2026 13:18
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant