Skip to content

fix(tiering): Track force merges in activeMerges to prevent tiering before force merge completion#22370

Merged
Bukhtawar merged 7 commits into
opensearch-project:mainfrom
KhishorekumarBS:fix/force-merge-tiering-drain-race
Jul 3, 2026
Merged

fix(tiering): Track force merges in activeMerges to prevent tiering before force merge completion#22370
Bukhtawar merged 7 commits into
opensearch-project:mainfrom
KhishorekumarBS:fix/force-merge-tiering-drain-race

Conversation

@KhishorekumarBS

@KhishorekumarBS KhishorekumarBS commented Jul 1, 2026

Copy link
Copy Markdown
Contributor

Description

MergeScheduler.forceMerge() bypasses the activeMerges counter, making in-flight force merges invisible to onMergesDrained(). When AutoForceMergeManager triggers a force merge and tiering starts before it completes, tiering proceeds without waiting — causing the merged segment to never reach remote store and leaving the
replica permanently diverged.

Changes

  • MergeScheduler.java: Increment/decrement activeMerges in forceMerge() + fire drain listeners on completion
  • IndexShard.java: Add waitForReplicaSync(TimeValue) as belt-and-suspenders check
  • TransportPrepareTieringAction.java: Call waitForReplicaSync after waitForRemoteStoreSync
  • Tests: Verify force merges block drain, activeMerges includes force merges, waitForReplicaSync timeout propagates

Testing

  • Unit tests: MergeSchedulerOnDrainedTests, TransportPrepareTieringActionTests, IndexShardTests
  • Integration test: Added

Description

[Describe what this change achieves]

Related Issues

Resolves #[Issue number to be closed when this PR is merged]

Check List

  • Functionality includes testing.
  • API changes companion pull request created, if applicable.
  • Public documentation issue/PR created, if applicable.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@github-actions

github-actions Bot commented Jul 1, 2026

Copy link
Copy Markdown
Contributor

PR Reviewer Guide 🔍

(Review updated until commit 6cc8bfc)

Here are some key observations to aid the review process:

🧪 PR contains tests
🔒 No security concerns identified
✅ No TODO sections
🔀 No multiple PR themes
⚡ Recommended focus areas for review

Force Merge Double-Increment Risk

forceMerge() now increments activeMerges once at entry and calls runMerge() for each OneMerge in the loop. If runMerge() (or submitMergeTask() internally) also touches activeMerges via the background path, or if force merges spawn sub-merges through submitMergeTask, the counter could be double-counted. Additionally, only a single increment covers potentially many OneMerge entries — if the intent was one increment per merge unit, drain semantics may still fire prematurely between individual merges within the loop. Please confirm that a single increment for the entire force-merge batch is the intended semantics.

public void forceMerge(int maxNumSegment) throws IOException {
    assert Thread.currentThread().getName().contains(ThreadPool.Names.FORCE_MERGE)
        : "forceMerge must be called on FORCE_MERGE thread but was: " + Thread.currentThread().getName();
    forceMergeLock.acquireUninterruptibly();
    activeMerges.incrementAndGet();
    try {
        if (isShutdown.get()) {
            logger.debug("MergeScheduler is shutdown, skipping force merge");
            return;
        }
        Collection<OneMerge> oneMerges = mergeHandler.findForceMerges(maxNumSegment);
        for (OneMerge oneMerge : oneMerges) {
            if (isShutdown.get()) {
                logger.debug("MergeScheduler shutdown during force merge, aborting remaining merges");
                break;
            }
            runMerge(oneMerge);
        }
    } finally {
        decrementAndFireDrainListeners();
        forceMergeLock.release();
    }
}
Hard-Coded Poll Interval

waitForReplicaSync polls every 500ms via a hard-coded Thread.sleep(500), while REPLICA_SYNC_POLL_INTERVAL_MS constant is declared but unused. If timeout is less than 500ms the loop will effectively sleep past the deadline before checking again, causing the method to overshoot the timeout budget. Use the constant and/or clamp the sleep to min(500, remainingTimeout).

public void waitForReplicaSync(TimeValue timeout) throws IOException {
    if (!indexSettings.isSegRepEnabledOrRemoteNode()) {
        return;
    }
    long startNanos = System.nanoTime();
    Set<SegmentReplicationShardStats> stats = Set.of();
    while (System.nanoTime() - startNanos < timeout.nanos()) {
        stats = getReplicationStatsForTrackedReplicas();
        if (stats.isEmpty()
            || stats.stream()
                .allMatch(
                    s -> s.getCheckpointsBehindCount() == 0 && s.getBytesBehindCount() == 0 && s.getCurrentReplicationTimeMillis() == 0
                )) {
            logger.debug("All replicas in sync for shard [{}]", shardId);
            return;
        }
        long behindReplicas = stats.stream().filter(s -> s.getCheckpointsBehindCount() > 0 || s.getBytesBehindCount() > 0).count();
        long maxCheckpointsBehind = stats.stream().mapToLong(SegmentReplicationShardStats::getCheckpointsBehindCount).max().orElse(0);
        long maxBytesBehind = stats.stream().mapToLong(SegmentReplicationShardStats::getBytesBehindCount).max().orElse(0);
        logger.debug(
            "Waiting for replica sync on shard [{}]: {} replica(s) still behind, max checkpoints behind: {}, max bytes behind: {}",
            shardId,
            behindReplicas,
            maxCheckpointsBehind,
            maxBytesBehind
        );
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new OpenSearchException("Interrupted waiting for replica sync on shard [" + shardId + "]", e);
        }
    }
