Skip to content

Fix flaky IngestPipelineFromKafkaIT - bump wait budgets and assertAck pipeline puts#22345

Open
imRishN wants to merge 1 commit into
opensearch-project:mainfrom
imRishN:fix-ingest-pipeline-it-flakes
Open

Fix flaky IngestPipelineFromKafkaIT - bump wait budgets and assertAck pipeline puts#22345
imRishN wants to merge 1 commit into
opensearch-project:mainfrom
imRishN:fix-ingest-pipeline-it-flakes

Conversation

@imRishN

@imRishN imRishN commented Jun 29, 2026

Copy link
Copy Markdown
Member

Description

Fix flaky IngestPipelineFromKafkaIT

Related Issues

Resolves #21357

Check List

  • Functionality includes testing.
  • API changes companion pull request created, if applicable.
  • Public documentation issue/PR created, if applicable.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

… pipeline puts

Signed-off-by: Rishab Nahata <rishabnahata07@gmail.com>
@imRishN imRishN requested a review from a team as a code owner June 29, 2026 10:33
@github-actions github-actions Bot added >test-failure Test failure from CI, local build, etc. autocut flaky-test Random test failure that succeeds on second run labels Jun 29, 2026
@github-actions

Copy link
Copy Markdown
Contributor

PR Reviewer Guide 🔍

Here are some key observations to aid the review process:

🧪 No relevant tests
🔒 No security concerns identified
✅ No TODO sections
🔀 No multiple PR themes
⚡ No major issues detected

@github-actions

Copy link
Copy Markdown
Contributor

PR Code Suggestions ✨

Explore these optional code suggestions:

CategorySuggestion                                                                                                                                    Impact
General
Confirm pipeline visibility after acknowledgement

assertAcked only verifies the ack flag but does not guarantee the pipeline is
visible on all nodes, which can still cause flakiness when documents are ingested
immediately after. Consider also waiting via clusterHealth for cluster state
propagation, or explicitly retrieving the pipeline with GetPipelineRequest to
confirm visibility before returning.

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestPipelineFromKafkaIT.java [877-882]

 private void createPipeline(String pipelineId, String pipelineJson) {
     AcknowledgedResponse response = client().admin()
         .cluster()
         .putPipeline(new PutPipelineRequest(pipelineId, new BytesArray(pipelineJson), MediaTypeRegistry.JSON))
         .actionGet();
     assertAcked(response);
+    GetPipelineResponse getResponse = client().admin()
+        .cluster()
+        .getPipeline(new GetPipelineRequest(pipelineId))
+        .actionGet();
+    assertThat(getResponse.pipelines().isEmpty(), is(false));
 }
Suggestion importance[1-10]: 4

__

Why: The suggestion to verify pipeline visibility post-ack could reduce flakiness, but AcknowledgedResponse in cluster operations typically indicates cluster state propagation. The added check is a minor defensive improvement with limited impact.

Low

@github-actions

Copy link
Copy Markdown
Contributor

❌ Gradle check result for 41c8c0d: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

* The base class's default 1-minute budget has been observed as borderline;
* doubling it gives headroom without meaningfully slowing the green case.
*/
private static final long PIPELINE_WAIT_TIMEOUT_MIN = 2L;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1 minute is used for other tests here as well. Looking at the failures here, seems to be mostly topic creation failures?

I'm thinking if we should instead handle the topic creation logic better, maybe something like the following (we might have to verify this once)

public static void createTopic(String topicName, int numOfPartitions, String bootstrapServers) {
      await().atMost(60, TimeUnit.SECONDS).until(() -> {
          try {
              return getAdminClient(bootstrapServers, client -> {
                  NewTopic newTopic = new NewTopic(topicName, numOfPartitions, (short) 1);
                  try {
                      client.createTopics(List.of(newTopic)).all().get(5, TimeUnit.SECONDS);
                  } catch (ExecutionException e) {
                      if ((e.getCause() instanceof TopicExistsException) == false) {
                          throw new RuntimeException(e);
                      }
                  }

                  return checkTopicExistence(client, topicName);
              });
          } catch (Exception e) {
              LOGGER.warn("Topic [{}] is not ready yet", topicName, e);
              return false;
          }
      });
  }

private static boolean checkTopicExistence(AdminClient client, String topicName) throws Exception {
      return client.describeTopics(List.of(topicName))
          .allTopicNames()
          .get(5, TimeUnit.SECONDS)
          .containsKey(topicName);
  }

private static <Rep> Rep getAdminClient(String bootstrapServer, Function<AdminClient, Rep> function) {
      try (
          AdminClient adminClient = KafkaAdminClient.create(
              Map.of(
                  AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer,
                  AdminClientConfig.CLIENT_ID_CONFIG, "test",
                  AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "5000",
                  AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "5000"
              )
          )
      ) {
          return function.apply(adminClient);
      }
  }

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

autocut flaky-test Random test failure that succeeds on second run >test-failure Test failure from CI, local build, etc.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[AUTOCUT] Gradle Check Flaky Test Report for IngestPipelineFromKafkaIT

2 participants