39 changes: 19 additions & 20 deletions kafka/consumer/group.py
@@ -382,6 +382,7 @@ def __init__(self, *topics, **configs):
 
         self._client = self.config['kafka_client'](metrics=self._metrics, **self.config)
         self._manager = self._client._manager
+        self._cluster = self._manager.cluster
 
         # If api_version was not passed explicitly, bootstrap to auto-discover
         # it. bootstrap is passed as a deferred coroutine so that once the IO
@@ -426,7 +427,7 @@ def __init__(self, *topics, **configs):
 
         if topics:
             self._subscription.subscribe(topics=topics)
-            self._client.cluster.set_topics(topics)
+            self._cluster.set_topics(topics)
 
     def _validate_group_instance_id(self, group_instance_id):
         # See also kafka.util.ensure_valid_topic_name
@@ -475,7 +476,7 @@ def assign(self, partitions):
             # are committed since there will be no following rebalance
             self._coordinator.maybe_auto_commit_offsets_now()
         self._subscription.assign_from_user(partitions)
-        self._client.cluster.set_topics([tp.topic for tp in partitions])
+        self._cluster.set_topics([tp.topic for tp in partitions])
         log.debug("Subscribed to partition(s): %s", partitions)
 
     def assignment(self):
@@ -616,15 +617,14 @@ def _fetch_all_topic_metadata(self):
         """A blocking call that fetches topic metadata for all topics in the
         cluster that the user is authorized to view.
         """
-        cluster = self._client.cluster
-        if cluster.metadata_refresh_in_progress:
-            future = cluster.request_update()
+        if self._cluster.metadata_refresh_in_progress:
+            future = self._cluster.request_update()
             self._manager.run(self._manager.wait_for, future, None)
-        stash = cluster.need_all_topic_metadata
-        cluster.need_all_topic_metadata = True
-        future = cluster.request_update()
+        stash = self._cluster.need_all_topic_metadata
+        self._cluster.need_all_topic_metadata = True
+        future = self._cluster.request_update()
         self._manager.run(self._manager.wait_for, future, None)
-        cluster.need_all_topic_metadata = stash
+        self._cluster.need_all_topic_metadata = stash
 
     def topics(self):
         """Get all topics the user is authorized to view.
@@ -635,7 +635,7 @@ def topics(self):
             set: topics
         """
         self._fetch_all_topic_metadata()
-        return self._client.cluster.topics()
+        return self._cluster.topics()
 
     def partitions_for_topic(self, topic):
         """This method first checks the local metadata cache for information
@@ -650,11 +650,10 @@ def partitions_for_topic(self, topic):
         Returns:
             set: Partition ids
         """
-        cluster = self._client.cluster
-        partitions = cluster.partitions_for_topic(topic)
+        partitions = self._cluster.partitions_for_topic(topic)
         if partitions is None:
             self._fetch_all_topic_metadata()
-            partitions = cluster.partitions_for_topic(topic)
+            partitions = self._cluster.partitions_for_topic(topic)
         return partitions or set()
 
     def poll(self, timeout_ms=0, max_records=None, update_offsets=True):
@@ -971,13 +970,13 @@ def subscribe(self, topics=(), pattern=None, listener=None):
 
         # Regex will need all topic metadata
         if pattern is not None:
-            self._client.cluster.need_all_topic_metadata = True
-            self._client.cluster.set_topics([])
-            self._client.cluster.request_update()
+            self._cluster.need_all_topic_metadata = True
+            self._cluster.set_topics([])
+            self._cluster.request_update()
             log.debug("Subscribed to topic pattern: %s", pattern)
         else:
-            self._client.cluster.need_all_topic_metadata = False
-            self._client.cluster.set_topics(self._subscription.group_subscription())
+            self._cluster.need_all_topic_metadata = False
+            self._cluster.set_topics(self._subscription.group_subscription())
             log.debug("Subscribed to topic(s): %s", topics)
 
     def subscription(self):
@@ -998,8 +997,8 @@ def unsubscribe(self):
         self._subscription.unsubscribe()
         if self.config['api_version'] >= (0, 9):
             self._coordinator.maybe_leave_group()
-        self._client.cluster.need_all_topic_metadata = False
-        self._client.cluster.set_topics([])
+        self._cluster.need_all_topic_metadata = False
+        self._cluster.set_topics([])
         log.debug("Unsubscribed all topics or patterns and assigned partitions")
         self._iterator = None
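Note: every group.py hunk above is the same mechanical refactor: resolve the cluster metadata object once in __init__ (self._cluster = self._manager.cluster) and use the cached reference at each call site instead of re-traversing self._client.cluster. A minimal sketch of the pattern, using hypothetical stand-in classes rather than the real kafka-python ones:

# Hypothetical stand-ins, not the kafka-python API: the point is caching
# a frequently used sub-object once instead of chasing the attribute
# chain at every call site.
class ClusterMetadata:
    def __init__(self):
        self.topics = set()

    def set_topics(self, topics):
        self.topics = set(topics)

class Manager:
    def __init__(self):
        self.cluster = ClusterMetadata()

class Client:
    def __init__(self):
        self._manager = Manager()

class Consumer:
    def __init__(self):
        self._client = Client()
        self._manager = self._client._manager
        # One lookup here; every other method now uses self._cluster.
        self._cluster = self._manager.cluster

    def subscribe(self, topics):
        # Before the refactor this was self._client.cluster.set_topics(topics).
        self._cluster.set_topics(topics)

consumer = Consumer()
consumer.subscribe(['orders'])
assert consumer._cluster.topics == {'orders'}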
5 changes: 3 additions & 2 deletions kafka/coordinator/base.py
@@ -140,6 +140,7 @@ def __init__(self, client, **configs):
 
         self._client = client
         self._manager = client._manager
+        self._cluster = self._manager.cluster
         self.heartbeat = Heartbeat(**self.config)
         self._heartbeat_wakeup = WakeupNotifier(self._manager._net)
         self._heartbeat_loop_future = None
@@ -323,7 +324,7 @@ async def ensure_coordinator_ready_async(self, timeout_ms=None):
                     raise
                 if exc.invalid_metadata:
                     log.debug('Requesting metadata for group coordinator request: %s', exc)
-                    metadata_update = self._client.cluster.request_update()
+                    metadata_update = self._cluster.request_update()
                     try:
                         await self._manager.wait_for(metadata_update, timer.timeout_ms)
                     except Errors.KafkaTimeoutError:
@@ -756,7 +757,7 @@ def _handle_find_coordinator_response(self, future, response):
         error_type = Errors.for_code(response.error_code)
         if error_type is Errors.NoError:
             with self._lock:
-                coordinator_id = self._client.cluster.add_coordinator(response, 'group', self.group_id)
+                coordinator_id = self._cluster.add_coordinator(response, 'group', self.group_id)
                 if not coordinator_id:
                     # This could happen if coordinator metadata is different
                     # than broker metadata
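Note: the one-line addition to BaseCoordinator.__init__ is what enables the deletion in consumer.py below; subclasses now inherit the cached self._cluster instead of each assigning their own copy. A minimal sketch of that relationship, again with hypothetical stand-ins, assuming the subclass calls super().__init__ before touching self._cluster (which the consumer.py hunk implies):

# Hypothetical stand-ins, not the kafka-python classes.
class Cluster:
    pass

class Manager:
    def __init__(self):
        self.cluster = Cluster()

class Client:
    def __init__(self):
        self._manager = Manager()

class BaseCoordinator:
    def __init__(self, client):
        self._client = client
        self._manager = client._manager
        self._cluster = self._manager.cluster  # set once for all subclasses

class ConsumerCoordinator(BaseCoordinator):
    def __init__(self, client):
        super().__init__(client)
        # No local self._cluster assignment needed: BaseCoordinator.__init__
        # already populated it, so code here can use self._cluster directly.

coord = ConsumerCoordinator(Client())
assert coord._cluster is coord._manager.cluster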
9 changes: 4 additions & 5 deletions kafka/coordinator/consumer.py
@@ -97,9 +97,8 @@ def __init__(self, client, subscription, **configs):
         self._subscription = subscription
         self._is_leader = False
         self._joined_subscription = set()
-        self._metadata_snapshot = self._build_metadata_snapshot(subscription, client.cluster)
+        self._metadata_snapshot = self._build_metadata_snapshot(subscription, self._cluster)
         self._assignment_snapshot = None
-        self._cluster = client.cluster
         self.auto_commit_interval = self.config['auto_commit_interval_ms'] / 1000
         self.next_auto_commit_deadline = None
         self.completed_offset_commits = collections.deque()
@@ -186,7 +185,7 @@ def _handle_metadata_update(self, cluster):
 
         if set(topics) != self._subscription.subscription:
             self._subscription.change_subscription(topics)
-            self._client.cluster.set_topics(self._subscription.group_subscription())
+            self._cluster.set_topics(self._subscription.group_subscription())
 
         # check if there are any changes to the metadata which should trigger
         # a rebalance
@@ -326,7 +325,7 @@ def poll(self, timeout_ms=None):
                 # essentially be ignored. See KAFKA-3949 for the complete
                 # description of the problem.
                 if self._subscription.subscribed_pattern:
-                    metadata_update = self._client.cluster.request_update()
+                    metadata_update = self._cluster.request_update()
                     try:
                         self._manager.run(
                             self._manager.wait_for, metadata_update, timer.timeout_ms)
@@ -371,7 +370,7 @@ def _perform_assignment(self, leader_id, protocol_name, members):
         # Because assignment typically happens within response callbacks,
         # we cannot block on metadata updates here (no recursion into poll())
         self._subscription.group_subscribe(all_subscribed_topics)
-        self._client.cluster.set_topics(self._subscription.group_subscription())
+        self._cluster.set_topics(self._subscription.group_subscription())
 
         # keep track of the metadata used for assignment so that we can check
         # after rebalance completion whether anything has changed
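Note: the whole change is behavior-preserving only if the cached object is identical to what the old attribute chain returned, i.e. client.cluster and client._manager.cluster must name the same ClusterMetadata instance. The diff implies this but never shows client.cluster itself, so the delegation below is an assumption; a sanity check with hypothetical stand-ins (the real client needs a live broker):

# Hypothetical stand-ins; FakeClient.cluster delegating to the manager's
# cluster is an assumption the diff implies but does not show.
class FakeCluster:
    def __init__(self):
        self.topics = set()

    def set_topics(self, topics):
        self.topics = set(topics)

class FakeManager:
    def __init__(self):
        self.cluster = FakeCluster()

class FakeClient:
    def __init__(self):
        self._manager = FakeManager()

    @property
    def cluster(self):
        return self._manager.cluster  # assumed delegation

client = FakeClient()
cached = client._manager.cluster    # what the new code caches in __init__
assert cached is client.cluster     # what the old code looked up per call
cached.set_topics(['orders'])
assert client.cluster.topics == {'orders'}  # same object, same state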