[analytics-engine] Improve TopK correctness with CSS: replace Final with PartialReduce using topk check#22360
❌ Gradle check result for 2f5d91f: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
99e2b2a to edc9997
Codecov Report
✅ All modified and coverable lines are covered by tests.
@@             Coverage Diff              @@
##               main   #22360      +/-   ##
============================================
+ Coverage     73.34%   73.36%   +0.02%
- Complexity    76017    76053      +36
============================================
  Files          6076     6076
  Lines        345507   345508       +1
  Branches      49732    49732
============================================
+ Hits         253409   253487      +78
+ Misses        71869    71767     -102
- Partials      20229    20254      +25
edc9997 to f7e0659
❌ Gradle check result for 408840c: null Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
has_topk flag
When target_partitions > 1, the shard plan splits the scan into N file groups and runs a Partial aggregate per partition. The mode-stripping logic (force_aggregate_mode) previously discarded the FinalPartitioned + Hash repartition that merged these partitions, leaving TopK to operate on un-merged per-partition partial counts and incorrectly prune groups whose global count is high but whose per-partition count is low.
Fix: when the Partial aggregate below the Final has multiple output partitions, replace FinalPartitioned with PartialReduce instead of stripping it. PartialReduce merges partial accumulator states (calls merge()) but outputs partial state, preserving the schema contract with the coordinator's FinalPartitioned while ensuring TopK sees complete per-shard totals.
Also sets skip_partial_aggregation_probe_ratio_threshold=1.0 for partial-aggregate shard sessions to prevent DataFusion from abandoning partial aggregation mid-stream (which would also produce fragmented results).
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
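The pruning problem described in this commit can be reproduced with a toy model. The sketch below is not the plugin's code: the `Partial` alias, the helper names, and the sample data are all assumptions made for illustration. It compares truncating each partition before merging (the buggy shape) with merging before truncating (the shape the fix aims for).

```rust
use std::collections::HashMap;

/// Hypothetical model of one partition's partial state: group key -> count.
type Partial = HashMap<&'static str, u64>;

/// Keep the `k` groups with the highest counts (ties broken by key).
fn top_k(counts: &Partial, k: usize) -> Vec<(&'static str, u64)> {
    let mut rows: Vec<(&'static str, u64)> =
        counts.iter().map(|(key, c)| (*key, *c)).collect();
    rows.sort_by(|a, b| b.1.cmp(&a.1).then(a.0.cmp(b.0)));
    rows.truncate(k);
    rows
}

/// Sum the partial counts of each key across all partitions.
fn merge(parts: &[Partial]) -> Partial {
    let mut out = Partial::new();
    for part in parts {
        for (key, c) in part {
            *out.entry(*key).or_insert(0) += *c;
        }
    }
    out
}

/// Buggy shape: every partition truncates first, then survivors are merged.
fn truncate_then_merge(parts: &[Partial], k: usize) -> Vec<(&'static str, u64)> {
    let survivors: Vec<Partial> = parts
        .iter()
        .map(|part| top_k(part, k).into_iter().collect())
        .collect();
    top_k(&merge(&survivors), k)
}

/// Fixed shape: merge all partial state first, then truncate once.
fn merge_then_truncate(parts: &[Partial], k: usize) -> Vec<(&'static str, u64)> {
    top_k(&merge(parts), k)
}

/// Illustrative data: "b" has the highest global count (12) but is never
/// the top group inside any single partition.
fn sample_partitions() -> Vec<Partial> {
    vec![
        HashMap::from([("a", 10), ("b", 6)]),
        HashMap::from([("c", 9), ("b", 6)]),
    ]
}
```

With `k = 1`, `truncate_then_merge` reports `a` with count 10, while `merge_then_truncate` reports `b` with count 12, which is the kind of discrepancy the commit describes.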
…+ PartialReduce
Propagate a hasTopK flag from OpenSearchTopKRewriter through PlannerContext → FragmentConversionDriver → PartialAggregateInstructionNode → ShardScanExecutionContext → NativeBridge → Rust create_session_context → SessionContextHandle.
In prepare_partial_plan, when has_topk=true, replace Final/FinalPartitioned with PartialReduce instead of stripping it. This keeps the RepartitionExec(Hash) → Partial(×N) subtree intact so CSS partitions are merged by group key before the TopK SortExec truncates, preserving CSS scan parallelism while ensuring TopK sees the complete per-shard dataset.
Without this fix, force_aggregate_mode stripped Final and returned Partial(×N) directly. Each CSS partition independently truncated to the TopK fetch limit, dropping groups that were split across partitions.
Update plan shape goldens for all 29 affected TopK queries (q8-q43, prod2s): shard_physical_1seg and shard_physical_nseg now show AggregateExec(PartialReduce) above RepartitionExec(Hash) → AggregateExec(Partial), with SortPreservingMergeExec correctly present above the TopK SortExec.
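Why the hash repartition matters can be sketched in a few lines. The model below is an assumption-laden illustration, not DataFusion's implementation: `target_partition` and `repartition_and_reduce` are hypothetical names, and the standard library's `DefaultHasher` stands in for whatever hash the engine uses. The point is only that routing by `hash(key)` sends every partial row of a key to the same output partition, so a per-partition merge yields complete per-key totals.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

/// Hypothetical routing rule: the output partition is hash(key) mod n.
fn target_partition(key: &str, n: usize) -> usize {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    (hasher.finish() % n as u64) as usize
}

/// Route partial rows from every input partition by key hash, then merge
/// rows that land in the same output partition (the "reduce" step).
fn repartition_and_reduce(
    inputs: &[Vec<(&'static str, u64)>],
    n: usize,
) -> Vec<HashMap<&'static str, u64>> {
    let mut outputs = vec![HashMap::new(); n];
    for part in inputs {
        for (key, count) in part {
            *outputs[target_partition(key, n)].entry(*key).or_insert(0) += *count;
        }
    }
    outputs
}
```

Because the same key always hashes to the same index, a key split across input partitions ends up in exactly one output partition carrying its full partial total, so a TopK truncation applied afterwards no longer sees fragmented counts.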
…lt comparison
13 regression cases for TopK correctness when concurrent segment search is active, covering all aggregate shapes identified by Aniketh Jain:
- case-01: multi-key (SearchEngineID, ClientIP) with count/sum/avg + != filter
- case-02: single-key count
- case-03: distinct_count (HLL)
- case-04: stddev_samp / var_samp / var_pop
- case-05: scalar sums (no group-by, no TopK; immunity check)
- case-06: offset + limit (head N from M)
- case-07: min / max
- case-08: avg + sum
- case-09a/b/c: three aggregate ordering permutations
- case-10: aggregates without aliases
- case-11: many aggregates on the same column
- case-12: percentile (p50, p95)
- case-13: mixed split+non-split (count/sum + percentile)
Each test runs the query with CSS off to get a reference result, then with CSS on (max_slice_count=4) and asserts exact equality. This catches any regression where CSS partitions independently truncate before the coordinator merge.
…utor path
The indexed executor (QueryShardExec) calls apply_aggregate_mode with a hardcoded false for has_topk, so PartialReduce was never applied when queries used the indexed scan path, which is the production path for all CSS queries. Only the listing-table path (session_context::prepare_partial_plan) received the correct has_topk value.
Fix: extract handle.has_topk and pass it to apply_aggregate_mode in execute_indexed_with_context_inner, matching the session_context.rs path.
- skip_partial_aggregation_probe_ratio_threshold: gate on has_topk instead of has_partial_aggregate (only TopK queries need it; non-TopK partial aggregates don't risk incomplete partial state), and remove the duplicate setting
- PartialReduce: add partition_count() > 1 guard so it is skipped when the input is already single-partition (no CSS); PartialReduce over one partition is redundant and adds unnecessary overhead
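The two gates in this commit amount to a small decision table. The sketch below only restates that table: `FinalAction`, `final_action`, and `probe_ratio_override` are hypothetical names introduced here, not functions from agg_mode.rs or session_context.rs.

```rust
/// What to do with the shard-side Final/FinalPartitioned aggregate.
#[derive(Debug, Clone, Copy, PartialEq)]
enum FinalAction {
    /// Drop the Final node and return its Partial input directly.
    Strip,
    /// Swap the Final node for PartialReduce, keeping its input subtree.
    ReplaceWithPartialReduce,
}

/// Hypothetical helper: PartialReduce is only inserted when a TopK is
/// present and there is more than one input partition to merge.
fn final_action(has_topk: bool, input_partitions: usize) -> FinalAction {
    if has_topk && input_partitions > 1 {
        FinalAction::ReplaceWithPartialReduce
    } else {
        FinalAction::Strip
    }
}

/// Hypothetical helper: override for
/// skip_partial_aggregation_probe_ratio_threshold. `None` means the
/// engine default is left untouched.
fn probe_ratio_override(has_topk: bool) -> Option<f64> {
    if has_topk {
        Some(1.0)
    } else {
        None
    }
}
```

For example, `final_action(true, 1)` yields `Strip`, matching the commit's note that PartialReduce over a single partition is redundant.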
…tions
Queries with nested stats (stats A by X | stats B by Y | sort ...) were producing catastrophically wrong results with TopK enabled. The inner PARTIAL aggregate's input contains another aggregate (the inner FINAL), but the rewriter only checked that ER's direct child is PARTIAL, not whether that PARTIAL's subtree is clean. When TopK fires on the inner PARTIAL, it truncates groups before the outer aggregate sees all of them, causing the outer sum/count to receive only a tiny fraction of the actual groups.
Fix: bail TopK if the matched PARTIAL's input subtree contains any aggregate node. This covers all chained stats patterns. The coordinator handles these queries correctly without per-shard TopK.
…n findFinalAgg
In findFinalAgg, any node between the Sort and the target FINAL that consumes the grouped output makes TopK pushdown unsafe:
1. Non-FINAL OpenSearchAggregate (SINGLE/PARTIAL): chained stats pattern (stats A | stats B | sort). TopK on the inner agg truncates groups before the outer agg sees all of them, producing wrong totals.
2. OpenSearchProject with RexOver (window function): eventstats sits between the Sort and the grouped aggregate. Truncating rows before window evaluation produces wrong window partition results.
3. Second Project (when seenProject != null): safely bail rather than accept a second project that might carry window expressions or unsafe remappings.
Apply Aniketh Jain's suggested fix exactly: collapse all three cases into findFinalAgg's early-reject block. Add unit test for chained stats case. Update testDetection_multipleProjects to reflect new safe-bail behavior.
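The three rejection rules can be read as a single walk from the Sort down towards the FINAL aggregate. The sketch below models that walk in Rust purely for illustration; the real logic lives in OpenSearchTopKRewriter.java, and the `Node` and `AggMode` types, the `find_final_agg` name, and the flat slice representation of the path are all assumptions.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum AggMode {
    Single,
    Partial,
    Final,
}

/// Hypothetical model of the nodes found between the Sort and the FINAL.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Node {
    /// A projection; `has_window_fn` models a RexOver expression.
    Project { has_window_fn: bool },
    Aggregate(AggMode),
}

/// Walk `path` (ordered from just below the Sort downwards) and return the
/// index of the FINAL aggregate if TopK pushdown looks safe, else `None`.
fn find_final_agg(path: &[Node]) -> Option<usize> {
    let mut seen_project = false;
    for (i, node) in path.iter().enumerate() {
        match node {
            Node::Aggregate(AggMode::Final) => return Some(i),
            // Rule 1: a SINGLE or PARTIAL aggregate in between (chained stats).
            Node::Aggregate(_) => return None,
            // Rule 2: a window function sits between Sort and FINAL.
            Node::Project { has_window_fn: true } => return None,
            Node::Project { has_window_fn: false } => {
                // Rule 3: bail on a second project.
                if seen_project {
                    return None;
                }
                seen_project = true;
            }
        }
    }
    None
}
```

A single plain project above the FINAL is accepted, while a SINGLE aggregate, a windowed project, or a second project each cause an early `None`.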
…breaking
With oversampling factor=2.0 and head 5, two groups with c=6 tie at the boundary and oversampling doesn't guarantee which survives the shard truncation. CSS and no-CSS may produce different orderings for tied groups. Switch to head 3 where the top SearchEngineIDs have distinct counts and results are deterministic.
…sical plan in Rust
Previously, a boolean hasTopK flag was threaded from Java (PlannerContext → FragmentConversionDriver → PartialAggregateInstructionNode → NativeBridge → create_session_context) to Rust. Adding a field to PartialAggregateInstructionNode breaks wire compatibility with nodes running older versions of the plugin.
Fix: detect TopK locally in Rust by walking the physical plan for a SortExec with fetch.is_some() before calling force_aggregate_mode. This is 1:1 with the old flag: the TopK Sort inserted by OpenSearchTopKRewriter is the only SortExec with a fetch limit in the shard fragment.
Remove has_topk from SessionContextHandle, the create_session_context signature, and the ffm.rs FFM descriptors. Java wire format is identical to main. The change is self-contained in Rust.
Also update plan shape goldens for q29/q31/q32/q33. q33 no longer gets TopK because its AVG decomposition produces two Projects between Sort and FINAL, which findFinalAgg correctly bails on (2nd project guard added in earlier commit).
…hysical plan re-scan
Replace plan_has_topk_sort (physical plan walk in prepare_partial_plan) with substrait_has_fetch_rel (Substrait byte scan in create_session_context):
- substrait_has_fetch_rel: walks the Substrait rel tree looking for FetchRel with count_mode.is_some(). A Sort+Limit from OpenSearchTopKRewriter is encoded as FetchRel(count=N) wrapping SortRel in the Substrait plan bytes.
- Detection is gated on has_partial_aggregate (short-circuits for single-shard where has_partial_aggregate=false: no Substrait parsing, zero cost).
- Result stored on SessionContextHandle.has_topk and reused in prepare_partial_plan, removing the need to re-detect from the DataFusion physical plan.
- skip_partial_aggregation_probe_ratio_threshold=1.0 now correctly gated on has_topk instead of has_partial_aggregate, which avoids a performance regression on non-TopK multi-shard queries.
Single-shard safety: single-shard uses SINGLE aggregate mode, never emits SETUP_PARTIAL_AGGREGATE, so has_partial_aggregate=false and has_topk=false.
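The detection described above is a recursive tree walk with a cheap gate in front of it. The sketch below uses a hypothetical `Rel` enum rather than the real Substrait protobuf types, and models an unparsable or empty plan as `None`; the names `has_fetch_rel` and `detect_topk` are likewise assumptions, so treat it as an outline of the idea only.

```rust
/// Hypothetical stand-in for the Substrait relation tree of a shard fragment.
enum Rel {
    /// Leaf scan; never counts as TopK.
    Read,
    Aggregate(Box<Rel>),
    Sort(Box<Rel>),
    /// Models FetchRel; `count` models the optional count_mode.
    Fetch { count: Option<i64>, input: Box<Rel> },
}

/// True if the tree contains a Fetch node that carries a count.
fn has_fetch_rel(rel: &Rel) -> bool {
    match rel {
        Rel::Fetch { count: Some(_), .. } => true,
        // A Fetch without a count is not treated as TopK by itself;
        // in this sketch the walk simply continues into its input.
        Rel::Fetch { count: None, input } => has_fetch_rel(input),
        Rel::Aggregate(input) | Rel::Sort(input) => has_fetch_rel(input),
        Rel::Read => false,
    }
}

/// Gate the walk on has_partial_aggregate so plans without a partial
/// aggregate skip the walk entirely. `None` models empty plan bytes.
fn detect_topk(has_partial_aggregate: bool, plan: Option<&Rel>) -> bool {
    has_partial_aggregate && plan.map_or(false, has_fetch_rel)
}
```

Because `&&` short-circuits, the tree is never inspected when `has_partial_aggregate` is false, which mirrors the zero-cost claim for the single-shard case.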
…_topk gate
- test_substrait_has_fetch_rel_with_fetch: verifies FetchRel(count=N) wrapping SortRel is detected as TopK (matches what DataFusion Substrait producer emits for Sort(fetch=N) from OpenSearchTopKRewriter)
- test_substrait_has_fetch_rel_without_fetch: SortRel without FetchRel → false
- test_substrait_has_fetch_rel_empty: empty bytes → false (no panic)
- test_skip_partial_agg_disabled_when_has_topk: skip_partial disabled when TopK active
- test_skip_partial_agg_default_when_no_topk: non-TopK retains DF default (0.8)
…edge case, comment fix
- agg_mode.rs: add test_apply_partial_with_topk_produces_partial_reduce, which verifies that apply_aggregate_mode(Partial, has_topk=true) produces PartialReduce when the input has multiple partitions (CSS scenario). Exercises the core correctness path.
- session_context.rs: add test_substrait_has_fetch_rel_with_fetch_no_count_mode, which verifies FetchRel with count_mode=None is correctly treated as non-TopK (false).
- OpenSearchTopKRewriter.java: clarify findFinalAgg comment on multi-Project pass-through: only seenProject (first) is used for collation remapping; rewrite() validates sort keys as RexInputRef so computed expressions are rejected there regardless.
- substrait_has_fetch_rel: add TODO for AnalyticsCore flag + note on wire upgrade path explaining why Substrait scan was chosen over an explicit Java flag (adding fields to PartialAggregateInstructionNode breaks wire compat with older nodes during rolling upgrades)
- TopKCssCorrectnessIT: clarify that oversampling factor 2.0 is sufficient to reproduce the CSS correctness bug (partition-level truncation fires regardless of oversampling), and confirm tests do fail without the fix
OpenSearchTopKRewriter.java:
- nested agg bail: replace hard rejection with TODO. Aniketh notes the correctness issue is due to lower default oversampling limits, not a fundamental impossibility. Revisit once TopK oversampling factor is available as an execution hint.
session_context.rs (substrait_has_fetch_rel):
- explicit match arms for Join, Set, Cross, Read returning false, with explanation that shard fragments never contain these from TopKRewriter
- unhandled future rel types: log_debug + return false conservatively (don't panic, fall back to non-PartialReduce path safely)
TopKCssCorrectnessIT:
- MULTI_SEGMENT + 0.1 oversampling: makes the CSS truncation bug reproducible on the local test cluster (verified: 11/15 fail on main, all 15 pass with fix)
- Fix remaining flakiness: testCase08 sorts by SearchEngineID (stable) instead of count (ties at low oversampling); all sort-c cases use head 2
408840c to 4652831
❌ Gradle check result for 4652831: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
Co-Authored-By: @mch2 & @expani
Problem
When concurrent segment search (CSS) is active, DataFusion's physical optimizer inserts RepartitionExec(RoundRobin, N) → AggregateExec(Partial, ×N) → AggregateExec(FinalPartitioned) on the shard fragment. prepare_partial_plan calls force_aggregate_mode(Partial), which strips FinalPartitioned and returns the Partial(×N) subtree directly.
Each of the N CSS partitions then runs the TopK SortExec independently, truncating to the TopK fetch limit before the coordinator merge. Groups split across partitions are dropped, producing incorrect counts (e.g. q31 on the clickbench dataset returns c=313 instead of c=1633).
Fix
Propagate a hasTopK boolean from OpenSearchTopKRewriter through the planner pipeline: PlannerContext → FragmentConversionDriver → PartialAggregateInstructionNode → ShardScanExecutionContext → NativeBridge → Rust create_session_context → SessionContextHandle
In prepare_partial_plan, when has_topk=true, force_aggregate_mode replaces FinalPartitioned with PartialReduce instead of stripping it. PartialReduce keeps agg.input() intact (RepartitionExec(Hash) → Partial(×N)), so CSS partitions are merged by group key before the TopK SortExec truncates. The hash repartition ensures each group key lands in exactly one partition, so PartialReduce produces complete per-key partial state.
CSS scan parallelism is fully preserved: the scan + partial agg still run ×N in parallel. Only a merge step is added before TopK. Non-TopK queries (has_topk=false) are unaffected.
Changes
- PlannerContext.setTopKApplied set by PlannerImpl after OpenSearchTopKRewriter fires; threaded through FragmentConversionDriver → PartialAggregateInstructionNode.hasTopK → ShardScanExecutionContext.hasTopK → NativeBridge.createSessionContext
- agg_mode.rs: force_aggregate_mode accepts has_topk: bool; when true and hitting Final/FinalPartitioned, returns PartialReduce(input=agg.input()) instead of stripping. Removes Marc's partition_count() > 1 check; has_topk from the planner is the authoritative signal
- session_context.rs: SessionContextHandle.has_topk field; prepare_partial_plan passes it to apply_aggregate_mode
- AggregateExec(PartialReduce) above RepartitionExec(Hash) → AggregateExec(Partial)
Testing
- integTest: full QA suite passes
- integTestPlanShape: all 29 affected golden files verified
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.