Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 21 additions & 12 deletions datafusion/sql/src/unparser/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -683,28 +683,37 @@ impl Unparser<'_> {

let join_filters = if table_scan_filters.is_empty() {
join.filter.clone()
} else if join.join_type == JoinType::Inner {
// For inner joins, ON and WHERE are semantically equivalent. Place table_scan_filters in WHERE because they may contain
// subquery expressions, which break dialects that reject subqueries in JOIN ON (e.g. BigQuery).
for filter in table_scan_filters {
let filter_expr = self.expr_to_sql(&filter)?;
select.selection(Some(filter_expr));
}
join.filter.clone()
} else {
// Combine `table_scan_filters` into a single filter using `AND`
let Some(combined_filters) =
let combined_filters =
table_scan_filters.into_iter().reduce(|acc, filter| {
Expr::BinaryExpr(BinaryExpr {
left: Box::new(acc),
op: Operator::And,
right: Box::new(filter),
})
})
else {
return internal_err!("Failed to combine TableScan filters");
};
});

// Combine `join.filter` with `combined_filters` using `AND`
match &join.filter {
Some(filter) => Some(Expr::BinaryExpr(BinaryExpr {
left: Box::new(filter.clone()),
op: Operator::And,
right: Box::new(combined_filters),
})),
None => Some(combined_filters),
match (&join.filter, combined_filters) {
(Some(filter), Some(combined)) => {
Some(Expr::BinaryExpr(BinaryExpr {
left: Box::new(filter.clone()),
op: Operator::And,
right: Box::new(combined),
}))
}
(Some(filter), None) => Some(filter.clone()),
(None, Some(combined)) => Some(combined),
(None, None) => None,
}
};

Expand Down
42 changes: 37 additions & 5 deletions datafusion/sql/tests/cases/plan_to_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use datafusion_expr::test::function_stub::{
use datafusion_expr::{
EmptyRelation, Expr, Extension, LogicalPlan, LogicalPlanBuilder, Union,
UserDefinedLogicalNode, UserDefinedLogicalNodeCore, WindowFrame,
WindowFunctionDefinition, cast, col, lit, table_scan, wildcard,
WindowFunctionDefinition, cast, col, lit, scalar_subquery, table_scan, wildcard,
};
use datafusion_functions::unicode;
use datafusion_functions_aggregate::grouping::grouping_udaf;
Expand Down Expand Up @@ -1828,7 +1828,7 @@ fn test_join_with_table_scan_filters() -> Result<()> {
let sql = plan_to_sql(&join_plan_with_filter)?;
assert_snapshot!(
sql,
@r#"SELECT * FROM left_table AS "left" INNER JOIN right_table ON "left".id = right_table.id AND (("left".id > 5) AND ("left"."name" LIKE 'some_name' AND (age > 10)))"#
@r#"SELECT * FROM left_table AS "left" INNER JOIN right_table ON "left".id = right_table.id AND ("left".id > 5) WHERE "left"."name" LIKE 'some_name' AND (age > 10)"#
);

let join_plan_no_filter = LogicalPlanBuilder::from(left_plan.clone())
Expand All @@ -1843,7 +1843,7 @@ fn test_join_with_table_scan_filters() -> Result<()> {
let sql = plan_to_sql(&join_plan_no_filter)?;
assert_snapshot!(
sql,
@r#"SELECT * FROM left_table AS "left" INNER JOIN right_table ON "left".id = right_table.id AND ("left"."name" LIKE 'some_name' AND (age > 10))"#
@r#"SELECT * FROM left_table AS "left" INNER JOIN right_table ON "left".id = right_table.id WHERE "left"."name" LIKE 'some_name' AND (age > 10)"#
);

let right_plan_with_filter = table_scan_with_filters(
Expand All @@ -1868,7 +1868,7 @@ fn test_join_with_table_scan_filters() -> Result<()> {
let sql = plan_to_sql(&join_plan_multiple_filters)?;
assert_snapshot!(
sql,
@r#"SELECT * FROM left_table AS "left" INNER JOIN right_table ON "left".id = right_table.id AND (("left".id > 5) AND (("left"."name" LIKE 'some_name' AND (right_table."name" = 'before_join_filter_val')) AND (age > 10))) WHERE ("left"."name" = 'after_join_filter_val')"#
@r#"SELECT * FROM left_table AS "left" INNER JOIN right_table ON "left".id = right_table.id AND ("left".id > 5) WHERE ("left"."name" = 'after_join_filter_val') AND "left"."name" LIKE 'some_name' AND (right_table."name" = 'before_join_filter_val') AND (age > 10)"#
);

let right_plan_with_filter_schema = table_scan_with_filters(
Expand Down Expand Up @@ -1898,7 +1898,39 @@ fn test_join_with_table_scan_filters() -> Result<()> {
let sql = plan_to_sql(&join_plan_duplicated_filter)?;
assert_snapshot!(
sql,
@r#"SELECT * FROM left_table AS "left" INNER JOIN right_table ON "left".id = right_table.id AND (("left".id > 5) AND (("left"."name" LIKE 'some_name' AND (right_table.age > 10)) AND (right_table.age < 11)))"#
@r#"SELECT * FROM left_table AS "left" INNER JOIN right_table ON "left".id = right_table.id AND ("left".id > 5) WHERE "left"."name" LIKE 'some_name' AND (right_table.age > 10) AND (right_table.age < 11)"#
);

// Inner join with a scalar subquery in table_scan_filters. The subquery filter should appear in WHERE, not in JOIN ON,
// since dialects like BigQuery reject subqueries in join predicates.
let schema_subquery = Schema::new(vec![Field::new("id", DataType::Utf8, false)]);
let subquery_plan = table_scan(Some("subquery_table"), &schema_subquery, None)?
.aggregate(vec![] as Vec<Expr>, vec![max(col("subquery_table.id"))])?
.build()?;
let right_plan_with_subquery = table_scan_with_filters(
Some("right_table"),
&schema_right,
None,
vec![col("right_table.id").eq(scalar_subquery(Arc::new(subquery_plan)))],
)?
.build()?;

let left_plan =
table_scan(Some("left_table"), &schema_left, Some(vec![0, 1]))?.build()?;

let join_plan_subquery_filter = LogicalPlanBuilder::from(left_plan)
.join(
right_plan_with_subquery,
datafusion_expr::JoinType::Inner,
(vec!["left_table.id"], vec!["right_table.id"]),
None,
)?
.build()?;

let sql = plan_to_sql(&join_plan_subquery_filter)?;
assert_snapshot!(
sql,
@r#"SELECT left_table.id, left_table."name" FROM left_table INNER JOIN right_table ON left_table.id = right_table.id WHERE (right_table.id = (SELECT max(subquery_table.id) FROM subquery_table))"#
);

Ok(())
Expand Down
Loading