From 2892b1c47effa6a2c94d437589428b80033175a1 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Thu, 7 May 2026 07:41:06 -0700 Subject: [PATCH] Consumer: replace client.poll with manager.run for metadata refresh waits --- kafka/consumer/group.py | 4 ++-- kafka/coordinator/consumer.py | 6 ++++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index f3d0c4d91..f66e485d6 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -619,11 +619,11 @@ def _fetch_all_topic_metadata(self): cluster = self._client.cluster if cluster.metadata_refresh_in_progress: future = cluster.request_update() - self._client.poll(future=future) + self._manager.run(self._manager.wait_for, future, None) stash = cluster.need_all_topic_metadata cluster.need_all_topic_metadata = True future = cluster.request_update() - self._client.poll(future=future) + self._manager.run(self._manager.wait_for, future, None) cluster.need_all_topic_metadata = stash def topics(self): diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index cec05faac..eb43a472c 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -327,8 +327,10 @@ def poll(self, timeout_ms=None): # description of the problem. if self._subscription.subscribed_pattern: metadata_update = self._client.cluster.request_update() - self._client.poll(future=metadata_update, timeout_ms=timer.timeout_ms) - if not metadata_update.is_done: + try: + self._manager.run( + self._manager.wait_for, metadata_update, timer.timeout_ms) + except Errors.KafkaTimeoutError: log.debug('coordinator.poll: timeout updating metadata; returning early') return False