Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions datafusion/expr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ serde_json = { workspace = true }
sqlparser = { workspace = true, optional = true }

[dev-dependencies]
criterion = { workspace = true }
ctor = { workspace = true }
env_logger = { workspace = true }
insta = { workspace = true }

[[bench]]
name = "map_expressions"
harness = false
251 changes: 251 additions & 0 deletions datafusion/expr/benches/map_expressions.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Micro-benchmark for `LogicalPlan::map_expressions` on Extension nodes.
//!
//! Extension nodes can have many children but no expressions. When
//! `expressions()` returns empty, `map_expressions` should skip the
//! expensive clone-all-inputs + `with_exprs_and_inputs` rebuild.

use arrow::datatypes::{DataType, Field, Schema};
use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main};
use datafusion_common::tree_node::Transformed;
use datafusion_common::{DFSchema, DFSchemaRef, Result};
use datafusion_expr::{Expr, Extension, LogicalPlan, UserDefinedLogicalNodeCore, col};
use std::fmt;
use std::hash::{Hash, Hasher};
use std::sync::Arc;

// ---------------------------------------------------------------------------
// Extension node with NO expressions (e.g. OneOf in view matching)
// ---------------------------------------------------------------------------

#[derive(Debug, Clone)]
struct NoExprExtension {
children: Vec<LogicalPlan>,
schema: DFSchemaRef,
}

impl PartialEq for NoExprExtension {
fn eq(&self, other: &Self) -> bool {
self.children.len() == other.children.len()
}
}
impl Eq for NoExprExtension {}

impl Hash for NoExprExtension {
fn hash<H: Hasher>(&self, state: &mut H) {
self.children.len().hash(state);
}
}

impl PartialOrd for NoExprExtension {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.children.len().cmp(&other.children.len()))
}
}

impl UserDefinedLogicalNodeCore for NoExprExtension {
fn name(&self) -> &str {
"NoExprExtension"
}
fn inputs(&self) -> Vec<&LogicalPlan> {
self.children.iter().collect()
}
fn schema(&self) -> &DFSchemaRef {
&self.schema
}
fn expressions(&self) -> Vec<Expr> {
vec![] // Key: no expressions
}
fn fmt_for_explain(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "NoExprExtension(children={})", self.children.len())
}
fn with_exprs_and_inputs(
&self,
_: Vec<Expr>,
inputs: Vec<LogicalPlan>,
) -> Result<Self> {
Ok(Self {
children: inputs,
schema: Arc::clone(&self.schema),
})
}
}

// ---------------------------------------------------------------------------
// Extension node WITH expressions (control group)
// ---------------------------------------------------------------------------

#[derive(Debug, Clone)]
struct WithExprExtension {
children: Vec<LogicalPlan>,
exprs: Vec<Expr>,
schema: DFSchemaRef,
}

impl PartialEq for WithExprExtension {
fn eq(&self, other: &Self) -> bool {
self.children.len() == other.children.len()
&& self.exprs.len() == other.exprs.len()
}
}
impl Eq for WithExprExtension {}

Comment thread
zhuqi-lucas marked this conversation as resolved.
impl Hash for WithExprExtension {
fn hash<H: Hasher>(&self, state: &mut H) {
self.children.len().hash(state);
self.exprs.len().hash(state);
}
}

impl PartialOrd for WithExprExtension {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(
self.children
.len()
.cmp(&other.children.len())
.then(self.exprs.len().cmp(&other.exprs.len())),
)
}
}

impl UserDefinedLogicalNodeCore for WithExprExtension {
fn name(&self) -> &str {
"WithExprExtension"
}
fn inputs(&self) -> Vec<&LogicalPlan> {
self.children.iter().collect()
}
fn schema(&self) -> &DFSchemaRef {
&self.schema
}
fn expressions(&self) -> Vec<Expr> {
self.exprs.clone()
}
fn fmt_for_explain(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
f,
"WithExprExtension(children={}, exprs={})",
self.children.len(),
self.exprs.len()
)
}
fn with_exprs_and_inputs(
&self,
exprs: Vec<Expr>,
inputs: Vec<LogicalPlan>,
) -> Result<Self> {
Ok(Self {
children: inputs,
exprs,
schema: Arc::clone(&self.schema),
})
}
}

// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------

