Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,11 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -99,13 +101,17 @@ public class BalancedShardsAllocator implements ShardsAllocator {
"cluster.routing.allocation.balance.index",
0.55f,
0.0f,
Float.MAX_VALUE,
new IndexBalanceFactorValidator(),
Property.Dynamic,
Property.NodeScope
);
public static final Setting<Float> SHARD_BALANCE_FACTOR_SETTING = Setting.floatSetting(
"cluster.routing.allocation.balance.shard",
0.45f,
0.0f,
Float.MAX_VALUE,
new ShardBalanceFactorValidator(),
Property.Dynamic,
Property.NodeScope
);
Expand Down Expand Up @@ -518,6 +524,67 @@ public boolean getPreferPrimaryBalance() {
return preferPrimaryShardBalance;
}

/**
* Validates that the index balance factor, combined with the shard balance factor, sums to a value greater than zero.
*
* @opensearch.internal
*/
static final class IndexBalanceFactorValidator implements Setting.Validator<Float> {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need individual classes for them, In the constructor we can pass the setting name to be fetched


@Override
public void validate(Float value) {}

@Override
public void validate(final Float value, final Map<Setting<?>, Object> settings) {
final float shardBalance = (Float) settings.get(SHARD_BALANCE_FACTOR_SETTING);
doValidateBalanceFactorSum(value, shardBalance);
}

@Override
public Iterator<Setting<?>> settings() {
final List<Setting<?>> settings = Collections.singletonList(SHARD_BALANCE_FACTOR_SETTING);
return settings.iterator();
}
}

/**
* Validates that the shard balance factor, combined with the index balance factor, sums to a value greater than zero.
*
* @opensearch.internal
*/
static final class ShardBalanceFactorValidator implements Setting.Validator<Float> {

@Override
public void validate(Float value) {}

@Override
public void validate(final Float value, final Map<Setting<?>, Object> settings) {
final float indexBalance = (Float) settings.get(INDEX_BALANCE_FACTOR_SETTING);
doValidateBalanceFactorSum(indexBalance, value);
}

@Override
public Iterator<Setting<?>> settings() {
final List<Setting<?>> settings = Collections.singletonList(INDEX_BALANCE_FACTOR_SETTING);
return settings.iterator();
}
}

static void doValidateBalanceFactorSum(float indexBalance, float shardBalance) {
float sum = indexBalance + shardBalance;
if (sum <= 0.0f) {
throw new IllegalArgumentException(
"Balance factors ["
+ INDEX_BALANCE_FACTOR_SETTING.getKey()
+ "] and ["
+ SHARD_BALANCE_FACTOR_SETTING.getKey()
+ "] must sum to a value greater than zero but was ["
+ sum
+ "]"
);
}
}

