diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index f3d0c4d91..f66e485d6 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -619,11 +619,11 @@ def _fetch_all_topic_metadata(self): cluster = self._client.cluster if cluster.metadata_refresh_in_progress: future = cluster.request_update() - self._client.poll(future=future) + self._manager.run(self._manager.wait_for, future, None) stash = cluster.need_all_topic_metadata cluster.need_all_topic_metadata = True future = cluster.request_update() - self._client.poll(future=future) + self._manager.run(self._manager.wait_for, future, None) cluster.need_all_topic_metadata = stash def topics(self): diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index cec05faac..eb43a472c 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -327,8 +327,10 @@ def poll(self, timeout_ms=None): # description of the problem. if self._subscription.subscribed_pattern: metadata_update = self._client.cluster.request_update() - self._client.poll(future=metadata_update, timeout_ms=timer.timeout_ms) - if not metadata_update.is_done: + try: + self._manager.run( + self._manager.wait_for, metadata_update, timer.timeout_ms) + except Errors.KafkaTimeoutError: log.debug('coordinator.poll: timeout updating metadata; returning early') return False