Add batch aggregation for sketch ValueAggregators in MergeRollupTask#17825
Add batch aggregation for sketch ValueAggregators in MergeRollupTask#17825davecromberge wants to merge 4 commits intoapache:masterfrom
Conversation
|
Summary Table with Speedups (From JMH testing)
|
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## master #17825 +/- ##
============================================
+ Coverage 63.20% 63.26% +0.06%
- Complexity 1456 1466 +10
============================================
Files 3188 3191 +3
Lines 191736 192181 +445
Branches 29347 29453 +106
============================================
+ Hits 121178 121580 +402
+ Misses 61067 61065 -2
- Partials 9491 9536 +45
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
Reduces serialization overhead from O(N-1) to O(1) by merging all sketches for a key in a single operation using zero-copy Memory.wrap(). Changes: - Add aggregateBatch() and supportsBatchAggregation() to ValueAggregator interface - Implement batch aggregation for DistinctCountThetaSketchAggregator, IntegerTupleSketchAggregator, and DistinctCountCPCSketchAggregator - Modify RollupReducer to use batch aggregation when supported - Add reducerMaxBatchSize config to SegmentProcessorConfig (default 500) - Add JMH benchmarks comparing pairwise vs batch aggregation Batch aggregation optimizations: - Zero-copy sketch wrapping via Sketch.wrap(Memory.wrap(bytes)) - Theta-based sorting for early termination (Theta/Tuple sketches) - Single final serialization instead of N-1 intermediate serializations Benchmark results (500 sketches per key): - Theta sketch: 189ms → 2ms (91x faster) - Tuple sketch: 133ms → 11ms (12x faster) - CPC sketch: 16ms → 4ms (4x faster)
2bc90d7 to
3b9a667
Compare
There was a problem hiding this comment.
Pull request overview
This PR introduces batch aggregation support for sketch-based ValueAggregators and updates the rollup reducer to leverage it, aiming to reduce intermediate sketch (de)serialization overhead during Merge/Rollup minion tasks.
Changes:
- Extend
ValueAggregatorwithaggregateBatch()andsupportsBatchAggregation(), and implement optimized batch paths for Theta/Tuple/CPC sketch aggregators. - Update
RollupReducerto use batch aggregation when available, and addreducerMaxBatchSizeplumbing viaSegmentProcessorConfig/ task configs. - Add/expand unit tests for batch aggregation and add a JMH benchmark for pairwise vs batch sketch merging.
Reviewed changes
Copilot reviewed 14 out of 14 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskExecutor.java |
Reads reducerMaxBatchSize from task configs into SegmentProcessorConfig. |
pinot-perf/src/main/java/org/apache/pinot/perf/aggregation/BenchmarkSketchBatchAggregation.java |
Adds a JMH benchmark comparing pairwise vs batch sketch aggregation. |
pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/ReducerTest.java |
Adds rollup tests for theta-sketch batch aggregation and config defaulting. |
pinot-core/src/test/java/org/apache/pinot/core/segment/processing/aggregator/IntegerTupleSketchAggregatorTest.java |
Adds unit tests for tuple sketch batch aggregation and equivalence vs pairwise. |
pinot-core/src/test/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountThetaSketchAggregatorTest.java |
Adds unit tests for theta sketch batch aggregation and equivalence vs pairwise. |
pinot-core/src/test/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountCPCSketchAggregatorTest.java |
Adds unit tests for CPC sketch batch aggregation and equivalence vs pairwise. |
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/reducer/RollupReducer.java |
Adds batch-reduce path with batching/flush behavior controlled by max batch size. |
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/reducer/ReducerFactory.java |
Wires reducerMaxBatchSize from config into RollupReducer. |
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorConfig.java |
Adds reducerMaxBatchSize to the framework config with defaulting behavior. |
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregator.java |
Adds default aggregateBatch() and supportsBatchAggregation() to the interface. |
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/IntegerTupleSketchAggregator.java |
Implements optimized batch union with theta-based sorting. |
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountThetaSketchAggregator.java |
Implements optimized batch union with Sketch.wrap(Memory.wrap(bytes)) and theta sorting. |
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountCPCSketchAggregator.java |
Implements batch union for CPC sketches. |
pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java |
Introduces reducerMaxBatchSize config key and default constant. |
…ing/reducer/RollupReducer.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
…c/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskExecutor.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Previously, if ANY aggregator supported batch aggregation, ALL metric columns had their values buffered. This changed memory behavior from O(1) streaming to O(N) buffering for wide schemas where only one sketch column benefited. Now uses hybrid approach: - Batch aggregation (O(N) buffer) only for aggregators that support it - Pairwise aggregation (O(1) memory) for simple aggregators like SUM/MIN/MAX This preserves the batch optimization benefits for sketches while maintaining streaming memory behavior for other metric columns.
Reduces serialization overhead from O(N-1) to O(1) by merging all
sketches for a key in a single operation using zero-copy Memory.wrap().
Changes:
Batch aggregation optimizations:
Benchmark results (500 sketches per key):
Tag:
performancerelease-notes: