evalGlobalIndex() {
- if (this.globalIndexResult != null) {
- return Optional.of(globalIndexResult);
+ /**
+ * Builds a query plan using the pre-fetched {@link ManifestEntry} rows stored in
+ * btree_file_meta index. This method does NOT call {@link #batchScan} at all, so no manifest
+ * HDFS reads occur.
+ *
+ * Returns {@code null} if the index is stale (i.e., one or more referenced data files no
+ * longer exist, typically due to compaction after the index was built). The caller should fall
+ * back to a regular manifest scan in that case.
+ */
+ @Nullable
+ private Plan buildPlanFromFilePathIndex(FilePathGlobalIndexResult result) throws IOException {
+ ManifestEntrySerializer serializer = new ManifestEntrySerializer();
+
+ List entries = new ArrayList<>();
+ for (byte[] bytes : result.manifestEntryBytes()) {
+ entries.add(serializer.deserializeFromBytes(bytes));
+ }
+
+ // Build rowRangeIndex once; use it for both planning-time file pruning and
+ // reading-time row-level filtering (IndexedSplit).
+ RowRangeIndex rowRangeIndex = null;
+ if (result.hasRowLevelFilter()) {
+ rowRangeIndex = RowRangeIndex.create(result.rowIndexResult().results().toRangeList());
+ // Planning-time file pruning: discard files whose row range has no intersection
+ // with the matched row IDs, equivalent to what batchScan does during manifest scan.
+ final RowRangeIndex index = rowRangeIndex;
+ entries =
+ entries.stream()
+ .filter(
+ e -> {
+ long from = e.file().nonNullFirstRowId();
+ long to = from + e.file().rowCount() - 1;
+ return !index.intersectedRanges(from, to).isEmpty();
+ })
+ .collect(Collectors.toList());
+ }
+
+ // Guard against stale index caused by compaction after the index was built.
+ // For typical point/narrow queries this is O(1~3) fileIO.exists() calls.
+ for (ManifestEntry entry : entries) {
+ if (!table.fileIO()
+ .exists(
+ table.store()
+ .pathFactory()
+ .createDataFilePathFactory(entry.partition(), entry.bucket())
+ .toPath(entry.file()))) {
+ LOG.warn(
+ "btree_file_meta index is stale: file {} no longer exists "
+ + "(possibly removed by compaction). Falling back to manifest scan.",
+ entry.file().fileName());
+ return null;
+ }
}
- if (filter == null) {
- return Optional.empty();
+
+ List splits = buildSplitsFromEntries(entries);
+ if (splits.isEmpty()) {
+ return Collections::emptyList;
}
- CoreOptions options = table.coreOptions();
- if (!options.globalIndexEnabled()) {
- return Optional.empty();
+
+ if (rowRangeIndex != null) {
+ return wrapToIndexSplits(splits, rowRangeIndex, null);
}
- PartitionPredicate partitionFilter =
- batchScan.snapshotReader().manifestsReader().partitionFilter();
- Optional optionalScanner =
- GlobalIndexScanner.create(table, partitionFilter, filter);
- if (!optionalScanner.isPresent()) {
- return Optional.empty();
+ return () -> splits;
+ }
+
+ /**
+ * Reconstructs {@link DataSplit} objects from a list of {@link ManifestEntry} instances.
+ *
+ * Groups entries by {@code partition + bucket}, then builds one {@link DataSplit} per group.
+ * Uses {@link BinaryRow} directly as map key so that grouping relies on its content-based
+ * {@code equals}/{@code hashCode}, which is collision-free.
+ */
+ private List buildSplitsFromEntries(List entries) {
+ // partition -> bucket -> files
+ Map>> grouped = new HashMap<>();
+ Map> representative = new HashMap<>();
+
+ for (ManifestEntry entry : entries) {
+ grouped.computeIfAbsent(entry.partition(), k -> new HashMap<>())
+ .computeIfAbsent(entry.bucket(), k -> new ArrayList<>())
+ .add(entry.file());
+ representative
+ .computeIfAbsent(entry.partition(), k -> new HashMap<>())
+ .put(entry.bucket(), entry);
}
- try (GlobalIndexScanner scanner = optionalScanner.get()) {
- return scanner.scan(filter);
- } catch (IOException e) {
- throw new RuntimeException(e);
+ List splits = new ArrayList<>();
+ for (Map.Entry>> partEntry :
+ grouped.entrySet()) {
+ for (Map.Entry> bucketEntry :
+ partEntry.getValue().entrySet()) {
+ ManifestEntry rep =
+ representative.get(partEntry.getKey()).get(bucketEntry.getKey());
+ List files = bucketEntry.getValue();
+ // Sort by firstRowId for consistent ordering
+ files.sort((a, b) -> Long.compare(a.nonNullFirstRowId(), b.nonNullFirstRowId()));
+
+ Long latestSnapshotId =
+ batchScan.snapshotReader().snapshotManager().latestSnapshotId();
+ DataSplit split =
+ DataSplit.builder()
+ .withSnapshot(latestSnapshotId != null ? latestSnapshotId : -1L)
+ .withPartition(rep.partition())
+ .withBucket(rep.bucket())
+ .withBucketPath("")
+ .withTotalBuckets(rep.totalBuckets())
+ .isStreaming(false)
+ .rawConvertible(false)
+ .withDataFiles(files)
+ .build();
+ splits.add(split);
+ }
}
+ return splits;
}
@VisibleForTesting
diff --git a/paimon-core/src/main/java/org/apache/paimon/globalindex/FilePathGlobalIndexResult.java b/paimon-core/src/main/java/org/apache/paimon/globalindex/FilePathGlobalIndexResult.java
new file mode 100644
index 000000000000..e189e6c4ee55
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/globalindex/FilePathGlobalIndexResult.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.globalindex;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+/**
+ * Result of a BTree-with-file-meta global index query. Contains pre-fetched {@link
+ * org.apache.paimon.manifest.ManifestEntry} bytes (from the btree_file_meta index) that can be used
+ * to build a query plan without reading manifest files from HDFS, plus an optional row-level {@link
+ * GlobalIndexResult} (from the btree key index) for fine-grained row filtering inside matching
+ * files.
+ */
+public class FilePathGlobalIndexResult {
+
+ private final List manifestEntryBytes;
+ @Nullable private final GlobalIndexResult rowIndexResult;
+
+ public FilePathGlobalIndexResult(
+ List manifestEntryBytes, @Nullable GlobalIndexResult rowIndexResult) {
+ this.manifestEntryBytes = manifestEntryBytes;
+ this.rowIndexResult = rowIndexResult;
+ }
+
+ /**
+ * Raw serialized {@link org.apache.paimon.manifest.ManifestEntry} bytes from btree_file_meta
+ * index.
+ */
+ public List manifestEntryBytes() {
+ return manifestEntryBytes;
+ }
+
+ /** Whether row-level filtering is available via the btree key index. */
+ public boolean hasRowLevelFilter() {
+ return rowIndexResult != null;
+ }
+
+ /** The row-level index result (btree key index), or {@code null} if not available. */
+ @Nullable
+ public GlobalIndexResult rowIndexResult() {
+ return rowIndexResult;
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexScanner.java b/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexScanner.java
index 3092f1b3daaf..a92aed65d042 100644
--- a/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexScanner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexScanner.java
@@ -20,6 +20,9 @@
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.globalindex.btree.BTreeGlobalIndexerFactory;
+import org.apache.paimon.globalindex.btree.BTreeWithFileMetaBuilder;
+import org.apache.paimon.globalindex.btree.BTreeWithFileMetaReader;
import org.apache.paimon.globalindex.io.GlobalIndexFileReader;
import org.apache.paimon.index.GlobalIndexMeta;
import org.apache.paimon.index.IndexFileMeta;
@@ -35,6 +38,8 @@
import org.apache.paimon.utils.ManifestReadThreadPool;
import org.apache.paimon.utils.Range;
+import javax.annotation.Nullable;
+
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
@@ -63,6 +68,10 @@ public class GlobalIndexScanner implements Closeable {
private final GlobalIndexEvaluator globalIndexEvaluator;
private final IndexPathFactory indexPathFactory;
+ // Per-field map of BTreeWithFileMetaReader instances, keyed by field ID.
+ // Non-null only when btree (with companion btree_file_meta) index files are present.
+ @Nullable private final Map filePathIndexReaders;
+
public GlobalIndexScanner(
Options options,
RowType rowType,
@@ -74,18 +83,76 @@ public GlobalIndexScanner(
ManifestReadThreadPool.getExecutorService(options.get(GLOBAL_INDEX_THREAD_NUM));
this.indexPathFactory = indexPathFactory;
GlobalIndexFileReader indexFileReader = meta -> fileIO.newInputStream(meta.filePath());
+
+ // Step 1: collect (fieldId, range) pairs that have a btree_file_meta companion
+ Map> binaryFileMetaKeyMap = new HashMap<>();
+ for (IndexFileMeta indexFile : indexFiles) {
+ if (BTreeWithFileMetaBuilder.INDEX_TYPE_FILE_META.equals(indexFile.indexType())) {
+ GlobalIndexMeta meta = checkNotNull(indexFile.globalIndexMeta());
+ binaryFileMetaKeyMap
+ .computeIfAbsent(meta.indexFieldId(), k -> new HashSet<>())
+ .add(meta.rowRangeStart());
+ }
+ }
+
+ // Step 2: classify index files
+ // fieldId -> indexType -> range -> List
Map>>> indexMetas = new HashMap<>();
+ // fieldId -> (keyIndexByRange, binaryFileMetaByRange)
+ Map>> keyIndexByField = new HashMap<>();
+ Map>> binaryFileMetaByField = new HashMap<>();
+ Map fieldById = new HashMap<>();
+
for (IndexFileMeta indexFile : indexFiles) {
GlobalIndexMeta meta = checkNotNull(indexFile.globalIndexMeta());
int fieldId = meta.indexFieldId();
String indexType = indexFile.indexType();
- indexMetas
- .computeIfAbsent(fieldId, k -> new HashMap<>())
- .computeIfAbsent(indexType, k -> new HashMap<>())
- .computeIfAbsent(
- new Range(meta.rowRangeStart(), meta.rowRangeEnd()),
- k -> new ArrayList<>())
- .add(indexFile);
+ Range range = new Range(meta.rowRangeStart(), meta.rowRangeEnd());
+
+ if (BTreeWithFileMetaBuilder.INDEX_TYPE_FILE_META.equals(indexType)) {
+ binaryFileMetaByField
+ .computeIfAbsent(fieldId, k -> new HashMap<>())
+ .computeIfAbsent(range, k -> new ArrayList<>())
+ .add(toGlobalMeta(indexFile));
+ } else if (BTreeGlobalIndexerFactory.IDENTIFIER.equals(indexType)
+ && binaryFileMetaKeyMap
+ .getOrDefault(fieldId, Collections.emptySet())
+ .contains(meta.rowRangeStart())) {
+ keyIndexByField
+ .computeIfAbsent(fieldId, k -> new HashMap<>())
+ .computeIfAbsent(range, k -> new ArrayList<>())
+ .add(toGlobalMeta(indexFile));
+ fieldById.put(fieldId, rowType.getField(fieldId));
+ } else {
+ indexMetas
+ .computeIfAbsent(fieldId, k -> new HashMap<>())
+ .computeIfAbsent(indexType, k -> new HashMap<>())
+ .computeIfAbsent(range, k -> new ArrayList<>())
+ .add(indexFile);
+ }
+ }
+
+ // Build BTreeWithFileMetaReader for fields that have btree (with btree_file_meta) files
+ if (!keyIndexByField.isEmpty()) {
+ this.filePathIndexReaders = new HashMap<>();
+ for (Map.Entry>> entry :
+ keyIndexByField.entrySet()) {
+ int fieldId = entry.getKey();
+ DataField dataField = fieldById.get(fieldId);
+ Map> binaryFileMetaForField =
+ binaryFileMetaByField.getOrDefault(fieldId, Collections.emptyMap());
+ filePathIndexReaders.put(
+ fieldId,
+ new BTreeWithFileMetaReader(
+ entry.getValue(),
+ binaryFileMetaForField,
+ dataField,
+ options,
+ indexFileReader,
+ executor));
+ }
+ } else {
+ this.filePathIndexReaders = null;
}
IntFunction> readersFunction =
@@ -142,6 +209,35 @@ public Optional scan(Predicate predicate) {
return globalIndexEvaluator.evaluate(predicate);
}
+ /**
+ * Evaluates the predicate using btree/btree_file_meta index files and returns a {@link
+ * FilePathGlobalIndexResult} when available. Returns {@link Optional#empty()} if no FilePath
+ * Global Index files are present for the predicate fields.
+ */
+ public Optional scanWithFilePath(Predicate predicate)
+ throws IOException {
+ if (filePathIndexReaders == null || filePathIndexReaders.isEmpty()) {
+ return Optional.empty();
+ }
+ // For simplicity, we support one field with file-path index at a time.
+ // In the future this could be extended to multi-field AND/OR combinations.
+ for (BTreeWithFileMetaReader reader : filePathIndexReaders.values()) {
+ Optional result = reader.scan(predicate);
+ if (result.isPresent()) {
+ return result;
+ }
+ }
+ return Optional.empty();
+ }
+
+ /**
+ * Returns true if this scanner has any btree/btree_file_meta index files, i.e., whether {@link
+ * #scanWithFilePath(Predicate)} may return a non-empty result.
+ */
+ public boolean hasFilePathIndex() {
+ return filePathIndexReaders != null && !filePathIndexReaders.isEmpty();
+ }
+
private Collection createReaders(
GlobalIndexFileReader indexFileReadWrite,
Map>> indexMetas,
diff --git a/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BTreeWithFileMetaBuilder.java b/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BTreeWithFileMetaBuilder.java
new file mode 100644
index 000000000000..a3b45254781b
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BTreeWithFileMetaBuilder.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.globalindex.btree;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.globalindex.GlobalIndexBuilderUtils;
+import org.apache.paimon.globalindex.GlobalIndexFileReadWrite;
+import org.apache.paimon.globalindex.GlobalIndexParallelWriter;
+import org.apache.paimon.globalindex.GlobalIndexWriter;
+import org.apache.paimon.globalindex.ResultEntry;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.index.IndexPathFactory;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.ManifestEntrySerializer;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.utils.Range;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.apache.paimon.globalindex.GlobalIndexBuilderUtils.toIndexFileMetas;
+
+/**
+ * Builder for BTree global index with embedded file metadata (key index + file-meta index).
+ *
+ * Key index (index type {@link BTreeGlobalIndexerFactory#IDENTIFIER}, i.e. {@code "btree"}):
+ * standard BTree index mapping {@code key -> rowIds}. Per-entry fileNames are also tracked so that
+ * the file-meta index can be populated.
+ *
+ *
File-meta index (index type {@link #INDEX_TYPE_FILE_META}): a flat SST that maps {@code
+ * fileName -> ManifestEntry bytes}. One entry per data file covered by this index shard.
+ *
+ *
Both parts are committed atomically in a single {@link CommitMessage}.
+ */
+public class BTreeWithFileMetaBuilder implements java.io.Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ public static final String INDEX_TYPE_FILE_META = "btree_file_meta";
+
+ private final FileStoreTable table;
+ private final DataField indexField;
+
+ public BTreeWithFileMetaBuilder(FileStoreTable table, DataField indexField) {
+ this.table = table;
+ this.indexField = indexField;
+ }
+
+ public DataField getIndexField() {
+ return indexField;
+ }
+
+ /**
+ * Builds btree key index + btree_file_meta index for one partition range and returns a single
+ * {@link CommitMessage} containing both.
+ *
+ * @param rowRange global row id range covered by this shard
+ * @param partition partition key
+ * @param split the DataSplit being indexed (used to gather DataFileMeta for btree_file_meta)
+ * @param data iterator of (indexField, globalRowId) rows
+ */
+ public CommitMessage build(
+ Range rowRange, BinaryRow partition, DataSplit split, Iterator data)
+ throws IOException {
+
+ // Build firstRowId -> DataFileMeta interval map for fast lookup
+ List dataFiles = split.dataFiles();
+ // dataFiles are ordered by firstRowId within a split
+ long[] firstRowIds = new long[dataFiles.size()];
+ for (int i = 0; i < dataFiles.size(); i++) {
+ firstRowIds[i] = dataFiles.get(i).nonNullFirstRowId();
+ }
+
+ IndexPathFactory indexPathFactory = table.store().pathFactory().globalIndexFileFactory();
+ GlobalIndexFileReadWrite rw =
+ new GlobalIndexFileReadWrite(table.fileIO(), indexPathFactory);
+
+ // ---- Key index writer (standard BTree key→rowIds) ----
+ GlobalIndexWriter rawWriter =
+ GlobalIndexBuilderUtils.createIndexWriter(
+ table,
+ BTreeGlobalIndexerFactory.IDENTIFIER,
+ indexField,
+ table.coreOptions().toConfiguration());
+ if (!(rawWriter instanceof GlobalIndexParallelWriter)) {
+ throw new RuntimeException(
+ "BTree writer must implement GlobalIndexParallelWriter, found: "
+ + rawWriter.getClass().getName());
+ }
+ GlobalIndexParallelWriter keyIndexWriter = (GlobalIndexParallelWriter) rawWriter;
+
+ // ---- File-meta index writer (fileName → ManifestEntry bytes) ----
+ ManifestEntrySerializer manifestSerializer = new ManifestEntrySerializer();
+ BinaryFileMetaWriter binaryFileMetaWriter =
+ new BinaryFileMetaWriter(rw, manifestSerializer);
+
+ // Write file-meta index entries (one per DataFileMeta in the split)
+ for (DataFileMeta file : dataFiles) {
+ ManifestEntry entry =
+ ManifestEntry.create(
+ FileKind.ADD, partition, split.bucket(), split.totalBuckets(), file);
+ binaryFileMetaWriter.write(file.fileName(), entry);
+ }
+
+ // Write key-index entries
+ InternalRow.FieldGetter indexFieldGetter =
+ InternalRow.createFieldGetter(indexField.type(), 0);
+ while (data.hasNext()) {
+ InternalRow row = data.next();
+ long globalRowId = row.getLong(1);
+ long localRowId = globalRowId - rowRange.from;
+ Object key = indexFieldGetter.getFieldOrNull(row);
+ keyIndexWriter.write(key, localRowId);
+ }
+
+ // Finish both writers and collect ResultEntries
+ List keyIndexEntries = keyIndexWriter.finish();
+ List binaryFileMetaEntries = binaryFileMetaWriter.finish();
+
+ // Build IndexFileMetas for btree key index
+ List keyIndexMetas =
+ toIndexFileMetas(
+ table.fileIO(),
+ indexPathFactory,
+ table.coreOptions(),
+ rowRange,
+ indexField.id(),
+ BTreeGlobalIndexerFactory.IDENTIFIER,
+ keyIndexEntries);
+
+ // Build IndexFileMetas for btree_file_meta (rowRange and fieldId are same, to allow
+ // scanning)
+ List binaryFileMetaIndexMetas =
+ toIndexFileMetas(
+ table.fileIO(),
+ indexPathFactory,
+ table.coreOptions(),
+ rowRange,
+ indexField.id(),
+ INDEX_TYPE_FILE_META,
+ binaryFileMetaEntries);
+
+ List allMetas = new ArrayList<>();
+ allMetas.addAll(keyIndexMetas);
+ allMetas.addAll(binaryFileMetaIndexMetas);
+
+ // Single CommitMessage → atomic commit of btree key index + btree_file_meta
+ DataIncrement dataIncrement = DataIncrement.indexIncrement(allMetas);
+ return new CommitMessageImpl(
+ partition, 0, null, dataIncrement, CompactIncrement.emptyIncrement());
+ }
+
+ /**
+ * Flush overload compatible with the existing {@link BTreeGlobalIndexBuilder} API, where we
+ * have already-written btree key index result entries plus an explicit list of ManifestEntry
+ * rows for btree_file_meta.
+ */
+ public CommitMessage flushIndex(
+ Range rowRange,
+ List keyIndexResultEntries,
+ List binaryFileMetaResultEntries,
+ BinaryRow partition)
+ throws IOException {
+
+ IndexPathFactory indexPathFactory = table.store().pathFactory().globalIndexFileFactory();
+
+ List keyIndexMetas =
+ toIndexFileMetas(
+ table.fileIO(),
+ indexPathFactory,
+ table.coreOptions(),
+ rowRange,
+ indexField.id(),
+ BTreeGlobalIndexerFactory.IDENTIFIER,
+ keyIndexResultEntries);
+
+ List binaryFileMetaIndexMetas =
+ toIndexFileMetas(
+ table.fileIO(),
+ indexPathFactory,
+ table.coreOptions(),
+ rowRange,
+ indexField.id(),
+ INDEX_TYPE_FILE_META,
+ binaryFileMetaResultEntries);
+
+ List allMetas = new ArrayList<>();
+ allMetas.addAll(keyIndexMetas);
+ allMetas.addAll(binaryFileMetaIndexMetas);
+
+ DataIncrement dataIncrement = DataIncrement.indexIncrement(allMetas);
+ return new CommitMessageImpl(
+ partition, 0, null, dataIncrement, CompactIncrement.emptyIncrement());
+ }
+
+ /**
+ * Creates a new binary file-meta index writer that accumulates {@code fileName ->
+ * ManifestEntry} mappings. Call {@link BinaryFileMetaWriter#finish()} to get the {@link
+ * ResultEntry} list.
+ */
+ public BinaryFileMetaWriter createBinaryFileMetaWriter() throws IOException {
+ IndexPathFactory indexPathFactory = table.store().pathFactory().globalIndexFileFactory();
+ GlobalIndexFileReadWrite rw =
+ new GlobalIndexFileReadWrite(table.fileIO(), indexPathFactory);
+ return new BinaryFileMetaWriter(rw, new ManifestEntrySerializer());
+ }
+
+ /**
+ * Resolves which {@link DataFileMeta} a global row id belongs to by binary search over
+ * pre-computed {@code firstRowIds} array.
+ */
+ static int findFileIndex(long[] firstRowIds, long globalRowId) {
+ int lo = 0;
+ int hi = firstRowIds.length - 1;
+ while (lo < hi) {
+ int mid = (lo + hi + 1) >>> 1;
+ if (firstRowIds[mid] <= globalRowId) {
+ lo = mid;
+ } else {
+ hi = mid - 1;
+ }
+ }
+ return lo;
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BTreeWithFileMetaReader.java b/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BTreeWithFileMetaReader.java
new file mode 100644
index 000000000000..52d3e7c91665
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BTreeWithFileMetaReader.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.globalindex.btree;
+
+import org.apache.paimon.globalindex.FilePathGlobalIndexResult;
+import org.apache.paimon.globalindex.GlobalIndexEvaluator;
+import org.apache.paimon.globalindex.GlobalIndexIOMeta;
+import org.apache.paimon.globalindex.GlobalIndexReader;
+import org.apache.paimon.globalindex.GlobalIndexResult;
+import org.apache.paimon.globalindex.OffsetGlobalIndexReader;
+import org.apache.paimon.globalindex.UnionGlobalIndexReader;
+import org.apache.paimon.globalindex.io.GlobalIndexFileReader;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Range;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * Reader for BTree-with-file-meta index (btree + btree_file_meta).
+ *
+ * btree key index files are evaluated with the standard BTree predicate visitor to obtain
+ * matching row IDs. btree_file_meta files are read fully to obtain {@link
+ * org.apache.paimon.manifest.ManifestEntry} rows for the files covered by this index shard.
+ *
+ *
Returns a {@link FilePathGlobalIndexResult} combining both.
+ */
+public class BTreeWithFileMetaReader {
+
+ private final Map> keyIndexByRange;
+ private final Map> binaryFileMetaByRange;
+ private final DataField indexField;
+ private final Options options;
+ private final GlobalIndexFileReader fileReader;
+ private final ExecutorService executor;
+
+ public BTreeWithFileMetaReader(
+ Map> keyIndexByRange,
+ Map> binaryFileMetaByRange,
+ DataField indexField,
+ Options options,
+ GlobalIndexFileReader fileReader,
+ ExecutorService executor) {
+ this.keyIndexByRange = keyIndexByRange;
+ this.binaryFileMetaByRange = binaryFileMetaByRange;
+ this.indexField = indexField;
+ this.options = options;
+ this.fileReader = fileReader;
+ this.executor = executor;
+ }
+
+ /**
+ * Evaluates the predicate against the btree key index and reads btree_file_meta manifest
+ * entries.
+ *
+ * @return {@link Optional#empty()} if no btree key index files exist, otherwise a {@link
+ * FilePathGlobalIndexResult} with row-level bitmap and manifest entry rows.
+ */
+ public Optional scan(Predicate predicate) throws IOException {
+ if (keyIndexByRange.isEmpty()) {
+ return Optional.empty();
+ }
+
+ // ---- btree key index: standard BTree predicate evaluation ----
+ BTreeGlobalIndexer indexer = new BTreeGlobalIndexer(indexField, options);
+ RowType singleFieldRowType = new RowType(false, Collections.singletonList(indexField));
+
+ List unionReaders = new ArrayList<>();
+ for (Map.Entry> entry : keyIndexByRange.entrySet()) {
+ Range range = entry.getKey();
+ List metas = entry.getValue();
+ GlobalIndexReader innerReader = indexer.createReader(fileReader, metas);
+ unionReaders.add(new OffsetGlobalIndexReader(innerReader, range.from, range.to));
+ }
+
+ GlobalIndexReader keyIndexUnion = new UnionGlobalIndexReader(unionReaders, executor);
+ GlobalIndexEvaluator evaluator =
+ new GlobalIndexEvaluator(
+ singleFieldRowType, fieldId -> Collections.singletonList(keyIndexUnion));
+
+ Optional keyIndexResult;
+ try {
+ keyIndexResult = evaluator.evaluate(predicate);
+ } finally {
+ evaluator.close();
+ }
+
+ if (!keyIndexResult.isPresent() || keyIndexResult.get().results().isEmpty()) {
+ return Optional.empty();
+ }
+
+ // ---- btree_file_meta: read all ManifestEntry raw bytes ----
+ List manifestEntryBytes = readAllBinaryFileMetaEntries();
+ if (manifestEntryBytes.isEmpty()) {
+ // No btree_file_meta data — return key-index result only (no file-path optimization)
+ return Optional.empty();
+ }
+
+ return Optional.of(new FilePathGlobalIndexResult(manifestEntryBytes, keyIndexResult.get()));
+ }
+
+ private List readAllBinaryFileMetaEntries() throws IOException {
+ // Use LinkedHashMap to deduplicate by fileName across multiple SST files.
+ // When the same range is written by multiple parallel subtasks, each subtask produces a
+ // complete btree_file_meta SST with identical entries. putIfAbsent keeps only the first
+ // occurrence and preserves insertion order.
+ LinkedHashMap seen = new LinkedHashMap<>();
+
+ for (List metas : binaryFileMetaByRange.values()) {
+ for (GlobalIndexIOMeta meta : metas) {
+ BinaryFileMetaIndexReader reader =
+ new BinaryFileMetaIndexReader(fileReader, meta, options);
+ try {
+ BinaryFileMetaIndexReader.EntryIterator it = reader.iterator();
+ while (it.hasNext()) {
+ String fileName = it.nextKey();
+ byte[] valueBytes = it.nextValue();
+ seen.putIfAbsent(fileName, valueBytes);
+ }
+ } finally {
+ reader.close();
+ }
+ }
+ }
+ return new ArrayList<>(seen.values());
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BinaryFileMetaIndexReader.java b/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BinaryFileMetaIndexReader.java
new file mode 100644
index 000000000000..6f9c9d91f1d6
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BinaryFileMetaIndexReader.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.globalindex.btree;
+
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.globalindex.GlobalIndexIOMeta;
+import org.apache.paimon.globalindex.io.GlobalIndexFileReader;
+import org.apache.paimon.io.cache.CacheManager;
+import org.apache.paimon.memory.MemorySegment;
+import org.apache.paimon.memory.MemorySlice;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.sst.BlockCache;
+import org.apache.paimon.sst.BlockIterator;
+import org.apache.paimon.sst.SstFileReader;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Comparator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+/**
+ * Sequential reader for btree_file_meta SST files (fileName → ManifestEntry bytes).
+ *
+ * btree_file_meta files are written by {@link BinaryFileMetaWriter} using the same SST format as
+ * btree key index files, but without a BTreeIndexMeta (null meta) and without a null bitmap. Keys
+ * are UTF-8 encoded file names; values are serialized {@link
+ * org.apache.paimon.manifest.ManifestEntry} bytes.
+ */
+public class BinaryFileMetaIndexReader implements Closeable {
+
+ private final SeekableInputStream input;
+ private final SstFileReader sstReader;
+
+ public BinaryFileMetaIndexReader(
+ GlobalIndexFileReader fileReader, GlobalIndexIOMeta meta, Options options)
+ throws IOException {
+ this.input = fileReader.getInputStream(meta);
+ long fileSize = meta.fileSize();
+
+ CacheManager cacheManager =
+ new CacheManager(
+ options.get(BTreeIndexOptions.BTREE_INDEX_CACHE_SIZE),
+ options.get(BTreeIndexOptions.BTREE_INDEX_HIGH_PRIORITY_POOL_RATIO));
+
+ BlockCache blockCache = new BlockCache(meta.filePath(), input, cacheManager);
+ BTreeFileFooter footer = readFooter(blockCache, fileSize);
+
+ Comparator sliceComparator = lexicographicComparator();
+ this.sstReader =
+ new SstFileReader(sliceComparator, blockCache, footer.getIndexBlockHandle(), null);
+ }
+
+ /** Returns an iterator over all (key, value) pairs in insertion order. */
+ public EntryIterator iterator() {
+ return new EntryIterator();
+ }
+
+ @Override
+ public void close() throws IOException {
+ sstReader.close();
+ input.close();
+ }
+
+ private BTreeFileFooter readFooter(BlockCache blockCache, long fileSize) {
+ MemorySegment footerBytes =
+ blockCache.getBlock(
+ fileSize - BTreeFileFooter.ENCODED_LENGTH,
+ BTreeFileFooter.ENCODED_LENGTH,
+ b -> b,
+ true);
+ return BTreeFileFooter.readFooter(MemorySlice.wrap(footerBytes).toInput());
+ }
+
+ private static Comparator lexicographicComparator() {
+ return (a, b) -> {
+ int lenA = a.length();
+ int lenB = b.length();
+ int min = Math.min(lenA, lenB);
+ for (int i = 0; i < min; i++) {
+ int diff = (a.readByte(i) & 0xFF) - (b.readByte(i) & 0xFF);
+ if (diff != 0) {
+ return diff;
+ }
+ }
+ return lenA - lenB;
+ };
+ }
+
+ /**
+ * Iterator over btree_file_meta SST entries, yielding (fileName, ManifestEntry bytes) pairs.
+ */
+ public class EntryIterator {
+
+ private final SstFileReader.SstFileIterator fileIter;
+ private BlockIterator dataIter;
+ private String nextKey;
+ private byte[] nextValue;
+
+ private EntryIterator() {
+ this.fileIter = sstReader.createIterator();
+ }
+
+ public boolean hasNext() throws IOException {
+ if (nextValue != null) {
+ return true;
+ }
+
+ while (true) {
+ if (dataIter != null && dataIter.hasNext()) {
+ Map.Entry entry = dataIter.next();
+ nextKey = new String(entry.getKey().copyBytes(), StandardCharsets.UTF_8);
+ nextValue = entry.getValue().copyBytes();
+ return true;
+ }
+
+ dataIter = fileIter.readBatch();
+ if (dataIter == null) {
+ return false;
+ }
+ }
+ }
+
+ /**
+ * Returns the fileName (key) of the current entry. Must be called after {@link #hasNext()}
+ * returns {@code true} and before {@link #nextValue()}.
+ */
+ public String nextKey() throws IOException {
+ if (!hasNext()) {
+ throw new NoSuchElementException("No more entries in btree_file_meta file.");
+ }
+ return nextKey;
+ }
+
+ /** Returns the raw ManifestEntry bytes (value) and advances the iterator. */
+ public byte[] nextValue() throws IOException {
+ if (!hasNext()) {
+ throw new NoSuchElementException("No more entries in btree_file_meta file.");
+ }
+ byte[] result = nextValue;
+ nextKey = null;
+ nextValue = null;
+ return result;
+ }
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BinaryFileMetaWriter.java b/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BinaryFileMetaWriter.java
new file mode 100644
index 000000000000..07d8612064a5
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BinaryFileMetaWriter.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.globalindex.btree;
+
+import org.apache.paimon.compression.BlockCompressionFactory;
+import org.apache.paimon.compression.CompressOptions;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.globalindex.GlobalIndexFileReadWrite;
+import org.apache.paimon.globalindex.ResultEntry;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.ManifestEntrySerializer;
+import org.apache.paimon.memory.MemorySlice;
+import org.apache.paimon.sst.BlockHandle;
+import org.apache.paimon.sst.BloomFilterHandle;
+import org.apache.paimon.sst.SstFileWriter;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * btree_file_meta writer: stores {@code fileName -> ManifestEntry bytes} in a single SST file.
+ *
+ * Duplicate fileNames are silently ignored (only the first occurrence is written). Keys must
+ * reach {@link #write} in lexicographic order of fileName. NOTE(review): callers rely on the
+ * iteration order of their {@link org.apache.paimon.io.DataFileMeta} lists for this — confirm
+ * that order really is lexicographic by file name, otherwise SST lookups will misbehave.
+ *
+ * Not thread-safe. {@link #finish()} must be called exactly once after all writes.
+ */
+public class BinaryFileMetaWriter {
+
+    /** Prefix used when creating the SST file name. */
+    public static final String FILE_PREFIX = "btree-file-meta";
+
+    private final String fileName;
+    private final PositionOutputStream out;
+    private final SstFileWriter writer;
+    private final ManifestEntrySerializer manifestSerializer;
+    private final KeySerializer keySerializer;
+    // Tracks written names for idempotence; its size is the final row count.
+    private final Set writtenFileNames = new LinkedHashSet<>();
+    private boolean finished = false;
+
+    public BinaryFileMetaWriter(
+            GlobalIndexFileReadWrite rw, ManifestEntrySerializer manifestSerializer)
+            throws IOException {
+        this.fileName = rw.newFileName(FILE_PREFIX);
+        this.out = rw.newOutputStream(this.fileName);
+        // Use no compression and default block size (64 KiB) for simplicity
+        this.writer =
+                new SstFileWriter(
+                        out,
+                        64 * 1024,
+                        null,
+                        BlockCompressionFactory.create(new CompressOptions("none", 1)));
+        this.manifestSerializer = manifestSerializer;
+        this.keySerializer = new KeySerializer.StringSerializer();
+    }
+
+    /**
+     * Writes a single {@code fileName -> ManifestEntry} mapping. If {@code fileName} was already
+     * written, this call is a no-op (idempotent).
+     */
+    public void write(String fileName, ManifestEntry entry) throws IOException {
+        // Set.add returns false for duplicates: one lookup instead of contains() + add().
+        if (!writtenFileNames.add(fileName)) {
+            return;
+        }
+
+        byte[] keyBytes = keySerializer.serialize(BinaryString.fromString(fileName));
+        byte[] valueBytes = manifestSerializer.serializeToBytes(entry);
+        writer.put(keyBytes, valueBytes);
+    }
+
+    /**
+     * Finalizes the SST file and returns a single {@link ResultEntry}. Must be called exactly once.
+     *
+     * @throws IllegalStateException if called more than once
+     */
+    public List finish() throws IOException {
+        if (finished) {
+            throw new IllegalStateException("finish() has already been called.");
+        }
+        finished = true;
+
+        long rowCount = writtenFileNames.size();
+        if (rowCount == 0) {
+            // Nothing was written – close and return empty (caller should handle this).
+            // NOTE(review): this leaves an empty file at 'fileName'; consider deleting it.
+            out.close();
+            return Collections.emptyList();
+        }
+
+        writer.flush();
+        BloomFilterHandle bloomFilterHandle = writer.writeBloomFilter();
+        BlockHandle indexBlockHandle = writer.writeIndexBlock();
+        BTreeFileFooter footer = new BTreeFileFooter(bloomFilterHandle, indexBlockHandle, null);
+        MemorySlice footerEncoding = BTreeFileFooter.writeFooter(footer);
+        writer.writeSlice(footerEncoding);
+        out.close();
+
+        return Collections.singletonList(new ResultEntry(fileName, rowCount, null));
+    }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/globalindex/btree/BinaryFileMetaDeduplicationTest.java b/paimon-core/src/test/java/org/apache/paimon/globalindex/btree/BinaryFileMetaDeduplicationTest.java
new file mode 100644
index 000000000000..908949a35889
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/globalindex/btree/BinaryFileMetaDeduplicationTest.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.globalindex.btree;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.globalindex.GlobalIndexFileReadWrite;
+import org.apache.paimon.globalindex.GlobalIndexIOMeta;
+import org.apache.paimon.globalindex.ResultEntry;
+import org.apache.paimon.index.IndexPathFactory;
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.ManifestEntrySerializer;
+import org.apache.paimon.options.Options;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.UUID;
+
+import static org.apache.paimon.io.DataFileTestUtils.newFile;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Unit tests verifying that {@link BinaryFileMetaIndexReader.EntryIterator} exposes keys correctly
+ * and that the {@link LinkedHashMap}-based deduplication in {@link BTreeWithFileMetaReader}
+ * eliminates duplicate {@code fileName} entries produced by parallel subtasks writing identical
+ * {@code btree_file_meta} SST files.
+ */
+public class BinaryFileMetaDeduplicationTest {
+
+    @TempDir java.nio.file.Path tempDir;
+
+    private LocalFileIO fileIO;
+    private IndexPathFactory factory;
+    private GlobalIndexFileReadWrite rw;
+    private ManifestEntrySerializer serializer;
+
+    @BeforeEach
+    public void setUp() {
+        fileIO = new LocalFileIO();
+        factory = new SimpleIndexPathFactory(new Path(tempDir.toString()));
+        rw = new GlobalIndexFileReadWrite(fileIO, factory);
+        serializer = new ManifestEntrySerializer();
+    }
+
+    /**
+     * Writes a single {@code btree_file_meta} SST with the given entries (keyed by data file
+     * name) and returns the IO meta of the produced file.
+     */
+    private GlobalIndexIOMeta writeSst(List<ManifestEntry> entries) throws IOException {
+        BinaryFileMetaWriter writer = new BinaryFileMetaWriter(rw, serializer);
+        for (ManifestEntry entry : entries) {
+            writer.write(entry.file().fileName(), entry);
+        }
+        List<ResultEntry> results = writer.finish();
+        // A non-empty writer always produces exactly one SST file.
+        assertThat(results).hasSize(1);
+        return toIOMeta(results.get(0).fileName());
+    }
+
+    private GlobalIndexIOMeta toIOMeta(String name) throws IOException {
+        Path p = factory.toPath(name);
+        return new GlobalIndexIOMeta(p, fileIO.getFileSize(p), null);
+    }
+
+    /**
+     * Mirrors {@code BTreeWithFileMetaReader.readAllBinaryFileMetaEntries()}: reads every SST
+     * and merges entries with {@code putIfAbsent}, keeping the first occurrence per fileName.
+     */
+    private LinkedHashMap<String, byte[]> readAllDeduplicated(List<GlobalIndexIOMeta> metas)
+            throws IOException {
+        LinkedHashMap<String, byte[]> seen = new LinkedHashMap<>();
+        Options options = new Options();
+        for (GlobalIndexIOMeta meta : metas) {
+            try (BinaryFileMetaIndexReader reader =
+                    new BinaryFileMetaIndexReader(rw, meta, options)) {
+                BinaryFileMetaIndexReader.EntryIterator it = reader.iterator();
+                while (it.hasNext()) {
+                    String fileName = it.nextKey();
+                    byte[] valueBytes = it.nextValue();
+                    seen.putIfAbsent(fileName, valueBytes);
+                }
+            }
+        }
+        return seen;
+    }
+
+    // -------------------------------------------------------------------------
+    // Test 1: EntryIterator correctly exposes both key and value
+    // -------------------------------------------------------------------------
+
+    @Test
+    public void testEntryIteratorExposesKeyAndValue() throws IOException {
+        GlobalIndexIOMeta meta =
+                writeSst(
+                        Arrays.asList(
+                                makeEntry("data-file-aaa.parquet"),
+                                makeEntry("data-file-bbb.parquet")));
+
+        try (BinaryFileMetaIndexReader reader =
+                new BinaryFileMetaIndexReader(rw, meta, new Options())) {
+            BinaryFileMetaIndexReader.EntryIterator it = reader.iterator();
+
+            // First entry
+            assertThat(it.hasNext()).isTrue();
+            assertThat(it.nextKey()).isEqualTo("data-file-aaa.parquet");
+            it.nextValue(); // consume value, advancing to the second entry
+
+            // Second entry
+            assertThat(it.hasNext()).isTrue();
+            assertThat(it.nextKey()).isEqualTo("data-file-bbb.parquet");
+            it.nextValue();
+
+            assertThat(it.hasNext()).isFalse();
+        }
+    }
+
+    // -------------------------------------------------------------------------
+    // Test 2: Identical SSTs from N parallel subtasks are deduplicated by fileName
+    // -------------------------------------------------------------------------
+
+    @Test
+    public void testDuplicateSstDeduplicationByFileName() throws IOException {
+        List<ManifestEntry> entries =
+                Arrays.asList(
+                        makeEntry("data-file-001.parquet"),
+                        makeEntry("data-file-002.parquet"),
+                        makeEntry("data-file-003.parquet"));
+
+        // Simulate 3 parallel subtasks each writing an identical btree_file_meta SST
+        int parallelism = 3;
+        List<GlobalIndexIOMeta> metas = new ArrayList<>();
+        for (int i = 0; i < parallelism; i++) {
+            metas.add(writeSst(entries));
+        }
+        assertThat(metas).hasSize(parallelism);
+
+        LinkedHashMap<String, byte[]> seen = readAllDeduplicated(metas);
+
+        // Despite 3 SST files each containing the same 3 fileNames, deduplication must yield
+        // exactly 3 distinct ManifestEntry bytes — not 3 * 3 = 9.
+        assertThat(seen).hasSize(3);
+        assertThat(seen.keySet())
+                .containsExactly(
+                        "data-file-001.parquet", "data-file-002.parquet", "data-file-003.parquet");
+    }
+
+    // -------------------------------------------------------------------------
+    // Test 3: Partially-overlapping SSTs are correctly merged
+    // -------------------------------------------------------------------------
+
+    @Test
+    public void testPartiallyOverlappingSstsAreMerged() throws IOException {
+        // SST A: file-001, file-002
+        GlobalIndexIOMeta metaA =
+                writeSst(
+                        Arrays.asList(
+                                makeEntry("data-file-001.parquet"),
+                                makeEntry("data-file-002.parquet")));
+
+        // SST B: file-002 (duplicate), file-003 (new)
+        GlobalIndexIOMeta metaB =
+                writeSst(
+                        Arrays.asList(
+                                makeEntry("data-file-002.parquet"),
+                                makeEntry("data-file-003.parquet")));
+
+        LinkedHashMap<String, byte[]> seen = readAllDeduplicated(Arrays.asList(metaA, metaB));
+
+        // file-002 appears in both SSTs but must appear exactly once
+        assertThat(seen).hasSize(3);
+        assertThat(seen.keySet())
+                .containsExactlyInAnyOrder(
+                        "data-file-001.parquet", "data-file-002.parquet", "data-file-003.parquet");
+    }
+
+    // -------------------------------------------------------------------------
+    // Helper: create a minimal ManifestEntry with the given data file name
+    // -------------------------------------------------------------------------
+
+    private static ManifestEntry makeEntry(String dataFileName) {
+        return ManifestEntry.create(
+                FileKind.ADD, BinaryRow.EMPTY_ROW, 0, 1, newFile(dataFileName, 0, 0, 99, 0L));
+    }
+
+    // -------------------------------------------------------------------------
+    // Minimal IndexPathFactory backed by a temp directory
+    // -------------------------------------------------------------------------
+
+    private static class SimpleIndexPathFactory implements IndexPathFactory {
+
+        private final Path dir;
+
+        SimpleIndexPathFactory(Path dir) {
+            this.dir = dir;
+        }
+
+        @Override
+        public Path toPath(String fileName) {
+            return new Path(dir, fileName);
+        }
+
+        @Override
+        public Path newPath() {
+            return new Path(dir, UUID.randomUUID() + ".index");
+        }
+
+        @Override
+        public boolean isExternalPath() {
+            return false;
+        }
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java
index 35830faac19b..40a9cf3c22a9 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java
@@ -35,19 +35,26 @@
import org.apache.paimon.flink.utils.BoundedOneInputOperator;
import org.apache.paimon.flink.utils.JavaTypeInfo;
import org.apache.paimon.globalindex.GlobalIndexParallelWriter;
+import org.apache.paimon.globalindex.IndexedSplit;
+import org.apache.paimon.globalindex.ResultEntry;
import org.apache.paimon.globalindex.btree.BTreeGlobalIndexBuilder;
import org.apache.paimon.globalindex.btree.BTreeIndexOptions;
+import org.apache.paimon.globalindex.btree.BTreeWithFileMetaBuilder;
+import org.apache.paimon.globalindex.btree.BinaryFileMetaWriter;
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.ManifestEntrySerializer;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.SpecialFields;
import org.apache.paimon.table.sink.BatchWriteBuilder;
-import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Pair;
@@ -61,6 +68,8 @@
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@@ -84,6 +93,7 @@ public static boolean buildIndex(
PartitionPredicate partitionPredicate,
Options userOptions)
throws Exception {
+ boolean withFileMeta = userOptions.get(BTreeIndexOptions.BTREE_WITH_FILE_META);
DataStream allCommitMessages = null;
for (String indexColumn : indexColumns) {
BTreeGlobalIndexBuilder indexBuilder =
@@ -118,6 +128,7 @@ public static boolean buildIndex(
int indexFieldPos = readType.getFieldIndex(indexColumn);
int rowIdPos = readType.getFieldIndex(SpecialFields.ROW_ID.name());
DataType indexFieldType = readType.getTypeAt(indexFieldPos);
+ DataField indexDataField = table.rowType().getField(indexColumn);
// 3. Calculate maximum parallelism bound
long recordsPerRange = userOptions.get(BTreeIndexOptions.BTREE_INDEX_RECORDS_PER_RANGE);
@@ -131,6 +142,9 @@ public static boolean buildIndex(
sortColumns.add(indexColumn);
int partitionFieldSize = table.partitionKeys().size();
BinaryRowSerializer binaryRowSerializer = new BinaryRowSerializer(partitionFieldSize);
+ ManifestEntrySerializer manifestSerializer =
+ withFileMeta ? new ManifestEntrySerializer() : null;
+
for (Map.Entry>> partitionEntry :
partitionRangeSplits.entrySet()) {
BinaryRow partition = partitionEntry.getKey();
@@ -141,6 +155,34 @@ public static boolean buildIndex(
continue;
}
+ // Pre-serialize ManifestEntries for file-meta index (if withFileMeta is
+ // enabled)
+ List manifestEntryBytesList = null;
+ if (withFileMeta && manifestSerializer != null) {
+ manifestEntryBytesList = new ArrayList<>();
+ for (Split split : rangeSplits) {
+ DataSplit dataSplit =
+ split instanceof IndexedSplit
+ ? ((IndexedSplit) split).dataSplit()
+ : (split instanceof DataSplit
+ ? (DataSplit) split
+ : null);
+ if (dataSplit == null) {
+ continue;
+ }
+ for (org.apache.paimon.io.DataFileMeta file : dataSplit.dataFiles()) {
+ ManifestEntry me =
+ ManifestEntry.create(
+ FileKind.ADD,
+ partition,
+ dataSplit.bucket(),
+ dataSplit.totalBuckets(),
+ file);
+ manifestEntryBytesList.add(manifestSerializer.serializeToBytes(me));
+ }
+ }
+ }
+
DataStream commitMessages =
executeForPartitionRange(
env,
@@ -148,6 +190,9 @@ public static boolean buildIndex(
rangeSplits,
readBuilder,
indexBuilder,
+ withFileMeta
+ ? new BTreeWithFileMetaBuilder(table, indexDataField)
+ : null,
partitionFieldSize,
binaryRowSerializer.serializeToBytes(partition),
indexFieldPos,
@@ -157,7 +202,8 @@ public static boolean buildIndex(
coreOptions,
readType,
recordsPerRange,
- maxParallelism);
+ maxParallelism,
+ manifestEntryBytesList);
allCommitMessages =
allCommitMessages == null
@@ -197,6 +243,7 @@ protected static DataStream executeForPartitionRange(
List rangeSplits,
ReadBuilder readBuilder,
BTreeGlobalIndexBuilder indexBuilder,
+ @Nullable BTreeWithFileMetaBuilder fileMetaBuilder,
int partitionFieldSize,
byte[] partition,
int indexFieldPos,
@@ -206,7 +253,8 @@ protected static DataStream executeForPartitionRange(
CoreOptions coreOptions,
RowType readType,
long recordsPerRange,
- int maxParallelism) {
+ int maxParallelism,
+ @Nullable List manifestEntryBytesList) {
int parallelism = Math.max((int) (range.count() / recordsPerRange), 1);
parallelism = Math.min(parallelism, maxParallelism);
@@ -246,9 +294,11 @@ protected static DataStream executeForPartitionRange(
partitionFieldSize,
partition,
indexBuilder,
+ fileMetaBuilder,
indexFieldPos,
rowIdPos,
- indexFieldType))
+ indexFieldType,
+ manifestEntryBytesList))
.setParallelism(parallelism);
}
@@ -306,13 +356,20 @@ private static class WriteIndexOperator extends BoundedOneInputOperator manifestEntryBytesList;
private transient long counter;
private transient GlobalIndexParallelWriter currentWriter;
- private transient List commitMessages;
+ /** Accumulated key index result batches; flushed as CommitMessages in endInput(). */
+ private transient List> pendingKeyIndexBatches;
+
private transient InternalRow.FieldGetter indexFieldGetter;
private transient BinaryRowSerializer binaryRowSerializer;
@@ -321,22 +378,26 @@ public WriteIndexOperator(
int partitionFieldSize,
byte[] partition,
BTreeGlobalIndexBuilder builder,
+ @Nullable BTreeWithFileMetaBuilder fileMetaBuilder,
int indexFieldPos,
int rowIdPos,
- DataType indexFieldType) {
+ DataType indexFieldType,
+ @Nullable List manifestEntryBytesList) {
this.rowRange = rowRange;
this.partitionFieldSize = partitionFieldSize;
this.partition = partition;
this.builder = builder;
+ this.fileMetaBuilder = fileMetaBuilder;
this.indexFieldPos = indexFieldPos;
this.rowIdPos = rowIdPos;
this.indexFieldType = indexFieldType;
+ this.manifestEntryBytesList = manifestEntryBytesList;
}
@Override
public void open() throws Exception {
super.open();
- commitMessages = new ArrayList<>();
+ pendingKeyIndexBatches = new ArrayList<>();
indexFieldGetter = InternalRow.createFieldGetter(indexFieldType, indexFieldPos);
this.binaryRowSerializer = new BinaryRowSerializer(partitionFieldSize);
}
@@ -345,11 +406,7 @@ public void open() throws Exception {
public void processElement(StreamRecord element) throws IOException {
InternalRow row = new FlinkRowWrapper(element.getValue());
if (currentWriter != null && counter >= builder.recordsPerRange()) {
- commitMessages.add(
- builder.flushIndex(
- rowRange,
- currentWriter.finish(),
- binaryRowSerializer.deserializeFromBytes(partition)));
+ pendingKeyIndexBatches.add(currentWriter.finish());
currentWriter = null;
counter = 0;
}
@@ -366,19 +423,59 @@ public void processElement(StreamRecord element) throws IOException {
@Override
public void endInput() throws IOException {
- if (counter > 0) {
- commitMessages.add(
- builder.flushIndex(
- rowRange,
- currentWriter.finish(),
- binaryRowSerializer.deserializeFromBytes(partition)));
+ if (counter > 0 && currentWriter != null) {
+ pendingKeyIndexBatches.add(currentWriter.finish());
+ }
+
+ if (!pendingKeyIndexBatches.isEmpty()) {
+ BinaryRow partitionRow = binaryRowSerializer.deserializeFromBytes(partition);
+ if (fileMetaBuilder != null) {
+ // Build file-meta index exactly once for the entire range handled by this
+ // instance. Only the first key index batch carries file-meta entries;
+ // subsequent batches carry none (avoiding duplicate file-meta references in
+ // the index manifest).
+ List binaryFileMetaEntries = buildBinaryFileMetaEntries();
+ for (int i = 0; i < pendingKeyIndexBatches.size(); i++) {
+ List keyIndexEntries = pendingKeyIndexBatches.get(i);
+ List binaryFileMetaForThisBatch =
+ i == 0 ? binaryFileMetaEntries : new ArrayList<>();
+ output.collect(
+ new StreamRecord<>(
+ new Committable(
+ BatchWriteBuilder.COMMIT_IDENTIFIER,
+ fileMetaBuilder.flushIndex(
+ rowRange,
+ keyIndexEntries,
+ binaryFileMetaForThisBatch,
+ partitionRow))));
+ }
+ } else {
+ for (List keyIndexEntries : pendingKeyIndexBatches) {
+ output.collect(
+ new StreamRecord<>(
+ new Committable(
+ BatchWriteBuilder.COMMIT_IDENTIFIER,
+ builder.flushIndex(
+ rowRange, keyIndexEntries, partitionRow))));
+ }
+ }
+ }
+
+ pendingKeyIndexBatches.clear();
+ }
+
+ private List buildBinaryFileMetaEntries() throws IOException {
+ if (manifestEntryBytesList == null || manifestEntryBytesList.isEmpty()) {
+ return new ArrayList<>();
}
- for (CommitMessage message : commitMessages) {
- output.collect(
- new StreamRecord<>(
- new Committable(BatchWriteBuilder.COMMIT_IDENTIFIER, message)));
+ ManifestEntrySerializer serializer = new ManifestEntrySerializer();
+ BinaryFileMetaWriter binaryFileMetaWriter =
+ fileMetaBuilder.createBinaryFileMetaWriter();
+ for (byte[] entryBytes : manifestEntryBytesList) {
+ ManifestEntry entry = serializer.deserializeFromBytes(entryBytes);
+ binaryFileMetaWriter.write(entry.file().fileName(), entry);
}
- commitMessages.clear();
+ return binaryFileMetaWriter.finish();
}
}
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BTreeGlobalIndexITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BTreeGlobalIndexITCase.java
index d6368da23f01..b77f8f32f059 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BTreeGlobalIndexITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BTreeGlobalIndexITCase.java
@@ -19,6 +19,8 @@
package org.apache.paimon.flink;
import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.globalindex.btree.BTreeGlobalIndexerFactory;
+import org.apache.paimon.globalindex.btree.BTreeWithFileMetaBuilder;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.table.FileStoreTable;
@@ -135,6 +137,55 @@ private void insertPartitionRows(
}
}
+ @Test
+ public void testBTreeIndexWithFileMeta() throws Catalog.TableNotExistException {
+ sql(
+ "CREATE TABLE T_FM (id INT, name STRING) WITH ("
+ + "'global-index.enabled' = 'true', "
+ + "'row-tracking.enabled' = 'true', "
+ + "'data-evolution.enabled' = 'true'"
+ + ")");
+ String values =
+ IntStream.range(0, 1_000)
+ .mapToObj(i -> String.format("(%s, %s)", i, "'name_" + i + "'"))
+ .collect(Collectors.joining(","));
+ sql("INSERT INTO T_FM VALUES " + values);
+ sql(
+ "CALL sys.create_global_index(`table` => 'default.T_FM', index_column => 'name',"
+ + " index_type => 'btree', options => 'index.with-file-meta=true')");
+
+ FileStoreTable table = paimonTable("T_FM");
+ List allEntries = table.store().newIndexFileHandler().scanEntries();
+
+ // assert key-index (btree) entries exist
+ List keyIndexEntries =
+ allEntries.stream()
+ .map(IndexManifestEntry::indexFile)
+ .filter(f -> BTreeGlobalIndexerFactory.IDENTIFIER.equals(f.indexType()))
+ .collect(Collectors.toList());
+ assertThat(keyIndexEntries).isNotEmpty();
+
+ // assert file-meta index (btree_file_meta) entries exist
+ List fileMetaEntries =
+ allEntries.stream()
+ .map(IndexManifestEntry::indexFile)
+ .filter(
+ f ->
+ BTreeWithFileMetaBuilder.INDEX_TYPE_FILE_META.equals(
+ f.indexType()))
+ .collect(Collectors.toList());
+ assertThat(fileMetaEntries).isNotEmpty();
+
+ long totalRowCount = keyIndexEntries.stream().mapToLong(IndexFileMeta::rowCount).sum();
+ assertThat(totalRowCount).isEqualTo(1000L);
+
+ // assert query returns correct results via manifest-free read path
+ assertThat(sql("SELECT * FROM T_FM WHERE name = 'name_100'"))
+ .containsOnly(Row.of(100, "name_100"));
+ assertThat(sql("SELECT * FROM T_FM WHERE name = 'name_999'"))
+ .containsOnly(Row.of(999, "name_999"));
+ }
+
private void buildBTreeIndexForTable(String tableName, String indexColumn) {
sql(
"CALL sys.create_global_index(`table` => 'default.%s', index_column => '%s', index_type => 'btree')",
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeIndexTopoBuilder.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeIndexTopoBuilder.java
index 5f7bfa453e5e..7114583818c7 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeIndexTopoBuilder.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeIndexTopoBuilder.java
@@ -21,8 +21,17 @@
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.BinaryRowSerializer;
+import org.apache.paimon.globalindex.GlobalIndexParallelWriter;
+import org.apache.paimon.globalindex.IndexedSplit;
+import org.apache.paimon.globalindex.ResultEntry;
import org.apache.paimon.globalindex.btree.BTreeGlobalIndexBuilder;
import org.apache.paimon.globalindex.btree.BTreeIndexOptions;
+import org.apache.paimon.globalindex.btree.BTreeWithFileMetaBuilder;
+import org.apache.paimon.globalindex.btree.BinaryFileMetaWriter;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.ManifestEntrySerializer;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.spark.SparkRow;
@@ -50,6 +59,8 @@
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation;
import org.apache.spark.sql.functions;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@@ -107,6 +118,7 @@ public List buildIndex(
List selectedColumns = new ArrayList<>(readType.getFieldNames());
// Calculate maximum parallelism bound
+ boolean withFileMeta = options.get(BTreeIndexOptions.BTREE_WITH_FILE_META);
long recordsPerRange = options.get(BTreeIndexOptions.BTREE_INDEX_RECORDS_PER_RANGE);
int maxParallelism = options.get(BTreeIndexOptions.BTREE_INDEX_BUILD_MAX_PARALLELISM);
@@ -115,6 +127,8 @@ public List buildIndex(
sortColumns.add(indexField.name());
final int partitionKeyNum = table.partitionKeys().size();
BinaryRowSerializer binaryRowSerializer = new BinaryRowSerializer(partitionKeyNum);
+ ManifestEntrySerializer manifestSerializer =
+ withFileMeta ? new ManifestEntrySerializer() : null;
for (Map.Entry>> partitionEntry :
partitionRangeSplits.entrySet()) {
for (Map.Entry> entry : partitionEntry.getValue().entrySet()) {
@@ -126,6 +140,31 @@ public List buildIndex(
int partitionNum = Math.max((int) (range.count() / recordsPerRange), 1);
partitionNum = Math.min(partitionNum, maxParallelism);
+ // Pre-serialize ManifestEntries for file-meta index (if withFileMeta is enabled)
+ List manifestEntryBytesList = null;
+ if (withFileMeta && manifestSerializer != null) {
+ manifestEntryBytesList = new ArrayList<>();
+ for (Split split : rangeSplits) {
+ DataSplit dataSplit =
+ split instanceof IndexedSplit
+ ? ((IndexedSplit) split).dataSplit()
+ : (split instanceof DataSplit ? (DataSplit) split : null);
+ if (dataSplit == null) {
+ continue;
+ }
+ for (DataFileMeta file : dataSplit.dataFiles()) {
+ ManifestEntry me =
+ ManifestEntry.create(
+ FileKind.ADD,
+ partitionEntry.getKey(),
+ dataSplit.bucket(),
+ dataSplit.totalBuckets(),
+ file);
+ manifestEntryBytesList.add(manifestSerializer.serializeToBytes(me));
+ }
+ }
+ }
+
Dataset source =
PaimonUtils.createDataset(
spark,
@@ -148,6 +187,17 @@ public List buildIndex(
final byte[] serializedBuilder = InstantiationUtil.serializeObject(indexBuilder);
final byte[] partitionBytes =
binaryRowSerializer.serializeToBytes(partitionEntry.getKey());
+ final byte[] serializedFileMetaBuilder =
+ withFileMeta
+ ? InstantiationUtil.serializeObject(
+ new BTreeWithFileMetaBuilder(table, indexField))
+ : null;
+ final byte[] serializedManifestEntries =
+ (withFileMeta
+ && manifestEntryBytesList != null
+ && !manifestEntryBytesList.isEmpty())
+ ? InstantiationUtil.serializeObject(manifestEntryBytesList)
+ : null;
JavaRDD written =
partitioned
.javaRDD()
@@ -160,7 +210,9 @@ public List buildIndex(
serializedBuilder,
range,
partitionKeyNum,
- partitionBytes));
+ partitionBytes,
+ serializedFileMetaBuilder,
+ serializedManifestEntries));
List commitBytes = written.collect();
allMessages.addAll(CommitMessageSerializer.deserializeAll(commitBytes));
}
@@ -173,15 +225,98 @@ private static Iterator buildBTreeIndex(
byte[] serializedBuilder,
Range range,
int partitionKeyNum,
- byte[] partitionBytes)
+ byte[] partitionBytes,
+ @Nullable byte[] serializedFileMetaBuilder,
+ @Nullable byte[] serializedManifestEntries)
throws IOException, ClassNotFoundException {
final BinaryRowSerializer binaryRowSerializer = new BinaryRowSerializer(partitionKeyNum);
BinaryRow partition = binaryRowSerializer.deserializeFromBytes(partitionBytes);
BTreeGlobalIndexBuilder builder =
InstantiationUtil.deserializeObject(
serializedBuilder, BTreeGlobalIndexBuilder.class.getClassLoader());
+
+ if (serializedFileMetaBuilder != null) {
+ BTreeWithFileMetaBuilder fileMetaBuilder =
+ InstantiationUtil.deserializeObject(
+ serializedFileMetaBuilder,
+ BTreeWithFileMetaBuilder.class.getClassLoader());
+ List manifestBytes =
+ serializedManifestEntries != null
+ ? InstantiationUtil.>deserializeObject(
+ serializedManifestEntries,
+ BTreeIndexTopoBuilder.class.getClassLoader())
+ : new ArrayList<>();
+ return CommitMessageSerializer.serializeAll(
+ buildWithFileMeta(
+ builder,
+ fileMetaBuilder,
+ range,
+ partition,
+ input,
+ manifestBytes))
+ .iterator();
+ }
return CommitMessageSerializer.serializeAll(
builder.buildForSinglePartition(range, partition, input))
.iterator();
}
+
+ private static List buildWithFileMeta(
+ BTreeGlobalIndexBuilder builder,
+ BTreeWithFileMetaBuilder fileMetaBuilder,
+ Range range,
+ BinaryRow partition,
+ Iterator data,
+ List manifestEntryBytes)
+ throws IOException {
+ // Build file-meta index entries from pre-serialized manifest entry bytes
+ ManifestEntrySerializer meSerializer = new ManifestEntrySerializer();
+ BinaryFileMetaWriter binaryFileMetaWriter = fileMetaBuilder.createBinaryFileMetaWriter();
+ for (byte[] bytes : manifestEntryBytes) {
+ ManifestEntry entry = meSerializer.deserializeFromBytes(bytes);
+ binaryFileMetaWriter.write(entry.file().fileName(), entry);
+ }
+ List binaryFileMetaEntries = binaryFileMetaWriter.finish();
+
+ // Build key index entries, respecting recordsPerRange for splitting (mirrors Flink logic).
+ // file-meta entries are only attached to the first flush; subsequent batches carry none to
+ // avoid duplicate btree_file_meta SSTs in the index manifest.
+ long counter = 0;
+ boolean firstFlush = true;
+ GlobalIndexParallelWriter currentWriter = null;
+ List messages = new ArrayList<>();
+ InternalRow.FieldGetter indexFieldGetter =
+ InternalRow.createFieldGetter(fileMetaBuilder.getIndexField().type(), 0);
+
+ while (data.hasNext()) {
+ InternalRow row = data.next();
+ if (currentWriter != null && counter >= builder.recordsPerRange()) {
+ messages.add(
+ fileMetaBuilder.flushIndex(
+ range,
+ currentWriter.finish(),
+ firstFlush ? binaryFileMetaEntries : Collections.emptyList(),
+ partition));
+ firstFlush = false;
+ currentWriter = null;
+ counter = 0;
+ }
+ counter++;
+ if (currentWriter == null) {
+ currentWriter = builder.createWriter();
+ }
+ long localRowId = row.getLong(1) - range.from;
+ currentWriter.write(indexFieldGetter.getFieldOrNull(row), localRowId);
+ }
+
+ if (counter > 0) {
+ messages.add(
+ fileMetaBuilder.flushIndex(
+ range,
+ currentWriter.finish(),
+ firstFlush ? binaryFileMetaEntries : Collections.emptyList(),
+ partition));
+ }
+ return messages;
+ }
}
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedureTest.scala
index b45475ff0c82..507ab050a2f5 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedureTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedureTest.scala
@@ -18,7 +18,7 @@
package org.apache.paimon.spark.procedure
-import org.apache.paimon.globalindex.btree.{BTreeIndexMeta, KeySerializer}
+import org.apache.paimon.globalindex.btree.{BTreeGlobalIndexerFactory, BTreeIndexMeta, BTreeWithFileMetaBuilder, KeySerializer}
import org.apache.paimon.memory.MemorySlice
import org.apache.paimon.spark.PaimonSparkTestBase
import org.apache.paimon.types.VarCharType
@@ -346,6 +346,117 @@ class CreateGlobalIndexProcedureTest extends PaimonSparkTestBase with StreamTest
}
}
test("create btree global index with file meta") {
  withTable("T") {
    // Table with row tracking + data evolution so global indexing is applicable.
    spark.sql("""
                |CREATE TABLE T (id INT, name STRING)
                |TBLPROPERTIES (
                | 'bucket' = '-1',
                | 'global-index.row-count-per-shard' = '10000',
                | 'row-tracking.enabled' = 'true',
                | 'data-evolution.enabled' = 'true',
                | 'btree-index.records-per-range' = '1000')
                |""".stripMargin)

    val rows = Seq.tabulate(10000)(i => s"($i, 'name_$i')").mkString(",")
    spark.sql(s"INSERT INTO T VALUES $rows")

    val procedureRow =
      spark
        .sql("CALL sys.create_global_index(table => 'test.T', index_column => 'name'," +
          " index_type => 'btree', options => 'btree-index.records-per-range=1000,index.with-file-meta=true')")
        .collect()
        .head
    assert(procedureRow.getBoolean(0))

    val allEntries = loadTable("T").store().newIndexFileHandler().scanEntries().asScala

    // Key-index (btree) entries must exist and cover every inserted row.
    val keyEntries =
      allEntries.filter(_.indexFile().indexType() == BTreeGlobalIndexerFactory.IDENTIFIER)
    assert(keyEntries.nonEmpty)
    assert(keyEntries.map(_.indexFile().rowCount()).sum == 10000L)

    // Companion file-meta index (btree_file_meta) entries must exist.
    val fileMetaEntries =
      allEntries.filter(
        _.indexFile().indexType() == BTreeWithFileMetaBuilder.INDEX_TYPE_FILE_META)
    assert(fileMetaEntries.nonEmpty)

    // Point queries must return correct results via the manifest-free read path.
    val hit100 = spark.sql("SELECT * FROM T WHERE name = 'name_100'").collect()
    assert(hit100.length == 1)
    assert(hit100.head.getInt(0) == 100)

    val hit9999 = spark.sql("SELECT * FROM T WHERE name = 'name_9999'").collect()
    assert(hit9999.length == 1)
    assert(hit9999.head.getInt(0) == 9999)
  }
}
+
test("create btree global index with file meta and multiple partitions") {
  withTable("T") {
    // Partitioned table: index build must handle each partition independently.
    spark.sql("""
                |CREATE TABLE T (id INT, name STRING, pt STRING)
                |TBLPROPERTIES (
                | 'bucket' = '-1',
                | 'global-index.row-count-per-shard' = '10000',
                | 'row-tracking.enabled' = 'true',
                | 'data-evolution.enabled' = 'true')
                | PARTITIONED BY (pt)
                |""".stripMargin)

    val p0Rows = Seq.tabulate(5000)(i => s"($i, 'name_p0_$i', 'p0')").mkString(",")
    spark.sql(s"INSERT INTO T VALUES $p0Rows")
    val p1Rows = Seq.tabulate(5000)(i => s"($i, 'name_p1_$i', 'p1')").mkString(",")
    spark.sql(s"INSERT INTO T VALUES $p1Rows")

    val procedureRow =
      spark
        .sql("CALL sys.create_global_index(table => 'test.T', index_column => 'name'," +
          " index_type => 'btree', options => 'btree-index.records-per-range=1000,index.with-file-meta=true')")
        .collect()
        .head
    assert(procedureRow.getBoolean(0))

    val allEntries = loadTable("T").store().newIndexFileHandler().scanEntries().asScala

    // Key-index entries must cover all rows, split evenly across the two partitions.
    val keyEntries =
      allEntries.filter(_.indexFile().indexType() == BTreeGlobalIndexerFactory.IDENTIFIER)
    assert(keyEntries.nonEmpty)
    assert(keyEntries.map(_.indexFile().rowCount()).sum == 10000L)

    val byPartition = keyEntries.groupBy(_.partition())
    assert(byPartition.size == 2)
    byPartition.values.foreach {
      entries => assert(entries.map(_.indexFile().rowCount()).sum == 5000L)
    }

    // At least one file-meta index entry must have been produced.
    assert(
      allEntries.exists(
        _.indexFile().indexType() == BTreeWithFileMetaBuilder.INDEX_TYPE_FILE_META))

    // Queries must resolve rows to the correct partition.
    val resP0 = spark.sql("SELECT * FROM T WHERE name = 'name_p0_100'").collect()
    assert(resP0.length == 1)
    assert(resP0.head.getString(2) == "p0")

    val resP1 = spark.sql("SELECT * FROM T WHERE name = 'name_p1_200'").collect()
    assert(resP1.length == 1)
    assert(resP1.head.getString(2) == "p1")
  }
}
+
private def assertMultiplePartitionsResult(
tableName: String,
rowCount: Long,