Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.action.ingest.GetPipelineResponse;
import org.opensearch.action.ingest.PutPipelineRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.support.clustermanager.AcknowledgedResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.common.bytes.BytesArray;
Expand All @@ -30,6 +31,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.is;
Expand All @@ -40,6 +42,13 @@
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class IngestPipelineFromKafkaIT extends KafkaIngestionBaseIT {

/**
* Wait budget for assertions on the Painless / field-mapping path.
* The base class's default 1-minute budget has been observed as borderline;
* doubling it gives headroom without meaningfully slowing the green case.
*/
private static final long PIPELINE_WAIT_TIMEOUT_MIN = 2L;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1 minute is used for other tests here as well. Looking at the failures here, seems to be mostly topic creation failures?

I'm thinking if we should instead handle the topic creation logic better, maybe something like the following (we might have to verify this once)

public static void createTopic(String topicName, int numOfPartitions, String bootstrapServers) {
      await().atMost(60, TimeUnit.SECONDS).until(() -> {
          try {
              return getAdminClient(bootstrapServers, client -> {
                  NewTopic newTopic = new NewTopic(topicName, numOfPartitions, (short) 1);
                  try {
                      client.createTopics(List.of(newTopic)).all().get(5, TimeUnit.SECONDS);
                  } catch (ExecutionException e) {
                      if ((e.getCause() instanceof TopicExistsException) == false) {
                          throw new RuntimeException(e);
                      }
                  }

                  return checkTopicExistence(client, topicName);
              });
          } catch (Exception e) {
              LOGGER.warn("Topic [{}] is not ready yet", topicName, e);
              return false;
          }
      });
  }

private static boolean checkTopicExistence(AdminClient client, String topicName) throws Exception {
      return client.describeTopics(List.of(topicName))
          .allTopicNames()
          .get(5, TimeUnit.SECONDS)
          .containsKey(topicName);
  }

private static <Rep> Rep getAdminClient(String bootstrapServer, Function<AdminClient, Rep> function) {
      try (
          AdminClient adminClient = KafkaAdminClient.create(
              Map.of(
                  AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer,
                  AdminClientConfig.CLIENT_ID_CONFIG, "test",
                  AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "5000",
                  AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "5000"
              )
          )
      ) {
          return function.apply(adminClient);
      }
  }


@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(KafkaPlugin.class, IngestCommonModulePlugin.class, PainlessModulePlugin.class);
Expand Down Expand Up @@ -377,15 +386,15 @@ public void testPipelineWithFieldMappingMapper() throws Exception {
if (response.getHits().getTotalHits().value() < 1) return false;
Map<String, Object> source = response.getHits().getHits()[0].getSourceAsMap();
return Boolean.TRUE.equals(source.get("enriched")) && "alice".equals(source.get("name"));
});
}, PIPELINE_WAIT_TIMEOUT_MIN, TimeUnit.MINUTES);

waitForState(() -> {
refresh(indexName);
SearchResponse response = client().prepareSearch(indexName).setQuery(new TermQueryBuilder("_id", "user2")).get();
if (response.getHits().getTotalHits().value() < 1) return false;
Map<String, Object> source = response.getHits().getHits()[0].getSourceAsMap();
return Boolean.TRUE.equals(source.get("enriched")) && "bob".equals(source.get("name"));
});
}, PIPELINE_WAIT_TIMEOUT_MIN, TimeUnit.MINUTES);
}

// --- Transformation-specific tests ---
Expand Down Expand Up @@ -463,7 +472,7 @@ public void testCombinedScriptTransformations() throws Exception {
&& !props.containsKey("old_key")
&& "alice".equals(source.get("name"))
&& Integer.valueOf(25).equals(source.get("age"));
});
}, PIPELINE_WAIT_TIMEOUT_MIN, TimeUnit.MINUTES);
}

/**
Expand Down Expand Up @@ -517,7 +526,7 @@ public void testTransformTypeConversion() throws Exception {
&& (Integer) source.get("age") == 25
&& source.get("timestamp") instanceof Number
&& ((Number) source.get("timestamp")).longValue() == 1739459500000L;
});
}, PIPELINE_WAIT_TIMEOUT_MIN, TimeUnit.MINUTES);
}

/**
Expand Down Expand Up @@ -560,7 +569,7 @@ public void testTransformGeo() throws Exception {
@SuppressWarnings("unchecked")
Map<String, Object> geoPoint = (Map<String, Object>) location;
return geoPoint.containsKey("lat") && geoPoint.containsKey("lon") && !source.containsKey("lat") && !source.containsKey("lon");
});
}, PIPELINE_WAIT_TIMEOUT_MIN, TimeUnit.MINUTES);
}

/**
Expand Down Expand Up @@ -647,7 +656,7 @@ public void testFieldMappingWithVersionAndPipeline() throws Exception {
&& "kafka".equals(source.get("source"))
&& !source.containsKey("user_id")
&& !source.containsKey("ts");
});
}, PIPELINE_WAIT_TIMEOUT_MIN, TimeUnit.MINUTES);
}

/**
Expand Down Expand Up @@ -778,7 +787,7 @@ public void testFieldMappingWithDropPipeline() throws Exception {
PollingIngestStats stats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0]
.getPollingIngestStats();
return stats != null && stats.getMessageProcessorStats().totalProcessedCount() >= 2;
});
}, PIPELINE_WAIT_TIMEOUT_MIN, TimeUnit.MINUTES);
refresh(indexName);
SearchResponse response = client().prepareSearch(indexName).get();
assertThat(response.getHits().getTotalHits().value(), is(0L));
Expand Down Expand Up @@ -865,10 +874,11 @@ private void createFieldMappingIndexWithPipeline(
// --- Helper methods ---

private void createPipeline(String pipelineId, String pipelineJson) {
client().admin()
AcknowledgedResponse response = client().admin()
.cluster()
.putPipeline(new PutPipelineRequest(pipelineId, new BytesArray(pipelineJson), MediaTypeRegistry.JSON))
.actionGet();
assertAcked(response);
}

private void createIndexWithPipeline(String pipelineId, int numShards, int numReplicas) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,11 +144,22 @@ protected long getSearchableDocCount(String node) throws Exception {
}

protected void waitForState(Callable<Boolean> checkState) throws Exception {
waitForState(checkState, 1, TimeUnit.MINUTES);
}

/**
* Same as {@link #waitForState(Callable)} but with a caller-supplied timeout.
* Use a longer budget for tests on the critical path
* (Kafka poll → mapper → ingest pipeline → engine → refresh), particularly when
* the pipeline triggers Painless script compilation, which can take several
* seconds under CI load.
*/
protected void waitForState(Callable<Boolean> checkState, long timeout, TimeUnit unit) throws Exception {
assertBusy(() -> {
if (checkState.call() == false) {
fail("Provided state requirements not met");
}
}, 1, TimeUnit.MINUTES);
}, timeout, unit);
}

protected String getSettings(String indexName, String setting) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ public static void createTopic(String topicName, int numOfPartitions, String boo
}

// validates topic is created
await().atMost(60, TimeUnit.SECONDS).until(() -> checkTopicExistence(topicName, bootstrapServers));
// metadata fetch on a cold testcontainer broker can chew a large slice of the
// budget before the topic even appears, hence keeping 2 min timeout
await().atMost(120, TimeUnit.SECONDS).until(() -> checkTopicExistence(topicName, bootstrapServers));
}

public static boolean checkTopicExistence(String topicName, String bootstrapServers) {
Expand Down
Loading