diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 01acc574e..028b97ba6 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -718,10 +718,12 @@ def _poll_once(self, timer, max_records, update_offsets=True):
             log.debug('poll: timeout during coordinator.poll(); returning early')
             return {}
 
-        has_all_fetch_positions = self._refresh_committed_offsets(timeout_ms=timer.timeout_ms)
+        self._refresh_committed_offsets(timeout_ms=timer.timeout_ms)
         # Fire-and-forget: kicks ListOffsets reset for any remaining partitions.
-        # The result Task is shared across callers via Fetcher._reset_task; we
-        # don't await it here -- the main fetch wait below drives the loop.
+        # _reset_offsets_async self-drives metadata refresh + retry-backoff
+        # within request_timeout_ms; we don't need to wake the user thread
+        # early to re-trigger it. The result Task is shared across callers
+        # via Fetcher._reset_task.
         self._fetcher.reset_offsets_if_needed()
 
         # If data is available already, e.g. from a previous network client
@@ -741,22 +743,11 @@ def _poll_once(self, timer, max_records, update_offsets=True):
         if records:
             return records
 
-        # We do not want to be stuck blocking in poll if we are missing some positions
-        # since the offset lookup may be backing off after a failure
         poll_timeout_ms = timer.timeout_ms
         if self.config['group_id'] is not None:
             poll_timeout_ms = min(poll_timeout_ms, self._coordinator.time_to_next_poll() * 1000)
-        if not has_all_fetch_positions:
-            log.debug('poll: do not have all fetch positions...')
-            poll_timeout_ms = min(poll_timeout_ms, self.config['retry_backoff_ms'])
 
         self._client.poll(timeout_ms=poll_timeout_ms)
 
-        # after the long poll, we should check whether the group needs to rebalance
-        # prior to returning data so that the group can stabilize faster
-        if self._coordinator.need_rejoin():
-            log.debug('poll: coordinator needs rejoin; returning early')
-            return {}
-
         records, _ = self._fetcher.fetched_records(max_records, update_offsets=update_offsets)
         return records
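
The dropped `retry_backoff_ms` cap existed so the poll loop would wake up early and re-trigger offset lookups while positions were missing; the new comment's point is that the reset task now retries on its own, so that early wake-up is unnecessary. Below is a minimal sketch of that "shared fire-and-forget task" pattern under stated assumptions: the names `OffsetResetter` and `lookup_offsets` are invented for illustration, and kafka-python's real `Fetcher._reset_task` is a Future driven by the client's I/O loop rather than a thread.

```python
import threading
import time


class OffsetResetter:
    """Illustration only: a shared, self-retrying, fire-and-forget reset task.

    Not kafka-python's implementation -- just the pattern the diff's comment
    describes: the first caller starts the reset, later callers reuse the
    in-flight task, and the task drives its own retry/backoff until it
    succeeds or the request timeout elapses.
    """

    def __init__(self, request_timeout_ms=30000, retry_backoff_ms=100):
        self._request_timeout_ms = request_timeout_ms
        self._retry_backoff_ms = retry_backoff_ms
        self._reset_task = None   # shared across callers, like Fetcher._reset_task
        self._lock = threading.Lock()

    def reset_offsets_if_needed(self, lookup_offsets):
        """Fire-and-forget: start the reset task, or reuse the one in flight."""
        with self._lock:
            if self._reset_task is None or not self._reset_task.is_alive():
                self._reset_task = threading.Thread(
                    target=self._run_reset, args=(lookup_offsets,), daemon=True)
                self._reset_task.start()
            return self._reset_task

    def _run_reset(self, lookup_offsets):
        # Retry with backoff inside the task itself, bounded by the request
        # timeout; callers never need to shorten their poll timeout just to
        # come back and re-trigger the lookup.
        deadline = time.monotonic() + self._request_timeout_ms / 1000.0
        while time.monotonic() < deadline:
            if lookup_offsets():   # e.g. one ListOffsets round-trip
                return
            time.sleep(self._retry_backoff_ms / 1000.0)


if __name__ == "__main__":
    # Several poll iterations can call this back-to-back; only one task runs,
    # and it keeps retrying until the (fake) lookup finally succeeds.
    resetter = OffsetResetter(retry_backoff_ms=10)
    attempts = []
    task = resetter.reset_offsets_if_needed(
        lambda: attempts.append(1) or len(attempts) >= 3)
    task.join()
    print(len(attempts))   # 3 -- the task retried on its own
```

With the retries living inside the shared task, capping `poll_timeout_ms` at `retry_backoff_ms` when positions are missing buys nothing, which is why the second hunk removes that branch along with the now-unused `has_all_fetch_positions` flag.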