diff --git a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java index b6154fa477..ad882e497d 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java @@ -1319,6 +1319,16 @@ public class ConfigOptions { + KV_SCANNER_MAX_BATCH_SIZE.key() + "'."); + public static final ConfigOption CLIENT_SCANNER_KV_SERVER_SIDE_ENABLED = + key("client.scanner.kv.server-side.enabled") + .booleanType() + .defaultValue(false) + .withDescription( + "Master switch for using the server-side KV scan in bounded reads " + + "of primary-key tables when no KV snapshot file is available. When " + + "false (default), bounded primary-key reads fall back to the prior " + + "behavior (log-only when lake is enabled, or fail when lake is disabled)."); + public static final ConfigOption CLIENT_LOOKUP_QUEUE_SIZE = key("client.lookup.queue-size") .intType() diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java index b0bba81a7e..631c6d8176 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java @@ -435,9 +435,18 @@ public boolean isBounded() { + modificationScanType + " statement with conditions on primary key."); } - if (!isDataLakeEnabled) { + if (!isDataLakeEnabled + && !(hasPrimaryKey() + && flussConfig.get( + ConfigOptions.CLIENT_SCANNER_KV_SERVER_SIDE_ENABLED))) { throw new UnsupportedOperationException( - "Currently, Fluss only support queries on table with datalake enabled or point queries on primary key when it's in batch execution mode."); + "Batch mode requires either datalake enabled" + + " (set '" + + ConfigOptions.TABLE_DATALAKE_ENABLED.key() + + "' = 'true') or server-side KV scan on a" + + " primary-key table (set '" + + ConfigOptions.CLIENT_SCANNER_KV_SERVER_SIDE_ENABLED.key() + + "' = 'true')."); } return source; } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/emitter/FlinkRecordEmitter.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/emitter/FlinkRecordEmitter.java index ba4c9d0131..b400880263 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/emitter/FlinkRecordEmitter.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/emitter/FlinkRecordEmitter.java @@ -87,6 +87,8 @@ public void emitRecord( .asLogSplitState() .setNextOffset(recordAndPosition.record().logOffset() + 1); } + } else if (splitState.isKvBatchSplitState()) { + processAndEmitRecord(recordAndPosition.record(), sourceOutput); } else if (splitState.isLakeSplit()) { if (lakeRecordRecordEmitter == null) { lakeRecordRecordEmitter = new LakeRecordRecordEmitter<>(this::processAndEmitRecord); diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java index 0b7e98b395..265cc0ab77 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java @@ -39,6 +39,7 @@ import org.apache.fluss.flink.source.event.PartitionsRemovedEvent; import org.apache.fluss.flink.source.reader.LeaseContext; import org.apache.fluss.flink.source.split.HybridSnapshotLogSplit; +import org.apache.fluss.flink.source.split.KvBatchSplit; import org.apache.fluss.flink.source.split.LogSplit; import org.apache.fluss.flink.source.split.SourceSplitBase; import org.apache.fluss.flink.source.state.SourceEnumeratorState; @@ -469,6 +470,7 @@ public void start() { } private void startInBatchMode() { + boolean kvBatchEnabled = flussConf.get(ConfigOptions.CLIENT_SCANNER_KV_SERVER_SIDE_ENABLED); if (lakeEnabled) { if (lakeSource == null) { throw new IllegalStateException( @@ -477,39 +479,65 @@ private void startInBatchMode() { context.callAsync( () -> { List splits = generateHybridLakeFlussSplits(); - // No lake snapshot exists, fall back to Fluss-only splits if (splits == null) { LOG.info( "No lake snapshot found for table {}," + " falling back to Fluss-only splits.", tablePath); - if (isPartitioned) { - Set partitionInfos = listPartitions(); - Collection partitions = - partitionInfos.stream() - .map( - p -> - new Partition( - p.getPartitionId(), - p.getPartitionName())) - .collect(Collectors.toList()); - // Use log-only splits to avoid generating mixed split - // types (HybridSnapshotLogSplit + LogSplit) for - // primary-key tables, which is not supported. - splits = this.initLogTablePartitionSplits(partitions); - } else { - splits = this.getLogSplit(null, null); - } + splits = generateFlussOnlyBatchSplits(kvBatchEnabled); } return splits; }, this::handleSplitsAdd); + } else if (kvBatchEnabled && hasPrimaryKey) { + context.callAsync(() -> generateFlussOnlyBatchSplits(true), this::handleSplitsAdd); } else { throw new UnsupportedOperationException( String.format( - "Batch only supports when table option '%s' is set to true.", - ConfigOptions.TABLE_DATALAKE_ENABLED)); + "Batch mode requires either '%s' = 'true' (data-lake integration) " + + "or '%s' = 'true' (server-side KV scan, primary-key tables only).", + ConfigOptions.TABLE_DATALAKE_ENABLED.key(), + ConfigOptions.CLIENT_SCANNER_KV_SERVER_SIDE_ENABLED.key())); + } + } + + private List generateFlussOnlyBatchSplits(boolean kvBatchEnabled) { + if (kvBatchEnabled && hasPrimaryKey) { + if (isPartitioned) { + Set partitionInfos = listPartitions(); + List splits = new ArrayList<>(); + for (PartitionInfo partitionInfo : partitionInfos) { + splits.addAll( + buildKvBatchSplits( + partitionInfo.getPartitionId(), + partitionInfo.getPartitionName())); + } + return splits; + } + return buildKvBatchSplits(null, null); + } + if (isPartitioned) { + Set partitionInfos = listPartitions(); + Collection partitions = + partitionInfos.stream() + .map(p -> new Partition(p.getPartitionId(), p.getPartitionName())) + .collect(Collectors.toList()); + return this.initLogTablePartitionSplits(partitions); } + return this.getLogSplit(null, null); + } + + private List buildKvBatchSplits( + @Nullable Long partitionId, @Nullable String partitionName) { + List splits = new ArrayList<>(); + for (int bucketId = 0; bucketId < tableInfo.getNumBuckets(); bucketId++) { + TableBucket tb = new TableBucket(tableInfo.getTableId(), partitionId, bucketId); + if (ignoreTableBucket(tb)) { + continue; + } + splits.add(new KvBatchSplit(tb, partitionName)); + } + return splits; } private void startInStreamModeForNonPartitionedTable() { diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceReader.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceReader.java index 68a3489bd8..65ef6e75ee 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceReader.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceReader.java @@ -28,6 +28,7 @@ import org.apache.fluss.flink.source.reader.fetcher.FlinkSourceFetcherManager; import org.apache.fluss.flink.source.split.HybridSnapshotLogSplit; import org.apache.fluss.flink.source.split.HybridSnapshotLogSplitState; +import org.apache.fluss.flink.source.split.KvBatchSplitState; import org.apache.fluss.flink.source.split.LogSplitState; import org.apache.fluss.flink.source.split.SourceSplitBase; import org.apache.fluss.flink.source.split.SourceSplitState; @@ -163,6 +164,8 @@ protected SourceSplitState initializedState(SourceSplitBase split) { return new HybridSnapshotLogSplitState(split.asHybridSnapshotLogSplit()); } else if (split.isLogSplit()) { return new LogSplitState(split.asLogSplit()); + } else if (split.isKvBatchSplit()) { + return new KvBatchSplitState(split.asKvBatchSplit()); } else if (split.isLakeSplit()) { return LakeSplitStateInitializer.initializedState(split); } else { diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java index 90f2b6e9a6..363b139bb7 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java @@ -216,6 +216,8 @@ public void handleSplitsChanges(SplitsChange splitsChanges) { subscribeLog(sourceSplitBase, hybridSnapshotLogSplit.getLogStartingOffset()); } else if (sourceSplitBase.isLogSplit()) { subscribeLog(sourceSplitBase, sourceSplitBase.asLogSplit().getStartingOffset()); + } else if (sourceSplitBase.isKvBatchSplit()) { + boundedSplits.add(sourceSplitBase); } else if (sourceSplitBase.isLakeSplit()) { getLakeSplitReader().addSplit(sourceSplitBase, boundedSplits); if (sourceSplitBase instanceof LakeSnapshotAndFlussLogSplit) { @@ -412,6 +414,12 @@ private void checkSnapshotSplitOrStartNext() { snapshotSplit.getTableBucket(), snapshotSplit.getSnapshotId()); currentBoundedSplitReader = new BoundedSplitReader(batchScanner, snapshotSplit.recordsToSkip()); + } else if (currentBoundedSplit.isKvBatchSplit()) { + BatchScanner batchScanner = + table.newScan() + .project(projectedFields) + .createBatchScanner(currentBoundedSplit.getTableBucket()); + currentBoundedSplitReader = new BoundedSplitReader(batchScanner, 0); } else if (currentBoundedSplit.isLakeSplit()) { currentBoundedSplitReader = getLakeSplitReader().getBoundedSplitScanner(currentBoundedSplit); diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/split/KvBatchSplit.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/split/KvBatchSplit.java new file mode 100644 index 0000000000..0e8f7e309c --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/split/KvBatchSplit.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.source.split; + +import org.apache.fluss.metadata.TableBucket; + +import javax.annotation.Nullable; + +/** + * A bounded split that reads the full primary-key state of a bucket via the server-side KV scan + * (FIP-17). Emitted by the enumerator when a primary-key table has no KV snapshot file available + * for the bucket and the source is running in bounded mode. + * + *

This split has no resumable position: on Flink task restart the bucket is rescanned from + * scratch. Snapshot isolation is provided by the server (a consistent point-in-time view of the + * RocksDB state at the moment {@code ScanKv} opens the scan), but the client cannot resume an + * expired or invalidated session, so progress is not checkpointed. + * + *

Unlike {@link HybridSnapshotLogSplit}, this split has no log handoff phase: when the bucket is + * drained the split is marked finished. + */ +public class KvBatchSplit extends SourceSplitBase { + + private static final String KV_BATCH_SPLIT_PREFIX = "kv-batch-"; + + public KvBatchSplit(TableBucket tableBucket, @Nullable String partitionName) { + super(tableBucket, partitionName); + } + + @Override + public String splitId() { + return toSplitId(KV_BATCH_SPLIT_PREFIX, tableBucket); + } + + @Override + protected byte splitKind() { + return KV_BATCH_SPLIT_FLAG; + } + + @Override + public String toString() { + return "KvBatchSplit{" + + "tableBucket=" + + tableBucket + + ", partitionName='" + + partitionName + + '\'' + + '}'; + } +} diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/split/KvBatchSplitState.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/split/KvBatchSplitState.java new file mode 100644 index 0000000000..e83fe9ad28 --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/split/KvBatchSplitState.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.source.split; + +/** State of {@link KvBatchSplit}. Has no resumable position. */ +public class KvBatchSplitState extends SourceSplitState { + + public KvBatchSplitState(KvBatchSplit split) { + super(split); + } + + @Override + public KvBatchSplit toSourceSplit() { + return (KvBatchSplit) split; + } +} diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/split/SourceSplitBase.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/split/SourceSplitBase.java index 6240b4feac..d74b1defef 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/split/SourceSplitBase.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/split/SourceSplitBase.java @@ -25,11 +25,12 @@ import java.util.Objects; -/** A base source split for {@link SnapshotSplit} and {@link LogSplit}. */ +/** A base source split for {@link SnapshotSplit}, {@link LogSplit} and {@link KvBatchSplit}. */ public abstract class SourceSplitBase implements SourceSplit { public static final byte HYBRID_SNAPSHOT_SPLIT_FLAG = 1; public static final byte LOG_SPLIT_FLAG = 2; + public static final byte KV_BATCH_SPLIT_FLAG = 3; protected final TableBucket tableBucket; @@ -81,6 +82,11 @@ public final boolean isHybridSnapshotLogSplit() { return getClass() == HybridSnapshotLogSplit.class; } + /** Checks whether this split is a {@link KvBatchSplit}. */ + public final boolean isKvBatchSplit() { + return getClass() == KvBatchSplit.class; + } + /** Casts this split into a {@link HybridSnapshotLogSplit}. */ public final HybridSnapshotLogSplit asHybridSnapshotLogSplit() { return (HybridSnapshotLogSplit) this; @@ -91,11 +97,18 @@ public final LogSplit asLogSplit() { return (LogSplit) this; } + /** Casts this split into a {@link KvBatchSplit}. */ + public final KvBatchSplit asKvBatchSplit() { + return (KvBatchSplit) this; + } + protected byte splitKind() { if (isHybridSnapshotLogSplit()) { return HYBRID_SNAPSHOT_SPLIT_FLAG; } else if (isLogSplit()) { return LOG_SPLIT_FLAG; + } else if (isKvBatchSplit()) { + return KV_BATCH_SPLIT_FLAG; } else { throw new IllegalArgumentException("Unsupported split kind for " + getClass()); } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/split/SourceSplitSerializer.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/split/SourceSplitSerializer.java index 7ee92cef9e..ff7b5364c6 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/split/SourceSplitSerializer.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/split/SourceSplitSerializer.java @@ -40,9 +40,6 @@ public class SourceSplitSerializer implements SimpleVersionedSerializer SERIALIZER_CACHE = ThreadLocal.withInitial(() -> new DataOutputSerializer(64)); - private static final byte HYBRID_SNAPSHOT_SPLIT_FLAG = 1; - private static final byte LOG_SPLIT_FLAG = 2; - private static final int CURRENT_VERSION = VERSION_0; @Nullable private final LakeSource lakeSource; @@ -75,12 +72,14 @@ public byte[] serialize(SourceSplitBase split) throws IOException { out.writeBoolean(hybridSnapshotLogSplit.isSnapshotFinished()); // write log starting offset out.writeLong(hybridSnapshotLogSplit.getLogStartingOffset()); - } else { + } else if (split.isLogSplit()) { LogSplit logSplit = split.asLogSplit(); // write starting offset out.writeLong(logSplit.getStartingOffset()); // write stopping offset out.writeLong(logSplit.getStoppingOffset().orElse(LogSplit.NO_STOPPING_OFFSET)); + } else { + // KvBatchSplit has no extra fields to serialize beyond the common header. } } else { LakeSplitSerializer lakeSplitSerializer = @@ -128,7 +127,7 @@ public SourceSplitBase deserialize(int version, byte[] serialized) throws IOExce int bucketId = in.readInt(); TableBucket tableBucket = new TableBucket(tableId, partitionId, bucketId); - if (splitKind == HYBRID_SNAPSHOT_SPLIT_FLAG) { + if (splitKind == SourceSplitBase.HYBRID_SNAPSHOT_SPLIT_FLAG) { long snapshotId = in.readLong(); long recordsToSkip = in.readLong(); boolean isSnapshotFinished = in.readBoolean(); @@ -140,10 +139,12 @@ public SourceSplitBase deserialize(int version, byte[] serialized) throws IOExce recordsToSkip, isSnapshotFinished, logStartingOffset); - } else if (splitKind == LOG_SPLIT_FLAG) { + } else if (splitKind == SourceSplitBase.LOG_SPLIT_FLAG) { long startingOffset = in.readLong(); long stoppingOffset = in.readLong(); return new LogSplit(tableBucket, partitionName, startingOffset, stoppingOffset); + } else if (splitKind == SourceSplitBase.KV_BATCH_SPLIT_FLAG) { + return new KvBatchSplit(tableBucket, partitionName); } else { LakeSplitSerializer lakeSplitSerializer = new LakeSplitSerializer(checkNotNull(lakeSource).getSplitSerializer()); diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/split/SourceSplitState.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/split/SourceSplitState.java index 32f93491e9..dbc85821a0 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/split/SourceSplitState.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/split/SourceSplitState.java @@ -46,6 +46,14 @@ public final LogSplitState asLogSplitState() { return (LogSplitState) this; } + public final boolean isKvBatchSplitState() { + return getClass() == KvBatchSplitState.class; + } + + public final KvBatchSplitState asKvBatchSplitState() { + return (KvBatchSplitState) this; + } + public abstract SourceSplitBase toSourceSplit(); public boolean isLakeSplit() { diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceBatchITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceBatchITCase.java index 51d05fa036..9cbe0fd538 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceBatchITCase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceBatchITCase.java @@ -20,6 +20,7 @@ import org.apache.fluss.client.table.Table; import org.apache.fluss.client.table.writer.AppendWriter; import org.apache.fluss.client.table.writer.UpsertWriter; +import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.exception.InvalidTableException; import org.apache.fluss.flink.utils.FlinkTestBase; import org.apache.fluss.metadata.TablePath; @@ -40,6 +41,7 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -49,6 +51,7 @@ import static org.apache.fluss.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS; import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder; +import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.collectBatchRows; import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.collectRowsWithTimeout; import static org.apache.fluss.server.testutils.FlussClusterExtension.BUILTIN_DATABASE; import static org.apache.fluss.testutils.DataTestUtils.row; @@ -250,8 +253,151 @@ void testScanSingleRowFilterException() throws Exception { assertThatThrownBy(() -> tEnv.explainSql(query)) .isInstanceOf(UnsupportedOperationException.class) .hasMessage( - "Currently, Fluss only support queries on table with datalake enabled" - + " or point queries on primary key when it's in batch execution mode."); + "Batch mode requires either datalake enabled" + + " (set '" + + ConfigOptions.TABLE_DATALAKE_ENABLED.key() + + "' = 'true') or server-side KV scan on a" + + " primary-key table (set '" + + ConfigOptions.CLIENT_SCANNER_KV_SERVER_SIDE_ENABLED.key() + + "' = 'true')."); + } + + @Test + void testKvBatchScanOnPkTable() throws Exception { + String tableName = String.format("test_kv_batch_pk_%s", RandomUtils.nextInt()); + tEnv.executeSql( + String.format( + "create table %s (" + + " id int not null," + + " address varchar," + + " name varchar," + + " primary key (id) NOT ENFORCED)" + + " with (" + + " 'bucket.num' = '4'," + + " 'client.scanner.kv.server-side.enabled' = 'true')", + tableName)); + TablePath tablePath = TablePath.of(DEFAULT_DB, tableName); + try (Table table = conn.getTable(tablePath)) { + UpsertWriter upsertWriter = table.newUpsert().createWriter(); + for (int i = 1; i <= 5; i++) { + upsertWriter.upsert(row(i, "address" + i, "name" + i)); + } + upsertWriter.flush(); + } + + CloseableIterator collected = + tEnv.executeSql(String.format("SELECT * FROM %s", tableName)).collect(); + List expected = + Arrays.asList( + "+I[1, address1, name1]", + "+I[2, address2, name2]", + "+I[3, address3, name3]", + "+I[4, address4, name4]", + "+I[5, address5, name5]"); + assertResultsIgnoreOrder(collected, expected, true); + } + + @Test + void testKvBatchScanReturnsAllRecords() throws Exception { + String tableName = String.format("test_kv_batch_100_%s", RandomUtils.nextInt()); + tEnv.executeSql( + String.format( + "create table %s (" + + " id int not null," + + " name varchar," + + " primary key (id) NOT ENFORCED)" + + " with (" + + " 'bucket.num' = '3'," + + " 'client.scanner.kv.server-side.enabled' = 'true')", + tableName)); + TablePath tablePath = TablePath.of(DEFAULT_DB, tableName); + try (Table table = conn.getTable(tablePath)) { + UpsertWriter upsertWriter = table.newUpsert().createWriter(); + for (int i = 1; i <= 100; i++) { + upsertWriter.upsert(row(i, "name" + i)); + } + upsertWriter.flush(); + } + + CloseableIterator collected = + tEnv.executeSql(String.format("SELECT * FROM %s", tableName)).collect(); + List actual = collectRowsWithTimeout(collected, 100); + + List expected = new ArrayList<>(); + for (int i = 1; i <= 100; i++) { + expected.add(String.format("+I[%d, name%d]", i, i)); + } + assertThat(actual).containsExactlyInAnyOrderElementsOf(expected); + } + + @Test + void testKvBatchScanOnLogTableThrows() throws Exception { + String tableName = String.format("test_kv_batch_log_%s", RandomUtils.nextInt()); + tEnv.executeSql( + String.format( + "create table %s (" + + " id int not null," + + " name varchar)" + + " with (" + + " 'bucket.num' = '2'," + + " 'client.scanner.kv.server-side.enabled' = 'true')", + tableName)); + // The KV scan option is meaningless for log tables; batch mode must still reject them. + assertThatThrownBy(() -> tEnv.executeSql(String.format("SELECT * FROM %s", tableName))) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessageContaining("Batch mode requires either datalake enabled") + .hasMessageContaining(ConfigOptions.TABLE_DATALAKE_ENABLED.key()) + .hasMessageContaining(ConfigOptions.CLIENT_SCANNER_KV_SERVER_SIDE_ENABLED.key()); + } + + @Test + void testKvBatchScanWithProjection() throws Exception { + String tableName = String.format("test_kv_batch_proj_%s", RandomUtils.nextInt()); + tEnv.executeSql( + String.format( + "create table %s (" + + " id int not null," + + " name varchar," + + " region varchar," + + " primary key (id) NOT ENFORCED)" + + " with (" + + " 'bucket.num' = '3'," + + " 'client.scanner.kv.server-side.enabled' = 'true')", + tableName)); + TablePath tablePath = TablePath.of(DEFAULT_DB, tableName); + try (Table table = conn.getTable(tablePath)) { + UpsertWriter upsertWriter = table.newUpsert().createWriter(); + upsertWriter.upsert(row(1, "Alice", "us-east")); + upsertWriter.upsert(row(2, "Bob", "eu-west")); + upsertWriter.upsert(row(3, "Carol", "ap-south")); + upsertWriter.flush(); + } + + // Only project two of the three columns. + CloseableIterator collected = + tEnv.executeSql(String.format("SELECT id, name FROM %s", tableName)).collect(); + List expected = Arrays.asList("+I[1, Alice]", "+I[2, Bob]", "+I[3, Carol]"); + assertResultsIgnoreOrder(collected, expected, true); + } + + @Test + void testKvBatchScanOnEmptyTable() throws Exception { + String tableName = String.format("test_kv_batch_empty_%s", RandomUtils.nextInt()); + tEnv.executeSql( + String.format( + "create table %s (" + + " id int not null," + + " name varchar," + + " primary key (id) NOT ENFORCED)" + + " with (" + + " 'bucket.num' = '3'," + + " 'client.scanner.kv.server-side.enabled' = 'true')", + tableName)); + // No rows written — scan must complete naturally with an empty result set. + CloseableIterator collected = + tEnv.executeSql(String.format("SELECT * FROM %s", tableName)).collect(); + List actual = collectBatchRows(collected); + assertThat(actual).isEmpty(); } @Test @@ -420,7 +566,13 @@ void testCountPushDownForPkTable(boolean partitionTable) throws Exception { tEnv.explainSql( String.format("SELECT COUNT(address) FROM %s", tableName))) .hasMessageContaining( - "Currently, Fluss only support queries on table with datalake enabled or point queries on primary key when it's in batch execution mode."); + "Batch mode requires either datalake enabled" + + " (set '" + + ConfigOptions.TABLE_DATALAKE_ENABLED.key() + + "' = 'true') or server-side KV scan on a" + + " primary-key table (set '" + + ConfigOptions.CLIENT_SCANNER_KV_SERVER_SIDE_ENABLED.key() + + "' = 'true')."); assertThatThrownBy( () -> @@ -429,7 +581,13 @@ void testCountPushDownForPkTable(boolean partitionTable) throws Exception { "SELECT COUNT(DISTINCT address) FROM %s", tableName))) .hasMessageContaining( - "Currently, Fluss only support queries on table with datalake enabled or point queries on primary key when it's in batch execution mode."); + "Batch mode requires either datalake enabled" + + " (set '" + + ConfigOptions.TABLE_DATALAKE_ENABLED.key() + + "' = 'true') or server-side KV scan on a" + + " primary-key table (set '" + + ConfigOptions.CLIENT_SCANNER_KV_SERVER_SIDE_ENABLED.key() + + "' = 'true')."); // test not push down grouping count. assertThatThrownBy( @@ -440,7 +598,13 @@ void testCountPushDownForPkTable(boolean partitionTable) throws Exception { tableName)) .wait()) .hasMessageContaining( - "Currently, Fluss only support queries on table with datalake enabled or point queries on primary key when it's in batch execution mode."); + "Batch mode requires either datalake enabled" + + " (set '" + + ConfigOptions.TABLE_DATALAKE_ENABLED.key() + + "' = 'true') or server-side KV scan on a" + + " primary-key table (set '" + + ConfigOptions.CLIENT_SCANNER_KV_SERVER_SIDE_ENABLED.key() + + "' = 'true')."); } @Test @@ -495,7 +659,13 @@ void testCountPushDownForLogTable(boolean partitionTable) throws Exception { tEnv.explainSql( String.format("SELECT COUNT(address) FROM %s", tableName))) .hasMessageContaining( - "Currently, Fluss only support queries on table with datalake enabled or point queries on primary key when it's in batch execution mode."); + "Batch mode requires either datalake enabled" + + " (set '" + + ConfigOptions.TABLE_DATALAKE_ENABLED.key() + + "' = 'true') or server-side KV scan on a" + + " primary-key table (set '" + + ConfigOptions.CLIENT_SCANNER_KV_SERVER_SIDE_ENABLED.key() + + "' = 'true')."); assertThatThrownBy( () -> tEnv.explainSql( @@ -503,7 +673,13 @@ void testCountPushDownForLogTable(boolean partitionTable) throws Exception { "SELECT COUNT(DISTINCT address) FROM %s", tableName))) .hasMessageContaining( - "Currently, Fluss only support queries on table with datalake enabled or point queries on primary key when it's in batch execution mode."); + "Batch mode requires either datalake enabled" + + " (set '" + + ConfigOptions.TABLE_DATALAKE_ENABLED.key() + + "' = 'true') or server-side KV scan on a" + + " primary-key table (set '" + + ConfigOptions.CLIENT_SCANNER_KV_SERVER_SIDE_ENABLED.key() + + "' = 'true')."); // test not push down grouping count. assertThatThrownBy( @@ -514,7 +690,13 @@ void testCountPushDownForLogTable(boolean partitionTable) throws Exception { tableName)) .wait()) .hasMessageContaining( - "Currently, Fluss only support queries on table with datalake enabled or point queries on primary key when it's in batch execution mode."); + "Batch mode requires either datalake enabled" + + " (set '" + + ConfigOptions.TABLE_DATALAKE_ENABLED.key() + + "' = 'true') or server-side KV scan on a" + + " primary-key table (set '" + + ConfigOptions.CLIENT_SCANNER_KV_SERVER_SIDE_ENABLED.key() + + "' = 'true')."); } private String prepareSourceTable(String[] keys, String partitionedKey) throws Exception { diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java index 6d239f965b..76dcc620a6 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java @@ -21,6 +21,7 @@ import org.apache.fluss.client.table.Table; import org.apache.fluss.client.table.writer.UpsertWriter; import org.apache.fluss.client.write.HashBucketAssigner; +import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; import org.apache.fluss.flink.FlinkConnectorOptions; import org.apache.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit; @@ -29,6 +30,7 @@ import org.apache.fluss.flink.source.event.PartitionsRemovedEvent; import org.apache.fluss.flink.source.reader.LeaseContext; import org.apache.fluss.flink.source.split.HybridSnapshotLogSplit; +import org.apache.fluss.flink.source.split.KvBatchSplit; import org.apache.fluss.flink.source.split.LogSplit; import org.apache.fluss.flink.source.split.SnapshotSplit; import org.apache.fluss.flink.source.split.SourceSplitBase; @@ -143,6 +145,185 @@ void testPkTableNoSnapshotSplits() throws Throwable { } } + @Test + void testBoundedPkTableEmitsKvBatchSplits() throws Throwable { + long tableId = createTable(DEFAULT_TABLE_PATH, DEFAULT_PK_TABLE_DESCRIPTOR); + int numSubtasks = DEFAULT_BUCKET_NUM; + Configuration enabled = new Configuration(flussConf); + enabled.set(ConfigOptions.CLIENT_SCANNER_KV_SERVER_SIDE_ENABLED, true); + try (MockSplitEnumeratorContext context = + new MockSplitEnumeratorContext<>(numSubtasks)) { + FlinkSourceEnumerator enumerator = + new FlinkSourceEnumerator( + DEFAULT_TABLE_PATH, + enabled, + true, + false, + context, + OffsetsInitializer.full(), + DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS, + false, // bounded + null, + null, + LeaseContext.DEFAULT, + false); + + enumerator.start(); + for (int i = 0; i < numSubtasks; i++) { + registerReader(context, enumerator, i); + } + assertThat(context.getSplitsAssignmentSequence()).isEmpty(); + + // Drive the bounded-mode async split generation. + context.runNextOneTimeCallable(); + + Map> expectedAssignment = new HashMap<>(); + for (int bucket = 0; bucket < DEFAULT_BUCKET_NUM; bucket++) { + KvBatchSplit split = new KvBatchSplit(new TableBucket(tableId, bucket), null); + int owner = enumerator.getSplitOwner(split); + expectedAssignment.computeIfAbsent(owner, k -> new ArrayList<>()).add(split); + } + Map> actualAssignment = getReadersAssignments(context); + assertThat(actualAssignment).isEqualTo(expectedAssignment); + actualAssignment + .values() + .forEach( + splits -> splits.forEach(s -> assertThat(s.isKvBatchSplit()).isTrue())); + } + } + + /** + * When the master switch is off (default), bounded reads of a primary-key table without a lake + * snapshot keep the pre-FIP-17 behavior of failing at startup, so existing users are not + * silently flipped onto the new code path. + */ + @Test + void testBoundedPkDisabledByDefaultThrows() throws Throwable { + createTable(DEFAULT_TABLE_PATH, DEFAULT_PK_TABLE_DESCRIPTOR); + try (MockSplitEnumeratorContext context = + new MockSplitEnumeratorContext<>(1)) { + FlinkSourceEnumerator enumerator = + new FlinkSourceEnumerator( + DEFAULT_TABLE_PATH, + flussConf, + true, + false, + context, + OffsetsInitializer.full(), + DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS, + false, + null, + null, + LeaseContext.DEFAULT, + false); + assertThatThrownBy(enumerator::start) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessageContaining("Batch mode requires either") + .hasMessageContaining(ConfigOptions.TABLE_DATALAKE_ENABLED.key()) + .hasMessageContaining( + ConfigOptions.CLIENT_SCANNER_KV_SERVER_SIDE_ENABLED.key()); + } + } + + /** + * Enabling the server-side KV scan switch on a log table (no primary key) must not bypass the + * "batch not supported" guard — the enumerator should still throw {@link + * UnsupportedOperationException} because KV scan is only meaningful for primary-key tables. + */ + @Test + void testKvBatchEnabledOnLogTableStillThrows() throws Throwable { + createTable(DEFAULT_TABLE_PATH, DEFAULT_LOG_TABLE_DESCRIPTOR); + Configuration enabled = new Configuration(flussConf); + enabled.set(ConfigOptions.CLIENT_SCANNER_KV_SERVER_SIDE_ENABLED, true); + try (MockSplitEnumeratorContext context = + new MockSplitEnumeratorContext<>(1)) { + FlinkSourceEnumerator enumerator = + new FlinkSourceEnumerator( + DEFAULT_TABLE_PATH, + enabled, + false, // hasPrimaryKey = false (log table) + false, + context, + OffsetsInitializer.full(), + DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS, + false, + null, + null, + LeaseContext.DEFAULT, + false); + assertThatThrownBy(enumerator::start) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessageContaining("Batch mode requires either") + .hasMessageContaining(ConfigOptions.TABLE_DATALAKE_ENABLED.key()); + } + } + + /** + * With {@code kvBatchEnabled=true} and a partitioned primary-key table the enumerator must emit + * one {@link KvBatchSplit} per (partition, bucket) pair — one for each auto-created partition + * and each bucket within it. + */ + @Test + void testBoundedPartitionedPkTableEmitsKvBatchSplits() throws Throwable { + long tableId = + createTable(DEFAULT_TABLE_PATH, DEFAULT_AUTO_PARTITIONED_PK_TABLE_DESCRIPTOR); + ZooKeeperClient zooKeeperClient = FLUSS_CLUSTER_EXTENSION.getZooKeeperClient(); + Map partitionNameByIds = + waitUntilPartitions(zooKeeperClient, DEFAULT_TABLE_PATH); + + int numSubtasks = DEFAULT_BUCKET_NUM; + Configuration enabled = new Configuration(flussConf); + enabled.set(ConfigOptions.CLIENT_SCANNER_KV_SERVER_SIDE_ENABLED, true); + try (MockSplitEnumeratorContext context = + new MockSplitEnumeratorContext<>(numSubtasks)) { + FlinkSourceEnumerator enumerator = + new FlinkSourceEnumerator( + DEFAULT_TABLE_PATH, + enabled, + true, + true, + context, + OffsetsInitializer.full(), + DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS, + false, + null, + null, + LeaseContext.DEFAULT, + false); + + enumerator.start(); + for (int i = 0; i < numSubtasks; i++) { + registerReader(context, enumerator, i); + } + context.runNextOneTimeCallable(); + + // Every split must be a KvBatchSplit. + List allAssigned = + context.getSplitsAssignmentSequence().stream() + .flatMap(a -> a.assignment().values().stream()) + .flatMap(List::stream) + .collect(Collectors.toList()); + assertThat(allAssigned).isNotEmpty(); + allAssigned.forEach(s -> assertThat(s.isKvBatchSplit()).isTrue()); + + // Expect one split per bucket per partition. + int expectedSplitCount = partitionNameByIds.size() * DEFAULT_BUCKET_NUM; + assertThat(allAssigned).hasSize(expectedSplitCount); + + // Each split must carry the correct partition name. + Set assignedPartitionNames = + allAssigned.stream() + .map(SourceSplitBase::getPartitionName) + .collect(Collectors.toSet()); + assertThat(assignedPartitionNames) + .containsExactlyInAnyOrderElementsOf(partitionNameByIds.values()); + + // All table IDs in the splits must match the created table. + allAssigned.forEach( + s -> assertThat(s.getTableBucket().getTableId()).isEqualTo(tableId)); + } + } + @Test void testSplitAssignmentBatchSize() throws Throwable { long tableId = createTable(DEFAULT_TABLE_PATH, DEFAULT_PK_TABLE_DESCRIPTOR); diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReaderTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReaderTest.java index c170518995..51c1f5a8f1 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReaderTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReaderTest.java @@ -25,6 +25,7 @@ import org.apache.fluss.client.write.HashBucketAssigner; import org.apache.fluss.flink.source.metrics.FlinkSourceReaderMetrics; import org.apache.fluss.flink.source.split.HybridSnapshotLogSplit; +import org.apache.fluss.flink.source.split.KvBatchSplit; import org.apache.fluss.flink.source.split.LogSplit; import org.apache.fluss.flink.source.split.SourceSplitBase; import org.apache.fluss.flink.utils.FlinkTestBase; @@ -307,6 +308,35 @@ void testHandleMixSnapshotLogSplitChangesAndFetch() throws Exception { } } + @Test + void testHandleKvBatchSplitChangesAndFetch() throws Exception { + TablePath tablePath = TablePath.of(DEFAULT_DB, "test-kv-batch-split-table"); + long tableId = createTable(tablePath, DEFAULT_PK_TABLE_DESCRIPTOR); + try (FlinkSourceSplitReader splitReader = + createSplitReader(tablePath, DEFAULT_PK_TABLE_SCHEMA.getRowType())) { + Map> rows = putRows(tableId, tablePath, 10); + + List splits = new ArrayList<>(); + for (TableBucket bucket : rows.keySet()) { + splits.add(new KvBatchSplit(bucket, null)); + } + + Map> expectedRecords = new HashMap<>(); + for (Map.Entry> e : rows.entrySet()) { + String splitId = new KvBatchSplit(e.getKey(), null).splitId(); + List records = new ArrayList<>(e.getValue().size()); + int pos = 1; + for (InternalRow row : e.getValue()) { + records.add(new RecordAndPos(new ScanRecord(row), pos++)); + } + expectedRecords.put(splitId, records); + } + + assignSplitsAndFetchUntilRetrieveRecords( + splitReader, splits, expectedRecords, DEFAULT_PK_TABLE_SCHEMA.getRowType()); + } + } + @Test void testNoSubscribedBucket() throws Exception { TablePath tablePath = TablePath.of(DEFAULT_DB, "test-no-subscribe-bucket-table"); diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/split/SourceSplitSerializerTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/split/SourceSplitSerializerTest.java index 54a3207807..78946860c5 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/split/SourceSplitSerializerTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/split/SourceSplitSerializerTest.java @@ -26,8 +26,9 @@ /** * Unit tests for {@link org.apache.fluss.flink.source.split.SourceSplitSerializer} of serializing - * {@link org.apache.fluss.flink.source.split.SnapshotSplit} and {@link - * org.apache.fluss.flink.source.split.LogSplit}. + * {@link org.apache.fluss.flink.source.split.SnapshotSplit}, {@link + * org.apache.fluss.flink.source.split.LogSplit} and {@link + * org.apache.fluss.flink.source.split.KvBatchSplit}. */ class SourceSplitSerializerTest { @@ -71,4 +72,19 @@ void testLogSplitSerde(boolean isPartitioned) throws Exception { serializer.deserialize(serializer.getVersion(), serialized); assertThat(deserializedSplit).isEqualTo(logSplit); } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testKvBatchSplitSerde(boolean isPartitioned) throws Exception { + TableBucket bucket = isPartitioned ? partitionedTableBucket : tableBucket; + String partitionName = isPartitioned ? "2024" : null; + KvBatchSplit split = new KvBatchSplit(bucket, partitionName); + + byte[] serialized = serializer.serialize(split); + SourceSplitBase deserializedSplit = + serializer.deserialize(serializer.getVersion(), serialized); + assertThat(deserializedSplit).isEqualTo(split); + assertThat(deserializedSplit.isKvBatchSplit()).isTrue(); + assertThat(deserializedSplit.asKvBatchSplit().splitId()).isEqualTo(split.splitId()); + } } diff --git a/website/docs/engine-flink/options.md b/website/docs/engine-flink/options.md index af9f97a696..486f8dea7e 100644 --- a/website/docs/engine-flink/options.md +++ b/website/docs/engine-flink/options.md @@ -102,6 +102,7 @@ See more details about [ALTER TABLE ... SET](engine-flink/ddl.md#set-properties) | scan.partition.discovery.interval | Duration | 1min | The time interval for the Fluss source to discover the new partitions for partitioned table while scanning. A non-positive value disables the partition discovery. The default value is 1 minute. Currently, since Fluss Admin#listPartitions(TablePath tablePath) requires a large number of requests to ZooKeeper in server, this option cannot be set too small, as a small value would cause frequent requests and increase server load. In the future, once list partitions is optimized, the default value of this parameter can be reduced. | | scan.kv.snapshot.lease.id | String | UUID | The lease ID used to protect acquired KV snapshots from deletion. If specified, the snapshots will be retained until either the consumer finishes processing all of them or the lease duration expires. By default, this value is set to a randomly generated UUID string if not explicitly provided. | | scan.kv.snapshot.lease.duration | Duration | 1day | The time period how long to wait before expiring the kv snapshot lease to avoid kv snapshot blocking to delete. | +| client.scanner.kv.server-side.enabled | Boolean | false | Master switch for using the server-side KV scan (FIP-17) in bounded reads of primary-key tables when no KV snapshot file is available. When false (default), bounded primary-key reads fall back to the prior behavior (log-only when lake is enabled, or fail when lake is disabled). See [Full Scan of Primary Key Tables](engine-flink/reads.md#full-scan-of-primary-key-tables) for details. | | client.scanner.log.check-crc | Boolean | true | Automatically check the CRC3 of the read records for LogScanner. This ensures no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead, so it may be disabled in cases seeking extreme performance. | | client.scanner.log.max-poll-records | Integer | 500 | The maximum number of records returned in a single call to poll() for LogScanner. Note that this config doesn't impact the underlying fetching behavior. The Scanner will cache the records from each fetch request and returns them incrementally from each poll. | | client.scanner.log.fetch.max-bytes | MemorySize | 16mb | The maximum amount of data the server should return for a fetch request from client. Records are fetched in batches, and if the first record batch in the first non-empty bucket of the fetch is larger than this value, the record batch will still be returned to ensure that the fetch can make progress. As such, this is not a absolute maximum. | diff --git a/website/docs/engine-flink/reads.md b/website/docs/engine-flink/reads.md index 57d7323666..1fff47355c 100644 --- a/website/docs/engine-flink/reads.md +++ b/website/docs/engine-flink/reads.md @@ -234,6 +234,51 @@ The server evaluates these predicates against per-batch column statistics and sk ## Batch Read +### Full Scan of Primary Key Tables + +Fluss can perform a bounded full-table scan on a primary-key table directly via the server-side KV scan API. + +Enable the feature by setting `client.scanner.kv.server-side.enabled = true` on the table or as a SQL hint: + +- This is a **bounded** read. The source finishes once all buckets have been drained and does not continue reading the change-log. +- On task restart, each bucket is rescanned from scratch. Progress within a scan session is not checkpointed, because an expired or invalidated server-side session cannot be resumed from a mid-point. +- The feature is disabled by default (`false`). Without it, unbounded (streaming) reads on primary-key tables work as usual; bounded reads require the data-lake integration to be enabled. + +#### Example + +**1. Create a primary-key table with the feature enabled:** +```sql title="Flink SQL" +CREATE TABLE pk_table ( + id INT NOT NULL, + name STRING, + region STRING, + PRIMARY KEY (id) NOT ENFORCED +) WITH ( + 'bucket.num' = '4', + 'client.scanner.kv.server-side.enabled' = 'true' +); +``` + +**2. Write some data:** +```sql title="Flink SQL" +INSERT INTO pk_table VALUES + (1, 'Alice', 'us-east'), + (2, 'Bob', 'eu-west'), + (3, 'Carol', 'ap-south'); +``` + +**3. Run a full scan in batch mode:** +```sql title="Flink SQL" +SET 'execution.runtime-mode' = 'batch'; +SELECT * FROM pk_table; +``` + +You can also enable the feature dynamically without storing it in the table metadata: +```sql title="Flink SQL" +SET 'execution.runtime-mode' = 'batch'; +SELECT * FROM pk_table /*+ OPTIONS('client.scanner.kv.server-side.enabled' = 'true') */; +``` + ### Limit Read The Fluss source supports limiting reads for both primary-key tables and log tables, making it convenient to preview the latest `N` records in a table.