Skip to content

Commit 5b0a42e

Browse files
committed
fix: address review feedback — avoid stale SPM and decouple SPM capture from fetch
- Capture only the LexOrdering from the stripped SortPreservingMergeExec instead of the entire plan node, then reconstruct a fresh SPM with the current (possibly rewritten) child plan in the fallback path. This avoids reusing a stale SPM that references an outdated subtree.
- Decouple spm_ordering capture from fetch — use `spm_ordering.is_none()` instead of gating on `fetch.is_none()`, so the SPM ordering is recorded even when an outer operator has already set fetch.
- Remove unwrap() on with_fetch by constructing the SPM directly.
- Fix test comment: the fallback is CoalescePartitionsExec/SPM, not GlobalLimitExec.
- Add regression test: nested CoalescePartitionsExec(fetch=5) over SortPreservingMergeExec(fetch=3) now correctly preserves the SPM ordering.
1 parent 7785c33 commit 5b0a42e

2 files changed

Lines changed: 73 additions & 17 deletions

File tree

datafusion/core/tests/physical_optimizer/enforce_distribution.rs

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3934,7 +3934,7 @@ fn coalesce_partitions_fetch_preserved_by_enforce_distribution() -> Result<()> {
39343934

39353935
// The fetch=1 must survive. It can appear either as:
39363936
// - CoalescePartitionsExec: fetch=1 (re-inserted with fetch), or
3937-
// - GlobalLimitExec: skip=0, fetch=1 (fallback when merge wasn't re-added)
3937+
// - SortPreservingMergeExec: fetch=1 (when a merge is re-added with fetch)
39383938
let plan_str = displayable(result.as_ref()).indent(true).to_string();
39393939
assert!(
39403940
plan_str.contains("fetch=1"),
@@ -3994,6 +3994,51 @@ fn spm_fetch_preserved_by_enforce_distribution() -> Result<()> {
39943994
Ok(())
39953995
}
39963996

3997+
/// When both a `CoalescePartitionsExec(fetch=N)` and an inner
3998+
/// `SortPreservingMergeExec(fetch=M)` are stripped, the SPM ordering must
3999+
/// still be captured even though fetch was already set when the SPM is
4000+
/// encountered. The reconstructed fallback should use a
4001+
/// `SortPreservingMergeExec` (not a plain `CoalescePartitionsExec`) to
4002+
/// preserve sort semantics, and must wrap the *rewritten* child plan.
4003+
///
4004+
/// Regression test for: the `spm` capture was gated on `fetch.is_none()`,
4005+
/// causing it to be skipped when an outer operator already set `fetch`.
4006+
#[test]
4007+
fn nested_coalesce_over_spm_preserves_spm_ordering() -> Result<()> {
4008+
let schema = schema();
4009+
let sort_key: LexOrdering = [PhysicalSortExpr {
4010+
expr: col("c", &schema)?,
4011+
options: SortOptions::default(),
4012+
}]
4013+
.into();
4014+
4015+
// Build: CoalescePartitionsExec(fetch=5)
4016+
// -> SortPreservingMergeExec(fetch=3, [c ASC])
4017+
// -> sorted multi-partition parquet
4018+
let parquet = parquet_exec_multiple_sorted(vec![sort_key.clone()]);
4019+
let spm: Arc<dyn ExecutionPlan> = Arc::new(
4020+
SortPreservingMergeExec::new(sort_key, parquet).with_fetch(Some(3)),
4021+
);
4022+
let coalesce_over_spm: Arc<dyn ExecutionPlan> =
4023+
Arc::new(CoalescePartitionsExec::new(spm).with_fetch(Some(5)));
4024+
4025+
let result = ensure_distribution_helper(coalesce_over_spm, 10, false)?;
4026+
4027+
let plan_str = displayable(result.as_ref()).indent(true).to_string();
4028+
// The minimum fetch (min(5,3)=3) must survive.
4029+
assert!(
4030+
plan_str.contains("fetch=3"),
4031+
"fetch=3 was lost after EnforceDistribution!\nPlan:\n{plan_str}"
4032+
);
4033+
// The result should use SortPreservingMergeExec (not CoalescePartitionsExec)
4034+
// to preserve the ordering semantics.
4035+
assert!(
4036+
plan_str.contains("SortPreservingMergeExec"),
4037+
"Expected SortPreservingMergeExec to preserve ordering, but got:\n{plan_str}"
4038+
);
4039+
Ok(())
4040+
}
4041+
39974042
/// When a parent requires SinglePartition and maintains input order, order-preserving
39984043
/// variants (e.g. SortPreservingMergeExec) should be kept so that ordering can
39994044
/// propagate to ancestors. Replacing them with CoalescePartitionsExec would destroy

datafusion/physical-optimizer/src/enforce_distribution.rs

Lines changed: 27 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@ use datafusion_expr::logical_plan::{Aggregate, JoinType};
4040
use datafusion_physical_expr::expressions::{Column, NoOp};
4141
use datafusion_physical_expr::utils::map_columns_before_projection;
4242
use datafusion_physical_expr::{
43-
EquivalenceProperties, PhysicalExpr, PhysicalExprRef, physical_exprs_equal,
43+
EquivalenceProperties, LexOrdering, PhysicalExpr, PhysicalExprRef,
44+
physical_exprs_equal,
4445
};
4546
use datafusion_physical_plan::ExecutionPlanProperties;
4647
use datafusion_physical_plan::aggregates::{
@@ -1017,28 +1018,32 @@ fn add_merge_on_top(
10171018
/// ```text
10181019
/// "DataSourceExec: file_groups={2 groups: \[\[x], \[y]]}, projection=\[a, b, c, d, e], output_ordering=\[a@0 ASC], file_type=parquet",
10191020
/// ```
1020-
#[expect(clippy::type_complexity)]
10211021
fn remove_dist_changing_operators(
10221022
mut distribution_context: DistributionContext,
1023-
) -> Result<(
1024-
DistributionContext,
1025-
Option<usize>,
1026-
Option<Arc<dyn ExecutionPlan>>,
1027-
)> {
1023+
) -> Result<(DistributionContext, Option<usize>, Option<LexOrdering>)> {
10281024
let mut fetch = None;
1029-
let mut spm: Option<Arc<dyn ExecutionPlan>> = None;
1025+
let mut spm_ordering: Option<LexOrdering> = None;
10301026
while is_repartition(&distribution_context.plan)
10311027
|| is_coalesce_partitions(&distribution_context.plan)
10321028
|| is_sort_preserving_merge(&distribution_context.plan)
10331029
{
1030+
// Track whether the stripped operator was a SortPreservingMergeExec,
1031+
// independently of whether it carries a fetch. We only need the
1032+
// ordering so we can reconstruct a fresh SPM later if needed.
1033+
if is_sort_preserving_merge(&distribution_context.plan)
1034+
&& spm_ordering.is_none()
1035+
&& let Some(spm) = distribution_context
1036+
.plan
1037+
.as_any()
1038+
.downcast_ref::<SortPreservingMergeExec>()
1039+
{
1040+
spm_ordering = Some(spm.expr().clone());
1041+
}
10341042
// Preserve any `fetch` (limit) that was pushed into a
10351043
// `SortPreservingMergeExec` or `CoalescePartitionsExec` by
10361044
// `LimitPushdown`. Without this, the limit would be lost when
10371045
// the operator is stripped.
10381046
if let Some(child_fetch) = distribution_context.plan.fetch() {
1039-
if is_sort_preserving_merge(&distribution_context.plan) && fetch.is_none() {
1040-
spm = Some(Arc::clone(&distribution_context.plan));
1041-
}
10421047
fetch = Some(fetch.map_or(child_fetch, |f: usize| f.min(child_fetch)));
10431048
}
10441049
// All of above operators have a single child. First child is only child.
@@ -1047,7 +1052,7 @@ fn remove_dist_changing_operators(
10471052
// Note that they will be re-inserted later on if necessary or helpful.
10481053
}
10491054

1050-
Ok((distribution_context, fetch, spm))
1055+
Ok((distribution_context, fetch, spm_ordering))
10511056
}
10521057

10531058
/// Updates the [`DistributionContext`] if preserving ordering while changing partitioning is not helpful or desirable.
@@ -1250,7 +1255,7 @@ pub fn ensure_distribution(
12501255
children,
12511256
},
12521257
mut fetch,
1253-
spm,
1258+
spm_ordering,
12541259
) = remove_dist_changing_operators(dist_context)?;
12551260

12561261
if let Some(exec) = plan.as_any().downcast_ref::<WindowAggExec>() {
@@ -1518,9 +1523,15 @@ pub fn ensure_distribution(
15181523
// changing operator would be silently lost. Re-introduce it so the
15191524
// query still returns the correct number of rows.
15201525
if let Some(fetch_val) = fetch.take() {
1521-
let limit_plan: Arc<dyn ExecutionPlan> = if let Some(spm) = spm {
1522-
// Re-insert the original SortPreservingMergeExec with fetch.
1523-
spm.with_fetch(Some(fetch_val)).unwrap()
1526+
let limit_plan: Arc<dyn ExecutionPlan> = if let Some(ordering) = spm_ordering {
1527+
// Reconstruct a fresh SortPreservingMergeExec using the
1528+
// captured ordering and the *current* (possibly rewritten)
1529+
// child plan, rather than reusing the stale pre-optimization
1530+
// SPM which may reference an outdated subtree.
1531+
Arc::new(
1532+
SortPreservingMergeExec::new(ordering, Arc::clone(&dist_context.plan))
1533+
.with_fetch(Some(fetch_val)),
1534+
)
15241535
} else {
15251536
// The fetch came from a CoalescePartitionsExec. Re-introduce
15261537
// it as a CoalescePartitionsExec(fetch=N) wrapping the output.

0 commit comments

Comments (0)