/**
* This class is the primary weight function used to create balanced over nodes and shards in the cluster.
* Currently this function has 3 properties:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.routing.allocation.allocator;

import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.test.OpenSearchTestCase;

import static org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.INDEX_BALANCE_FACTOR_SETTING;
import static org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING;

public class BalancedShardsAllocatorSettingsTests extends OpenSearchTestCase {

public void testBothBalanceFactorsZeroIsRejectedOnConstruction() {
final Settings settings = Settings.builder()
.put(INDEX_BALANCE_FACTOR_SETTING.getKey(), "0.0")
.put(SHARD_BALANCE_FACTOR_SETTING.getKey(), "0.0")
.build();
final IllegalArgumentException e = expectThrows(
IllegalArgumentException.class,
() -> new BalancedShardsAllocator(settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))
);
assertThat(e.getMessage(), org.hamcrest.Matchers.containsString("must sum to a value greater than zero"));
}

public void testBothBalanceFactorsZeroIsRejectedOnDynamicUpdate() {
final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
new BalancedShardsAllocator(Settings.EMPTY, clusterSettings);

final Settings newSettings = Settings.builder()
.put(INDEX_BALANCE_FACTOR_SETTING.getKey(), "0.0")
.put(SHARD_BALANCE_FACTOR_SETTING.getKey(), "0.0")
.build();

final IllegalArgumentException e = expectThrows(
IllegalArgumentException.class,
() -> clusterSettings.applySettings(newSettings)
);
assertThat(e.getMessage(), org.hamcrest.Matchers.containsString("illegal value can't update"));
assertNotNull(e.getCause());
assertThat(e.getCause().getMessage(), org.hamcrest.Matchers.containsString("must sum to a value greater than zero"));
}

public void testIndexBalanceZeroWithNonZeroShardBalanceIsAccepted() {
final Settings settings = Settings.builder()
.put(INDEX_BALANCE_FACTOR_SETTING.getKey(), "0.0")
.put(SHARD_BALANCE_FACTOR_SETTING.getKey(), "1.0")
.build();
final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
BalancedShardsAllocator allocator = new BalancedShardsAllocator(settings, clusterSettings);
assertEquals(0.0f, allocator.getIndexBalance(), 0.0f);
assertEquals(1.0f, allocator.getShardBalance(), 0.0f);
}

public void testShardBalanceZeroWithNonZeroIndexBalanceIsAccepted() {
final Settings settings = Settings.builder()
.put(INDEX_BALANCE_FACTOR_SETTING.getKey(), "1.0")
.put(SHARD_BALANCE_FACTOR_SETTING.getKey(), "0.0")
.build();
final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
BalancedShardsAllocator allocator = new BalancedShardsAllocator(settings, clusterSettings);
assertEquals(1.0f, allocator.getIndexBalance(), 0.0f);
assertEquals(0.0f, allocator.getShardBalance(), 0.0f);
}

public void testDynamicUpdateIndexBalanceToZeroWhileShardBalanceAlreadyZeroIsRejected() {
final Settings initialSettings = Settings.builder()
.put(SHARD_BALANCE_FACTOR_SETTING.getKey(), "0.0")
.put(INDEX_BALANCE_FACTOR_SETTING.getKey(), "1.0")
.build();
final ClusterSettings clusterSettings = new ClusterSettings(initialSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
new BalancedShardsAllocator(initialSettings, clusterSettings);

final Settings newSettings = Settings.builder()
.put(INDEX_BALANCE_FACTOR_SETTING.getKey(), "0.0")
.build();

final IllegalArgumentException e = expectThrows(
IllegalArgumentException.class,
() -> clusterSettings.applySettings(newSettings)
);
assertThat(e.getCause().getMessage(), org.hamcrest.Matchers.containsString("must sum to a value greater than zero"));
}

public void testDynamicUpdateShardBalanceToZeroWhileIndexBalanceAlreadyZeroIsRejected() {
final Settings initialSettings = Settings.builder()
.put(INDEX_BALANCE_FACTOR_SETTING.getKey(), "0.0")
.put(SHARD_BALANCE_FACTOR_SETTING.getKey(), "1.0")
.build();
final ClusterSettings clusterSettings = new ClusterSettings(initialSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
new BalancedShardsAllocator(initialSettings, clusterSettings);

final Settings newSettings = Settings.builder()
.put(SHARD_BALANCE_FACTOR_SETTING.getKey(), "0.0")
.build();

final IllegalArgumentException e = expectThrows(
IllegalArgumentException.class,
() -> clusterSettings.applySettings(newSettings)
);
assertThat(e.getCause().getMessage(), org.hamcrest.Matchers.containsString("must sum to a value greater than zero"));
}

public void testValidDynamicUpdateIsAccepted() {
final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
BalancedShardsAllocator allocator = new BalancedShardsAllocator(Settings.EMPTY, clusterSettings);

final Settings newSettings = Settings.builder()
.put(INDEX_BALANCE_FACTOR_SETTING.getKey(), "0.3")
.put(SHARD_BALANCE_FACTOR_SETTING.getKey(), "0.7")
.build();

clusterSettings.applySettings(newSettings);
assertEquals(0.3f, allocator.getIndexBalance(), 0.001f);
assertEquals(0.7f, allocator.getShardBalance(), 0.001f);
}
}
Loading