Skip to content

feat: support collect_set#3954

Open
comphead wants to merge 2 commits intoapache:mainfrom
comphead:native_datafusion
Open

feat: support collect_set#3954
comphead wants to merge 2 commits intoapache:mainfrom
comphead:native_datafusion

Conversation

@comphead
Copy link
Copy Markdown
Contributor

Which issue does this PR close?

Closes #2525
Closes #3951

Rationale for this change

What changes are included in this PR?

How are these changes tested?

@comphead comphead changed the title feat: support collect_set WIP feat: support collect_set Apr 18, 2026
@comphead comphead marked this pull request as ready for review April 18, 2026 21:48
Jefffrey pushed a commit to apache/arrow-rs that referenced this pull request Apr 19, 2026
# Which issue does this PR close?

<!--
We generally require a GitHub issue to be filed for all bug fixes and
enhancements and this helps us generate change logs for our releases.
You can link an issue to this PR using the GitHub syntax.
-->

- Closes #NNN.

# Rationale for this change

Originally came from
apache/datafusion-comet#3954

Getting the error message which hides the CAST target `data_type`
```
  Cause: org.apache.comet.CometNativeException: External error: Arrow error: Cast error: Cannot cast list to non-list data types
```


<!--
Why are you proposing this change? If this is already explained clearly
in the issue then this section is not needed.
Explaining clearly why changes are proposed helps reviewers understand
your changes and offer better suggestions for fixes.
-->

# What changes are included in this PR?

<!--
There is no need to duplicate the description in the issue here but it
is sometimes worth providing a summary of the individual changes in this
PR.
-->

# Are these changes tested?

<!--
We typically require tests for all PRs in order to:
1. Prevent the code from being accidentally broken by subsequent changes
2. Serve as another way to document the expected behavior of the code

If tests are not included in your PR, please explain why (for example,
are they covered by existing tests)?
-->

# Are there any user-facing changes?

<!--
If there are user-facing changes then we may require documentation to be
updated before approving the PR.

If there are any breaking changes to public APIs, please call them out.
-->
@mbutrovich
Copy link
Copy Markdown
Contributor

mbutrovich commented Apr 20, 2026

Thanks @comphead for adding this! Here's some feedback:

Schema patch (adjustOutputForNativeState)

  • The BinaryType-to-ArrayType schema correction for ObjectHashAggregateExec partial mode is interesting. If collect_list or another TypedImperativeAggregate gets added natively in the future, it'll need a case here too. Might be worth a comment so the next person knows to update this method.
  • The modes != Seq(Partial) early return assumes uniform modes across all aggregate expressions. A brief comment explaining that assumption would help readability.

NaN handling

The Incompatible marking for floating-point types with strictFloatingPoint=true looks correct. Comet's DistinctArrayAggAccumulator deduplicates NaN (since ScalarValue treats NaN == NaN) while Spark does not. The expect_fallback tests for float/double confirm this works.

Docs

docs/source/user-guide/latest/expressions.md has an aggregate expressions table (line ~196) that lists all supported aggregates but doesn't include CollectSet yet. Would be good to add a row there.

Tests

Great coverage across types (bool, byte, short, int, bigint, float, double, string, binary, decimal, date, timestamp) plus NaN/Inf/+0/-0 edge cases and the dictionary encoding config matrix.

A couple of suggestions:

  • Maybe add a test for collect_set(DISTINCT col). It's semantically redundant but exercises a different planner path.
  • Could also consider a HAVING clause test, though less critical.

Benchmarks

The PR doesn't include benchmark results. Since the underlying DistinctArrayAggAccumulator does per-row ScalarValue::try_from_array and hashes into HashSet<ScalarValue>, it would be helpful to see numbers confirming native collect_set is faster than Spark's codegen fallback. Even a quick microbenchmark would give confidence.

Performance (not blocking, future opportunity)

The DistinctArrayAggAccumulator in DataFusion doesn't yet have a GroupsAccumulator implementation, so it takes the per-row accumulator path. Neil Conway has been doing a series of aggregate optimizations upstream (e.g., apache/datafusion#20504 making array_agg 190x faster via deferred materialization, apache/datafusion#20538 using hashbrown for array_distinct). Applying similar patterns to DistinctArrayAggAccumulator in DataFusion would benefit this code automatically. Worth filing an upstream issue if benchmarks show room for improvement.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Support collect_set built in function Integrate collect_set to Comet

2 participants