Skip to content

Fix stream topology storage conflicts#1830

Open
louis4li wants to merge 8 commits into
devfrom
fix/2026-06-05_stream-topology-storage-conflict
Open

Fix stream topology storage conflicts#1830
louis4li wants to merge 8 commits into
devfrom
fix/2026-06-05_stream-topology-storage-conflict

Conversation

@louis4li
Copy link
Copy Markdown
Contributor

@louis4li louis4li commented Jun 5, 2026

Summary

Root cause

The version conflict was caused by two lifecycle paths writing the same stream-topology grain for hot roots like agent-registry-store:

  1. ProjectionScopeActivationService.EnsureAsync() created the projection scope and directly upserted the observation relay.
  2. The projection scope actor then handled EnsureProjectionScopeCommand and upserted the same relay again as part of its own lifecycle.

This violated the actor/lifecycle ownership boundary and increased stale ETag writes against Orleans/Garnet storage. The fix makes activation only ensure the actor and dispatch the command; the actor owns relay attachment.

Test plan

  • dotnet test test/Aevatar.CQRS.Projection.Core.Tests/Aevatar.CQRS.Projection.Core.Tests.csproj --nologo --no-restore
  • dotnet test test/Aevatar.Foundation.Runtime.Hosting.Tests/Aevatar.Foundation.Runtime.Hosting.Tests.csproj --nologo --no-restore
  • bash tools/ci/test_stability_guards.sh
  • bash tools/ci/architecture_guards.sh
  • git diff --check -- src/Aevatar.Foundation.Runtime.Implementations.Orleans.Streaming/Streaming/Topology/StreamTopologyGrain.cs test/Aevatar.Foundation.Runtime.Hosting.Tests/OrleansDistributedCoverageTests.cs src/Aevatar.CQRS.Projection.Core/Orchestration/ProjectionScopeActivationService.cs test/Aevatar.CQRS.Projection.Core.Tests/ProjectionRuntimeRegistrationTests.cs

🤖 Generated with Claude Code

Retry Orleans topology state writes after storage version conflicts by refreshing persisted state and reapplying idempotent mutations, preventing benign concurrent updates from surfacing as production errors.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@louis4li louis4li requested a review from jason-aelf as a code owner June 5, 2026 08:59
louis4li and others added 6 commits June 5, 2026 17:10
Let projection scope actors own observation relay topology writes through their lifecycle command handling instead of writing the same relay from the activation service first, reducing concurrent stale ETag updates on shared stream topology grains.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Keep the #1828 fix focused on the projection lifecycle ownership bug: activation no longer writes observation relays directly, and stream topology storage keeps its original single-write behavior instead of masking conflicts with retry.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Keep the #1828 PR scoped to the projection activation root-cause fix by restoring stream topology and hosting retry-policy test files to dev.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Remove the stale IProjectionPortSessionLease test fixture dependency so PR #1830 builds against the dev branch projection runtime contract.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Ensure committed-state activation forwards the triggering observation to the projection scope actor after scope activation admission, preserving actor-owned relay attachment without adding retries.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Ensure projection scopes do not discard publisher-local committed versions using a scope-level watermark, and keep script definition authority projections active through schema follow-up events.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@louis4li louis4li requested a review from eanzhao as a code owner June 5, 2026 13:38
@codecov
Copy link
Copy Markdown

codecov Bot commented Jun 5, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 84.09%. Comparing base (43f1d15) to head (ae7ab75).

@@            Coverage Diff             @@
##              dev    #1830      +/-   ##
==========================================
- Coverage   84.10%   84.09%   -0.01%     
==========================================
  Files        1045     1045              
  Lines       70623    70661      +38     
  Branches     9119     9124       +5     
==========================================
+ Hits        59398    59425      +27     
- Misses       7210     7221      +11     
  Partials     4015     4015              
