Fix stream topology storage conflicts #1830
Retry Orleans topology state writes after storage version conflicts by refreshing persisted state and reapplying idempotent mutations, preventing benign concurrent updates from surfacing as production errors. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Let projection scope actors own observation relay topology writes through their lifecycle command handling instead of writing the same relay from the activation service first, reducing concurrent stale ETag updates on shared stream topology grains. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Keep the #1828 fix focused on the projection lifecycle ownership bug: activation no longer writes observation relays directly, and stream topology storage keeps its original single-write behavior instead of masking conflicts with retry. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Keep the #1828 PR scoped to the projection activation root-cause fix by restoring stream topology and hosting retry-policy test files to dev. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Remove the stale IProjectionPortSessionLease test fixture dependency so PR #1830 builds against the dev branch projection runtime contract. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Ensure committed-state activation forwards the triggering observation to the projection scope actor after scope activation admission, preserving actor-owned relay attachment without adding retries. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Ensure projection scopes do not discard publisher-local committed versions using a scope-level watermark, and keep script definition authority projections active through schema follow-up events. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Codecov Report
✅ All modified and coverable lines are covered by tests.
@@ Coverage Diff @@
## dev #1830 +/- ##
==========================================
- Coverage 84.10% 84.09% -0.01%
==========================================
Files 1045 1045
Lines 70623 70661 +38
Branches 9119 9124 +5
==========================================
+ Hits 59398 59425 +27
- Misses 7210 7221 +11
Partials 4015 4015
... and 3 files with indirect coverage changes
Remove a leftover using-only change so the PR only carries behaviorally relevant projection fixes. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
var envelope = new EventEnvelope
{
    Id = Guid.NewGuid().ToString("N"),
    Timestamp = Timestamp.FromDateTime(DateTime.UtcNow),
[Consensus: 2 models] severity=major, category=bug
The new 3-arg DispatchAsync builds a synthetic observation envelope (fresh Id = Guid.NewGuid(), no Propagation) and dispatches it directly to the scope actor. This is not equivalent to the real committed-state publication, and the observation relay is still attached by the scope actor itself (ProjectionScopeGAgentBase.HandleEnsureAsync line 61), so both delivery paths are live. Two verified consequences:
- Silently dropped for propagation-keyed projectors. GAgentRunTerminalProjector.ProjectAsync (lines 51–56) returns early unless envelope.Propagation.CorrelationId == context.CorrelationId. This envelope carries no Propagation, so the bootstrap observation is discarded for run-terminal scopes. The draft-run current-state then materializes only via the eventually-consistent relay, which is exactly why this PR has to switch ScopeDraftRunWorkflowRunCurrentStateIntegrationTests to polling and add it to the polling allowlist.
- Double-processed for propagation-agnostic projectors. ProjectionMaterializationScopeGAgentBase.ProcessObservationCoreAsync has no version/event-id dedup (it always runs materializers and advances the watermark), and HandleObservedEnvelopeAsync filters only by route. So the same StateEvent arriving via both the direct forward and the relay is materialized twice and persists two ProjectionScopeWatermarkAdvancedEvents: redundant work and event-store writes on the very path this PR is meant to de-conflict.
Fix: build the bootstrap envelope with the same propagation/dedup origin as the real publication (copy context.SourceEnvelope?.Propagation), or drop the synthetic second path and rely on the relay delivering the triggering event exactly once. (deepseek separately noted the forwarding also couples this dispatcher to stream-topology internals — StreamForwardingRules / ProjectionScopeActorId / EnvelopeRouteSemantics — and would read better extracted into the activation hook or a dedicated forwarding port.)
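A minimal sketch of the first option, using simplified stand-in types rather than the real EventEnvelope and projector classes. Propagation, Envelope, RunTerminalFilter and BootstrapEnvelopes are hypothetical names for illustration, and reusing the source envelope id is an assumption about what "same dedup origin" would mean here:

```csharp
#nullable enable
using System;

// Hypothetical stand-ins for the real envelope types; only the relevant fields.
public sealed record Propagation(string CorrelationId);
public sealed record Envelope(string Id, Propagation? Propagation);

public static class RunTerminalFilter
{
    // Mirrors the early return described above: accept only envelopes whose
    // propagation correlation id matches the scope's correlation id.
    public static bool Accepts(Envelope envelope, string scopeCorrelationId) =>
        envelope.Propagation?.CorrelationId == scopeCorrelationId;
}

public static class BootstrapEnvelopes
{
    // Builds the bootstrap envelope from the source envelope and carries its
    // propagation over instead of leaving it null. Assumption: the id is
    // reused when a source exists, so both delivery paths share one identity.
    public static Envelope FromSource(Envelope? source) =>
        new Envelope(
            Id: source?.Id ?? Guid.NewGuid().ToString("N"),
            Propagation: source?.Propagation);
}
```

With these stand-ins, an envelope built with a null Propagation is rejected by RunTerminalFilter.Accepts, while one produced by BootstrapEnvelopes.FromSource from a correlated source envelope is accepted.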
Per-model verbatim
- codex: This synthetic envelope drops the propagation from context.SourceEnvelope and gives the same committed state event a fresh envelope id. GAgentRunTerminalPlans derives the scope SessionId from context.SourceEnvelope.Propagation.CorrelationId, while GAgentRunTerminalProjector later requires envelope.Propagation.CorrelationId to match that session, so the bootstrap observation can be skipped; if the normal relay also forwards the real publish, the different envelope id also defeats runtime dedup and can process the same StateEvent twice.
- kimi: Direct-dispatching the triggering CommittedStateEventPublished to the scope actor duplicates delivery whenever the scope is already active: the existing observation relay (still upserted by the actor in HandleEnsureAsync) will forward the same event via the stream. The scope actor does not dedupe by (sourceActorId, eventId), so the same committed event will be processed twice. Either gate the direct forward to scopes actually created by this activation, or add idempotent deduplication inside HandleObservedEnvelopeAsync.
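The deduplication alternative could look roughly like the sketch below. ObservationDeduplicator is a hypothetical helper, not an existing type in this codebase, and it keeps its set in memory only; a real scope actor would have to derive or persist this through its own state.

```csharp
#nullable enable
using System.Collections.Generic;

// Hypothetical helper: remembers which (sourceActorId, eventId) pairs a scope
// has already processed so a second delivery of the same event is skipped.
public sealed class ObservationDeduplicator
{
    private readonly HashSet<(string SourceActorId, string EventId)> _seen = new();

    // Returns true the first time a pair is offered and false for repeats.
    public bool TryAdmit(string sourceActorId, string eventId) =>
        _seen.Add((sourceActorId, eventId));
}
```

A handler would call TryAdmit before running materializers and return early when it yields false. This only helps if both delivery paths present the same event id, which the synthetic envelope in this PR currently does not.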
public ProjectionActivationPlanDispatcher(IServiceProvider services)
{
    _services = services ?? throw new ArgumentNullException(nameof(services));
    _dispatchPort = services.GetService<IActorDispatchPort>();
[Consensus: 2 models] severity=minor, category=di
_dispatchPort = services.GetService<IActorDispatchPort>() resolves a dependency from the container inside the constructor, which is a service-locator pattern (CLAUDE.md: "Do not use a Service Locator to hide dependencies; every dependency is constructor-injected and explicitly visible"). Prefer an explicit constructor parameter so the dependency is visible and lifetime-checked. The likely motive is optional resolution: a non-nullable ctor param would make every TryAddSingleton<ProjectionActivationPlanDispatcher>() site throw when IActorDispatchPort isn't registered. If optionality is intended, register via a factory that keeps the dependency explicit: TryAddSingleton(sp => new ProjectionActivationPlanDispatcher(sp, sp.GetService<IActorDispatchPort>())).
Per-model verbatim
- kimi: services.GetService<IActorDispatchPort>() deepens the service-locator pattern in a singleton. Prefer adding IActorDispatchPort? dispatchPort = null as an explicit constructor parameter so the dependency is visible and lifetime-checked by the container.
- mimo: Resolving it via services.GetService<IActorDispatchPort>() here is a service-locator anti-pattern that CLAUDE.md prohibits. Consider adding IActorDispatchPort? as a constructor parameter and updating the TryAddSingleton registrations to a factory.
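A dependency-free sketch of the suggested shape. The class below is a trimmed stand-in for the real dispatcher, and IActorDispatchPort is redeclared as an empty hypothetical interface. FixedServices, NoopDispatchPort, DispatcherFactory and HasDispatchPort exist only to exercise the sketch. It uses the BCL IServiceProvider.GetService(Type) so it compiles without the DI package; in the actual registration the Create body would be the delegate passed to TryAddSingleton.

```csharp
#nullable enable
using System;

// Hypothetical stand-in for the real dispatch port interface.
public interface IActorDispatchPort { }
public sealed class NoopDispatchPort : IActorDispatchPort { }

public sealed class ProjectionActivationPlanDispatcher
{
    private readonly IServiceProvider _services;
    private readonly IActorDispatchPort? _dispatchPort;

    // The optional port is an explicit parameter, so the dependency is visible
    // in the signature instead of being looked up inside the constructor.
    public ProjectionActivationPlanDispatcher(
        IServiceProvider services,
        IActorDispatchPort? dispatchPort = null)
    {
        _services = services ?? throw new ArgumentNullException(nameof(services));
        _dispatchPort = dispatchPort;
    }

    public bool HasDispatchPort => _dispatchPort is not null;
}

public static class DispatcherFactory
{
    // Optional resolution happens here, at the composition root, rather than
    // inside the dispatcher. Yields null when the port is not registered.
    public static ProjectionActivationPlanDispatcher Create(IServiceProvider sp) =>
        new(sp, sp.GetService(typeof(IActorDispatchPort)) as IActorDispatchPort);
}

// Tiny provider used only to exercise the sketch; returns the given port or null.
public sealed class FixedServices : IServiceProvider
{
    private readonly IActorDispatchPort? _port;
    public FixedServices(IActorDispatchPort? port) => _port = port;

    public object? GetService(Type serviceType) =>
        serviceType == typeof(IActorDispatchPort) ? _port : null;
}
```

The dispatcher still works when the port is absent, but the optionality is now stated in the constructor signature and decided by whoever composes the container.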
@@ -54,7 +54,6 @@ public async Task<TLease> EnsureAsync(
            request.SessionId);

        await _scopeRuntime.EnsureExistsAsync(scopeKey, ct).ConfigureAwait(false);
[kimi] severity=minor, category=arch
Removing the _scopeRuntime.EnsureObservationRelayAsync(scopeKey, ct) call (the deleted line just below) leaves ProjectionScopeActorRuntime<TScopeAgent>.EnsureObservationRelayAsync with no callers, and with it the _streams field and the IStreamProvider? streams ctor param that only that method uses. Per the "delete first" principle, delete the dead method/field/param and drop the now-unused stream-provider argument at the new ProjectionScopeActorRuntime<TScopeAgent>(...) construction site, rather than leaving a dead relay path. (_scopeRuntime is a concrete type, not an interface, so this is an internal cleanup with no contract impact.)
{
    Id = Guid.NewGuid().ToString("N"),
    Timestamp = Timestamp.FromDateTime(DateTime.UtcNow),
    Payload = Any.Pack(context.Published.Clone()),
[v4-pro] severity=minor, category=perf
Any.Pack(context.Published.Clone()) — Any.Pack already serializes the message into the Any's byte buffer, so the .Clone() deep-copy is a redundant throwaway allocation on every committed-state publication (a hot path). Use Any.Pack(context.Published). (If the envelope is reworked per the major comment on line 64, this line goes away anyway.)
Summary
Root cause
The version conflict was caused by two lifecycle paths writing the same stream-topology grain for hot roots like agent-registry-store:
- ProjectionScopeActivationService.EnsureAsync() created the projection scope and directly upserted the observation relay.
- The projection scope actor handled EnsureProjectionScopeCommand and upserted the same relay again as part of its own lifecycle.
This violated the actor/lifecycle ownership boundary and increased stale ETag writes against Orleans/Garnet storage. The fix makes activation only ensure the actor and dispatch the command; the actor owns relay attachment.
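To illustrate the conflict, here is a toy in-memory model of ETag-checked storage. VersionedTopologyStore and RelayWriters are hypothetical and do not reflect the real StreamTopologyGrain or Orleans storage APIs. It models a conflict as a false return value instead of an exception, and it interleaves the two reads and writes deterministically in place of real concurrency.

```csharp
#nullable enable
using System.Collections.Generic;

// Hypothetical stand-in for ETag-checked grain storage: a write succeeds only
// when the caller presents the ETag that is still current.
public sealed class VersionedTopologyStore
{
    private readonly HashSet<string> _relays = new();

    // Current storage version; every successful write increments it.
    public int ETag { get; private set; }

    // Upserts a relay if expectedETag is still current; otherwise reports a
    // conflict by returning false, as a stale writer would observe.
    public bool TryUpsertRelay(string relayId, int expectedETag)
    {
        if (expectedETag != ETag) return false;
        _relays.Add(relayId);
        ETag++;
        return true;
    }
}

public static class RelayWriters
{
    // Two lifecycle paths read the same ETag and both write: the second loses,
    // even though the upsert itself is idempotent.
    public static (bool First, bool Second) TwoWriters(VersionedTopologyStore store, string relayId)
    {
        int seenByActivation = store.ETag;
        int seenByActor = store.ETag;
        bool first = store.TryUpsertRelay(relayId, seenByActivation);
        bool second = store.TryUpsertRelay(relayId, seenByActor);
        return (first, second);
    }

    // Only the actor writes, so each write uses a freshly read ETag.
    public static bool SingleOwner(VersionedTopologyStore store, string relayId) =>
        store.TryUpsertRelay(relayId, store.ETag);
}
```

In this model the second writer in TwoWriters fails purely because its ETag is stale, which is the benign-but-noisy conflict described above. SingleOwner never presents a stale ETag because there is no competing write from the same lifecycle.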
Test plan
- dotnet test test/Aevatar.CQRS.Projection.Core.Tests/Aevatar.CQRS.Projection.Core.Tests.csproj --nologo --no-restore
- dotnet test test/Aevatar.Foundation.Runtime.Hosting.Tests/Aevatar.Foundation.Runtime.Hosting.Tests.csproj --nologo --no-restore
- bash tools/ci/test_stability_guards.sh
- bash tools/ci/architecture_guards.sh
- git diff --check -- src/Aevatar.Foundation.Runtime.Implementations.Orleans.Streaming/Streaming/Topology/StreamTopologyGrain.cs test/Aevatar.Foundation.Runtime.Hosting.Tests/OrleansDistributedCoverageTests.cs src/Aevatar.CQRS.Projection.Core/Orchestration/ProjectionScopeActivationService.cs test/Aevatar.CQRS.Projection.Core.Tests/ProjectionRuntimeRegistrationTests.cs
🤖 Generated with Claude Code