Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.cluster.routing.UnassignedInfo;
import org.opensearch.common.Priority;
import org.opensearch.common.SuppressForbidden;
import org.opensearch.common.collect.MapBuilder;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
Expand Down Expand Up @@ -539,7 +538,6 @@ public void testDerivedSourceWithMultiFieldsRollingRestart() throws Exception {
}
}

@SuppressForbidden(reason = "Need to fix: https://github.com/opensearch-project/OpenSearch/issues/20064")
public void testDerivedSourceWithConcurrentUpdatesRollingRestart() throws Exception {
String mapping = """
{
Expand Down Expand Up @@ -604,28 +602,51 @@ public void testDerivedSourceWithConcurrentUpdatesRollingRestart() throws Except
// Start concurrent updates during rolling restart
logger.info("--> starting rolling restart with concurrent updates");

final AtomicBoolean stop = new AtomicBoolean(false);
final AtomicBoolean shutdown = new AtomicBoolean(false);
final AtomicBoolean paused = new AtomicBoolean(false);
final AtomicInteger inFlight = new AtomicInteger(0);
final AtomicInteger successfulUpdates = new AtomicInteger(0);
final CountDownLatch updateDocLatch = new CountDownLatch(docCount / 3);
final AtomicInteger nextDocId = new AtomicInteger(0);
final CountDownLatch initialUpdatesLatch = new CountDownLatch(docCount / 3);
final Object pauseLock = new Object();
final Thread updateThread = new Thread(() -> {
while (stop.get() == false) {
while (shutdown.get() == false) {
synchronized (pauseLock) {
while (paused.get() && shutdown.get() == false) {
try {
pauseLock.wait(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
}
if (shutdown.get()) {
break;
}
try {
// Update documents sequentially to avoid conflicts
for (int i = 0; i < docCount && !stop.get(); i++) {
client().prepareUpdate("test", String.valueOf(i))
int docId = nextDocId.getAndIncrement() % docCount;
inFlight.incrementAndGet();
try {
client().prepareUpdate("test", String.valueOf(docId))
.setRetryOnConflict(3)
.setDoc("counter", successfulUpdates.get() + 1, "last_updated", System.currentTimeMillis(), "version", 1)
.execute()
.actionGet(TimeValue.timeValueSeconds(5));
successfulUpdates.incrementAndGet();
updateDocLatch.countDown();
Thread.sleep(50); // Larger delay between updates
initialUpdatesLatch.countDown();
} finally {
inFlight.decrementAndGet();
synchronized (pauseLock) {
pauseLock.notifyAll();
}
}
} catch (Exception e) {
logger.warn("Error in background update thread", e);
}
}
});
updateThread.setName("derived-source-update-thread");

try {
// Add replicas
Expand All @@ -639,14 +660,14 @@ public void testDerivedSourceWithConcurrentUpdatesRollingRestart() throws Except

// Start updates after cluster is stable
updateThread.start();
// Wait for fix number of updates to go through
updateDocLatch.await();
// Wait for a fixed number of updates to go through
initialUpdatesLatch.await();

// Rolling restart of all nodes
for (String node : internalCluster().getNodeNames()) {
// Stop updates temporarily during node restart
stop.set(true);
Thread.sleep(1000); // Wait for in-flight operations to complete
// Pause updates temporarily during node restart
paused.set(true);
assertBusy(() -> assertEquals(0, inFlight.get()));

internalCluster().restartNode(node);
ensureGreen(TimeValue.timeValueSeconds(60));
Expand All @@ -656,12 +677,19 @@ public void testDerivedSourceWithConcurrentUpdatesRollingRestart() throws Except
verifyDerivedSourceWithUpdates(docCount);

// Resume updates
stop.set(false);
paused.set(false);
synchronized (pauseLock) {
pauseLock.notifyAll();
}
}

} finally {
// Clean shutdown
stop.set(true);
shutdown.set(true);
paused.set(false);
synchronized (pauseLock) {
pauseLock.notifyAll();
}
updateThread.join(TimeValue.timeValueSeconds(30).millis());
if (updateThread.isAlive()) {
updateThread.interrupt();
Expand Down