[Feature Request] Adaptive Coordinator-Level Concurrency Limiting for Scatter-Gather Workloads #22295

Description

@G1GC

Is your feature request related to a problem? Please describe

Authors: Karthik Jayaraman, Bhavik Patel, Philip Chan

OpenSearch's existing protection mechanisms, such as admission control, search backpressure, and workload management, all operate at the shard or node level. For scatter-gather workloads where allow_partial_search_results=false, a single shard-level rejection fails the whole query, so the query failure rate grows exponentially with shard count and these mechanisms become counterproductive.

The Amplification Problem

When a search fans out to S shards and allow_partial_search_results=false, success requires all S shard requests to succeed. If each shard has an independent rejection probability p, the query success probability is:

P(success) = (1 - p)^S

| Per-shard rejection rate (p) | S=20 shards | S=50 shards | S=89 shards |
|---|---|---|---|
| 1% | 82% success | 61% success | 41% success |
| 2% | 67% | 36% | 17% |
| 5% | 36% | 8% | 1% |

A per-shard rejection rate of just 1-2%, which looks healthy in node-level metrics, fails more than half of all queries at the application layer once the fan-out reaches 89 shards.
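The table values follow directly from the formula above. A minimal Python sketch for reproducing them, or for plugging in another shard count, is shown here; the function name is illustrative and not part of the proposal.

```python
def query_success_probability(p: float, shards: int) -> float:
    """P(success) = (1 - p)^S, assuming independent per-shard rejections."""
    if not 0.0 <= p <= 1.0:
        raise ValueError("p must be a probability in [0, 1]")
    if shards < 1:
        raise ValueError("shards must be at least 1")
    return (1.0 - p) ** shards


if __name__ == "__main__":
    # Print one row per rejection rate, one column per shard count.
    for p in (0.01, 0.02, 0.05):
        cells = "  ".join(
            f"S={s}: {query_success_probability(p, s):6.1%}" for s in (20, 50, 89)
        )
        print(f"p={p:.0%}  {cells}")
```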

Who Is Affected

Any OpenSearch deployment is affected if it runs scatter-gather queries across many shards, sets allow_partial_search_results=false because partial results are semantically incorrect, and experiences periodic overload. This may include monitoring/metrics platforms, logging platforms where alerting queries need complete results, security analytics requiring complete event correlation, and e-commerce search that cannot show partial product catalogs.

Describe the solution you'd like

Adaptive Coordinator-Level Concurrency Limiter

An adaptive concurrency limiter at the coordinator layer, operating before fan-out, where one rejection equals one failed query with zero wasted shard work. The limiter uses the TCP Vegas algorithm to auto-discover the coordinator node's optimal search concurrency by monitoring request latency. When latency rises (indicating queueing in the backend), it reduces the concurrency limit. When latency is stable, it probes upward. No manual capacity configuration is required.

The implementation is a module (modules/concurrency-limit/) that integrates via the existing ActionFilter extension point. The server module contains no concurrency-limiting logic. All behavior lives in the module. The Netflix concurrency-limits-core library (Apache 2.0) is used as a Maven dependency.

HTTP Request
  → ActionFilter chain
    → [order 1] ActionConcurrencyLimitFilter
      → if token acquired: proceed to TransportAction (fan-out)
      → if at limit: reject with 429 before any shard work
    → on completion: release token, feed RTT to algorithm
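The acquire, proceed-or-reject, release flow above can be sketched in a few lines of Python. This is an illustration only: the real filter would be Java code behind the ActionFilter extension point, and the class name, function name, and fixed limit used here are assumptions standing in for the adaptive limiter.

```python
import threading
import time
from typing import Any, Callable, Optional, Tuple


class FixedLimiter:
    """Hypothetical stand-in for the adaptive limiter: a fixed in-flight cap."""

    def __init__(self, limit: int,
                 on_sample: Optional[Callable[[float, int], None]] = None):
        self._limit = limit
        self._in_flight = 0
        self._lock = threading.Lock()
        self._on_sample = on_sample  # receives (rtt_seconds, in_flight)

    def try_acquire(self) -> bool:
        with self._lock:
            if self._in_flight >= self._limit:
                return False
            self._in_flight += 1
            return True

    def release(self, rtt_seconds: float) -> None:
        with self._lock:
            in_flight = self._in_flight
            self._in_flight -= 1
        if self._on_sample is not None:
            # In the proposal this sample would feed the adaptive algorithm.
            self._on_sample(rtt_seconds, in_flight)


def filter_request(limiter: FixedLimiter,
                   fan_out: Callable[[], Any]) -> Tuple[int, Any]:
    """Reject with 429 before fan-out, or run the request and release the token."""
    if not limiter.try_acquire():
        return 429, None  # one rejection, zero shard work
    start = time.monotonic()
    try:
        return 200, fan_out()
    finally:
        limiter.release(time.monotonic() - start)
```

The token is released in a finally block so that a failed fan-out still frees its slot and still contributes an RTT sample.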

Algorithm

TCP Vegas measures the queueing delay signal: the difference between expected throughput (based on observed minimum RTT) and actual throughput. When actual throughput drops below expected, it indicates backend queueing and the concurrency window is reduced. When they converge, the algorithm probes upward. The implementation supports Vegas, Gradient2, and AIMD as selectable algorithms via a dynamic cluster setting, allowing operators to evaluate which works best for their workload.

A configurable warm-up period (default 5 minutes) ensures no rejections during initial convergence after a node start. During warm-up, the algorithm runs and learns but all requests are admitted. This eliminates false-positive rejections during rolling restarts.

A customizable updrift_factor parameter (validated at 5) controls how aggressively Vegas increases the limit when capacity is available. This addresses the known issue where standard Vegas is too conservative on the upward path, taking too long to recover after transient load dips.

Operational Model

The limiter follows OpenSearch's established tri-state mode pattern (same as AdmissionControlMode, SearchBackpressureMode, etc.):

concurrency_limit.action.<alias>.mode: "disabled" | "monitor_only" | "enforced"

In monitor_only mode, the algorithm runs at full fidelity, converges on the limit, emits stats and logs on would-reject decisions, but never actually rejects. This enables production validation before enforcement with zero risk. Note that monitor_only cannot correctly simulate recovery patterns since load is never actually shed.
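How the three modes and the warm-up window might combine into a single admission decision is sketched below in Python. The names are hypothetical, and the sketch assumes that no would-reject decision is recorded during warm-up, which the proposal does not specify.

```python
from enum import Enum
from typing import Tuple


class Mode(Enum):
    DISABLED = "disabled"
    MONITOR_ONLY = "monitor_only"
    ENFORCED = "enforced"


def decide(mode: Mode, over_limit: bool, uptime_seconds: float,
           warmup_seconds: float = 300.0) -> Tuple[bool, bool]:
    """Return (admit, would_reject) for one request.

    over_limit is the limiter's verdict that in-flight requests are at the limit.
    """
    if mode is Mode.DISABLED:
        return True, False
    # Assumption: during warm-up the algorithm learns but flags nothing.
    would_reject = over_limit and uptime_seconds >= warmup_seconds
    # Only enforced mode turns a would-reject decision into a real rejection.
    admit = not (would_reject and mode is Mode.ENFORCED)
    return admit, would_reject
```

In monitor_only mode the second element of the result is what would drive the would-reject stats and logs, while the first stays True.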

Settings

All settings are dynamic (no restart required) and per-alias. <alias> is a user-chosen name (e.g., search, bulk):

| Setting | Type | Default | Description |
|---|---|---|---|
| concurrency_limit.action.<alias>.action_name | String | (required) | Full action name (e.g., indices:data/read/search) |
| concurrency_limit.action.<alias>.mode | String | disabled | disabled, enforced, monitor_only |
| concurrency_limit.action.<alias>.algorithm | String | vegas | vegas, gradient2, or aimd |
| concurrency_limit.action.<alias>.initial_limit | int [1-10000] | 20 | Starting concurrency limit |
| concurrency_limit.action.<alias>.max_limit | int [1-10000] | 200 | Upper bound on adaptive growth |
| concurrency_limit.action.<alias>.warmup_duration | TimeValue | 5m | No rejections during this window |
| concurrency_limit.action.<alias>.vegas.updrift_factor | int [1-100] | 5 | Upward drift speed (higher = faster recovery) |
| concurrency_limit.action.<alias>.aimd_backoff_ratio | double [0.1-1.0] | 0.9 | AIMD decrease factor on drops |
| concurrency_limit.action.<alias>.gradient2_rtt_tolerance | double >= 1.0 | 1.5 | RTT tolerance before reducing limit |

Partitioning

The partition feature divides the adaptive concurrency limit across named groups, each guaranteed a percentage of the total capacity. When the adaptive algorithm raises or lowers the overall limit, each partition's sub-limit is recalculated proportionally in real time.

Total limit = 100
|----- "premium"  60% → sub-limit = 60
|----- "standard" 30% → sub-limit = 30
|----- "default"  10% → sub-limit = 10 (catch-all)

Partitions are work-conserving, not static hard caps. When total inflight is below the global limit, partition limits are ignored and any request gets through regardless of its partition allocation. All partitions can burst freely into available capacity. When total inflight reaches the global limit (overload), partition limits are enforced and only requests whose partition has not exceeded its proportional share get through.

This means idle capacity is never wasted. A partition configured at 10% can burst all the way to the global limit if other partitions are underutilized. Under overload, the guaranteed minimums kick in to protect high-priority traffic. Load testing confirmed this behavior.
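The work-conserving rule described above can be sketched in Python as follows. The function names and the rounding of sub-limits are assumptions made for illustration; the proposal does not state how fractional sub-limits are rounded.

```python
from typing import Dict, Mapping


def sub_limits(total_limit: int, percents: Mapping[str, float]) -> Dict[str, int]:
    """Split the current global limit proportionally (rounding is an assumption)."""
    return {name: max(1, round(total_limit * pct)) for name, pct in percents.items()}


def admit(partition: str, in_flight: Mapping[str, int],
          total_limit: int, percents: Mapping[str, float]) -> bool:
    """Work-conserving admission for a request in a configured partition."""
    total_in_flight = sum(in_flight.values())
    if total_in_flight < total_limit:
        # Spare capacity: partition shares are ignored, anyone may burst.
        return True
    # Overload: only partitions still under their guaranteed share get through.
    share = sub_limits(total_limit, percents)[partition]
    return in_flight.get(partition, 0) < share
```

For example, with a global limit of 100 and shares of 60/30/10, a "default" request is admitted while only 65 requests are in flight even if "default" already holds 50 of them, but it is rejected once 100 are in flight and "default" holds 20.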

Partition settings (per-alias):

concurrency_limit.action.<alias>.partitions = premium, standard, default
concurrency_limit.action.<alias>.partition.premium.percent = 0.6
concurrency_limit.action.<alias>.partition.standard.percent = 0.3
concurrency_limit.action.<alias>.partition.default.percent = 0.1
concurrency_limit.action.<alias>.partition.resolver = byHeader
concurrency_limit.action.<alias>.partition.resolver.byHeader.name = X-Request-Tier

Supported resolver types:

| Type | Description | Config |
|---|---|---|
| byHeader | Reads a named HTTP request header; value must match a partition name | resolver.byHeader.name |
| byQueryShape | Classifies based on query structure (e.g., presence of aggregations) | Auto-detects |
| fixed | Always maps to same partition (useful for testing) | resolver.fixed.partition |

If the resolver returns a value that doesn't match any configured partition, the request is routed to unknownPartition which holds any unallocated percentage of the limit.
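A byHeader resolver with the unknownPartition fallback might look like the Python sketch below. The function names are hypothetical, and the case-insensitive header lookup is an assumption based on HTTP header semantics rather than something the proposal states.

```python
from typing import Iterable, Mapping

UNKNOWN_PARTITION = "unknownPartition"


def resolve_by_header(headers: Mapping[str, str], header_name: str,
                      partitions: Iterable[str]) -> str:
    """Map a request to a partition by header value, else to the unknown partition."""
    # HTTP header names are case-insensitive, so compare in lower case.
    lowered = {name.lower(): value for name, value in headers.items()}
    value = lowered.get(header_name.lower())
    return value if value in set(partitions) else UNKNOWN_PARTITION


def unallocated_percent(percents: Mapping[str, float]) -> float:
    """Share of the limit left over for the unknown partition."""
    return max(0.0, 1.0 - sum(percents.values()))
```

If the configured percentages sum to 1.0, the unknown partition has no guaranteed share and relies entirely on spare capacity.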

Example: search with aggregation isolation

PUT /_cluster/settings
{
  "persistent": {
    "concurrency_limit.action.search.action_name": "indices:data/read/search",
    "concurrency_limit.action.search.mode": "enforced",
    "concurrency_limit.action.search.algorithm": "vegas",
    "concurrency_limit.action.search.partitions": ["search", "aggregation"],
    "concurrency_limit.action.search.partition.search.percent": 0.9,
    "concurrency_limit.action.search.partition.aggregation.percent": 0.1,
    "concurrency_limit.action.search.partition.resolver": "byQueryShape"
  }
}

With this configuration, aggregation queries are guaranteed 10% of capacity but can burst higher when search traffic is light. Under overload, search is guaranteed 90% and aggregation spikes cannot starve point queries.

Observability

Stats are exposed via GET /_nodes/stats under the concurrency_limiters key, with one object per configured alias:

{
  "concurrency_limiters": {
    "search": {
      "action_name": "indices:data/read/search",
      "algorithm": "vegas",
      "current_limit": 147,
      "in_flight": 42,
      "total_rejected": 1205,
      "rtt_no_load_millis": 12
    }
  }
}
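As an example of consuming these stats, the Python sketch below computes how close each alias is to its current limit. It assumes the caller has already extracted the object that contains the concurrency_limiters key from the node stats response; the function name is illustrative.

```python
from typing import Dict


def limiter_utilization(stats: dict) -> Dict[str, float]:
    """Return in_flight / current_limit per alias (0.0 when the limit is 0)."""
    result = {}
    for alias, limiter in stats.get("concurrency_limiters", {}).items():
        limit = limiter["current_limit"]
        result[alias] = limiter["in_flight"] / limit if limit > 0 else 0.0
    return result


# Sample payload copied from the proposed stats format.
sample = {
    "concurrency_limiters": {
        "search": {
            "action_name": "indices:data/read/search",
            "algorithm": "vegas",
            "current_limit": 147,
            "in_flight": 42,
            "total_rejected": 1205,
            "rtt_no_load_millis": 12,
        }
    }
}

if __name__ == "__main__":
    for alias, used in limiter_utilization(sample).items():
        print(f"{alias}: {used:.0%} of current limit in use")
```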

Related component

Search:Resiliency

Describe alternatives you've considered

No response

Additional context

Prior Art

Netflix concurrency-limits (Apache 2.0) is a Java library implementing Vegas, Gradient2, and AIMD for server-side adaptive concurrency limiting, used in production at Netflix scale. Apache Kafka (KIP-842) uses the same Netflix library for broker-side request admission with similar cold-start mitigations. gRPC adaptive concurrency and Envoy adaptive concurrency filter apply similar concepts in other ecosystems.

Validation

This feature has been validated with synthetic benchmarking data on a production-scale cluster: 155 nodes (150 data, 5 master) with 24 cores and 150 GB each, 89 shards with 3 replicas, ~10B docs, 9.65 TB total, allow_partial_search_results=false. The workload consisted of 2K unique filter queries at variable QPS from 2K to 100K (20x overload).

| Metric | Custom Vegas (updrift=5, warmup=5m) |
|---|---|
| Steady-state error at 2K QPS | 0% |
| Steady-state error at 5K QPS (near capacity) | 1.7% avg, 5% max |
| Overload handling (100K QPS = 20x) | Maintains ~5K QPS throughput, prevents crash |
| Reaction time to overload | Instantaneous |
| Recovery time after overload | ~2 minutes |
| Node restart error (2K QPS) | 0.6% max, 0.01% avg |
| Node restart error (5K QPS) | 25.8% max, 3.3% avg |

Without the limiter, the cluster crashes under sustained overload with failure rate approaching 100%. With the limiter enabled, throughput is maintained at cluster capacity and excess load is shed cleanly via 429 responses.

Known Limitations (from our tests)

  1. The adaptive algorithm maintains a ~1.7% average rejection rate at near-capacity load (5K QPS) as a deliberate safety margin. This is tunable via updrift_factor at the cost of slower overload reaction.

  2. Vegas requires ongoing traffic to maintain an accurate RTT baseline. During low-traffic periods, the rtt_noload baseline may drift. The probeMultiplier setting controls how often the algorithm resets its baseline.

  3. Vegas will not adjust the limit downward unless inflight >= limit/2. Protection only engages when the node is meaningfully loaded, which is generally correct behavior for search workloads.

  4. monitor_only mode cannot simulate recovery dynamics since load is never actually shed. The algorithm's convergence behavior is accurate but recovery can only be observed in enforced mode.

Breaking changes

None. Default mode is disabled. No behavioral change for any existing deployment. No changes to wire protocol, cluster state, index format, or existing APIs. Fully backward compatible.

