[analytics-engine] Improve TopK correctness with CSS: replace Final with PartialReduce using topk check #22360

Merged
sandeshkr419 merged 14 commits into opensearch-project:main from sandeshkr419:fix/disable-skip-partial-agg on Jul 1, 2026

sandeshkr419 (Member) commented Jun 30, 2026

Co-Authored-By: @mch2 & @expani

Problem

When concurrent segment search (CSS) is active, DataFusion's physical optimizer inserts RepartitionExec(RoundRobin, N) → AggregateExec(Partial, ×N) → AggregateExec(FinalPartitioned) on the shard fragment. prepare_partial_plan calls force_aggregate_mode(Partial) which strips FinalPartitioned and returns the Partial(×N) subtree directly.

Each of the N CSS partitions then runs the TopK SortExec independently, truncating to the TopK fetch limit before the coordinator merge. A group whose rows are split across partitions can be cut from some partitions' truncated output, so its merged count is too low (e.g. q31 on the ClickBench dataset returns c=313 instead of c=1633).
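
To make the failure mode concrete, here is a minimal toy sketch, assuming plain counts per string key and a fetch limit `k`. None of these helpers (`partial_counts`, `top_k`, `merge`, `truncate_then_merge`, `merge_then_truncate`) exist in the PR; they only imitate the plan shape described above.

```rust
use std::collections::HashMap;

/// Count rows per group key within one partition (stand-in for a Partial aggregate).
fn partial_counts(rows: &[&str]) -> HashMap<String, u64> {
    let mut counts = HashMap::new();
    for key in rows {
        *counts.entry(key.to_string()).or_insert(0) += 1;
    }
    counts
}

/// Keep the k groups with the highest counts; ties are broken by key for determinism.
fn top_k(counts: &HashMap<String, u64>, k: usize) -> Vec<(String, u64)> {
    let mut v: Vec<(String, u64)> = counts.iter().map(|(g, c)| (g.clone(), *c)).collect();
    v.sort_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
    v.truncate(k);
    v
}

/// Sum partial counts per key (stand-in for the coordinator merge).
fn merge(partials: &[Vec<(String, u64)>]) -> HashMap<String, u64> {
    let mut merged = HashMap::new();
    for part in partials {
        for (g, c) in part {
            *merged.entry(g.clone()).or_insert(0) += c;
        }
    }
    merged
}

/// Buggy shape: every partition truncates to k before the merge.
fn truncate_then_merge(partitions: &[Vec<&str>], k: usize) -> Vec<(String, u64)> {
    let truncated: Vec<Vec<(String, u64)>> = partitions
        .iter()
        .map(|p| top_k(&partial_counts(p), k))
        .collect();
    top_k(&merge(&truncated), k)
}

/// Reference shape: merge complete per-partition state, then truncate once.
fn merge_then_truncate(partitions: &[Vec<&str>], k: usize) -> Vec<(String, u64)> {
    let full: Vec<Vec<(String, u64)>> = partitions
        .iter()
        .map(|p| partial_counts(p).into_iter().collect())
        .collect();
    top_k(&merge(&full), k)
}
```

With partitions `[a,a,a,b,b]` and `[b,b,c,c,c]` and `k = 1`, the buggy shape keeps only `a=3` and `c=3` and never sees `b`, while the reference shape returns `b=4`.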

Fix

Propagate a hasTopK boolean from OpenSearchTopKRewriter through the planner pipeline:

PlannerContext → FragmentConversionDriver → PartialAggregateInstructionNode → ShardScanExecutionContext → NativeBridge → Rust create_session_context → SessionContextHandle

In prepare_partial_plan, when has_topk=true, force_aggregate_mode replaces FinalPartitioned with PartialReduce instead of stripping it. PartialReduce keeps agg.input() intact — RepartitionExec(Hash) → Partial(×N) — so CSS partitions are merged by group key before the TopK SortExec truncates. The hash repartition ensures each group key lands in exactly one partition, so PartialReduce produces complete per-key partial state.

CSS scan parallelism is fully preserved: the scan + partial agg still run ×N in parallel. Only a merge step is added before TopK. Non-TopK queries (has_topk=false) are unaffected.
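
The hash-repartition argument can be sketched with a toy model, assuming per-partition partial state is a map from group key to count. `partition_for` and `hash_repartition_and_reduce` are hypothetical names for illustration only and are not DataFusion's implementation.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

/// Pick the output partition for a group key; the same key always maps to the same partition.
fn partition_for(key: &str, n: usize) -> usize {
    let mut h = DefaultHasher::new();
    key.hash(&mut h);
    (h.finish() % n as u64) as usize
}

/// Route every (key, partial count) pair from all input partitions to its hash
/// partition and sum counts per key there. This imitates
/// RepartitionExec(Hash) followed by a PartialReduce-style merge.
fn hash_repartition_and_reduce(
    partials: &[HashMap<String, u64>],
    n: usize,
) -> Vec<HashMap<String, u64>> {
    let mut out = vec![HashMap::new(); n];
    for part in partials {
        for (key, count) in part {
            *out[partition_for(key, n)].entry(key.clone()).or_insert(0) += *count;
        }
    }
    out
}
```

Because a key is owned by exactly one output partition, a TopK truncation applied after this step sees the complete per-key count for every key in that partition.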

Changes

  • Java: PlannerContext.setTopKApplied set by PlannerImpl after OpenSearchTopKRewriter fires; threaded through FragmentConversionDriver → PartialAggregateInstructionNode.hasTopK → ShardScanExecutionContext.hasTopK → NativeBridge.createSessionContext
  • Rust agg_mode.rs: force_aggregate_mode accepts has_topk: bool; when true and hitting Final/FinalPartitioned, returns PartialReduce(input=agg.input()) instead of stripping. Removes Marc's partition_count() > 1 check — has_topk from the planner is the authoritative signal
  • Rust session_context.rs: SessionContextHandle.has_topk field; prepare_partial_plan passes it to apply_aggregate_mode
  • Plan shape goldens: 29 queries (q8–q43, prod2s) updated to show AggregateExec(PartialReduce) above RepartitionExec(Hash) → AggregateExec(Partial)
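
As a rough illustration of the agg_mode.rs change, here is a toy model of the mode rewrite. `Mode`, `Plan`, `strip_to_partial`, and `to_partial` are hypothetical stand-ins, not DataFusion's `AggregateMode` or `AggregateExec`, and the sketch ignores the partition-count guard and recursion into deeper nodes.

```rust
/// Toy stand-in for aggregate modes (not the real DataFusion type).
#[derive(Debug, Clone, Copy, PartialEq)]
enum Mode {
    Partial,
    Final,
    FinalPartitioned,
    PartialReduce,
}

/// Toy plan tree: a scan leaf, a repartition wrapper, or an aggregate over an input.
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan,
    Repartition(Box<Plan>),
    Aggregate { mode: Mode, input: Box<Plan> },
}

/// Skip repartition wrappers to reach the Partial subtree (the "strip" behaviour).
fn strip_to_partial(plan: Plan) -> Plan {
    match plan {
        Plan::Repartition(inner) => strip_to_partial(*inner),
        other => other,
    }
}

/// With has_topk, swap a top-level Final/FinalPartitioned for PartialReduce and
/// keep its input intact; without it, strip down to the Partial subtree.
fn to_partial(plan: Plan, has_topk: bool) -> Plan {
    match plan {
        Plan::Aggregate { mode: Mode::Final | Mode::FinalPartitioned, input } => {
            if has_topk {
                Plan::Aggregate { mode: Mode::PartialReduce, input }
            } else {
                strip_to_partial(*input)
            }
        }
        other => other,
    }
}
```

In this toy, a `FinalPartitioned` over `Repartition(Partial(Scan))` becomes `PartialReduce` over the same input when `has_topk` is true, and plain `Partial(Scan)` otherwise.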

Testing

  • integTest: full QA suite passes
  • integTestPlanShape: all 29 affected golden files verified
  • Unit tests pass

Check List

  • Functionality includes testing.
  • API changes companion pull request created, if applicable.
  • Public documentation issue/PR created, if applicable.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@sandeshkr419 sandeshkr419 requested a review from a team as a code owner June 30, 2026 20:42
github-actions Bot (Contributor) commented Jun 30, 2026

PR Reviewer Guide 🔍

(Review updated until commit 4652831)

Here are some key observations to aid the review process:

🧪 PR contains tests
🔒 No security concerns identified
📝 TODO sections

🔀 Multiple PR themes

Sub-PR theme: Tighten OpenSearchTopKRewriter detection (nested stats + window fn bail)

Relevant files:

  • sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/planner/rules/OpenSearchTopKRewriter.java
  • sandbox/plugins/analytics-engine/src/test/java/org/opensearch/analytics/planner/TopKRewriterPlanShapeTests.java

Sub-PR theme: Rust — apply PartialReduce when TopK is active for CSS correctness

Relevant files:

  • sandbox/plugins/analytics-backend-datafusion/rust/src/agg_mode.rs
  • sandbox/plugins/analytics-backend-datafusion/rust/src/session_context.rs
  • sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_executor.rs
  • sandbox/plugins/analytics-backend-datafusion/rust/src/local_executor.rs

Sub-PR theme: Add TopK CSS correctness regression IT suite

Relevant files:

  • sandbox/qa/analytics-engine-rest/src/test/java/org/opensearch/analytics/qa/TopKCssCorrectnessIT.java

⚡ Recommended focus areas for review

Test Isolation

provisioned is a static volatile flag shared across test instances, and ensureProvisioned sets a persistent cluster setting (analytics.shard_bucket_oversampling_factor=0.1) that is never reset. This persistent setting can leak into other tests run in the same cluster after this suite, causing unrelated test flakiness. Consider using transient settings or resetting the value in an @AfterClass hook.

private void ensureProvisioned() throws Exception {
    if (!provisioned) {
        // MULTI_SEGMENT (2 segments/shard) + low oversampling makes the CSS truncation
        // bug reproducible on the local test cluster — each CSS partition independently
        // truncates to a very small fetch limit, producing wrong results without the fix.
        DatasetProvisioner.provision(client(), ClickBenchTestHelper.DATASET, 2, DatasetProvisioner.SegmentLayout.MULTI_SEGMENT);
        Request req = new Request("PUT", "/_cluster/settings");
        req.setJsonEntity(
            "{\"persistent\":{\"analytics.shard_bucket_oversampling_factor\": 0.1}}"
        );
        client().performRequest(req);
        provisioned = true;
    }
}
Test Isolation

assertCssMatchesNoCss resets CSS to "none" on success but not on failure — if the assertion throws, subsequent tests inherit CSS mode "all" with 4 slices. Wrap the assert in try/finally to guarantee cleanup.

private void assertCssMatchesNoCss(String ppl) throws Exception {
    setCss("none", 0);
    List<List<Object>> reference = rowsOf(executePPL(ppl));

    setCss("all", 4);
    List<List<Object>> withCss = rowsOf(executePPL(ppl));

    assertEquals(
        "CSS result differs from no-CSS reference for query: " + ppl,
        reference,
        withCss
    );

    setCss("none", 0);
}
Silent Regression Risk

substrait_has_fetch_rel only recurses through Fetch/Sort/Project/Filter/Aggregate rels and returns false (with a debug log) for any other rel type. If OpenSearchTopKRewriter's shard fragment ever contains an intermediate rel not in this list (e.g., Join, Set, or a new rel type added upstream), TopK correctness will silently regress to the pre-fix wrong-count behavior with no visible error. Consider failing closed (defaulting to true, i.e., applying PartialReduce) or asserting known-safe rel types.

fn substrait_has_fetch_rel(plan_bytes: &[u8]) -> bool {
    use prost::Message;
    use substrait::proto::rel::RelType;

    fn rel_has_fetch(rel: &substrait::proto::Rel) -> bool {
        match rel.rel_type.as_ref() {
            Some(RelType::Fetch(f)) => f.count_mode.is_some(),
            Some(RelType::Sort(s)) => s.input.as_ref().map_or(false, |r| rel_has_fetch(r)),
            Some(RelType::Project(p)) => p.input.as_ref().map_or(false, |r| rel_has_fetch(r)),
            Some(RelType::Filter(f)) => f.input.as_ref().map_or(false, |r| rel_has_fetch(r)),
            Some(RelType::Aggregate(a)) => a.input.as_ref().map_or(false, |r| rel_has_fetch(r)),
            // TODO: enumerate remaining rel types explicitly and panic on unknown ones.
            Some(other) => {
                native_bridge_common::log_debug!(
                    "substrait_has_fetch_rel: {:?} — no TopK fetch",
                    std::mem::discriminant(other)
                );
                false
            }
            None => false,
        }
    }

    let Ok(plan) = substrait::proto::Plan::decode(plan_bytes) else { return false; };
    plan.relations.iter().any(|pr| {
        match pr.rel_type.as_ref() {
            Some(substrait::proto::plan_rel::RelType::Root(rr)) => {
                rr.input.as_ref().map_or(false, |r| rel_has_fetch(r))
            }
            Some(substrait::proto::plan_rel::RelType::Rel(r)) => rel_has_fetch(r),
            None => false,
        }
    })
}
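
A fail-closed variant of the walker could look roughly like the sketch below. It uses a toy `Rel` enum rather than `substrait::proto::Rel`, and `has_fetch_fail_closed` is a hypothetical name. Descending through a Fetch that carries no count follows an earlier bot suggestion and is an assumption, not what the PR code does.

```rust
/// Toy stand-in for a Substrait relation tree (not the real proto type).
enum Rel {
    Read,
    Fetch { has_count: bool, input: Box<Rel> },
    Sort(Box<Rel>),
    Project(Box<Rel>),
    Filter(Box<Rel>),
    Aggregate(Box<Rel>),
    /// Any rel type the walker does not know how to descend into.
    Unknown,
}

/// Returns true when a fetch limit is found, and also when an unrecognised rel
/// type is hit, so the caller applies PartialReduce instead of risking wrong counts.
fn has_fetch_fail_closed(rel: &Rel) -> bool {
    match rel {
        Rel::Fetch { has_count, input } => *has_count || has_fetch_fail_closed(input),
        Rel::Sort(i) | Rel::Project(i) | Rel::Filter(i) | Rel::Aggregate(i) => {
            has_fetch_fail_closed(i)
        }
        Rel::Read => false,   // known leaf: nothing below can hold a fetch
        Rel::Unknown => true, // fail closed: assume TopK may be present
    }
}
```

The trade-off is that a non-TopK plan containing an unrecognised rel would pay for the extra merge step, which costs some work but does not change results.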
Possible Issue

When has_topk && input_partitions > 1, the code replaces Final/FinalPartitioned with a PartialReduce built directly over agg.input() without recursing into that subtree. If there are nested Partial-mode aggregates deeper in the tree (chained stats, though the Java-side rewriter is documented to bail on nested aggregates), the deeper Partial subtree is left intact. Verify that this can never happen for TopK plans, otherwise the untouched inner Partial could still be wrong.

if has_topk && agg.input().output_partitioning().partition_count() > 1 {
    return Ok(Arc::new(AggregateExec::try_new(
        AggregateMode::PartialReduce,
        agg.group_expr().clone(),
        agg.aggr_expr().to_vec(),
        agg.filter_expr().to_vec(),
        Arc::clone(agg.input()),
        agg.input_schema(),
    )?));
}

github-actions Bot (Contributor) commented Jun 30, 2026

PR Code Suggestions ✨

Latest suggestions up to 4652831

Explore these optional code suggestions:

Category | Suggestion | Impact
Possible issue
Reject non-identity secondary projects

Passing through subsequent Projects without inspecting them lets any
non-plain-column expression (e.g., a computed CASE, CAST, or literal) sit between
the captured Project and the FINAL aggregate. Since only the first Project is used
for sort-key remapping, sort fields that reference outputs of the second Project
will be validated against the wrong schema, potentially producing incorrect TopK
results. Restrict pass-through to Projects containing only RexInputRef entries,
mirroring the safety check used elsewhere.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/planner/rules/OpenSearchTopKRewriter.java [238-245]

 if (node instanceof OpenSearchProject proj) {
     if (proj.getProjects().stream().anyMatch(RexOver::containsOver)) return null; // window fn
-    // Capture the first Project for sort-key remapping; pass through subsequent Projects.
-    ...
+    if (seenProject != null
+        && proj.getProjects().stream().anyMatch(p -> !(p instanceof RexInputRef))) {
+        return null;
+    }
     return findFinalAgg(proj.getInput(), seenProject == null ? proj : seenProject);
 }
Suggestion importance[1-10]: 7

Why: The comment claims subsequent plain-column projects are safe, but the code passes through any project unconditionally (except window functions). A computed expression in the second project could cause incorrect sort-key remapping; adding a RexInputRef-only check tightens correctness.

Medium
General
Avoid silent false-negatives on unknown rel types

The recursive walk only handles Fetch/Sort/Project/Filter/Aggregate and returns
false for every other rel type (Join, Set, Extension, etc.). If the shard fragment
ever gains a wrapping rel above the FetchRel (e.g., a Root wrapper's child chain
that inserts something new), TopK detection silently fails and PartialReduce isn't
applied — producing the exact CSS correctness bug this PR aims to fix. Consider
walking all children generically (e.g., via a proto visitor or by recursing into
every input field) so unknown wrappers don't cause false negatives.

sandbox/plugins/analytics-backend-datafusion/rust/src/session_context.rs [504-521]

 fn rel_has_fetch(rel: &substrait::proto::Rel) -> bool {
         match rel.rel_type.as_ref() {
             Some(RelType::Fetch(f)) => f.count_mode.is_some(),
             Some(RelType::Sort(s)) => s.input.as_ref().map_or(false, |r| rel_has_fetch(r)),
             Some(RelType::Project(p)) => p.input.as_ref().map_or(false, |r| rel_has_fetch(r)),
             Some(RelType::Filter(f)) => f.input.as_ref().map_or(false, |r| rel_has_fetch(r)),
             Some(RelType::Aggregate(a)) => a.input.as_ref().map_or(false, |r| rel_has_fetch(r)),
+            // TODO: walk children generically for unhandled rel types to avoid
+            // silently returning false when new wrappers appear in shard fragments.
Suggestion importance[1-10]: 6

Why: Valid concern: the recursive walk enumerates only a handful of rel types and silently returns false for others, which could cause TopK detection to fail if shard fragments gain new wrappers. The suggestion aligns with the existing TODO in the code.

Low
Restrict PartialReduce substitution to FinalPartitioned

When constructing the PartialReduce node, agg.input() is used directly, but agg here
may be FinalPartitioned whose input is a RepartitionExec(Hash) over Partial. That
works, but for a plain Final (non-partitioned) the input can be
CoalescePartitions(Partial), and wrapping that with PartialReduce may yield a
single-partition input that skips the hash-repartition merge that CSS correctness
relies on. Consider explicitly asserting/handling the Final (non-partitioned) case,
or restricting PartialReduce substitution to FinalPartitioned only, so the
assumption "input is Hash-repartitioned Partial(×N)" always holds.

sandbox/plugins/analytics-backend-datafusion/rust/src/agg_mode.rs [91-100]

-if has_topk && agg.input().output_partitioning().partition_count() > 1 {
+if has_topk
+    && agg.mode() == &AggregateMode::FinalPartitioned
+    && agg.input().output_partitioning().partition_count() > 1
+{
     return Ok(Arc::new(AggregateExec::try_new(
         AggregateMode::PartialReduce,
         agg.group_expr().clone(),
         agg.aggr_expr().to_vec(),
         agg.filter_expr().to_vec(),
         Arc::clone(agg.input()),
         agg.input_schema(),
     )?));
 }
Suggestion importance[1-10]: 5

Why: The suggestion raises a plausible concern about Final vs FinalPartitioned inputs, though the existing partition_count>1 guard already provides some protection. Restricting to FinalPartitioned would make the assumption explicit but may be overly restrictive.

Low
Escape PPL body when building JSON

The PPL string is interpolated directly into a JSON body with only quote
concatenation — any query containing a double quote, backslash, or control character
will produce malformed JSON and fail the request with a confusing parse error rather
than executing the intended query. Use a proper JSON escape (or a JSON library) so
future test cases with quoted literals do not silently break.

sandbox/qa/analytics-engine-rest/src/test/java/org/opensearch/analytics/qa/TopKCssCorrectnessIT.java [287-292]

 private Map<String, Object> executePPL(String ppl) throws Exception {
     Request request = new Request("POST", "/_analytics/ppl");
-    request.setJsonEntity("{\"query\": \"" + ppl + "\"}");
+    String escaped = ppl.replace("\\", "\\\\").replace("\"", "\\\"");
+    request.setJsonEntity("{\"query\": \"" + escaped + "\"}");
     Response response = client().performRequest(request);
     return entityAsMap(response);
 }
Suggestion importance[1-10]: 3

Why: Minor test-code hygiene improvement. Current tests don't contain special characters, so the impact is low, but it prevents future breakage.

Low

Previous suggestions

Suggestions up to commit 408840c
Category | Suggestion | Impact
General
Restore cluster state via try/finally

If the assertion fails or an exception is thrown between enabling CSS and the final
setCss("none", 0), the cluster is left with CSS enabled and 4 slices, polluting
state for later tests in the same JVM. Wrap the CSS-on block in try/finally so
cleanup runs even on assertion failure.

sandbox/qa/analytics-engine-rest/src/test/java/org/opensearch/analytics/qa/TopKCssCorrectnessIT.java [251-265]

 private void assertCssMatchesNoCss(String ppl) throws Exception {
     setCss("none", 0);
     List<List<Object>> reference = rowsOf(executePPL(ppl));
 
     setCss("all", 4);
-    List<List<Object>> withCss = rowsOf(executePPL(ppl));
-
-    assertEquals(
-        "CSS result differs from no-CSS reference for query: " + ppl,
-        reference,
-        withCss
-    );
-
-    setCss("none", 0);
+    try {
+        List<List<Object>> withCss = rowsOf(executePPL(ppl));
+        assertEquals(
+            "CSS result differs from no-CSS reference for query: " + ppl,
+            reference,
+            withCss
+        );
+    } finally {
+        setCss("none", 0);
+    }
 }
Suggestion importance[1-10]: 7

Why: Valid concern: if the assertion fails, CSS remains enabled and leaks into subsequent tests running in the same JVM, potentially masking failures or confusing debugging. try/finally is a clear correctness improvement for test isolation.

Medium
Recurse through Fetch input to avoid missing nested fetches

The recursive rel_has_fetch only descends through
Sort/Project/Filter/Aggregate/Fetch nodes, so any other node type (Join, Set,
Exchange, Extension, etc.) that wraps a FetchRel will cause TopK detection to
silently return false and lose the CSS correctness fix. Consider walking all
children generically or at minimum recursing into the Fetch node's input as well
when count_mode is absent, to avoid missing valid TopK fragments in unanticipated
plan shapes.

sandbox/plugins/analytics-backend-datafusion/rust/src/session_context.rs [492-501]

 fn rel_has_fetch(rel: &substrait::proto::Rel) -> bool {
     match rel.rel_type.as_ref() {
-        Some(RelType::Fetch(f)) => f.count_mode.is_some(),
+        Some(RelType::Fetch(f)) => {
+            f.count_mode.is_some()
+                || f.input.as_ref().map_or(false, |r| rel_has_fetch(r))
+        }
         Some(RelType::Sort(s)) => s.input.as_ref().map_or(false, |r| rel_has_fetch(r)),
         Some(RelType::Project(p)) => p.input.as_ref().map_or(false, |r| rel_has_fetch(r)),
         Some(RelType::Filter(f)) => f.input.as_ref().map_or(false, |r| rel_has_fetch(r)),
         Some(RelType::Aggregate(a)) => a.input.as_ref().map_or(false, |r| rel_has_fetch(r)),
         _ => false,
     }
 }
Suggestion importance[1-10]: 3

Why: The current code correctly detects a FetchRel with count_mode — the traversal covers the shapes emitted by the DataFusion Substrait producer for TopK. Extending recursion to more node types could be defensive but isn't clearly required for the fragments produced by the rewriter.

Low
Make one-shot provisioning thread-safe

The provisioned flag is checked and set without synchronization, so concurrent test
invocations could both pass the check and perform provisioning twice. Additionally,
if the first attempt fails after partial work, provisioned remains false but the
cluster settings may already be applied; subsequent runs will retry cleanly, but the
double-provision race is still a real hazard. Guard the block with synchronization
to make the one-shot semantics safe.

sandbox/qa/analytics-engine-rest/src/test/java/org/opensearch/analytics/qa/TopKCssCorrectnessIT.java [34-44]

+private static final Object PROVISION_LOCK = new Object();
 private static volatile boolean provisioned = false;
 private static final String INDEX = "parquet_hits";
 
 private void ensureProvisioned() throws Exception {
-    if (!provisioned) {
+    if (provisioned) return;
+    synchronized (PROVISION_LOCK) {
+        if (provisioned) return;
         DatasetProvisioner.provision(client(), ClickBenchTestHelper.DATASET, 2);
         Request req = new Request("PUT", "/_cluster/settings");
         req.setJsonEntity(
             "{\"persistent\":{\"analytics.shard_bucket_oversampling_factor\": 2.0}}"
         );
         client().performRequest(req);
         provisioned = true;
     }
 }
Suggestion importance[1-10]: 3

Why: JUnit test methods typically run sequentially per instance, so the race is unlikely in practice. The change is a minor robustness improvement for a test helper.

Low
Possible issue
Escape PPL string before embedding in JSON

The PPL string is concatenated directly into a JSON body without escaping, so any
double quote or backslash in a query (present in several test cases via regex-like
patterns) would produce invalid JSON. Use a proper JSON serializer or at minimum
escape backslashes and quotes before embedding.

sandbox/qa/analytics-engine-rest/src/test/java/org/opensearch/analytics/qa/TopKCssCorrectnessIT.java [285-290]

 private Map<String, Object> executePPL(String ppl) throws Exception {
     Request request = new Request("POST", "/_analytics/ppl");
-    request.setJsonEntity("{\"query\": \"" + ppl + "\"}");
+    String escaped = ppl.replace("\\", "\\\\").replace("\"", "\\\"");
+    request.setJsonEntity("{\"query\": \"" + escaped + "\"}");
     Response response = client().performRequest(request);
     return entityAsMap(response);
 }
Suggestion importance[1-10]: 6

Why: Inspecting the test queries, none contain double quotes or backslashes, so the current JSON is valid. However, this is a real latent hazard if future test cases add such characters, and the fix is trivial.

Low
Suggestions up to commit ac2aaca
Category | Suggestion | Impact
General
Restore CSS setting in finally block

If the assertion fails or executePPL throws, the final setCss("none", 0) is skipped
and CSS remains enabled globally, polluting all subsequent tests in the suite and
causing cascading false failures. Wrap the body in try/finally so the cluster
setting is always restored.

sandbox/qa/analytics-engine-rest/src/test/java/org/opensearch/analytics/qa/TopKCssCorrectnessIT.java [251-265]

 private void assertCssMatchesNoCss(String ppl) throws Exception {
     setCss("none", 0);
-    List<List<Object>> reference = rowsOf(executePPL(ppl));
+    try {
+        List<List<Object>> reference = rowsOf(executePPL(ppl));
 
-    setCss("all", 4);
-    List<List<Object>> withCss = rowsOf(executePPL(ppl));
+        setCss("all", 4);
+        List<List<Object>> withCss = rowsOf(executePPL(ppl));
 
-    assertEquals(
-        "CSS result differs from no-CSS reference for query: " + ppl,
-        reference,
-        withCss
-    );
-
-    setCss("none", 0);
+        assertEquals(
+            "CSS result differs from no-CSS reference for query: " + ppl,
+            reference,
+            withCss
+        );
+    } finally {
+        setCss("none", 0);
+    }
 }
Suggestion importance[1-10]: 7

Why: Correct observation — a test failure or exception leaves CSS enabled globally, polluting subsequent tests. Wrapping in try/finally is a valid and important robustness fix for test isolation.

Medium
Broaden Substrait TopK detection walker

The rel_has_fetch walker only descends into a hardcoded set of RelType variants
(Fetch/Sort/Project/Filter/Aggregate). If the shard fragment ever contains other
wrappers (e.g., Join, Set, Extension, ExtensionSingle) above the FetchRel, TopK
detection will silently return false and CSS correctness will regress. Consider a
generic descent over all single-input rel types or explicitly handle every wrapper
the planner may emit.

sandbox/plugins/analytics-backend-datafusion/rust/src/session_context.rs [492-501]

 fn rel_has_fetch(rel: &substrait::proto::Rel) -> bool {
     match rel.rel_type.as_ref() {
         Some(RelType::Fetch(f)) => f.count_mode.is_some(),
         Some(RelType::Sort(s)) => s.input.as_ref().map_or(false, |r| rel_has_fetch(r)),
         Some(RelType::Project(p)) => p.input.as_ref().map_or(false, |r| rel_has_fetch(r)),
         Some(RelType::Filter(f)) => f.input.as_ref().map_or(false, |r| rel_has_fetch(r)),
         Some(RelType::Aggregate(a)) => a.input.as_ref().map_or(false, |r| rel_has_fetch(r)),
+        Some(RelType::ExtensionSingle(e)) => e.input.as_ref().map_or(false, |r| rel_has_fetch(r)),
         _ => false,
     }
 }
Suggestion importance[1-10]: 5

Why: Broadening the walker to handle more RelType wrappers could improve robustness of TopK detection, though the current planner's output may not emit those wrappers. Moderate defensive improvement.

Low
Possible issue
Recurse before wrapping with PartialReduce

When has_topk is true and PartialReduce is inserted here, the recursion stops — but
the input subtree may still contain nested Final/FinalPartitioned aggregates (e.g.,
chained stats) that will now be left in place instead of stripped. Recurse into
agg.input() first (or otherwise ensure any deeper Final nodes are stripped) before
wrapping, to preserve the invariant that no Final remains under a Partial-mode plan.

sandbox/plugins/analytics-backend-datafusion/rust/src/agg_mode.rs [91-100]

 if has_topk && agg.input().output_partitioning().partition_count() > 1 {
+    let new_input = force_aggregate_mode(Arc::clone(agg.input()), target, has_topk)?;
     return Ok(Arc::new(AggregateExec::try_new(
         AggregateMode::PartialReduce,
         agg.group_expr().clone(),
         agg.aggr_expr().to_vec(),
         agg.filter_expr().to_vec(),
-        Arc::clone(agg.input()),
+        new_input,
         agg.input_schema(),
     )?));
 }
Suggestion importance[1-10]: 6

Why: Valid concern that nested Final nodes below the PartialReduce insertion point would not be stripped, potentially breaking the invariant when chained aggregates occur. However, the planner's TopK rewriter now bails on chained stats, mitigating this in practice.

Low
Bail on multiple projects for safety

Passing through subsequent projects while retaining the first seenProject for
sort-key remapping is unsafe: the second project can reorder/drop columns so that a
RexInputRef in seenProject no longer corresponds to the same output field the Sort
references. The updated test comment claims safe-bail, but the code still recurses
instead of returning null. Either bail on any second Project, or re-validate that
the second Project is an identity mapping before continuing.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/planner/rules/OpenSearchTopKRewriter.java [236-242]

 if (node instanceof OpenSearchProject proj) {
     if (proj.getProjects().stream().anyMatch(RexOver::containsOver)) return null; // window fn
-    // Capture the first project for sort-key remapping; pass through subsequent projects.
-    // The rewrite() method validates that the sort key maps through seenProject as a plain
-    // column reference — computed expressions (AVG division, etc.) are rejected there.
-    return findFinalAgg(proj.getInput(), seenProject == null ? proj : seenProject);
+    if (seenProject != null) return null; // multiple projects: bail for safety
+    return findFinalAgg(proj.getInput(), proj);
 }
Suggestion importance[1-10]: 6

Why: The suggestion raises a legitimate correctness concern about multiple projects potentially remapping columns unsafely. However, the PR comments claim rewrite() validates the sort key at seenProject, so the safety may already be handled downstream; still a reasonable defensive improvement.

Low
Suggestions up to commit f7e0659
Category | Suggestion | Impact
Possible issue
Restrict PartialReduce substitution to hash-partitioned input

When the current node is FinalPartitioned (treated as Final here), its input is
already the RepartitionExec(Hash) → Partial subtree, but when the node is a plain
Final there may only be a CoalescePartitions → Partial beneath it — building a
PartialReduce directly over that input would not do the hash-based group merging the
comment describes and could still yield wrong TopK results. Consider verifying the
input shape (or restricting this branch to FinalPartitioned) before substituting
PartialReduce.

sandbox/plugins/analytics-backend-datafusion/rust/src/agg_mode.rs [91-100]

-if has_topk && agg.input().output_partitioning().partition_count() > 1 {
+if has_topk
+    && agg.mode() == &AggregateMode::FinalPartitioned
+    && agg.input().output_partitioning().partition_count() > 1
+{
     return Ok(Arc::new(AggregateExec::try_new(
         AggregateMode::PartialReduce,
         agg.group_expr().clone(),
         agg.aggr_expr().to_vec(),
         agg.filter_expr().to_vec(),
         Arc::clone(agg.input()),
         agg.input_schema(),
     )?));
 }
Suggestion importance[1-10]: 6

Why: The concern about input shape when the node is plain Final (with CoalescePartitions instead of RepartitionExec) is legitimate and could affect correctness, though the actual plans generated appear to use FinalPartitioned in the target scenarios.

Low
Version-guard new wire field for BWC

The wire format changed (writeTo now emits a boolean, and the StreamInput ctor reads
one), but there is no version guard. During a mixed-version cluster or rolling
upgrade, an older node writing no bytes will cause the newer reader to consume the
next field and corrupt the stream. Gate the read/write on the stream version so
older peers still deserialize correctly.

sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/PartialAggregateInstructionNode.java [38-40]

 public PartialAggregateInstructionNode(StreamInput in) throws IOException {
-    this.hasTopK = in.readBoolean();
+    this.hasTopK = in.getVersion().onOrAfter(HAS_TOPK_VERSION) ? in.readBoolean() : false;
 }
Suggestion importance[1-10]: 6

Why: Wire format compatibility during rolling upgrades is a real concern, though the sandbox nature of this code and the fact that it's a new field being added may make BWC less critical at this stage.

Low
General
Synchronize one-time provisioning to avoid races

The static provisioned flag is checked and set without synchronization, which is
racy when JUnit runs tests concurrently and can lead to duplicate provisioning or a
second thread proceeding before provisioning completes. Guard the block with
synchronization (or use a double-checked-locking pattern) so the setup runs exactly
once and later threads see a fully-completed state.

sandbox/qa/analytics-engine-rest/src/test/java/org/opensearch/analytics/qa/TopKCssCorrectnessIT.java [34-44]

+private static final Object PROVISION_LOCK = new Object();
 private static volatile boolean provisioned = false;
 private static final String INDEX = "parquet_hits";
 
 private void ensureProvisioned() throws Exception {
-    if (!provisioned) {
+    if (provisioned) return;
+    synchronized (PROVISION_LOCK) {
+        if (provisioned) return;
         DatasetProvisioner.provision(client(), ClickBenchTestHelper.DATASET, 2);
         Request req = new Request("PUT", "/_cluster/settings");
         req.setJsonEntity(
             "{\"persistent\":{\"analytics.shard_bucket_oversampling_factor\": 2.0}}"
         );
         client().performRequest(req);
         provisioned = true;
     }
 }
Suggestion importance[1-10]: 4

Why: Adding synchronization is a minor robustness improvement for concurrent test execution, but JUnit tests within a class typically don't run in parallel by default, so the practical impact is limited.

Low
Escape PPL string when building JSON body

The PPL string is inlined into the JSON body via naive concatenation, so any
embedded double-quote or backslash in a future test query will produce invalid JSON.
Escape the query string properly (e.g. via a JSON builder or manual escaping of \
and ") to keep the helper robust as more cases are added.

sandbox/qa/analytics-engine-rest/src/test/java/org/opensearch/analytics/qa/TopKCssCorrectnessIT.java [285-290]

 private Map<String, Object> executePPL(String ppl) throws Exception {
     Request request = new Request("POST", "/_analytics/ppl");
-    request.setJsonEntity("{\"query\": \"" + ppl + "\"}");
+    String escaped = ppl.replace("\\", "\\\\").replace("\"", "\\\"");
+    request.setJsonEntity("{\"query\": \"" + escaped + "\"}");
     Response response = client().performRequest(request);
     return entityAsMap(response);
 }
Suggestion importance[1-10]: 3

__

Why: The suggestion is valid defensive coding, but the current test queries don't contain characters that would break the JSON, so the impact is limited to future maintenance.

Low
Suggestions up to commit 99e2b2a
Category | Suggestion | Impact
Possible issue
Preserve skip-partial-agg disable for partial aggregates

The previous behavior set the skip-partial-aggregation threshold whenever
has_partial_aggregate was true; the new code only sets it when has_topk is true.
This silently regresses the non-TopK partial-aggregate case where DF may now skip
partial aggregation and produce incorrect coordinator-side combined results. Keep
the threshold override whenever has_partial_aggregate is true (and also when
has_topk is true).

sandbox/plugins/analytics-backend-datafusion/rust/src/session_context.rs [207-213]

 let mut config = SessionConfig::new();
 config.options_mut().execution.parquet.pushdown_filters = query_config.listing_table_pushdown_filters;
-// Disable DataFusion's adaptive skip-partial-aggregation when TopK is active:
-// if DF abandons partial agg midstream, the partial state sent to the coordinator
-// would be incomplete, causing TopK to see partial group counts and produce wrong results.
-if has_topk {
+// Disable DataFusion's adaptive skip-partial-aggregation when partial aggregate or TopK is
+// active: skipping partial agg would produce incomplete state for the coordinator merge.
+if has_partial_aggregate || has_topk {
     config.options_mut().execution.skip_partial_aggregation_probe_ratio_threshold = 1.0;
 }
Suggestion importance[1-10]: 9

__

Why: This is a strong catch — the change replaced if has_partial_aggregate with if has_topk, silently regressing the original behavior for partial-aggregate-without-TopK queries. This could cause incorrect aggregation results in the coordinator.

High
General
Restore CSS setting in finally block

If the assertion fails (or the second executePPL throws), the final setCss("none",
0) cleanup is skipped, leaving CSS enabled and contaminating subsequent tests in the
suite. Wrap the comparison in a try/finally so the cluster setting is always
restored.

sandbox/qa/analytics-engine-rest/src/test/java/org/opensearch/analytics/qa/TopKCssCorrectnessIT.java [249-263]

 private void assertCssMatchesNoCss(String ppl) throws Exception {
     setCss("none", 0);
     List<List<Object>> reference = rowsOf(executePPL(ppl));
 
-    setCss("all", 4);
-    List<List<Object>> withCss = rowsOf(executePPL(ppl));
-
-    assertEquals(
-        "CSS result differs from no-CSS reference for query: " + ppl,
-        reference,
-        withCss
-    );
-
-    setCss("none", 0);
+    try {
+        setCss("all", 4);
+        List<List<Object>> withCss = rowsOf(executePPL(ppl));
+        assertEquals(
+            "CSS result differs from no-CSS reference for query: " + ppl,
+            reference,
+            withCss
+        );
+    } finally {
+        setCss("none", 0);
+    }
 }
Suggestion importance[1-10]: 7

__

Why: Without try/finally, a failing assertion leaves CSS enabled at the cluster level, contaminating subsequent tests. This is a legitimate test-hygiene issue with meaningful impact on test reliability.

Medium
Fix misplaced Javadoc on aggregate helper

The Javadoc describing TopK detection was placed above
containsEngineNativeAggregate, but the method body remains the original
aggregate-mode check. The mismatch between the doc comment and the method it
documents is misleading and suggests a missing containsTopK helper was intended
here. Move the Javadoc to the correct method (or add the missing helper) so the doc
accurately describes the code.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/planner/dag/FragmentConversionDriver.java [273-279]

 /**
- * Returns true if the fragment contains a TopK sort — an {@link OpenSearchSort} with a
- * non-null {@code fetch} (i.e. a LIMIT clause). When a TopK is co-located with a partial
- * aggregate, CSS must not split the shard data across partitions because each partition would
- * independently truncate to the TopK limit before the coordinator merge, dropping groups.
+ * Returns true if {@code root} contains an engine-native aggregate node with the given mode.
  */
 private static boolean containsEngineNativeAggregate(RelNode root, AggregateMode mode) {
Suggestion importance[1-10]: 6

__

Why: The Javadoc clearly describes TopK detection but is placed above containsEngineNativeAggregate, which is misleading. Fixing or relocating the doc improves code clarity, though it has no functional impact.

Low
Escape PPL string when building JSON

Building the JSON body via string concatenation will produce invalid JSON if a PPL
query contains a double quote, backslash, or newline. Although current test queries
don't include such characters, this is fragile and will break silently when new
cases are added. Use a JSON builder/escaper (e.g. an XContent builder or a small
escape helper) when embedding ppl into the request entity.

sandbox/qa/analytics-engine-rest/src/test/java/org/opensearch/analytics/qa/TopKCssCorrectnessIT.java [283-288]

 private Map<String, Object> executePPL(String ppl) throws Exception {
     Request request = new Request("POST", "/_analytics/ppl");
-    request.setJsonEntity("{\"query\": \"" + ppl + "\"}");
+    String escaped = ppl.replace("\\", "\\\\").replace("\"", "\\\"")
+        .replace("\n", "\\n").replace("\r", "\\r").replace("\t", "\\t");
+    request.setJsonEntity("{\"query\": \"" + escaped + "\"}");
     Response response = client().performRequest(request);
     return entityAsMap(response);
 }
Suggestion importance[1-10]: 4

__

Why: The current queries don't contain special characters, so escaping is preventative rather than fixing a real bug, but adding it improves robustness for future test additions.
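The escaping rule the suggestion describes does not depend on Java, so here is a minimal sketch of it in Rust. The helper name `escape_json_string` is hypothetical and is not part of the test class; it only shows which characters need handling before a string is placed between JSON double quotes.

```rust
/// Hypothetical helper: escape a string so it can sit between JSON double quotes.
/// Handles quotes, backslashes, common whitespace escapes, and other control characters.
fn escape_json_string(raw: &str) -> String {
    let mut out = String::with_capacity(raw.len());
    for ch in raw.chars() {
        match ch {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            '\n' => out.push_str("\\n"),
            '\r' => out.push_str("\\r"),
            '\t' => out.push_str("\\t"),
            // Remaining control characters use the \uXXXX form.
            c if (c as u32) < 0x20 => out.push_str(&format!("\\u{:04x}", c as u32)),
            c => out.push(c),
        }
    }
    out
}
```

In the Java test itself, a JSON builder would likely be the safer choice, since it avoids maintaining this table by hand.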

Low
Suggestions up to commit 3439805
Category | Suggestion | Impact
General
Derive TopK flag per-fragment instead of globally

The topKApplied flag is propagated uniformly to every stage during conversion,
including the coordinator/root stage that does not contain the shard-level partial
aggregate. This may incorrectly set hasTopK=true on PartialAggregateInstructionNode
instances in any stage that has a partial aggregate but where TopK does not actually
co-locate with it. Consider checking per-fragment whether the resolved RelNode
actually contains both a partial aggregate AND an OpenSearchSort with non-null
fetch, rather than relying on a global planner flag.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/planner/dag/FragmentConversionDriver.java [88-89]

+public static void convertAll(QueryDAG dag, CapabilityRegistry registry, boolean topKApplied) {
+    convertStage(dag.rootStage(), registry, topKApplied);
 
-
Suggestion importance[1-10]: 6

__

Why: Valid concern that a global topKApplied flag may be over-broad and incorrectly propagated to non-shard stages. However, the suggestion's improved_code is identical to the existing_code, so it points out the issue without providing a concrete fix.

Low
Ensure CSS is reset on test failure

If the assertion fails or executePPL throws after CSS is enabled, CSS remains
enabled for subsequent tests because the reset is unreachable. Wrap the
comparison/execution in a try/finally so CSS is always restored to none, otherwise
cross-test pollution can cause flaky downstream tests.

sandbox/qa/analytics-engine-rest/src/test/java/org/opensearch/analytics/qa/TopKCssCorrectnessIT.java [249-263]

 private void assertCssMatchesNoCss(String ppl) throws Exception {
     setCss("none", 0);
     List<List<Object>> reference = rowsOf(executePPL(ppl));
 
     setCss("all", 4);
-    List<List<Object>> withCss = rowsOf(executePPL(ppl));
-
-    assertEquals(
-        "CSS result differs from no-CSS reference for query: " + ppl,
-        reference,
-        withCss
-    );
-
-    setCss("none", 0);
+    try {
+        List<List<Object>> withCss = rowsOf(executePPL(ppl));
+        assertEquals(
+            "CSS result differs from no-CSS reference for query: " + ppl,
+            reference,
+            withCss
+        );
+    } finally {
+        setCss("none", 0);
+    }
 }
Suggestion importance[1-10]: 6

__

Why: Valid test-hygiene improvement. Without try/finally, a failed assertion leaves CSS enabled and may pollute subsequent tests, causing flakiness.

Low
Restrict probe-threshold override to TopK case

Setting skip_partial_aggregation_probe_ratio_threshold = 1.0 unconditionally
whenever has_partial_aggregate is true changes behavior for all partial-aggregate
queries, not just TopK ones. This effectively disables partial-aggregation skip
optimization globally and may regress performance for high-cardinality non-TopK
aggregations. Consider gating this on has_topk (which is the case the PR targets) or
document why it must apply to all partial-aggregate fragments.

sandbox/plugins/analytics-backend-datafusion/rust/src/session_context.rs [213-215]

 config.options_mut().execution.target_partitions = effective_partitions;
 config.options_mut().execution.batch_size = effective_batch_size;
-if has_partial_aggregate {
+if has_partial_aggregate && has_topk {
     config.options_mut().execution.skip_partial_aggregation_probe_ratio_threshold = 1.0;
 }
Suggestion importance[1-10]: 5

__

Why: Reasonable observation that disabling the partial-aggregation skip optimization for all partial-aggregate queries could regress performance, but the PR author may have intentionally chosen this for correctness/safety reasons.

Low
Possible issue
Escape PPL string before JSON interpolation

The PPL string is concatenated directly into a JSON body without escaping. If any
test query contains a double quote, backslash, or control character (e.g., the
regex-style PPL in other tests), the request body becomes invalid JSON. Use a proper
JSON encoder or at minimum escape backslashes and double quotes before
interpolation.

sandbox/qa/analytics-engine-rest/src/test/java/org/opensearch/analytics/qa/TopKCssCorrectnessIT.java [283-288]

 private Map<String, Object> executePPL(String ppl) throws Exception {
     Request request = new Request("POST", "/_analytics/ppl");
-    request.setJsonEntity("{\"query\": \"" + ppl + "\"}");
+    String escaped = ppl.replace("\\", "\\\\").replace("\"", "\\\"");
+    request.setJsonEntity("{\"query\": \"" + escaped + "\"}");
     Response response = client().performRequest(request);
     return entityAsMap(response);
 }
Suggestion importance[1-10]: 4

__

Why: The current tests don't include characters needing escaping, but the suggestion is defensive and would prevent future test breakage if queries with quotes/backslashes are added.

Low

Comment thread sandbox/plugins/analytics-backend-datafusion/rust/src/session_context.rs Outdated
Comment thread sandbox/plugins/analytics-backend-datafusion/rust/src/agg_mode.rs Outdated
@github-actions

Contributor

❌ Gradle check result for 2f5d91f: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@github-actions

Contributor

Persistent review updated to latest commit f5a5d63

@github-actions

Contributor

Persistent review updated to latest commit 3439805

@github-actions

Contributor

Persistent review updated to latest commit 35482cb

@github-actions

Contributor

Persistent review updated to latest commit 99e2b2a

@sandeshkr419 sandeshkr419 force-pushed the fix/disable-skip-partial-agg branch from 99e2b2a to edc9997 July 1, 2026 00:42
@github-actions

github-actions Bot commented Jul 1, 2026

Contributor

Persistent review updated to latest commit edc9997

@github-actions

github-actions Bot commented Jul 1, 2026

Contributor

✅ Gradle check result for edc9997: SUCCESS

@codecov

codecov Bot commented Jul 1, 2026


Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 73.36%. Comparing base (1e31adf) to head (f7e0659).
⚠️ Report is 2 commits behind head on main.

Additional details and impacted files
@@             Coverage Diff              @@
##               main   #22360      +/-   ##
============================================
+ Coverage     73.34%   73.36%   +0.02%     
- Complexity    76017    76053      +36     
============================================
  Files          6076     6076              
  Lines        345507   345508       +1     
  Branches      49732    49732              
============================================
+ Hits         253409   253487      +78     
+ Misses        71869    71767     -102     
- Partials      20229    20254      +25     


@sandeshkr419 sandeshkr419 force-pushed the fix/disable-skip-partial-agg branch from edc9997 to f7e0659 July 1, 2026 03:24
@github-actions

github-actions Bot commented Jul 1, 2026

Contributor

Persistent review updated to latest commit f7e0659

@github-actions

github-actions Bot commented Jul 1, 2026

Contributor

✅ Gradle check result for f7e0659: SUCCESS

@github-actions

github-actions Bot commented Jul 1, 2026

Contributor

Persistent review updated to latest commit ac2aaca

@github-actions

github-actions Bot commented Jul 1, 2026

Contributor

Persistent review updated to latest commit 408840c

@github-actions

github-actions Bot commented Jul 1, 2026

Contributor

❌ Gradle check result for 408840c: null

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@sandeshkr419 sandeshkr419 changed the title from "[analytics-engine] Improve TopK correctness with CSS: replace Final with PartialReduce using has_topk flag" to "[analytics-engine] Improve TopK correctness with CSS: replace Final with PartialReduce using topk check" Jul 1, 2026
Comment thread sandbox/plugins/analytics-backend-datafusion/rust/src/session_context.rs Outdated
mch2 and others added 5 commits July 1, 2026 22:12
When target_partitions > 1, the shard plan splits the scan into N file
groups and runs a Partial aggregate per partition. The mode-stripping
logic (force_aggregate_mode) previously discarded the FinalPartitioned +
Hash repartition that merged these partitions, leaving TopK to operate
on un-merged per-partition partial counts — incorrectly pruning groups
whose global count is high but per-partition count is low.

Fix: when the Partial aggregate below the Final has multiple output
partitions, replace FinalPartitioned with PartialReduce instead of
stripping it. PartialReduce merges partial accumulator states (calls
merge()) but outputs partial state — preserving the schema contract
with the coordinator's FinalPartitioned while ensuring TopK sees
complete per-shard totals.

Also sets skip_partial_aggregation_probe_ratio_threshold=1.0 for
partial-aggregate shard sessions to prevent DataFusion from abandoning
partial aggregation mid-stream (which would also produce fragmented
results).
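To make the failure mode concrete, the following is a small sketch that models partitions as plain hash maps of group counts. It does not use DataFusion. All function names and the sample counts are made up for illustration. It compares truncating each partition to the top k before merging (the old shape) with merging by group key first and truncating once (the shape that PartialReduce restores).

```rust
use std::collections::HashMap;

/// Keep the `k` groups with the highest counts; ties are broken by key for determinism.
fn top_k(counts: &HashMap<String, u64>, k: usize) -> Vec<(String, u64)> {
    let mut rows: Vec<(String, u64)> = counts.iter().map(|(g, c)| (g.clone(), *c)).collect();
    rows.sort_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
    rows.truncate(k);
    rows
}

/// Sum per-group counts across several partial results.
fn merge(parts: &[HashMap<String, u64>]) -> HashMap<String, u64> {
    let mut merged = HashMap::new();
    for part in parts {
        for (g, c) in part {
            *merged.entry(g.clone()).or_insert(0) += *c;
        }
    }
    merged
}

/// Old shape: every partition truncates to its own top-k before the merge.
fn truncate_then_merge(parts: &[HashMap<String, u64>], k: usize) -> Vec<(String, u64)> {
    let truncated: Vec<HashMap<String, u64>> = parts
        .iter()
        .map(|p| top_k(p, k).into_iter().collect())
        .collect();
    top_k(&merge(&truncated), k)
}

/// Fixed shape: merge partitions by group key first, then truncate once.
fn merge_then_truncate(parts: &[HashMap<String, u64>], k: usize) -> Vec<(String, u64)> {
    top_k(&merge(parts), k)
}

/// Build one partition's partial counts from literal rows (illustrative data only).
fn partition(rows: &[(&str, u64)]) -> HashMap<String, u64> {
    rows.iter().map(|(g, c)| (g.to_string(), *c)).collect()
}
```

With two partitions holding `a=10, b=9, x=8` and `c=10, d=9, x=8` and k=2, the merge-first path ranks `x` first with a total of 16. The truncate-first path drops `x` in both partitions and never reports it.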

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…+ PartialReduce

Propagate a hasTopK flag from OpenSearchTopKRewriter through PlannerContext →
FragmentConversionDriver → PartialAggregateInstructionNode → ShardScanExecutionContext
→ NativeBridge → Rust create_session_context → SessionContextHandle.

In prepare_partial_plan, when has_topk=true, replace Final/FinalPartitioned with
PartialReduce instead of stripping it. This keeps the RepartitionExec(Hash) →
Partial(×N) subtree intact so CSS partitions are merged by group key before the
TopK SortExec truncates — preserving CSS scan parallelism while ensuring TopK
sees the complete per-shard dataset.

Without this fix, force_aggregate_mode stripped Final and returned Partial(×N)
directly. Each CSS partition independently truncated to the TopK fetch limit,
dropping groups that were split across partitions.

Update plan shape goldens for all 29 affected TopK queries (q8-q43, prod2s):
shard_physical_1seg and shard_physical_nseg now show AggregateExec(PartialReduce)
above RepartitionExec(Hash) → AggregateExec(Partial), with SortPreservingMergeExec
correctly present above the TopK SortExec.
…lt comparison

13 regression cases for TopK correctness when concurrent segment search is active,
covering all aggregate shapes identified by Aniketh Jain:

case-01: multi-key (SearchEngineID, ClientIP) with count/sum/avg + != filter
case-02: single-key count
case-03: distinct_count (HLL)
case-04: stddev_samp / var_samp / var_pop
case-05: scalar sums (no group-by, no TopK — immunity check)
case-06: offset + limit (head N from M)
case-07: min / max
case-08: avg + sum
case-09a/b/c: three aggregate ordering permutations
case-10: aggregates without aliases
case-11: many aggregates on the same column
case-12: percentile (p50, p95)
case-13: mixed split+non-split (count/sum + percentile)

Each test runs the query with CSS off to get a reference result, then with CSS
on (max_slice_count=4) and asserts exact equality. This catches any regression
where CSS partitions independently truncate before the coordinator merge.
…utor path

The indexed executor (QueryShardExec) calls apply_aggregate_mode with a
hardcoded false for has_topk, so PartialReduce was never applied when queries
used the indexed scan path — which is the production path for all CSS queries.
Only the listing-table path (session_context::prepare_partial_plan) received
the correct has_topk value.

Fix: extract handle.has_topk and pass it to apply_aggregate_mode in
execute_indexed_with_context_inner, matching the session_context.rs path.
- skip_partial_aggregation_probe_ratio_threshold: gate on has_topk instead of
  has_partial_aggregate (only TopK queries need it; non-TopK partial aggregates
  don't risk incomplete partial state), and remove the duplicate setting
- PartialReduce: add partition_count() > 1 guard so it is skipped when the input
  is already single-partition (no CSS) — PartialReduce over one partition is
  redundant and adds unnecessary overhead
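The two gates in this commit message can be summarised as pure decision functions. This is only a sketch of the stated rules: `ShardAggMode`, `shard_agg_mode`, and `skip_partial_agg_threshold` are hypothetical names, not the functions in `agg_mode.rs` or `session_context.rs`.

```rust
/// Hypothetical stand-in for the aggregate mode chosen for the shard fragment.
#[derive(Debug, PartialEq)]
enum ShardAggMode {
    /// Plain partial aggregate (Final stripped, as before the fix).
    Partial,
    /// Merge partial states across partitions, still emitting partial state.
    PartialReduce,
}

/// PartialReduce is only worthwhile when TopK is present AND the input has
/// more than one partition; with a single partition there is nothing to merge.
fn shard_agg_mode(has_topk: bool, input_partitions: usize) -> ShardAggMode {
    if has_topk && input_partitions > 1 {
        ShardAggMode::PartialReduce
    } else {
        ShardAggMode::Partial
    }
}

/// The probe-ratio threshold is overridden to 1.0 only for TopK queries;
/// otherwise the engine's default is passed through unchanged.
fn skip_partial_agg_threshold(has_topk: bool, default_threshold: f64) -> f64 {
    if has_topk {
        1.0
    } else {
        default_threshold
    }
}
```

Writing the rule this way makes the single-partition and non-TopK cases easy to see: both fall through to the pre-existing behavior.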
…tions

Queries with nested stats (stats A by X | stats B by Y | sort ...) were
producing catastrophically wrong results with TopK enabled. The inner PARTIAL
aggregate's input contains another aggregate (the inner FINAL), but the rewriter
only checked that ER's direct child is PARTIAL — not whether that PARTIAL's
subtree is clean.

When TopK fires on the inner PARTIAL, it truncates groups before the outer
aggregate sees all of them, causing the outer sum/count to receive only a tiny
fraction of the actual groups.

Fix: bail TopK if the matched PARTIAL's input subtree contains any aggregate
node. This covers all chained stats patterns. The coordinator handles these
queries correctly without per-shard TopK.
…n findFinalAgg

In findFinalAgg, any node between the Sort and the target FINAL that consumes
the grouped output makes TopK pushdown unsafe:

1. Non-FINAL OpenSearchAggregate (SINGLE/PARTIAL): chained stats pattern
   (stats A | stats B | sort). TopK on the inner agg truncates groups before
   the outer agg sees all of them, producing wrong totals.

2. OpenSearchProject with RexOver (window function): eventstats sits between
   the Sort and the grouped aggregate. Truncating rows before window evaluation
   produces wrong window partition results.

3. Second Project (when seenProject != null): safely bail rather than accept
   a second project that might carry window expressions or unsafe remappings.

Apply Aniketh Jain's suggested fix exactly: collapse all three cases into
findFinalAgg's early-reject block. Add unit test for chained stats case.
Update testDetection_multipleProjects to reflect new safe-bail behavior.
…breaking

With oversampling factor=2.0 and head 5, two groups with c=6 tie at the boundary
and oversampling doesn't guarantee which survives the shard truncation. CSS and
no-CSS may produce different orderings for tied groups. Switch to head 3 where
the top SearchEngineIDs have distinct counts and results are deterministic.
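One way to reason about this kind of flakiness is to check whether the k-th and (k+1)-th largest counts are equal. If they are, `head k` has no unique answer when sorting by count alone. The sketch below uses a hypothetical helper, `has_boundary_tie`, which is not part of the test suite.

```rust
/// Hypothetical helper: true if the k-th and (k+1)-th largest counts are equal,
/// meaning "top k by count" is ambiguous without a secondary sort key.
fn has_boundary_tie(counts: &[u64], k: usize) -> bool {
    let mut sorted = counts.to_vec();
    sorted.sort_unstable_by(|a, b| b.cmp(a)); // descending
    k > 0 && k < sorted.len() && sorted[k - 1] == sorted[k]
}
```

For made-up counts such as `[20, 15, 9, 7, 6, 6]`, the check reports a tie for k=5 and none for k=3. That matches the reasoning for moving from `head 5` to `head 3`.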
…sical plan in Rust

Previously, a boolean hasTopK flag was threaded from Java (PlannerContext →
FragmentConversionDriver → PartialAggregateInstructionNode → NativeBridge →
create_session_context) to Rust. Adding a field to PartialAggregateInstructionNode
breaks wire compatibility with nodes running older versions of the plugin.

Fix: detect TopK locally in Rust by walking the physical plan for a SortExec
with fetch.is_some() before calling force_aggregate_mode. This is 1:1 with the
old flag — the TopK Sort inserted by OpenSearchTopKRewriter is the only SortExec
with a fetch limit in the shard fragment.
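The plan walk described here can be sketched as a short recursive search. `PlanNode` below is a hypothetical stand-in for a physical plan node, not DataFusion's `ExecutionPlan` trait. Only the idea of looking for a sort that carries a fetch limit comes from the commit message.

```rust
/// Hypothetical stand-in for a physical plan node (not DataFusion's ExecutionPlan).
struct PlanNode {
    name: &'static str,
    /// Row limit attached to the node, if any; only meaningful for sorts in this sketch.
    fetch: Option<usize>,
    children: Vec<PlanNode>,
}

/// Returns true if any node in the tree is a sort that carries a fetch limit.
fn plan_has_topk_sort(node: &PlanNode) -> bool {
    (node.name == "SortExec" && node.fetch.is_some())
        || node.children.iter().any(plan_has_topk_sort)
}
```

The check is only as reliable as the assumption stated in the commit message, namely that the TopK sort is the only sort with a fetch limit in the shard fragment.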

Remove has_topk from SessionContextHandle, create_session_context signature, and
ffm.rs FFM descriptors. Java wire format is identical to main. The change is
self-contained in Rust.

Also update plan shape goldens for q29/q31/q32/q33 — q33 no longer gets TopK
because its AVG decomposition produces two Projects between Sort and FINAL, which
findFinalAgg correctly bails on (2nd project guard added in earlier commit).
…hysical plan re-scan

Replace plan_has_topk_sort (physical plan walk in prepare_partial_plan) with
substrait_has_fetch_rel (Substrait byte scan in create_session_context):

- substrait_has_fetch_rel: walks the Substrait rel tree looking for FetchRel
  with count_mode.is_some(). A Sort+Limit from OpenSearchTopKRewriter is encoded
  as FetchRel(count=N) wrapping SortRel in the Substrait plan bytes.

- Detection is gated on has_partial_aggregate (short-circuits for single-shard
  where has_partial_aggregate=false — no Substrait parsing, zero cost).

- Result stored on SessionContextHandle.has_topk and reused in prepare_partial_plan,
  removing the need to re-detect from the DataFusion physical plan.

- skip_partial_aggregation_probe_ratio_threshold=1.0 now correctly gated on
  has_topk instead of has_partial_aggregate — avoids performance regression on
  non-TopK multi-shard queries.

Single-shard safety: single-shard uses SINGLE aggregate mode, never emits
SETUP_PARTIAL_AGGREGATE, so has_partial_aggregate=false and has_topk=false.
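A rough sketch of the gated detection follows, using a simplified, hypothetical `Rel` enum in place of the real Substrait protobuf types. The actual `substrait_has_fetch_rel` operates on decoded plan bytes. Recursing past a fetch that has no count is an assumption of this sketch, not something the commit message states.

```rust
/// Hypothetical, simplified relation tree; the real Substrait `Rel` has many more variants.
enum Rel {
    Read,
    Sort(Box<Rel>),
    Aggregate(Box<Rel>),
    /// `count` is None when the fetch carries no row limit.
    Fetch { count: Option<u64>, input: Box<Rel> },
}

/// True if the tree contains a fetch with an explicit row count.
fn has_fetch_rel(rel: &Rel) -> bool {
    match rel {
        Rel::Fetch { count: Some(_), .. } => true,
        // Assumption: a fetch without a count is not TopK, but its input is still searched.
        Rel::Fetch { input, .. } | Rel::Sort(input) | Rel::Aggregate(input) => {
            has_fetch_rel(input)
        }
        Rel::Read => false,
    }
}

/// Detection is skipped entirely when the fragment has no partial aggregate,
/// which is the single-shard case described above.
fn detect_topk(has_partial_aggregate: bool, rel: &Rel) -> bool {
    has_partial_aggregate && has_fetch_rel(rel)
}
```

Because `&&` short-circuits, the tree is never visited when `has_partial_aggregate` is false. This mirrors the "zero cost" claim for single-shard queries.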
…_topk gate

- test_substrait_has_fetch_rel_with_fetch: verifies FetchRel(count=N) wrapping
  SortRel is detected as TopK (matches what DataFusion Substrait producer emits
  for Sort(fetch=N) from OpenSearchTopKRewriter)
- test_substrait_has_fetch_rel_without_fetch: SortRel without FetchRel → false
- test_substrait_has_fetch_rel_empty: empty bytes → false (no panic)
- test_skip_partial_agg_disabled_when_has_topk: skip_partial disabled when TopK active
- test_skip_partial_agg_default_when_no_topk: non-TopK retains DF default (0.8)
…edge case, comment fix

- agg_mode.rs: add test_apply_partial_with_topk_produces_partial_reduce — verifies
  that apply_aggregate_mode(Partial, has_topk=true) produces PartialReduce when the
  input has multiple partitions (CSS scenario). Exercises the core correctness path.

- session_context.rs: add test_substrait_has_fetch_rel_with_fetch_no_count_mode —
  verifies FetchRel with count_mode=None is correctly treated as non-TopK (false).

- OpenSearchTopKRewriter.java: clarify findFinalAgg comment on multi-Project pass-through:
  only seenProject (first) is used for collation remapping; rewrite() validates sort
  keys as RexInputRef so computed expressions are rejected there regardless.
- substrait_has_fetch_rel: add TODO for AnalyticsCore flag + note on wire
  upgrade path explaining why Substrait scan was chosen over an explicit
  Java flag (adding fields to PartialAggregateInstructionNode breaks wire
  compat with older nodes during rolling upgrades)

- TopKCssCorrectnessIT: clarify that oversampling factor 2.0 is sufficient
  to reproduce the CSS correctness bug (partition-level truncation fires
  regardless of oversampling), and confirm tests do fail without the fix
OpenSearchTopKRewriter.java:
- nested agg bail: replace hard rejection with TODO — Aniketh notes the
  correctness issue is due to lower default oversampling limits, not a
  fundamental impossibility. Revisit once TopK oversampling factor is
  available as an execution hint.

session_context.rs (substrait_has_fetch_rel):
- explicit match arms for Join, Set, Cross, Read returning false, with
  explanation that shard fragments never contain these from TopKRewriter
- unhandled future rel types: log_debug + return false conservatively
  (don't panic, fall back to non-PartialReduce path safely)

TopKCssCorrectnessIT:
- MULTI_SEGMENT + 0.1 oversampling: makes the CSS truncation bug
  reproducible on the local test cluster (verified: 11/15 fail on main,
  all 15 pass with fix)
- Fix remaining flakiness: testCase08 sorts by SearchEngineID (stable)
  instead of count (ties at low oversampling); all sort-c cases use head 2
@sandeshkr419 sandeshkr419 force-pushed the fix/disable-skip-partial-agg branch from 408840c to 4652831 July 1, 2026 22:13
@github-actions

github-actions Bot commented Jul 1, 2026

Contributor

Persistent review updated to latest commit 4652831

@github-actions

github-actions Bot commented Jul 1, 2026

Contributor

❌ Gradle check result for 4652831: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@sandeshkr419 sandeshkr419 merged commit 8ba892a into opensearch-project:main Jul 1, 2026
16 of 20 checks passed
@sandeshkr419 sandeshkr419 deleted the fix/disable-skip-partial-agg branch July 1, 2026 23:53
3 participants