diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index f66e485d6..60dddf068 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -382,6 +382,7 @@ def __init__(self, *topics, **configs): self._client = self.config['kafka_client'](metrics=self._metrics, **self.config) self._manager = self._client._manager + self._cluster = self._manager.cluster # If api_version was not passed explicitly, bootstrap to auto-discover # it. bootstrap is passed as a deferred coroutine so that once the IO @@ -426,7 +427,7 @@ def __init__(self, *topics, **configs): if topics: self._subscription.subscribe(topics=topics) - self._client.cluster.set_topics(topics) + self._cluster.set_topics(topics) def _validate_group_instance_id(self, group_instance_id): # See also kafka.util.ensure_valid_topic_name @@ -475,7 +476,7 @@ def assign(self, partitions): # are committed since there will be no following rebalance self._coordinator.maybe_auto_commit_offsets_now() self._subscription.assign_from_user(partitions) - self._client.cluster.set_topics([tp.topic for tp in partitions]) + self._cluster.set_topics([tp.topic for tp in partitions]) log.debug("Subscribed to partition(s): %s", partitions) def assignment(self): @@ -616,15 +617,14 @@ def _fetch_all_topic_metadata(self): """A blocking call that fetches topic metadata for all topics in the cluster that the user is authorized to view. """ - cluster = self._client.cluster - if cluster.metadata_refresh_in_progress: - future = cluster.request_update() + if self._cluster.metadata_refresh_in_progress: + future = self._cluster.request_update() self._manager.run(self._manager.wait_for, future, None) - stash = cluster.need_all_topic_metadata - cluster.need_all_topic_metadata = True - future = cluster.request_update() + stash = self._cluster.need_all_topic_metadata + self._cluster.need_all_topic_metadata = True + future = self._cluster.request_update() self._manager.run(self._manager.wait_for, future, None) - cluster.need_all_topic_metadata = stash + self._cluster.need_all_topic_metadata = stash def topics(self): """Get all topics the user is authorized to view. @@ -635,7 +635,7 @@ def topics(self): set: topics """ self._fetch_all_topic_metadata() - return self._client.cluster.topics() + return self._cluster.topics() def partitions_for_topic(self, topic): """This method first checks the local metadata cache for information @@ -650,11 +650,10 @@ def partitions_for_topic(self, topic): Returns: set: Partition ids """ - cluster = self._client.cluster - partitions = cluster.partitions_for_topic(topic) + partitions = self._cluster.partitions_for_topic(topic) if partitions is None: self._fetch_all_topic_metadata() - partitions = cluster.partitions_for_topic(topic) + partitions = self._cluster.partitions_for_topic(topic) return partitions or set() def poll(self, timeout_ms=0, max_records=None, update_offsets=True): @@ -971,13 +970,13 @@ def subscribe(self, topics=(), pattern=None, listener=None): # Regex will need all topic metadata if pattern is not None: - self._client.cluster.need_all_topic_metadata = True - self._client.cluster.set_topics([]) - self._client.cluster.request_update() + self._cluster.need_all_topic_metadata = True + self._cluster.set_topics([]) + self._cluster.request_update() log.debug("Subscribed to topic pattern: %s", pattern) else: - self._client.cluster.need_all_topic_metadata = False - self._client.cluster.set_topics(self._subscription.group_subscription()) + self._cluster.need_all_topic_metadata = False + self._cluster.set_topics(self._subscription.group_subscription()) log.debug("Subscribed to topic(s): %s", topics) def subscription(self): @@ -998,8 +997,8 @@ def unsubscribe(self): self._subscription.unsubscribe() if self.config['api_version'] >= (0, 9): self._coordinator.maybe_leave_group() - self._client.cluster.need_all_topic_metadata = False - self._client.cluster.set_topics([]) + self._cluster.need_all_topic_metadata = False + self._cluster.set_topics([]) log.debug("Unsubscribed all topics or patterns and assigned partitions") self._iterator = None diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 4544173b6..28a33e5f0 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -140,6 +140,7 @@ def __init__(self, client, **configs): self._client = client self._manager = client._manager + self._cluster = self._manager.cluster self.heartbeat = Heartbeat(**self.config) self._heartbeat_wakeup = WakeupNotifier(self._manager._net) self._heartbeat_loop_future = None @@ -323,7 +324,7 @@ async def ensure_coordinator_ready_async(self, timeout_ms=None): raise if exc.invalid_metadata: log.debug('Requesting metadata for group coordinator request: %s', exc) - metadata_update = self._client.cluster.request_update() + metadata_update = self._cluster.request_update() try: await self._manager.wait_for(metadata_update, timer.timeout_ms) except Errors.KafkaTimeoutError: @@ -756,7 +757,7 @@ def _handle_find_coordinator_response(self, future, response): error_type = Errors.for_code(response.error_code) if error_type is Errors.NoError: with self._lock: - coordinator_id = self._client.cluster.add_coordinator(response, 'group', self.group_id) + coordinator_id = self._cluster.add_coordinator(response, 'group', self.group_id) if not coordinator_id: # This could happen if coordinator metadata is different # than broker metadata diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index eb43a472c..a4c39a51d 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -97,9 +97,8 @@ def __init__(self, client, subscription, **configs): self._subscription = subscription self._is_leader = False self._joined_subscription = set() - self._metadata_snapshot = self._build_metadata_snapshot(subscription, client.cluster) + self._metadata_snapshot = self._build_metadata_snapshot(subscription, self._cluster) self._assignment_snapshot = None - self._cluster = client.cluster self.auto_commit_interval = self.config['auto_commit_interval_ms'] / 1000 self.next_auto_commit_deadline = None self.completed_offset_commits = collections.deque() @@ -186,7 +185,7 @@ def _handle_metadata_update(self, cluster): if set(topics) != self._subscription.subscription: self._subscription.change_subscription(topics) - self._client.cluster.set_topics(self._subscription.group_subscription()) + self._cluster.set_topics(self._subscription.group_subscription()) # check if there are any changes to the metadata which should trigger # a rebalance @@ -326,7 +325,7 @@ def poll(self, timeout_ms=None): # essentially be ignored. See KAFKA-3949 for the complete # description of the problem. if self._subscription.subscribed_pattern: - metadata_update = self._client.cluster.request_update() + metadata_update = self._cluster.request_update() try: self._manager.run( self._manager.wait_for, metadata_update, timer.timeout_ms) @@ -371,7 +370,7 @@ def _perform_assignment(self, leader_id, protocol_name, members): # Because assignment typically happens within response callbacks, # we cannot block on metadata updates here (no recursion into poll()) self._subscription.group_subscribe(all_subscribed_topics) - self._client.cluster.set_topics(self._subscription.group_subscription()) + self._cluster.set_topics(self._subscription.group_subscription()) # keep track of the metadata used for assignment so that we can check # after rebalance completion whether anything has changed