kafka/consumer/group.py: 19 changes (5 additions, 14 deletions)
@@ -718,10 +718,12 @@ def _poll_once(self, timer, max_records, update_offsets=True):
log.debug('poll: timeout during coordinator.poll(); returning early')
return {}

has_all_fetch_positions = self._refresh_committed_offsets(timeout_ms=timer.timeout_ms)
self._refresh_committed_offsets(timeout_ms=timer.timeout_ms)
# Fire-and-forget: kicks ListOffsets reset for any remaining partitions.
# The result Task is shared across callers via Fetcher._reset_task; we
# don't await it here -- the main fetch wait below drives the loop.
# _reset_offsets_async self-drives metadata refresh + retry-backoff
# within request_timeout_ms; we don't need to wake the user thread
# early to re-trigger it. The result Task is shared across callers
# via Fetcher._reset_task.
self._fetcher.reset_offsets_if_needed()

# If data is available already, e.g. from a previous network client
@@ -741,22 +743,11 @@ def _poll_once(self, timer, max_records, update_offsets=True):
if records:
return records

# We do not want to be stuck blocking in poll if we are missing some positions
# since the offset lookup may be backing off after a failure
poll_timeout_ms = timer.timeout_ms
if self.config['group_id'] is not None:
poll_timeout_ms = min(poll_timeout_ms, self._coordinator.time_to_next_poll() * 1000)
if not has_all_fetch_positions:
log.debug('poll: do not have all fetch positions...')
poll_timeout_ms = min(poll_timeout_ms, self.config['retry_backoff_ms'])

self._client.poll(timeout_ms=poll_timeout_ms)
# after the long poll, we should check whether the group needs to rebalance
# prior to returning data so that the group can stabilize faster
if self._coordinator.need_rejoin():
log.debug('poll: coordinator needs rejoin; returning early')
return {}

records, _ = self._fetcher.fetched_records(max_records, update_offsets=update_offsets)
return records
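
The new comment describes a fire-and-forget pattern: `reset_offsets_if_needed()` starts (or reuses) one shared background reset, and callers never block on it because `_reset_offsets_async` retries with backoff on its own within the request timeout. Below is a minimal sketch of that pattern, using plain threads as a stand-in for kafka-python's internal task machinery; the `Fetcher`, `_reset_task`, and `_try_list_offsets` names here are illustrative, not the library's actual API.

```python
import threading
import time


class Fetcher:
    """Illustrative sketch only; not kafka-python's real Fetcher."""

    def __init__(self, request_timeout_s=1.0, retry_backoff_s=0.05):
        self._lock = threading.Lock()
        self._reset_task = None            # one shared task across callers
        self._request_timeout_s = request_timeout_s
        self._retry_backoff_s = retry_backoff_s

    def reset_offsets_if_needed(self):
        # Fire-and-forget: start a reset if none is running, otherwise
        # reuse the in-flight one. Callers return immediately; the main
        # fetch wait drives progress while the reset runs.
        with self._lock:
            if self._reset_task is None or not self._reset_task.is_alive():
                self._reset_task = threading.Thread(
                    target=self._reset_offsets_async, daemon=True)
                self._reset_task.start()

    def _reset_offsets_async(self):
        # Self-driving retry loop: attempt, back off, and retry until it
        # succeeds or the request timeout elapses, so the user thread never
        # needs an early wakeup just to re-trigger the reset.
        deadline = time.monotonic() + self._request_timeout_s
        while time.monotonic() < deadline:
            if self._try_list_offsets():
                return
            time.sleep(self._retry_backoff_s)

    def _try_list_offsets(self):
        # Placeholder for the real ListOffsets round-trip.
        return True
```

Because the task is shared, many concurrent `poll()` loops trigger at most one offset reset rather than one per caller, which is why the poll timeout no longer needs to be shortened to `retry_backoff_ms` when positions are missing.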
