Add batch aggregation for sketch ValueAggregators in MergeRollupTask #17825

Open
davecromberge wants to merge 4 commits into apache:master from permutive-engineering:feature-contrib/batch-aggregation-theta-sketches

@davecromberge
Member

@davecromberge davecromberge commented Mar 5, 2026

Reduces the number of sketch serializations per key from N-1 to 1 by merging
all sketches for a key in a single operation, reading the serialized bytes through zero-copy Memory.wrap().

Changes:

  • Add aggregateBatch() and supportsBatchAggregation() to ValueAggregator interface
  • Implement batch aggregation for DistinctCountThetaSketchAggregator, IntegerTupleSketchAggregator, and DistinctCountCPCSketchAggregator
  • Modify RollupReducer to use batch aggregation when supported
  • Add reducerMaxBatchSize config to SegmentProcessorConfig (default 1000)
  • Add JMH benchmarks comparing pairwise vs batch aggregation
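
A rough sketch of what the interface extension in the first bullet might look like, assuming the existing contract is a pairwise `aggregate(Object, Object)` and that the new default method simply folds over it. The type names `ValueAggregatorSketch`, `LongSumSketch`, and `BatchInterfaceDemo`, and the exact signatures, are hypothetical and are not taken from the PR diff:

```java
import java.util.List;

// Hypothetical shape of the extended interface; the real Pinot signatures may differ.
interface ValueAggregatorSketch {
  // Existing pairwise contract: merge two values into one.
  Object aggregate(Object value1, Object value2);

  // Opt-in flag: aggregators that do not override it keep the pairwise path.
  default boolean supportsBatchAggregation() {
    return false;
  }

  // Assumed default: a left fold over aggregate(), so existing implementations stay correct.
  default Object aggregateBatch(List<Object> values) {
    if (values.isEmpty()) {
      throw new IllegalArgumentException("values must not be empty");
    }
    Object result = values.get(0);
    for (int i = 1; i < values.size(); i++) {
      result = aggregate(result, values.get(i));
    }
    return result;
  }
}

// A simple aggregator that only implements the pairwise method.
final class LongSumSketch implements ValueAggregatorSketch {
  @Override
  public Object aggregate(Object value1, Object value2) {
    return (Long) value1 + (Long) value2;
  }
}

final class BatchInterfaceDemo {
  public static void main(String[] args) {
    ValueAggregatorSketch sum = new LongSumSketch();
    System.out.println(sum.supportsBatchAggregation());
    System.out.println(sum.aggregateBatch(List.of(1L, 2L, 3L)));
  }
}
```

With defaults like these, only the sketch aggregators would need to override both methods; every other aggregator compiles and behaves as before.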

Batch aggregation optimizations:

  • Zero-copy sketch wrapping via Sketch.wrap(Memory.wrap(bytes))
  • Theta-based sorting for early termination (Theta/Tuple sketches)
  • Single final serialization instead of N-1 intermediate serializations
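
The serialization saving in the last bullet can be illustrated with a toy model. This is a minimal sketch that assumes a sorted set of longs as a stand-in for a sketch and `ByteBuffer.wrap` as a stand-in for `Memory.wrap`; it does not use the DataSketches API, and `SerializationCountDemo` and its methods are hypothetical names:

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeSet;

final class SerializationCountDemo {
  static int serializations; // number of serialize() calls since the last reset

  static byte[] serialize(TreeSet<Long> values) {
    serializations++;
    ByteBuffer buf = ByteBuffer.allocate(values.size() * Long.BYTES);
    for (long v : values) {
      buf.putLong(v);
    }
    return buf.array();
  }

  static TreeSet<Long> deserialize(byte[] bytes) {
    ByteBuffer buf = ByteBuffer.wrap(bytes); // wraps the array without copying it
    TreeSet<Long> values = new TreeSet<>();
    while (buf.hasRemaining()) {
      values.add(buf.getLong());
    }
    return values;
  }

  // Pairwise: every step re-serializes the running result (N-1 serializations).
  static byte[] pairwise(List<byte[]> inputs) {
    byte[] acc = inputs.get(0);
    for (int i = 1; i < inputs.size(); i++) {
      TreeSet<Long> merged = deserialize(acc);
      merged.addAll(deserialize(inputs.get(i)));
      acc = serialize(merged);
    }
    return acc;
  }

  // Batch: one in-memory union, serialized once at the end.
  static byte[] batch(List<byte[]> inputs) {
    TreeSet<Long> union = new TreeSet<>();
    for (byte[] in : inputs) {
      union.addAll(deserialize(in));
    }
    return serialize(union);
  }

  static List<byte[]> inputs(int n) {
    List<byte[]> out = new ArrayList<>();
    for (long i = 0; i < n; i++) {
      TreeSet<Long> s = new TreeSet<>();
      s.add(i);
      s.add(i + 1);
      out.add(serialize(s));
    }
    return out;
  }

  static int countSerializations(boolean useBatch, int n) {
    List<byte[]> in = inputs(n);
    serializations = 0; // ignore the serializations used to build the inputs
    if (useBatch) {
      batch(in);
    } else {
      pairwise(in);
    }
    return serializations;
  }

  public static void main(String[] args) {
    List<byte[]> in = inputs(500);
    System.out.println(Arrays.equals(pairwise(in), batch(in)));
    System.out.println(countSerializations(false, 500));
    System.out.println(countSerializations(true, 500));
  }
}
```

Both paths produce the same bytes in this model; only the number of intermediate serializations differs.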

Benchmark results (500 sketches per key):

  • Theta sketch: 189ms → 2ms (91x faster)
  • Tuple sketch: 133ms → 11ms (12x faster)
  • CPC sketch: 16ms → 4ms (4x faster)

Tag:
performance

release-notes:

  • New configuration options
  • Signature changes to public methods/interfaces

@davecromberge
Member Author

Summary Table with Speedups (From JMH testing)

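| Sketch | N | Pairwise (ms) | Batch (ms) | Speedup |
|--------|-----|---------------|------------|---------|
| Theta | 10 | 1.81 | 0.16 | 11x |
| Theta | 50 | 18.06 | 0.46 | 39x |
| Theta | 100 | 38.86 | 0.71 | 55x |
| Theta | 500 | 188.93 | 2.07 | 91x |
| Tuple | 10 | 1.35 | 0.27 | 5x |
| Tuple | 50 | 12.64 | 1.06 | 12x |
| Tuple | 100 | 27.91 | 2.10 | 13x |
| Tuple | 500 | 133.25 | 10.76 | 12x |
| CPC | 10 | 0.30 | 0.08 | 4x |
| CPC | 50 | 1.68 | 0.38 | 4x |
| CPC | 100 | 3.10 | 0.75 | 4x |
| CPC | 500 | 15.90 | 3.92 | 4x |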

@codecov-commenter

codecov-commenter commented Mar 5, 2026

Codecov Report

❌ Patch coverage is 74.86034% with 45 lines in your changes missing coverage. Please review.
✅ Project coverage is 63.26%. Comparing base (3ca14f9) to head (365c25e).
⚠️ Report is 47 commits behind head on master.

| Files with missing lines | Patch % | Lines |
|--------------------------|---------|-------|
| ...core/segment/processing/reducer/RollupReducer.java | 72.22% | 13 Missing and 12 partials ⚠️ |
| ...segment/processing/aggregator/ValueAggregator.java | 11.11% | 8 Missing ⚠️ |
| ...ion/tasks/mergerollup/MergeRollupTaskExecutor.java | 11.11% | 7 Missing and 1 partial ⚠️ |
| ...aggregator/DistinctCountThetaSketchAggregator.java | 91.66% | 1 Missing and 1 partial ⚠️ |
| ...ssing/aggregator/IntegerTupleSketchAggregator.java | 90.47% | 1 Missing and 1 partial ⚠️ |

Additional details and impacted files
@@             Coverage Diff              @@
##             master   #17825      +/-   ##
============================================
+ Coverage     63.20%   63.26%   +0.06%     
- Complexity     1456     1466      +10     
============================================
  Files          3188     3191       +3     
  Lines        191736   192181     +445     
  Branches      29347    29453     +106     
============================================
+ Hits         121178   121580     +402     
+ Misses        61067    61065       -2     
- Partials       9491     9536      +45     

| Flag | Coverage Δ |
|------|------------|
| custom-integration1 | 100.00% <ø> (ø) |
| integration | 100.00% <ø> (ø) |
| integration1 | 100.00% <ø> (ø) |
| integration2 | 0.00% <ø> (ø) |
| java-11 | 63.23% <74.86%> (+0.04%) ⬆️ |
| java-21 | 63.24% <74.86%> (+0.06%) ⬆️ |
| temurin | 63.26% <74.86%> (+0.06%) ⬆️ |
| unittests | 63.26% <74.86%> (+0.06%) ⬆️ |
| unittests1 | 55.59% <78.23%> (+0.03%) ⬆️ |
| unittests2 | 34.23% <0.55%> (+0.10%) ⬆️ |

 Reduces the number of sketch serializations per key from N-1 to 1 by merging
 all sketches for a key in a single operation, reading the serialized bytes through zero-copy Memory.wrap().

 Changes:
 - Add aggregateBatch() and supportsBatchAggregation() to ValueAggregator interface
 - Implement batch aggregation for DistinctCountThetaSketchAggregator,
   IntegerTupleSketchAggregator, and DistinctCountCPCSketchAggregator
 - Modify RollupReducer to use batch aggregation when supported
 - Add reducerMaxBatchSize config to SegmentProcessorConfig (default 500)
 - Add JMH benchmarks comparing pairwise vs batch aggregation

 Batch aggregation optimizations:
 - Zero-copy sketch wrapping via Sketch.wrap(Memory.wrap(bytes))
 - Theta-based sorting for early termination (Theta/Tuple sketches)
 - Single final serialization instead of N-1 intermediate serializations

 Benchmark results (500 sketches per key):
 - Theta sketch: 189ms → 2ms (91x faster)
 - Tuple sketch: 133ms → 11ms (12x faster)
 - CPC sketch: 16ms → 4ms (4x faster)
@davecromberge force-pushed the feature-contrib/batch-aggregation-theta-sketches branch from 2bc90d7 to 3b9a667 on March 5, 2026 13:39
@xiangfu0 requested review from Copilot and xiangfu0 on March 8, 2026 07:23
@xiangfu0 added the labels enhancement (Improvement to existing functionality), query (Related to query processing), and functions (Related to scalar or aggregation functions) on Mar 8, 2026
Contributor

Copilot AI left a comment

Pull request overview

This PR introduces batch aggregation support for sketch-based ValueAggregators and updates the rollup reducer to leverage it, aiming to reduce intermediate sketch (de)serialization overhead during Merge/Rollup minion tasks.

Changes:

  • Extend ValueAggregator with aggregateBatch() and supportsBatchAggregation(), and implement optimized batch paths for Theta/Tuple/CPC sketch aggregators.
  • Update RollupReducer to use batch aggregation when available, and add reducerMaxBatchSize plumbing via SegmentProcessorConfig / task configs.
  • Add/expand unit tests for batch aggregation and add a JMH benchmark for pairwise vs batch sketch merging.

Reviewed changes

Copilot reviewed 14 out of 14 changed files in this pull request and generated 5 comments.

| File | Description |
|------|-------------|
| pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskExecutor.java | Reads reducerMaxBatchSize from task configs into SegmentProcessorConfig. |
| pinot-perf/src/main/java/org/apache/pinot/perf/aggregation/BenchmarkSketchBatchAggregation.java | Adds a JMH benchmark comparing pairwise vs batch sketch aggregation. |
| pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/ReducerTest.java | Adds rollup tests for theta-sketch batch aggregation and config defaulting. |
| pinot-core/src/test/java/org/apache/pinot/core/segment/processing/aggregator/IntegerTupleSketchAggregatorTest.java | Adds unit tests for tuple sketch batch aggregation and equivalence vs pairwise. |
| pinot-core/src/test/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountThetaSketchAggregatorTest.java | Adds unit tests for theta sketch batch aggregation and equivalence vs pairwise. |
| pinot-core/src/test/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountCPCSketchAggregatorTest.java | Adds unit tests for CPC sketch batch aggregation and equivalence vs pairwise. |
| pinot-core/src/main/java/org/apache/pinot/core/segment/processing/reducer/RollupReducer.java | Adds batch-reduce path with batching/flush behavior controlled by max batch size. |
| pinot-core/src/main/java/org/apache/pinot/core/segment/processing/reducer/ReducerFactory.java | Wires reducerMaxBatchSize from config into RollupReducer. |
| pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorConfig.java | Adds reducerMaxBatchSize to the framework config with defaulting behavior. |
| pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregator.java | Adds default aggregateBatch() and supportsBatchAggregation() to the interface. |
| pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/IntegerTupleSketchAggregator.java | Implements optimized batch union with theta-based sorting. |
| pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountThetaSketchAggregator.java | Implements optimized batch union with Sketch.wrap(Memory.wrap(bytes)) and theta sorting. |
| pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountCPCSketchAggregator.java | Implements batch union for CPC sketches. |
| pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java | Introduces reducerMaxBatchSize config key and default constant. |
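
The theta-based sorting mentioned for the Theta and Tuple aggregators can be illustrated with a toy model. This is a sketch under simplifying assumptions: each toy sketch holds ascending hashes that are all below its own theta, the union keeps only hashes below the smallest theta seen, and trimming to a nominal entry count is left out. `ThetaOrderDemo` and `ToySketch` are hypothetical names and this is not the DataSketches implementation:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.TreeSet;

final class ThetaOrderDemo {
  // Toy ordered sketch: hashes ascending, all strictly below theta.
  static final class ToySketch {
    final long theta;
    final long[] hashes;

    ToySketch(long theta, long... hashes) {
      this.theta = theta;
      this.hashes = hashes;
    }
  }

  static int inspected; // hash entries visited by the most recent union() call

  static TreeSet<Long> union(List<ToySketch> sketches) {
    inspected = 0;
    long theta = Long.MAX_VALUE;
    TreeSet<Long> retained = new TreeSet<>();
    for (ToySketch s : sketches) {
      theta = Math.min(theta, s.theta);
      for (long h : s.hashes) {
        inspected++;
        if (h >= theta) {
          break; // ordered input: no later hash in this sketch can qualify
        }
        retained.add(h);
      }
    }
    // Drop entries that were admitted before theta shrank to its final value.
    retained.tailSet(theta, true).clear();
    return retained;
  }

  // Feeding the lowest-theta sketch first fixes the final theta up front,
  // so later sketches hit the early-termination branch sooner.
  static TreeSet<Long> unionSortedByTheta(List<ToySketch> sketches) {
    List<ToySketch> sorted = new ArrayList<>(sketches);
    sorted.sort(Comparator.comparingLong((ToySketch s) -> s.theta));
    return union(sorted);
  }

  static List<ToySketch> sample() {
    List<ToySketch> sketches = new ArrayList<>();
    sketches.add(new ToySketch(100, 10, 40, 70, 90));
    sketches.add(new ToySketch(50, 5, 20, 45));
    sketches.add(new ToySketch(30, 3, 12, 25));
    return sketches;
  }

  public static void main(String[] args) {
    TreeSet<Long> plain = union(sample());
    int plainInspected = inspected;
    TreeSet<Long> sorted = unionSortedByTheta(sample());
    int sortedInspected = inspected;
    System.out.println(plain.equals(sorted));
    System.out.println(plainInspected + " vs " + sortedInspected);
  }
}
```

On the sample data the sorted order visits fewer hash entries while retaining the same set, which is the effect the sorting step is after.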

davecromberge and others added 3 commits March 10, 2026 11:30
…ing/reducer/RollupReducer.java

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
…c/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskExecutor.java

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Previously, if ANY aggregator supported batch aggregation, ALL metric
columns had their values buffered. This changed memory behavior from
O(1) streaming to O(N) buffering for wide schemas where only one
sketch column benefited.

Now uses a hybrid approach:
- Batch aggregation (O(N) buffer) only for aggregators that support it
- Pairwise aggregation (O(1) memory) for simple aggregators like SUM/MIN/MAX

This preserves the batch optimization benefits for sketches while
maintaining streaming memory behavior for other metric columns.
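
The hybrid approach described in this commit message might be sketched as follows. This is a minimal illustration, assuming each metric column has its own aggregator, that batch-capable columns buffer values and flush when the buffer reaches the configured maximum, and that the flushed partial result seeds the next buffer. `HybridReduceDemo`, `Agg`, and `peakBuffered` are hypothetical names and do not reflect the actual RollupReducer code:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class HybridReduceDemo {
  interface Agg {
    Object aggregate(Object a, Object b);

    default boolean supportsBatch() {
      return false;
    }

    default Object aggregateBatch(List<Object> values) {
      Object result = values.get(0);
      for (int i = 1; i < values.size(); i++) {
        result = aggregate(result, values.get(i));
      }
      return result;
    }
  }

  // Simple aggregator: stays on the pairwise, O(1)-memory path.
  static final Agg SUM = (a, b) -> (Long) a + (Long) b;

  // Stand-in for a sketch aggregator: set union that opts into batching.
  static final Agg SET_UNION = new Agg() {
    @Override
    public Object aggregate(Object a, Object b) {
      return aggregateBatch(List.of(a, b));
    }

    @Override
    public boolean supportsBatch() {
      return true;
    }

    @Override
    public Object aggregateBatch(List<Object> values) {
      Set<Object> union = new HashSet<>();
      for (Object v : values) {
        union.addAll((Set<?>) v);
      }
      return union;
    }
  };

  static int peakBuffered; // largest buffer size seen during the last reduce()

  static Object[] reduce(List<Object[]> rows, Agg[] aggs, int maxBatchSize) {
    peakBuffered = 0;
    Object[] result = rows.get(0).clone();
    List<List<Object>> buffers = new ArrayList<>();
    for (int c = 0; c < aggs.length; c++) {
      List<Object> buf = null; // null marks a pairwise column
      if (aggs[c].supportsBatch()) {
        buf = new ArrayList<>();
        buf.add(result[c]);
      }
      buffers.add(buf);
    }
    for (int r = 1; r < rows.size(); r++) {
      for (int c = 0; c < aggs.length; c++) {
        List<Object> buf = buffers.get(c);
        if (buf == null) {
          result[c] = aggs[c].aggregate(result[c], rows.get(r)[c]);
          continue;
        }
        buf.add(rows.get(r)[c]);
        peakBuffered = Math.max(peakBuffered, buf.size());
        if (buf.size() >= maxBatchSize) {
          Object partial = aggs[c].aggregateBatch(buf); // flush the full batch
          buf.clear();
          buf.add(partial);
        }
      }
    }
    for (int c = 0; c < aggs.length; c++) {
      if (buffers.get(c) != null) {
        result[c] = aggs[c].aggregateBatch(buffers.get(c));
      }
    }
    return result;
  }

  public static void main(String[] args) {
    List<Object[]> rows = new ArrayList<>();
    rows.add(new Object[] {1L, Set.of("a")});
    rows.add(new Object[] {2L, Set.of("b")});
    rows.add(new Object[] {3L, Set.of("a", "c")});
    Object[] out = reduce(rows, new Agg[] {SUM, SET_UNION}, 2);
    System.out.println(out[0] + " " + out[1]);
  }
}
```

Only the batch-capable column holds more than one value at a time, and its buffer never grows past the configured maximum.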
@Jackie-Jiang added the labels ingestion (Related to data ingestion pipeline) and minion (Related to Pinot Minion task framework), and removed the label functions (Related to scalar or aggregation functions) on Mar 19, 2026
Labels

enhancement (Improvement to existing functionality), ingestion (Related to data ingestion pipeline), minion (Related to Pinot Minion task framework), query (Related to query processing)
