
CEP-45: Incremental repair for mutation tracking #4696

Merged
aweisberg merged 1 commit into apache:cep-45-mutation-tracking from aweisberg:21098
Apr 14, 2026

Conversation

@aweisberg (Contributor):

No description provided.

for (Shard shard : overlappingShards)
{
ShardSyncState state = new ShardSyncState(shard, liveHostIds);
shardStates.put(shard.range, state);
Contributor:

Trying to reason about the thread safety of shardStates here...

The assignment is clearly visible after the CAS above, but are the iterations inside the callbacks later guaranteed to see the results of the put()s here?

Contributor Author:

Registering the sync coordinator is a write barrier because it does a put in a ConcurrentHashMap? So any prior writes will be visible? As long as shardStates is effectively immutable after that, the map should be OK.

This could be an ImmutableMap which might make it a little clearer so I'll make that change.
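For reference, the guarantee being relied on is ConcurrentHashMap's documented memory consistency: actions in a thread prior to placing an object into the map happen-before actions subsequent to the access of that object from another thread. A minimal single-file sketch with hypothetical names (not the actual ShardSyncState/registration code):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class VisibilityDemo {
    // Plain map populated before the "registration" put; never mutated afterwards.
    static final Map<String, String> shardStates = new HashMap<>();
    // The ConcurrentHashMap put acts as the release; get() on the callback
    // side is the matching acquire, so prior writes to shardStates are visible.
    static final ConcurrentHashMap<String, Object> coordinators = new ConcurrentHashMap<>();

    public static void register(String coordinatorId) {
        shardStates.put("range-1", "state-1");     // happens-before the put below
        coordinators.put(coordinatorId, new Object()); // publish
    }

    public static String callback(String coordinatorId) {
        if (coordinators.get(coordinatorId) != null)   // acquire
            return shardStates.get("range-1");         // guaranteed visible
        return null;
    }
}
```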

finally
{
if (!allSucceeded)
syncCoordinator.cancel();
Contributor:

What happens if the try block above produces InterruptedException? Do we need to cancel the rest of the sync coordinators (that hadn't been processed yet)?

Contributor Author:

We should clean them up so they don't have to wait for their timeout to elapse. I'll rework the exception handling here to catch Exception instead of RuntimeException.

CLUSTER.get(1).nodetoolResult("repair", specification.keyspaceName()).asserts().success();
// Background reconciliation doesn't exist/work so incremental repair just hangs waiting for reconciliation that never occurs
if (specification.replicationType.isTracked())
CLUSTER.get(1).nodetoolResult("repair", "-full", specification.keyspaceName()).asserts().success();
Contributor:

Should an incremental repair request succeed after a successful full repair? I tried this, and it appears to hang, but I'm not sure why yet...

Contributor:

"node1_Repair#4:1" #270 daemon prio=5 os_prio=31 cpu=0.08ms elapsed=92.27s tid=0x000000013de75200 nid=0x2530b waiting on condition  [0x000000036944a000]
   java.lang.Thread.State: TIMED_WAITING (parking)
	at jdk.internal.misc.Unsafe.park(java.base@11.0.19/Native Method)
	at java.util.concurrent.locks.LockSupport.parkNanos(java.base@11.0.19/LockSupport.java:357)
	at org.apache.cassandra.utils.concurrent.AsyncFuture.awaitUntil(AsyncFuture.java:221)
	at org.apache.cassandra.utils.concurrent.Awaitable$Defaults.await(Awaitable.java:114)
	at org.apache.cassandra.utils.concurrent.AbstractFuture.await(AbstractFuture.java:482)
	at org.apache.cassandra.utils.concurrent.AbstractFuture.get(AbstractFuture.java:252)
	at org.apache.cassandra.replication.MutationTrackingSyncCoordinator.awaitCompletion(MutationTrackingSyncCoordinator.java:351)
	at org.apache.cassandra.repair.MutationTrackingIncrementalRepairTask.waitForSyncCompletion(MutationTrackingIncrementalRepairTask.java:127)

Contributor:

Anyway, I think the lack of background reconciliation still means that this won't work. The transfer IDs are only there to make sure read reconciliation works.

Contributor Author:

I misunderstood the test. I don't think IR should hang in this test because we aren't relying on background reconciliation. There aren't any down nodes at all.

Contributor Author:

Ah, right, now I remember. The test inserts data using executeInternal, which gives the mutation an id and applies it locally correctly, but because it's only applied locally, it never propagates, since there is no background reconciliation.

Mutations applied via execute/StorageProxy are given to ActiveLogReconciler which is basically in-memory hinted handoff for mutation tracking.

So this is working as intended for now in that we need to use full repair here instead of IR since IR can't complete until background reconciliation is done.

public int hashCode()
{
return Objects.hash(desc, offsetsByShard);
}
Contributor:

nit: Do we ever actually put MutationTrackingSyncResponse in a collection?

Contributor Author:

I'll remove hashCode and equals. I ran the tests and I don't think they get used anymore.

Contributor Author:

It's needed for RepairMessageSerializationsTest

{
logger.warn("Mutation tracking sync failed for keyspace {}", keyspace, error);
resultPromise.tryFailure(error);
return;
Contributor:

nit: Coverage tooling indicates this might not be tested.

Contributor Author:

I'll add a test that allows the timeout to elapse.

Contributor Author:

Ah, the test that is supposed to test timeouts blocks all verbs, so it times out on the prepare rather than on the actual sync. I'll fix that test.

catch (RuntimeException e)
{
allSucceeded = false;
error = Throwables.merge(error, e);
Contributor:

nit: Coverage tooling indicates this might not be tested.

Contributor Author:

I'll convert timeouts to exceptions so it can be exercised.

catch (Exception e)
{
logger.error("Error during mutation tracking repair", e);
resultPromise.tryFailure(e);
Contributor:

nit: Coverage tooling indicates this might not be tested.

Contributor Author:

The errors are put in resultPromise and aren't surfaced by letting them bubble up. To make this branch fire, I could let exceptions bubble up and have them handled here. That would mean less exception handling overall, and this branch would show up as tested.

I'll do that.
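The restructuring described above (letting exceptions bubble to a single handler that fails the promise) could look roughly like this sketch, using CompletableFuture as a stand-in for resultPromise; the names are illustrative:

```java
import java.util.concurrent.CompletableFuture;

public class CentralizedFailure {
    // Hypothetical shape: steps throw instead of failing the promise at each
    // call site; one catch-all marks the promise failed, so the failure branch
    // is exercised by any step that throws.
    public static CompletableFuture<String> runRepair(Runnable step) {
        CompletableFuture<String> resultPromise = new CompletableFuture<>();
        try {
            step.run();
            resultPromise.complete("ok");
        } catch (Exception e) {
            resultPromise.completeExceptionally(e); // the single place errors surface
        }
        return resultPromise;
    }
}
```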

if (allRanges.isEmpty())
{
logger.info("No common ranges to repair for keyspace {}", keyspace);
return new AsyncPromise<CoordinatedRepairResult>().setSuccess(CoordinatedRepairResult.create(List.of(), List.of()));
Contributor:

nit: Coverage tooling indicates this might not be tested.

if (overlappingShards.isEmpty())
{
completionFuture.setSuccess(null);
return;
Contributor:

nit: Coverage tooling indicates this might not be tested.

Contributor Author:

Converted to a checkState

public void onFailure(InetAddressAndPort from, RequestFailure failure)
{
fail(new RuntimeException(
String.format("Mutation tracking sync failed: participant %s returned failure %s", from, failure.reason)));
Contributor:

nit: Coverage tooling indicates this might not be tested.

Contributor Author (@aweisberg, Apr 6, 2026):

Added a test case that sends an exception through here

Shard currentShard = getCurrentShard(state.shard.range);
if (currentShard != state.shard)
{
failWithTopologyChange();
Contributor:

nit: Coverage tooling indicates this might not be tested.

Contributor Author (@aweisberg, Apr 2, 2026):

Topology changes aren't supported yet https://issues.apache.org/jira/browse/CASSANDRA-20386

I'll take a look and see if I can at least induce one to exercise this failure path.

It might end up being more unit test than end-to-end test.

Contributor Author:

Added an end-to-end test. Seems like we don't error out on topology changes and more or less do the repair anyway.

* their current witnessed offsets. This establishes a happens-before relationship: the
* participant's response contains offsets captured after receiving this request, which is
* sent after the repair starts.
*
Contributor:

nit: Suggested change — replace the bare `*` line with `* <p>`.

inMigrationPendingRange = migrationInfo.isRangeInPendingMigration(metadata().id,
first.getToken(),
last.getToken());
}
Contributor:

nit: Could replace the above w/

KeyspaceMigrationInfo migrationInfo = ClusterMetadata.current().mutationTrackingMigrationState.getKeyspaceInfo(metadata().keyspace);
boolean inMigrationPendingRange = migrationInfo != null && migrationInfo.isRangeInPendingMigration(metadata().id, first.getToken(), last.getToken());

// when incremental repair streams SSTables that were written before tracking was enabled.
Preconditions.checkState(!cfstore.metadata().replicationType().isTracked()
|| ClusterMetadata.current().mutationTrackingMigrationState
.getKeyspaceInfo(cfstore.metadata().keyspace) != null);
Contributor:

nit: Might be nice to have something like an isMigrating(String) on MTMS, but just a matter of taste I guess.

Contributor Author:

I'll update it to use a helper.

{
Preconditions.checkState(!cfstore.metadata().replicationType().isTracked());
// Tracked tables may legitimately use this path during migration from untracked to tracked,
// when incremental repair streams SSTables that were written before tracking was enabled.
Contributor:

Does this mean that during migration to tracked, we'd expect these SSTables to have no coordinator log offsets then? Is that worth asserting?

Contributor:

It shouldn't matter for imports, since the keyspace being currently tracked means we'll avoid this method.

Contributor Author:

No, we will actually hit this method during migration. The SSTables might actually have offsets in them, since tracked writes have already started and the incremental repair starts afterward.

// flag on the mutation hasn't been set yet at this point — it's set later in
// applyMutation() — so we check the handler type instead.
if (this instanceof ReadRepairVerbHandler)
return metadata;
Contributor:

nit: I guess the other option would be something like a handlesReadRepair() method that only ReadRepairVerbHandler overrides, but it's literally called ReadRepairVerbHandler, and we probably won't have something else handle RR mutations.

In any case, I'm remembering blocking RR is going to be reworked for migration anyway, so ignore me :D

@Nonnull Collection<String> columnFamilies)
{
Iterable<TableMetadata> tables;
if (!columnFamilies.isEmpty())
Contributor:

nit: This is almost a case where null would be nice to indicate "all tables", in the sense that an empty collection might be more likely than null to indicate incorrect argument construction.

Contributor Author:

That is an artifact of how RepairOption treats the empty set as "all tables". I'll update this method to use null and then fix it at the caller to convert an empty set to null.
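A sketch of the null-for-all-tables convention being proposed; normalize and fromRepairOption are hypothetical names, not the actual RepairOption API:

```java
import java.util.Collection;

public class TableFilter {
    // Hypothetical convention: null means "all tables"; an empty collection is
    // rejected as likely incorrect argument construction.
    public static Collection<String> normalize(Collection<String> columnFamilies) {
        if (columnFamilies == null)
            return null; // all tables
        if (columnFamilies.isEmpty())
            throw new IllegalArgumentException("empty table filter; pass null for all tables");
        return columnFamilies;
    }

    // Caller-side conversion from RepairOption's "empty set means all tables".
    public static Collection<String> fromRepairOption(Collection<String> cfs) {
        return cfs.isEmpty() ? null : cfs;
    }
}
```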

RepairTask task = new PreviewRepairTask(this, state.id, neighborsAndRanges.filterCommonRanges(state.keyspace, cfnames), neighborsAndRanges.shouldExcludeDeadParticipants, cfnames);
return task.perform(executor, validationScheduler)
.<Pair<CoordinatedRepairResult, Supplier<String>>>map(r -> Pair.create(r, task::successMessage))
.addCallback((s, f) -> executor.shutdown());
Contributor:

This block here is duplicated 3 more times below. The original code here avoided that by returning after the if/else stuff, but we could just delegate to a submitRepairTask() or something similar.

Contributor Author:

Added a helper.
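The helper might look something like this sketch; submitRepairTask is a hypothetical name, and CompletableFuture stands in for the repair future type:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;

public class SubmitHelper {
    // Hypothetical submitRepairTask(): performs the task on the executor and
    // shuts the executor down in a completion callback whether the task
    // succeeds or fails, replacing the duplicated perform/map/addCallback blocks.
    public static <T> CompletableFuture<T> submitRepairTask(Supplier<T> task, ExecutorService executor) {
        return CompletableFuture.supplyAsync(task, executor)
                                .whenComplete((result, failure) -> executor.shutdown());
    }
}
```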

RepairJobDesc desc = new RepairJobDesc(parentSession, TimeUUID.Generator.nextTimeUUID(),
keyspace, "Mutation Tracking Sync", List.of(range));
MutationTrackingSyncCoordinator syncCoordinator = new MutationTrackingSyncCoordinator(
coordinator.ctx, desc, commonRange.endpoints, metadata);
Contributor:

MutationTrackingSyncCoordinator syncCoordinator =
    new MutationTrackingSyncCoordinator(coordinator.ctx, desc, commonRange.endpoints, metadata);

...might be a little easier on the eyes.

Pair<CoordinatedRepairResult, Supplier<String>> irPair = Pair.create(irResult, incrementalTask::successMessage);
mtTask.perform(executor, validationScheduler)
.addCallback(
mtResult -> result.trySuccess(irPair),
Contributor:

Do we need to handle partial failure here? (i.e. Do we just return the irPair result if the MT task partially fails?)

Contributor Author:

A failure at any step is a failure of the whole repair, since we didn't complete the entire repair. That's what this should be doing: returning failure immediately once any step fails.

* Determines if this keyspace should use mutation tracking incremental repair.
* Returns true if:
* - Keyspace uses mutation tracking replication, OR
* - Keyspace is currently migrating (either direction)
Contributor:

nit: Not strictly true if migrating to untracked?

Contributor Author:

I'll address that in the follow up where I am changing migration from tracked to untracked to be instant.

for (Range<Token> range : commonRange.ranges)
{
RepairJobDesc desc = new RepairJobDesc(parentSession, TimeUUID.Generator.nextTimeUUID(),
keyspace, "Mutation Tracking Sync", List.of(range));
Contributor:

Table name is meaningless here, right?

Contributor Author:

Yes but I figured for debugging purposes it's clearer to not leave it empty.


if (overlappingShards.isEmpty())
{
completionFuture.setSuccess(null);
Contributor:

nit: Might be nice to have a DEBUG level log message to indicate this happened.

Contributor Author:

Converted it to a checkState.

}
// Always include the local node
liveHostIds.add(metadata.directory.peerId(ctx.broadcastAddressAndPort()).id());
}
Contributor:

nit: If we just build the liveHostIds at construction time, could we make it final?

Contributor Author:

Yes, I'll make it a final ImmutableSet.
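A sketch of building the live host set once at construction time so the field can be final; Set.copyOf stands in for Guava's ImmutableSet, and the names are illustrative:

```java
import java.util.HashSet;
import java.util.Set;

public class SyncState {
    // Built once in the constructor and never mutated afterwards, so the field
    // can be final and immutable.
    private final Set<String> liveHostIds;

    public SyncState(Set<String> liveEndpointIds, String localHostId) {
        Set<String> ids = new HashSet<>(liveEndpointIds);
        ids.add(localHostId); // always include the local node
        this.liveHostIds = Set.copyOf(ids);
    }

    public Set<String> liveHostIds() { return liveHostIds; }
}
```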

if (completionFuture.isDone())
return;

recaptureTargets();
Contributor:

It looks like this is called from updateReplicatedOffsets(), but does that mean we keep expanding the targets after the initial round of sync requests? (i.e. If there are ongoing writes, can this cause the whole IR to time out?)

Contributor Author:

Yeah this shouldn't occur on offsets received. I missed this coming in from the original PR.

Exception error = null;
for (MutationTrackingSyncCoordinator syncCoordinator : syncCoordinators)
{
long remainingNanos = deadlineNanos - coordinator.ctx.clock().nanoTime();
Contributor:

nit: remainingNanos can be negative if the whole budget has elapsed? Should we even attempt to await in that case?

Contributor Author:

If the timeout has elapsed, it will throw a timeout exception. I think it's cleaner to just go in with the elapsed timeout so you always exit through the same timeout-exception error path.
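A sketch of that pattern: the possibly-negative remaining budget is handed straight to the timed await, so an already-expired deadline exits through the same TimeoutException path as a slow completion. CompletableFuture stands in for the coordinator future, and the names are illustrative:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class DeadlineAwait {
    // Awaits each coordinator future against one shared deadline. The remaining
    // budget may already be negative; it is passed to get() unchanged rather
    // than short-circuited with a separate pre-check.
    public static boolean awaitAll(Iterable<? extends CompletableFuture<?>> futures, long deadlineNanos) {
        try {
            for (CompletableFuture<?> future : futures) {
                long remainingNanos = deadlineNanos - System.nanoTime(); // may be <= 0
                future.get(remainingNanos, TimeUnit.NANOSECONDS);
            }
            return true;
        } catch (Exception e) {
            return false; // real code would merge the errors and fail the repair
        }
    }
}
```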

}
catch (Exception e)
{
error = Throwables.merge(error, e);
Contributor:

nit: Is this going to be ExecutionException, and if so, do we have to unwrap the cause?

Contributor Author:

I don't think we need to/should. ExecutionException is part of the chain of exceptions; it's the caller that catches the exceptions that needs to analyze the chain and decide what to do. The reality here is that we don't care which errors happened, and we aren't going to handle them individually; we're just going to mark the repair as failed and log them.
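To illustrate the point about the chain: the ExecutionException wrapper keeps the original failure reachable as its cause, so code that only logs or merges the exception doesn't need to unwrap it (illustrative names):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CauseChain {
    // Walk to the root of the cause chain.
    public static Throwable rootCause(Throwable t) {
        while (t.getCause() != null)
            t = t.getCause();
        return t;
    }

    // Future.get() wraps the failure in ExecutionException, but the original
    // exception stays in the chain as the cause.
    public static Throwable failAndCapture() {
        CompletableFuture<Void> future = new CompletableFuture<>();
        future.completeExceptionally(new IllegalStateException("sync failed"));
        try {
            future.get();
            return null;
        } catch (ExecutionException | InterruptedException e) {
            return e;
        }
    }
}
```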

}
finally
{
throw e;
Contributor:

nit: I think this silently swallows any exception that comes out of the cancel() calls.

Contributor Author:

Ah, you are right. For some reason I thought Java would automatically add it as a suppressed exception. If one of the cancellations throws, it will also skip the rest. I'll turn it into a loop that aggregates the exceptions and then rethrows with them attached as suppressed.
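The aggregate-and-rethrow loop described might look like this sketch (hypothetical names): each cancellation is attempted, failures are attached as suppressed, and the primary exception is rethrown at the end, instead of throwing from a finally block that would swallow cancel() failures and skip the remaining coordinators:

```java
import java.util.List;

public class CancelAll {
    // Cancel every coordinator even if some cancellations throw, then rethrow
    // the primary failure with the cancellation failures attached as suppressed.
    public static void cancelAllAndRethrow(RuntimeException primary, List<Runnable> cancels) {
        for (Runnable cancel : cancels) {
            try {
                cancel.run();
            } catch (RuntimeException e) {
                primary.addSuppressed(e); // keep the failure, keep going
            }
        }
        throw primary;
    }
}
```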

if (mtMigration)
{
KeyspaceMigrationInfo migrationInfo = metadata.mutationTrackingMigrationState.getKeyspaceInfo(keyspace);
if (migrationInfo != null)
Contributor:

nit: Is this check redundant now, since MutationTrackingIncrementalRepairTask.isMutationTrackingMigrationInProgress() checks it above?

Contributor Author:

Yes I will restructure this.

{
try
{
syncCoordinators.forEach(MutationTrackingSyncCoordinator::cancel);
Contributor:

Do we only need to explicitly cancel() all the coordinators if interrupted, or would we have to do it on timeouts as well?

Contributor Author:

You are right we need something to do the cleanup because the SyncCoordinator doesn't register a timeout check that does the cancellation.

I think for the timeout case what I actually want to do is schedule a task to do the cancellation rather than juggle it here.

Implement mutation tracking repair as a new repair task that replaces
Merkle tree validation and streaming for tracked keyspaces. Instead of
building hash trees and comparing data, MutationTrackingSyncCoordinator
sends MT_SYNC_REQ to all participants to collect their witnessed offsets,
then waits for background offset broadcasts to confirm all replicas have
reconciled to those target offsets.

Key changes:

- Add MutationTrackingIncrementalRepairTask that creates per-range
  MutationTrackingSyncCoordinator instances and blocks until all
  shards reach the target reconciled offsets or timeout.

- Add MT_SYNC_REQ/MT_SYNC_RSP verbs and MutationTrackingSyncRequest/
  MutationTrackingSyncResponse messages for the repair protocol to
  establish a happens-before relationship between repair start and
  offset collection.

- RepairCoordinator.create() factory method snapshots TCM state to
  decide whether to use mutation tracking repair and flips incremental
  to false (skipping anti-compaction) when MT is active without
  migration.

- Support mutation tracking migration: during untracked->tracked
  migration, run incremental repair first (for pre-migration data),
  then MT sync. KeyspaceMigrationInfo validates that repair ranges
  don't partially overlap pending migration ranges and routes
  streaming/SSTable finalization through the correct tracked vs
  untracked path.

- Temporarily route read repair mutations through the untracked write
  path during migration by adding isReadRepair flag to Mutation and
  bypassing migration routing checks in ReadRepairVerbHandler and
  CassandraKeyspaceWriteHandler. This is a stopgap; CASSANDRA-21252
  will roll back this approach and handle read repair properly.

- Add offset collection APIs to CoordinatorLog, Node2OffsetsMap, and
  Shard for computing union and intersection of witnessed offsets
  scoped to specific participant host IDs (supporting --force with
  dead node exclusion).

- Add configurable mutation_tracking_sync_timeout (default 2m) with
  JMX get/set on StorageServiceMBean.

- Fix ActiveRepairService to use tryFailure() instead of setFailure()
  to avoid double-completion exceptions during concurrent repair
  failures.

Co-Authored-By: Ariel Weisberg <aweisberg@apple.com>
@aweisberg aweisberg merged commit 19d4f1b into apache:cep-45-mutation-tracking Apr 14, 2026