Skip to content

Commit 6247fd9

Browse files
authored
KAFKA-19478 [3/N]: Use heaps to discover the least loaded process (apache#20172)
The original implementation uses a linear search to find the least loaded process in O(n), and we can replace this by look-ups in a heap is O(log(n)), as described below Active tasks: For active tasks, we can do exactly the same assignment as in the original algorithm by first building a heap (by load) of all processes. When we assign a task, we pick the head off the heap, assign the task to it, update the load, and re-insert it into the heap in O(log(n)). Standby tasks: For standby tasks, we cannot do this optimization directly, because of the order in which we assign tasks: 1. We first try to assign task A to a process that previously owned A. 2. If we did not find such a process, we assign A to the least loaded node. 3. We now try to assign task B to a process that previously owned B 4. If we did not find such a process, we assign B to the least loaded node ... The problem is that we cannot efficiently keep a heap (by load) throughout this process, because finding and removing process that previously owned A (and B and…) in the heap is O(n). We therefore need to change the order of evaluation to be able to use a heap: 1. Try to assign all tasks A, B.. to a process that previously owned the task 2. Build a heap. 3. Assign all remaining tasks to the least-loaded process that does not yet own the task. Since at most NumStandbyReplicas already own the task, we can do it by removing up to NumStandbyReplicas from the top of the heap in O(log(n)), so we get O(log(NumProcesses)*NumStandbyReplicas). Note that the change in order changes the resulting standby assignments (although this difference does not show up in the existing unit tests). I would argue that the new order of assignment will actually yield better assignments, since the assignment will be more sticky, which has the potential to reduce the amount of store we have to restore from the changelog topic after assingments. In our worst-performing benchmark, this improves the runtime by ~107x. Reviewers: Bill Bejeck<bbejeck@apache.org>
1 parent 4b9075b commit 6247fd9

6 files changed

Lines changed: 452 additions & 112 deletions

File tree

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java

Lines changed: 154 additions & 105 deletions
Large diffs are not rendered by default.

group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignorTest.java

Lines changed: 259 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -833,6 +833,265 @@ public void shouldAssignTasksToNewClientWithoutFlippingAssignmentBetweenExisting
833833
assertEquals(2, getAllActiveTaskIds(result, "newMember").size());
834834
}
835835

