[feature] Add pluggable codec pipeline for RAW forward indexes#18092

Open
xiangfu0 wants to merge 3 commits into apache:master from xiangfu0:claude/zealous-rosalind
Conversation

@xiangfu0
Contributor

@xiangfu0 xiangfu0 commented Apr 3, 2026

Summary

Replace the single compressionCodec with a pluggable codec pipeline for RAW forward indexes, enabling an ordered list of transform and compression stages.

Key design decisions

  • compressionCodec and codecPipeline are mutually exclusive — setting both throws an error at both FieldConfig and ForwardIndexConfig levels. compressionCodec is deprecated; use codecPipeline for new configs.
  • All RAW codecs auto-derive a pipeline — compressionCodec is always internally converted to codecPipeline, so downstream code operates through a single unified path. CLP codecs are the only exception (they use separate creator/reader paths).
  • Legacy compound codecs preserved — DELTA and DELTADELTA (which bundle delta+LZ4 in a custom format with a type flag, count metadata, and first-value storage) are mapped to DELTA_LZ4 and DOUBLE_DELTA_LZ4 single-stage compressor pipelines. These are NOT byte-compatible with the pipeline DELTA transform + LZ4 compressor.
  • Pipeline DELTA is a pure transform — in the pipeline world, DELTA is an in-place size-preserving transform. Users compose ["DELTA", "ZSTANDARD"] explicitly.

Key changes

  • Codec Pipeline: Ordered list of ChunkCodec stages applied left-to-right on write, right-to-left on read. At most one terminal compressor (must be last); all preceding stages are transforms.
  • Mutual exclusivity: compressionCodec and codecPipeline cannot both be set in FieldConfig or ForwardIndexConfig.
  • Auto-conversion: When codecPipeline is not set, compressionCodec is automatically converted to an equivalent pipeline in ForwardIndexConfig.
  • compressionCodec deprecated: Marked @Deprecated in both FieldConfig and ForwardIndexConfig. Retained for JSON backward compatibility.
  • Built-in Transforms: DELTA (delta encoding), DOUBLE_DELTA (delta-of-delta encoding), and XOR (Gorilla-style XOR encoding) for fixed-width columns (INT, LONG, FLOAT, DOUBLE).
  • Writer Version 7: New on-disk header format for pipelines with transforms. Compression-only pipelines continue to use existing writer versions for backward compatibility.
  • Table-level validation: TableConfigUtils validates that codecPipeline transforms are only used on single-value fixed-width columns (INT, LONG, FLOAT, DOUBLE); compression-only pipelines have no column type restrictions.

User Manual

What is a Codec Pipeline?

A codec pipeline is an ordered sequence of encoding stages applied to each data chunk in a RAW forward index. It replaces the single compressionCodec with a composable chain of transforms followed by an optional compressor.

  • Transforms (e.g., DELTA, DOUBLE_DELTA, XOR) are numeric, in-place, size-preserving operations that restructure data to make it more compressible.
  • Compressors (e.g., ZSTANDARD, LZ4, SNAPPY, GZIP) are byte-level operations that reduce data size.

On write, stages are applied left-to-right: transforms first, then compression.
On read, stages are applied right-to-left: decompression first, then reverse transforms.
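
To make this ordering concrete, a pipeline can be modeled as a list of invertible stages. The interface and class names below are an illustrative sketch, not Pinot's actual API:

```java
import java.util.List;

// Hypothetical sketch of a codec pipeline stage; not Pinot's actual API.
interface Stage {
    long[] encode(long[] values);  // applied on write
    long[] decode(long[] values);  // applied on read
}

// A size-preserving delta transform: the first value is kept as-is,
// each subsequent slot stores the difference from its predecessor.
class DeltaStage implements Stage {
    public long[] encode(long[] values) {
        long[] out = values.clone();
        for (int i = out.length - 1; i >= 1; i--) {
            out[i] -= out[i - 1];
        }
        return out;
    }

    public long[] decode(long[] values) {
        long[] out = values.clone();
        for (int i = 1; i < out.length; i++) {
            out[i] += out[i - 1];
        }
        return out;
    }
}

class PipelineOrderSketch {
    // Write path: stages applied left-to-right.
    static long[] write(List<Stage> stages, long[] values) {
        long[] out = values;
        for (Stage stage : stages) {
            out = stage.encode(out);
        }
        return out;
    }

    // Read path: stages applied right-to-left, each one decoded.
    static long[] read(List<Stage> stages, long[] encoded) {
        long[] out = encoded;
        for (int i = stages.size() - 1; i >= 0; i--) {
            out = stages.get(i).decode(out);
        }
        return out;
    }
}
```

A terminal compressor would simply be the last stage in the same list; the round-trip invariant is that read(write(x)) == x.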

Migration from compressionCodec

Existing table configs using compressionCodec continue to work without changes. Internally, compressionCodec is always auto-converted to an equivalent pipeline:

| compressionCodec | Auto-derived codecPipeline | Notes |
| --- | --- | --- |
| PASS_THROUGH | [PASS_THROUGH] | |
| SNAPPY | [SNAPPY] | |
| ZSTANDARD | [ZSTANDARD] | |
| LZ4 | [LZ4] | |
| GZIP | [GZIP] | |
| DELTA | [DELTA_LZ4] | Legacy compound codec (NOT the same as ["DELTA", "LZ4"]) |
| DELTADELTA | [DOUBLE_DELTA_LZ4] | Legacy compound codec (NOT the same as ["DOUBLE_DELTA", "LZ4"]) |
| CLP / CLPV2 / etc. | (no pipeline) | Uses separate CLP creator/reader |
| MV_ENTRY_DICT | (no pipeline) | Dict-encoded, not RAW |

Important: DELTA_LZ4 and DOUBLE_DELTA_LZ4 are single-stage compressor entries that wrap the existing legacy DeltaCompressor/DeltaDecompressor. They use a custom on-disk format (type flag byte + count + first value + LZ4-compressed deltas) that is fundamentally different from the pipeline approach. To use the pure delta transform with composable compression, use an explicit pipeline like ["DELTA", "ZSTANDARD"].

Mutual exclusivity: Setting both compressionCodec and codecPipeline in a field config will throw an error.

Available Codecs

| Codec | Kind | Description |
| --- | --- | --- |
| PASS_THROUGH | Compressor | No compression (raw bytes) |
| SNAPPY | Compressor | Snappy compression |
| ZSTANDARD | Compressor | Zstandard compression |
| LZ4 | Compressor | LZ4 compression |
| GZIP | Compressor | GZIP compression |
| DELTA_LZ4 | Compressor | Legacy delta+LZ4 compound (for backward compatibility) |
| DOUBLE_DELTA_LZ4 | Compressor | Legacy delta-of-delta+LZ4 compound (for backward compatibility) |
| DELTA | Transform | Delta encoding: stores the first value as-is, then differences from the previous value |
| DOUBLE_DELTA | Transform | Delta-of-delta: stores the first value and first delta as-is, then delta-of-deltas |
| XOR | Transform | Gorilla-style XOR encoding: XORs consecutive values, producing many zero bits for slowly-changing data |
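
To make the transforms concrete, here is a minimal standalone sketch of the delta-of-delta and Gorilla-style XOR encodings. The class and method names are illustrative only — this is not Pinot's DoubleDeltaTransform/XorTransform implementation:

```java
class TransformSketch {
    // Delta-of-delta: keep the first value and the first delta, then store
    // second differences. Fixed-interval series encode to (almost all) zeros.
    static long[] doubleDeltaEncode(long[] v) {
        long[] out = v.clone();
        for (int i = v.length - 1; i >= 2; i--) {
            out[i] = (v[i] - v[i - 1]) - (v[i - 1] - v[i - 2]);
        }
        if (v.length > 1) {
            out[1] = v[1] - v[0];
        }
        return out;
    }

    // Gorilla-style XOR: XOR each value's raw bits with the previous value's
    // bits. Slowly-changing doubles share sign/exponent/leading mantissa bits,
    // so the result has long runs of zero bits that compress well.
    static long[] xorEncode(double[] v) {
        long[] out = new long[v.length];
        long prev = 0L;  // first value is stored as-is (XOR with 0)
        for (int i = 0; i < v.length; i++) {
            long bits = Double.doubleToRawLongBits(v[i]);
            out[i] = bits ^ prev;
            prev = bits;
        }
        return out;
    }

    static double[] xorDecode(long[] enc) {
        double[] out = new double[enc.length];
        long prev = 0L;
        for (int i = 0; i < enc.length; i++) {
            prev = enc[i] ^ prev;
            out[i] = Double.longBitsToDouble(prev);
        }
        return out;
    }
}
```

For example, identical consecutive readings XOR-encode to literal zero words, which the downstream compressor collapses almost entirely.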

Pipeline Rules

  1. At most one compressor, and it must be the last stage.
  2. All stages before the compressor must be transforms.
  3. If no compressor is specified, PASS_THROUGH is used implicitly.
  4. Maximum 8 stages per pipeline.
  5. Transforms currently support single-value fixed-width columns only (INT, LONG, FLOAT, DOUBLE).
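
These rules can be checked with a small validator sketch. The codec names mirror the table above; this is illustrative and not Pinot's actual ChunkCodecPipeline validation code:

```java
import java.util.List;
import java.util.Set;

class PipelineRulesSketch {
    // Terminal (compressor) codecs from the table above.
    static final Set<String> COMPRESSORS = Set.of(
        "PASS_THROUGH", "SNAPPY", "ZSTANDARD", "LZ4", "GZIP",
        "DELTA_LZ4", "DOUBLE_DELTA_LZ4");
    static final int MAX_STAGES = 8;

    // Returns null if the pipeline is valid, otherwise an error message.
    static String validate(List<String> pipeline) {
        if (pipeline.isEmpty()) {
            return "pipeline must have at least one stage";
        }
        if (pipeline.size() > MAX_STAGES) {
            return "pipeline may have at most " + MAX_STAGES + " stages";
        }
        // All stages before the last must be transforms; a compressor,
        // if present at all, must be the terminal stage.
        for (int i = 0; i < pipeline.size() - 1; i++) {
            if (COMPRESSORS.contains(pipeline.get(i))) {
                return "compressor must be the last stage";
            }
        }
        return null;
    }
}
```

If the last stage is a transform rather than a compressor, rule 3 applies and PASS_THROUGH is appended implicitly.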

When to Use

| Data Pattern | Recommended Pipeline | Why |
| --- | --- | --- |
| Monotonically increasing timestamps | ["DELTA", "ZSTANDARD"] | Delta reduces values to small deltas; ZSTD compresses them well |
| Fixed-interval timestamps (e.g., every 60s) | ["DOUBLE_DELTA", "ZSTANDARD"] | Double-delta reduces to near-zero residuals |
| Slowly-changing counters | ["DELTA", "LZ4"] | Delta + fast LZ4 for low-latency reads |
| Highly regular sequences | ["DELTA", "DELTA", "DELTA", "ZSTANDARD"] | Stacked deltas for polynomial-like data |
| Slowly-changing FLOAT/DOUBLE (e.g., temperature) | ["XOR", "ZSTANDARD"] | XOR produces values with many zero bits; compresses well |
| Slowly-changing FLOAT/DOUBLE with trend | ["DELTA", "XOR", "ZSTANDARD"] | Delta removes the trend; XOR handles residual float noise |

Sample Configs

Basic: Delta + Zstandard for a timestamp column

{
  "tableName": "events_REALTIME",
  "tableType": "REALTIME",
  "fieldConfigList": [
    {
      "name": "eventTimestamp",
      "encodingType": "RAW",
      "codecPipeline": ["DELTA", "ZSTANDARD"]
    }
  ]
}

Double-delta for fixed-interval metrics

{
  "fieldConfigList": [
    {
      "name": "metricTimestamp",
      "encodingType": "RAW",
      "codecPipeline": ["DOUBLE_DELTA", "LZ4"]
    }
  ]
}

Backward-compatible: existing compressionCodec still works

{
  "fieldConfigList": [
    {
      "name": "userId",
      "encodingType": "RAW",
      "compressionCodec": "ZSTANDARD"
    }
  ]
}

This is internally converted to codecPipeline: ["ZSTANDARD"] — no config changes required.
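
The auto-conversion can be summarized as a small mapping sketch. This is a hypothetical helper that mirrors the migration table above, not Pinot's actual ForwardIndexConfig code:

```java
import java.util.List;

class AutoDeriveSketch {
    // Returns the derived pipeline, or null when no pipeline applies
    // (CLP-family codecs and MV_ENTRY_DICT use separate paths).
    static List<String> fromCompressionCodec(String codec) {
        switch (codec) {
            case "DELTA":         return List.of("DELTA_LZ4");
            case "DELTADELTA":    return List.of("DOUBLE_DELTA_LZ4");
            case "CLP":
            case "CLPV2":
            case "MV_ENTRY_DICT": return null;
            // PASS_THROUGH, SNAPPY, ZSTANDARD, LZ4, GZIP map one-to-one.
            default:              return List.of(codec);
        }
    }
}
```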

Multiple columns with different pipelines

{
  "fieldConfigList": [
    {
      "name": "eventTimestamp",
      "encodingType": "RAW",
      "codecPipeline": ["DELTA", "ZSTANDARD"]
    },
    {
      "name": "requestCount",
      "encodingType": "RAW",
      "codecPipeline": ["DELTA", "LZ4"]
    },
    {
      "name": "userId",
      "encodingType": "RAW",
      "compressionCodec": "ZSTANDARD"
    }
  ]
}

XOR (Gorilla) for slowly-changing float metrics

{
  "fieldConfigList": [
    {
      "name": "temperature",
      "encodingType": "RAW",
      "codecPipeline": ["XOR", "ZSTANDARD"]
    }
  ]
}

Advanced: Triple-delta stacking for polynomial data

{
  "fieldConfigList": [
    {
      "name": "sensorReading",
      "encodingType": "RAW",
      "codecPipeline": ["DELTA", "DELTA", "DELTA", "ZSTANDARD"]
    }
  ]
}
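
Why stacking deltas helps polynomial-like data: the k-th difference of a degree-k polynomial sequence is constant, so three delta passes flatten cubic-ish readings into a near-constant stream. A quick sketch (a standalone helper written for this illustration, not code from this PR):

```java
class StackedDeltaSketch {
    // One delta pass: out[i] = v[i] - v[i-1], with the first value kept.
    static long[] delta(long[] v) {
        long[] out = v.clone();
        for (int i = out.length - 1; i >= 1; i--) {
            out[i] -= out[i - 1];
        }
        return out;
    }
}
```

For the cubic sequence 0, 1, 8, 27, 64, 125 (n^3), three passes yield 0, 1, 5, 6, 6, 6 — the third difference of n^3 is the constant 3! = 6, which the terminal ZSTANDARD stage then compresses trivially.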

New Files

| File | Purpose |
| --- | --- |
| ChunkCodec | Unified codec enum: pure compressors (0-99), legacy compound codecs (6-7), transforms (100+) |
| ChunkCodecPipeline | Immutable ordered sequence of codec stages with construction-time validation |
| ChunkTransform | SPI interface for in-place encode/decode transforms |
| DeltaTransform | Delta encoding (value − previous) |
| DoubleDeltaTransform | Delta-of-delta encoding |
| XorTransform | Gorilla-style XOR encoding for FLOAT/DOUBLE (also works on INT/LONG) |
| ChunkTransformFactory | Maps each ChunkCodec to its ChunkTransform singleton |
| PipelineChunkCompressor | Applies transforms, then the terminal compressor |
| PipelineChunkDecompressor | Applies the terminal decompressor, then reverse transforms |

Modified Files

| File | Change |
| --- | --- |
| FieldConfig | Added codecPipeline field; mutual exclusivity validation; deprecated compressionCodec |
| ForwardIndexConfig | Auto-derives pipeline from all RAW codecs (except CLP); conflict check; deprecated getCompressionCodec() |
| ForwardIndexReader | Added default getCodecPipeline() method |
| ChunkCompressorFactory | Pipeline-aware getCompressor/getDecompressor overloads; legacy compound codec support |
| BaseChunkForwardIndexWriter | Version 7 header; validates V7↔pipeline invariant and transform value sizes |
| FixedByteChunkForwardIndexWriter | Added constructor with pipeline parameter |
| SingleValueFixedByteRawIndexCreator | Added constructor with pipeline parameter |
| BaseChunkForwardIndexReader | Version 7 header parsing; validates storedType for transform pipelines |
| ForwardIndexCreatorFactory | Uses pipeline path for transform pipelines; derives compressionType from pipeline for legacy/MV paths |
| ForwardIndexType | Transform validation (SV, fixed-width); compression-only pipelines unrestricted |
| ForwardIndexHandler | Unified pipeline-based comparison for reload/rewrite detection |
| TableConfigUtils | Extended validateGorillaCompressionCodecIfPresent to validate codecPipeline transforms (SV, fixed-width) |

Test plan

  • 66 tests in ChunkCodecPipelineTest — all layers (DELTA_LZ4/DOUBLE_DELTA_LZ4, fromCompressionType, legacy vs pipeline inequality, XOR transform for int/long/float/double)
  • 74 tests in FixedByteChunkSVForwardIndexTest — existing tests pass with auto-derivation
  • 34 tests in ForwardIndexTypeTest — validation tests pass
  • 31 tests in ForwardIndexHandlerTest — reload/rewrite detection passes
  • 14 tests in ForwardIndexConfigTest — auto-derivation, conflict, CLP/MV_ENTRY_DICT exclusion
  • TableConfigUtilsTest.testValidateFieldConfig — validates codecPipeline transforms on wrong types, MV columns, and valid SV INT; compression-only pipeline on STRING passes
  • Spotless, checkstyle, license checks pass

🤖 Generated with Claude Code

Contributor

Copilot AI left a comment


Pull request overview

This PR adds an optional pre-compression transformCodec (DELTA / DOUBLE_DELTA) for RAW forward indexes, enabling entropy-reducing transforms to be composed with existing chunk compression codecs (e.g., DELTA + ZSTANDARD), with a new writer header field (v6) for backward compatibility.

Changes:

  • Add transformCodec to table config plumbing (FieldConfig → ForwardIndexConfig) and validate supported usage (SV INT/LONG RAW only).
  • Extend chunk forward index writer/reader to persist/read transform codec in a v6 header and wrap compression/decompression with transform stages.
  • Introduce transform SPI + implementations (Delta/DoubleDelta) and unit tests.

Reviewed changes

Copilot reviewed 20 out of 20 changed files in this pull request and generated 5 comments.

Show a summary per file
| File | Description |
| --- | --- |
| pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java | Adds transformCodec to user-facing field config and builder/constructors. |
| pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java | Exposes optional getTransformCodec() on forward index readers. |
| pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ForwardIndexConfig.java | Carries TransformCodec through index config (JSON + builder + equality). |
| pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/TransformCodec.java | Defines transform codec enum persisted in forward index headers. |
| pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkTransform.java | Adds SPI interface for in-place encode/decode transforms. |
| pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/DeltaTransform.java | Implements delta transform. |
| pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/DoubleDeltaTransform.java | Implements double-delta transform. |
| pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ChunkTransformFactory.java | Maps TransformCodec to concrete ChunkTransform. |
| pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/TransformCompressor.java | Wraps an existing compressor with pre-transform encode stage. |
| pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/TransformDecompressor.java | Wraps an existing decompressor with post-transform decode stage. |
| pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ChunkCompressorFactory.java | Adds overloads to build compressor/decompressor pipelines with optional transforms. |
| pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/BaseChunkForwardIndexWriter.java | Writes v6 header with transform field; builds compressor pipeline with transform. |
| pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/FixedByteChunkForwardIndexWriter.java | Plumbs transform codec into fixed-byte writer construction. |
| pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/SingleValueFixedByteRawIndexCreator.java | Plumbs transform codec into fixed-byte raw forward index creator. |
| pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/BaseChunkForwardIndexReader.java | Reads v6 header transform field; builds decompressor pipeline with transform. |
| pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexCreatorFactory.java | Bumps writer version to 6 when transform is configured; routes to fixed-byte creator. |
| pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexType.java | Validates transform constraints and maps FieldConfig transform to internal config. |
| pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java | Detects transform codec changes to decide on forward index rewrites. |
| pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java | Updates default ForwardIndexConfig construction for new signature. |
| pinot-segment-local/src/test/java/org/apache/pinot/segment/local/io/compression/ChunkTransformTest.java | Adds unit tests for delta/double-delta transform round-trips. |

@codecov-commenter

codecov-commenter commented Apr 3, 2026

Codecov Report

❌ Patch coverage is 74.71910% with 135 lines in your changes missing coverage. Please review.
✅ Project coverage is 63.33%. Comparing base (22b3b6f) to head (902d275).

| Files with missing lines | Patch % | Lines |
| --- | --- | --- |
| ...he/pinot/segment/spi/codec/ChunkCodecPipeline.java | 40.57% | 28 Missing and 13 partials ⚠️ |
| ...local/io/codec/transform/DoubleDeltaTransform.java | 84.76% | 12 Missing and 4 partials ⚠️ |
| ...he/pinot/segment/spi/index/ForwardIndexConfig.java | 46.15% | 12 Missing and 2 partials ⚠️ |
| ...ment/index/forward/ForwardIndexCreatorFactory.java | 9.09% | 5 Missing and 5 partials ⚠️ |
| ...org/apache/pinot/segment/spi/codec/ChunkCodec.java | 72.97% | 8 Missing and 2 partials ⚠️ |
| ...ocal/segment/index/loader/ForwardIndexHandler.java | 65.00% | 5 Missing and 2 partials ⚠️ |
| ...al/io/writer/impl/BaseChunkForwardIndexWriter.java | 77.77% | 0 Missing and 6 partials ⚠️ |
| ...apache/pinot/segment/spi/codec/ChunkTransform.java | 0.00% | 5 Missing ⚠️ |
| ...gment/local/io/codec/transform/DeltaTransform.java | 92.72% | 2 Missing and 2 partials ⚠️ |
| ...segment/local/io/codec/transform/XorTransform.java | 92.72% | 2 Missing and 2 partials ⚠️ |

... and 8 more
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #18092      +/-   ##
============================================
+ Coverage     63.30%   63.33%   +0.02%     
- Complexity     1627     1664      +37     
============================================
  Files          3226     3235       +9     
  Lines        196636   197131     +495     
  Branches      30401    30497      +96     
============================================
+ Hits         124490   124846     +356     
- Misses        62170    62257      +87     
- Partials       9976    10028      +52     
| Flag | Coverage | Δ |
| --- | --- | --- |
| custom-integration1 | 100.00% <ø> | (ø) |
| integration | 100.00% <ø> | (ø) |
| integration1 | 100.00% <ø> | (ø) |
| integration2 | 0.00% <ø> | (ø) |
| java-11 | 63.31% <74.71%> | (+0.04%) ⬆️ |
| java-21 | 63.31% <74.71%> | (+0.02%) ⬆️ |
| temurin | 63.33% <74.71%> | (+0.02%) ⬆️ |
| unittests | 63.32% <74.71%> | (+0.02%) ⬆️ |
| unittests1 | 55.13% <20.59%> | (-0.15%) ⬇️ |
| unittests2 | 35.02% <62.17%> | (+0.05%) ⬆️ |

Flags with carried forward coverage won't be shown.

Contributor

Copilot AI left a comment


Pull request overview

Copilot reviewed 20 out of 20 changed files in this pull request and generated 4 comments.

@xiangfu0 xiangfu0 force-pushed the claude/zealous-rosalind branch from 3f067db to 75e1468 Compare April 4, 2026 03:18
@xiangfu0 xiangfu0 changed the title Add transformCodec support for RAW forward indexes [feature] Add pluggable codec pipeline for RAW forward indexes Apr 4, 2026
@xiangfu0 xiangfu0 force-pushed the claude/zealous-rosalind branch from 75e1468 to 92e2595 Compare April 4, 2026 06:17
@xiangfu0 xiangfu0 requested a review from Copilot April 4, 2026 07:03
@xiangfu0 xiangfu0 force-pushed the claude/zealous-rosalind branch from 92e2595 to 624a538 Compare April 4, 2026 07:06
Contributor

Copilot AI left a comment


Pull request overview

Copilot reviewed 20 out of 20 changed files in this pull request and generated 3 comments.

Comments suppressed due to low confidence (1)

pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexCreatorFactory.java:114

  • When codecPipeline is configured but chunkCompressionType is null, the current logic skips assigning a default compression type (if (chunkCompressionType == null && codecPipeline == null)). For non-fixed-width SV columns (e.g., STRING/BYTES), the pipeline is not used later, so chunkCompressionType stays null and will be passed into SingleValueVarByteRawIndexCreator, which expects a non-null ChunkCompressionType and will fail at runtime. Fix by either deriving chunkCompressionType from codecPipeline when present, or rejecting codecPipeline for types/paths where pipeline isn’t supported yet (and/or still falling back to the default compression type when pipeline can’t be applied).
      ChunkCodecPipeline codecPipeline = indexConfig.getCodecPipeline();
      ChunkCompressionType chunkCompressionType = indexConfig.getChunkCompressionType();
      if (chunkCompressionType == null && codecPipeline == null) {
        chunkCompressionType = ForwardIndexType.getDefaultCompressionType(fieldSpec.getFieldType());
      }
      boolean deriveNumDocsPerChunk = indexConfig.isDeriveNumDocsPerChunk();
      int writerVersion = indexConfig.getRawIndexWriterVersion();
      int targetMaxChunkSize = indexConfig.getTargetMaxChunkSizeBytes();
      int targetDocsPerChunk = indexConfig.getTargetDocsPerChunk();

      // Auto-bump writer version to 7 when pipeline is set
      if (codecPipeline != null && writerVersion < 7) {
        writerVersion = 7;
      }

      if (fieldSpec.isSingleValueField()) {
        if (codecPipeline != null && storedType.isFixedWidth()) {
          return new SingleValueFixedByteRawIndexCreator(indexDir,
              codecPipeline.getChunkCompressionType(), codecPipeline, columnName, numTotalDocs,
              storedType, writerVersion, targetDocsPerChunk);
        }
        return getRawIndexCreatorForSVColumn(indexDir, chunkCompressionType, columnName, storedType, numTotalDocs,
            context.getLengthOfLongestEntry(), deriveNumDocsPerChunk, writerVersion, targetMaxChunkSize,
            targetDocsPerChunk);
      } else {

@xiangfu0 xiangfu0 force-pushed the claude/zealous-rosalind branch 9 times, most recently from 4877e7d to 90c650f Compare April 4, 2026 10:21
@xiangfu0 xiangfu0 requested review from Copilot April 4, 2026 21:37
@xiangfu0 xiangfu0 added ingestion Related to data ingestion pipeline feature New functionality labels Apr 4, 2026
Contributor

Copilot AI left a comment


Pull request overview

Copilot reviewed 24 out of 24 changed files in this pull request and generated 6 comments.

@xiangfu0 xiangfu0 force-pushed the claude/zealous-rosalind branch 6 times, most recently from b6589b2 to 3877377 Compare April 6, 2026 00:44
Contributor Author

@xiangfu0 xiangfu0 left a comment


Found a few high-signal issues; see inline comments.

int targetDocsPerChunk = indexConfig.getTargetDocsPerChunk();

// Auto-bump writer version to 7 when pipeline has transforms
if (codecPipeline != null && codecPipeline.hasTransforms() && writerVersion < 7) {
Contributor Author


This makes the new V7 on-disk format reachable during ordinary segment creation, but older Pinot servers cannot safely read that header. Before this PR, BaseChunkForwardIndexReader always treated the word after totalDocs as ChunkCompressionType; on a V7 segment it will read pipelineLength/codecs as legacy header fields instead of rejecting the file. During a rolling upgrade, a new server/minion can therefore write segments that old servers misparse or fail to query. We need a mixed-version/cluster-version gate before auto-emitting V7 here, or we break Pinot's cross-version segment compatibility.

@xiangfu0 xiangfu0 force-pushed the claude/zealous-rosalind branch 2 times, most recently from 7ea5f1a to 8896a4c Compare April 12, 2026 10:15
xiangfu0 and others added 3 commits April 13, 2026 01:44
Replace the single compressionCodec with a codecPipeline field that supports
an ordered list of transform and compression stages. Transforms (DELTA,
DOUBLE_DELTA) are applied left-to-right on write and right-to-left on read,
with at most one terminal compressor. Uses writer version 7 with a new header
format that stores the full pipeline. Includes 53 tests covering unit,
pipeline round-trip, and full end-to-end writer/reader validation including
triple-delta stacking scenarios.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>