-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Misc minor optimizations to query optimizer performance #21128
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
eec93a2
f23a621
532b74e
9b9e4f5
e53a07f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -320,10 +320,8 @@ fn can_evaluate_as_join_condition(predicate: &Expr) -> Result<bool> { | |
| /// * do nothing. | ||
| fn extract_or_clauses_for_join<'a>( | ||
| filters: &'a [Expr], | ||
| schema: &'a DFSchema, | ||
| schema_cols: &'a HashSet<Column>, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need to have owned Columns? Maybe this could be something like
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I don't think it'll work with &Column here, but I do think it's possible to avoid the String allocation, and everything here seems to be internal functions. I'll try it out.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I pushed a change. It widens the scope a bit, but basically it just introduces a type that holds two references and passes it around; everything is contained within the module. |
||
| ) -> impl Iterator<Item = Expr> + 'a { | ||
| let schema_columns = schema_columns(schema); | ||
|
|
||
| // new formed OR clauses and their column references | ||
| filters.iter().filter_map(move |expr| { | ||
| if let Expr::BinaryExpr(BinaryExpr { | ||
|
|
@@ -332,8 +330,8 @@ fn extract_or_clauses_for_join<'a>( | |
| right, | ||
| }) = expr | ||
| { | ||
| let left_expr = extract_or_clause(left.as_ref(), &schema_columns); | ||
| let right_expr = extract_or_clause(right.as_ref(), &schema_columns); | ||
| let left_expr = extract_or_clause(left.as_ref(), schema_cols); | ||
| let right_expr = extract_or_clause(right.as_ref(), schema_cols); | ||
|
|
||
| // If nothing can be extracted from any sub clauses, do nothing for this OR clause. | ||
| if let (Some(left_expr), Some(right_expr)) = (left_expr, right_expr) { | ||
|
|
@@ -421,6 +419,10 @@ fn push_down_all_join( | |
| // 3) should be kept as filter conditions | ||
| let left_schema = join.left.schema(); | ||
| let right_schema = join.right.schema(); | ||
|
|
||
| let left_schema_columns = schema_columns(left_schema.as_ref()); | ||
|
alamb marked this conversation as resolved.
|
||
| let right_schema_columns = schema_columns(right_schema.as_ref()); | ||
|
|
||
| let mut left_push = vec![]; | ||
| let mut right_push = vec![]; | ||
| let mut keep_predicates = vec![]; | ||
|
|
@@ -467,26 +469,38 @@ fn push_down_all_join( | |
| // Extract from OR clause, generate new predicates for both side of join if possible. | ||
| // We only track the unpushable predicates above. | ||
| if left_preserved { | ||
| left_push.extend(extract_or_clauses_for_join(&keep_predicates, left_schema)); | ||
| left_push.extend(extract_or_clauses_for_join(&join_conditions, left_schema)); | ||
| left_push.extend(extract_or_clauses_for_join( | ||
| &keep_predicates, | ||
| &left_schema_columns, | ||
| )); | ||
| left_push.extend(extract_or_clauses_for_join( | ||
| &join_conditions, | ||
| &left_schema_columns, | ||
| )); | ||
| } | ||
| if right_preserved { | ||
| right_push.extend(extract_or_clauses_for_join(&keep_predicates, right_schema)); | ||
| right_push.extend(extract_or_clauses_for_join(&join_conditions, right_schema)); | ||
| right_push.extend(extract_or_clauses_for_join( | ||
| &keep_predicates, | ||
| &right_schema_columns, | ||
| )); | ||
| right_push.extend(extract_or_clauses_for_join( | ||
| &join_conditions, | ||
| &right_schema_columns, | ||
| )); | ||
| } | ||
|
|
||
| // For predicates from join filter, we should check with if a join side is preserved | ||
| // in term of join filtering. | ||
| if on_left_preserved { | ||
| left_push.extend(extract_or_clauses_for_join( | ||
| &on_filter_join_conditions, | ||
| left_schema, | ||
| &left_schema_columns, | ||
| )); | ||
| } | ||
| if on_right_preserved { | ||
| right_push.extend(extract_or_clauses_for_join( | ||
| &on_filter_join_conditions, | ||
| right_schema, | ||
| &right_schema_columns, | ||
| )); | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -47,12 +47,12 @@ impl OptimizerRule for PushDownLimit { | |
| true | ||
| } | ||
|
|
||
| #[expect(clippy::only_used_in_recursion)] | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is just wasteful, just a lint away.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. As in, you plan to remove it in a follow-on PR? |
||
| fn rewrite( | ||
| &self, | ||
| plan: LogicalPlan, | ||
| config: &dyn OptimizerConfig, | ||
| ) -> Result<Transformed<LogicalPlan>> { | ||
| let _ = config.options(); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. that is weird
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. to answer your question above, I think the lint has to stay here because this seems worse, and as far as I can tell in this rule the config is just passed along recursively |
||
| let LogicalPlan::Limit(mut limit) = plan else { | ||
| return Ok(Transformed::no(plan)); | ||
| }; | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -26,6 +26,7 @@ use std::borrow::Cow; | |
| use std::collections::HashSet; | ||
| use std::ops::Not; | ||
| use std::sync::Arc; | ||
| use std::sync::LazyLock; | ||
|
|
||
| use datafusion_common::config::ConfigOptions; | ||
| use datafusion_common::nested_struct::has_one_of_more_common_fields; | ||
|
|
@@ -498,8 +499,6 @@ struct ConstEvaluator { | |
| /// The `config_options` are passed from the session to allow scalar functions | ||
| /// to access configuration like timezone. | ||
| execution_props: ExecutionProps, | ||
| input_schema: DFSchema, | ||
| input_batch: RecordBatch, | ||
| } | ||
|
|
||
| /// The simplify result of ConstEvaluator | ||
|
|
@@ -575,6 +574,18 @@ impl TreeNodeRewriter for ConstEvaluator { | |
| } | ||
| } | ||
|
|
||
| static DUMMY_SCHEMA: LazyLock<Arc<Schema>> = | ||
| LazyLock::new(|| Arc::new(Schema::new(vec![Field::new(".", DataType::Null, true)]))); | ||
|
|
||
| static DUMMY_DF_SCHEMA: LazyLock<DFSchema> = | ||
| LazyLock::new(|| DFSchema::try_from(Arc::clone(&*DUMMY_SCHEMA)).unwrap()); | ||
|
|
||
| static DUMMY_BATCH: LazyLock<RecordBatch> = LazyLock::new(|| { | ||
| // Need a single "input" row to produce a single output row | ||
| let col = new_null_array(&DataType::Null, 1); | ||
| RecordBatch::try_new(DUMMY_SCHEMA.clone(), vec![col]).unwrap() | ||
| }); | ||
|
|
||
| impl ConstEvaluator { | ||
| /// Create a new `ConstantEvaluator`. | ||
| /// | ||
|
|
@@ -588,25 +599,13 @@ impl ConstEvaluator { | |
| pub fn try_new(config_options: Option<Arc<ConfigOptions>>) -> Result<Self> { | ||
| // The dummy column name is unused and doesn't matter as only | ||
| // expressions without column references can be evaluated | ||
| static DUMMY_COL_NAME: &str = "."; | ||
| let schema = Arc::new(Schema::new(vec![Field::new( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is nice: it saves several allocations for each call to the Const evaluator 👍 |
||
| DUMMY_COL_NAME, | ||
| DataType::Null, | ||
| true, | ||
| )])); | ||
| let input_schema = DFSchema::try_from(Arc::clone(&schema))?; | ||
| // Need a single "input" row to produce a single output row | ||
| let col = new_null_array(&DataType::Null, 1); | ||
| let input_batch = RecordBatch::try_new(schema, vec![col])?; | ||
|
|
||
| let mut execution_props = ExecutionProps::new(); | ||
| execution_props.config_options = config_options; | ||
|
|
||
| Ok(Self { | ||
| can_evaluate: vec![], | ||
| execution_props, | ||
| input_schema, | ||
| input_batch, | ||
| }) | ||
| } | ||
|
|
||
|
|
@@ -702,16 +701,13 @@ impl ConstEvaluator { | |
| return ConstSimplifyResult::NotSimplified(s, m); | ||
| } | ||
|
|
||
| let phys_expr = match create_physical_expr( | ||
| &expr, | ||
| &self.input_schema, | ||
| &self.execution_props, | ||
| ) { | ||
| Ok(e) => e, | ||
| Err(err) => return ConstSimplifyResult::SimplifyRuntimeError(err, expr), | ||
| }; | ||
| let phys_expr = | ||
| match create_physical_expr(&expr, &DUMMY_DF_SCHEMA, &self.execution_props) { | ||
| Ok(e) => e, | ||
| Err(err) => return ConstSimplifyResult::SimplifyRuntimeError(err, expr), | ||
| }; | ||
| let metadata = phys_expr | ||
| .return_field(self.input_batch.schema_ref()) | ||
| .return_field(DUMMY_BATCH.schema_ref()) | ||
| .ok() | ||
| .and_then(|f| { | ||
| let m = f.metadata(); | ||
|
|
@@ -720,7 +716,7 @@ impl ConstEvaluator { | |
| false => Some(FieldMetadata::from(m)), | ||
| } | ||
| }); | ||
| let col_val = match phys_expr.evaluate(&self.input_batch) { | ||
| let col_val = match phys_expr.evaluate(&DUMMY_BATCH) { | ||
| Ok(v) => v, | ||
| Err(err) => return ConstSimplifyResult::SimplifyRuntimeError(err, expr), | ||
| }; | ||
|
|
@@ -1698,10 +1694,11 @@ impl TreeNodeRewriter for Simplifier<'_> { | |
| { | ||
| // Repeated occurrences of wildcard are redundant so remove them | ||
| // exp LIKE '%%' --> exp LIKE '%' | ||
| let simplified_pattern = Regex::new("%%+") | ||
| .unwrap() | ||
| .replace_all(pattern_str, "%") | ||
| .to_string(); | ||
|
|
||
| static LIKE_REGEX: LazyLock<Regex> = | ||
| LazyLock::new(|| Regex::new("%%+").unwrap()); | ||
| let simplified_pattern = | ||
| LIKE_REGEX.replace_all(pattern_str, "%").to_string(); | ||
| Transformed::yes(Expr::Like(Like { | ||
| pattern: Box::new( | ||
| string_scalar.to_expr(&simplified_pattern), | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
empty DFSchema isn't free, similar to #20534