Flag Coverage Δ
ci 84.09% <100.00%> (-0.01%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

Files with missing lines Coverage Δ
...stration/CommittedStateProjectionActivationHook.cs 85.45% <100.00%> (ø)
...rchestration/ProjectionActivationPlanDispatcher.cs 96.15% <100.00%> (+4.48%) ⬆️
.../Orchestration/ProjectionScopeActivationService.cs 95.23% <ø> (-0.12%) ⬇️
...on.Core/Orchestration/ProjectionScopeGAgentBase.cs 77.77% <ø> (-0.66%) ⬇️
...gCommittedStateProjectionActivationPlanProvider.cs 100.00% <100.00%> (ø)

... and 3 files with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

Remove a leftover using-only change so the PR only carries behaviorally relevant projection fixes.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
var envelope = new EventEnvelope
{
Id = Guid.NewGuid().ToString("N"),
Timestamp = Timestamp.FromDateTime(DateTime.UtcNow),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Consensus: 2 models] severity=major, category=bug

The new 3-arg DispatchAsync builds a synthetic observation envelope (fresh Id = Guid.NewGuid(), no Propagation) and dispatches it directly to the scope actor — but this is not equivalent to the real committed-state publication, and the observation relay is still attached by the scope actor itself (ProjectionScopeGAgentBase.HandleEnsureAsync line 61), so both delivery paths are live. Two verified consequences:

  1. Silently dropped for propagation-keyed projectors. GAgentRunTerminalProjector.ProjectAsync (lines 51–56) returns early unless envelope.Propagation.CorrelationId == context.CorrelationId. This envelope carries no Propagation, so the bootstrap observation is discarded for run-terminal scopes — the draft-run current-state then materializes only via the eventually-consistent relay, which is exactly why this PR has to switch ScopeDraftRunWorkflowRunCurrentStateIntegrationTests to polling and add it to the polling allowlist.
  2. Double-processed for propagation-agnostic projectors. ProjectionMaterializationScopeGAgentBase.ProcessObservationCoreAsync has no version/event-id dedup (it always runs materializers and advances the watermark), and HandleObservedEnvelopeAsync filters only by route. So the same StateEvent arriving via both the direct forward and the relay is materialized twice and persists two ProjectionScopeWatermarkAdvancedEvents — redundant work and event-store writes on the very path this PR is meant to de-conflict.

Fix: build the bootstrap envelope with the same propagation/dedup origin as the real publication (copy context.SourceEnvelope?.Propagation), or drop the synthetic second path and rely on the relay delivering the triggering event exactly once. (deepseek separately noted the forwarding also couples this dispatcher to stream-topology internals — StreamForwardingRules / ProjectionScopeActorId / EnvelopeRouteSemantics — and would read better extracted into the activation hook or a dedicated forwarding port.)

Per-model verbatim
  • codex: This synthetic envelope drops the propagation from context.SourceEnvelope and gives the same committed state event a fresh envelope id. GAgentRunTerminalPlans derives the scope SessionId from context.SourceEnvelope.Propagation.CorrelationId, while GAgentRunTerminalProjector later requires envelope.Propagation.CorrelationId to match that session, so the bootstrap observation can be skipped; if the normal relay also forwards the real publish, the different envelope id also defeats runtime dedup and can process the same StateEvent twice.
  • kimi: Direct-dispatching the triggering CommittedStateEventPublished to the scope actor duplicates delivery whenever the scope is already active: the existing observation relay (still upserted by the actor in HandleEnsureAsync) will forward the same event via the stream. The scope actor does not dedupe by (sourceActorId, eventId), so the same committed event will be processed twice. Either gate the direct forward to scopes actually created by this activation, or add idempotent deduplication inside HandleObservedEnvelopeAsync.

public ProjectionActivationPlanDispatcher(IServiceProvider services)
{
_services = services ?? throw new ArgumentNullException(nameof(services));
_dispatchPort = services.GetService<IActorDispatchPort>();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Consensus: 2 models] severity=minor, category=di

_dispatchPort = services.GetService<IActorDispatchPort>() resolves a dependency from the container inside the constructor — a service-locator pattern (CLAUDE.md: 不用 Service Locator 隐藏依赖——依赖一律构造注入、显式可见). Prefer an explicit constructor parameter so the dependency is visible and lifetime-checked. The likely motive is optional resolution: a non-nullable ctor param would make every TryAddSingleton<ProjectionActivationPlanDispatcher>() site throw when IActorDispatchPort isn't registered. If optionality is intended, register via a factory that keeps the dependency explicit: TryAddSingleton(sp => new ProjectionActivationPlanDispatcher(sp, sp.GetService<IActorDispatchPort>())).

Per-model verbatim
  • kimi: services.GetService<IActorDispatchPort>() deepens the service-locator pattern in a singleton. Prefer adding IActorDispatchPort? dispatchPort = null as an explicit constructor parameter so the dependency is visible and lifetime-checked by the container.
  • mimo: Resolving it via services.GetService<IActorDispatchPort>() here is a service-locator anti-pattern that CLAUDE.md prohibits. Consider adding IActorDispatchPort? as a constructor parameter and updating the TryAddSingleton registrations to a factory.

@@ -54,7 +54,6 @@ public async Task<TLease> EnsureAsync(
request.SessionId);

await _scopeRuntime.EnsureExistsAsync(scopeKey, ct).ConfigureAwait(false);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[kimi] severity=minor, category=arch

Removing the _scopeRuntime.EnsureObservationRelayAsync(scopeKey, ct) call (the deleted line just below) leaves ProjectionScopeActorRuntime<TScopeAgent>.EnsureObservationRelayAsync — and its only consumer, the _streams field plus the IStreamProvider? streams ctor param — with no callers. Per 删除优先, delete the dead method/field/param and drop the now-unused stream-provider argument at the new ProjectionScopeActorRuntime<TScopeAgent>(...) construction site, rather than leaving a dead relay path. (_scopeRuntime is a concrete type, not an interface, so this is an internal cleanup with no contract impact.)

{
Id = Guid.NewGuid().ToString("N"),
Timestamp = Timestamp.FromDateTime(DateTime.UtcNow),
Payload = Any.Pack(context.Published.Clone()),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[v4-pro] severity=minor, category=perf

Any.Pack(context.Published.Clone())Any.Pack already serializes the message into the Any's byte buffer, so the .Clone() deep-copy is a redundant throwaway allocation on every committed-state publication (a hot path). Use Any.Pack(context.Published). (If the envelope is reworked per the major comment on line 64, this line goes away anyway.)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[prod2][aevatar-console-backend] Orleans storage version conflict during WriteStateAsync: streamtopology/agent-registry-store

2 participants