Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,12 @@
<td>MemorySize</td>
<td>Memory page size for caching.</td>
</tr>
<tr>
<td><h5>chain-table.chain-partition-keys</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Partition keys that participate in chain logic. Must be a contiguous suffix of the table's partition keys. Comma-separated. If not set, all partition keys participate in chain.</td>
</tr>
<tr>
<td><h5>chain-table.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
Expand Down Expand Up @@ -1187,12 +1193,6 @@
<td>String</td>
<td>When a batch job queries from a chain table, if a partition does not exist in the main branch, the reader will try to get this partition from chain snapshot branch.</td>
</tr>
<tr>
<td><h5>chain-table.chain-partition-keys</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Partition keys that participate in chain logic. Must be a contiguous suffix of the table's partition keys. Comma-separated. If not set, all partition keys participate in chain.</td>
</tr>
<tr>
<td><h5>scan.file-creation-time-millis</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,4 +363,4 @@
<td>Defines a custom parallelism for the unaware-bucket table compaction job. By default, if this option is not defined, the planner will derive the parallelism for each statement individually by also considering the global configuration.</td>
</tr>
</tbody>
</table>
</table>
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,14 @@ public class BTreeIndexOptions {
.intType()
.defaultValue(4096)
.withDescription("The max parallelism of Flink/Spark for building BTreeIndex.");

/**
 * Config key {@code index.with-file-meta} (default {@code false}).
 *
 * <p>When {@code true}, serialized file metadata ({@code ManifestEntry}) is stored inside the
 * BTree global index so that, per the description below, query planning can read data-file
 * metadata directly from the index instead of scanning manifest files on HDFS/OSS.
 */
public static final ConfigOption<Boolean> BTREE_WITH_FILE_META =
ConfigOptions.key("index.with-file-meta")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether to store file metadata (ManifestEntry) inside the BTree "
+ "global index. When enabled, query planning can "
+ "bypass manifest HDFS/OSS reads entirely by reading data file "
+ "metadata directly from the index.");
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@

package org.apache.paimon.globalindex;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestEntrySerializer;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.partition.PartitionPredicate;
Expand All @@ -41,22 +42,29 @@
import org.apache.paimon.utils.Range;
import org.apache.paimon.utils.RowRangeIndex;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.apache.paimon.table.SpecialFields.ROW_ID;
import static org.apache.paimon.utils.ManifestReadThreadPool.randomlyExecuteSequentialReturn;

