14 changes: 7 additions & 7 deletions kafka/net/compat.py
@@ -18,12 +18,12 @@ class KafkaNetClient:
code depends on. Goal: shrink over time as components transition to using
KafkaConnectionManager directly (fire-and-forget via _request_buffer).
"""
def __init__(self, **configs):
def __init__(self, net=None, manager=None, **configs):
# _lock is still used by the legacy Coordinator (kafka/coordinator/base.py).
# Remove once Coordinator moves to the IO thread (Phase D).
self._lock = threading.RLock()
self._net = NetworkSelector(**configs)
self._manager = KafkaConnectionManager(self._net, **configs)
self._net = NetworkSelector(**configs) if net is None else net
self._manager = KafkaConnectionManager(self._net, **configs) if manager is None else manager

@property
def cluster(self):
@@ -63,11 +63,11 @@ def await_ready(self, node_id, timeout_ms=30000):
self.maybe_connect(node_id)
conn = self._manager._conns.get(node_id)
if conn is not None and not conn.init_future.is_done:
self._manager.poll(timeout_ms=timeout_ms, future=conn.init_future)
self._net.poll(timeout_ms=timeout_ms, future=conn.init_future)
# Connection may be initialized but paused (e.g. max_in_flight reached).
# Poll briefly to drain in-flight responses and unpause.
if conn is not None and conn.connected and conn.paused:
self._manager.poll(timeout_ms=min(timeout_ms, self._manager.config['request_timeout_ms']))
self._net.poll(timeout_ms=min(timeout_ms, self._manager.config['request_timeout_ms']))
if not self.is_ready(node_id):
raise Errors.KafkaConnectionError('Node %s not ready after %s ms' % (node_id, timeout_ms))
return True
@@ -116,7 +116,7 @@ def send(self, node_id, request, **kwargs):
def send_and_receive(self, node_id, request, timeout_ms=30000):
self.await_ready(node_id, timeout_ms=timeout_ms)
f = self.send(node_id, request)
self._manager.poll(timeout_ms=timeout_ms, future=f)
self._net.poll(timeout_ms=timeout_ms, future=f)
if f.succeeded():
return f.value
elif f.failed():
@@ -131,7 +131,7 @@ def poll(self, timeout_ms=None, future=None):
# _net.poll() concurrently and race on selector / task state.
# The lock goes away once HeartbeatThread does (Phase D).
with self._lock:
return self._manager.poll(timeout_ms=timeout_ms, future=future)
return self._net.poll(timeout_ms=timeout_ms, future=future)

def close(self, node_id=None):
self._manager.close(node_id=node_id)
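
A minimal sketch of the injection path this constructor change opens up, mirroring the conftest.py fixtures later in this diff (import locations and the bootstrap address are assumptions, not taken from the source):

# Callers can now hand KafkaNetClient a pre-built selector and manager
# instead of letting __init__ construct both from **configs.
from kafka.net.compat import KafkaNetClient            # module path per this diff
from kafka.net.manager import KafkaConnectionManager   # module path per this diff
from kafka.net import NetworkSelector                  # import location assumed

net = NetworkSelector()
manager = KafkaConnectionManager(net, bootstrap_servers='localhost:9092')  # address illustrative
client = KafkaNetClient(net=net, manager=manager)
client.poll(timeout_ms=100)   # delegates to net.poll() under the legacy lock
client.close()                # closes connections through the manager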
5 changes: 1 addition & 4 deletions kafka/net/manager.py
@@ -377,9 +377,6 @@ def stop(self, timeout_ms=None):
state['exception'] = Errors.KafkaConnectionError('Manager stopped')
event.set()

def poll(self, timeout_ms=None, future=None):
return self._net.poll(timeout_ms=timeout_ms, future=future)

async def wait_for(self, future, timeout_ms):
"""Await `future` with a timeout in ms. Raises KafkaTimeoutError on timeout.

@@ -460,7 +457,7 @@ def run(self, coro, *args):
"""
if self._io_thread is None:
future = self.call_soon(coro, *args)
self.poll(future=future)
self._net.poll(future=future)
if future.exception is not None:
raise future.exception
return future.value
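
Outside the manager, removing the poll() pass-through is the same one-line migration applied throughout this PR; a sketch of the pattern (variable names as used in the tests below):

# Before (pass-through removed above):
#     manager.poll(timeout_ms=timeout_ms, future=future)
# After: poll the NetworkSelector the manager was constructed with.
net.poll(timeout_ms=timeout_ms, future=future)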
18 changes: 9 additions & 9 deletions test/conftest.py
@@ -41,23 +41,23 @@ def multi_broker(broker):


@pytest.fixture
def client(broker):
cli = KafkaNetClient(
bootstrap_servers='%s:%d' % (broker.host, broker.port),
api_version=broker.broker_version,
request_timeout_ms=5000,
)
broker.attach(cli._manager)
def client(net, manager, broker):
cli = KafkaNetClient(net=net, manager=manager)
try:
yield cli
finally:
cli.close()


@pytest.fixture
def manager(broker):
def net():
return NetworkSelector()


@pytest.fixture
def manager(net, broker):
manager = KafkaConnectionManager(
NetworkSelector(),
net,
bootstrap_servers='%s:%d' % (broker.host, broker.port),
api_version=broker.broker_version,
request_timeout_ms=5000,
40 changes: 17 additions & 23 deletions test/consumer/test_fetcher.py
@@ -161,7 +161,7 @@ def test_reset_offsets_if_needed(fetcher, topic, mocker):
assert call_soon.call_count == 0


def test__reset_offsets_async(fetcher, mocker):
def test__reset_offsets_async(fetcher, manager, net, mocker):
tp0 = TopicPartition("topic", 0)
tp1 = TopicPartition("topic", 1)
fetcher._subscriptions.subscribe(topics=["topic"])
@@ -182,22 +182,21 @@ async def fake_send(node_id, timestamps_and_epochs):
return results[node_id]
mocker.patch.object(fetcher, '_send_list_offsets_request', side_effect=fake_send)

manager = fetcher._client._manager
manager.run(fetcher._reset_offsets_async, {
tp0: OffsetResetStrategy.EARLIEST,
tp1: OffsetResetStrategy.EARLIEST,
})
# _reset_offsets_async is fire-and-forget; drain the spawned per-node tasks
while len(pending) < 2 or fetcher._subscriptions.assignment[tp0].awaiting_reset or fetcher._subscriptions.assignment[tp1].awaiting_reset:
manager.poll(timeout_ms=10)
net.poll(timeout_ms=10)

assert not fetcher._subscriptions.assignment[tp0].awaiting_reset
assert not fetcher._subscriptions.assignment[tp1].awaiting_reset
assert fetcher._subscriptions.assignment[tp0].position.offset == 1001
assert fetcher._subscriptions.assignment[tp1].position.offset == 1002


def test__send_list_offsets_requests(fetcher, mocker):
def test__send_list_offsets_requests(fetcher, manager, net, mocker):
tp = TopicPartition("topic_send_list_offsets", 1)

pending = []
@@ -215,7 +214,6 @@ async def fake_send(node_id, timestamps):
[None, -1], itertools.cycle([0]))
mocker.patch.object(fetcher._client.cluster, "leader_epoch_for_partition", return_value=0)

manager = fetcher._client._manager

# Leader == None
with pytest.raises(StaleMetadata):
@@ -230,26 +228,26 @@ async def fake_send(node_id, timestamps):
# Leader == 0, send failed
fut = manager.call_soon(fetcher._send_list_offsets_requests, {tp: 0})
while not pending:
manager.poll(timeout_ms=10)
net.poll(timeout_ms=10)
assert not fut.is_done
assert mocked_send.called
pending.pop().failure(NotLeaderForPartitionError(tp))
manager.poll(future=fut)
net.poll(future=fut)
assert fut.failed()
assert isinstance(fut.exception, NotLeaderForPartitionError)

# Leader == 0, send success
fut = manager.call_soon(fetcher._send_list_offsets_requests, {tp: 0})
while not pending:
manager.poll(timeout_ms=10)
net.poll(timeout_ms=10)
assert not fut.is_done
pending.pop().success(({tp: (10, 10000)}, set()))
manager.poll(future=fut)
net.poll(future=fut)
assert fut.succeeded()
assert fut.value == ({tp: (10, 10000)}, set())


def test__send_list_offsets_requests_multiple_nodes(fetcher, mocker):
def test__send_list_offsets_requests_multiple_nodes(fetcher, manager, net, mocker):
tp1 = TopicPartition("topic_send_list_offsets", 1)
tp2 = TopicPartition("topic_send_list_offsets", 2)
tp3 = TopicPartition("topic_send_list_offsets", 3)
@@ -267,11 +265,9 @@ async def fake_send(node_id, timestamps):
mocked_leader.side_effect = itertools.cycle([0, 1])
mocker.patch.object(fetcher._client.cluster, "leader_epoch_for_partition", return_value=0)

manager = fetcher._client._manager

def wait_for_send_futures(n):
while len(send_futures) < n:
manager.poll(timeout_ms=10)
net.poll(timeout_ms=10)

# -- All node succeeded case
tss = OrderedDict([(tp1, 0), (tp2, 0), (tp3, 0), (tp4, 0)])
@@ -295,10 +291,10 @@ def wait_for_send_futures(n):
}

# We only resolved 1 future so far, so result future is not yet ready
manager.poll(timeout_ms=10)
net.poll(timeout_ms=10)
assert not fut.is_done
second_future.success(({tp2: (12, 1002), tp4: (14, 1004)}, set()))
manager.poll(future=fut)
net.poll(future=fut)
assert fut.succeeded()
assert fut.value == ({tp1: (11, 1001), tp2: (12, 1002), tp4: (14, 1004)}, set())

@@ -308,7 +304,7 @@ def wait_for_send_futures(n):
wait_for_send_futures(2)
send_futures[0][2].success(({tp1: (11, 1001)}, set()))
send_futures[1][2].failure(UnknownTopicOrPartitionError(tp1))
manager.poll(future=fut)
net.poll(future=fut)
assert fut.failed()
assert isinstance(fut.exception, UnknownTopicOrPartitionError)

@@ -318,7 +314,7 @@ def wait_for_send_futures(n):
wait_for_send_futures(2)
send_futures[0][2].failure(UnknownTopicOrPartitionError(tp1))
send_futures[1][2].success(({tp1: (11, 1001)}, set()))
manager.poll(future=fut)
net.poll(future=fut)
assert fut.failed()
assert isinstance(fut.exception, UnknownTopicOrPartitionError)

@@ -657,7 +653,7 @@ def test_partition_records_compacted_offset(mocker):
assert msgs[0].offset == fetch_offset + 1


def test_reset_offsets_paused(subscription_state, client, mocker):
def test_reset_offsets_paused(subscription_state, client, manager, net, mocker):
fetcher = Fetcher(client, subscription_state)
tp = TopicPartition('foo', 0)
subscription_state.assign_from_user([tp])
@@ -670,18 +666,17 @@ async def fake_send(node_id, timestamps_and_epochs):
mocker.patch.object(fetcher._client, 'ready', return_value=True)
mocker.patch.object(fetcher, '_send_list_offsets_request', side_effect=fake_send)
mocker.patch.object(fetcher._client.cluster, "leader_for_partition", return_value=0)
manager = fetcher._client._manager
manager.run(fetcher._reset_offsets_async, {tp: OffsetResetStrategy.LATEST})
while subscription_state.is_offset_reset_needed(tp):
manager.poll(timeout_ms=10)
net.poll(timeout_ms=10)

assert not subscription_state.is_offset_reset_needed(tp)
assert not subscription_state.is_fetchable(tp) # because tp is paused
assert subscription_state.has_valid_position(tp)
assert subscription_state.position(tp) == OffsetAndMetadata(10, '', -1)


def test_reset_offsets_paused_without_valid(subscription_state, client, mocker):
def test_reset_offsets_paused_without_valid(subscription_state, client, manager, net, mocker):
fetcher = Fetcher(client, subscription_state)
tp = TopicPartition('foo', 0)
subscription_state.assign_from_user([tp])
@@ -694,10 +689,9 @@ async def fake_send(node_id, timestamps_and_epochs):
mocker.patch.object(fetcher._client, 'ready', return_value=True)
mocker.patch.object(fetcher, '_send_list_offsets_request', side_effect=fake_send)
mocker.patch.object(fetcher._client.cluster, "leader_for_partition", return_value=0)
manager = fetcher._client._manager
manager.run(fetcher._reset_offsets_async, {tp: OffsetResetStrategy.EARLIEST})
while subscription_state.is_offset_reset_needed(tp):
manager.poll(timeout_ms=10)
net.poll(timeout_ms=10)

assert not subscription_state.is_offset_reset_needed(tp)
assert not subscription_state.is_fetchable(tp) # because tp is paused
12 changes: 6 additions & 6 deletions test/net/test_manager.py
@@ -219,26 +219,26 @@ def test_connect_to_timeout_fires(self, net):
try:
manager = KafkaConnectionManager(net, bootstrap_servers=['127.0.0.1:%d' % port], socket_connection_timeout_ms=100)
conn = manager.get_connection('bootstrap-0')
manager.poll(timeout_ms=1000, future=conn.init_future)
net.poll(timeout_ms=1000, future=conn.init_future)
assert conn.init_future.is_done
finally:
blocker.close()
listener.close()


class TestKafkaConnectionManagerClose:
def test_close_single_connection(self, manager):
def test_close_single_connection(self, manager, net):
conn = manager.get_connection('bootstrap-0')
assert 'bootstrap-0' in manager._conns
manager.close('bootstrap-0')
assert conn.init_future.is_done

def test_close_all_connections(self, manager):
def test_close_all_connections(self, manager, net):
manager.get_connection('bootstrap-0')
assert len(manager._conns) > 0
manager.close()
# close_future callbacks should remove from _conns
manager.poll(timeout_ms=100)
net.poll(timeout_ms=100)
assert len(manager._conns) == 0

def test_close_nonexistent_node(self, manager):
@@ -284,12 +284,12 @@ async def bad_coro():
with pytest.raises(ValueError, match='bad_coro'):
manager.run(bad_coro)

def test_call_soon_does_not_raise(self, manager):
def test_call_soon_does_not_raise(self, manager, net):
async def bad_coro():
raise ValueError('bad_coro')
future = manager.call_soon(bad_coro)
assert not future.is_done
manager.poll(future=future)
net.poll(future=future)
assert future.failed()
assert isinstance(future.exception, ValueError)
assert future.exception.args[0] == 'bad_coro'
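
Read together, the two tests above pin down the error-propagation contract: run() drives the coroutine on the calling thread and re-raises its exception, while call_soon() only schedules it, so the failure surfaces on the returned future once the selector is polled. A condensed sketch under the same fixtures:

async def bad_coro():
    raise ValueError('bad_coro')

# run(): synchronous, the exception is re-raised at the call site.
# call_soon(): deferred, the exception lands on the future after polling.
future = manager.call_soon(bad_coro)
net.poll(future=future)
assert future.failed() and isinstance(future.exception, ValueError)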
8 changes: 4 additions & 4 deletions test/test_cluster.py
@@ -194,24 +194,24 @@ def test_request_update_sets_cluster_need_update(self, cluster):
f = cluster.request_update()
assert cluster._need_update

def test_request_update_sends_metadata_request(self, manager):
def test_request_update_sends_metadata_request(self, manager, net):
manager.bootstrap()
manager.cluster.config['retry_backoff_ms'] = 10 # reduce loop delay when metadata in progress

response = _make_metadata_response(8)
with patch.object(manager, 'send', return_value=Future().success(response)):
f = manager.cluster.request_update()
# Drive the cluster refresh loop
manager.poll(timeout_ms=100, future=f)
net.poll(timeout_ms=100, future=f)
assert manager.send.called

def test_refresh_metadata_retries_no_node(self, manager):
def test_refresh_metadata_retries_no_node(self, manager, net):
# No connected nodes, empty cluster
cluster = manager.cluster
with patch.object(cluster, 'brokers', return_value=[]):
cluster.start_refresh_loop()
f = cluster.request_update()
manager.poll(timeout_ms=0)
net.poll(timeout_ms=0)
# Should not have resolved yet (retry scheduled)
assert not f.is_done
# Should have a scheduled retry