diff --git a/kafka/net/compat.py b/kafka/net/compat.py
index fe71cd511..8297d2786 100644
--- a/kafka/net/compat.py
+++ b/kafka/net/compat.py
@@ -18,12 +18,12 @@ class KafkaNetClient:
     code depends on. Goal: shrink over time as components transition to using
     KafkaConnectionManager directly (fire-and-forget via _request_buffer).
     """
-    def __init__(self, **configs):
+    def __init__(self, net=None, manager=None, **configs):
         # _lock is still used by the legacy Coordinator (kafka/coordinator/base.py).
         # Remove once Coordinator moves to the IO thread (Phase D).
         self._lock = threading.RLock()
-        self._net = NetworkSelector(**configs)
-        self._manager = KafkaConnectionManager(self._net, **configs)
+        self._net = NetworkSelector(**configs) if net is None else net
+        self._manager = KafkaConnectionManager(self._net, **configs) if manager is None else manager

     @property
     def cluster(self):
@@ -63,11 +63,11 @@ def await_ready(self, node_id, timeout_ms=30000):
         self.maybe_connect(node_id)
         conn = self._manager._conns.get(node_id)
         if conn is not None and not conn.init_future.is_done:
-            self._manager.poll(timeout_ms=timeout_ms, future=conn.init_future)
+            self._net.poll(timeout_ms=timeout_ms, future=conn.init_future)
         # Connection may be initialized but paused (e.g. max_in_flight reached).
         # Poll briefly to drain in-flight responses and unpause.
         if conn is not None and conn.connected and conn.paused:
-            self._manager.poll(timeout_ms=min(timeout_ms, self._manager.config['request_timeout_ms']))
+            self._net.poll(timeout_ms=min(timeout_ms, self._manager.config['request_timeout_ms']))
         if not self.is_ready(node_id):
             raise Errors.KafkaConnectionError('Node %s not ready after %s ms' % (node_id, timeout_ms))
         return True
@@ -116,7 +116,7 @@ def send(self, node_id, request, **kwargs):
     def send_and_receive(self, node_id, request, timeout_ms=30000):
         self.await_ready(node_id, timeout_ms=timeout_ms)
         f = self.send(node_id, request)
-        self._manager.poll(timeout_ms=timeout_ms, future=f)
+        self._net.poll(timeout_ms=timeout_ms, future=f)
         if f.succeeded():
             return f.value
         elif f.failed():
@@ -131,7 +131,7 @@ def poll(self, timeout_ms=None, future=None):
         # _net.poll() concurrently and race on selector / task state.
         # The lock goes away once HeartbeatThread does (Phase D).
         with self._lock:
-            return self._manager.poll(timeout_ms=timeout_ms, future=future)
+            return self._net.poll(timeout_ms=timeout_ms, future=future)

     def close(self, node_id=None):
         self._manager.close(node_id=node_id)
diff --git a/kafka/net/manager.py b/kafka/net/manager.py
index 13033d836..8d5d06701 100644
--- a/kafka/net/manager.py
+++ b/kafka/net/manager.py
@@ -377,9 +377,6 @@ def stop(self, timeout_ms=None):
             state['exception'] = Errors.KafkaConnectionError('Manager stopped')
             event.set()

-    def poll(self, timeout_ms=None, future=None):
-        return self._net.poll(timeout_ms=timeout_ms, future=future)
-
     async def wait_for(self, future, timeout_ms):
         """Await `future` with a timeout in ms.
         Raises KafkaTimeoutError on timeout.
@@ -460,7 +457,7 @@ def run(self, coro, *args):
         """
         if self._io_thread is None:
             future = self.call_soon(coro, *args)
-            self.poll(future=future)
+            self._net.poll(future=future)
             if future.exception is not None:
                 raise future.exception
             return future.value
diff --git a/test/conftest.py b/test/conftest.py
index 43580f4bb..1f4e098b8 100644
--- a/test/conftest.py
+++ b/test/conftest.py
@@ -41,13 +41,8 @@ def multi_broker(broker):

 @pytest.fixture
-def client(broker):
-    cli = KafkaNetClient(
-        bootstrap_servers='%s:%d' % (broker.host, broker.port),
-        api_version=broker.broker_version,
-        request_timeout_ms=5000,
-    )
-    broker.attach(cli._manager)
+def client(net, manager, broker):
+    cli = KafkaNetClient(net=net, manager=manager)
     try:
         yield cli
     finally:
@@ -55,9 +50,14 @@ def client(broker):

 @pytest.fixture
-def manager(broker):
+def net():
+    return NetworkSelector()
+
+
+@pytest.fixture
+def manager(net, broker):
     manager = KafkaConnectionManager(
-        NetworkSelector(),
+        net,
         bootstrap_servers='%s:%d' % (broker.host, broker.port),
         api_version=broker.broker_version,
         request_timeout_ms=5000,
diff --git a/test/consumer/test_fetcher.py b/test/consumer/test_fetcher.py
index 05d05db50..50e678beb 100644
--- a/test/consumer/test_fetcher.py
+++ b/test/consumer/test_fetcher.py
@@ -161,7 +161,7 @@ def test_reset_offsets_if_needed(fetcher, topic, mocker):
     assert call_soon.call_count == 0


-def test__reset_offsets_async(fetcher, mocker):
+def test__reset_offsets_async(fetcher, manager, net, mocker):
     tp0 = TopicPartition("topic", 0)
     tp1 = TopicPartition("topic", 1)
     fetcher._subscriptions.subscribe(topics=["topic"])
@@ -182,14 +182,13 @@ async def fake_send(node_id, timestamps_and_epochs):
         return results[node_id]

     mocker.patch.object(fetcher, '_send_list_offsets_request', side_effect=fake_send)
-    manager = fetcher._client._manager
     manager.run(fetcher._reset_offsets_async, {
         tp0: OffsetResetStrategy.EARLIEST,
         tp1: OffsetResetStrategy.EARLIEST,
     })
     # _reset_offsets_async is fire-and-forget; drain the spawned per-node tasks
     while len(pending) < 2 or fetcher._subscriptions.assignment[tp0].awaiting_reset or fetcher._subscriptions.assignment[tp1].awaiting_reset:
-        manager.poll(timeout_ms=10)
+        net.poll(timeout_ms=10)

     assert not fetcher._subscriptions.assignment[tp0].awaiting_reset
     assert not fetcher._subscriptions.assignment[tp1].awaiting_reset
@@ -197,7 +196,7 @@ async def fake_send(node_id, timestamps_and_epochs):
     assert fetcher._subscriptions.assignment[tp1].position.offset == 1002


-def test__send_list_offsets_requests(fetcher, mocker):
+def test__send_list_offsets_requests(fetcher, manager, net, mocker):
     tp = TopicPartition("topic_send_list_offsets", 1)

     pending = []
@@ -215,7 +214,6 @@ async def fake_send(node_id, timestamps):
                                [None, -1],
                                itertools.cycle([0]))
     mocker.patch.object(fetcher._client.cluster, "leader_epoch_for_partition", return_value=0)
-    manager = fetcher._client._manager

     # Leader == None
     with pytest.raises(StaleMetadata):
@@ -230,26 +228,26 @@ async def fake_send(node_id, timestamps):
     # Leader == 0, send failed
     fut = manager.call_soon(fetcher._send_list_offsets_requests, {tp: 0})
     while not pending:
-        manager.poll(timeout_ms=10)
+        net.poll(timeout_ms=10)
     assert not fut.is_done
     assert mocked_send.called
     pending.pop().failure(NotLeaderForPartitionError(tp))
-    manager.poll(future=fut)
+    net.poll(future=fut)
     assert fut.failed()
     assert isinstance(fut.exception, NotLeaderForPartitionError)

     # Leader == 0, send success
     fut = manager.call_soon(fetcher._send_list_offsets_requests, {tp: 0})
     while not pending:
-        manager.poll(timeout_ms=10)
+        net.poll(timeout_ms=10)
     assert not fut.is_done
     pending.pop().success(({tp: (10, 10000)}, set()))
-    manager.poll(future=fut)
+    net.poll(future=fut)
     assert fut.succeeded()
     assert fut.value == ({tp: (10, 10000)}, set())


-def test__send_list_offsets_requests_multiple_nodes(fetcher, mocker):
+def test__send_list_offsets_requests_multiple_nodes(fetcher, manager, net, mocker):
     tp1 = TopicPartition("topic_send_list_offsets", 1)
     tp2 = TopicPartition("topic_send_list_offsets", 2)
     tp3 = TopicPartition("topic_send_list_offsets", 3)
@@ -267,11 +265,9 @@ async def fake_send(node_id, timestamps):
     mocked_leader.side_effect = itertools.cycle([0, 1])
     mocker.patch.object(fetcher._client.cluster, "leader_epoch_for_partition", return_value=0)
-    manager = fetcher._client._manager
-
     def wait_for_send_futures(n):
         while len(send_futures) < n:
-            manager.poll(timeout_ms=10)
+            net.poll(timeout_ms=10)

     # -- All node succeeded case
     tss = OrderedDict([(tp1, 0), (tp2, 0), (tp3, 0), (tp4, 0)])
@@ -295,10 +291,10 @@ def wait_for_send_futures(n):
     }

     # We only resolved 1 future so far, so result future is not yet ready
-    manager.poll(timeout_ms=10)
+    net.poll(timeout_ms=10)
     assert not fut.is_done
     second_future.success(({tp2: (12, 1002), tp4: (14, 1004)}, set()))
-    manager.poll(future=fut)
+    net.poll(future=fut)
     assert fut.succeeded()
     assert fut.value == ({tp1: (11, 1001), tp2: (12, 1002), tp4: (14, 1004)}, set())

@@ -308,7 +304,7 @@ def wait_for_send_futures(n):
     wait_for_send_futures(2)
     send_futures[0][2].success(({tp1: (11, 1001)}, set()))
     send_futures[1][2].failure(UnknownTopicOrPartitionError(tp1))
-    manager.poll(future=fut)
+    net.poll(future=fut)
     assert fut.failed()
     assert isinstance(fut.exception, UnknownTopicOrPartitionError)

@@ -318,7 +314,7 @@ def wait_for_send_futures(n):
     wait_for_send_futures(2)
     send_futures[0][2].failure(UnknownTopicOrPartitionError(tp1))
     send_futures[1][2].success(({tp1: (11, 1001)}, set()))
-    manager.poll(future=fut)
+    net.poll(future=fut)
     assert fut.failed()
     assert isinstance(fut.exception, UnknownTopicOrPartitionError)

@@ -657,7 +653,7 @@ def test_partition_records_compacted_offset(mocker):
     assert msgs[0].offset == fetch_offset + 1


-def test_reset_offsets_paused(subscription_state, client, mocker):
+def test_reset_offsets_paused(subscription_state, client, manager, net, mocker):
     fetcher = Fetcher(client, subscription_state)
     tp = TopicPartition('foo', 0)
     subscription_state.assign_from_user([tp])
@@ -670,10 +666,9 @@ async def fake_send(node_id, timestamps_and_epochs):
     mocker.patch.object(fetcher._client, 'ready', return_value=True)
     mocker.patch.object(fetcher, '_send_list_offsets_request', side_effect=fake_send)
     mocker.patch.object(fetcher._client.cluster, "leader_for_partition", return_value=0)
-    manager = fetcher._client._manager
     manager.run(fetcher._reset_offsets_async, {tp: OffsetResetStrategy.LATEST})
     while subscription_state.is_offset_reset_needed(tp):
-        manager.poll(timeout_ms=10)
+        net.poll(timeout_ms=10)

     assert not subscription_state.is_offset_reset_needed(tp)
     assert not subscription_state.is_fetchable(tp)  # because tp is paused
@@ -681,7 +676,7 @@ async def fake_send(node_id, timestamps_and_epochs):
     assert subscription_state.position(tp) == OffsetAndMetadata(10, '', -1)


-def test_reset_offsets_paused_without_valid(subscription_state, client, mocker):
+def test_reset_offsets_paused_without_valid(subscription_state, client, manager, net, mocker):
     fetcher = Fetcher(client, subscription_state)
     tp = TopicPartition('foo', 0)
     subscription_state.assign_from_user([tp])
@@ -694,10 +689,9 @@ async def fake_send(node_id, timestamps_and_epochs):
     mocker.patch.object(fetcher._client, 'ready', return_value=True)
     mocker.patch.object(fetcher, '_send_list_offsets_request', side_effect=fake_send)
     mocker.patch.object(fetcher._client.cluster, "leader_for_partition", return_value=0)
-    manager = fetcher._client._manager
     manager.run(fetcher._reset_offsets_async, {tp: OffsetResetStrategy.EARLIEST})
     while subscription_state.is_offset_reset_needed(tp):
-        manager.poll(timeout_ms=10)
+        net.poll(timeout_ms=10)

     assert not subscription_state.is_offset_reset_needed(tp)
     assert not subscription_state.is_fetchable(tp)  # because tp is paused
diff --git a/test/net/test_manager.py b/test/net/test_manager.py
index 1af7051b9..7821cf960 100644
--- a/test/net/test_manager.py
+++ b/test/net/test_manager.py
@@ -219,7 +219,7 @@ def test_connect_to_timeout_fires(self, net):
         try:
             manager = KafkaConnectionManager(net, bootstrap_servers=['127.0.0.1:%d' % port], socket_connection_timeout_ms=100)
             conn = manager.get_connection('bootstrap-0')
-            manager.poll(timeout_ms=1000, future=conn.init_future)
+            net.poll(timeout_ms=1000, future=conn.init_future)
             assert conn.init_future.is_done
         finally:
             blocker.close()
@@ -227,18 +227,18 @@ class TestKafkaConnectionManagerClose:

-    def test_close_single_connection(self, manager):
+    def test_close_single_connection(self, manager, net):
         conn = manager.get_connection('bootstrap-0')
         assert 'bootstrap-0' in manager._conns
         manager.close('bootstrap-0')
         assert conn.init_future.is_done

-    def test_close_all_connections(self, manager):
+    def test_close_all_connections(self, manager, net):
         manager.get_connection('bootstrap-0')
         assert len(manager._conns) > 0
         manager.close()
         # close_future callbacks should remove from _conns
-        manager.poll(timeout_ms=100)
+        net.poll(timeout_ms=100)
         assert len(manager._conns) == 0

     def test_close_nonexistent_node(self, manager):
@@ -284,12 +284,12 @@ async def bad_coro():
         with pytest.raises(ValueError, match='bad_coro'):
             manager.run(bad_coro)

-    def test_call_soon_does_not_raise(self, manager):
+    def test_call_soon_does_not_raise(self, manager, net):
         async def bad_coro():
             raise ValueError('bad_coro')
         future = manager.call_soon(bad_coro)
         assert not future.is_done
-        manager.poll(future=future)
+        net.poll(future=future)
         assert future.failed()
         assert isinstance(future.exception, ValueError)
         assert future.exception.args[0] == 'bad_coro'
diff --git a/test/test_cluster.py b/test/test_cluster.py
index 85c6245db..e6c9633b2 100644
--- a/test/test_cluster.py
+++ b/test/test_cluster.py
@@ -194,7 +194,7 @@ def test_request_update_sets_cluster_need_update(self, cluster):
         f = cluster.request_update()
         assert cluster._need_update

-    def test_request_update_sends_metadata_request(self, manager):
+    def test_request_update_sends_metadata_request(self, manager, net):
         manager.bootstrap()
         manager.cluster.config['retry_backoff_ms'] = 10  # reduce loop delay when metadata in progress
@@ -202,16 +202,16 @@ def test_request_update_sends_metadata_request(self, manager):
         with patch.object(manager, 'send', return_value=Future().success(response)):
             f = manager.cluster.request_update()
             # Drive the cluster refresh loop
-            manager.poll(timeout_ms=100, future=f)
+            net.poll(timeout_ms=100, future=f)
         assert manager.send.called

-    def test_refresh_metadata_retries_no_node(self, manager):
+    def test_refresh_metadata_retries_no_node(self, manager, net):
         # No connected nodes, empty cluster
         cluster = manager.cluster
         with patch.object(cluster, 'brokers', return_value=[]):
             cluster.start_refresh_loop()
             f = cluster.request_update()
-            manager.poll(timeout_ms=0)
+            net.poll(timeout_ms=0)
             # Should not have resolved yet (retry scheduled)
             assert not f.is_done
             # Should have a scheduled retry
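
Note on the test-side pattern above: with KafkaConnectionManager.poll() removed, tests still schedule work on the manager but drive it by polling the shared NetworkSelector directly. A minimal sketch of the two idioms, assuming the `net` and `manager` fixtures from test/conftest.py in this patch; the `answer` coroutine and `pump` helper are hypothetical, added only for illustration:

    # Idiom 1: resolve a single future by polling the selector with future=...
    def test_call_soon_resolves(manager, net):
        async def answer():                # hypothetical coroutine
            return 42
        fut = manager.call_soon(answer)    # fire-and-forget scheduling on the manager
        assert not fut.is_done
        net.poll(future=fut)               # drive the selector until fut resolves
        assert fut.succeeded() and fut.value == 42

    # Idiom 2: fire-and-forget work with no single future to wait on:
    # pump the selector in small steps until the observed state changes.
    def pump(net, condition, step_ms=10):  # hypothetical helper
        while not condition():
            net.poll(timeout_ms=step_ms)

This mirrors how test__reset_offsets_async and test_reset_offsets_paused drain their spawned per-node tasks: loop on a condition and call net.poll(timeout_ms=10) each iteration, instead of going through the manager.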