diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index 6dd621fcac4a..b7e5cf17fe0a 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -128,6 +128,12 @@ MemorySize Memory page size for caching. + +
chain-table.chain-partition-keys
+ (none) + String + Partition keys that participate in chain logic. Must be a contiguous suffix of the table's partition keys. Comma-separated. If not set, all partition keys participate in chain. +
chain-table.enabled
false @@ -1187,12 +1193,6 @@ String When a batch job queries from a chain table, if a partition does not exist in the main branch, the reader will try to get this partition from chain snapshot branch. - -
chain-table.chain-partition-keys
- (none) - String - Partition keys that participate in chain logic. Must be a contiguous suffix of the table's partition keys. Comma-separated. If not set, all partition keys participate in chain. -
scan.file-creation-time-millis
(none) diff --git a/docs/layouts/shortcodes/generated/flink_connector_configuration.html b/docs/layouts/shortcodes/generated/flink_connector_configuration.html index d4c8b5a1678f..253652cb78e8 100644 --- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html +++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html @@ -363,4 +363,4 @@ Defines a custom parallelism for the unaware-bucket table compaction job. By default, if this option is not defined, the planner will derive the parallelism for each statement individually by also considering the global configuration. - \ No newline at end of file + diff --git a/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexOptions.java b/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexOptions.java index 8d0758a1fcb1..03a00e66632f 100644 --- a/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexOptions.java @@ -66,4 +66,14 @@ public class BTreeIndexOptions { .intType() .defaultValue(4096) .withDescription("The max parallelism of Flink/Spark for building BTreeIndex."); + + public static final ConfigOption BTREE_WITH_FILE_META = + ConfigOptions.key("index.with-file-meta") + .booleanType() + .defaultValue(false) + .withDescription( + "Whether to store file metadata (ManifestEntry) inside the BTree " + + "global index. 
When enabled, query planning can " + + "bypass manifest HDFS/OSS reads entirely by reading data file " + + "metadata directly from the index."); } diff --git a/paimon-core/src/main/java/org/apache/paimon/globalindex/DataEvolutionBatchScan.java b/paimon-core/src/main/java/org/apache/paimon/globalindex/DataEvolutionBatchScan.java index 0542b7d95fc1..a0be6bca9dbb 100644 --- a/paimon-core/src/main/java/org/apache/paimon/globalindex/DataEvolutionBatchScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/globalindex/DataEvolutionBatchScan.java @@ -18,10 +18,11 @@ package org.apache.paimon.globalindex; -import org.apache.paimon.CoreOptions; import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.manifest.ManifestEntry; +import org.apache.paimon.manifest.ManifestEntrySerializer; import org.apache.paimon.manifest.PartitionEntry; import org.apache.paimon.metrics.MetricRegistry; import org.apache.paimon.partition.PartitionPredicate; @@ -41,15 +42,20 @@ import org.apache.paimon.utils.Range; import org.apache.paimon.utils.RowRangeIndex; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import javax.annotation.Nullable; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.function.Function; +import java.util.stream.Collectors; import static org.apache.paimon.table.SpecialFields.ROW_ID; import static org.apache.paimon.utils.ManifestReadThreadPool.randomlyExecuteSequentialReturn; @@ -57,6 +63,8 @@ /** Scan for data evolution table. 
*/ public class DataEvolutionBatchScan implements DataTableScan { + private static final Logger LOG = LoggerFactory.getLogger(DataEvolutionBatchScan.class); + private final FileStoreTable table; private final DataTableBatchScan batchScan; @@ -236,13 +244,45 @@ public Plan plan() { ScoreGetter scoreGetter = null; if (rowRangeIndex == null) { - Optional indexResult = evalGlobalIndex(); - if (indexResult.isPresent()) { - GlobalIndexResult result = indexResult.get(); + if (this.globalIndexResult != null) { + // Pre-set global index result pushed down from outer layer + GlobalIndexResult result = this.globalIndexResult; rowRangeIndex = RowRangeIndex.create(result.results().toRangeList()); if (result instanceof ScoredGlobalIndexResult) { scoreGetter = ((ScoredGlobalIndexResult) result).scoreGetter(); } + } else if (filter != null && table.coreOptions().globalIndexEnabled()) { + PartitionPredicate partitionFilter = + batchScan.snapshotReader().manifestsReader().partitionFilter(); + Optional optScanner = + GlobalIndexScanner.create(table, partitionFilter, filter); + if (optScanner.isPresent()) { + try (GlobalIndexScanner scanner = optScanner.get()) { + // Fast path: btree_file_meta (zero manifest reads) + if (scanner.hasFilePathIndex()) { + Optional fp = + scanner.scanWithFilePath(filter); + if (fp.isPresent()) { + Plan fastPlan = buildPlanFromFilePathIndex(fp.get()); + if (fastPlan != null) { + return fastPlan; + } + // Stale index detected; fall through to regular btree or batchScan + } + } + // Regular btree fallback (same scanner, no extra IO) + Optional idx = scanner.scan(filter); + if (idx.isPresent()) { + GlobalIndexResult result = idx.get(); + rowRangeIndex = RowRangeIndex.create(result.results().toRangeList()); + if (result instanceof ScoredGlobalIndexResult) { + scoreGetter = ((ScoredGlobalIndexResult) result).scoreGetter(); + } + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } } } @@ -254,30 +294,120 @@ public Plan plan() { return 
wrapToIndexSplits(splits, rowRangeIndex, scoreGetter); } - private Optional evalGlobalIndex() { - if (this.globalIndexResult != null) { - return Optional.of(globalIndexResult); + /** + * Builds a query plan using the pre-fetched {@link ManifestEntry} rows stored in + * btree_file_meta index. This method does NOT call {@link #batchScan} at all, so no manifest + * HDFS reads occur. + * + *

Returns {@code null} if the index is stale (i.e., one or more referenced data files no + * longer exist, typically due to compaction after the index was built). The caller should fall + * back to a regular manifest scan in that case. + */ + @Nullable + private Plan buildPlanFromFilePathIndex(FilePathGlobalIndexResult result) throws IOException { + ManifestEntrySerializer serializer = new ManifestEntrySerializer(); + + List entries = new ArrayList<>(); + for (byte[] bytes : result.manifestEntryBytes()) { + entries.add(serializer.deserializeFromBytes(bytes)); + } + + // Build rowRangeIndex once; use it for both planning-time file pruning and + // reading-time row-level filtering (IndexedSplit). + RowRangeIndex rowRangeIndex = null; + if (result.hasRowLevelFilter()) { + rowRangeIndex = RowRangeIndex.create(result.rowIndexResult().results().toRangeList()); + // Planning-time file pruning: discard files whose row range has no intersection + // with the matched row IDs, equivalent to what batchScan does during manifest scan. + final RowRangeIndex index = rowRangeIndex; + entries = + entries.stream() + .filter( + e -> { + long from = e.file().nonNullFirstRowId(); + long to = from + e.file().rowCount() - 1; + return !index.intersectedRanges(from, to).isEmpty(); + }) + .collect(Collectors.toList()); + } + + // Guard against stale index caused by compaction after the index was built. + // For typical point/narrow queries this is O(1~3) fileIO.exists() calls. + for (ManifestEntry entry : entries) { + if (!table.fileIO() + .exists( + table.store() + .pathFactory() + .createDataFilePathFactory(entry.partition(), entry.bucket()) + .toPath(entry.file()))) { + LOG.warn( + "btree_file_meta index is stale: file {} no longer exists " + + "(possibly removed by compaction). 
Falling back to manifest scan.", + entry.file().fileName()); + return null; + } } - if (filter == null) { - return Optional.empty(); + + List splits = buildSplitsFromEntries(entries); + if (splits.isEmpty()) { + return Collections::emptyList; } - CoreOptions options = table.coreOptions(); - if (!options.globalIndexEnabled()) { - return Optional.empty(); + + if (rowRangeIndex != null) { + return wrapToIndexSplits(splits, rowRangeIndex, null); } - PartitionPredicate partitionFilter = - batchScan.snapshotReader().manifestsReader().partitionFilter(); - Optional optionalScanner = - GlobalIndexScanner.create(table, partitionFilter, filter); - if (!optionalScanner.isPresent()) { - return Optional.empty(); + return () -> splits; + } + + /** + * Reconstructs {@link DataSplit} objects from a list of {@link ManifestEntry} instances. + * + *

Groups entries by {@code partition + bucket}, then builds one {@link DataSplit} per group. + * Uses {@link BinaryRow} directly as map key so that grouping relies on its content-based + * {@code equals}/{@code hashCode}, which is collision-free. + */ + private List buildSplitsFromEntries(List entries) { + // partition -> bucket -> files + Map>> grouped = new HashMap<>(); + Map> representative = new HashMap<>(); + + for (ManifestEntry entry : entries) { + grouped.computeIfAbsent(entry.partition(), k -> new HashMap<>()) + .computeIfAbsent(entry.bucket(), k -> new ArrayList<>()) + .add(entry.file()); + representative + .computeIfAbsent(entry.partition(), k -> new HashMap<>()) + .put(entry.bucket(), entry); } - try (GlobalIndexScanner scanner = optionalScanner.get()) { - return scanner.scan(filter); - } catch (IOException e) { - throw new RuntimeException(e); + List splits = new ArrayList<>(); + for (Map.Entry>> partEntry : + grouped.entrySet()) { + for (Map.Entry> bucketEntry : + partEntry.getValue().entrySet()) { + ManifestEntry rep = + representative.get(partEntry.getKey()).get(bucketEntry.getKey()); + List files = bucketEntry.getValue(); + // Sort by firstRowId for consistent ordering + files.sort((a, b) -> Long.compare(a.nonNullFirstRowId(), b.nonNullFirstRowId())); + + Long latestSnapshotId = + batchScan.snapshotReader().snapshotManager().latestSnapshotId(); + DataSplit split = + DataSplit.builder() + .withSnapshot(latestSnapshotId != null ? 
latestSnapshotId : -1L) + .withPartition(rep.partition()) + .withBucket(rep.bucket()) + .withBucketPath("") + .withTotalBuckets(rep.totalBuckets()) + .isStreaming(false) + .rawConvertible(false) + .withDataFiles(files) + .build(); + splits.add(split); + } } + return splits; } @VisibleForTesting diff --git a/paimon-core/src/main/java/org/apache/paimon/globalindex/FilePathGlobalIndexResult.java b/paimon-core/src/main/java/org/apache/paimon/globalindex/FilePathGlobalIndexResult.java new file mode 100644 index 000000000000..e189e6c4ee55 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/globalindex/FilePathGlobalIndexResult.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.globalindex; + +import javax.annotation.Nullable; + +import java.util.List; + +/** + * Result of a BTree-with-file-meta global index query. Contains pre-fetched {@link + * org.apache.paimon.manifest.ManifestEntry} bytes (from the btree_file_meta index) that can be used + * to build a query plan without reading manifest files from HDFS, plus an optional row-level {@link + * GlobalIndexResult} (from the btree key index) for fine-grained row filtering inside matching + * files. 
+ */ +public class FilePathGlobalIndexResult { + + private final List manifestEntryBytes; + @Nullable private final GlobalIndexResult rowIndexResult; + + public FilePathGlobalIndexResult( + List manifestEntryBytes, @Nullable GlobalIndexResult rowIndexResult) { + this.manifestEntryBytes = manifestEntryBytes; + this.rowIndexResult = rowIndexResult; + } + + /** + * Raw serialized {@link org.apache.paimon.manifest.ManifestEntry} bytes from btree_file_meta + * index. + */ + public List manifestEntryBytes() { + return manifestEntryBytes; + } + + /** Whether row-level filtering is available via the btree key index. */ + public boolean hasRowLevelFilter() { + return rowIndexResult != null; + } + + /** The row-level index result (btree key index), or {@code null} if not available. */ + @Nullable + public GlobalIndexResult rowIndexResult() { + return rowIndexResult; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexScanner.java b/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexScanner.java index 3092f1b3daaf..a92aed65d042 100644 --- a/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexScanner.java +++ b/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexScanner.java @@ -20,6 +20,9 @@ import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; +import org.apache.paimon.globalindex.btree.BTreeGlobalIndexerFactory; +import org.apache.paimon.globalindex.btree.BTreeWithFileMetaBuilder; +import org.apache.paimon.globalindex.btree.BTreeWithFileMetaReader; import org.apache.paimon.globalindex.io.GlobalIndexFileReader; import org.apache.paimon.index.GlobalIndexMeta; import org.apache.paimon.index.IndexFileMeta; @@ -35,6 +38,8 @@ import org.apache.paimon.utils.ManifestReadThreadPool; import org.apache.paimon.utils.Range; +import javax.annotation.Nullable; + import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; @@ -63,6 +68,10 @@ public class GlobalIndexScanner 
implements Closeable { private final GlobalIndexEvaluator globalIndexEvaluator; private final IndexPathFactory indexPathFactory; + // Per-field map of BTreeWithFileMetaReader instances, keyed by field ID. + // Non-null only when btree (with companion btree_file_meta) index files are present. + @Nullable private final Map filePathIndexReaders; + public GlobalIndexScanner( Options options, RowType rowType, @@ -74,18 +83,76 @@ public GlobalIndexScanner( ManifestReadThreadPool.getExecutorService(options.get(GLOBAL_INDEX_THREAD_NUM)); this.indexPathFactory = indexPathFactory; GlobalIndexFileReader indexFileReader = meta -> fileIO.newInputStream(meta.filePath()); + + // Step 1: collect (fieldId, range) pairs that have a btree_file_meta companion + Map> binaryFileMetaKeyMap = new HashMap<>(); + for (IndexFileMeta indexFile : indexFiles) { + if (BTreeWithFileMetaBuilder.INDEX_TYPE_FILE_META.equals(indexFile.indexType())) { + GlobalIndexMeta meta = checkNotNull(indexFile.globalIndexMeta()); + binaryFileMetaKeyMap + .computeIfAbsent(meta.indexFieldId(), k -> new HashSet<>()) + .add(meta.rowRangeStart()); + } + } + + // Step 2: classify index files + // fieldId -> indexType -> range -> List Map>>> indexMetas = new HashMap<>(); + // fieldId -> (keyIndexByRange, binaryFileMetaByRange) + Map>> keyIndexByField = new HashMap<>(); + Map>> binaryFileMetaByField = new HashMap<>(); + Map fieldById = new HashMap<>(); + for (IndexFileMeta indexFile : indexFiles) { GlobalIndexMeta meta = checkNotNull(indexFile.globalIndexMeta()); int fieldId = meta.indexFieldId(); String indexType = indexFile.indexType(); - indexMetas - .computeIfAbsent(fieldId, k -> new HashMap<>()) - .computeIfAbsent(indexType, k -> new HashMap<>()) - .computeIfAbsent( - new Range(meta.rowRangeStart(), meta.rowRangeEnd()), - k -> new ArrayList<>()) - .add(indexFile); + Range range = new Range(meta.rowRangeStart(), meta.rowRangeEnd()); + + if (BTreeWithFileMetaBuilder.INDEX_TYPE_FILE_META.equals(indexType)) { + 
binaryFileMetaByField + .computeIfAbsent(fieldId, k -> new HashMap<>()) + .computeIfAbsent(range, k -> new ArrayList<>()) + .add(toGlobalMeta(indexFile)); + } else if (BTreeGlobalIndexerFactory.IDENTIFIER.equals(indexType) + && binaryFileMetaKeyMap + .getOrDefault(fieldId, Collections.emptySet()) + .contains(meta.rowRangeStart())) { + keyIndexByField + .computeIfAbsent(fieldId, k -> new HashMap<>()) + .computeIfAbsent(range, k -> new ArrayList<>()) + .add(toGlobalMeta(indexFile)); + fieldById.put(fieldId, rowType.getField(fieldId)); + } else { + indexMetas + .computeIfAbsent(fieldId, k -> new HashMap<>()) + .computeIfAbsent(indexType, k -> new HashMap<>()) + .computeIfAbsent(range, k -> new ArrayList<>()) + .add(indexFile); + } + } + + // Build BTreeWithFileMetaReader for fields that have btree (with btree_file_meta) files + if (!keyIndexByField.isEmpty()) { + this.filePathIndexReaders = new HashMap<>(); + for (Map.Entry>> entry : + keyIndexByField.entrySet()) { + int fieldId = entry.getKey(); + DataField dataField = fieldById.get(fieldId); + Map> binaryFileMetaForField = + binaryFileMetaByField.getOrDefault(fieldId, Collections.emptyMap()); + filePathIndexReaders.put( + fieldId, + new BTreeWithFileMetaReader( + entry.getValue(), + binaryFileMetaForField, + dataField, + options, + indexFileReader, + executor)); + } + } else { + this.filePathIndexReaders = null; } IntFunction> readersFunction = @@ -142,6 +209,35 @@ public Optional scan(Predicate predicate) { return globalIndexEvaluator.evaluate(predicate); } + /** + * Evaluates the predicate using btree/btree_file_meta index files and returns a {@link + * FilePathGlobalIndexResult} when available. Returns {@link Optional#empty()} if no FilePath + * Global Index files are present for the predicate fields. 
+ */ + public Optional scanWithFilePath(Predicate predicate) + throws IOException { + if (filePathIndexReaders == null || filePathIndexReaders.isEmpty()) { + return Optional.empty(); + } + // For simplicity, we support one field with file-path index at a time. + // In the future this could be extended to multi-field AND/OR combinations. + for (BTreeWithFileMetaReader reader : filePathIndexReaders.values()) { + Optional result = reader.scan(predicate); + if (result.isPresent()) { + return result; + } + } + return Optional.empty(); + } + + /** + * Returns true if this scanner has any btree/btree_file_meta index files, i.e., whether {@link + * #scanWithFilePath(Predicate)} may return a non-empty result. + */ + public boolean hasFilePathIndex() { + return filePathIndexReaders != null && !filePathIndexReaders.isEmpty(); + } + private Collection createReaders( GlobalIndexFileReader indexFileReadWrite, Map>> indexMetas, diff --git a/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BTreeWithFileMetaBuilder.java b/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BTreeWithFileMetaBuilder.java new file mode 100644 index 000000000000..a3b45254781b --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BTreeWithFileMetaBuilder.java @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.globalindex.btree; + +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.globalindex.GlobalIndexBuilderUtils; +import org.apache.paimon.globalindex.GlobalIndexFileReadWrite; +import org.apache.paimon.globalindex.GlobalIndexParallelWriter; +import org.apache.paimon.globalindex.GlobalIndexWriter; +import org.apache.paimon.globalindex.ResultEntry; +import org.apache.paimon.index.IndexFileMeta; +import org.apache.paimon.index.IndexPathFactory; +import org.apache.paimon.io.CompactIncrement; +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.io.DataIncrement; +import org.apache.paimon.manifest.FileKind; +import org.apache.paimon.manifest.ManifestEntry; +import org.apache.paimon.manifest.ManifestEntrySerializer; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.table.sink.CommitMessageImpl; +import org.apache.paimon.table.source.DataSplit; +import org.apache.paimon.types.DataField; +import org.apache.paimon.utils.Range; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import static org.apache.paimon.globalindex.GlobalIndexBuilderUtils.toIndexFileMetas; + +/** + * Builder for BTree global index with embedded file metadata (key index + file-meta index). + * + *

Key index (index type {@link BTreeGlobalIndexerFactory#IDENTIFIER}, i.e. {@code "btree"}): + * standard BTree index mapping {@code key -> rowIds}. Per-entry fileNames are also tracked so that + * the file-meta index can be populated. + * + *

File-meta index (index type {@link #INDEX_TYPE_FILE_META}): a flat SST that maps {@code + * fileName -> ManifestEntry bytes}. One entry per data file covered by this index shard. + * + *

Both parts are committed atomically in a single {@link CommitMessage}. + */ +public class BTreeWithFileMetaBuilder implements java.io.Serializable { + + private static final long serialVersionUID = 1L; + + public static final String INDEX_TYPE_FILE_META = "btree_file_meta"; + + private final FileStoreTable table; + private final DataField indexField; + + public BTreeWithFileMetaBuilder(FileStoreTable table, DataField indexField) { + this.table = table; + this.indexField = indexField; + } + + public DataField getIndexField() { + return indexField; + } + + /** + * Builds btree key index + btree_file_meta index for one partition range and returns a single + * {@link CommitMessage} containing both. + * + * @param rowRange global row id range covered by this shard + * @param partition partition key + * @param split the DataSplit being indexed (used to gather DataFileMeta for btree_file_meta) + * @param data iterator of (indexField, globalRowId) rows + */ + public CommitMessage build( + Range rowRange, BinaryRow partition, DataSplit split, Iterator data) + throws IOException { + + // Build firstRowId -> DataFileMeta interval map for fast lookup + List dataFiles = split.dataFiles(); + // dataFiles are ordered by firstRowId within a split + long[] firstRowIds = new long[dataFiles.size()]; + for (int i = 0; i < dataFiles.size(); i++) { + firstRowIds[i] = dataFiles.get(i).nonNullFirstRowId(); + } + + IndexPathFactory indexPathFactory = table.store().pathFactory().globalIndexFileFactory(); + GlobalIndexFileReadWrite rw = + new GlobalIndexFileReadWrite(table.fileIO(), indexPathFactory); + + // ---- Key index writer (standard BTree key→rowIds) ---- + GlobalIndexWriter rawWriter = + GlobalIndexBuilderUtils.createIndexWriter( + table, + BTreeGlobalIndexerFactory.IDENTIFIER, + indexField, + table.coreOptions().toConfiguration()); + if (!(rawWriter instanceof GlobalIndexParallelWriter)) { + throw new RuntimeException( + "BTree writer must implement GlobalIndexParallelWriter, found: 
" + + rawWriter.getClass().getName()); + } + GlobalIndexParallelWriter keyIndexWriter = (GlobalIndexParallelWriter) rawWriter; + + // ---- File-meta index writer (fileName → ManifestEntry bytes) ---- + ManifestEntrySerializer manifestSerializer = new ManifestEntrySerializer(); + BinaryFileMetaWriter binaryFileMetaWriter = + new BinaryFileMetaWriter(rw, manifestSerializer); + + // Write file-meta index entries (one per DataFileMeta in the split) + for (DataFileMeta file : dataFiles) { + ManifestEntry entry = + ManifestEntry.create( + FileKind.ADD, partition, split.bucket(), split.totalBuckets(), file); + binaryFileMetaWriter.write(file.fileName(), entry); + } + + // Write key-index entries + InternalRow.FieldGetter indexFieldGetter = + InternalRow.createFieldGetter(indexField.type(), 0); + while (data.hasNext()) { + InternalRow row = data.next(); + long globalRowId = row.getLong(1); + long localRowId = globalRowId - rowRange.from; + Object key = indexFieldGetter.getFieldOrNull(row); + keyIndexWriter.write(key, localRowId); + } + + // Finish both writers and collect ResultEntries + List keyIndexEntries = keyIndexWriter.finish(); + List binaryFileMetaEntries = binaryFileMetaWriter.finish(); + + // Build IndexFileMetas for btree key index + List keyIndexMetas = + toIndexFileMetas( + table.fileIO(), + indexPathFactory, + table.coreOptions(), + rowRange, + indexField.id(), + BTreeGlobalIndexerFactory.IDENTIFIER, + keyIndexEntries); + + // Build IndexFileMetas for btree_file_meta (rowRange and fieldId are same, to allow + // scanning) + List binaryFileMetaIndexMetas = + toIndexFileMetas( + table.fileIO(), + indexPathFactory, + table.coreOptions(), + rowRange, + indexField.id(), + INDEX_TYPE_FILE_META, + binaryFileMetaEntries); + + List allMetas = new ArrayList<>(); + allMetas.addAll(keyIndexMetas); + allMetas.addAll(binaryFileMetaIndexMetas); + + // Single CommitMessage → atomic commit of btree key index + btree_file_meta + DataIncrement dataIncrement = 
DataIncrement.indexIncrement(allMetas); + return new CommitMessageImpl( + partition, 0, null, dataIncrement, CompactIncrement.emptyIncrement()); + } + + /** + * Flush overload compatible with the existing {@link BTreeGlobalIndexBuilder} API, where we + * have already-written btree key index result entries plus an explicit list of ManifestEntry + * rows for btree_file_meta. + */ + public CommitMessage flushIndex( + Range rowRange, + List keyIndexResultEntries, + List binaryFileMetaResultEntries, + BinaryRow partition) + throws IOException { + + IndexPathFactory indexPathFactory = table.store().pathFactory().globalIndexFileFactory(); + + List keyIndexMetas = + toIndexFileMetas( + table.fileIO(), + indexPathFactory, + table.coreOptions(), + rowRange, + indexField.id(), + BTreeGlobalIndexerFactory.IDENTIFIER, + keyIndexResultEntries); + + List binaryFileMetaIndexMetas = + toIndexFileMetas( + table.fileIO(), + indexPathFactory, + table.coreOptions(), + rowRange, + indexField.id(), + INDEX_TYPE_FILE_META, + binaryFileMetaResultEntries); + + List allMetas = new ArrayList<>(); + allMetas.addAll(keyIndexMetas); + allMetas.addAll(binaryFileMetaIndexMetas); + + DataIncrement dataIncrement = DataIncrement.indexIncrement(allMetas); + return new CommitMessageImpl( + partition, 0, null, dataIncrement, CompactIncrement.emptyIncrement()); + } + + /** + * Creates a new binary file-meta index writer that accumulates {@code fileName -> + * ManifestEntry} mappings. Call {@link BinaryFileMetaWriter#finish()} to get the {@link + * ResultEntry} list. 
+ */ + public BinaryFileMetaWriter createBinaryFileMetaWriter() throws IOException { + IndexPathFactory indexPathFactory = table.store().pathFactory().globalIndexFileFactory(); + GlobalIndexFileReadWrite rw = + new GlobalIndexFileReadWrite(table.fileIO(), indexPathFactory); + return new BinaryFileMetaWriter(rw, new ManifestEntrySerializer()); + } + + /** + * Resolves which {@link DataFileMeta} a global row id belongs to by binary search over + * pre-computed {@code firstRowIds} array. + */ + static int findFileIndex(long[] firstRowIds, long globalRowId) { + int lo = 0; + int hi = firstRowIds.length - 1; + while (lo < hi) { + int mid = (lo + hi + 1) >>> 1; + if (firstRowIds[mid] <= globalRowId) { + lo = mid; + } else { + hi = mid - 1; + } + } + return lo; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BTreeWithFileMetaReader.java b/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BTreeWithFileMetaReader.java new file mode 100644 index 000000000000..52d3e7c91665 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BTreeWithFileMetaReader.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.globalindex.btree; + +import org.apache.paimon.globalindex.FilePathGlobalIndexResult; +import org.apache.paimon.globalindex.GlobalIndexEvaluator; +import org.apache.paimon.globalindex.GlobalIndexIOMeta; +import org.apache.paimon.globalindex.GlobalIndexReader; +import org.apache.paimon.globalindex.GlobalIndexResult; +import org.apache.paimon.globalindex.OffsetGlobalIndexReader; +import org.apache.paimon.globalindex.UnionGlobalIndexReader; +import org.apache.paimon.globalindex.io.GlobalIndexFileReader; +import org.apache.paimon.options.Options; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.Range; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ExecutorService; + +/** + * Reader for BTree-with-file-meta index (btree + btree_file_meta). + * + *

btree key index files are evaluated with the standard BTree predicate visitor to obtain + * matching row IDs. btree_file_meta files are read fully to obtain {@link + * org.apache.paimon.manifest.ManifestEntry} rows for the files covered by this index shard. + * + *

Returns a {@link FilePathGlobalIndexResult} combining both. + */ +public class BTreeWithFileMetaReader { + + private final Map> keyIndexByRange; + private final Map> binaryFileMetaByRange; + private final DataField indexField; + private final Options options; + private final GlobalIndexFileReader fileReader; + private final ExecutorService executor; + + public BTreeWithFileMetaReader( + Map> keyIndexByRange, + Map> binaryFileMetaByRange, + DataField indexField, + Options options, + GlobalIndexFileReader fileReader, + ExecutorService executor) { + this.keyIndexByRange = keyIndexByRange; + this.binaryFileMetaByRange = binaryFileMetaByRange; + this.indexField = indexField; + this.options = options; + this.fileReader = fileReader; + this.executor = executor; + } + + /** + * Evaluates the predicate against the btree key index and reads btree_file_meta manifest + * entries. + * + * @return {@link Optional#empty()} if no btree key index files exist, otherwise a {@link + * FilePathGlobalIndexResult} with row-level bitmap and manifest entry rows. 
+     */
+    public Optional scan(Predicate predicate) throws IOException {
+        if (keyIndexByRange.isEmpty()) {
+            return Optional.empty();
+        }
+
+        // ---- btree key index: standard BTree predicate evaluation ----
+        BTreeGlobalIndexer indexer = new BTreeGlobalIndexer(indexField, options);
+        RowType singleFieldRowType = new RowType(false, Collections.singletonList(indexField));
+
+        List unionReaders = new ArrayList<>();
+        for (Map.Entry> entry : keyIndexByRange.entrySet()) {
+            Range range = entry.getKey();
+            List metas = entry.getValue();
+            GlobalIndexReader innerReader = indexer.createReader(fileReader, metas);
+            unionReaders.add(new OffsetGlobalIndexReader(innerReader, range.from, range.to));
+        }
+
+        GlobalIndexReader keyIndexUnion = new UnionGlobalIndexReader(unionReaders, executor);
+        GlobalIndexEvaluator evaluator =
+                new GlobalIndexEvaluator(
+                        singleFieldRowType, fieldId -> Collections.singletonList(keyIndexUnion));
+
+        Optional keyIndexResult;
+        try {
+            keyIndexResult = evaluator.evaluate(predicate);
+        } finally {
+            evaluator.close();
+        }
+
+        if (!keyIndexResult.isPresent() || keyIndexResult.get().results().isEmpty()) {
+            return Optional.empty();
+        }
+
+        // ---- btree_file_meta: read all ManifestEntry raw bytes ----
+        List manifestEntryBytes = readAllBinaryFileMetaEntries();
+        if (manifestEntryBytes.isEmpty()) {
+            // No btree_file_meta data — a file-path result cannot be built, so return empty and
+            // let the caller fall back to a plain (non-optimized) scan
+            return Optional.empty();
+        }
+
+        return Optional.of(new FilePathGlobalIndexResult(manifestEntryBytes, keyIndexResult.get()));
+    }
+
+    private List readAllBinaryFileMetaEntries() throws IOException {
+        // Use LinkedHashMap to deduplicate by fileName across multiple SST files.
+        // When the same range is written by multiple parallel subtasks, each subtask produces a
+        // complete btree_file_meta SST with identical entries. putIfAbsent keeps only the first
+        // occurrence and preserves insertion order. 
+ LinkedHashMap seen = new LinkedHashMap<>(); + + for (List metas : binaryFileMetaByRange.values()) { + for (GlobalIndexIOMeta meta : metas) { + BinaryFileMetaIndexReader reader = + new BinaryFileMetaIndexReader(fileReader, meta, options); + try { + BinaryFileMetaIndexReader.EntryIterator it = reader.iterator(); + while (it.hasNext()) { + String fileName = it.nextKey(); + byte[] valueBytes = it.nextValue(); + seen.putIfAbsent(fileName, valueBytes); + } + } finally { + reader.close(); + } + } + } + return new ArrayList<>(seen.values()); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BinaryFileMetaIndexReader.java b/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BinaryFileMetaIndexReader.java new file mode 100644 index 000000000000..6f9c9d91f1d6 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BinaryFileMetaIndexReader.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.globalindex.btree; + +import org.apache.paimon.fs.SeekableInputStream; +import org.apache.paimon.globalindex.GlobalIndexIOMeta; +import org.apache.paimon.globalindex.io.GlobalIndexFileReader; +import org.apache.paimon.io.cache.CacheManager; +import org.apache.paimon.memory.MemorySegment; +import org.apache.paimon.memory.MemorySlice; +import org.apache.paimon.options.Options; +import org.apache.paimon.sst.BlockCache; +import org.apache.paimon.sst.BlockIterator; +import org.apache.paimon.sst.SstFileReader; + +import java.io.Closeable; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Comparator; +import java.util.Map; +import java.util.NoSuchElementException; + +/** + * Sequential reader for btree_file_meta SST files (fileName → ManifestEntry bytes). + * + *

btree_file_meta files are written by {@link BinaryFileMetaWriter} using the same SST format as + * btree key index files, but without a BTreeIndexMeta (null meta) and without a null bitmap. Keys + * are UTF-8 encoded file names; values are serialized {@link + * org.apache.paimon.manifest.ManifestEntry} bytes. + */ +public class BinaryFileMetaIndexReader implements Closeable { + + private final SeekableInputStream input; + private final SstFileReader sstReader; + + public BinaryFileMetaIndexReader( + GlobalIndexFileReader fileReader, GlobalIndexIOMeta meta, Options options) + throws IOException { + this.input = fileReader.getInputStream(meta); + long fileSize = meta.fileSize(); + + CacheManager cacheManager = + new CacheManager( + options.get(BTreeIndexOptions.BTREE_INDEX_CACHE_SIZE), + options.get(BTreeIndexOptions.BTREE_INDEX_HIGH_PRIORITY_POOL_RATIO)); + + BlockCache blockCache = new BlockCache(meta.filePath(), input, cacheManager); + BTreeFileFooter footer = readFooter(blockCache, fileSize); + + Comparator sliceComparator = lexicographicComparator(); + this.sstReader = + new SstFileReader(sliceComparator, blockCache, footer.getIndexBlockHandle(), null); + } + + /** Returns an iterator over all (key, value) pairs in insertion order. 
*/ + public EntryIterator iterator() { + return new EntryIterator(); + } + + @Override + public void close() throws IOException { + sstReader.close(); + input.close(); + } + + private BTreeFileFooter readFooter(BlockCache blockCache, long fileSize) { + MemorySegment footerBytes = + blockCache.getBlock( + fileSize - BTreeFileFooter.ENCODED_LENGTH, + BTreeFileFooter.ENCODED_LENGTH, + b -> b, + true); + return BTreeFileFooter.readFooter(MemorySlice.wrap(footerBytes).toInput()); + } + + private static Comparator lexicographicComparator() { + return (a, b) -> { + int lenA = a.length(); + int lenB = b.length(); + int min = Math.min(lenA, lenB); + for (int i = 0; i < min; i++) { + int diff = (a.readByte(i) & 0xFF) - (b.readByte(i) & 0xFF); + if (diff != 0) { + return diff; + } + } + return lenA - lenB; + }; + } + + /** + * Iterator over btree_file_meta SST entries, yielding (fileName, ManifestEntry bytes) pairs. + */ + public class EntryIterator { + + private final SstFileReader.SstFileIterator fileIter; + private BlockIterator dataIter; + private String nextKey; + private byte[] nextValue; + + private EntryIterator() { + this.fileIter = sstReader.createIterator(); + } + + public boolean hasNext() throws IOException { + if (nextValue != null) { + return true; + } + + while (true) { + if (dataIter != null && dataIter.hasNext()) { + Map.Entry entry = dataIter.next(); + nextKey = new String(entry.getKey().copyBytes(), StandardCharsets.UTF_8); + nextValue = entry.getValue().copyBytes(); + return true; + } + + dataIter = fileIter.readBatch(); + if (dataIter == null) { + return false; + } + } + } + + /** + * Returns the fileName (key) of the current entry. Must be called after {@link #hasNext()} + * returns {@code true} and before {@link #nextValue()}. 
+ */ + public String nextKey() throws IOException { + if (!hasNext()) { + throw new NoSuchElementException("No more entries in btree_file_meta file."); + } + return nextKey; + } + + /** Returns the raw ManifestEntry bytes (value) and advances the iterator. */ + public byte[] nextValue() throws IOException { + if (!hasNext()) { + throw new NoSuchElementException("No more entries in btree_file_meta file."); + } + byte[] result = nextValue; + nextKey = null; + nextValue = null; + return result; + } + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BinaryFileMetaWriter.java b/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BinaryFileMetaWriter.java new file mode 100644 index 000000000000..07d8612064a5 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BinaryFileMetaWriter.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.globalindex.btree; + +import org.apache.paimon.compression.BlockCompressionFactory; +import org.apache.paimon.compression.CompressOptions; +import org.apache.paimon.fs.PositionOutputStream; +import org.apache.paimon.globalindex.GlobalIndexFileReadWrite; +import org.apache.paimon.globalindex.ResultEntry; +import org.apache.paimon.manifest.ManifestEntry; +import org.apache.paimon.manifest.ManifestEntrySerializer; +import org.apache.paimon.memory.MemorySlice; +import org.apache.paimon.sst.BlockHandle; +import org.apache.paimon.sst.BloomFilterHandle; +import org.apache.paimon.sst.SstFileWriter; + +import java.io.IOException; +import java.util.Collections; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Set; + +/** + * btree_file_meta writer: stores {@code fileName -> ManifestEntry bytes} in a single SST file. + * + *

Duplicate fileNames are silently ignored (only the first occurrence is written). Keys must be
+ * written in lexicographic order of fileName, since the SST format assumes sorted keys; this
+ * class does not verify ordering, so callers are responsible for iterating
+ * {@link org.apache.paimon.io.DataFileMeta} entries in that order (NOTE(review): confirm the
+ * caller's file list is actually sorted by fileName).
+ */
+public class BinaryFileMetaWriter {
+
+    /** Prefix used when creating the SST file name. */
+    public static final String FILE_PREFIX = "btree-file-meta";
+
+    private final String fileName;
+    private final PositionOutputStream out;
+    private final SstFileWriter writer;
+    private final ManifestEntrySerializer manifestSerializer;
+    private final KeySerializer keySerializer;
+    private final Set writtenFileNames = new LinkedHashSet<>();
+    private long rowCount = 0;
+
+    public BinaryFileMetaWriter(
+            GlobalIndexFileReadWrite rw, ManifestEntrySerializer manifestSerializer)
+            throws IOException {
+        this.fileName = rw.newFileName(FILE_PREFIX);
+        this.out = rw.newOutputStream(this.fileName);
+        // Use no compression and default block size (64 KiB) for simplicity
+        this.writer =
+                new SstFileWriter(
+                        out,
+                        64 * 1024,
+                        null,
+                        BlockCompressionFactory.create(new CompressOptions("none", 1)));
+        this.manifestSerializer = manifestSerializer;
+        this.keySerializer = new KeySerializer.StringSerializer();
+    }
+
+    /**
+     * Writes a single {@code fileName -> ManifestEntry} mapping. If {@code fileName} was already
+     * written, this call is a no-op (idempotent).
+     */
+    public void write(String fileName, ManifestEntry entry) throws IOException {
+        if (writtenFileNames.contains(fileName)) {
+            return;
+        }
+        writtenFileNames.add(fileName);
+        rowCount++;
+
+        byte[] keyBytes =
+                keySerializer.serialize(org.apache.paimon.data.BinaryString.fromString(fileName));
+        byte[] valueBytes = manifestSerializer.serializeToBytes(entry);
+        writer.put(keyBytes, valueBytes);
+    }
+
+    /**
+     * Finalizes the SST file and returns a single {@link ResultEntry}. Must be called exactly once. 
+ */ + public List finish() throws IOException { + if (rowCount == 0) { + // Nothing was written – close and return empty (caller should handle this) + out.close(); + return Collections.emptyList(); + } + + writer.flush(); + BloomFilterHandle bloomFilterHandle = writer.writeBloomFilter(); + BlockHandle indexBlockHandle = writer.writeIndexBlock(); + BTreeFileFooter footer = new BTreeFileFooter(bloomFilterHandle, indexBlockHandle, null); + MemorySlice footerEncoding = BTreeFileFooter.writeFooter(footer); + writer.writeSlice(footerEncoding); + out.close(); + + return Collections.singletonList(new ResultEntry(fileName, rowCount, null)); + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/globalindex/btree/BinaryFileMetaDeduplicationTest.java b/paimon-core/src/test/java/org/apache/paimon/globalindex/btree/BinaryFileMetaDeduplicationTest.java new file mode 100644 index 000000000000..908949a35889 --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/globalindex/btree/BinaryFileMetaDeduplicationTest.java @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.globalindex.btree; + +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.globalindex.GlobalIndexFileReadWrite; +import org.apache.paimon.globalindex.GlobalIndexIOMeta; +import org.apache.paimon.globalindex.ResultEntry; +import org.apache.paimon.index.IndexPathFactory; +import org.apache.paimon.manifest.FileKind; +import org.apache.paimon.manifest.ManifestEntry; +import org.apache.paimon.manifest.ManifestEntrySerializer; +import org.apache.paimon.options.Options; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.UUID; + +import static org.apache.paimon.io.DataFileTestUtils.newFile; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Unit tests verifying that {@link BinaryFileMetaIndexReader.EntryIterator} exposes keys correctly + * and that the {@link LinkedHashMap}-based deduplication in {@link BTreeWithFileMetaReader} + * eliminates duplicate {@code fileName} entries produced by parallel subtasks writing identical + * {@code btree_file_meta} SST files. + */ +public class BinaryFileMetaDeduplicationTest { + + @TempDir java.nio.file.Path tempDir; + + /** Write a single {@code btree_file_meta} SST with the given fileName -> entry mappings. 
*/ + private List writeSst( + GlobalIndexFileReadWrite rw, + ManifestEntrySerializer serializer, + List entries) + throws IOException { + BinaryFileMetaWriter writer = new BinaryFileMetaWriter(rw, serializer); + for (ManifestEntry entry : entries) { + writer.write(entry.file().fileName(), entry); + } + return writer.finish(); + } + + private GlobalIndexIOMeta toIOMeta(LocalFileIO fileIO, IndexPathFactory factory, String name) + throws IOException { + Path p = factory.toPath(name); + return new GlobalIndexIOMeta(p, fileIO.getFileSize(p), null); + } + + // ------------------------------------------------------------------------- + // Test 1: EntryIterator correctly exposes both key and value + // ------------------------------------------------------------------------- + + @Test + public void testEntryIteratorExposesKeyAndValue() throws IOException { + LocalFileIO fileIO = new LocalFileIO(); + Path dir = new Path(tempDir.toString()); + IndexPathFactory factory = new SimpleIndexPathFactory(dir); + GlobalIndexFileReadWrite rw = new GlobalIndexFileReadWrite(fileIO, factory); + ManifestEntrySerializer serializer = new ManifestEntrySerializer(); + + ManifestEntry entry1 = makeEntry("data-file-aaa.parquet"); + ManifestEntry entry2 = makeEntry("data-file-bbb.parquet"); + + List entries = new ArrayList<>(); + entries.add(entry1); + entries.add(entry2); + List results = writeSst(rw, serializer, entries); + assertThat(results).hasSize(1); + + GlobalIndexIOMeta meta = toIOMeta(fileIO, factory, results.get(0).fileName()); + Options options = new Options(); + try (BinaryFileMetaIndexReader reader = new BinaryFileMetaIndexReader(rw, meta, options)) { + BinaryFileMetaIndexReader.EntryIterator it = reader.iterator(); + + // First entry + assertThat(it.hasNext()).isTrue(); + assertThat(it.nextKey()).isEqualTo("data-file-aaa.parquet"); + it.nextValue(); // advance + + // Second entry + assertThat(it.hasNext()).isTrue(); + assertThat(it.nextKey()).isEqualTo("data-file-bbb.parquet"); + 
it.nextValue(); + + assertThat(it.hasNext()).isFalse(); + } + } + + // ------------------------------------------------------------------------- + // Test 2: Identical SSTs from N parallel subtasks are deduplicated by fileName + // ------------------------------------------------------------------------- + + @Test + public void testDuplicateSstDeduplicationByFileName() throws IOException { + LocalFileIO fileIO = new LocalFileIO(); + Path dir = new Path(tempDir.toString()); + IndexPathFactory factory = new SimpleIndexPathFactory(dir); + GlobalIndexFileReadWrite rw = new GlobalIndexFileReadWrite(fileIO, factory); + ManifestEntrySerializer serializer = new ManifestEntrySerializer(); + + ManifestEntry e1 = makeEntry("data-file-001.parquet"); + ManifestEntry e2 = makeEntry("data-file-002.parquet"); + ManifestEntry e3 = makeEntry("data-file-003.parquet"); + + List entries = new ArrayList<>(); + entries.add(e1); + entries.add(e2); + entries.add(e3); + + // Simulate 3 parallel subtasks each writing an identical btree_file_meta SST + List metas = new ArrayList<>(); + int parallelism = 3; + for (int i = 0; i < parallelism; i++) { + List results = writeSst(rw, serializer, entries); + assertThat(results).hasSize(1); + metas.add(toIOMeta(fileIO, factory, results.get(0).fileName())); + } + assertThat(metas).hasSize(parallelism); + + // Simulate what BTreeWithFileMetaReader.readAllBinaryFileMetaEntries() does + LinkedHashMap seen = new LinkedHashMap<>(); + Options options = new Options(); + for (GlobalIndexIOMeta meta : metas) { + try (BinaryFileMetaIndexReader reader = + new BinaryFileMetaIndexReader(rw, meta, options)) { + BinaryFileMetaIndexReader.EntryIterator it = reader.iterator(); + while (it.hasNext()) { + String fileName = it.nextKey(); + byte[] valueBytes = it.nextValue(); + seen.putIfAbsent(fileName, valueBytes); + } + } + } + + // Despite 3 SST files each containing the same 3 fileNames, deduplication must yield + // exactly 3 distinct ManifestEntry bytes — not 3 * 3 
= 9. + assertThat(seen).hasSize(3); + assertThat(seen.keySet()) + .containsExactly( + "data-file-001.parquet", "data-file-002.parquet", "data-file-003.parquet"); + } + + // ------------------------------------------------------------------------- + // Test 3: Partially-overlapping SSTs are correctly merged + // ------------------------------------------------------------------------- + + @Test + public void testPartiallyOverlappingSstsAreMerged() throws IOException { + LocalFileIO fileIO = new LocalFileIO(); + Path dir = new Path(tempDir.toString()); + IndexPathFactory factory = new SimpleIndexPathFactory(dir); + GlobalIndexFileReadWrite rw = new GlobalIndexFileReadWrite(fileIO, factory); + ManifestEntrySerializer serializer = new ManifestEntrySerializer(); + + // SST A: file-001, file-002 + List setA = new ArrayList<>(); + setA.add(makeEntry("data-file-001.parquet")); + setA.add(makeEntry("data-file-002.parquet")); + List resA = writeSst(rw, serializer, setA); + + // SST B: file-002 (duplicate), file-003 (new) + List setB = new ArrayList<>(); + setB.add(makeEntry("data-file-002.parquet")); + setB.add(makeEntry("data-file-003.parquet")); + List resB = writeSst(rw, serializer, setB); + + List metas = new ArrayList<>(); + metas.add(toIOMeta(fileIO, factory, resA.get(0).fileName())); + metas.add(toIOMeta(fileIO, factory, resB.get(0).fileName())); + + LinkedHashMap seen = new LinkedHashMap<>(); + Options options = new Options(); + for (GlobalIndexIOMeta meta : metas) { + try (BinaryFileMetaIndexReader reader = + new BinaryFileMetaIndexReader(rw, meta, options)) { + BinaryFileMetaIndexReader.EntryIterator it = reader.iterator(); + while (it.hasNext()) { + String fileName = it.nextKey(); + byte[] valueBytes = it.nextValue(); + seen.putIfAbsent(fileName, valueBytes); + } + } + } + + // file-002 appears in both SSTs but must appear exactly once + assertThat(seen).hasSize(3); + assertThat(seen.keySet()) + .containsExactlyInAnyOrder( + "data-file-001.parquet", 
"data-file-002.parquet", "data-file-003.parquet"); + } + + // ------------------------------------------------------------------------- + // Helper: create a minimal ManifestEntry with the given data file name + // ------------------------------------------------------------------------- + + private static ManifestEntry makeEntry(String dataFileName) { + return ManifestEntry.create( + FileKind.ADD, BinaryRow.EMPTY_ROW, 0, 1, newFile(dataFileName, 0, 0, 99, 0L)); + } + + // ------------------------------------------------------------------------- + // Minimal IndexPathFactory backed by a temp directory + // ------------------------------------------------------------------------- + + private static class SimpleIndexPathFactory implements IndexPathFactory { + + private final Path dir; + + SimpleIndexPathFactory(Path dir) { + this.dir = dir; + } + + @Override + public Path toPath(String fileName) { + return new Path(dir, fileName); + } + + @Override + public Path newPath() { + return new Path(dir, UUID.randomUUID().toString() + ".index"); + } + + @Override + public boolean isExternalPath() { + return false; + } + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java index 35830faac19b..40a9cf3c22a9 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java @@ -35,19 +35,26 @@ import org.apache.paimon.flink.utils.BoundedOneInputOperator; import org.apache.paimon.flink.utils.JavaTypeInfo; import org.apache.paimon.globalindex.GlobalIndexParallelWriter; +import org.apache.paimon.globalindex.IndexedSplit; +import org.apache.paimon.globalindex.ResultEntry; import org.apache.paimon.globalindex.btree.BTreeGlobalIndexBuilder; import 
org.apache.paimon.globalindex.btree.BTreeIndexOptions; +import org.apache.paimon.globalindex.btree.BTreeWithFileMetaBuilder; +import org.apache.paimon.globalindex.btree.BinaryFileMetaWriter; +import org.apache.paimon.manifest.FileKind; +import org.apache.paimon.manifest.ManifestEntry; +import org.apache.paimon.manifest.ManifestEntrySerializer; import org.apache.paimon.options.Options; import org.apache.paimon.partition.PartitionPredicate; import org.apache.paimon.reader.RecordReader; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.SpecialFields; import org.apache.paimon.table.sink.BatchWriteBuilder; -import org.apache.paimon.table.sink.CommitMessage; import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.ReadBuilder; import org.apache.paimon.table.source.Split; import org.apache.paimon.table.source.TableRead; +import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataType; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.Pair; @@ -61,6 +68,8 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import javax.annotation.Nullable; + import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -84,6 +93,7 @@ public static boolean buildIndex( PartitionPredicate partitionPredicate, Options userOptions) throws Exception { + boolean withFileMeta = userOptions.get(BTreeIndexOptions.BTREE_WITH_FILE_META); DataStream allCommitMessages = null; for (String indexColumn : indexColumns) { BTreeGlobalIndexBuilder indexBuilder = @@ -118,6 +128,7 @@ public static boolean buildIndex( int indexFieldPos = readType.getFieldIndex(indexColumn); int rowIdPos = readType.getFieldIndex(SpecialFields.ROW_ID.name()); DataType indexFieldType = readType.getTypeAt(indexFieldPos); + DataField indexDataField = table.rowType().getField(indexColumn); // 3. 
Calculate maximum parallelism bound long recordsPerRange = userOptions.get(BTreeIndexOptions.BTREE_INDEX_RECORDS_PER_RANGE); @@ -131,6 +142,9 @@ public static boolean buildIndex( sortColumns.add(indexColumn); int partitionFieldSize = table.partitionKeys().size(); BinaryRowSerializer binaryRowSerializer = new BinaryRowSerializer(partitionFieldSize); + ManifestEntrySerializer manifestSerializer = + withFileMeta ? new ManifestEntrySerializer() : null; + for (Map.Entry>> partitionEntry : partitionRangeSplits.entrySet()) { BinaryRow partition = partitionEntry.getKey(); @@ -141,6 +155,34 @@ public static boolean buildIndex( continue; } + // Pre-serialize ManifestEntries for file-meta index (if withFileMeta is + // enabled) + List manifestEntryBytesList = null; + if (withFileMeta && manifestSerializer != null) { + manifestEntryBytesList = new ArrayList<>(); + for (Split split : rangeSplits) { + DataSplit dataSplit = + split instanceof IndexedSplit + ? ((IndexedSplit) split).dataSplit() + : (split instanceof DataSplit + ? (DataSplit) split + : null); + if (dataSplit == null) { + continue; + } + for (org.apache.paimon.io.DataFileMeta file : dataSplit.dataFiles()) { + ManifestEntry me = + ManifestEntry.create( + FileKind.ADD, + partition, + dataSplit.bucket(), + dataSplit.totalBuckets(), + file); + manifestEntryBytesList.add(manifestSerializer.serializeToBytes(me)); + } + } + } + DataStream commitMessages = executeForPartitionRange( env, @@ -148,6 +190,9 @@ public static boolean buildIndex( rangeSplits, readBuilder, indexBuilder, + withFileMeta + ? 
new BTreeWithFileMetaBuilder(table, indexDataField) + : null, partitionFieldSize, binaryRowSerializer.serializeToBytes(partition), indexFieldPos, @@ -157,7 +202,8 @@ public static boolean buildIndex( coreOptions, readType, recordsPerRange, - maxParallelism); + maxParallelism, + manifestEntryBytesList); allCommitMessages = allCommitMessages == null @@ -197,6 +243,7 @@ protected static DataStream executeForPartitionRange( List rangeSplits, ReadBuilder readBuilder, BTreeGlobalIndexBuilder indexBuilder, + @Nullable BTreeWithFileMetaBuilder fileMetaBuilder, int partitionFieldSize, byte[] partition, int indexFieldPos, @@ -206,7 +253,8 @@ protected static DataStream executeForPartitionRange( CoreOptions coreOptions, RowType readType, long recordsPerRange, - int maxParallelism) { + int maxParallelism, + @Nullable List manifestEntryBytesList) { int parallelism = Math.max((int) (range.count() / recordsPerRange), 1); parallelism = Math.min(parallelism, maxParallelism); @@ -246,9 +294,11 @@ protected static DataStream executeForPartitionRange( partitionFieldSize, partition, indexBuilder, + fileMetaBuilder, indexFieldPos, rowIdPos, - indexFieldType)) + indexFieldType, + manifestEntryBytesList)) .setParallelism(parallelism); } @@ -306,13 +356,20 @@ private static class WriteIndexOperator extends BoundedOneInputOperator manifestEntryBytesList; private transient long counter; private transient GlobalIndexParallelWriter currentWriter; - private transient List commitMessages; + /** Accumulated key index result batches; flushed as CommitMessages in endInput(). 
*/ + private transient List> pendingKeyIndexBatches; + private transient InternalRow.FieldGetter indexFieldGetter; private transient BinaryRowSerializer binaryRowSerializer; @@ -321,22 +378,26 @@ public WriteIndexOperator( int partitionFieldSize, byte[] partition, BTreeGlobalIndexBuilder builder, + @Nullable BTreeWithFileMetaBuilder fileMetaBuilder, int indexFieldPos, int rowIdPos, - DataType indexFieldType) { + DataType indexFieldType, + @Nullable List manifestEntryBytesList) { this.rowRange = rowRange; this.partitionFieldSize = partitionFieldSize; this.partition = partition; this.builder = builder; + this.fileMetaBuilder = fileMetaBuilder; this.indexFieldPos = indexFieldPos; this.rowIdPos = rowIdPos; this.indexFieldType = indexFieldType; + this.manifestEntryBytesList = manifestEntryBytesList; } @Override public void open() throws Exception { super.open(); - commitMessages = new ArrayList<>(); + pendingKeyIndexBatches = new ArrayList<>(); indexFieldGetter = InternalRow.createFieldGetter(indexFieldType, indexFieldPos); this.binaryRowSerializer = new BinaryRowSerializer(partitionFieldSize); } @@ -345,11 +406,7 @@ public void open() throws Exception { public void processElement(StreamRecord element) throws IOException { InternalRow row = new FlinkRowWrapper(element.getValue()); if (currentWriter != null && counter >= builder.recordsPerRange()) { - commitMessages.add( - builder.flushIndex( - rowRange, - currentWriter.finish(), - binaryRowSerializer.deserializeFromBytes(partition))); + pendingKeyIndexBatches.add(currentWriter.finish()); currentWriter = null; counter = 0; } @@ -366,19 +423,59 @@ public void processElement(StreamRecord element) throws IOException { @Override public void endInput() throws IOException { - if (counter > 0) { - commitMessages.add( - builder.flushIndex( - rowRange, - currentWriter.finish(), - binaryRowSerializer.deserializeFromBytes(partition))); + if (counter > 0 && currentWriter != null) { + 
pendingKeyIndexBatches.add(currentWriter.finish()); + } + + if (!pendingKeyIndexBatches.isEmpty()) { + BinaryRow partitionRow = binaryRowSerializer.deserializeFromBytes(partition); + if (fileMetaBuilder != null) { + // Build file-meta index exactly once for the entire range handled by this + // instance. Only the first key index batch carries file-meta entries; + // subsequent batches carry none (avoiding duplicate file-meta references in + // the index manifest). + List binaryFileMetaEntries = buildBinaryFileMetaEntries(); + for (int i = 0; i < pendingKeyIndexBatches.size(); i++) { + List keyIndexEntries = pendingKeyIndexBatches.get(i); + List binaryFileMetaForThisBatch = + i == 0 ? binaryFileMetaEntries : new ArrayList<>(); + output.collect( + new StreamRecord<>( + new Committable( + BatchWriteBuilder.COMMIT_IDENTIFIER, + fileMetaBuilder.flushIndex( + rowRange, + keyIndexEntries, + binaryFileMetaForThisBatch, + partitionRow)))); + } + } else { + for (List keyIndexEntries : pendingKeyIndexBatches) { + output.collect( + new StreamRecord<>( + new Committable( + BatchWriteBuilder.COMMIT_IDENTIFIER, + builder.flushIndex( + rowRange, keyIndexEntries, partitionRow)))); + } + } + } + + pendingKeyIndexBatches.clear(); + } + + private List buildBinaryFileMetaEntries() throws IOException { + if (manifestEntryBytesList == null || manifestEntryBytesList.isEmpty()) { + return new ArrayList<>(); } - for (CommitMessage message : commitMessages) { - output.collect( - new StreamRecord<>( - new Committable(BatchWriteBuilder.COMMIT_IDENTIFIER, message))); + ManifestEntrySerializer serializer = new ManifestEntrySerializer(); + BinaryFileMetaWriter binaryFileMetaWriter = + fileMetaBuilder.createBinaryFileMetaWriter(); + for (byte[] entryBytes : manifestEntryBytesList) { + ManifestEntry entry = serializer.deserializeFromBytes(entryBytes); + binaryFileMetaWriter.write(entry.file().fileName(), entry); } - commitMessages.clear(); + return binaryFileMetaWriter.finish(); } } } diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BTreeGlobalIndexITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BTreeGlobalIndexITCase.java index d6368da23f01..b77f8f32f059 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BTreeGlobalIndexITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BTreeGlobalIndexITCase.java @@ -19,6 +19,8 @@ package org.apache.paimon.flink; import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.globalindex.btree.BTreeGlobalIndexerFactory; +import org.apache.paimon.globalindex.btree.BTreeWithFileMetaBuilder; import org.apache.paimon.index.IndexFileMeta; import org.apache.paimon.manifest.IndexManifestEntry; import org.apache.paimon.table.FileStoreTable; @@ -135,6 +137,55 @@ private void insertPartitionRows( } } + @Test + public void testBTreeIndexWithFileMeta() throws Catalog.TableNotExistException { + sql( + "CREATE TABLE T_FM (id INT, name STRING) WITH (" + + "'global-index.enabled' = 'true', " + + "'row-tracking.enabled' = 'true', " + + "'data-evolution.enabled' = 'true'" + + ")"); + String values = + IntStream.range(0, 1_000) + .mapToObj(i -> String.format("(%s, %s)", i, "'name_" + i + "'")) + .collect(Collectors.joining(",")); + sql("INSERT INTO T_FM VALUES " + values); + sql( + "CALL sys.create_global_index(`table` => 'default.T_FM', index_column => 'name'," + + " index_type => 'btree', options => 'index.with-file-meta=true')"); + + FileStoreTable table = paimonTable("T_FM"); + List allEntries = table.store().newIndexFileHandler().scanEntries(); + + // assert key-index (btree) entries exist + List keyIndexEntries = + allEntries.stream() + .map(IndexManifestEntry::indexFile) + .filter(f -> BTreeGlobalIndexerFactory.IDENTIFIER.equals(f.indexType())) + .collect(Collectors.toList()); + assertThat(keyIndexEntries).isNotEmpty(); + + // assert file-meta index (btree_file_meta) entries exist + List 
fileMetaEntries = + allEntries.stream() + .map(IndexManifestEntry::indexFile) + .filter( + f -> + BTreeWithFileMetaBuilder.INDEX_TYPE_FILE_META.equals( + f.indexType())) + .collect(Collectors.toList()); + assertThat(fileMetaEntries).isNotEmpty(); + + long totalRowCount = keyIndexEntries.stream().mapToLong(IndexFileMeta::rowCount).sum(); + assertThat(totalRowCount).isEqualTo(1000L); + + // assert query returns correct results via manifest-free read path + assertThat(sql("SELECT * FROM T_FM WHERE name = 'name_100'")) + .containsOnly(Row.of(100, "name_100")); + assertThat(sql("SELECT * FROM T_FM WHERE name = 'name_999'")) + .containsOnly(Row.of(999, "name_999")); + } + private void buildBTreeIndexForTable(String tableName, String indexColumn) { sql( "CALL sys.create_global_index(`table` => 'default.%s', index_column => '%s', index_type => 'btree')", diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeIndexTopoBuilder.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeIndexTopoBuilder.java index 5f7bfa453e5e..7114583818c7 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeIndexTopoBuilder.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeIndexTopoBuilder.java @@ -21,8 +21,17 @@ import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.serializer.BinaryRowSerializer; +import org.apache.paimon.globalindex.GlobalIndexParallelWriter; +import org.apache.paimon.globalindex.IndexedSplit; +import org.apache.paimon.globalindex.ResultEntry; import org.apache.paimon.globalindex.btree.BTreeGlobalIndexBuilder; import org.apache.paimon.globalindex.btree.BTreeIndexOptions; +import org.apache.paimon.globalindex.btree.BTreeWithFileMetaBuilder; +import org.apache.paimon.globalindex.btree.BinaryFileMetaWriter; +import 
org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.manifest.FileKind; +import org.apache.paimon.manifest.ManifestEntry; +import org.apache.paimon.manifest.ManifestEntrySerializer; import org.apache.paimon.options.Options; import org.apache.paimon.partition.PartitionPredicate; import org.apache.paimon.spark.SparkRow; @@ -50,6 +59,8 @@ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation; import org.apache.spark.sql.functions; +import javax.annotation.Nullable; + import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -107,6 +118,7 @@ public List buildIndex( List selectedColumns = new ArrayList<>(readType.getFieldNames()); // Calculate maximum parallelism bound + boolean withFileMeta = options.get(BTreeIndexOptions.BTREE_WITH_FILE_META); long recordsPerRange = options.get(BTreeIndexOptions.BTREE_INDEX_RECORDS_PER_RANGE); int maxParallelism = options.get(BTreeIndexOptions.BTREE_INDEX_BUILD_MAX_PARALLELISM); @@ -115,6 +127,8 @@ public List buildIndex( sortColumns.add(indexField.name()); final int partitionKeyNum = table.partitionKeys().size(); BinaryRowSerializer binaryRowSerializer = new BinaryRowSerializer(partitionKeyNum); + ManifestEntrySerializer manifestSerializer = + withFileMeta ? new ManifestEntrySerializer() : null; for (Map.Entry>> partitionEntry : partitionRangeSplits.entrySet()) { for (Map.Entry> entry : partitionEntry.getValue().entrySet()) { @@ -126,6 +140,31 @@ public List buildIndex( int partitionNum = Math.max((int) (range.count() / recordsPerRange), 1); partitionNum = Math.min(partitionNum, maxParallelism); + // Pre-serialize ManifestEntries for file-meta index (if withFileMeta is enabled) + List manifestEntryBytesList = null; + if (withFileMeta && manifestSerializer != null) { + manifestEntryBytesList = new ArrayList<>(); + for (Split split : rangeSplits) { + DataSplit dataSplit = + split instanceof IndexedSplit + ? ((IndexedSplit) split).dataSplit() + : (split instanceof DataSplit ? 
(DataSplit) split : null); + if (dataSplit == null) { + continue; + } + for (DataFileMeta file : dataSplit.dataFiles()) { + ManifestEntry me = + ManifestEntry.create( + FileKind.ADD, + partitionEntry.getKey(), + dataSplit.bucket(), + dataSplit.totalBuckets(), + file); + manifestEntryBytesList.add(manifestSerializer.serializeToBytes(me)); + } + } + } + Dataset source = PaimonUtils.createDataset( spark, @@ -148,6 +187,17 @@ public List buildIndex( final byte[] serializedBuilder = InstantiationUtil.serializeObject(indexBuilder); final byte[] partitionBytes = binaryRowSerializer.serializeToBytes(partitionEntry.getKey()); + final byte[] serializedFileMetaBuilder = + withFileMeta + ? InstantiationUtil.serializeObject( + new BTreeWithFileMetaBuilder(table, indexField)) + : null; + final byte[] serializedManifestEntries = + (withFileMeta + && manifestEntryBytesList != null + && !manifestEntryBytesList.isEmpty()) + ? InstantiationUtil.serializeObject(manifestEntryBytesList) + : null; JavaRDD written = partitioned .javaRDD() @@ -160,7 +210,9 @@ public List buildIndex( serializedBuilder, range, partitionKeyNum, - partitionBytes)); + partitionBytes, + serializedFileMetaBuilder, + serializedManifestEntries)); List commitBytes = written.collect(); allMessages.addAll(CommitMessageSerializer.deserializeAll(commitBytes)); } @@ -173,15 +225,98 @@ private static Iterator buildBTreeIndex( byte[] serializedBuilder, Range range, int partitionKeyNum, - byte[] partitionBytes) + byte[] partitionBytes, + @Nullable byte[] serializedFileMetaBuilder, + @Nullable byte[] serializedManifestEntries) throws IOException, ClassNotFoundException { final BinaryRowSerializer binaryRowSerializer = new BinaryRowSerializer(partitionKeyNum); BinaryRow partition = binaryRowSerializer.deserializeFromBytes(partitionBytes); BTreeGlobalIndexBuilder builder = InstantiationUtil.deserializeObject( serializedBuilder, BTreeGlobalIndexBuilder.class.getClassLoader()); + + if (serializedFileMetaBuilder != null) { + 
BTreeWithFileMetaBuilder fileMetaBuilder = + InstantiationUtil.deserializeObject( + serializedFileMetaBuilder, + BTreeWithFileMetaBuilder.class.getClassLoader()); + List manifestBytes = + serializedManifestEntries != null + ? InstantiationUtil.>deserializeObject( + serializedManifestEntries, + BTreeIndexTopoBuilder.class.getClassLoader()) + : new ArrayList<>(); + return CommitMessageSerializer.serializeAll( + buildWithFileMeta( + builder, + fileMetaBuilder, + range, + partition, + input, + manifestBytes)) + .iterator(); + } return CommitMessageSerializer.serializeAll( builder.buildForSinglePartition(range, partition, input)) .iterator(); } + + private static List buildWithFileMeta( + BTreeGlobalIndexBuilder builder, + BTreeWithFileMetaBuilder fileMetaBuilder, + Range range, + BinaryRow partition, + Iterator data, + List manifestEntryBytes) + throws IOException { + // Build file-meta index entries from pre-serialized manifest entry bytes + ManifestEntrySerializer meSerializer = new ManifestEntrySerializer(); + BinaryFileMetaWriter binaryFileMetaWriter = fileMetaBuilder.createBinaryFileMetaWriter(); + for (byte[] bytes : manifestEntryBytes) { + ManifestEntry entry = meSerializer.deserializeFromBytes(bytes); + binaryFileMetaWriter.write(entry.file().fileName(), entry); + } + List binaryFileMetaEntries = binaryFileMetaWriter.finish(); + + // Build key index entries, respecting recordsPerRange for splitting (mirrors Flink logic). + // file-meta entries are only attached to the first flush; subsequent batches carry none to + // avoid duplicate btree_file_meta SSTs in the index manifest. 
+ long counter = 0; + boolean firstFlush = true; + GlobalIndexParallelWriter currentWriter = null; + List messages = new ArrayList<>(); + InternalRow.FieldGetter indexFieldGetter = + InternalRow.createFieldGetter(fileMetaBuilder.getIndexField().type(), 0); + + while (data.hasNext()) { + InternalRow row = data.next(); + if (currentWriter != null && counter >= builder.recordsPerRange()) { + messages.add( + fileMetaBuilder.flushIndex( + range, + currentWriter.finish(), + firstFlush ? binaryFileMetaEntries : Collections.emptyList(), + partition)); + firstFlush = false; + currentWriter = null; + counter = 0; + } + counter++; + if (currentWriter == null) { + currentWriter = builder.createWriter(); + } + long localRowId = row.getLong(1) - range.from; + currentWriter.write(indexFieldGetter.getFieldOrNull(row), localRowId); + } + + if (counter > 0) { + messages.add( + fileMetaBuilder.flushIndex( + range, + currentWriter.finish(), + firstFlush ? binaryFileMetaEntries : Collections.emptyList(), + partition)); + } + return messages; + } } diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedureTest.scala index b45475ff0c82..507ab050a2f5 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedureTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedureTest.scala @@ -18,7 +18,7 @@ package org.apache.paimon.spark.procedure -import org.apache.paimon.globalindex.btree.{BTreeIndexMeta, KeySerializer} +import org.apache.paimon.globalindex.btree.{BTreeGlobalIndexerFactory, BTreeIndexMeta, BTreeWithFileMetaBuilder, KeySerializer} import org.apache.paimon.memory.MemorySlice import org.apache.paimon.spark.PaimonSparkTestBase import org.apache.paimon.types.VarCharType @@ -346,6 +346,117 @@ 
class CreateGlobalIndexProcedureTest extends PaimonSparkTestBase with StreamTest } } + test("create btree global index with file meta") { + withTable("T") { + spark.sql(""" + |CREATE TABLE T (id INT, name STRING) + |TBLPROPERTIES ( + | 'bucket' = '-1', + | 'global-index.row-count-per-shard' = '10000', + | 'row-tracking.enabled' = 'true', + | 'data-evolution.enabled' = 'true', + | 'btree-index.records-per-range' = '1000') + |""".stripMargin) + + val values = + (0 until 10000).map(i => s"($i, 'name_$i')").mkString(",") + spark.sql(s"INSERT INTO T VALUES $values") + + val output = + spark + .sql("CALL sys.create_global_index(table => 'test.T', index_column => 'name'," + + " index_type => 'btree', options => 'btree-index.records-per-range=1000,index.with-file-meta=true')") + .collect() + .head + + assert(output.getBoolean(0)) + + val table = loadTable("T") + val allEntries = table.store().newIndexFileHandler().scanEntries().asScala + + // assert key-index (btree) entries exist with correct row count + val keyIndexEntries = + allEntries.filter(_.indexFile().indexType() == BTreeGlobalIndexerFactory.IDENTIFIER) + assert(keyIndexEntries.nonEmpty) + val totalRowCount = keyIndexEntries.map(_.indexFile().rowCount()).sum + assert(totalRowCount == 10000L) + + // assert file-meta index (btree_file_meta) entries exist + val fileMetaEntries = + allEntries.filter( + _.indexFile().indexType() == BTreeWithFileMetaBuilder.INDEX_TYPE_FILE_META) + assert(fileMetaEntries.nonEmpty) + + // assert query returns correct results via manifest-free read path + val result100 = spark.sql("SELECT * FROM T WHERE name = 'name_100'").collect() + assert(result100.length == 1) + assert(result100.head.getInt(0) == 100) + + val result9999 = spark.sql("SELECT * FROM T WHERE name = 'name_9999'").collect() + assert(result9999.length == 1) + assert(result9999.head.getInt(0) == 9999) + } + } + + test("create btree global index with file meta and multiple partitions") { + withTable("T") { + spark.sql(""" + 
|CREATE TABLE T (id INT, name STRING, pt STRING) + |TBLPROPERTIES ( + | 'bucket' = '-1', + | 'global-index.row-count-per-shard' = '10000', + | 'row-tracking.enabled' = 'true', + | 'data-evolution.enabled' = 'true') + | PARTITIONED BY (pt) + |""".stripMargin) + + var values = (0 until 5000).map(i => s"($i, 'name_p0_$i', 'p0')").mkString(",") + spark.sql(s"INSERT INTO T VALUES $values") + + values = (0 until 5000).map(i => s"($i, 'name_p1_$i', 'p1')").mkString(",") + spark.sql(s"INSERT INTO T VALUES $values") + + val output = + spark + .sql("CALL sys.create_global_index(table => 'test.T', index_column => 'name'," + + " index_type => 'btree', options => 'btree-index.records-per-range=1000,index.with-file-meta=true')") + .collect() + .head + + assert(output.getBoolean(0)) + + val table = loadTable("T") + val allEntries = table.store().newIndexFileHandler().scanEntries().asScala + + // assert key-index entries for both partitions + val keyIndexEntries = + allEntries.filter(_.indexFile().indexType() == BTreeGlobalIndexerFactory.IDENTIFIER) + assert(keyIndexEntries.nonEmpty) + val totalRowCount = keyIndexEntries.map(_.indexFile().rowCount()).sum + assert(totalRowCount == 10000L) + + val keyIndexByPartition = keyIndexEntries.groupBy(_.partition()) + assert(keyIndexByPartition.size == 2) + keyIndexByPartition.values.foreach { + entries => assert(entries.map(_.indexFile().rowCount()).sum == 5000L) + } + + // assert file-meta index entries exist + assert( + allEntries.exists( + _.indexFile().indexType() == BTreeWithFileMetaBuilder.INDEX_TYPE_FILE_META)) + + // assert queries return correct results + val resP0 = spark.sql("SELECT * FROM T WHERE name = 'name_p0_100'").collect() + assert(resP0.length == 1) + assert(resP0.head.getString(2) == "p0") + + val resP1 = spark.sql("SELECT * FROM T WHERE name = 'name_p1_200'").collect() + assert(resP1.length == 1) + assert(resP1.head.getString(2) == "p1") + } + } + private def assertMultiplePartitionsResult( tableName: String, rowCount: 
Long,