Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions datafusion/common/src/functional_dependencies.rs
Original file line number Diff line number Diff line change
Expand Up @@ -565,9 +565,17 @@ pub fn get_required_group_by_exprs_indices(
for FunctionalDependence {
source_indices,
target_indices,
nullable,
..
} in &dependencies.deps
{
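// Skip nullable dependencies. A UNIQUE constraint admits any number of
// rows whose key is NULL, yet GROUP BY places all NULL keys in a single
// group. For those rows the source key does not determine the target
// columns, so dropping the targets from GROUP BY would merge rows that
// must stay in separate groups. A nullable FD therefore cannot be used
// to eliminate other GROUP BY columns.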
if *nullable {
continue;
}
if source_indices
.iter()
.all(|source_idx| groupby_expr_indices.contains(source_idx))
Expand Down
14 changes: 12 additions & 2 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,6 @@ impl LogicalPlan {
LogicalPlan::Ddl(ddl) => ddl.schema(),
LogicalPlan::Unnest(Unnest { schema, .. }) => schema,
LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, .. }) => {
// we take the schema of the static term as the schema of the entire recursive query
static_term.schema()
}
}
Expand Down Expand Up @@ -2402,6 +2401,10 @@ impl SubqueryAlias {
// no field must share the same column name as this would lead to ambiguity when referencing
// columns in parent logical nodes.

// Capture whether the input is a RecursiveQuery before `plan` may be
// rebound to a wrapping Projection below.
let is_recursive_query = matches!(plan.as_ref(), LogicalPlan::RecursiveQuery(_));

// Compute unique aliases, if any, for each column of the input's schema.
let aliases = unique_field_aliases(plan.schema().fields());
let is_projection_needed = aliases.iter().any(Option::is_some);
Expand Down Expand Up @@ -2431,7 +2434,14 @@ impl SubqueryAlias {
// Requalify fields with the new `alias`.
let fields = plan.schema().fields().clone();
let meta_data = plan.schema().metadata().clone();
let func_dependencies = plan.schema().functional_dependencies().clone();
// Recursive queries do not expose the anchor's functional dependencies to
// the outer schema — the recursive term can produce rows that violate
// those dependencies, so they are intentionally dropped here.
let func_dependencies = if is_recursive_query {
FunctionalDependencies::empty()
} else {
plan.schema().functional_dependencies().clone()
};

let schema = DFSchema::from_unqualified_fields(fields, meta_data)?;
let schema = schema.as_arrow();
Expand Down
1 change: 1 addition & 0 deletions datafusion/expr/src/logical_plan/tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,7 @@ impl TreeNode for LogicalPlan {
static_term,
recursive_term,
is_distinct,
..
}) => (static_term, recursive_term).map_elements(f)?.update_data(
|(static_term, recursive_term)| {
LogicalPlan::RecursiveQuery(RecursiveQuery {
Expand Down
22 changes: 22 additions & 0 deletions datafusion/sqllogictest/test_files/group_by.slt
Original file line number Diff line number Diff line change
Expand Up @@ -5641,3 +5641,25 @@ set datafusion.execution.target_partitions = 4;

statement count 0
drop table t;

# Test that GROUP BY with a UNIQUE constraint does not incorrectly collapse
# NULL rows. UNIQUE allows multiple rows with a NULL key, while GROUP BY puts
# all NULL keys into one group, so a nullable UNIQUE column cannot be used to
# eliminate other GROUP BY columns.
# Regression test for https://github.com/apache/datafusion/issues/21507

statement ok
CREATE TABLE t_unique_null(a INT, b INT, c INT, UNIQUE(a));

statement ok
INSERT INTO t_unique_null VALUES (1, 10, 100), (NULL, 20, 200), (NULL, 30, 300);

# The two NULL rows must stay in separate groups (grouped by b as well).
query II rowsort
SELECT a, SUM(c) AS total FROM t_unique_null GROUP BY a, b;
----
1 100
NULL 200
NULL 300

statement ok
DROP TABLE t_unique_null;
7 changes: 4 additions & 3 deletions datafusion/substrait/tests/cases/roundtrip_logical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,9 +395,10 @@ async fn multilayer_aggregate() -> Result<()> {
assert_snapshot!(
plan,
@r"
Aggregate: groupBy=[[data.a]], aggr=[[sum(count(data.b)) AS sum(partial_count_b)]]
Aggregate: groupBy=[[data.a]], aggr=[[count(data.b)]]
TableScan: data projection=[a, b]
Projection: data.a, sum(count(data.b)) AS sum(partial_count_b)
Aggregate: groupBy=[[data.a, count(data.b)]], aggr=[[sum(count(data.b))]]
Aggregate: groupBy=[[data.a]], aggr=[[count(data.b)]]
TableScan: data projection=[a, b]
"
);
Ok(())
Expand Down
Loading