Fix flaky IngestPipelineFromKafkaIT - bump wait budgets and assertAck pipeline puts#22345
Fix flaky IngestPipelineFromKafkaIT - bump wait budgets and assertAck pipeline puts#22345imRishN wants to merge 1 commit into
Conversation
… pipeline puts Signed-off-by: Rishab Nahata <rishabnahata07@gmail.com>
PR Reviewer Guide 🔍Here are some key observations to aid the review process:
|
PR Code Suggestions ✨Explore these optional code suggestions:
|
|
❌ Gradle check result for 41c8c0d: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
| * The base class's default 1-minute budget has been observed as borderline; | ||
| * doubling it gives headroom without meaningfully slowing the green case. | ||
| */ | ||
| private static final long PIPELINE_WAIT_TIMEOUT_MIN = 2L; |
There was a problem hiding this comment.
1 minute is used for other tests here as well. Looking at the failures here, seems to be mostly topic creation failures?
I'm thinking if we should instead handle the topic creation logic better, maybe something like the following (we might have to verify this once)
public static void createTopic(String topicName, int numOfPartitions, String bootstrapServers) {
await().atMost(60, TimeUnit.SECONDS).until(() -> {
try {
return getAdminClient(bootstrapServers, client -> {
NewTopic newTopic = new NewTopic(topicName, numOfPartitions, (short) 1);
try {
client.createTopics(List.of(newTopic)).all().get(5, TimeUnit.SECONDS);
} catch (ExecutionException e) {
if ((e.getCause() instanceof TopicExistsException) == false) {
throw new RuntimeException(e);
}
}
return checkTopicExistence(client, topicName);
});
} catch (Exception e) {
LOGGER.warn("Topic [{}] is not ready yet", topicName, e);
return false;
}
});
}
private static boolean checkTopicExistence(AdminClient client, String topicName) throws Exception {
return client.describeTopics(List.of(topicName))
.allTopicNames()
.get(5, TimeUnit.SECONDS)
.containsKey(topicName);
}
private static <Rep> Rep getAdminClient(String bootstrapServer, Function<AdminClient, Rep> function) {
try (
AdminClient adminClient = KafkaAdminClient.create(
Map.of(
AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer,
AdminClientConfig.CLIENT_ID_CONFIG, "test",
AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "5000",
AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "5000"
)
)
) {
return function.apply(adminClient);
}
}
Description
Fix flaky IngestPipelineFromKafkaIT
Related Issues
Resolves #21357
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.