/** Scan for data evolution table. */
public class DataEvolutionBatchScan implements DataTableScan {

private static final Logger LOG = LoggerFactory.getLogger(DataEvolutionBatchScan.class);

private final FileStoreTable table;
private final DataTableBatchScan batchScan;

Expand Down Expand Up @@ -236,13 +244,45 @@ public Plan plan() {
ScoreGetter scoreGetter = null;

if (rowRangeIndex == null) {
Optional<GlobalIndexResult> indexResult = evalGlobalIndex();
if (indexResult.isPresent()) {
GlobalIndexResult result = indexResult.get();
if (this.globalIndexResult != null) {
// Pre-set global index result pushed down from outer layer
GlobalIndexResult result = this.globalIndexResult;
rowRangeIndex = RowRangeIndex.create(result.results().toRangeList());
if (result instanceof ScoredGlobalIndexResult) {
scoreGetter = ((ScoredGlobalIndexResult) result).scoreGetter();
}
} else if (filter != null && table.coreOptions().globalIndexEnabled()) {
PartitionPredicate partitionFilter =
batchScan.snapshotReader().manifestsReader().partitionFilter();
Optional<GlobalIndexScanner> optScanner =
GlobalIndexScanner.create(table, partitionFilter, filter);
if (optScanner.isPresent()) {
try (GlobalIndexScanner scanner = optScanner.get()) {
// Fast path: btree_file_meta (zero manifest reads)
if (scanner.hasFilePathIndex()) {
Optional<FilePathGlobalIndexResult> fp =
scanner.scanWithFilePath(filter);
if (fp.isPresent()) {
Plan fastPlan = buildPlanFromFilePathIndex(fp.get());
if (fastPlan != null) {
return fastPlan;
}
// Stale index detected; fall through to regular btree or batchScan
}
}
// Regular btree fallback (same scanner, no extra IO)
Optional<GlobalIndexResult> idx = scanner.scan(filter);
if (idx.isPresent()) {
GlobalIndexResult result = idx.get();
rowRangeIndex = RowRangeIndex.create(result.results().toRangeList());
if (result instanceof ScoredGlobalIndexResult) {
scoreGetter = ((ScoredGlobalIndexResult) result).scoreGetter();
}
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}

Expand All @@ -254,30 +294,120 @@ public Plan plan() {
return wrapToIndexSplits(splits, rowRangeIndex, scoreGetter);
}

private Optional<GlobalIndexResult> evalGlobalIndex() {
if (this.globalIndexResult != null) {
return Optional.of(globalIndexResult);
/**
* Builds a query plan using the pre-fetched {@link ManifestEntry} rows stored in
* btree_file_meta index. This method does NOT call {@link #batchScan} at all, so no manifest
* HDFS reads occur.
*
* <p>Returns {@code null} if the index is stale (i.e., one or more referenced data files no
* longer exist, typically due to compaction after the index was built). The caller should fall
* back to a regular manifest scan in that case.
*/
@Nullable
private Plan buildPlanFromFilePathIndex(FilePathGlobalIndexResult result) throws IOException {
ManifestEntrySerializer serializer = new ManifestEntrySerializer();

List<ManifestEntry> entries = new ArrayList<>();
for (byte[] bytes : result.manifestEntryBytes()) {
entries.add(serializer.deserializeFromBytes(bytes));
}

// Build rowRangeIndex once; use it for both planning-time file pruning and
// reading-time row-level filtering (IndexedSplit).
RowRangeIndex rowRangeIndex = null;
if (result.hasRowLevelFilter()) {
rowRangeIndex = RowRangeIndex.create(result.rowIndexResult().results().toRangeList());
// Planning-time file pruning: discard files whose row range has no intersection
// with the matched row IDs, equivalent to what batchScan does during manifest scan.
final RowRangeIndex index = rowRangeIndex;
entries =
entries.stream()
.filter(
e -> {
long from = e.file().nonNullFirstRowId();
long to = from + e.file().rowCount() - 1;
return !index.intersectedRanges(from, to).isEmpty();
})
.collect(Collectors.toList());
}

// Guard against stale index caused by compaction after the index was built.
// For typical point/narrow queries this costs only a handful (typically 1-3) of fileIO.exists() calls.
for (ManifestEntry entry : entries) {
if (!table.fileIO()
.exists(
table.store()
.pathFactory()
.createDataFilePathFactory(entry.partition(), entry.bucket())
.toPath(entry.file()))) {
LOG.warn(
"btree_file_meta index is stale: file {} no longer exists "
+ "(possibly removed by compaction). Falling back to manifest scan.",
entry.file().fileName());
return null;
}
}
if (filter == null) {
return Optional.empty();

List<Split> splits = buildSplitsFromEntries(entries);
if (splits.isEmpty()) {
return Collections::emptyList;
}
CoreOptions options = table.coreOptions();
if (!options.globalIndexEnabled()) {
return Optional.empty();

if (rowRangeIndex != null) {
return wrapToIndexSplits(splits, rowRangeIndex, null);
}
PartitionPredicate partitionFilter =
batchScan.snapshotReader().manifestsReader().partitionFilter();
Optional<GlobalIndexScanner> optionalScanner =
GlobalIndexScanner.create(table, partitionFilter, filter);
if (!optionalScanner.isPresent()) {
return Optional.empty();
return () -> splits;
}

/**
* Reconstructs {@link DataSplit} objects from a list of {@link ManifestEntry} instances.
*
* <p>Groups entries by {@code partition + bucket}, then builds one {@link DataSplit} per group.
* Uses {@link BinaryRow} directly as map key so that grouping relies on its content-based
* {@code equals}/{@code hashCode}; hash codes may collide, but {@code equals} resolves
* collisions, so equal partitions always end up in the same group.
*/
private List<Split> buildSplitsFromEntries(List<ManifestEntry> entries) {
// partition -> bucket -> files
Map<BinaryRow, Map<Integer, List<DataFileMeta>>> grouped = new HashMap<>();
Map<BinaryRow, Map<Integer, ManifestEntry>> representative = new HashMap<>();

for (ManifestEntry entry : entries) {
grouped.computeIfAbsent(entry.partition(), k -> new HashMap<>())
.computeIfAbsent(entry.bucket(), k -> new ArrayList<>())
.add(entry.file());
representative
.computeIfAbsent(entry.partition(), k -> new HashMap<>())
.put(entry.bucket(), entry);
}

try (GlobalIndexScanner scanner = optionalScanner.get()) {
return scanner.scan(filter);
} catch (IOException e) {
throw new RuntimeException(e);
List<Split> splits = new ArrayList<>();
for (Map.Entry<BinaryRow, Map<Integer, List<DataFileMeta>>> partEntry :
grouped.entrySet()) {
for (Map.Entry<Integer, List<DataFileMeta>> bucketEntry :
partEntry.getValue().entrySet()) {
ManifestEntry rep =
representative.get(partEntry.getKey()).get(bucketEntry.getKey());
List<DataFileMeta> files = bucketEntry.getValue();
// Sort by firstRowId for consistent ordering
files.sort((a, b) -> Long.compare(a.nonNullFirstRowId(), b.nonNullFirstRowId()));

Long latestSnapshotId =
batchScan.snapshotReader().snapshotManager().latestSnapshotId();
DataSplit split =
DataSplit.builder()
.withSnapshot(latestSnapshotId != null ? latestSnapshotId : -1L)
.withPartition(rep.partition())
.withBucket(rep.bucket())
.withBucketPath("")
.withTotalBuckets(rep.totalBuckets())
.isStreaming(false)
.rawConvertible(false)
.withDataFiles(files)
.build();
splits.add(split);
}
}
return splits;
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.globalindex;

import javax.annotation.Nullable;

import java.util.List;

/**
 * Outcome of querying a BTree-with-file-meta global index.
 *
 * <p>Carries the raw serialized {@link org.apache.paimon.manifest.ManifestEntry} payloads read
 * from the btree_file_meta index, which allow a query plan to be assembled without touching
 * manifest files on remote storage. Optionally also carries a row-level {@link GlobalIndexResult}
 * (produced by the btree key index) for filtering individual rows within the matched files; when
 * absent, only file-level pruning is possible.
 *
 * <p>Instances are immutable.
 */
public class FilePathGlobalIndexResult {

    /** Row-level index result from the btree key index; {@code null} when unavailable. */
    @Nullable private final GlobalIndexResult rowIndexResult;

    /** Serialized {@code ManifestEntry} payloads from the btree_file_meta index. */
    private final List<byte[]> manifestEntryBytes;

    public FilePathGlobalIndexResult(
            List<byte[]> manifestEntryBytes, @Nullable GlobalIndexResult rowIndexResult) {
        this.manifestEntryBytes = manifestEntryBytes;
        this.rowIndexResult = rowIndexResult;
    }

    /**
     * Returns the raw serialized {@link org.apache.paimon.manifest.ManifestEntry} bytes taken
     * from the btree_file_meta index.
     */
    public List<byte[]> manifestEntryBytes() {
        return manifestEntryBytes;
    }

    /** Returns {@code true} when a row-level result from the btree key index is present. */
    public boolean hasRowLevelFilter() {
        return this.rowIndexResult != null;
    }

    /** Returns the row-level index result, or {@code null} when the key index produced none. */
    @Nullable
    public GlobalIndexResult rowIndexResult() {
        return rowIndexResult;
    }
}
Loading
Loading