Sync Check Too Strict

The in-sync condition requires getCurrentReplicationTimeMillis() == 0. Any tracked replica currently mid-replication (even briefly) will keep the primary blocked until timeout, even if it is caught up in terms of checkpoints/bytes. Since this is called synchronously on the tiering path with a 30s default timeout, transient replication activity on healthy replicas could cause spurious tiering failures. Consider relaxing to checkpoints/bytes-behind only, or documenting why in-progress replication must be zero.

    || stats.stream()
        .allMatch(
            s -> s.getCheckpointsBehindCount() == 0 && s.getBytesBehindCount() == 0 && s.getCurrentReplicationTimeMillis() == 0
        )) {
    logger.debug("All replicas in sync for shard [{}]", shardId);
    return;
}

…ace condition

MergeScheduler.forceMerge() runs merges by calling runMerge() directly,
bypassing submitMergeTask() which is the only place that increments
activeMerges. This makes in-flight force merges invisible to
onMergesDrained(), causing tiering's prepare step to proceed immediately
while a force merge is still running.

When AutoForceMergeManager triggers a force merge on an idle shard and
tiering is triggered before it completes, the merge finishes after shard
relocation. The merged segment is never uploaded to remote store
(RemoteStoreRefreshListener is already closed), leaving the replica
permanently diverged with a linearly growing replication lag.

Changes:
- Increment activeMerges in forceMerge() so onMergesDrained correctly
  waits for in-flight force merges before proceeding with tiering
- Fire drain listeners in forceMerge() finally block when all merges
  complete (mirrors submitMergeTask behavior)
- Add waitForReplicaSync() to IndexShard to verify replicas are in sync
  after waitForRemoteStoreSync() in the tiering prepare step
- Add unit tests verifying force merges block drain and are visible
  to getActiveMergeCount()

Signed-off-by: bkhishor <bkhishor@amazon.com>
@github-actions

github-actions Bot commented Jul 1, 2026

Copy link
Copy Markdown
Contributor

PR Code Suggestions ✨

Latest suggestions up to 6cc8bfc

Explore these optional code suggestions:

CategorySuggestion                                                                                                                                    Impact
General
Use declared constant for poll interval

The hardcoded Thread.sleep(500) ignores the REPLICA_SYNC_POLL_INTERVAL_MS constant
declared for this exact purpose. Use the constant to keep the poll interval
configurable and consistent, and to avoid the mismatch between the documented
("every 500ms") behavior and a magic number.

server/src/main/java/org/opensearch/index/shard/IndexShard.java [3893-3898]

 try {
-    Thread.sleep(500);
+    Thread.sleep(REPLICA_SYNC_POLL_INTERVAL_MS);
 } catch (InterruptedException e) {
     Thread.currentThread().interrupt();
     throw new OpenSearchException("Interrupted waiting for replica sync on shard [" + shardId + "]", e);
 }
Suggestion importance[1-10]: 5

__

Why: Valid observation — the REPLICA_SYNC_POLL_INTERVAL_MS constant is declared but never used, and the hardcoded 500 is a magic number. Using the constant improves consistency and maintainability.

Low
Bound replica sync by remaining budget

replicaSyncTimeout is volatile and read once here, but the entire prepare flow
already has a prepareTieringTimeout. Confirm the two timeouts don't compound in a
way that exceeds the outer request timeout — otherwise the caller may see the
request hang past its declared timeout. Consider passing a remaining-budget-based
TimeValue rather than the static setting.

server/src/main/java/org/opensearch/storage/action/tiering/TransportPrepareTieringAction.java [265-269]

+indexShard.waitForRemoteStoreSync();
+// Wait for replicas to apply the latest checkpoint before relocation.
+// This ensures replicas have downloaded the merged segment(s) from remote store.
+indexShard.waitForReplicaSync(replicaSyncTimeout);
+verifyNoUncommittedOps(indexShard, shardRouting);
 
-
Suggestion importance[1-10]: 4

__

Why: Raises a valid concern about timeout compounding, but the suggestion is speculative ("Confirm...") and the improved_code is identical to existing_code, providing no actual fix.

