Skip to content

Commit 8d2ee25

Browse files
zhuqi-lucas and claude committed
fix: address review feedback — avoid stale SPM and decouple spm capture from fetch
- Capture only the LexOrdering from the stripped SortPreservingMergeExec instead of the entire plan node, then reconstruct a fresh SPM with the current (possibly rewritten) child plan in the fallback path. This avoids reusing a stale SPM that references an outdated subtree.
- Decouple spm_ordering capture from fetch — use `spm_ordering.is_none()` instead of gating on `fetch.is_none()`, so the SPM ordering is recorded even when an outer operator already set fetch.
- Remove unwrap() on with_fetch by constructing SPM directly.
- Fix test comment: fallback is CoalescePartitionsExec/SPM, not GlobalLimitExec.
- Add regression test: nested CoalescePartitionsExec(fetch=5) over SortPreservingMergeExec(fetch=3) now correctly preserves SPM ordering.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 7785c33 commit 8d2ee25

2 files changed

Lines changed: 85 additions & 19 deletions

File tree

datafusion/core/tests/physical_optimizer/enforce_distribution.rs

Lines changed: 46 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -3934,7 +3934,7 @@ fn coalesce_partitions_fetch_preserved_by_enforce_distribution() -> Result<()> {
39343934

39353935
// The fetch=1 must survive. It can appear either as:
39363936
// - CoalescePartitionsExec: fetch=1 (re-inserted with fetch), or
3937-
// - GlobalLimitExec: skip=0, fetch=1 (fallback when merge wasn't re-added)
3937+
// - SortPreservingMergeExec: fetch=1 (when a merge is re-added with fetch)
39383938
let plan_str = displayable(result.as_ref()).indent(true).to_string();
39393939
assert!(
39403940
plan_str.contains("fetch=1"),
@@ -3994,6 +3994,51 @@ fn spm_fetch_preserved_by_enforce_distribution() -> Result<()> {
39943994
Ok(())
39953995
}
39963996

3997+
/// When both a `CoalescePartitionsExec(fetch=N)` and an inner
3998+
/// `SortPreservingMergeExec(fetch=M)` are stripped, the SPM ordering must
3999+
/// still be captured even though fetch was already set when the SPM is
4000+
/// encountered. The reconstructed fallback should use a
4001+
/// `SortPreservingMergeExec` (not a plain `CoalescePartitionsExec`) to
4002+
/// preserve sort semantics, and must wrap the *rewritten* child plan.
4003+
///
4004+
/// Regression test for: the `spm` capture was gated on `fetch.is_none()`,
4005+
/// causing it to be skipped when an outer operator already set `fetch`.
4006+
#[test]
4007+
fn nested_coalesce_over_spm_preserves_spm_ordering() -> Result<()> {
4008+
let schema = schema();
4009+
let sort_key: LexOrdering = [PhysicalSortExpr {
4010+
expr: col("c", &schema)?,
4011+
options: SortOptions::default(),
4012+
}]
4013+
.into();
4014+
4015+
// Build: CoalescePartitionsExec(fetch=5)
4016+
// -> SortPreservingMergeExec(fetch=3, [c ASC])
4017+
// -> sorted multi-partition parquet
4018+
let parquet = parquet_exec_multiple_sorted(vec![sort_key.clone()]);
4019+
let spm: Arc<dyn ExecutionPlan> = Arc::new(
4020+
SortPreservingMergeExec::new(sort_key, parquet).with_fetch(Some(3)),
4021+
);
4022+
let coalesce_over_spm: Arc<dyn ExecutionPlan> =
4023+
Arc::new(CoalescePartitionsExec::new(spm).with_fetch(Some(5)));
4024+
4025+
let result = ensure_distribution_helper(coalesce_over_spm, 10, false)?;
4026+
4027+
let plan_str = displayable(result.as_ref()).indent(true).to_string();
4028+
// The minimum fetch (min(5,3)=3) must survive.
4029+
assert!(
4030+
plan_str.contains("fetch=3"),
4031+
"fetch=3 was lost after EnforceDistribution!\nPlan:\n{plan_str}"
4032+
);
4033+
// The result should use SortPreservingMergeExec (not CoalescePartitionsExec)
4034+
// to preserve the ordering semantics.
4035+
assert!(
4036+
plan_str.contains("SortPreservingMergeExec"),
4037+
"Expected SortPreservingMergeExec to preserve ordering, but got:\n{plan_str}"
4038+
);
4039+
Ok(())
4040+
}
4041+
39974042
/// When a parent requires SinglePartition and maintains input order, order-preserving
39984043
/// variants (e.g. SortPreservingMergeExec) should be kept so that ordering can
39994044
/// propagate to ancestors. Replacing them with CoalescePartitionsExec would destroy

datafusion/physical-optimizer/src/enforce_distribution.rs

Lines changed: 39 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -40,7 +40,8 @@ use datafusion_expr::logical_plan::{Aggregate, JoinType};
4040
use datafusion_physical_expr::expressions::{Column, NoOp};
4141
use datafusion_physical_expr::utils::map_columns_before_projection;
4242
use datafusion_physical_expr::{
43-
EquivalenceProperties, PhysicalExpr, PhysicalExprRef, physical_exprs_equal,
43+
EquivalenceProperties, LexOrdering, PhysicalExpr, PhysicalExprRef,
44+
physical_exprs_equal,
4445
};
4546
use datafusion_physical_plan::ExecutionPlanProperties;
4647
use datafusion_physical_plan::aggregates::{
@@ -1023,22 +1024,32 @@ fn remove_dist_changing_operators(
10231024
) -> Result<(
10241025
DistributionContext,
10251026
Option<usize>,
1026-
Option<Arc<dyn ExecutionPlan>>,
1027+
Option<LexOrdering>,
10271028
)> {
10281029
let mut fetch = None;
1029-
let mut spm: Option<Arc<dyn ExecutionPlan>> = None;
1030+
let mut spm_ordering: Option<LexOrdering> = None;
10301031
while is_repartition(&distribution_context.plan)
10311032
|| is_coalesce_partitions(&distribution_context.plan)
10321033
|| is_sort_preserving_merge(&distribution_context.plan)
10331034
{
1035+
// Track whether the stripped operator was a SortPreservingMergeExec,
1036+
// independently of whether it carries a fetch. We only need the
1037+
// ordering so we can reconstruct a fresh SPM later if needed.
1038+
if is_sort_preserving_merge(&distribution_context.plan) && spm_ordering.is_none()
1039+
{
1040+
if let Some(spm) = distribution_context
1041+
.plan
1042+
.as_any()
1043+
.downcast_ref::<SortPreservingMergeExec>()
1044+
{
1045+
spm_ordering = Some(spm.expr().clone());
1046+
}
1047+
}
10341048
// Preserve any `fetch` (limit) that was pushed into a
10351049
// `SortPreservingMergeExec` or `CoalescePartitionsExec` by
10361050
// `LimitPushdown`. Without this, the limit would be lost when
10371051
// the operator is stripped.
10381052
if let Some(child_fetch) = distribution_context.plan.fetch() {
1039-
if is_sort_preserving_merge(&distribution_context.plan) && fetch.is_none() {
1040-
spm = Some(Arc::clone(&distribution_context.plan));
1041-
}
10421053
fetch = Some(fetch.map_or(child_fetch, |f: usize| f.min(child_fetch)));
10431054
}
10441055
// All of above operators have a single child. First child is only child.
@@ -1047,7 +1058,7 @@ fn remove_dist_changing_operators(
10471058
// Note that they will be re-inserted later on if necessary or helpful.
10481059
}
10491060

1050-
Ok((distribution_context, fetch, spm))
1061+
Ok((distribution_context, fetch, spm_ordering))
10511062
}
10521063

10531064
/// Updates the [`DistributionContext`] if preserving ordering while changing partitioning is not helpful or desirable.
@@ -1250,7 +1261,7 @@ pub fn ensure_distribution(
12501261
children,
12511262
},
12521263
mut fetch,
1253-
spm,
1264+
spm_ordering,
12541265
) = remove_dist_changing_operators(dist_context)?;
12551266

12561267
if let Some(exec) = plan.as_any().downcast_ref::<WindowAggExec>() {
@@ -1518,17 +1529,27 @@ pub fn ensure_distribution(
15181529
// changing operator would be silently lost. Re-introduce it so the
15191530
// query still returns the correct number of rows.
15201531
if let Some(fetch_val) = fetch.take() {
1521-
let limit_plan: Arc<dyn ExecutionPlan> = if let Some(spm) = spm {
1522-
// Re-insert the original SortPreservingMergeExec with fetch.
1523-
spm.with_fetch(Some(fetch_val)).unwrap()
1524-
} else {
1525-
// The fetch came from a CoalescePartitionsExec. Re-introduce
1526-
// it as a CoalescePartitionsExec(fetch=N) wrapping the output.
1527-
Arc::new(
1528-
CoalescePartitionsExec::new(Arc::clone(&dist_context.plan))
1532+
let limit_plan: Arc<dyn ExecutionPlan> =
1533+
if let Some(ordering) = spm_ordering {
1534+
// Reconstruct a fresh SortPreservingMergeExec using the
1535+
// captured ordering and the *current* (possibly rewritten)
1536+
// child plan, rather than reusing the stale pre-optimization
1537+
// SPM which may reference an outdated subtree.
1538+
Arc::new(
1539+
SortPreservingMergeExec::new(
1540+
ordering,
1541+
Arc::clone(&dist_context.plan),
1542+
)
15291543
.with_fetch(Some(fetch_val)),
1530-
)
1531-
};
1544+
)
1545+
} else {
1546+
// The fetch came from a CoalescePartitionsExec. Re-introduce
1547+
// it as a CoalescePartitionsExec(fetch=N) wrapping the output.
1548+
Arc::new(
1549+
CoalescePartitionsExec::new(Arc::clone(&dist_context.plan))
1550+
.with_fetch(Some(fetch_val)),
1551+
)
1552+
};
15321553
dist_context = DistributionContext::new(limit_plan, data, vec![dist_context]);
15331554
}
15341555

0 commit comments

Comments
 (0)