[feature] Add pluggable codec pipeline for RAW forward indexes #18092
xiangfu0 wants to merge 3 commits into apache:master
Pull request overview
This PR adds an optional pre-compression transformCodec (DELTA / DOUBLE_DELTA) for RAW forward indexes, enabling entropy-reducing transforms to be composed with existing chunk compression codecs (e.g., DELTA + ZSTANDARD), with a new writer header field (v6) for backward compatibility.
Changes:
- Add `transformCodec` to table config plumbing (`FieldConfig` → `ForwardIndexConfig`) and validate supported usage (SV INT/LONG RAW only).
- Extend chunk forward index writer/reader to persist/read transform codec in a v6 header and wrap compression/decompression with transform stages.
- Introduce transform SPI + implementations (Delta/DoubleDelta) and unit tests.
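For readers unfamiliar with the transform itself, here is a minimal sketch of an in-place, size-preserving delta encode/decode. It works on a plain `long[]` for brevity; the class and method names are illustrative assumptions and are not the PR's `DeltaTransform`, which operates on chunk buffers.

```java
import java.util.Arrays;

// Illustrative only: names are hypothetical, not taken from the PR.
class DeltaSketch {
    // Encode in place: each element becomes its difference from the previous one.
    // Iterating from the end keeps the previous original value available.
    static void deltaEncode(long[] values) {
        for (int i = values.length - 1; i > 0; i--) {
            values[i] -= values[i - 1];
        }
    }

    // Decode in place: a running prefix sum restores the original values.
    static void deltaDecode(long[] values) {
        for (int i = 1; i < values.length; i++) {
            values[i] += values[i - 1];
        }
    }

    public static void main(String[] args) {
        long[] timestamps = {1000L, 1010L, 1020L, 1035L};
        long[] encoded = timestamps.clone();
        deltaEncode(encoded);
        System.out.println(Arrays.toString(encoded)); // [1000, 10, 10, 15]
        deltaDecode(encoded);
        System.out.println(Arrays.equals(encoded, timestamps)); // true
    }
}
```

The encoded array has the same length as the input, which is why such a transform can be composed in front of any byte-level compressor. Long overflow wraps on encode and unwraps on decode, so the round trip stays exact.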
Reviewed changes
Copilot reviewed 20 out of 20 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java | Adds transformCodec to user-facing field config and builder/constructors. |
| pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java | Exposes optional getTransformCodec() on forward index readers. |
| pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ForwardIndexConfig.java | Carries TransformCodec through index config (JSON + builder + equality). |
| pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/TransformCodec.java | Defines transform codec enum persisted in forward index headers. |
| pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkTransform.java | Adds SPI interface for in-place encode/decode transforms. |
| pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/DeltaTransform.java | Implements delta transform. |
| pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/DoubleDeltaTransform.java | Implements double-delta transform. |
| pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ChunkTransformFactory.java | Maps TransformCodec to concrete ChunkTransform. |
| pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/TransformCompressor.java | Wraps an existing compressor with pre-transform encode stage. |
| pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/TransformDecompressor.java | Wraps an existing decompressor with post-transform decode stage. |
| pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ChunkCompressorFactory.java | Adds overloads to build compressor/decompressor pipelines with optional transforms. |
| pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/BaseChunkForwardIndexWriter.java | Writes v6 header with transform field; builds compressor pipeline with transform. |
| pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/FixedByteChunkForwardIndexWriter.java | Plumbs transform codec into fixed-byte writer construction. |
| pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/SingleValueFixedByteRawIndexCreator.java | Plumbs transform codec into fixed-byte raw forward index creator. |
| pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/BaseChunkForwardIndexReader.java | Reads v6 header transform field; builds decompressor pipeline with transform. |
| pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexCreatorFactory.java | Bumps writer version to 6 when transform is configured; routes to fixed-byte creator. |
| pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexType.java | Validates transform constraints and maps FieldConfig transform to internal config. |
| pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java | Detects transform codec changes to decide on forward index rewrites. |
| pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java | Updates default ForwardIndexConfig construction for new signature. |
| pinot-segment-local/src/test/java/org/apache/pinot/segment/local/io/compression/ChunkTransformTest.java | Adds unit tests for delta/double-delta transform round-trips. |
Codecov Report
@@ Coverage Diff @@
## master #18092 +/- ##
============================================
+ Coverage 63.30% 63.33% +0.02%
- Complexity 1627 1664 +37
============================================
Files 3226 3235 +9
Lines 196636 197131 +495
Branches 30401 30497 +96
============================================
+ Hits 124490 124846 +356
- Misses 62170 62257 +87
- Partials 9976 10028 +52
Pull request overview
Copilot reviewed 20 out of 20 changed files in this pull request and generated 3 comments.
Comments suppressed due to low confidence (1)
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexCreatorFactory.java:114
- When `codecPipeline` is configured but `chunkCompressionType` is null, the current logic skips assigning a default compression type (`if (chunkCompressionType == null && codecPipeline == null)`). For non-fixed-width SV columns (e.g., STRING/BYTES), the pipeline is not used later, so `chunkCompressionType` stays null and is passed into `SingleValueVarByteRawIndexCreator`, which expects a non-null `ChunkCompressionType` and will fail at runtime. Fix by either deriving `chunkCompressionType` from `codecPipeline` when present, or rejecting `codecPipeline` for types/paths where the pipeline isn't supported yet (and/or still falling back to the default compression type when the pipeline can't be applied).
    ChunkCodecPipeline codecPipeline = indexConfig.getCodecPipeline();
    ChunkCompressionType chunkCompressionType = indexConfig.getChunkCompressionType();
    if (chunkCompressionType == null && codecPipeline == null) {
      chunkCompressionType = ForwardIndexType.getDefaultCompressionType(fieldSpec.getFieldType());
    }
    boolean deriveNumDocsPerChunk = indexConfig.isDeriveNumDocsPerChunk();
    int writerVersion = indexConfig.getRawIndexWriterVersion();
    int targetMaxChunkSize = indexConfig.getTargetMaxChunkSizeBytes();
    int targetDocsPerChunk = indexConfig.getTargetDocsPerChunk();
    // Auto-bump writer version to 7 when pipeline is set
    if (codecPipeline != null && writerVersion < 7) {
      writerVersion = 7;
    }
    if (fieldSpec.isSingleValueField()) {
      if (codecPipeline != null && storedType.isFixedWidth()) {
        return new SingleValueFixedByteRawIndexCreator(indexDir,
            codecPipeline.getChunkCompressionType(), codecPipeline, columnName, numTotalDocs,
            storedType, writerVersion, targetDocsPerChunk);
      }
      return getRawIndexCreatorForSVColumn(indexDir, chunkCompressionType, columnName, storedType, numTotalDocs,
          context.getLengthOfLongestEntry(), deriveNumDocsPerChunk, writerVersion, targetMaxChunkSize,
          targetDocsPerChunk);
    } else {
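One way to read the first suggested fix (deriving the compression type from the pipeline when one is present) is sketched below. This is a standalone illustration under stated assumptions: `CompressionType` and `Pipeline` are simplified stand-ins for Pinot's `ChunkCompressionType` and `ChunkCodecPipeline`, and `resolve` is a hypothetical helper, not code from the PR. Only `getChunkCompressionType()` is taken from the excerpt above.

```java
import java.util.Objects;

// Illustrative only: simplified stand-ins, not Pinot classes.
class CompressionFallbackSketch {
    enum CompressionType { PASS_THROUGH, LZ4, ZSTANDARD }

    // Stand-in for ChunkCodecPipeline; exposes only its terminal compressor.
    static final class Pipeline {
        private final CompressionType _compressionType;

        Pipeline(CompressionType compressionType) {
            _compressionType = Objects.requireNonNull(compressionType);
        }

        CompressionType getChunkCompressionType() {
            return _compressionType;
        }
    }

    // Never returns null: explicit type wins, then the pipeline's compressor, then the default.
    static CompressionType resolve(CompressionType configured, Pipeline pipeline, CompressionType defaultType) {
        if (configured != null) {
            return configured;
        }
        if (pipeline != null) {
            return pipeline.getChunkCompressionType();
        }
        return Objects.requireNonNull(defaultType);
    }

    public static void main(String[] args) {
        Pipeline pipeline = new Pipeline(CompressionType.ZSTANDARD);
        System.out.println(resolve(null, pipeline, CompressionType.LZ4)); // ZSTANDARD
        System.out.println(resolve(null, null, CompressionType.LZ4));     // LZ4
    }
}
```

With a helper of this shape, the var-byte creator path would always receive a non-null compression type, even when the pipeline itself cannot be applied to the column.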
xiangfu0 left a comment
Found a few high-signal issues; see inline comments.
    int targetDocsPerChunk = indexConfig.getTargetDocsPerChunk();

    // Auto-bump writer version to 7 when pipeline has transforms
    if (codecPipeline != null && codecPipeline.hasTransforms() && writerVersion < 7) {
This makes the new V7 on-disk format reachable during ordinary segment creation, but older Pinot servers cannot safely read that header. Before this PR, BaseChunkForwardIndexReader always treated the word after totalDocs as ChunkCompressionType; on a V7 segment it will read pipelineLength/codecs as legacy header fields instead of rejecting the file. During a rolling upgrade, a new server/minion can therefore write segments that old servers misparse or fail to query. We need a mixed-version/cluster-version gate before auto-emitting V7 here, or we break Pinot's cross-version segment compatibility.
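A minimal sketch of the kind of gate this comment asks for follows. The `clusterSupportsV7` flag is a hypothetical input; the PR does not say where such a signal would come from, and `resolveWriterVersion` is an illustrative helper rather than existing Pinot code.

```java
// Illustrative only: hypothetical helper showing a gate before emitting the V7 format.
class WriterVersionGateSketch {
    static final int PIPELINE_WRITER_VERSION = 7;

    // clusterSupportsV7 is an assumed flag meaning every server that may load
    // the segment can parse the V7 header.
    static int resolveWriterVersion(int configuredVersion, boolean pipelineHasTransforms,
        boolean clusterSupportsV7) {
        if (!pipelineHasTransforms) {
            // No transforms: keep whatever format the table already uses.
            return configuredVersion;
        }
        if (!clusterSupportsV7) {
            // Refuse to write a format that older readers would misparse.
            throw new IllegalStateException(
                "Codec pipeline with transforms requires writer version " + PIPELINE_WRITER_VERSION
                    + ", which is not enabled for this cluster");
        }
        return Math.max(configuredVersion, PIPELINE_WRITER_VERSION);
    }

    public static void main(String[] args) {
        System.out.println(resolveWriterVersion(4, false, false)); // 4
        System.out.println(resolveWriterVersion(4, true, true));   // 7
    }
}
```

Failing fast at segment creation is one design choice; silently falling back to a compression-only format would be another, at the cost of ignoring the configured transforms.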
Replace the single compressionCodec with a codecPipeline field that supports an ordered list of transform and compression stages. Transforms (DELTA, DOUBLE_DELTA) are applied left-to-right on write and right-to-left on read, with at most one terminal compressor. Uses writer version 7 with a new header format that stores the full pipeline. Includes 53 tests covering unit, pipeline round-trip, and full end-to-end writer/reader validation including triple-delta stacking scenarios. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Summary

Replace the single `compressionCodec` with a pluggable codec pipeline for RAW forward indexes, enabling an ordered list of transform and compression stages.

Key design decisions
- `compressionCodec` and `codecPipeline` are mutually exclusive: setting both throws an error at both the `FieldConfig` and `ForwardIndexConfig` levels.
- `compressionCodec` is deprecated; use `codecPipeline` for new configs.
- `compressionCodec` is always internally converted to `codecPipeline`, so downstream code operates through a single unified path. CLP codecs are the only exception (they use separate creator/reader paths).
- The legacy `compressionCodec` values `DELTA` and `DELTADELTA` (which bundle delta + LZ4 in a custom format with a type flag, count metadata, and first-value storage) are mapped to the `DELTA_LZ4` and `DOUBLE_DELTA_LZ4` single-stage compressor pipelines. These are NOT byte-compatible with the pipeline `DELTA` transform + `LZ4` compressor.
- The pipeline `DELTA` is an in-place, size-preserving transform. Users compose `["DELTA", "ZSTANDARD"]` explicitly.

Key changes
- `ChunkCodec` stages are applied left-to-right on write and right-to-left on read. At most one terminal compressor is allowed (it must be last); all preceding stages are transforms.
- `compressionCodec` and `codecPipeline` cannot both be set in `FieldConfig` or `ForwardIndexConfig`.
- When `codecPipeline` is not set, `compressionCodec` is automatically converted to an equivalent pipeline in `ForwardIndexConfig`.
- `compressionCodec` deprecated: marked `@Deprecated` in both `FieldConfig` and `ForwardIndexConfig`. Retained for JSON backward compatibility.
- Transforms: `DELTA` (delta encoding), `DOUBLE_DELTA` (delta-of-delta encoding), and `XOR` (Gorilla-style XOR encoding) for fixed-width columns (INT, LONG, FLOAT, DOUBLE).
- `TableConfigUtils` validates that `codecPipeline` transforms are only used on single-value fixed-width columns (INT, LONG, FLOAT, DOUBLE); compression-only pipelines have no column type restrictions.

User Manual
What is a Codec Pipeline?

A codec pipeline is an ordered sequence of encoding stages applied to each data chunk in a RAW forward index. It replaces the single `compressionCodec` with a composable chain of transforms followed by an optional compressor.

- Transforms (`DELTA`, `DOUBLE_DELTA`, `XOR`) are numeric, in-place, size-preserving operations that restructure data to make it more compressible.
- Compressors (`ZSTANDARD`, `LZ4`, `SNAPPY`, `GZIP`) are byte-level operations that reduce data size.

On write, stages are applied left-to-right: transforms first, then compression.
On read, stages are applied right-to-left: decompression first, then reverse transforms.
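The write/read ordering can be sketched as follows. This is a self-contained illustration, not the PR's `PipelineChunkCompressor`: the `Transform` interface is a hypothetical stand-in for `ChunkTransform`, and `java.util.zip.Deflater`/`Inflater` stand in for the terminal compressor so the example runs without Pinot or third-party codecs.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

// Illustrative only: hypothetical names, JDK Deflater used in place of ZSTANDARD/LZ4.
class PipelineOrderSketch {
    interface Transform {
        void encode(long[] values);
        void decode(long[] values);
    }

    static final Transform DELTA = new Transform() {
        public void encode(long[] v) { for (int i = v.length - 1; i > 0; i--) { v[i] -= v[i - 1]; } }
        public void decode(long[] v) { for (int i = 1; i < v.length; i++) { v[i] += v[i - 1]; } }
    };

    // Write path: transforms left-to-right, then the terminal compressor.
    static byte[] write(long[] values, List<Transform> transforms) {
        long[] work = values.clone();
        for (Transform t : transforms) {
            t.encode(work);
        }
        ByteBuffer raw = ByteBuffer.allocate(work.length * Long.BYTES);
        for (long x : work) {
            raw.putLong(x);
        }
        Deflater deflater = new Deflater();
        deflater.setInput(raw.array());
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[256];
        while (!deflater.finished()) {
            out.write(buf, 0, deflater.deflate(buf));
        }
        deflater.end();
        return out.toByteArray();
    }

    // Read path: decompress first, then undo the transforms right-to-left.
    static long[] read(byte[] compressed, int numValues, List<Transform> transforms) {
        byte[] raw = new byte[numValues * Long.BYTES];
        Inflater inflater = new Inflater();
        inflater.setInput(compressed);
        try {
            int off = 0;
            while (off < raw.length && !inflater.finished()) {
                int n = inflater.inflate(raw, off, raw.length - off);
                if (n == 0 && inflater.needsInput()) {
                    throw new IllegalStateException("Truncated chunk");
                }
                off += n;
            }
        } catch (DataFormatException e) {
            throw new IllegalStateException("Corrupt chunk", e);
        } finally {
            inflater.end();
        }
        ByteBuffer bb = ByteBuffer.wrap(raw);
        long[] values = new long[numValues];
        for (int i = 0; i < numValues; i++) {
            values[i] = bb.getLong();
        }
        for (int i = transforms.size() - 1; i >= 0; i--) {
            transforms.get(i).decode(values);
        }
        return values;
    }

    public static void main(String[] args) {
        long[] data = {5L, 8L, 13L, 20L, 29L};
        List<Transform> pipeline = Arrays.asList(DELTA, DELTA);
        long[] back = read(write(data, pipeline), data.length, pipeline);
        System.out.println(Arrays.equals(back, data)); // true
    }
}
```

Because each transform is size-preserving, the reader only needs the value count and the stage list to reverse the pipeline; the compressed size depends solely on the terminal compressor.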

Migration from `compressionCodec`

Existing table configs using `compressionCodec` continue to work without changes. Internally, `compressionCodec` is always auto-converted to an equivalent pipeline:

| `compressionCodec` | `codecPipeline` | Notes |
|---|---|---|
| `PASS_THROUGH` | `[PASS_THROUGH]` | |
| `SNAPPY` | `[SNAPPY]` | |
| `ZSTANDARD` | `[ZSTANDARD]` | |
| `LZ4` | `[LZ4]` | |
| `GZIP` | `[GZIP]` | |
| `DELTA` | `[DELTA_LZ4]` | Not equivalent to `["DELTA", "LZ4"]` |
| `DELTADELTA` | `[DOUBLE_DELTA_LZ4]` | Not equivalent to `["DOUBLE_DELTA", "LZ4"]` |
| `CLP` / `CLPV2` / etc. | | Excluded from auto-conversion |
| `MV_ENTRY_DICT` | | Excluded from auto-conversion |

Important: `DELTA_LZ4` and `DOUBLE_DELTA_LZ4` are single-stage compressor entries that wrap the existing legacy DeltaCompressor/DeltaDecompressor. They use a custom on-disk format (type flag byte + count + first value + LZ4-compressed deltas) that is fundamentally different from the pipeline approach. To use the pure delta transform with composable compression, use an explicit pipeline like `["DELTA", "ZSTANDARD"]`.

Mutual exclusivity: Setting both `compressionCodec` and `codecPipeline` in a field config will throw an error.

Available Codecs
- `PASS_THROUGH`
- `SNAPPY`
- `ZSTANDARD`
- `LZ4`
- `GZIP`
- `DELTA_LZ4`
- `DOUBLE_DELTA_LZ4`
- `DELTA`
- `DOUBLE_DELTA`
- `XOR`

Pipeline Rules
- … `PASS_THROUGH` is used implicitly.

When to Use
- `["DELTA", "ZSTANDARD"]`
- `["DOUBLE_DELTA", "ZSTANDARD"]`
- `["DELTA", "LZ4"]`
- `["DELTA", "DELTA", "DELTA", "ZSTANDARD"]`
- `["XOR", "ZSTANDARD"]`
- `["DELTA", "XOR", "ZSTANDARD"]`

Sample Configs
Basic: Delta + Zstandard for a timestamp column

    {
      "tableName": "events_REALTIME",
      "tableType": "REALTIME",
      "fieldConfigList": [
        {
          "name": "eventTimestamp",
          "encodingType": "RAW",
          "codecPipeline": ["DELTA", "ZSTANDARD"]
        }
      ]
    }

Double-delta for fixed-interval metrics

    {
      "fieldConfigList": [
        {
          "name": "metricTimestamp",
          "encodingType": "RAW",
          "codecPipeline": ["DOUBLE_DELTA", "LZ4"]
        }
      ]
    }

Backward-compatible: existing `compressionCodec` still works

    {
      "fieldConfigList": [
        {
          "name": "userId",
          "encodingType": "RAW",
          "compressionCodec": "ZSTANDARD"
        }
      ]
    }

This is internally converted to `codecPipeline: ["ZSTANDARD"]`; no config changes are required.

Multiple columns with different pipelines

    {
      "fieldConfigList": [
        {
          "name": "eventTimestamp",
          "encodingType": "RAW",
          "codecPipeline": ["DELTA", "ZSTANDARD"]
        },
        {
          "name": "requestCount",
          "encodingType": "RAW",
          "codecPipeline": ["DELTA", "LZ4"]
        },
        {
          "name": "userId",
          "encodingType": "RAW",
          "compressionCodec": "ZSTANDARD"
        }
      ]
    }

XOR (Gorilla) for slowly-changing float metrics

    {
      "fieldConfigList": [
        {
          "name": "temperature",
          "encodingType": "RAW",
          "codecPipeline": ["XOR", "ZSTANDARD"]
        }
      ]
    }

Advanced: Triple-delta stacking for polynomial data

    {
      "fieldConfigList": [
        {
          "name": "sensorReading",
          "encodingType": "RAW",
          "codecPipeline": ["DELTA", "DELTA", "DELTA", "ZSTANDARD"]
        }
      ]
    }

New Files
- `ChunkCodec`
- `ChunkCodecPipeline`
- `ChunkTransform`
- `DeltaTransform`
- `DoubleDeltaTransform`
- `XorTransform`
- `ChunkTransformFactory`: `ChunkCodec` → `ChunkTransform` singleton
- `PipelineChunkCompressor`
- `PipelineChunkDecompressor`

Modified Files
- `FieldConfig`: `codecPipeline` field; mutual exclusivity validation; deprecated `compressionCodec`
- `ForwardIndexConfig`: `getCompressionCodec()`
- `ForwardIndexReader`: default `getCodecPipeline()` method
- `ChunkCompressorFactory`: `getCompressor`/`getDecompressor` overloads; legacy compound codec support
- `BaseChunkForwardIndexWriter`
- `FixedByteChunkForwardIndexWriter`
- `SingleValueFixedByteRawIndexCreator`
- `BaseChunkForwardIndexReader`
- `ForwardIndexCreatorFactory`
- `ForwardIndexType`
- `ForwardIndexHandler`
- `TableConfigUtils`: `validateGorillaCompressionCodecIfPresent` to validate `codecPipeline` transforms (SV, fixed-width)

Test plan
- `ChunkCodecPipelineTest`: all layers (DELTA_LZ4/DOUBLE_DELTA_LZ4, fromCompressionType, legacy vs pipeline inequality, XOR transform for int/long/float/double)
- `FixedByteChunkSVForwardIndexTest`: existing tests pass with auto-derivation
- `ForwardIndexTypeTest`: validation tests pass
- `ForwardIndexHandlerTest`: reload/rewrite detection passes
- `ForwardIndexConfigTest`: auto-derivation, conflict, CLP/MV_ENTRY_DICT exclusion
- `TableConfigUtilsTest.testValidateFieldConfig`: validates codecPipeline transforms on wrong types, MV columns, and valid SV INT; compression-only pipeline on STRING passes

🤖 Generated with Claude Code