Skip to content

feat: Add native support for max_by and min_by#3969

Open
pchintar wants to merge 3 commits intoapache:mainfrom
pchintar:maxmin-by-native
Open

feat: Add native support for max_by and min_by#3969
pchintar wants to merge 3 commits intoapache:mainfrom
pchintar:maxmin-by-native

Conversation

@pchintar
Copy link
Copy Markdown

@pchintar pchintar commented Apr 16, 2026

Which issue does this PR close?

Closes #3841 .

Rationale for this change

This change adds native support for MAX_BY and MIN_BY.

These aggregates are commonly used in grouped queries. Without native support, they fall back, which prevents execution from staying within Comet’s aggregation pipeline. This change enables them to run natively and align their behavior with Spark.

What changes are included in this PR?

  • Added a native implementation for max_by and min_by (maxmin_by.rs) using a shared design

    • maintains the current best (value, ordering) pair per group
    • updates and merges state using ordering comparison
    • single-pass execution with constant state per group
  • Implemented GroupsAccumulator support to integrate with Comet’s grouped aggregation path

    • avoids scalar accumulation and per-row overhead
    • includes specialized handling for primitive, byte/string, and struct ordering types, with a row-based fallback for general cases
    • enables execution through CometHashAggregate for grouped workloads
  • Added serialization and planner wiring

    • proto definitions for MaxBy / MinBy
    • Spark-side serde (CometMaxBy, CometMinBy)
    • registration in QueryPlanSerde
    • planner support to construct the native aggregate
  • Extended operator support

    • enabled execution under HashAggregate
    • added support for SortAggregateExec where selected by Spark
    • ensured both partial and final aggregation stages execute natively

How are these changes tested?

Validated against Spark for:

  • grouped and non-grouped queries
  • null handling (both ordering and value)
  • tie behavior (equal ordering selects the latest value)
  • struct ordering
  • both hash and sort aggregate plans

Results match Spark semantics, and supported queries execute without any fallback.

Note: I've made the two changes required: 1)Updating the title correctly & 2)Adding the Licence header for maxmin_by.rs

@pchintar pchintar changed the title Add native support for max_by and min_by feat: Add native support for max_by and min_by Apr 17, 2026
@pchintar
Copy link
Copy Markdown
Author

Hi, so I understand the issue with the formatting of rust files & so I ran cargo fmt --all to address the lint failure. But I don't know what's causing the remaining few couple of errors?

@coderfender
Copy link
Copy Markdown
Contributor

@pchintar , you would want to download the logs and grep for FAILED and see which particular spark tests failed and fix code accordingly

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Add native Comet support for max_by and min_by aggregates (HashAggregate + SortAggregate)

2 participants