Low
Move increment inside try for safety

Incrementing activeMerges before the isShutdown early-return branch is fine, but the
return inside the try block will still hit the finally and call
decrementAndFireDrainListeners(), which is correct. However, if findForceMerges or
any code before the loop throws, the counter is properly decremented. Verify that no
early return path bypasses the finally — currently OK, but consider moving the
increment to just before entering the try to make the invariant clearer and avoid a
leak if acquireUninterruptibly future-changes to throw.

server/src/main/java/org/opensearch/index/engine/dataformat/merge/MergeScheduler.java [156-157]

 forceMergeLock.acquireUninterruptibly();
-activeMerges.incrementAndGet();
 try {
+    activeMerges.incrementAndGet();
     if (isShutdown.get()) {
         logger.debug("MergeScheduler is shutdown, skipping force merge");
         return;
Suggestion importance[1-10]: 3

__

Why: The current code is functionally correct since acquireUninterruptibly does not throw. The suggested change is a minor stylistic improvement for defensive coding but offers marginal value.

Low
Possible issue
Ensure at least one sync check runs

If timeout.nanos() is very small (or zero), the loop condition may be false on the
first iteration and the method will skip the initial stats check, jumping straight
to the timeout exception without ever polling. Perform at least one stats check
before evaluating the timeout so that an already-in-sync state is detected
regardless of timeout value.

server/src/main/java/org/opensearch/index/shard/IndexShard.java [3867-3873]

 public void waitForReplicaSync(TimeValue timeout) throws IOException {
     if (!indexSettings.isSegRepEnabledOrRemoteNode()) {
         return;
     }
     long startNanos = System.nanoTime();
     Set<SegmentReplicationShardStats> stats = Set.of();
-    while (System.nanoTime() - startNanos < timeout.nanos()) {
+    do {
Suggestion importance[1-10]: 5

__

Why: Reasonable edge case — with a very small timeout the loop could exit without checking stats. However, the setting has a minimum of 5s so this is unlikely in practice; still a minor correctness improvement.

Low

Previous suggestions

Suggestions up to commit 0450a30
CategorySuggestion                                                                                                                                    Impact
General
Use declared constant for sleep interval

The hardcoded Thread.sleep(500) should reference the declared constant
REPLICA_SYNC_POLL_INTERVAL_MS to keep the polling interval consistent and
configurable in one place. As written, changing the constant has no effect.

server/src/main/java/org/opensearch/index/shard/IndexShard.java [3893-3898]

 try {
-    Thread.sleep(500);
+    Thread.sleep(REPLICA_SYNC_POLL_INTERVAL_MS);
 } catch (InterruptedException e) {
     Thread.currentThread().interrupt();
     throw new OpenSearchException("Interrupted waiting for replica sync on shard [" + shardId + "]", e);
 }
Suggestion importance[1-10]: 5

__

Why: Valid observation — the constant REPLICA_SYNC_POLL_INTERVAL_MS is declared but not used in the sleep call, making it dead code. Using it improves maintainability but has no functional impact.

Low
Do not discard async force-merge future

The async execute() call returns an ActionFuture that is discarded. If the
force-merge request fails to be dispatched (e.g., a validation error), the failure
will be silently swallowed and the test will incorrectly attribute any subsequent
failure to the tiering path. Capture the future and either check its state at the
end or wire in a listener that records failures for the test to assert on.

sandbox/plugins/composite-engine/src/internalClusterTest/java/org/opensearch/composite/DataFormatAwarePrepareTieringAsyncIT.java [243]

-client().admin().indices().prepareForceMerge(ASYNC_INDEX).setMaxNumSegments(1).setFlush(false).execute();
+var forceMergeFuture = client().admin().indices().prepareForceMerge(ASYNC_INDEX).setMaxNumSegments(1).setFlush(false).execute();
 
 PrepareTieringRequest request = new PrepareTieringRequest(ASYNC_INDEX);
Suggestion importance[1-10]: 4

__

Why: Discarding the future is intentional here to test the race condition where force merge runs concurrently with prepare tiering. Capturing it could add value for diagnostics but is not critical to test correctness.

Low
Possible issue
Ensure paired increment/decrement under lock

The return inside the try block will still fire the finally which calls
decrementAndFireDrainListeners(), so the counter balance is fine. However, if
findForceMerges or any earlier logic throws before the counter is incremented (e.g.
between acquireUninterruptibly and incrementAndGet), the counter won't be
decremented but the lock also won't be released. Move the increment before
acquireUninterruptibly is fine, but ensure both increment and lock acquisition are
inside a structure that guarantees paired cleanup — currently they are not
symmetric.

server/src/main/java/org/opensearch/index/engine/dataformat/merge/MergeScheduler.java [156-174]

 forceMergeLock.acquireUninterruptibly();
-activeMerges.incrementAndGet();
 try {
-    if (isShutdown.get()) {
-        logger.debug("MergeScheduler is shutdown, skipping force merge");
-        return;
+    activeMerges.incrementAndGet();
+    try {
+        if (isShutdown.get()) {
+            logger.debug("MergeScheduler is shutdown, skipping force merge");
+            return;
Suggestion importance[1-10]: 3

__

Why: The current code has acquireUninterruptibly() followed immediately by incrementAndGet() with no code that can throw between them, so the concern about asymmetric cleanup is largely theoretical. The suggestion offers marginal defensive value.

Low
Suggestions up to commit c6a2b50
CategorySuggestion                                                                                                                                    Impact
General
Use declared constant for poll interval

The hardcoded 500 should use the newly-declared constant
REPLICA_SYNC_POLL_INTERVAL_MS to avoid divergence between the constant and the
actual sleep interval. Currently the constant is declared but unused.

server/src/main/java/org/opensearch/index/shard/IndexShard.java [3893-3895]

 try {
-    Thread.sleep(500);
+    Thread.sleep(REPLICA_SYNC_POLL_INTERVAL_MS);
 } catch (InterruptedException e) {
     Thread.currentThread().interrupt();
     throw new OpenSearchException("Interrupted waiting for replica sync on shard [" + shardId + "]", e);
 }
Suggestion importance[1-10]: 6

__

Why: Valid observation — the constant REPLICA_SYNC_POLL_INTERVAL_MS is declared but not used; using it improves maintainability and avoids divergence.

Low
Check replica sync state before polling loop

If timeout.nanos() is zero or negative, the loop is never entered and the method
throws immediately without ever checking replica state. Consider performing at least
one stats check before the loop, or explicitly handle non-positive timeouts to avoid
a spurious REPLICA_SYNC_TIMEOUT when replicas are actually already in sync.

server/src/main/java/org/opensearch/index/shard/IndexShard.java [3867-3873]

 public void waitForReplicaSync(TimeValue timeout) throws IOException {
     if (!indexSettings.isSegRepEnabledOrRemoteNode()) {
         return;
     }
     long startNanos = System.nanoTime();
-    Set<SegmentReplicationShardStats> stats = Set.of();
+    Set<SegmentReplicationShardStats> stats = getReplicationStatsForTrackedReplicas();
+    if (stats.isEmpty()
+        || stats.stream().allMatch(s -> s.getCheckpointsBehindCount() == 0 && s.getBytesBehindCount() == 0 && s.getCurrentReplicationTimeMillis() == 0)) {
+        return;
+    }
     while (System.nanoTime() - startNanos < timeout.nanos()) {
Suggestion importance[1-10]: 5

__

Why: Reasonable defensive improvement; a zero/negative timeout would currently throw spuriously without checking state. Minor edge case impact.

Low
Ensure increment/decrement symmetry on force merge

When isShutdown is true, the method returns via the early return inside the try
block, which will execute the finally block calling
decrementAndFireDrainListeners(). That's correct, but if runMerge inside the loop
throws, the exception propagates and decrementAndFireDrainListeners() is still
called — good. However, note that the increment happens after
acquireUninterruptibly(); if the JVM is interrupted or an unchecked error occurs
between these two lines (extremely unlikely but possible with OOM), the counter is
not incremented while the lock is held. Move the increment before acquiring the
lock, or wrap the acquire+increment together to ensure symmetry with decrement.

server/src/main/java/org/opensearch/index/engine/dataformat/merge/MergeScheduler.java [156-157]

+activeMerges.incrementAndGet();
 forceMergeLock.acquireUninterruptibly();
-activeMerges.incrementAndGet();
 try {
     if (isShutdown.get()) {
         logger.debug("MergeScheduler is shutdown, skipping force merge");
         return;
Suggestion importance[1-10]: 3

__

Why: The concern is largely theoretical; acquireUninterruptibly() does not throw InterruptedException, and reordering doesn't provide meaningful safety benefit. Low impact.

Low
Suggestions up to commit 1910145
CategorySuggestion                                                                                                                                    Impact
Possible issue
Synchronize drain listener firing

The listener draining logic is not synchronized with concurrent additions/removals
to onDrainedListeners. If another thread invokes onDrained between the isEmpty()
check and List.copyOf, or after clear(), listeners may be lost or fire twice. Guard
this block with the same lock/synchronization used elsewhere for onDrainedListeners.

server/src/main/java/org/opensearch/index/engine/dataformat/merge/MergeScheduler.java [171-186]

 } finally {
     activeMerges.decrementAndGet();
-    // Fire all drain listeners if all merges completed and none pending.
-    // This ensures tiering's onMergesDrained callback fires after a force merge completes.
-    if (isFrozen() && activeMerges.get() == 0 && !mergeHandler.hasPendingMerges() && !onDrainedListeners.isEmpty()) {
-        List<Runnable> listeners = List.copyOf(onDrainedListeners);
-        onDrainedListeners.clear();
-        for (Runnable listener : listeners) {
-            try {
-                listener.run();
-            } catch (Exception ex) {
-                logger.warn("Exception in onDrained listener", ex);
+    synchronized (onDrainedListeners) {
+        if (isFrozen() && activeMerges.get() == 0 && !mergeHandler.hasPendingMerges() && !onDrainedListeners.isEmpty()) {
+            List<Runnable> listeners = List.copyOf(onDrainedListeners);
+            onDrainedListeners.clear();
+            for (Runnable listener : listeners) {
+                try {
+                    listener.run();
+                } catch (Exception ex) {
+                    logger.warn("Exception in onDrained listener", ex);
+                }
             }
         }
     }
     forceMergeLock.release();
 }
Suggestion importance[1-10]: 6

__

Why: Potential concurrency issue: if onDrained is called concurrently by another thread, listeners could be lost or fire twice. However, the correctness depends on the actual synchronization used elsewhere in the class, which isn't fully visible in the diff.

Low
Avoid double-counting active merges

The activeMerges counter is incremented before the shutdown check but the early
return in the shutdown path exits without decrementing (the finally block will
decrement, so this is actually fine) — however, if runMerge internally also
increments activeMerges, this causes double-counting for every force-merged
OneMerge. Verify that runMerge does not also increment activeMerges, otherwise the
counter will be inflated and onMergesDrained will never fire.

server/src/main/java/org/opensearch/index/engine/dataformat/merge/MergeScheduler.java [156-161]

 forceMergeLock.acquireUninterruptibly();
+// Increment once for the entire force-merge operation; runMerge must NOT increment activeMerges again
 activeMerges.incrementAndGet();
 try {
     if (isShutdown.get()) {
         logger.debug("MergeScheduler is shutdown, skipping force merge");
         return;
Suggestion importance[1-10]: 3

__

Why: The suggestion only asks the author to verify that runMerge does not also increment activeMerges, without providing evidence that a double-count occurs. The improved code is essentially identical to the existing code with only an added comment.

Low
General
Use defined constant for poll interval

The hardcoded 500 should reference the REPLICA_SYNC_POLL_INTERVAL_MS constant that
was introduced for this purpose. Using the magic number here makes the constant dead
code and defeats its purpose of centralized configuration.

server/src/main/java/org/opensearch/index/shard/IndexShard.java [3893-3898]

 try {
-    Thread.sleep(500);
+    Thread.sleep(REPLICA_SYNC_POLL_INTERVAL_MS);
 } catch (InterruptedException e) {
     Thread.currentThread().interrupt();
     throw new OpenSearchException("Interrupted waiting for replica sync on shard [" + shardId + "]", e);
 }
Suggestion importance[1-10]: 5

__

Why: Valid observation — the constant REPLICA_SYNC_POLL_INTERVAL_MS was defined but not used, and using it improves maintainability. Minor impact.

Low
Make replica sync timeout configurable

REPLICA_SYNC_TIMEOUT is hardcoded at 30 seconds and not configurable, unlike
prepareTieringTimeout which is volatile and dynamically settable. Large indexes with
big segments may require longer sync times; consider making this a cluster setting
or deriving it from prepareTieringTimeout to avoid rigid failures in production.

server/src/main/java/org/opensearch/storage/action/tiering/TransportPrepareTieringAction.java [258-260]

 indexShard.waitForRemoteStoreSync();
 // Wait for replicas to apply the latest checkpoint before relocation.
 // This ensures replicas have downloaded the merged segment(s) from remote store.
-indexShard.waitForReplicaSync(REPLICA_SYNC_TIMEOUT);
+indexShard.waitForReplicaSync(prepareTieringTimeout);
 verifyNoUncommittedOps(indexShard, shardRouting);
Suggestion importance[1-10]: 4

__

Why: Reasonable observation about hardcoded timeout being potentially insufficient for large indexes, but the suggested change to use prepareTieringTimeout may not be semantically correct as they represent different operations.

Low
Suggestions up to commit 314ef57
CategorySuggestion                                                                                                                                    Impact
Possible issue
Synchronize drain listener firing to avoid lost listeners

The listener firing logic reads and clears onDrainedListeners without
synchronization, while onDrained() may add listeners concurrently. If a listener is
added after List.copyOf but before clear(), it will be silently dropped and never
fire. Guard this block with the same lock/synchronization used in onDrained() to
prevent lost listeners.

server/src/main/java/org/opensearch/index/engine/dataformat/merge/MergeScheduler.java [171-185]

 } finally {
     activeMerges.decrementAndGet();
-    // Fire all drain listeners if all merges completed and none pending.
-    // This ensures tiering's onMergesDrained callback fires after a force merge completes.
-    if (isFrozen() && activeMerges.get() == 0 && !mergeHandler.hasPendingMerges() && !onDrainedListeners.isEmpty()) {
-        List<Runnable> listeners = List.copyOf(onDrainedListeners);
-        onDrainedListeners.clear();
+    synchronized (onDrainedListeners) {
+        if (isFrozen() && activeMerges.get() == 0 && !mergeHandler.hasPendingMerges() && !onDrainedListeners.isEmpty()) {
+            List<Runnable> listeners = List.copyOf(onDrainedListeners);
+            onDrainedListeners.clear();
+            for (Runnable listener : listeners) {
+                try {
+                    listener.run();
+                } catch (Exception ex) {
+                    logger.warn("Exception in onDrained listener", ex);
+                }
+            }
+        }
+    }
+    forceMergeLock.release();
+}
Suggestion importance[1-10]: 6

__

Why: Points out a potential concurrency issue where listeners added between copyOf and clear() could be lost, but validity depends on the actual concurrency semantics of onDrainedListeners and onDrained() which aren't fully visible in the diff.

Low
General
Ensure at least one sync check before timeout

When timeout is smaller than REPLICA_SYNC_POLL_INTERVAL_MS (500ms), the loop may
sleep past the deadline and then re-check on the next iteration — but if the first
check fails and sleep exceeds timeout, the method throws without one last check.
Also, if timeout.nanos() is 0 or negative, the loop never executes and the method
throws immediately even if replicas are in sync. Consider performing at least one
check before the timeout comparison.

server/src/main/java/org/opensearch/index/shard/IndexShard.java [3871-3878]

 long startNanos = System.nanoTime();
 Set<SegmentReplicationShardStats> stats = Set.of();
-while (System.nanoTime() - startNanos < timeout.nanos()) {
+do {
     stats = getReplicationStatsForTrackedReplicas();
     if (stats.isEmpty() || stats.stream().allMatch(s -> s.getCheckpointsBehindCount() == 0)) {
         logger.debug("All replicas in sync for shard [{}]", shardId);
         return;
     }
+} while (System.nanoTime() - startNanos < timeout.nanos() && sleepAndContinue());
Suggestion importance[1-10]: 5

__

Why: Reasonable edge-case observation about zero/small timeouts, though the improved_code references a non-existent sleepAndContinue() helper making it not directly applicable.

Low
Avoid counting merges on shutdown path

The activeMerges counter is incremented before the shutdown check, but if shutdown
is true the method returns early via the return statement. Since the decrement is in
the finally block, this specific path is fine, but consider whether the
drain-listener firing logic in finally should still execute on the shutdown path —
currently it will, which may be unintended. Verify this behavior is correct on
shutdown, or move the increment after the shutdown check to avoid touching
activeMerges when no merges actually run.

server/src/main/java/org/opensearch/index/engine/dataformat/merge/MergeScheduler.java [157-161]

 forceMergeLock.acquireUninterruptibly();
-activeMerges.incrementAndGet();
 try {
     if (isShutdown.get()) {
         logger.debug("MergeScheduler is shutdown, skipping force merge");
         return;
+    }
+    activeMerges.incrementAndGet();
+    try {
Suggestion importance[1-10]: 4

__

Why: Valid observation that incrementing activeMerges before the shutdown check causes the drain-listener block to run even on shutdown paths, but the impact is minor since no merges actually happened and the counter is properly decremented.

Low
Suggestions up to commit b84ffcd
CategorySuggestion                                                                                                                                    Impact
General
Ensure at least one sync check occurs

If timeout is smaller than REPLICA_SYNC_POLL_INTERVAL_MS (500ms), or the first check
happens near the timeout boundary, the loop may exit without ever having checked
once past the initial iteration and the sleep may exceed the remaining time.
Consider checking stats once before entering the loop and clamping the sleep to
remaining time so short timeouts still get evaluated correctly.

server/src/main/java/org/opensearch/index/shard/IndexShard.java [3864-3887]

 long startNanos = System.nanoTime();
-Set<SegmentReplicationShardStats> stats = Set.of();
+Set<SegmentReplicationShardStats> stats = getReplicationStatsForTrackedReplicas();
+if (stats.isEmpty() || stats.stream().allMatch(s -> s.getCheckpointsBehindCount() == 0)) {
+    logger.debug("All replicas in sync for shard [{}]", shardId);
+    return;
+}
 while (System.nanoTime() - startNanos < timeout.nanos()) {
     stats = getReplicationStatsForTrackedReplicas();
     if (stats.isEmpty() || stats.stream().allMatch(s -> s.getCheckpointsBehindCount() == 0)) {
         logger.debug("All replicas in sync for shard [{}]", shardId);
         return;
     }
Suggestion importance[1-10]: 6

__

Why: Valid concern: with a short timeout, the loop may sleep 500ms past the deadline or the sleep exceeds remaining time, causing spurious timeouts. Clamping sleep to remaining time improves correctness for short timeouts.

Low
Pair counter increment with try/finally safely

The activeMerges counter is incremented before the shutdown check, but on the
shutdown early-return path, control flows through the finally block which correctly
decrements it. However, if findForceMerges throws before entering the loop, the
counter is still balanced by finally, which is correct. The real issue: the
increment happens outside the try, so if activeMerges.incrementAndGet() itself is
fine but a later addition throws between incrementAndGet and try, the counter would
leak. Move the increment inside the try block to guarantee finally-based decrement
pairing.

server/src/main/java/org/opensearch/index/engine/dataformat/merge/MergeScheduler.java [156-158]

 forceMergeLock.acquireUninterruptibly();
-activeMerges.incrementAndGet();
 try {
+    activeMerges.incrementAndGet();
     if (isShutdown.get()) {
         logger.debug("MergeScheduler is shutdown, skipping force merge");
         return;
 }
Suggestion importance[1-10]: 4

__

Why: Moving activeMerges.incrementAndGet() inside the try block is a minor defensive improvement, but in practice AtomicInteger.incrementAndGet() after a successful lock acquire is unlikely to throw. Marginal robustness gain.

Low
Possible issue
Guard drain-fire against concurrent listener races

The check-then-modify sequence on onDrainedListeners (copy then clear) is not atomic
with respect to concurrent onDrained() calls or other threads firing the same
listeners. A listener could be double-invoked or missed if another thread reads
activeMerges == 0 simultaneously. Synchronize the drain-fire block with the same
lock/monitor used elsewhere in the scheduler for onDrainedListeners to ensure
atomicity.

server/src/main/java/org/opensearch/index/engine/dataformat/merge/MergeScheduler.java [171-186]

 } finally {
     activeMerges.decrementAndGet();
-    // Fire all drain listeners if all merges completed and none pending.
-    // This ensures tiering's onMergesDrained callback fires after a force merge completes.
-    if (isFrozen() && activeMerges.get() == 0 && !mergeHandler.hasPendingMerges() && !onDrainedListeners.isEmpty()) {
-        List<Runnable> listeners = List.copyOf(onDrainedListeners);
-        onDrainedListeners.clear();
-        for (Runnable listener : listeners) {
-            try {
-                listener.run();
-            } catch (Exception ex) {
-                logger.warn("Exception in onDrained listener", ex);
+    synchronized (onDrainedListeners) {
+        if (isFrozen() && activeMerges.get() == 0 && !mergeHandler.hasPendingMerges() && !onDrainedListeners.isEmpty()) {
+            List<Runnable> listeners = List.copyOf(onDrainedListeners);
+            onDrainedListeners.clear();
+            for (Runnable listener : listeners) {
+                try {
+                    listener.run();
+                } catch (Exception ex) {
+                    logger.warn("Exception in onDrained listener", ex);
+                }
             }
         }
     }
     forceMergeLock.release();
 }
Suggestion importance[1-10]: 6

__

Why: Concurrent invocation of drain-fire from different code paths could lead to duplicate or missed listener invocations. Adding synchronization on onDrainedListeners addresses a real potential race, though the actual likelihood depends on the surrounding code not shown.

Low

@KhishorekumarBS KhishorekumarBS force-pushed the fix/force-merge-tiering-drain-race branch from fc254ba to a2f82ab Compare July 1, 2026 12:02
@github-actions

github-actions Bot commented Jul 1, 2026

Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit a2f82ab

Comment thread server/src/main/java/org/opensearch/index/shard/IndexShard.java
@github-actions

github-actions Bot commented Jul 1, 2026

Copy link
Copy Markdown
Contributor

✅ Gradle check result for a2f82ab: SUCCESS

@codecov

codecov Bot commented Jul 1, 2026

Copy link
Copy Markdown

Codecov Report

❌ Patch coverage is 79.62963% with 11 lines in your changes missing coverage. Please review.
✅ Project coverage is 73.40%. Comparing base (95ece9b) to head (6cc8bfc).
⚠️ Report is 9 commits behind head on main.

Files with missing lines Patch % Lines
...in/java/org/opensearch/index/shard/IndexShard.java 86.20% 3 Missing and 1 partial ⚠️
.../action/tiering/TransportPrepareTieringAction.java 42.85% 4 Missing ⚠️
.../index/engine/dataformat/merge/MergeScheduler.java 78.57% 2 Missing and 1 partial ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##               main   #22370      +/-   ##
============================================
- Coverage     73.44%   73.40%   -0.05%     
- Complexity    76114    76142      +28     
============================================
  Files          6076     6076              
  Lines        345508   345552      +44     
  Branches      49732    49735       +3     
============================================
- Hits         253773   253653     -120     
- Misses        71497    71668     +171     
+ Partials      20238    20231       -7     

☔ View full report in Codecov by Harness.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@github-actions

github-actions Bot commented Jul 1, 2026

Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 8a7d7b5

- Add IndexShardTests for waitForReplicaSync behavior
- Refine integration test assertions

Signed-off-by: bkhishor <bkhishor@amazon.com>
@KhishorekumarBS KhishorekumarBS force-pushed the fix/force-merge-tiering-drain-race branch from 8a7d7b5 to 76d6942 Compare July 1, 2026 15:33
@github-actions

github-actions Bot commented Jul 1, 2026

Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 76d6942

…rdcoded value

The force merge may or may not complete before prepare runs (timing-dependent).
Assert that replica mirrors primary's segment layout rather than expecting
exactly 1 segment each.

Signed-off-by: bkhishor <bkhishor@amazon.com>
@github-actions

github-actions Bot commented Jul 1, 2026

Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit e79e354

@github-actions

github-actions Bot commented Jul 1, 2026

Copy link
Copy Markdown
Contributor

❌ Gradle check result for e79e354: null

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@github-actions

github-actions Bot commented Jul 2, 2026

Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 7379268

@KhishorekumarBS KhishorekumarBS force-pushed the fix/force-merge-tiering-drain-race branch from 7379268 to e8b3592 Compare July 2, 2026 02:11
@github-actions

github-actions Bot commented Jul 2, 2026

Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit e8b3592

@github-actions

github-actions Bot commented Jul 2, 2026

Copy link
Copy Markdown
Contributor

✅ Gradle check result for e8b3592: SUCCESS

Signed-off-by: bkhishor <bkhishor@amazon.com>
@KhishorekumarBS KhishorekumarBS force-pushed the fix/force-merge-tiering-drain-race branch from e8b3592 to b84ffcd Compare July 2, 2026 05:49
@github-actions

github-actions Bot commented Jul 2, 2026

Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit b84ffcd

…clusters

Analogous to MergeDrainTimeoutException.MERGE_DRAIN_TIMEOUT_MARKER, this
stable marker substring enables log parsing and coordinator-side detection
of replica sync timeouts during tiering preparation.

Signed-off-by: bkhishor <bkhishor@amazon.com>
@github-actions

github-actions Bot commented Jul 2, 2026

Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 314ef57

@github-actions

github-actions Bot commented Jul 2, 2026

Copy link
Copy Markdown
Contributor

✅ Gradle check result for 314ef57: SUCCESS

@github-actions

github-actions Bot commented Jul 2, 2026

Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 1910145

Comment thread server/src/main/java/org/opensearch/index/shard/IndexShard.java
Comment thread server/src/main/java/org/opensearch/index/shard/IndexShard.java
@github-actions

github-actions Bot commented Jul 2, 2026

Copy link
Copy Markdown
Contributor

✅ Gradle check result for 1910145: SUCCESS

…onTime

waitForReplicaSync previously only checked checkpointsBehindCount == 0,
which can be true even when the replica has stale segment layout (DFA
metadata map mismatch). Now checks all three conditions:
- checkpointsBehindCount == 0
- bytesBehindCount == 0
- currentReplicationTimeMillis == 0

Also upgrades log statements from DEBUG to INFO for operational visibility
during tiering preparation.

Adds unit tests for bytesBehind and replicationTime timeout scenarios.

Signed-off-by: bkhishor <bkhishor@amazon.com>
@KhishorekumarBS KhishorekumarBS force-pushed the fix/force-merge-tiering-drain-race branch from 1910145 to c6a2b50 Compare July 2, 2026 10:09
@github-actions

github-actions Bot commented Jul 2, 2026

Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit c6a2b50

@github-actions

github-actions Bot commented Jul 2, 2026

Copy link
Copy Markdown
Contributor

✅ Gradle check result for c6a2b50: SUCCESS

Comment thread server/src/main/java/org/opensearch/index/shard/IndexShard.java
@github-actions

github-actions Bot commented Jul 2, 2026

Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 0450a30

Add tiering.prepare.replica_sync_timeout as a dynamic cluster setting
(default 30s, range 5s-5m) so operators can tune the replica sync wait
time during tiering preparation for large indices.

Signed-off-by: bkhishor <bkhishor@amazon.com>
@KhishorekumarBS KhishorekumarBS force-pushed the fix/force-merge-tiering-drain-race branch from 0450a30 to 6cc8bfc Compare July 2, 2026 18:17
@github-actions

github-actions Bot commented Jul 2, 2026

Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 6cc8bfc

@github-actions

github-actions Bot commented Jul 2, 2026

Copy link
Copy Markdown
Contributor

✅ Gradle check result for 6cc8bfc: SUCCESS

@Bukhtawar Bukhtawar left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, lets see how we can avoid blocking on replica sync, add a TODO

@Bukhtawar Bukhtawar merged commit b2bc553 into opensearch-project:main Jul 3, 2026
16 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants