1+ /*
2+ * Licensed to the Apache Software Foundation (ASF) under one or more
3+ * contributor license agreements. See the NOTICE file distributed with
4+ * this work for additional information regarding copyright ownership.
5+ * The ASF licenses this file to You under the Apache License, Version 2.0
6+ * (the "License"); you may not use this file except in compliance with
7+ * the License. You may obtain a copy of the License at
8+ *
9+ * http://www.apache.org/licenses/LICENSE-2.0
10+ *
11+ * Unless required by applicable law or agreed to in writing, software
12+ * distributed under the License is distributed on an "AS IS" BASIS,
13+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+ * See the License for the specific language governing permissions and
15+ * limitations under the License.
16+ */
17+ package org .apache .kafka .clients .admin ;
18+
19+ import org .apache .kafka .clients .producer .Producer ;
20+ import org .apache .kafka .clients .producer .ProducerRecord ;
21+ import org .apache .kafka .common .Node ;
22+ import org .apache .kafka .common .TopicPartition ;
23+ import org .apache .kafka .common .errors .NotLeaderOrFollowerException ;
24+ import org .apache .kafka .common .test .ClusterInstance ;
25+ import org .apache .kafka .common .test .api .ClusterTest ;
26+ import org .apache .kafka .common .test .api .ClusterTestDefaults ;
27+ import org .apache .kafka .test .TestUtils ;
28+
29+ import org .junit .jupiter .api .BeforeEach ;
30+
31+ import java .util .List ;
32+
33+ import static org .junit .jupiter .api .Assertions .assertEquals ;
34+ import static org .junit .jupiter .api .Assertions .assertFalse ;
35+ import static org .junit .jupiter .api .Assertions .assertNotNull ;
36+
37+
38+ @ ClusterTestDefaults (
39+ brokers = 3
40+ )
41+ class DescribeProducersWithBrokerIdTest {
42+ private static final String TOPIC_NAME = "test-topic" ;
43+ private static final int NUM_PARTITIONS = 1 ;
44+ private static final short REPLICATION_FACTOR = 3 ;
45+
46+ private static final TopicPartition TOPIC_PARTITION = new TopicPartition (TOPIC_NAME , 0 );
47+
48+ private final ClusterInstance clusterInstance ;
49+
50+ public DescribeProducersWithBrokerIdTest (ClusterInstance clusterInstance ) {
51+ this .clusterInstance = clusterInstance ;
52+ }
53+
54+ private static void sendTestRecords (Producer <byte [], byte []> producer ) {
55+ producer .send (new ProducerRecord <>(TOPIC_NAME , TOPIC_PARTITION .partition (), "key-0" .getBytes (), "value-0" .getBytes ()));
56+ producer .flush ();
57+ }
58+
59+ @ BeforeEach
60+ void setUp () throws InterruptedException {
61+ clusterInstance .createTopic (TOPIC_NAME , NUM_PARTITIONS , REPLICATION_FACTOR );
62+ }
63+
64+ private List <Integer > getReplicaBrokerIds (Admin admin ) throws Exception {
65+ var topicDescription = admin .describeTopics (List .of (TOPIC_PARTITION .topic ())).allTopicNames ().get ().get (TOPIC_PARTITION .topic ());
66+ return topicDescription .partitions ().get (TOPIC_PARTITION .partition ()).replicas ().stream ()
67+ .map (Node ::id )
68+ .toList ();
69+ }
70+
71+ private int getNonReplicaBrokerId (Admin admin ) throws Exception {
72+ var replicaBrokerIds = getReplicaBrokerIds (admin );
73+ return clusterInstance .brokerIds ().stream ()
74+ .filter (id -> !replicaBrokerIds .contains (id ))
75+ .findFirst ()
76+ .orElseThrow (() -> new IllegalStateException ("No non-replica broker found" ));
77+ }
78+
79+ private int getFollowerBrokerId (Admin admin ) throws Exception {
80+ var replicaBrokerIds = getReplicaBrokerIds (admin );
81+ var leaderBrokerId = clusterInstance .getLeaderBrokerId (TOPIC_PARTITION );
82+ return replicaBrokerIds .stream ()
83+ .filter (id -> id != leaderBrokerId )
84+ .findFirst ()
85+ .orElseThrow (() -> new IllegalStateException ("No follower found for partition " + TOPIC_PARTITION ));
86+ }
87+
88+ @ ClusterTest
89+ void testDescribeProducersDefaultRoutesToLeader () throws Exception {
90+ try (Producer <byte [], byte []> producer = clusterInstance .producer ();
91+ var admin = clusterInstance .admin ()) {
92+ sendTestRecords (producer );
93+
94+ var stateWithExplicitLeader = admin .describeProducers (
95+ List .of (TOPIC_PARTITION ),
96+ new DescribeProducersOptions ().brokerId (clusterInstance .getLeaderBrokerId (TOPIC_PARTITION ))
97+ ).partitionResult (TOPIC_PARTITION ).get ();
98+
99+ var stateWithDefaultRouting = admin .describeProducers (
100+ List .of (TOPIC_PARTITION )
101+ ).partitionResult (TOPIC_PARTITION ).get ();
102+
103+ assertNotNull (stateWithDefaultRouting );
104+ assertFalse (stateWithDefaultRouting .activeProducers ().isEmpty ());
105+ assertEquals (stateWithExplicitLeader .activeProducers (), stateWithDefaultRouting .activeProducers ());
106+ }
107+ }
108+
109+ @ ClusterTest
110+ void testDescribeProducersFromFollower () throws Exception {
111+ try (Producer <byte [], byte []> producer = clusterInstance .producer ();
112+ var admin = clusterInstance .admin ()) {
113+ sendTestRecords (producer );
114+
115+ var followerState = admin .describeProducers (
116+ List .of (TOPIC_PARTITION ),
117+ new DescribeProducersOptions ().brokerId (getFollowerBrokerId (admin ))
118+ ).partitionResult (TOPIC_PARTITION ).get ();
119+
120+ var leaderState = admin .describeProducers (
121+ List .of (TOPIC_PARTITION )
122+ ).partitionResult (TOPIC_PARTITION ).get ();
123+
124+ assertNotNull (followerState );
125+ assertFalse (followerState .activeProducers ().isEmpty ());
126+ assertEquals (leaderState .activeProducers (), followerState .activeProducers ());
127+ }
128+ }
129+
130+ @ ClusterTest (brokers = 4 )
131+ void testDescribeProducersWithInvalidBrokerId () throws Exception {
132+ try (Producer <byte [], byte []> producer = clusterInstance .producer ();
133+ var admin = clusterInstance .admin ()) {
134+ sendTestRecords (producer );
135+
136+ TestUtils .assertFutureThrows (NotLeaderOrFollowerException .class ,
137+ admin .describeProducers (
138+ List .of (TOPIC_PARTITION ),
139+ new DescribeProducersOptions ().brokerId (getNonReplicaBrokerId (admin ))
140+ ).partitionResult (TOPIC_PARTITION ));
141+ }
142+ }
143+ }
0 commit comments