836+
@Test
837+
public void shouldHandleLargeNumberOfTasksWithStandbyAssignment() {
838+
final int numTasks = 100;
839+
final int numClients = 5;
840+
final int numStandbyReplicas = 2;
841+
842+
Map<String, AssignmentMemberSpec> members = new HashMap<>();
843+
for (int i = 0; i < numClients; i++) {
844+
members.put("member" + i, createAssignmentMemberSpec("process" + i));
845+
}
846+
847+
GroupAssignment result = assignor.assign(
848+
new GroupSpecImpl(members, mkMap(mkEntry(NUM_STANDBY_REPLICAS_CONFIG, String.valueOf(numStandbyReplicas)))),
849+
new TopologyDescriberImpl(numTasks, true, List.of("test-subtopology"))
850+
);
851+
852+
// Verify all active tasks are assigned
853+
Set<Integer> allActiveTasks = new HashSet<>();
854+
for (String memberId : result.members().keySet()) {
855+
List<Integer> memberActiveTasks = getAllActiveTaskIds(result, memberId);
856+
allActiveTasks.addAll(memberActiveTasks);
857+
}
858+
assertEquals(numTasks, allActiveTasks.size());
859+
860+
// Verify standby tasks are assigned (should be numTasks * numStandbyReplicas total)
861+
Set<Integer> allStandbyTasks = new HashSet<>();
862+
for (String memberId : result.members().keySet()) {
863+
List<Integer> memberStandbyTasks = getAllStandbyTaskIds(result, memberId);
864+
allStandbyTasks.addAll(memberStandbyTasks);
865+
}
866+
// With 5 clients and 2 standby replicas, we should have at least some standby tasks
867+
assertTrue(allStandbyTasks.size() > 0, "Should have some standby tasks assigned");
868+
// Maximum possible = numTasks * min(numStandbyReplicas, numClients - 1) = 100 * 2 = 200
869+
int maxPossibleStandbyTasks = numTasks * Math.min(numStandbyReplicas, numClients - 1);
870+
assertTrue(allStandbyTasks.size() <= maxPossibleStandbyTasks,
871+
"Should not exceed maximum possible standby tasks: " + maxPossibleStandbyTasks);
872+
873+
// Verify no client has both active and standby for the same task
874+
for (String memberId : result.members().keySet()) {
875+
Set<Integer> memberActiveTasks = new HashSet<>(getAllActiveTaskIds(result, memberId));
876+
Set<Integer> memberStandbyTasks = new HashSet<>(getAllStandbyTaskIds(result, memberId));
877+
memberActiveTasks.retainAll(memberStandbyTasks);
878+
assertTrue(memberActiveTasks.isEmpty(), "Client " + memberId + " has both active and standby for same task");
879+
}
880+
881+
// Verify load distribution is reasonable
882+
int minActiveTasks = Integer.MAX_VALUE;
883+
int maxActiveTasks = 0;
884+
for (String memberId : result.members().keySet()) {
885+
int activeTaskCount = getAllActiveTaskCount(result, memberId);
886+
minActiveTasks = Math.min(minActiveTasks, activeTaskCount);
887+
maxActiveTasks = Math.max(maxActiveTasks, activeTaskCount);
888+
}
889+
// With 100 tasks and 5 clients, each should have 20 tasks
890+
assertEquals(20, minActiveTasks);
891+
assertEquals(20, maxActiveTasks);
892+
893+
// Verify standby task distribution is reasonable
894+
int minStandbyTasks = Integer.MAX_VALUE;
895+
int maxStandbyTasks = 0;
896+
for (String memberId : result.members().keySet()) {
897+
int standbyTaskCount = getAllStandbyTaskIds(result, memberId).size();
898+
minStandbyTasks = Math.min(minStandbyTasks, standbyTaskCount);
899+
maxStandbyTasks = Math.max(maxStandbyTasks, standbyTaskCount);
900+
}
901+
// Each client should have some standby tasks, but not necessarily equal distribution
902+
assertTrue(minStandbyTasks >= 0);
903+
assertTrue(maxStandbyTasks > 0);
904+
}
905+
906+
@Test
907+
public void shouldHandleOddNumberOfClientsWithStandbyTasks() {
908+
// Test with odd number of clients (7) and even number of tasks (14)
909+
final int numTasks = 14;
910+
final int numClients = 7;
911+
final int numStandbyReplicas = 1;
912+
913+
Map<String, AssignmentMemberSpec> members = new HashMap<>();
914+
for (int i = 0; i < numClients; i++) {
915+
members.put("member" + i, createAssignmentMemberSpec("process" + i));
916+
}
917+
918+
GroupAssignment result = assignor.assign(
919+
new GroupSpecImpl(members, mkMap(mkEntry(NUM_STANDBY_REPLICAS_CONFIG, String.valueOf(numStandbyReplicas)))),
920+
new TopologyDescriberImpl(numTasks, true, List.of("test-subtopology"))
921+
);
922+
923+
// Verify all active tasks are assigned
924+
Set<Integer> allActiveTasks = new HashSet<>();
925+
for (String memberId : result.members().keySet()) {
926+
List<Integer> memberActiveTasks = getAllActiveTaskIds(result, memberId);
927+
allActiveTasks.addAll(memberActiveTasks);
928+
}
929+
assertEquals(numTasks, allActiveTasks.size());
930+
931+
// Verify standby tasks are assigned
932+
Set<Integer> allStandbyTasks = new HashSet<>();
933+
for (String memberId : result.members().keySet()) {
934+
List<Integer> memberStandbyTasks = getAllStandbyTaskIds(result, memberId);
935+
allStandbyTasks.addAll(memberStandbyTasks);
936+
}
937+
assertEquals(numTasks * numStandbyReplicas, allStandbyTasks.size());
938+
939+
// With 14 tasks and 7 clients, each client should have 2 active tasks
940+
int expectedTasksPerClient = numTasks / numClients; // 14 / 7 = 2
941+
int remainder = numTasks % numClients; // 14 % 7 = 0
942+
943+
int clientsWithExpectedTasks = 0;
944+
int clientsWithOneMoreTask = 0;
945+
for (String memberId : result.members().keySet()) {
946+
int activeTaskCount = getAllActiveTaskCount(result, memberId);
947+
if (activeTaskCount == expectedTasksPerClient) {
948+
clientsWithExpectedTasks++;
949+
} else if (activeTaskCount == expectedTasksPerClient + 1) {
950+
clientsWithOneMoreTask++;
951+
}
952+
}
953+
assertEquals(numClients - remainder, clientsWithExpectedTasks); // 7 clients should have 2 tasks
954+
assertEquals(remainder, clientsWithOneMoreTask); // 0 clients should have 3 tasks
955+
}
956+
957+
@Test
958+
public void shouldHandleHighStandbyReplicaCount() {
959+
// Test with high number of standby replicas (5) and limited clients (3)
960+
final int numTasks = 6;
961+
final int numClients = 3;
962+
final int numStandbyReplicas = 5;
963+
964+
Map<String, AssignmentMemberSpec> members = new HashMap<>();
965+
for (int i = 0; i < numClients; i++) {
966+
members.put("member" + i, createAssignmentMemberSpec("process" + i));
967+
}
968+
969+
GroupAssignment result = assignor.assign(
970+
new GroupSpecImpl(members, mkMap(mkEntry(NUM_STANDBY_REPLICAS_CONFIG, String.valueOf(numStandbyReplicas)))),
971+
new TopologyDescriberImpl(numTasks, true, List.of("test-subtopology"))
972+
);
973+
974+
// Verify all active tasks are assigned
975+
Set<Integer> allActiveTasks = new HashSet<>();
976+
for (String memberId : result.members().keySet()) {
977+
List<Integer> memberActiveTasks = getAllActiveTaskIds(result, memberId);
978+
allActiveTasks.addAll(memberActiveTasks);
979+
}
980+
assertEquals(numTasks, allActiveTasks.size());
981+
982+
// With only 3 clients and 5 standby replicas, not all standby replicas can be assigned
983+
// since each client can only hold standby tasks for tasks it doesn't have active
984+
Set<Integer> allStandbyTasks = new HashSet<>();
985+
for (String memberId : result.members().keySet()) {
986+
List<Integer> memberStandbyTasks = getAllStandbyTaskIds(result, memberId);
987+
allStandbyTasks.addAll(memberStandbyTasks);
988+
}
989+
990+
// Maximum possible = numTasks * min(numStandbyReplicas, numClients - 1) = 6 * 2 = 12
991+
int maxPossibleStandbyTasks = numTasks * Math.min(numStandbyReplicas, numClients - 1);
992+
assertTrue(allStandbyTasks.size() <= maxPossibleStandbyTasks);
993+
assertTrue(allStandbyTasks.size() > 0); // Should assign at least some standby tasks
994+
}
995+
996+
@Test
997+
public void shouldHandleLargeNumberOfSubtopologiesWithStandbyTasks() {
998+
// Test with many subtopologies (10) each with different numbers of tasks
999+
final int numSubtopologies = 10;
1000+
final int numClients = 4;
1001+
final int numStandbyReplicas = 1;
1002+
1003+
List<String> subtopologies = new ArrayList<>();
1004+
for (int i = 0; i < numSubtopologies; i++) {
1005+
subtopologies.add("subtopology-" + i);
1006+
}
1007+
1008+
Map<String, AssignmentMemberSpec> members = new HashMap<>();
1009+
for (int i = 0; i < numClients; i++) {
1010+
members.put("member" + i, createAssignmentMemberSpec("process" + i));
1011+
}
1012+
1013+
GroupAssignment result = assignor.assign(
1014+
new GroupSpecImpl(members, mkMap(mkEntry(NUM_STANDBY_REPLICAS_CONFIG, String.valueOf(numStandbyReplicas)))),
1015+
new TopologyDescriberImpl(5, true, subtopologies) // 5 tasks per subtopology
1016+
);
1017+
1018+
// Verify all subtopologies have tasks assigned
1019+
Set<String> subtopologiesWithTasks = new HashSet<>();
1020+
for (String memberId : result.members().keySet()) {
1021+
MemberAssignment member = result.members().get(memberId);
1022+
subtopologiesWithTasks.addAll(member.activeTasks().keySet());
1023+
}
1024+
assertEquals(numSubtopologies, subtopologiesWithTasks.size());
1025+
1026+
// Verify standby tasks are assigned across subtopologies
1027+
Set<String> subtopologiesWithStandbyTasks = new HashSet<>();
1028+
for (String memberId : result.members().keySet()) {
1029+
MemberAssignment member = result.members().get(memberId);
1030+
subtopologiesWithStandbyTasks.addAll(member.standbyTasks().keySet());
1031+
}
1032+
assertEquals(numSubtopologies, subtopologiesWithStandbyTasks.size());
1033+
}
1034+
1035+
@Test
1036+
public void shouldHandleEdgeCaseWithSingleClientAndMultipleStandbyReplicas() {
1037+
// Test edge case: single client with multiple standby replicas
1038+
final int numTasks = 10;
1039+
final int numStandbyReplicas = 3;
1040+
1041+
Map<String, AssignmentMemberSpec> members = mkMap(
1042+
mkEntry("member1", createAssignmentMemberSpec("process1"))
1043+
);
1044+
1045+
GroupAssignment result = assignor.assign(
1046+
new GroupSpecImpl(members, mkMap(mkEntry(NUM_STANDBY_REPLICAS_CONFIG, String.valueOf(numStandbyReplicas)))),
1047+
new TopologyDescriberImpl(numTasks, true, List.of("test-subtopology"))
1048+
);
1049+
1050+
// Single client should get all active tasks
1051+
assertEquals(numTasks, getAllActiveTaskCount(result, "member1"));
1052+
1053+
// No standby tasks should be assigned since there's only one client
1054+
// (standby tasks can't be assigned to the same client as active tasks)
1055+
assertTrue(getAllStandbyTaskIds(result, "member1").isEmpty());
1056+
}
1057+
1058+
@Test
1059+
public void shouldHandleEdgeCaseWithMoreStandbyReplicasThanAvailableClients() {
1060+
// Test edge case: more standby replicas than available clients
1061+
final int numTasks = 4;
1062+
final int numClients = 2;
1063+
final int numStandbyReplicas = 5; // More than available clients
1064+
1065+
Map<String, AssignmentMemberSpec> members = new HashMap<>();
1066+
for (int i = 0; i < numClients; i++) {
1067+
members.put("member" + i, createAssignmentMemberSpec("process" + i));
1068+
}
1069+
1070+
GroupAssignment result = assignor.assign(
1071+
new GroupSpecImpl(members, mkMap(mkEntry(NUM_STANDBY_REPLICAS_CONFIG, String.valueOf(numStandbyReplicas)))),
1072+
new TopologyDescriberImpl(numTasks, true, List.of("test-subtopology"))
1073+
);
1074+
1075+
// Verify all active tasks are assigned
1076+
Set<Integer> allActiveTasks = new HashSet<>();
1077+
for (String memberId : result.members().keySet()) {
1078+
List<Integer> memberActiveTasks = getAllActiveTaskIds(result, memberId);
1079+
allActiveTasks.addAll(memberActiveTasks);
1080+
}
1081+
assertEquals(numTasks, allActiveTasks.size());
1082+
1083+
// With only 2 clients, maximum standby tasks per task = 1 (since each client has active tasks)
1084+
Set<Integer> allStandbyTasks = new HashSet<>();
1085+
for (String memberId : result.members().keySet()) {
1086+
List<Integer> memberStandbyTasks = getAllStandbyTaskIds(result, memberId);
1087+
allStandbyTasks.addAll(memberStandbyTasks);
1088+
}
1089+
1090+
// Maximum possible = numTasks * 1 = 4
1091+
assertEquals(numTasks, allStandbyTasks.size());
1092+
}
1093+
1094+
8361095
private int getAllActiveTaskCount(GroupAssignment result, String... memberIds) {
8371096
int size = 0;
8381097
for (String memberId : memberIds) {

streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -573,7 +573,7 @@ public void shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore(f
573573
createStateForRestoration(inputStream, 0);
574574

575575
if (useNewProtocol) {
576-
CLUSTER.setStandbyReplicas(appId, 1);
576+
CLUSTER.setGroupStandbyReplicas(appId, 1);
577577
}
578578

579579
final Properties props1 = props(stateUpdaterEnabled);

streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -140,15 +140,19 @@ public void shouldWorkWithRebalance(
140140

141141

142142
final Properties props = new Properties();
143+
final String appId = safeUniqueTestName(testInfo);
143144
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
144-
props.put(StreamsConfig.APPLICATION_ID_CONFIG, safeUniqueTestName(testInfo));
145+
props.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
145146
props.put(InternalConfig.STATE_UPDATER_ENABLED, stateUpdaterEnabled);
146147
props.put(InternalConfig.PROCESSING_THREADS_ENABLED, processingThreadsEnabled);
147-
// decrease the session timeout so that we can trigger the rebalance soon after old client left closed
148-
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
149-
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 500);
150148
if (streamsProtocolEnabled) {
151149
props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault()));
150+
// decrease the session timeout so that we can trigger the rebalance soon after old client left closed
151+
CLUSTER.setGroupSessionTimeout(appId, 10000);
152+
CLUSTER.setGroupHeartbeatTimeout(appId, 1000);
153+
} else {
154+
// decrease the session timeout so that we can trigger the rebalance soon after old client left closed
155+
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
152156
}
153157

154158
// cycle out Streams instances as long as the test is running.

streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ private Properties streamsConfiguration(final boolean streamsProtocolEnabled) {
9999
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class);
100100
if (streamsProtocolEnabled) {
101101
streamsConfiguration.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault()));
102-
CLUSTER.setStandbyReplicas("app-" + safeTestName, 1);
102+
CLUSTER.setGroupStandbyReplicas("app-" + safeTestName, 1);
103103
} else {
104104
streamsConfiguration.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
105105
}

streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -401,6 +401,8 @@ public KafkaConsumer<byte[], byte[]> createConsumerAndSubscribeTo(final Map<Stri
401401

402402
private void addDefaultBrokerPropsIfAbsent(final Properties brokerConfig) {
403403
brokerConfig.putIfAbsent(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, 2 * 1024 * 1024L);
404+
brokerConfig.putIfAbsent(GroupCoordinatorConfig.STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, "100");
405+
brokerConfig.putIfAbsent(GroupCoordinatorConfig.STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, "100");
404406
brokerConfig.putIfAbsent(GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, "0");
405407
brokerConfig.putIfAbsent(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, "0");
406408
brokerConfig.putIfAbsent(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, "5");
@@ -439,7 +441,33 @@ public Properties getLogConfig(final String topic) {
439441
}
440442
}
441443

442-
public void setStandbyReplicas(final String groupId, final int numStandbyReplicas) {
444+
public void setGroupSessionTimeout(final String groupId, final int sessionTimeoutMs) {
445+
try (final Admin adminClient = createAdminClient()) {
446+
adminClient.incrementalAlterConfigs(
447+
Map.of(
448+
new ConfigResource(ConfigResource.Type.GROUP, groupId),
449+
List.of(new AlterConfigOp(new ConfigEntry(GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG, String.valueOf(sessionTimeoutMs)), AlterConfigOp.OpType.SET))
450+
)
451+
).all().get();
452+
} catch (final InterruptedException | ExecutionException e) {
453+
throw new RuntimeException(e);
454+
}
455+
}
456+
457+
public void setGroupHeartbeatTimeout(final String groupId, final int heartbeatTimeoutMs) {
458+
try (final Admin adminClient = createAdminClient()) {
459+
adminClient.incrementalAlterConfigs(
460+
Map.of(
461+
new ConfigResource(ConfigResource.Type.GROUP, groupId),
462+
List.of(new AlterConfigOp(new ConfigEntry(GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, String.valueOf(heartbeatTimeoutMs)), AlterConfigOp.OpType.SET))
463+
)
464+
).all().get();
465+
} catch (final InterruptedException | ExecutionException e) {
466+
throw new RuntimeException(e);
467+
}
468+
}
469+
470+
public void setGroupStandbyReplicas(final String groupId, final int numStandbyReplicas) {
443471
try (final Admin adminClient = createAdminClient()) {
444472
adminClient.incrementalAlterConfigs(
445473
Map.of(

0 commit comments

Comments
 (0)