From 18742bb4ef95a90695da18a43d21f49a7638a305 Mon Sep 17 00:00:00 2001 From: MdTanwer Date: Wed, 1 Jul 2026 05:59:09 +0000 Subject: [PATCH] Fix pause/resume logic in rolling restart IT Separate shutdown from pause so background updates resume after each node restart, and replace Thread.sleep with assertBusy for in-flight work. Resolves #20064. Signed-off-by: MdTanwer --- .../recovery/FullRollingRestartIT.java | 62 ++++++++++++++----- 1 file changed, 45 insertions(+), 17 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/recovery/FullRollingRestartIT.java b/server/src/internalClusterTest/java/org/opensearch/recovery/FullRollingRestartIT.java index 9b35bbb45c1ed..5da22c849aeca 100644 --- a/server/src/internalClusterTest/java/org/opensearch/recovery/FullRollingRestartIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/recovery/FullRollingRestartIT.java @@ -46,7 +46,6 @@ import org.opensearch.cluster.routing.RecoverySource; import org.opensearch.cluster.routing.UnassignedInfo; import org.opensearch.common.Priority; -import org.opensearch.common.SuppressForbidden; import org.opensearch.common.collect.MapBuilder; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; @@ -539,7 +538,6 @@ public void testDerivedSourceWithMultiFieldsRollingRestart() throws Exception { } } - @SuppressForbidden(reason = "Need to fix: https://github.com/opensearch-project/OpenSearch/issues/20064") public void testDerivedSourceWithConcurrentUpdatesRollingRestart() throws Exception { String mapping = """ { @@ -604,28 +602,51 @@ public void testDerivedSourceWithConcurrentUpdatesRollingRestart() throws Except // Start concurrent updates during rolling restart logger.info("--> starting rolling restart with concurrent updates"); - final AtomicBoolean stop = new AtomicBoolean(false); + final AtomicBoolean shutdown = new AtomicBoolean(false); + final AtomicBoolean paused = new AtomicBoolean(false); + final AtomicInteger inFlight = new AtomicInteger(0); final AtomicInteger successfulUpdates = new AtomicInteger(0); - final CountDownLatch updateDocLatch = new CountDownLatch(docCount / 3); + final AtomicInteger nextDocId = new AtomicInteger(0); + final CountDownLatch initialUpdatesLatch = new CountDownLatch(docCount / 3); + final Object pauseLock = new Object(); final Thread updateThread = new Thread(() -> { - while (stop.get() == false) { + while (shutdown.get() == false) { + synchronized (pauseLock) { + while (paused.get() && shutdown.get() == false) { + try { + pauseLock.wait(100); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } + } + } + if (shutdown.get()) { + break; + } try { - // Update documents sequentially to avoid conflicts - for (int i = 0; i < docCount && !stop.get(); i++) { - client().prepareUpdate("test", String.valueOf(i)) + int docId = nextDocId.getAndIncrement() % docCount; + inFlight.incrementAndGet(); + try { + client().prepareUpdate("test", String.valueOf(docId)) .setRetryOnConflict(3) .setDoc("counter", successfulUpdates.get() + 1, "last_updated", System.currentTimeMillis(), "version", 1) .execute() .actionGet(TimeValue.timeValueSeconds(5)); successfulUpdates.incrementAndGet(); - updateDocLatch.countDown(); - Thread.sleep(50); // Larger delay between updates + initialUpdatesLatch.countDown(); + } finally { + inFlight.decrementAndGet(); + synchronized (pauseLock) { + pauseLock.notifyAll(); + } } } catch (Exception e) { logger.warn("Error in background update thread", e); } } }); + updateThread.setName("derived-source-update-thread"); try { // Add replicas @@ -639,14 +660,14 @@ public void testDerivedSourceWithConcurrentUpdatesRollingRestart() throws Except // Start updates after cluster is stable updateThread.start(); - // Wait for fix number of updates to go through - updateDocLatch.await(); + // Wait for a fixed number of updates to go through + initialUpdatesLatch.await(); // Rolling restart of all nodes for (String node : internalCluster().getNodeNames()) { - // Stop updates temporarily during node restart - stop.set(true); - Thread.sleep(1000); // Wait for in-flight operations to complete + // Pause updates temporarily during node restart + paused.set(true); + assertBusy(() -> assertEquals(0, inFlight.get())); internalCluster().restartNode(node); ensureGreen(TimeValue.timeValueSeconds(60)); @@ -656,12 +677,19 @@ public void testDerivedSourceWithConcurrentUpdatesRollingRestart() throws Except verifyDerivedSourceWithUpdates(docCount); // Resume updates - stop.set(false); + paused.set(false); + synchronized (pauseLock) { + pauseLock.notifyAll(); + } } } finally { // Clean shutdown - stop.set(true); + shutdown.set(true); + paused.set(false); + synchronized (pauseLock) { + pauseLock.notifyAll(); + } updateThread.join(TimeValue.timeValueSeconds(30).millis()); if (updateThread.isAlive()) { updateThread.interrupt();