fn build_schema(num_cols: usize) -> DFSchemaRef {
let fields: Vec<Field> = (0..num_cols)
.map(|i| Field::new(format!("col_{i}"), DataType::Utf8, true))
.collect();
Arc::new(DFSchema::try_from(Schema::new(fields)).unwrap())
}

fn build_child(schema: &DFSchemaRef) -> LogicalPlan {
LogicalPlan::EmptyRelation(datafusion_expr::EmptyRelation {
produce_one_row: false,
schema: Arc::clone(schema),
})
}

fn build_no_expr_plan(num_children: usize, num_cols: usize) -> LogicalPlan {
let schema = build_schema(num_cols);
let children: Vec<LogicalPlan> =
(0..num_children).map(|_| build_child(&schema)).collect();
LogicalPlan::Extension(Extension {
node: Arc::new(NoExprExtension {
children,
schema: Arc::clone(&schema),
}),
})
}

fn build_with_expr_plan(num_children: usize, num_cols: usize) -> LogicalPlan {
let schema = build_schema(num_cols);
let children: Vec<LogicalPlan> =
(0..num_children).map(|_| build_child(&schema)).collect();
let exprs: Vec<Expr> = (0..3).map(|i| col(format!("col_{i}"))).collect();
LogicalPlan::Extension(Extension {
node: Arc::new(WithExprExtension {
children,
exprs,
schema: Arc::clone(&schema),
}),
})
}

// ---------------------------------------------------------------------------
// Benchmarks
// ---------------------------------------------------------------------------

fn benchmark_map_expressions(c: &mut Criterion) {
let mut group = c.benchmark_group("map_expressions_extension");

let num_cols = 40;

for num_children in [1, 3, 5, 10] {
let no_expr_plan = build_no_expr_plan(num_children, num_cols);
let with_expr_plan = build_with_expr_plan(num_children, num_cols);

group.bench_with_input(
BenchmarkId::new("no_expr", num_children),
&no_expr_plan,
|b, plan| {
b.iter(|| {
let result = plan
.clone()
.map_expressions(|expr| Ok(Transformed::no(expr)))
.unwrap();
std::hint::black_box(result);
});
},
);

group.bench_with_input(
BenchmarkId::new("with_expr", num_children),
&with_expr_plan,
|b, plan| {
b.iter(|| {
let result = plan
.clone()
.map_expressions(|expr| Ok(Transformed::no(expr)))
.unwrap();
std::hint::black_box(result);
});
},
);
}

group.finish();
}

criterion_group!(benches, benchmark_map_expressions);
criterion_main!(benches);
34 changes: 23 additions & 11 deletions datafusion/expr/src/logical_plan/tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -584,17 +584,29 @@ impl LogicalPlan {
.map_elements(f)?
.update_data(|expr| LogicalPlan::Sort(Sort { expr, input, fetch })),
LogicalPlan::Extension(Extension { node }) => {
// would be nice to avoid this copy -- maybe can
// update extension to just observer Exprs
let exprs = node.expressions().map_elements(f)?;
let plan = LogicalPlan::Extension(Extension {
node: UserDefinedLogicalNode::with_exprs_and_inputs(
node.as_ref(),
exprs.data,
node.inputs().into_iter().cloned().collect::<Vec<_>>(),
)?,
});
Transformed::new(plan, exprs.transformed, exprs.tnr)
let raw_exprs = node.expressions();
if raw_exprs.is_empty() {
// No expressions to transform — skip expensive clone of
// all inputs and reconstruction via with_exprs_and_inputs.
Transformed::no(LogicalPlan::Extension(Extension { node }))
} else {
// TODO: a more general optimization would be to change
// `UserDefinedLogicalNode::expressions()` to return
// references (`&[Expr]`) instead of cloned `Vec<Expr>`,
// and only clone + rebuild when the transform actually
// modifies an expression. This would avoid the clone +
// `with_exprs_and_inputs` rebuild even for non-empty
// expression lists when the transform is a no-op.
let exprs = raw_exprs.map_elements(f)?;
let plan = LogicalPlan::Extension(Extension {
node: UserDefinedLogicalNode::with_exprs_and_inputs(
node.as_ref(),
exprs.data,
node.inputs().into_iter().cloned().collect::<Vec<_>>(),
)?,
});
Transformed::new(plan, exprs.transformed, exprs.tnr)
}
}
LogicalPlan::TableScan(TableScan {
table_name,
Expand Down
Loading