feat(webapp): mollifier API mutations on buffered runs#3756
Conversation
|
Important Review skippedAuto reviews are disabled on base/target branches other than the default branch. Please check the settings in the CodeRabbit UI or the ⚙️ Run configurationConfiguration used: Repository UI Review profile: CHILL Plan: Pro Run ID: You can disable this status message by setting the Use the checkbox below for a quick retry:
✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
🦋 Changeset detectedLatest commit: 63ae447 The changes in this PR will be included in the next version bump. This PR includes changesets to release 32 packages
Not sure what this means? Click here to learn what changesets are. Click here if you're a maintainer who wants to add another changeset to this PR |
1838229 to
af0aeeb
Compare
b8ead31 to
109fbd7
Compare
af0aeeb to
857bba3
Compare
109fbd7 to
c7a66bd
Compare
857bba3 to
21babc8
Compare
094d006 to
9914976
Compare
0919f7a to
f36c576
Compare
f4b6064 to
0547ba9
Compare
c8ab214 to
047b240
Compare
0547ba9 to
0708ce5
Compare
047b240 to
e57bc5e
Compare
0708ce5 to
396552e
Compare
396552e to
eb2a777
Compare
f4131eb to
d8f6cf7
Compare
e0a57d9 to
b3d188c
Compare
d8f6cf7 to
796a2c0
Compare
b3d188c to
c970692
Compare
796a2c0 to
b139391
Compare
c970692 to
ba084d8
Compare
b139391 to
d153042
Compare
ba084d8 to
eb520ef
Compare
f6d15b1 to
fcd196d
Compare
5a68609 to
dd45a3e
Compare
214bd92 to
c2e1c6e
Compare
dd45a3e to
597cce5
Compare
c2e1c6e to
0f7365d
Compare
597cce5 to
188b8c7
Compare
6fd8529 to
0de4601
Compare
c5f92e8 to
fe633cd
Compare
b0f1a65 to
84d5445
Compare
5fd8611 to
ffeab61
Compare
84d5445 to
f9327b0
Compare
| await applyMetadataMutationToBufferedRun({ | ||
| runId: targetRunId, | ||
| body: { operations }, | ||
| }); | ||
| } |
There was a problem hiding this comment.
🔴 Unhandled error in routeOperationsToRun can crash the request after the primary mutation already succeeded
In the metadata route, after the primary buffered metadata mutation succeeds (line 134), the code fans out parent/root operations via routeOperationsToRun in a Promise.all at lines 168-184. Inside routeOperationsToRun, the applyMetadataMutationToBufferedRun call at line 100 is not wrapped in a try/catch. If Redis is down or throws, the error propagates through Promise.all into the action handler, causing the entire request to return a 500 — even though the caller's own metadata mutation already landed successfully. The comment at lines 88-92 explicitly describes these parent/root operations as "auxiliary" and "best-effort", but the implementation doesn't match that intent.
How to trigger
- A buffered run has parent/root operations in the metadata PUT body.
- The primary metadata mutation succeeds on the buffer.
- The PG service fails for the parent run (e.g., completed run), so
routeOperationsToRunfalls through to the buffer fallback. applyMetadataMutationToBufferedRunthrows (Redis connection error, timeout, etc.).- The entire request fails with 500, and the successful primary mutation result at line 186 is never returned.
| await applyMetadataMutationToBufferedRun({ | |
| runId: targetRunId, | |
| body: { operations }, | |
| }); | |
| } | |
| await applyMetadataMutationToBufferedRun({ | |
| runId: targetRunId, | |
| body: { operations }, | |
| }).catch((err) => { | |
| logger.warn("metadata route: buffer fallback for parent/root op failed", { | |
| targetRunId, | |
| error: err instanceof Error ? err.message : String(err), | |
| }); | |
| }); |
Was this helpful? React with 👍 or 👎 to provide feedback.
| routeOperationsToRun(bufferedEntry.parentTaskRunId, body.parentOperations, env), | ||
| // The PG service routes rootOperations to |
There was a problem hiding this comment.
🚩 Parent metadata routing uses internal ID which won't resolve in the buffer
In routeOperationsToRun at api.v1.runs.$runId.metadata.ts:169, bufferedEntry.parentTaskRunId is documented as an internal CUID (not a friendlyId). When the PG service fails and the code falls back to applyMetadataMutationToBufferedRun at line 100, that function calls buffer.getEntry(input.runId) which indexes by friendlyId. An internal CUID will never match, so getEntry returns null and the function returns { kind: 'not_found' }. This means parent operations can never be routed to a buffered parent via this fallback path. In practice this is likely harmless (parents are typically materialized before children reach the buffer), but it's a silent no-op that could surprise.
Was this helpful? React with 👍 or 👎 to provide feedback.
| redisTest( | ||
| "append_tags rejects with limit_exceeded when maxTags would be exceeded, writing nothing", | ||
| { timeout: 20_000 }, | ||
| async ({ redisContainer }) => { | ||
| const buffer = new MollifierBuffer({ | ||
| redisOptions: { | ||
| host: redisContainer.getHost(), | ||
| port: redisContainer.getPort(), | ||
| password: redisContainer.getPassword(), | ||
| }, | ||
| logger: new Logger("test", "log"), | ||
| }); | ||
| try { | ||
| await buffer.accept({ | ||
| runId: "r_cap", | ||
| envId: "env_m", | ||
| orgId: "org_1", | ||
| payload: serialiseSnapshot({ tags: ["a", "b"] }), | ||
| }); | ||
|
|
||
| // 2 existing + 2 new = 4 deduped > cap of 3 → rejected, nothing written. | ||
| const rejected = await buffer.mutateSnapshot("r_cap", { | ||
| type: "append_tags", | ||
| tags: ["c", "d"], | ||
| maxTags: 3, | ||
| }); | ||
| expect(rejected).toBe("limit_exceeded"); | ||
| const afterReject = await buffer.getEntry("r_cap"); | ||
| const rejPayload = JSON.parse(afterReject!.payload) as { tags: string[] }; | ||
| expect(rejPayload.tags).toEqual(["a", "b"]); | ||
|
|
||
| // Dedup keeps the count under the cap → applied. | ||
| const applied = await buffer.mutateSnapshot("r_cap", { | ||
| type: "append_tags", | ||
| tags: ["a", "c"], | ||
| maxTags: 3, | ||
| }); | ||
| expect(applied).toBe("applied_to_snapshot"); | ||
| const afterApply = await buffer.getEntry("r_cap"); | ||
| const appPayload = JSON.parse(afterApply!.payload) as { tags: string[] }; | ||
| expect(appPayload.tags).toEqual(["a", "b", "c"]); | ||
|
|
||
| // Landing exactly on the cap is allowed. | ||
| const exact = await buffer.mutateSnapshot("r_cap", { | ||
| type: "append_tags", | ||
| tags: ["a", "b", "c"], | ||
| maxTags: 3, | ||
| }); | ||
| expect(exact).toBe("applied_to_snapshot"); | ||
| } finally { | ||
| await buffer.close(); | ||
| } | ||
| }, | ||
| ); | ||
|
|
||
| redisTest( |
There was a problem hiding this comment.
🚩 Test file buffer.test.ts has new test block inserted at wrong nesting level
In packages/redis-worker/src/mollifier/buffer.test.ts, lines 1940-1993 insert a new redisTest(...) block between the callback-closing }, (line 1939) and the ) (line 1995) that closes the previous redisTest call. This effectively nests the new test as an extra argument to the outer redisTest, with a semicolon (; from ); at line 1993) appearing inside the argument list. This is likely a syntax error that would prevent the test file from parsing. Not reporting as a bug per review guidelines (would be caught by compiler/CI), but flagging for reviewer awareness since it may block the test suite.
Was this helpful? React with 👍 or 👎 to provide feedback.
Cancel, replay, reschedule, metadata, tags, and idempotency-key-reset now succeed against a run that's still in the mollifier buffer. Mutations are applied to the buffered snapshot via Lua CAS; the drainer carries the mutation forward when it replays. Stacked on the reads PR. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- metadata route: drop the \`as unknown as Parameters<...>\` cast on the parent/root operations path. Widen \`routeOperationsToRun\`'s env parameter to \`AuthenticatedEnvironment\` so the service's typed signature carries through; the caller always has the full env in scope. - replay route: validate the buffered fallback against a Zod \`BufferedReplayInputSchema\` covering the fields \`ReplayTaskRunService.call\` actually reads (id, friendlyId, runtimeEnvironmentId, taskIdentifier, payload, payloadType, queue, isTest, traceId, spanId, engine, runTags + nullable concurrencyKey/workerQueue/machinePreset/realtimeStreamsVersion). Schema-fail logs the issue list and 404s rather than passing a half-shaped object into the service. - resetIdempotencyKey: distinguish "PG-empty + buffer-cleared-nothing" (genuine 404) from "PG-empty + buffer-unreachable" (partial outage — 503 with retry hint). The previous behaviour silently returned 404 on outage, hiding the partial failure and leaving a buffered key effectively un-reset. New regression test covers all four branches (PG-hit + buffer-throws, PG-empty + buffer-hit, PG-empty + buffer-clean-miss, PG-empty + buffer-outage, mollifier-disabled). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- metadata route was routing rootOperations to bufferedEntry.parentTaskRunId with a comment claiming PG's nil-coalesce defaults to parent. PG actually defaults to taskRun.id (self), so a buffered grandchild metadata.root.set() was silently mutating the child's metadata instead of the root's. SyntheticRun already carries rootTaskRunFriendlyId from the snapshot — use it, falling back to the run itself (matching PG) when absent. - reschedule route's PG path delegates to RescheduleTaskRunService which enforces `status !== "DELAYED"` and 422s otherwise. The buffer path had no equivalent guard, so a customer could inject delayUntil into the snapshot of an undelayed buffered run and the drainer would materialise it with an unintended delay. Added a pre-fetch through findRunByIdWithMollifierFallback and 422 when the buffered run has no delayUntil. SyntheticRun doesn't carry a "DELAYED" status enum (only QUEUED|FAILED|CANCELED) so the gate reads the snapshot's delayUntil field directly. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The wait-and-bounce loop for mutations racing a mid-drain run polled the PG primary on a fixed 20ms cadence with no jitter — up to ~100 reads per request, synchronized across concurrent waiters, piling load onto the writer exactly when mollifier is engaged to shed it. The drainer writes the canonical PG row BEFORE it acks (sets `materialised`) or fails (deletes the entry), so the buffer entry's own state is an authoritative, already-in-Redis signal for "is the row in PG yet?". Watch that (cheap Redis getEntry) instead, and touch the primary exactly once — for the actual mutation — only after it resolves. Poll gaps now use jittered exponential backoff (20ms → 250ms cap). Drops the per-poll PG timeout race (DEFAULT_PG_TIMEOUT_MS / pgTimeoutMs / findRunInPgWithTimeout), unneeded now that PG is read once rather than in a tight loop. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Remove plan-tracking shorthand (Q2/Q3 design, _plans/) from mutations-layer mollifier comments; reword to plain English. Comment-only; no behaviour change. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
… metadata fallback errors The tags API skipped MAX_TAGS_PER_RUN enforcement on the buffered path, letting a buffered run exceed the cap the trigger validator applies at creation. Enforce it atomically in the mutateSnapshot Lua: append_tags now accepts an optional maxTags and returns "limit_exceeded" (writing nothing) when the deduped count would overflow. mutateWithFallback gains a symmetric rejectedResponse builder + a "rejected" outcome; the tags route returns 422, matching the PG path. Also stop silently swallowing PG failures in the metadata route's parent/root op fan-out: warn (with targetRunId + error) before the best-effort buffer fallback so a genuine PG outage is observable. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…oute comments Two Phase labels survived the earlier sweep — "Phase A6" in api.v1.runs.\$runId.metadata.ts (the GET-loader-added comment, mirroring the same pattern in api.v1.runs.\$runParam.attempts.ts on reads), and "Phase B4" in api.v1.runs.\$runParam.replay.ts (referring to where SyntheticRun was extended). Rewritten to plain prose; comment-only, no behaviour change. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…n comments Two stragglers from the prior sweep: a "(Q5)" tag in the resetIdempotencyKey buffer-side comment, and a "Phase F" reference in the applyMetadataMutation test's regression note. Comment-only; no behavioural change. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
ffeab61 to
fe21821
Compare
f9327b0 to
63ae447
Compare
Summary
Cancel, replay, reschedule, metadata, tags, and idempotency-key-reset now succeed against a run that's still in the mollifier buffer. Mutations are applied to the buffered snapshot via Lua CAS; the drainer carries the mutation forward when it replays.
Primitives added:
mutateWithFallback— PG-first / buffer-fallback resolver with bounded-wait safety net for entries that transition mid-mutation.applyMetadataMutation— buffered metadata PUT mirroring the PG-side retry loop with CAS atomicity.resolveRunForMutation— discriminated-union resolver used by routefindResourceso the route builder's pre-action 404 check sees buffered runs.Routes wired (whole files, no GET/POST splits):
api.v2.runs.\$runParam.cancel.tsapi.v1.runs.\$runParam.replay.tsapi.v1.runs.\$runParam.reschedule.tsapi.v1.runs.\$runId.metadata.tsapi.v1.runs.\$runId.tags.tsresetIdempotencyKey.server.tsStacked on the reads PR.
Test plan