From 3e4cc496df3378ddf59fccb587b91ee3c6b88144 Mon Sep 17 00:00:00 2001 From: ipolyzos Date: Tue, 26 May 2026 11:58:26 +0300 Subject: [PATCH 01/13] [flink] Add KvBatchSplit for server-side KV scan integration --- .../source/event/UnfinishedSplitEvent.java | 75 +++++++++++++++++++ .../flink/source/split/KvBatchSplit.java | 65 ++++++++++++++++ .../flink/source/split/SourceSplitBase.java | 15 +++- .../source/split/SourceSplitSerializer.java | 8 +- .../split/SourceSplitSerializerTest.java | 20 ++++- 5 files changed, 179 insertions(+), 4 deletions(-) create mode 100644 fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/event/UnfinishedSplitEvent.java create mode 100644 fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/split/KvBatchSplit.java diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/event/UnfinishedSplitEvent.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/event/UnfinishedSplitEvent.java new file mode 100644 index 0000000000..72dad5fe34 --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/event/UnfinishedSplitEvent.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.source.event; + +import org.apache.flink.api.connector.source.SourceEvent; + +import java.util.Objects; + +/** + * Signal from a source reader back to the enumerator that a non-resumable split (today only {@link + * org.apache.fluss.flink.source.split.KvBatchSplit}) could not be completed and must be + * re-assigned. Sent on transient failures whose recovery requires opening a fresh scanner session, + * most notably {@code NOT_LEADER_OR_FOLLOWER} / {@code LeaderNotAvailableException}. + * + *

The enumerator is expected to refresh metadata and re-emit the split (possibly to a different + * reader), bounded by a per-split attempt budget so a persistently failing bucket eventually fails + * the job rather than hot-looping. + */ +public class UnfinishedSplitEvent implements SourceEvent { + + private static final long serialVersionUID = 1L; + + private final String splitId; + private final String reason; + + public UnfinishedSplitEvent(String splitId, String reason) { + this.splitId = splitId; + this.reason = reason; + } + + public String getSplitId() { + return splitId; + } + + public String getReason() { + return reason; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + UnfinishedSplitEvent that = (UnfinishedSplitEvent) o; + return Objects.equals(splitId, that.splitId) && Objects.equals(reason, that.reason); + } + + @Override + public int hashCode() { + return Objects.hash(splitId, reason); + } + + @Override + public String toString() { + return "UnfinishedSplitEvent{splitId='" + splitId + "', reason='" + reason + "'}"; + } +} diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/split/KvBatchSplit.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/split/KvBatchSplit.java new file mode 100644 index 0000000000..cdb4ee82a7 --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/split/KvBatchSplit.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.source.split; + +import org.apache.fluss.metadata.TableBucket; + +import javax.annotation.Nullable; + +/** + * A bounded split that reads the full primary-key state of a bucket via the server-side KV scan + * (FIP-17). Emitted by the enumerator when a primary-key table has no KV snapshot file available + * for the bucket and the source is running in bounded mode. + * + *

<p>This split has no resumable position: on Flink task restart the bucket is rescanned from + * scratch. Snapshot isolation is provided by the server (a consistent point-in-time view of the + * RocksDB state at the moment {@code ScanKv} opens the scan), but the client cannot resume an + * expired or invalidated session, so progress is not checkpointed. + * + *

Unlike {@link HybridSnapshotLogSplit}, this split has no log handoff phase: when the bucket + * is drained the split is marked finished. + */ +public class KvBatchSplit extends SourceSplitBase { + + private static final String KV_BATCH_SPLIT_PREFIX = "kv-batch-"; + + public KvBatchSplit(TableBucket tableBucket, @Nullable String partitionName) { + super(tableBucket, partitionName); + } + + @Override + public String splitId() { + return toSplitId(KV_BATCH_SPLIT_PREFIX, tableBucket); + } + + @Override + protected byte splitKind() { + return KV_BATCH_SPLIT_FLAG; + } + + @Override + public String toString() { + return "KvBatchSplit{" + + "tableBucket=" + + tableBucket + + ", partitionName='" + + partitionName + + '\'' + + '}'; + } +} diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/split/SourceSplitBase.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/split/SourceSplitBase.java index 6240b4feac..d74b1defef 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/split/SourceSplitBase.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/split/SourceSplitBase.java @@ -25,11 +25,12 @@ import java.util.Objects; -/** A base source split for {@link SnapshotSplit} and {@link LogSplit}. */ +/** A base source split for {@link SnapshotSplit}, {@link LogSplit} and {@link KvBatchSplit}. */ public abstract class SourceSplitBase implements SourceSplit { public static final byte HYBRID_SNAPSHOT_SPLIT_FLAG = 1; public static final byte LOG_SPLIT_FLAG = 2; + public static final byte KV_BATCH_SPLIT_FLAG = 3; protected final TableBucket tableBucket; @@ -81,6 +82,11 @@ public final boolean isHybridSnapshotLogSplit() { return getClass() == HybridSnapshotLogSplit.class; } + /** Checks whether this split is a {@link KvBatchSplit}. 
*/ + public final boolean isKvBatchSplit() { + return getClass() == KvBatchSplit.class; + } + /** Casts this split into a {@link HybridSnapshotLogSplit}. */ public final HybridSnapshotLogSplit asHybridSnapshotLogSplit() { return (HybridSnapshotLogSplit) this; @@ -91,11 +97,18 @@ public final LogSplit asLogSplit() { return (LogSplit) this; } + /** Casts this split into a {@link KvBatchSplit}. */ + public final KvBatchSplit asKvBatchSplit() { + return (KvBatchSplit) this; + } + protected byte splitKind() { if (isHybridSnapshotLogSplit()) { return HYBRID_SNAPSHOT_SPLIT_FLAG; } else if (isLogSplit()) { return LOG_SPLIT_FLAG; + } else if (isKvBatchSplit()) { + return KV_BATCH_SPLIT_FLAG; } else { throw new IllegalArgumentException("Unsupported split kind for " + getClass()); } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/split/SourceSplitSerializer.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/split/SourceSplitSerializer.java index 7ee92cef9e..7f980ca655 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/split/SourceSplitSerializer.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/split/SourceSplitSerializer.java @@ -42,6 +42,7 @@ public class SourceSplitSerializer implements SimpleVersionedSerializer Date: Tue, 26 May 2026 12:18:00 +0300 Subject: [PATCH 02/13] [flink] Emit KvBatchSplit for bounded PK reads without snapshot --- .../enumerator/FlinkSourceEnumerator.java | 116 +++++++++++++++--- .../source/event/UnfinishedSplitEvent.java | 47 +++++-- .../flink/source/split/KvBatchSplit.java | 4 +- .../enumerator/FlinkSourceEnumeratorTest.java | 97 +++++++++++++++ 4 files changed, 238 insertions(+), 26 deletions(-) diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java index 0b7e98b395..845f060bf4 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java @@ -37,8 +37,10 @@ import org.apache.fluss.flink.source.event.FinishedKvSnapshotConsumeEvent; import org.apache.fluss.flink.source.event.PartitionBucketsUnsubscribedEvent; import org.apache.fluss.flink.source.event.PartitionsRemovedEvent; +import org.apache.fluss.flink.source.event.UnfinishedSplitEvent; import org.apache.fluss.flink.source.reader.LeaseContext; import org.apache.fluss.flink.source.split.HybridSnapshotLogSplit; +import org.apache.fluss.flink.source.split.KvBatchSplit; import org.apache.fluss.flink.source.split.LogSplit; import org.apache.fluss.flink.source.split.SourceSplitBase; import org.apache.fluss.flink.source.state.SourceEnumeratorState; @@ -147,6 +149,16 @@ public class FlinkSourceEnumerator /** checkpointId -> tableBuckets who finished consume kv snapshots. */ private final TreeMap> consumedKvSnapshotMap = new TreeMap<>(); + /** + * Per-split attempt counter for {@link KvBatchSplit}s that have been reported back via {@link + * UnfinishedSplitEvent}. Bounded by {@link #maxKvBatchRebalanceAttempts}. Keyed by {@code + * splitId}; not checkpointed (an enumerator restart resets the counter, which is acceptable + * because reassignment of an unresumable split is idempotent). + */ + private final Map kvBatchSplitAttempts = new HashMap<>(); + + private final int maxKvBatchRebalanceAttempts = 5; + // Lazily instantiated or mutable fields. 
private Connection connection; private Admin flussAdmin; @@ -483,27 +495,13 @@ private void startInBatchMode() { "No lake snapshot found for table {}," + " falling back to Fluss-only splits.", tablePath); - if (isPartitioned) { - Set partitionInfos = listPartitions(); - Collection partitions = - partitionInfos.stream() - .map( - p -> - new Partition( - p.getPartitionId(), - p.getPartitionName())) - .collect(Collectors.toList()); - // Use log-only splits to avoid generating mixed split - // types (HybridSnapshotLogSplit + LogSplit) for - // primary-key tables, which is not supported. - splits = this.initLogTablePartitionSplits(partitions); - } else { - splits = this.getLogSplit(null, null); - } + splits = generateFlussOnlyBatchSplits(); } return splits; }, this::handleSplitsAdd); + } else if (hasPrimaryKey) { + context.callAsync(this::generateFlussOnlyBatchSplits, this::handleSplitsAdd); } else { throw new UnsupportedOperationException( String.format( @@ -512,6 +510,56 @@ private void startInBatchMode() { } } + /** + * Builds bounded-mode Fluss-only splits when no lake snapshot is available: + * + *
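<ul> + * <li>Primary-key table: one {@link KvBatchSplit} per bucket (per listed partition when the + * table is partitioned), skipping buckets rejected by {@code ignoreTableBucket}. + * <li>Log table: the log splits from {@code initLogTablePartitionSplits} (partitioned) or + * {@code getLogSplit} (non-partitioned). + * </ul>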
+ */ + private List generateFlussOnlyBatchSplits() { + if (hasPrimaryKey) { + if (isPartitioned) { + Set partitionInfos = listPartitions(); + List splits = new ArrayList<>(); + for (PartitionInfo partitionInfo : partitionInfos) { + splits.addAll( + buildKvBatchSplits( + partitionInfo.getPartitionId(), + partitionInfo.getPartitionName())); + } + return splits; + } else { + return buildKvBatchSplits(null, null); + } + } else { + if (isPartitioned) { + Set partitionInfos = listPartitions(); + Collection partitions = + partitionInfos.stream() + .map(p -> new Partition(p.getPartitionId(), p.getPartitionName())) + .collect(Collectors.toList()); + return this.initLogTablePartitionSplits(partitions); + } else { + return this.getLogSplit(null, null); + } + } + } + + private List buildKvBatchSplits( + @Nullable Long partitionId, @Nullable String partitionName) { + List splits = new ArrayList<>(); + for (int bucketId = 0; bucketId < tableInfo.getNumBuckets(); bucketId++) { + TableBucket tb = new TableBucket(tableInfo.getTableId(), partitionId, bucketId); + if (ignoreTableBucket(tb)) { + continue; + } + splits.add(new KvBatchSplit(tb, partitionName)); + } + return splits; + } + private void startInStreamModeForNonPartitionedTable() { if (lakeSource != null) { context.callAsync( @@ -1181,7 +1229,41 @@ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) { } tableBuckets.forEach(tableBucket -> addConsumedBucket(checkpointId, tableBucket)); + } else if (sourceEvent instanceof UnfinishedSplitEvent) { + handleUnfinishedSplitEvent((UnfinishedSplitEvent) sourceEvent); + } + } + + /** + * Re-emits a {@link KvBatchSplit} reported back by a reader as unfinished (typically due to a + * transient leadership change on the tablet server). Bounded by {@link + * #maxKvBatchRebalanceAttempts}: once the budget is exhausted for a split, the job is failed to + * surface persistent failures rather than hot-loop. 
+ */ + private void handleUnfinishedSplitEvent(UnfinishedSplitEvent event) { + String splitId = event.getSplitId(); + int attempts = kvBatchSplitAttempts.getOrDefault(splitId, 0) + 1; + if (attempts > maxKvBatchRebalanceAttempts) { + kvBatchSplitAttempts.remove(splitId); + throw new FlinkRuntimeException( + String.format( + "KvBatchSplit %s for bucket %s exceeded %d re-assignment attempts; " + + "giving up. Last failure reason: %s", + splitId, + event.getTableBucket(), + maxKvBatchRebalanceAttempts, + event.getReason())); } + kvBatchSplitAttempts.put(splitId, attempts); + LOG.warn( + "Re-emitting KvBatchSplit {} for bucket {} (attempt {}/{}). Last failure: {}", + splitId, + event.getTableBucket(), + attempts, + maxKvBatchRebalanceAttempts, + event.getReason()); + KvBatchSplit split = new KvBatchSplit(event.getTableBucket(), event.getPartitionName()); + doHandleSplitsAdd(Collections.singletonList(split)); } @VisibleForTesting diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/event/UnfinishedSplitEvent.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/event/UnfinishedSplitEvent.java index 72dad5fe34..2db463255c 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/event/UnfinishedSplitEvent.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/event/UnfinishedSplitEvent.java @@ -17,8 +17,12 @@ package org.apache.fluss.flink.source.event; +import org.apache.fluss.metadata.TableBucket; + import org.apache.flink.api.connector.source.SourceEvent; +import javax.annotation.Nullable; + import java.util.Objects; /** @@ -27,19 +31,28 @@ * re-assigned. Sent on transient failures whose recovery requires opening a fresh scanner session, * most notably {@code NOT_LEADER_OR_FOLLOWER} / {@code LeaderNotAvailableException}. * - *

<p>The enumerator is expected to refresh metadata and re-emit the split (possibly to a different - * reader), bounded by a per-split attempt budget so a persistently failing bucket eventually fails - * the job rather than hot-looping. + *

The event carries the bucket and partition name so the enumerator can reconstruct the split + * without keeping per-split state across its own restarts. The enumerator is expected to refresh + * metadata and re-emit the split (possibly to a different reader), bounded by a per-split attempt + * budget so a persistently failing bucket eventually fails the job rather than hot-looping. */ public class UnfinishedSplitEvent implements SourceEvent { private static final long serialVersionUID = 1L; private final String splitId; + private final TableBucket tableBucket; + @Nullable private final String partitionName; private final String reason; - public UnfinishedSplitEvent(String splitId, String reason) { + public UnfinishedSplitEvent( + String splitId, + TableBucket tableBucket, + @Nullable String partitionName, + String reason) { this.splitId = splitId; + this.tableBucket = tableBucket; + this.partitionName = partitionName; this.reason = reason; } @@ -47,6 +60,15 @@ public String getSplitId() { return splitId; } + public TableBucket getTableBucket() { + return tableBucket; + } + + @Nullable + public String getPartitionName() { + return partitionName; + } + public String getReason() { return reason; } @@ -60,16 +82,27 @@ public boolean equals(Object o) { return false; } UnfinishedSplitEvent that = (UnfinishedSplitEvent) o; - return Objects.equals(splitId, that.splitId) && Objects.equals(reason, that.reason); + return Objects.equals(splitId, that.splitId) + && Objects.equals(tableBucket, that.tableBucket) + && Objects.equals(partitionName, that.partitionName) + && Objects.equals(reason, that.reason); } @Override public int hashCode() { - return Objects.hash(splitId, reason); + return Objects.hash(splitId, tableBucket, partitionName, reason); } @Override public String toString() { - return "UnfinishedSplitEvent{splitId='" + splitId + "', reason='" + reason + "'}"; + return "UnfinishedSplitEvent{splitId='" + + splitId + + "', tableBucket=" + + tableBucket + + ", 
partitionName='" + + partitionName + + "', reason='" + + reason + + "'}"; } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/split/KvBatchSplit.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/split/KvBatchSplit.java index cdb4ee82a7..0e8f7e309c 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/split/KvBatchSplit.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/split/KvBatchSplit.java @@ -31,8 +31,8 @@ * RocksDB state at the moment {@code ScanKv} opens the scan), but the client cannot resume an * expired or invalidated session, so progress is not checkpointed. * - *

<p>Unlike {@link HybridSnapshotLogSplit}, this split has no log handoff phase: when the bucket - * is drained the split is marked finished. + *

Unlike {@link HybridSnapshotLogSplit}, this split has no log handoff phase: when the bucket is + * drained the split is marked finished. */ public class KvBatchSplit extends SourceSplitBase { diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java index 6d239f965b..470e6a044e 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java @@ -27,8 +27,10 @@ import org.apache.fluss.flink.lake.split.LakeSnapshotSplit; import org.apache.fluss.flink.source.event.PartitionBucketsUnsubscribedEvent; import org.apache.fluss.flink.source.event.PartitionsRemovedEvent; +import org.apache.fluss.flink.source.event.UnfinishedSplitEvent; import org.apache.fluss.flink.source.reader.LeaseContext; import org.apache.fluss.flink.source.split.HybridSnapshotLogSplit; +import org.apache.fluss.flink.source.split.KvBatchSplit; import org.apache.fluss.flink.source.split.LogSplit; import org.apache.fluss.flink.source.split.SnapshotSplit; import org.apache.fluss.flink.source.split.SourceSplitBase; @@ -55,6 +57,7 @@ import org.apache.flink.api.connector.source.SourceEvent; import org.apache.flink.api.connector.source.SplitsAssignment; import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext; +import org.apache.flink.util.FlinkRuntimeException; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; @@ -143,6 +146,100 @@ void testPkTableNoSnapshotSplits() throws Throwable { } } + @Test + void testBoundedPkTableEmitsKvBatchSplits() throws Throwable { + long tableId = createTable(DEFAULT_TABLE_PATH, DEFAULT_PK_TABLE_DESCRIPTOR); + int 
numSubtasks = DEFAULT_BUCKET_NUM; + try (MockSplitEnumeratorContext context = + new MockSplitEnumeratorContext<>(numSubtasks)) { + FlinkSourceEnumerator enumerator = + new FlinkSourceEnumerator( + DEFAULT_TABLE_PATH, + flussConf, + true, + false, + context, + OffsetsInitializer.full(), + DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS, + false, // bounded + null, + null, + LeaseContext.DEFAULT, + false); + + enumerator.start(); + for (int i = 0; i < numSubtasks; i++) { + registerReader(context, enumerator, i); + } + assertThat(context.getSplitsAssignmentSequence()).isEmpty(); + + // Drive the bounded-mode async split generation. + context.runNextOneTimeCallable(); + + Map> expectedAssignment = new HashMap<>(); + for (int bucket = 0; bucket < DEFAULT_BUCKET_NUM; bucket++) { + KvBatchSplit split = new KvBatchSplit(new TableBucket(tableId, bucket), null); + int owner = enumerator.getSplitOwner(split); + expectedAssignment.computeIfAbsent(owner, k -> new ArrayList<>()).add(split); + } + Map> actualAssignment = getReadersAssignments(context); + assertThat(actualAssignment).isEqualTo(expectedAssignment); + actualAssignment + .values() + .forEach( + splits -> splits.forEach(s -> assertThat(s.isKvBatchSplit()).isTrue())); + } + } + + /** + * On {@link UnfinishedSplitEvent} the enumerator re-emits the split (so a transient leadership + * change recovers cleanly), but only up to a fixed budget. Past the budget it fails the job + * rather than hot-looping. 
+ */ + @Test + void testUnfinishedSplitEventRetryBudget() throws Throwable { + long tableId = createTable(DEFAULT_TABLE_PATH, DEFAULT_PK_TABLE_DESCRIPTOR); + try (MockSplitEnumeratorContext context = + new MockSplitEnumeratorContext<>(1)) { + FlinkSourceEnumerator enumerator = + new FlinkSourceEnumerator( + DEFAULT_TABLE_PATH, + flussConf, + true, + false, + context, + OffsetsInitializer.full(), + DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS, + false, // bounded + null, + null, + LeaseContext.DEFAULT, + false); + enumerator.start(); + registerReader(context, enumerator, 0); + + TableBucket bucket = new TableBucket(tableId, 0); + KvBatchSplit split = new KvBatchSplit(bucket, null); + UnfinishedSplitEvent event = + new UnfinishedSplitEvent( + split.splitId(), bucket, null, "not leader or follower"); + + // 5 attempts succeed; each re-emits the split. + int initialAssignmentCount = context.getSplitsAssignmentSequence().size(); + for (int i = 0; i < 5; i++) { + enumerator.handleSourceEvent(0, event); + } + assertThat(context.getSplitsAssignmentSequence().size() - initialAssignmentCount) + .isEqualTo(5); + + // 6th attempt exceeds the cap and fails the job. 
+ assertThatThrownBy(() -> enumerator.handleSourceEvent(0, event)) + .isInstanceOf(FlinkRuntimeException.class) + .hasMessageContaining("exceeded") + .hasMessageContaining("re-assignment attempts"); + } + } + @Test void testSplitAssignmentBatchSize() throws Throwable { long tableId = createTable(DEFAULT_TABLE_PATH, DEFAULT_PK_TABLE_DESCRIPTOR); From d7c4e0ad7ca3d75ed3617b305defd1d07c7c24fb Mon Sep 17 00:00:00 2001 From: ipolyzos Date: Tue, 26 May 2026 12:45:09 +0300 Subject: [PATCH 03/13] [flink] Wire KvBatchSplit into FlinkSourceSplitReader --- .../source/emitter/FlinkRecordEmitter.java | 2 + .../source/reader/FlinkSourceReader.java | 6 +- .../source/reader/FlinkSourceSplitReader.java | 76 ++++++++++++++++++- .../flink/source/split/KvBatchSplitState.java | 31 ++++++++ .../flink/source/split/SourceSplitState.java | 8 ++ .../reader/FlinkSourceSplitReaderTest.java | 30 ++++++++ 6 files changed, 151 insertions(+), 2 deletions(-) create mode 100644 fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/split/KvBatchSplitState.java diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/emitter/FlinkRecordEmitter.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/emitter/FlinkRecordEmitter.java index ba4c9d0131..b400880263 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/emitter/FlinkRecordEmitter.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/emitter/FlinkRecordEmitter.java @@ -87,6 +87,8 @@ public void emitRecord( .asLogSplitState() .setNextOffset(recordAndPosition.record().logOffset() + 1); } + } else if (splitState.isKvBatchSplitState()) { + processAndEmitRecord(recordAndPosition.record(), sourceOutput); } else if (splitState.isLakeSplit()) { if (lakeRecordRecordEmitter == null) { lakeRecordRecordEmitter = new LakeRecordRecordEmitter<>(this::processAndEmitRecord); diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceReader.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceReader.java index 68a3489bd8..ee2fe2d95e 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceReader.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceReader.java @@ -28,6 +28,7 @@ import org.apache.fluss.flink.source.reader.fetcher.FlinkSourceFetcherManager; import org.apache.fluss.flink.source.split.HybridSnapshotLogSplit; import org.apache.fluss.flink.source.split.HybridSnapshotLogSplitState; +import org.apache.fluss.flink.source.split.KvBatchSplitState; import org.apache.fluss.flink.source.split.LogSplitState; import org.apache.fluss.flink.source.split.SourceSplitBase; import org.apache.fluss.flink.source.split.SourceSplitState; @@ -86,7 +87,8 @@ public FlinkSourceReader( projectedFields, logRecordBatchFilter, lakeSource, - flinkSourceReaderMetrics), + flinkSourceReaderMetrics, + context::sendSourceEventToCoordinator), (ignore) -> {}), recordEmitter, context.getConfiguration(), @@ -163,6 +165,8 @@ protected SourceSplitState initializedState(SourceSplitBase split) { return new HybridSnapshotLogSplitState(split.asHybridSnapshotLogSplit()); } else if (split.isLogSplit()) { return new LogSplitState(split.asLogSplit()); + } else if (split.isKvBatchSplit()) { + return new KvBatchSplitState(split.asKvBatchSplit()); } else if (split.isLakeSplit()) { return LakeSplitStateInitializer.initializedState(split); } else { diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java index 90f2b6e9a6..14725277ec 100644 --- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java @@ -26,12 +26,16 @@ import org.apache.fluss.client.table.scanner.log.LogScanner; import org.apache.fluss.client.table.scanner.log.ScanRecords; import org.apache.fluss.config.Configuration; +import org.apache.fluss.exception.LeaderNotAvailableException; +import org.apache.fluss.exception.NotLeaderOrFollowerException; import org.apache.fluss.exception.PartitionNotExistException; import org.apache.fluss.flink.lake.LakeSplitReaderGenerator; import org.apache.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit; import org.apache.fluss.flink.metrics.FlinkMetricRegistry; +import org.apache.fluss.flink.source.event.UnfinishedSplitEvent; import org.apache.fluss.flink.source.metrics.FlinkSourceReaderMetrics; import org.apache.fluss.flink.source.split.HybridSnapshotLogSplit; +import org.apache.fluss.flink.source.split.KvBatchSplit; import org.apache.fluss.flink.source.split.LogSplit; import org.apache.fluss.flink.source.split.SnapshotSplit; import org.apache.fluss.flink.source.split.SourceSplitBase; @@ -69,6 +73,7 @@ import java.util.Optional; import java.util.Queue; import java.util.Set; +import java.util.function.Consumer; import static org.apache.fluss.utils.Preconditions.checkArgument; import static org.apache.fluss.utils.Preconditions.checkNotNull; @@ -117,6 +122,8 @@ public class FlinkSourceSplitReader implements SplitReader unsubscribedTableBuckets = new HashSet<>(); + private final Consumer unfinishedSplitConsumer; + public FlinkSourceSplitReader( Configuration flussConf, TablePath tablePath, @@ -125,6 +132,26 @@ public FlinkSourceSplitReader( @Nullable Predicate logRecordBatchFilter, @Nullable LakeSource lakeSource, FlinkSourceReaderMetrics flinkSourceReaderMetrics) { + this( + flussConf, + tablePath, + sourceOutputType, + projectedFields, + 
logRecordBatchFilter, + lakeSource, + flinkSourceReaderMetrics, + event -> {}); + } + + public FlinkSourceSplitReader( + Configuration flussConf, + TablePath tablePath, + RowType sourceOutputType, + @Nullable int[] projectedFields, + @Nullable Predicate logRecordBatchFilter, + @Nullable LakeSource lakeSource, + FlinkSourceReaderMetrics flinkSourceReaderMetrics, + Consumer unfinishedSplitConsumer) { this.flinkMetricRegistry = new FlinkMetricRegistry(flinkSourceReaderMetrics.getSourceReaderMetricGroup()); this.connection = ConnectionFactory.createConnection(flussConf, flinkMetricRegistry); @@ -145,6 +172,7 @@ public FlinkSourceSplitReader( this.stoppingOffsets = new HashMap<>(); this.emptyLogSplits = new HashSet<>(); this.lakeSource = lakeSource; + this.unfinishedSplitConsumer = unfinishedSplitConsumer; } @Override @@ -158,7 +186,15 @@ public RecordsWithSplitIds fetch() throws IOException { } checkSnapshotSplitOrStartNext(); if (currentBoundedSplitReader != null) { - CloseableIterator recordIterator = currentBoundedSplitReader.readBatch(); + CloseableIterator recordIterator; + try { + recordIterator = currentBoundedSplitReader.readBatch(); + } catch (IOException e) { + if (currentBoundedSplit.isKvBatchSplit() && isLeaderRebalance(e)) { + return handleKvBatchLeaderRebalance(e); + } + throw e; + } if (recordIterator == null) { LOG.info("split {} is finished", currentBoundedSplit.splitId()); return finishCurrentBoundedSplit(); @@ -216,6 +252,8 @@ public void handleSplitsChanges(SplitsChange splitsChanges) { subscribeLog(sourceSplitBase, hybridSnapshotLogSplit.getLogStartingOffset()); } else if (sourceSplitBase.isLogSplit()) { subscribeLog(sourceSplitBase, sourceSplitBase.asLogSplit().getStartingOffset()); + } else if (sourceSplitBase.isKvBatchSplit()) { + boundedSplits.add(sourceSplitBase); } else if (sourceSplitBase.isLakeSplit()) { getLakeSplitReader().addSplit(sourceSplitBase, boundedSplits); if (sourceSplitBase instanceof LakeSnapshotAndFlussLogSplit) { @@ -412,6 
+450,12 @@ private void checkSnapshotSplitOrStartNext() { snapshotSplit.getTableBucket(), snapshotSplit.getSnapshotId()); currentBoundedSplitReader = new BoundedSplitReader(batchScanner, snapshotSplit.recordsToSkip()); + } else if (currentBoundedSplit.isKvBatchSplit()) { + BatchScanner batchScanner = + table.newScan() + .project(projectedFields) + .createBatchScanner(currentBoundedSplit.getTableBucket()); + currentBoundedSplitReader = new BoundedSplitReader(batchScanner, 0); } else if (currentBoundedSplit.isLakeSplit()) { currentBoundedSplitReader = getLakeSplitReader().getBoundedSplitScanner(currentBoundedSplit); @@ -576,6 +620,36 @@ public void close() throws Exception { flinkMetricRegistry.close(); } + private static boolean isLeaderRebalance(Throwable t) { + while (t != null) { + if (t instanceof NotLeaderOrFollowerException + || t instanceof LeaderNotAvailableException) { + return true; + } + t = t.getCause(); + } + return false; + } + + private FlinkRecordsWithSplitIds handleKvBatchLeaderRebalance(IOException cause) + throws IOException { + KvBatchSplit failedSplit = currentBoundedSplit.asKvBatchSplit(); + String splitId = failedSplit.splitId(); + LOG.warn( + "Leader rebalance for KvBatchSplit {}; requesting enumerator re-assignment.", + splitId, + cause); + closeCurrentBoundedSplit(); + unfinishedSplitConsumer.accept( + new UnfinishedSplitEvent( + splitId, + failedSplit.getTableBucket(), + failedSplit.getPartitionName(), + cause.getMessage())); + return new FlinkRecordsWithSplitIds( + Collections.singleton(splitId), flinkSourceReaderMetrics); + } + private void sanityCheck(RowType flussTableRowType, @Nullable int[] projectedFields) { RowType tableRowType = projectedFields != null diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/split/KvBatchSplitState.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/split/KvBatchSplitState.java new file mode 100644 index 0000000000..e83fe9ad28 --- 
/dev/null +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/split/KvBatchSplitState.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.source.split; + +/** State of {@link KvBatchSplit}. Has no resumable position. 
*/ +public class KvBatchSplitState extends SourceSplitState { + + public KvBatchSplitState(KvBatchSplit split) { + super(split); + } + + @Override + public KvBatchSplit toSourceSplit() { + return (KvBatchSplit) split; + } +} diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/split/SourceSplitState.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/split/SourceSplitState.java index 32f93491e9..dbc85821a0 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/split/SourceSplitState.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/split/SourceSplitState.java @@ -46,6 +46,14 @@ public final LogSplitState asLogSplitState() { return (LogSplitState) this; } + public final boolean isKvBatchSplitState() { + return getClass() == KvBatchSplitState.class; + } + + public final KvBatchSplitState asKvBatchSplitState() { + return (KvBatchSplitState) this; + } + public abstract SourceSplitBase toSourceSplit(); public boolean isLakeSplit() { diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReaderTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReaderTest.java index c170518995..51c1f5a8f1 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReaderTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReaderTest.java @@ -25,6 +25,7 @@ import org.apache.fluss.client.write.HashBucketAssigner; import org.apache.fluss.flink.source.metrics.FlinkSourceReaderMetrics; import org.apache.fluss.flink.source.split.HybridSnapshotLogSplit; +import org.apache.fluss.flink.source.split.KvBatchSplit; import org.apache.fluss.flink.source.split.LogSplit; import org.apache.fluss.flink.source.split.SourceSplitBase; import 
org.apache.fluss.flink.utils.FlinkTestBase; @@ -307,6 +308,35 @@ void testHandleMixSnapshotLogSplitChangesAndFetch() throws Exception { } } + @Test + void testHandleKvBatchSplitChangesAndFetch() throws Exception { + TablePath tablePath = TablePath.of(DEFAULT_DB, "test-kv-batch-split-table"); + long tableId = createTable(tablePath, DEFAULT_PK_TABLE_DESCRIPTOR); + try (FlinkSourceSplitReader splitReader = + createSplitReader(tablePath, DEFAULT_PK_TABLE_SCHEMA.getRowType())) { + Map> rows = putRows(tableId, tablePath, 10); + + List splits = new ArrayList<>(); + for (TableBucket bucket : rows.keySet()) { + splits.add(new KvBatchSplit(bucket, null)); + } + + Map> expectedRecords = new HashMap<>(); + for (Map.Entry> e : rows.entrySet()) { + String splitId = new KvBatchSplit(e.getKey(), null).splitId(); + List records = new ArrayList<>(e.getValue().size()); + int pos = 1; + for (InternalRow row : e.getValue()) { + records.add(new RecordAndPos(new ScanRecord(row), pos++)); + } + expectedRecords.put(splitId, records); + } + + assignSplitsAndFetchUntilRetrieveRecords( + splitReader, splits, expectedRecords, DEFAULT_PK_TABLE_SCHEMA.getRowType()); + } + } + @Test void testNoSubscribedBucket() throws Exception { TablePath tablePath = TablePath.of(DEFAULT_DB, "test-no-subscribe-bucket-table"); From 88e7fb29829eeec335816817db3b2c26bba7ea07 Mon Sep 17 00:00:00 2001 From: ipolyzos Date: Tue, 26 May 2026 13:05:53 +0300 Subject: [PATCH 04/13] [flink] Gate bounded KvBatchSplit emission behind master switch --- .../apache/fluss/config/ConfigOptions.java | 10 +++++ .../enumerator/FlinkSourceEnumerator.java | 43 +++++++------------ .../enumerator/FlinkSourceEnumeratorTest.java | 39 ++++++++++++++++- 3 files changed, 63 insertions(+), 29 deletions(-) diff --git a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java index b6154fa477..3b81ee7816 100644 --- 
a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java @@ -1319,6 +1319,16 @@ public class ConfigOptions { + KV_SCANNER_MAX_BATCH_SIZE.key() + "'."); + public static final ConfigOption CLIENT_SCANNER_KV_SERVER_SIDE_ENABLED = + key("client.scanner.kv.server-side.enabled") + .booleanType() + .defaultValue(false) + .withDescription( + "Master switch for using the server-side KV scan (FIP-17) in bounded reads " + + "of primary-key tables when no KV snapshot file is available. When " + + "false (default), bounded primary-key reads fall back to the prior " + + "behavior (log-only when lake is enabled, or fail when lake is disabled)."); + public static final ConfigOption CLIENT_LOOKUP_QUEUE_SIZE = key("client.lookup.queue-size") .intType() diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java index 845f060bf4..205a84e3b3 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java @@ -481,6 +481,7 @@ public void start() { } private void startInBatchMode() { + boolean kvBatchEnabled = flussConf.get(ConfigOptions.CLIENT_SCANNER_KV_SERVER_SIDE_ENABLED); if (lakeEnabled) { if (lakeSource == null) { throw new IllegalStateException( @@ -489,19 +490,18 @@ private void startInBatchMode() { context.callAsync( () -> { List splits = generateHybridLakeFlussSplits(); - // No lake snapshot exists, fall back to Fluss-only splits if (splits == null) { LOG.info( "No lake snapshot found for table {}," + " falling back to Fluss-only splits.", tablePath); - splits = generateFlussOnlyBatchSplits(); + splits = 
generateFlussOnlyBatchSplits(kvBatchEnabled); } return splits; }, this::handleSplitsAdd); - } else if (hasPrimaryKey) { - context.callAsync(this::generateFlussOnlyBatchSplits, this::handleSplitsAdd); + } else if (kvBatchEnabled && hasPrimaryKey) { + context.callAsync(() -> generateFlussOnlyBatchSplits(true), this::handleSplitsAdd); } else { throw new UnsupportedOperationException( String.format( @@ -510,16 +510,8 @@ private void startInBatchMode() { } } - /** - * Builds bounded-mode Fluss-only splits when no lake snapshot is available: - * - *

    - *
  • primary-key tables: one {@link KvBatchSplit} per bucket (server-side KV scan), - *
  • log tables: existing log-only splits with a bounded stopping offset. - *
- */ - private List generateFlussOnlyBatchSplits() { - if (hasPrimaryKey) { + private List generateFlussOnlyBatchSplits(boolean kvBatchEnabled) { + if (kvBatchEnabled && hasPrimaryKey) { if (isPartitioned) { Set partitionInfos = listPartitions(); List splits = new ArrayList<>(); @@ -530,21 +522,18 @@ private List generateFlussOnlyBatchSplits() { partitionInfo.getPartitionName())); } return splits; - } else { - return buildKvBatchSplits(null, null); - } - } else { - if (isPartitioned) { - Set partitionInfos = listPartitions(); - Collection partitions = - partitionInfos.stream() - .map(p -> new Partition(p.getPartitionId(), p.getPartitionName())) - .collect(Collectors.toList()); - return this.initLogTablePartitionSplits(partitions); - } else { - return this.getLogSplit(null, null); } + return buildKvBatchSplits(null, null); + } + if (isPartitioned) { + Set partitionInfos = listPartitions(); + Collection partitions = + partitionInfos.stream() + .map(p -> new Partition(p.getPartitionId(), p.getPartitionName())) + .collect(Collectors.toList()); + return this.initLogTablePartitionSplits(partitions); } + return this.getLogSplit(null, null); } private List buildKvBatchSplits( diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java index 470e6a044e..e0f8fd0141 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java @@ -21,6 +21,7 @@ import org.apache.fluss.client.table.Table; import org.apache.fluss.client.table.writer.UpsertWriter; import org.apache.fluss.client.write.HashBucketAssigner; +import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; import 
org.apache.fluss.flink.FlinkConnectorOptions; import org.apache.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit; @@ -150,12 +151,14 @@ void testPkTableNoSnapshotSplits() throws Throwable { void testBoundedPkTableEmitsKvBatchSplits() throws Throwable { long tableId = createTable(DEFAULT_TABLE_PATH, DEFAULT_PK_TABLE_DESCRIPTOR); int numSubtasks = DEFAULT_BUCKET_NUM; + Configuration enabled = new Configuration(flussConf); + enabled.set(ConfigOptions.CLIENT_SCANNER_KV_SERVER_SIDE_ENABLED, true); try (MockSplitEnumeratorContext context = new MockSplitEnumeratorContext<>(numSubtasks)) { FlinkSourceEnumerator enumerator = new FlinkSourceEnumerator( DEFAULT_TABLE_PATH, - flussConf, + enabled, true, false, context, @@ -196,15 +199,47 @@ void testBoundedPkTableEmitsKvBatchSplits() throws Throwable { * change recovers cleanly), but only up to a fixed budget. Past the budget it fails the job * rather than hot-looping. */ + /** + * When the master switch is off (default), bounded reads of a primary-key table without a lake + * snapshot keep the pre-FIP-17 behavior of failing at startup, so existing users are not + * silently flipped onto the new code path. 
+ */ + @Test + void testBoundedPkDisabledByDefaultThrows() throws Throwable { + createTable(DEFAULT_TABLE_PATH, DEFAULT_PK_TABLE_DESCRIPTOR); + try (MockSplitEnumeratorContext context = + new MockSplitEnumeratorContext<>(1)) { + FlinkSourceEnumerator enumerator = + new FlinkSourceEnumerator( + DEFAULT_TABLE_PATH, + flussConf, + true, + false, + context, + OffsetsInitializer.full(), + DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS, + false, + null, + null, + LeaseContext.DEFAULT, + false); + assertThatThrownBy(enumerator::start) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessageContaining("Batch only supports when table option"); + } + } + @Test void testUnfinishedSplitEventRetryBudget() throws Throwable { long tableId = createTable(DEFAULT_TABLE_PATH, DEFAULT_PK_TABLE_DESCRIPTOR); + Configuration enabled = new Configuration(flussConf); + enabled.set(ConfigOptions.CLIENT_SCANNER_KV_SERVER_SIDE_ENABLED, true); try (MockSplitEnumeratorContext context = new MockSplitEnumeratorContext<>(1)) { FlinkSourceEnumerator enumerator = new FlinkSourceEnumerator( DEFAULT_TABLE_PATH, - flussConf, + enabled, true, false, context, From b48c90a085b151a0792d6a7dd6eb84f25dea0a1b Mon Sep 17 00:00:00 2001 From: ipolyzos Date: Tue, 26 May 2026 14:35:39 +0300 Subject: [PATCH 05/13] [flink] redundant code cleanup and test --- .../fluss/flink/source/FlinkTableSource.java | 6 +- .../enumerator/FlinkSourceEnumerator.java | 45 -------- .../source/event/UnfinishedSplitEvent.java | 108 ------------------ .../source/reader/FlinkSourceReader.java | 3 +- .../source/reader/FlinkSourceSplitReader.java | 68 +---------- .../source/FlinkTableSourceBatchITCase.java | 35 ++++++ .../enumerator/FlinkSourceEnumeratorTest.java | 53 --------- 7 files changed, 42 insertions(+), 276 deletions(-) delete mode 100644 fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/event/UnfinishedSplitEvent.java diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java index b0bba81a7e..0def9f47f1 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java @@ -435,7 +435,11 @@ public boolean isBounded() { + modificationScanType + " statement with conditions on primary key."); } - if (!isDataLakeEnabled) { + if (!isDataLakeEnabled + && !(hasPrimaryKey() + && flussConfig.get( + ConfigOptions + .CLIENT_SCANNER_KV_SERVER_SIDE_ENABLED))) { throw new UnsupportedOperationException( "Currently, Fluss only support queries on table with datalake enabled or point queries on primary key when it's in batch execution mode."); } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java index 205a84e3b3..7e2df77e9c 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java @@ -37,7 +37,6 @@ import org.apache.fluss.flink.source.event.FinishedKvSnapshotConsumeEvent; import org.apache.fluss.flink.source.event.PartitionBucketsUnsubscribedEvent; import org.apache.fluss.flink.source.event.PartitionsRemovedEvent; -import org.apache.fluss.flink.source.event.UnfinishedSplitEvent; import org.apache.fluss.flink.source.reader.LeaseContext; import org.apache.fluss.flink.source.split.HybridSnapshotLogSplit; import org.apache.fluss.flink.source.split.KvBatchSplit; @@ -149,16 +148,6 @@ public class FlinkSourceEnumerator /** checkpointId -> tableBuckets 
who finished consume kv snapshots. */ private final TreeMap> consumedKvSnapshotMap = new TreeMap<>(); - /** - * Per-split attempt counter for {@link KvBatchSplit}s that have been reported back via {@link - * UnfinishedSplitEvent}. Bounded by {@link #maxKvBatchRebalanceAttempts}. Keyed by {@code - * splitId}; not checkpointed (an enumerator restart resets the counter, which is acceptable - * because reassignment of an unresumable split is idempotent). - */ - private final Map kvBatchSplitAttempts = new HashMap<>(); - - private final int maxKvBatchRebalanceAttempts = 5; - // Lazily instantiated or mutable fields. private Connection connection; private Admin flussAdmin; @@ -1218,41 +1207,7 @@ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) { } tableBuckets.forEach(tableBucket -> addConsumedBucket(checkpointId, tableBucket)); - } else if (sourceEvent instanceof UnfinishedSplitEvent) { - handleUnfinishedSplitEvent((UnfinishedSplitEvent) sourceEvent); - } - } - - /** - * Re-emits a {@link KvBatchSplit} reported back by a reader as unfinished (typically due to a - * transient leadership change on the tablet server). Bounded by {@link - * #maxKvBatchRebalanceAttempts}: once the budget is exhausted for a split, the job is failed to - * surface persistent failures rather than hot-loop. - */ - private void handleUnfinishedSplitEvent(UnfinishedSplitEvent event) { - String splitId = event.getSplitId(); - int attempts = kvBatchSplitAttempts.getOrDefault(splitId, 0) + 1; - if (attempts > maxKvBatchRebalanceAttempts) { - kvBatchSplitAttempts.remove(splitId); - throw new FlinkRuntimeException( - String.format( - "KvBatchSplit %s for bucket %s exceeded %d re-assignment attempts; " - + "giving up. Last failure reason: %s", - splitId, - event.getTableBucket(), - maxKvBatchRebalanceAttempts, - event.getReason())); } - kvBatchSplitAttempts.put(splitId, attempts); - LOG.warn( - "Re-emitting KvBatchSplit {} for bucket {} (attempt {}/{}). 
Last failure: {}", - splitId, - event.getTableBucket(), - attempts, - maxKvBatchRebalanceAttempts, - event.getReason()); - KvBatchSplit split = new KvBatchSplit(event.getTableBucket(), event.getPartitionName()); - doHandleSplitsAdd(Collections.singletonList(split)); } @VisibleForTesting diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/event/UnfinishedSplitEvent.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/event/UnfinishedSplitEvent.java deleted file mode 100644 index 2db463255c..0000000000 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/event/UnfinishedSplitEvent.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.fluss.flink.source.event; - -import org.apache.fluss.metadata.TableBucket; - -import org.apache.flink.api.connector.source.SourceEvent; - -import javax.annotation.Nullable; - -import java.util.Objects; - -/** - * Signal from a source reader back to the enumerator that a non-resumable split (today only {@link - * org.apache.fluss.flink.source.split.KvBatchSplit}) could not be completed and must be - * re-assigned. 
Sent on transient failures whose recovery requires opening a fresh scanner session, - * most notably {@code NOT_LEADER_OR_FOLLOWER} / {@code LeaderNotAvailableException}. - * - *

The event carries the bucket and partition name so the enumerator can reconstruct the split - * without keeping per-split state across its own restarts. The enumerator is expected to refresh - * metadata and re-emit the split (possibly to a different reader), bounded by a per-split attempt - * budget so a persistently failing bucket eventually fails the job rather than hot-looping. - */ -public class UnfinishedSplitEvent implements SourceEvent { - - private static final long serialVersionUID = 1L; - - private final String splitId; - private final TableBucket tableBucket; - @Nullable private final String partitionName; - private final String reason; - - public UnfinishedSplitEvent( - String splitId, - TableBucket tableBucket, - @Nullable String partitionName, - String reason) { - this.splitId = splitId; - this.tableBucket = tableBucket; - this.partitionName = partitionName; - this.reason = reason; - } - - public String getSplitId() { - return splitId; - } - - public TableBucket getTableBucket() { - return tableBucket; - } - - @Nullable - public String getPartitionName() { - return partitionName; - } - - public String getReason() { - return reason; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - UnfinishedSplitEvent that = (UnfinishedSplitEvent) o; - return Objects.equals(splitId, that.splitId) - && Objects.equals(tableBucket, that.tableBucket) - && Objects.equals(partitionName, that.partitionName) - && Objects.equals(reason, that.reason); - } - - @Override - public int hashCode() { - return Objects.hash(splitId, tableBucket, partitionName, reason); - } - - @Override - public String toString() { - return "UnfinishedSplitEvent{splitId='" - + splitId - + "', tableBucket=" - + tableBucket - + ", partitionName='" - + partitionName - + "', reason='" - + reason - + "'}"; - } -} diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceReader.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceReader.java index ee2fe2d95e..65ef6e75ee 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceReader.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceReader.java @@ -87,8 +87,7 @@ public FlinkSourceReader( projectedFields, logRecordBatchFilter, lakeSource, - flinkSourceReaderMetrics, - context::sendSourceEventToCoordinator), + flinkSourceReaderMetrics), (ignore) -> {}), recordEmitter, context.getConfiguration(), diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java index 14725277ec..363b139bb7 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java @@ -26,16 +26,12 @@ import org.apache.fluss.client.table.scanner.log.LogScanner; import org.apache.fluss.client.table.scanner.log.ScanRecords; import org.apache.fluss.config.Configuration; -import org.apache.fluss.exception.LeaderNotAvailableException; -import org.apache.fluss.exception.NotLeaderOrFollowerException; import org.apache.fluss.exception.PartitionNotExistException; import org.apache.fluss.flink.lake.LakeSplitReaderGenerator; import org.apache.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit; import org.apache.fluss.flink.metrics.FlinkMetricRegistry; -import org.apache.fluss.flink.source.event.UnfinishedSplitEvent; import org.apache.fluss.flink.source.metrics.FlinkSourceReaderMetrics; import 
org.apache.fluss.flink.source.split.HybridSnapshotLogSplit; -import org.apache.fluss.flink.source.split.KvBatchSplit; import org.apache.fluss.flink.source.split.LogSplit; import org.apache.fluss.flink.source.split.SnapshotSplit; import org.apache.fluss.flink.source.split.SourceSplitBase; @@ -73,7 +69,6 @@ import java.util.Optional; import java.util.Queue; import java.util.Set; -import java.util.function.Consumer; import static org.apache.fluss.utils.Preconditions.checkArgument; import static org.apache.fluss.utils.Preconditions.checkNotNull; @@ -122,8 +117,6 @@ public class FlinkSourceSplitReader implements SplitReader unsubscribedTableBuckets = new HashSet<>(); - private final Consumer unfinishedSplitConsumer; - public FlinkSourceSplitReader( Configuration flussConf, TablePath tablePath, @@ -132,26 +125,6 @@ public FlinkSourceSplitReader( @Nullable Predicate logRecordBatchFilter, @Nullable LakeSource lakeSource, FlinkSourceReaderMetrics flinkSourceReaderMetrics) { - this( - flussConf, - tablePath, - sourceOutputType, - projectedFields, - logRecordBatchFilter, - lakeSource, - flinkSourceReaderMetrics, - event -> {}); - } - - public FlinkSourceSplitReader( - Configuration flussConf, - TablePath tablePath, - RowType sourceOutputType, - @Nullable int[] projectedFields, - @Nullable Predicate logRecordBatchFilter, - @Nullable LakeSource lakeSource, - FlinkSourceReaderMetrics flinkSourceReaderMetrics, - Consumer unfinishedSplitConsumer) { this.flinkMetricRegistry = new FlinkMetricRegistry(flinkSourceReaderMetrics.getSourceReaderMetricGroup()); this.connection = ConnectionFactory.createConnection(flussConf, flinkMetricRegistry); @@ -172,7 +145,6 @@ public FlinkSourceSplitReader( this.stoppingOffsets = new HashMap<>(); this.emptyLogSplits = new HashSet<>(); this.lakeSource = lakeSource; - this.unfinishedSplitConsumer = unfinishedSplitConsumer; } @Override @@ -186,15 +158,7 @@ public RecordsWithSplitIds fetch() throws IOException { } checkSnapshotSplitOrStartNext(); if 
(currentBoundedSplitReader != null) { - CloseableIterator recordIterator; - try { - recordIterator = currentBoundedSplitReader.readBatch(); - } catch (IOException e) { - if (currentBoundedSplit.isKvBatchSplit() && isLeaderRebalance(e)) { - return handleKvBatchLeaderRebalance(e); - } - throw e; - } + CloseableIterator recordIterator = currentBoundedSplitReader.readBatch(); if (recordIterator == null) { LOG.info("split {} is finished", currentBoundedSplit.splitId()); return finishCurrentBoundedSplit(); @@ -620,36 +584,6 @@ public void close() throws Exception { flinkMetricRegistry.close(); } - private static boolean isLeaderRebalance(Throwable t) { - while (t != null) { - if (t instanceof NotLeaderOrFollowerException - || t instanceof LeaderNotAvailableException) { - return true; - } - t = t.getCause(); - } - return false; - } - - private FlinkRecordsWithSplitIds handleKvBatchLeaderRebalance(IOException cause) - throws IOException { - KvBatchSplit failedSplit = currentBoundedSplit.asKvBatchSplit(); - String splitId = failedSplit.splitId(); - LOG.warn( - "Leader rebalance for KvBatchSplit {}; requesting enumerator re-assignment.", - splitId, - cause); - closeCurrentBoundedSplit(); - unfinishedSplitConsumer.accept( - new UnfinishedSplitEvent( - splitId, - failedSplit.getTableBucket(), - failedSplit.getPartitionName(), - cause.getMessage())); - return new FlinkRecordsWithSplitIds( - Collections.singleton(splitId), flinkSourceReaderMetrics); - } - private void sanityCheck(RowType flussTableRowType, @Nullable int[] projectedFields) { RowType tableRowType = projectedFields != null diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceBatchITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceBatchITCase.java index 51d05fa036..9fb632af46 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceBatchITCase.java +++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceBatchITCase.java @@ -254,6 +254,41 @@ void testScanSingleRowFilterException() throws Exception { + " or point queries on primary key when it's in batch execution mode."); } + @Test + void testKvBatchScanOnPkTable() throws Exception { + String tableName = String.format("test_kv_batch_pk_%s", RandomUtils.nextInt()); + tEnv.executeSql( + String.format( + "create table %s (" + + " id int not null," + + " address varchar," + + " name varchar," + + " primary key (id) NOT ENFORCED)" + + " with (" + + " 'bucket.num' = '4'," + + " 'client.scanner.kv.server-side.enabled' = 'true')", + tableName)); + TablePath tablePath = TablePath.of(DEFAULT_DB, tableName); + try (Table table = conn.getTable(tablePath)) { + UpsertWriter upsertWriter = table.newUpsert().createWriter(); + for (int i = 1; i <= 5; i++) { + upsertWriter.upsert(row(i, "address" + i, "name" + i)); + } + upsertWriter.flush(); + } + + CloseableIterator collected = + tEnv.executeSql(String.format("SELECT * FROM %s", tableName)).collect(); + List expected = + Arrays.asList( + "+I[1, address1, name1]", + "+I[2, address2, name2]", + "+I[3, address3, name3]", + "+I[4, address4, name4]", + "+I[5, address5, name5]"); + assertResultsIgnoreOrder(collected, expected, true); + } + @Test void testLakeTableQueryOnLakeDisabledTable() throws Exception { String tableName = prepareSourceTable(new String[] {"id", "name"}, null); diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java index e0f8fd0141..db320a4baa 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java +++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java @@ -28,7 +28,6 @@ import org.apache.fluss.flink.lake.split.LakeSnapshotSplit; import org.apache.fluss.flink.source.event.PartitionBucketsUnsubscribedEvent; import org.apache.fluss.flink.source.event.PartitionsRemovedEvent; -import org.apache.fluss.flink.source.event.UnfinishedSplitEvent; import org.apache.fluss.flink.source.reader.LeaseContext; import org.apache.fluss.flink.source.split.HybridSnapshotLogSplit; import org.apache.fluss.flink.source.split.KvBatchSplit; @@ -58,7 +57,6 @@ import org.apache.flink.api.connector.source.SourceEvent; import org.apache.flink.api.connector.source.SplitsAssignment; import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext; -import org.apache.flink.util.FlinkRuntimeException; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; @@ -194,11 +192,6 @@ void testBoundedPkTableEmitsKvBatchSplits() throws Throwable { } } - /** - * On {@link UnfinishedSplitEvent} the enumerator re-emits the split (so a transient leadership - * change recovers cleanly), but only up to a fixed budget. Past the budget it fails the job - * rather than hot-looping. 
- */ /** * When the master switch is off (default), bounded reads of a primary-key table without a lake * snapshot keep the pre-FIP-17 behavior of failing at startup, so existing users are not @@ -229,52 +222,6 @@ void testBoundedPkDisabledByDefaultThrows() throws Throwable { } } - @Test - void testUnfinishedSplitEventRetryBudget() throws Throwable { - long tableId = createTable(DEFAULT_TABLE_PATH, DEFAULT_PK_TABLE_DESCRIPTOR); - Configuration enabled = new Configuration(flussConf); - enabled.set(ConfigOptions.CLIENT_SCANNER_KV_SERVER_SIDE_ENABLED, true); - try (MockSplitEnumeratorContext context = - new MockSplitEnumeratorContext<>(1)) { - FlinkSourceEnumerator enumerator = - new FlinkSourceEnumerator( - DEFAULT_TABLE_PATH, - enabled, - true, - false, - context, - OffsetsInitializer.full(), - DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS, - false, // bounded - null, - null, - LeaseContext.DEFAULT, - false); - enumerator.start(); - registerReader(context, enumerator, 0); - - TableBucket bucket = new TableBucket(tableId, 0); - KvBatchSplit split = new KvBatchSplit(bucket, null); - UnfinishedSplitEvent event = - new UnfinishedSplitEvent( - split.splitId(), bucket, null, "not leader or follower"); - - // 5 attempts succeed; each re-emits the split. - int initialAssignmentCount = context.getSplitsAssignmentSequence().size(); - for (int i = 0; i < 5; i++) { - enumerator.handleSourceEvent(0, event); - } - assertThat(context.getSplitsAssignmentSequence().size() - initialAssignmentCount) - .isEqualTo(5); - - // 6th attempt exceeds the cap and fails the job. 
- assertThatThrownBy(() -> enumerator.handleSourceEvent(0, event)) - .isInstanceOf(FlinkRuntimeException.class) - .hasMessageContaining("exceeded") - .hasMessageContaining("re-assignment attempts"); - } - } - @Test void testSplitAssignmentBatchSize() throws Throwable { long tableId = createTable(DEFAULT_TABLE_PATH, DEFAULT_PK_TABLE_DESCRIPTOR); From 00afd9e600baee1abb5e825cd952659b4c5cae18 Mon Sep 17 00:00:00 2001 From: ipolyzos Date: Tue, 26 May 2026 14:56:11 +0300 Subject: [PATCH 06/13] [flink] add a test with the client --- .../fluss/flink/source/FlinkTableSource.java | 3 +- .../source/FlinkTableSourceBatchITCase.java | 34 +++++++++++++++++++ 2 files changed, 35 insertions(+), 2 deletions(-) diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java index 0def9f47f1..d27d7bad95 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java @@ -438,8 +438,7 @@ public boolean isBounded() { if (!isDataLakeEnabled && !(hasPrimaryKey() && flussConfig.get( - ConfigOptions - .CLIENT_SCANNER_KV_SERVER_SIDE_ENABLED))) { + ConfigOptions.CLIENT_SCANNER_KV_SERVER_SIDE_ENABLED))) { throw new UnsupportedOperationException( "Currently, Fluss only support queries on table with datalake enabled or point queries on primary key when it's in batch execution mode."); } diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceBatchITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceBatchITCase.java index 9fb632af46..59bbba5131 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceBatchITCase.java +++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceBatchITCase.java @@ -40,6 +40,7 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -289,6 +290,39 @@ void testKvBatchScanOnPkTable() throws Exception { assertResultsIgnoreOrder(collected, expected, true); } + @Test + void testKvBatchScanReturnsAllRecords() throws Exception { + String tableName = String.format("test_kv_batch_100_%s", RandomUtils.nextInt()); + tEnv.executeSql( + String.format( + "create table %s (" + + " id int not null," + + " name varchar," + + " primary key (id) NOT ENFORCED)" + + " with (" + + " 'bucket.num' = '3'," + + " 'client.scanner.kv.server-side.enabled' = 'true')", + tableName)); + TablePath tablePath = TablePath.of(DEFAULT_DB, tableName); + try (Table table = conn.getTable(tablePath)) { + UpsertWriter upsertWriter = table.newUpsert().createWriter(); + for (int i = 1; i <= 100; i++) { + upsertWriter.upsert(row(i, "name" + i)); + } + upsertWriter.flush(); + } + + CloseableIterator collected = + tEnv.executeSql(String.format("SELECT * FROM %s", tableName)).collect(); + List actual = collectRowsWithTimeout(collected, 100); + + List expected = new ArrayList<>(); + for (int i = 1; i <= 100; i++) { + expected.add(String.format("+I[%d, name%d]", i, i)); + } + assertThat(actual).containsExactlyInAnyOrderElementsOf(expected); + } + @Test void testLakeTableQueryOnLakeDisabledTable() throws Exception { String tableName = prepareSourceTable(new String[] {"id", "name"}, null); From ecdb181c89a547f2a299e04d0f7be0a5e36a9176 Mon Sep 17 00:00:00 2001 From: ipolyzos Date: Tue, 26 May 2026 15:20:58 +0300 Subject: [PATCH 07/13] [flink] add documentation --- .../apache/fluss/config/ConfigOptions.java | 2 +- website/docs/engine-flink/options.md | 1 + website/docs/engine-flink/reads.md | 45 
+++++++++++++++++++ 3 files changed, 47 insertions(+), 1 deletion(-) diff --git a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java index 3b81ee7816..ad882e497d 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java @@ -1324,7 +1324,7 @@ public class ConfigOptions { .booleanType() .defaultValue(false) .withDescription( - "Master switch for using the server-side KV scan (FIP-17) in bounded reads " + "Master switch for using the server-side KV scan in bounded reads " + "of primary-key tables when no KV snapshot file is available. When " + "false (default), bounded primary-key reads fall back to the prior " + "behavior (log-only when lake is enabled, or fail when lake is disabled)."); diff --git a/website/docs/engine-flink/options.md b/website/docs/engine-flink/options.md index af9f97a696..1598f57644 100644 --- a/website/docs/engine-flink/options.md +++ b/website/docs/engine-flink/options.md @@ -102,6 +102,7 @@ See more details about [ALTER TABLE ... SET](engine-flink/ddl.md#set-properties) | scan.partition.discovery.interval | Duration | 1min | The time interval for the Fluss source to discover the new partitions for partitioned table while scanning. A non-positive value disables the partition discovery. The default value is 1 minute. Currently, since Fluss Admin#listPartitions(TablePath tablePath) requires a large number of requests to ZooKeeper in server, this option cannot be set too small, as a small value would cause frequent requests and increase server load. In the future, once list partitions is optimized, the default value of this parameter can be reduced. | | scan.kv.snapshot.lease.id | String | UUID | The lease ID used to protect acquired KV snapshots from deletion. 
If specified, the snapshots will be retained until either the consumer finishes processing all of them or the lease duration expires. By default, this value is set to a randomly generated UUID string if not explicitly provided. | | scan.kv.snapshot.lease.duration | Duration | 1day | The time period how long to wait before expiring the kv snapshot lease to avoid kv snapshot blocking to delete. | +| client.scanner.kv.server-side.enabled | Boolean | false | Master switch for using the server-side KV scan (FIP-17) in bounded reads of primary-key tables when no KV snapshot file is available. When false (default), bounded primary-key reads fall back to the prior behavior (log-only when lake is enabled, or fail when lake is disabled). See [Full Scan of Primary Key Tables](engine-flink/reads.md#full-scan-of-primary-key-tables-server-side-kv-scan) for details. | | client.scanner.log.check-crc | Boolean | true | Automatically check the CRC3 of the read records for LogScanner. This ensures no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead, so it may be disabled in cases seeking extreme performance. | | client.scanner.log.max-poll-records | Integer | 500 | The maximum number of records returned in a single call to poll() for LogScanner. Note that this config doesn't impact the underlying fetching behavior. The Scanner will cache the records from each fetch request and returns them incrementally from each poll. | | client.scanner.log.fetch.max-bytes | MemorySize | 16mb | The maximum amount of data the server should return for a fetch request from client. Records are fetched in batches, and if the first record batch in the first non-empty bucket of the fetch is larger than this value, the record batch will still be returned to ensure that the fetch can make progress. As such, this is not a absolute maximum. 
| diff --git a/website/docs/engine-flink/reads.md b/website/docs/engine-flink/reads.md index 57d7323666..a882156168 100644 --- a/website/docs/engine-flink/reads.md +++ b/website/docs/engine-flink/reads.md @@ -234,6 +234,51 @@ The server evaluates these predicates against per-batch column statistics and sk ## Batch Read +### Full Scan of Primary Key Tables + +Fluss can perform a bounded full-table scan on a primary-key table directly via the server-side KV scan API. + +Enable the feature by setting `client.scanner.kv.server-side.enabled = true` on the table or as a SQL hint: + +- This is a **bounded** read. The source finishes once all buckets have been drained and does not continue reading the change-log. +- On task restart, each bucket is rescanned from scratch. Progress within a scan session is not checkpointed, because an expired or invalidated server-side session cannot be resumed from a mid-point. +- The feature is disabled by default (`false`). Without it, unbounded (streaming) reads on primary-key tables work as usual; bounded reads require the data-lake integration to be enabled. + +#### Example + +**1. Create a primary-key table with the feature enabled:** +```sql title="Flink SQL" +CREATE TABLE pk_table ( + id INT NOT NULL, + name STRING, + region STRING, + PRIMARY KEY (id) NOT ENFORCED +) WITH ( + 'bucket.num' = '4', + 'client.scanner.kv.server-side.enabled' = 'true' +); +``` + +**2. Write some data:** +```sql title="Flink SQL" +INSERT INTO pk_table VALUES + (1, 'Alice', 'us-east'), + (2, 'Bob', 'eu-west'), + (3, 'Carol', 'ap-south'); +``` + +**3. 
Run a full scan in batch mode:** +```sql title="Flink SQL" +SET 'execution.runtime-mode' = 'batch'; +SELECT * FROM pk_table; +``` + +You can also enable the feature dynamically without storing it in the table metadata: +```sql title="Flink SQL" +SET 'execution.runtime-mode' = 'batch'; +SELECT * FROM pk_table /*+ OPTIONS('client.scanner.kv.server-side.enabled' = 'true') */; +``` + ### Limit Read The Fluss source supports limiting reads for both primary-key tables and log tables, making it convenient to preview the latest `N` records in a table. From 98d8c9fafab7a2cf540f20c85acab0f049c88e47 Mon Sep 17 00:00:00 2001 From: ipolyzos Date: Tue, 26 May 2026 15:56:54 +0300 Subject: [PATCH 08/13] [flink] small improvements and tests --- .../fluss/flink/source/FlinkTableSource.java | 8 +- .../enumerator/FlinkSourceEnumerator.java | 6 +- .../source/split/SourceSplitSerializer.java | 13 +- .../source/FlinkTableSourceBatchITCase.java | 58 ++++++- .../enumerator/FlinkSourceEnumeratorTest.java | 143 +++++++++++++++++- website/docs/engine-flink/reads.md | 2 +- 6 files changed, 208 insertions(+), 22 deletions(-) diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java index d27d7bad95..47ea28a454 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java @@ -440,7 +440,13 @@ public boolean isBounded() { && flussConfig.get( ConfigOptions.CLIENT_SCANNER_KV_SERVER_SIDE_ENABLED))) { throw new UnsupportedOperationException( - "Currently, Fluss only support queries on table with datalake enabled or point queries on primary key when it's in batch execution mode."); + "Batch mode requires either data-lake integration" + + " (set '" + + ConfigOptions.TABLE_DATALAKE_ENABLED.key() + + "' = 
'true') or server-side KV scan on a" + + " primary-key table (set '" + + ConfigOptions.CLIENT_SCANNER_KV_SERVER_SIDE_ENABLED.key() + + "' = 'true')."); } return source; } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java index 7e2df77e9c..9e109c3550 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java @@ -494,8 +494,10 @@ private void startInBatchMode() { } else { throw new UnsupportedOperationException( String.format( - "Batch only supports when table option '%s' is set to true.", - ConfigOptions.TABLE_DATALAKE_ENABLED)); + "Batch mode requires either '%s' = 'true' (data-lake integration) " + + "or '%s' = 'true' (server-side KV scan, primary-key tables only).", + ConfigOptions.TABLE_DATALAKE_ENABLED, + ConfigOptions.CLIENT_SCANNER_KV_SERVER_SIDE_ENABLED)); } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/split/SourceSplitSerializer.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/split/SourceSplitSerializer.java index 7f980ca655..ff7b5364c6 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/split/SourceSplitSerializer.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/split/SourceSplitSerializer.java @@ -40,10 +40,6 @@ public class SourceSplitSerializer implements SimpleVersionedSerializer SERIALIZER_CACHE = ThreadLocal.withInitial(() -> new DataOutputSerializer(64)); - private static final byte HYBRID_SNAPSHOT_SPLIT_FLAG = 1; - private static final byte LOG_SPLIT_FLAG = 2; - private static final byte KV_BATCH_SPLIT_FLAG = 3; - private static 
final int CURRENT_VERSION = VERSION_0; @Nullable private final LakeSource lakeSource; @@ -83,8 +79,7 @@ public byte[] serialize(SourceSplitBase split) throws IOException { // write stopping offset out.writeLong(logSplit.getStoppingOffset().orElse(LogSplit.NO_STOPPING_OFFSET)); } else { - // KvBatchSplit - checkNotNull(split.asKvBatchSplit()); + // KvBatchSplit has no extra fields to serialize beyond the common header. } } else { LakeSplitSerializer lakeSplitSerializer = @@ -132,7 +127,7 @@ public SourceSplitBase deserialize(int version, byte[] serialized) throws IOExce int bucketId = in.readInt(); TableBucket tableBucket = new TableBucket(tableId, partitionId, bucketId); - if (splitKind == HYBRID_SNAPSHOT_SPLIT_FLAG) { + if (splitKind == SourceSplitBase.HYBRID_SNAPSHOT_SPLIT_FLAG) { long snapshotId = in.readLong(); long recordsToSkip = in.readLong(); boolean isSnapshotFinished = in.readBoolean(); @@ -144,11 +139,11 @@ public SourceSplitBase deserialize(int version, byte[] serialized) throws IOExce recordsToSkip, isSnapshotFinished, logStartingOffset); - } else if (splitKind == LOG_SPLIT_FLAG) { + } else if (splitKind == SourceSplitBase.LOG_SPLIT_FLAG) { long startingOffset = in.readLong(); long stoppingOffset = in.readLong(); return new LogSplit(tableBucket, partitionName, startingOffset, stoppingOffset); - } else if (splitKind == KV_BATCH_SPLIT_FLAG) { + } else if (splitKind == SourceSplitBase.KV_BATCH_SPLIT_FLAG) { return new KvBatchSplit(tableBucket, partitionName); } else { LakeSplitSerializer lakeSplitSerializer = diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceBatchITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceBatchITCase.java index 59bbba5131..d2773b6b12 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceBatchITCase.java +++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceBatchITCase.java @@ -20,6 +20,7 @@ import org.apache.fluss.client.table.Table; import org.apache.fluss.client.table.writer.AppendWriter; import org.apache.fluss.client.table.writer.UpsertWriter; +import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.exception.InvalidTableException; import org.apache.fluss.flink.utils.FlinkTestBase; import org.apache.fluss.metadata.TablePath; @@ -251,8 +252,13 @@ void testScanSingleRowFilterException() throws Exception { assertThatThrownBy(() -> tEnv.explainSql(query)) .isInstanceOf(UnsupportedOperationException.class) .hasMessage( - "Currently, Fluss only support queries on table with datalake enabled" - + " or point queries on primary key when it's in batch execution mode."); + "Batch mode requires either data-lake integration" + + " (set '" + + ConfigOptions.TABLE_DATALAKE_ENABLED.key() + + "' = 'true') or server-side KV scan on a" + + " primary-key table (set '" + + ConfigOptions.CLIENT_SCANNER_KV_SERVER_SIDE_ENABLED.key() + + "' = 'true')."); } @Test @@ -489,7 +495,13 @@ void testCountPushDownForPkTable(boolean partitionTable) throws Exception { tEnv.explainSql( String.format("SELECT COUNT(address) FROM %s", tableName))) .hasMessageContaining( - "Currently, Fluss only support queries on table with datalake enabled or point queries on primary key when it's in batch execution mode."); + "Batch mode requires either data-lake integration" + + " (set '" + + ConfigOptions.TABLE_DATALAKE_ENABLED.key() + + "' = 'true') or server-side KV scan on a" + + " primary-key table (set '" + + ConfigOptions.CLIENT_SCANNER_KV_SERVER_SIDE_ENABLED.key() + + "' = 'true')."); assertThatThrownBy( () -> @@ -498,7 +510,13 @@ void testCountPushDownForPkTable(boolean partitionTable) throws Exception { "SELECT COUNT(DISTINCT address) FROM %s", tableName))) .hasMessageContaining( - "Currently, Fluss only support queries on table with datalake enabled 
or point queries on primary key when it's in batch execution mode."); + "Batch mode requires either data-lake integration" + + " (set '" + + ConfigOptions.TABLE_DATALAKE_ENABLED.key() + + "' = 'true') or server-side KV scan on a" + + " primary-key table (set '" + + ConfigOptions.CLIENT_SCANNER_KV_SERVER_SIDE_ENABLED.key() + + "' = 'true')."); // test not push down grouping count. assertThatThrownBy( @@ -509,7 +527,13 @@ void testCountPushDownForPkTable(boolean partitionTable) throws Exception { tableName)) .wait()) .hasMessageContaining( - "Currently, Fluss only support queries on table with datalake enabled or point queries on primary key when it's in batch execution mode."); + "Batch mode requires either data-lake integration" + + " (set '" + + ConfigOptions.TABLE_DATALAKE_ENABLED.key() + + "' = 'true') or server-side KV scan on a" + + " primary-key table (set '" + + ConfigOptions.CLIENT_SCANNER_KV_SERVER_SIDE_ENABLED.key() + + "' = 'true')."); } @Test @@ -564,7 +588,13 @@ void testCountPushDownForLogTable(boolean partitionTable) throws Exception { tEnv.explainSql( String.format("SELECT COUNT(address) FROM %s", tableName))) .hasMessageContaining( - "Currently, Fluss only support queries on table with datalake enabled or point queries on primary key when it's in batch execution mode."); + "Batch mode requires either data-lake integration" + + " (set '" + + ConfigOptions.TABLE_DATALAKE_ENABLED.key() + + "' = 'true') or server-side KV scan on a" + + " primary-key table (set '" + + ConfigOptions.CLIENT_SCANNER_KV_SERVER_SIDE_ENABLED.key() + + "' = 'true')."); assertThatThrownBy( () -> tEnv.explainSql( @@ -572,7 +602,13 @@ void testCountPushDownForLogTable(boolean partitionTable) throws Exception { "SELECT COUNT(DISTINCT address) FROM %s", tableName))) .hasMessageContaining( - "Currently, Fluss only support queries on table with datalake enabled or point queries on primary key when it's in batch execution mode."); + "Batch mode requires either data-lake integration" + 
+ " (set '" + + ConfigOptions.TABLE_DATALAKE_ENABLED.key() + + "' = 'true') or server-side KV scan on a" + + " primary-key table (set '" + + ConfigOptions.CLIENT_SCANNER_KV_SERVER_SIDE_ENABLED.key() + + "' = 'true')."); // test not push down grouping count. assertThatThrownBy( @@ -583,7 +619,13 @@ void testCountPushDownForLogTable(boolean partitionTable) throws Exception { tableName)) .wait()) .hasMessageContaining( - "Currently, Fluss only support queries on table with datalake enabled or point queries on primary key when it's in batch execution mode."); + "Batch mode requires either data-lake integration" + + " (set '" + + ConfigOptions.TABLE_DATALAKE_ENABLED.key() + + "' = 'true') or server-side KV scan on a" + + " primary-key table (set '" + + ConfigOptions.CLIENT_SCANNER_KV_SERVER_SIDE_ENABLED.key() + + "' = 'true')."); } private String prepareSourceTable(String[] keys, String partitionedKey) throws Exception { diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java index db320a4baa..9c291e5e24 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java @@ -218,7 +218,148 @@ void testBoundedPkDisabledByDefaultThrows() throws Throwable { false); assertThatThrownBy(enumerator::start) .isInstanceOf(UnsupportedOperationException.class) - .hasMessageContaining("Batch only supports when table option"); + .hasMessageContaining("Batch mode requires either") + .hasMessageContaining(ConfigOptions.TABLE_DATALAKE_ENABLED.key()) + .hasMessageContaining( + ConfigOptions.CLIENT_SCANNER_KV_SERVER_SIDE_ENABLED.key()); + } + } + + /** + * With {@code kvBatchEnabled=true} and a partitioned 
primary-key table the enumerator must emit + * one {@link KvBatchSplit} per (partition, bucket) pair — one for each auto-created partition + * and each bucket within it. + */ + @Test + void testBoundedPartitionedPkTableEmitsKvBatchSplits() throws Throwable { + long tableId = + createTable(DEFAULT_TABLE_PATH, DEFAULT_AUTO_PARTITIONED_PK_TABLE_DESCRIPTOR); + ZooKeeperClient zooKeeperClient = FLUSS_CLUSTER_EXTENSION.getZooKeeperClient(); + Map partitionNameByIds = + waitUntilPartitions(zooKeeperClient, DEFAULT_TABLE_PATH); + + int numSubtasks = DEFAULT_BUCKET_NUM; + Configuration enabled = new Configuration(flussConf); + enabled.set(ConfigOptions.CLIENT_SCANNER_KV_SERVER_SIDE_ENABLED, true); + try (MockSplitEnumeratorContext context = + new MockSplitEnumeratorContext<>(numSubtasks)) { + FlinkSourceEnumerator enumerator = + new FlinkSourceEnumerator( + DEFAULT_TABLE_PATH, + enabled, + true, + true, + context, + OffsetsInitializer.full(), + DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS, + false, + null, + null, + LeaseContext.DEFAULT, + false); + + enumerator.start(); + for (int i = 0; i < numSubtasks; i++) { + registerReader(context, enumerator, i); + } + context.runNextOneTimeCallable(); + + // Every split must be a KvBatchSplit. + List allAssigned = + context.getSplitsAssignmentSequence().stream() + .flatMap(a -> a.assignment().values().stream()) + .flatMap(List::stream) + .collect(Collectors.toList()); + assertThat(allAssigned).isNotEmpty(); + allAssigned.forEach(s -> assertThat(s.isKvBatchSplit()).isTrue()); + + // Expect one split per bucket per partition. + int expectedSplitCount = partitionNameByIds.size() * DEFAULT_BUCKET_NUM; + assertThat(allAssigned).hasSize(expectedSplitCount); + + // Each split must carry the correct partition name. 
+ Set assignedPartitionNames = + allAssigned.stream() + .map(SourceSplitBase::getPartitionName) + .collect(Collectors.toSet()); + assertThat(assignedPartitionNames) + .containsExactlyInAnyOrderElementsOf(partitionNameByIds.values()); + + // All table IDs in the splits must match the created table. + allAssigned.forEach( + s -> assertThat(s.getTableBucket().getTableId()).isEqualTo(tableId)); + } + } + + /** + * When the table has data-lake enabled but no lake snapshot exists yet (freshly created table), + * and {@code kvBatchEnabled=true}, the enumerator must fall back to emitting {@link + * KvBatchSplit}s rather than log-only splits. + * + *

The server throws {@code LakeTableSnapshotNotExistException} for a fresh lake-enabled + * table, which causes {@code generateHybridLakeFlussSplits()} to return {@code null}. + */ + @Test + void testLakeEnabledNoSnapshotFallsBackToKvBatchSplits() throws Throwable { + // Create a lake-enabled PK table. No snapshot will exist yet, so + // getReadableLakeSnapshot() will throw LakeTableSnapshotNotExistException and + // generateHybridLakeFlussSplits() will return null, triggering the KV-scan fallback. + TableDescriptor lakeEnabledPkDescriptor = + TableDescriptor.builder() + .schema(DEFAULT_PK_TABLE_SCHEMA) + .distributedBy(DEFAULT_BUCKET_NUM, "id") + .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true) + .build(); + long tableId = createTable(DEFAULT_TABLE_PATH, lakeEnabledPkDescriptor); + int numSubtasks = DEFAULT_BUCKET_NUM; + + Configuration enabled = new Configuration(flussConf); + enabled.set(ConfigOptions.CLIENT_SCANNER_KV_SERVER_SIDE_ENABLED, true); + + // A lake source is required when lakeEnabled=true; use a no-op testing instance. + LakeSource noopLakeSource = + new TestingLakeSource(DEFAULT_BUCKET_NUM, Collections.emptyList()); + + try (MockSplitEnumeratorContext context = + new MockSplitEnumeratorContext<>(numSubtasks)) { + FlinkSourceEnumerator enumerator = + new FlinkSourceEnumerator( + DEFAULT_TABLE_PATH, + enabled, + true, + false, + context, + OffsetsInitializer.full(), + DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS, + false, + null, + noopLakeSource, + LeaseContext.DEFAULT, + false); + + enumerator.start(); + for (int i = 0; i < numSubtasks; i++) { + registerReader(context, enumerator, i); + } + // Drive the async callable that calls generateHybridLakeFlussSplits() and + // then falls back to generateFlussOnlyBatchSplits(true). 
+ context.runNextOneTimeCallable(); + + List allAssigned = + getReadersAssignments(context).values().stream() + .flatMap(List::stream) + .collect(Collectors.toList()); + + // Must have exactly one KvBatchSplit per bucket — not log splits. + assertThat(allAssigned).hasSize(DEFAULT_BUCKET_NUM); + allAssigned.forEach(s -> assertThat(s.isKvBatchSplit()).isTrue()); + Set buckets = + allAssigned.stream() + .map(s -> s.getTableBucket().getBucket()) + .collect(Collectors.toSet()); + assertThat(buckets).hasSize(DEFAULT_BUCKET_NUM); + allAssigned.forEach( + s -> assertThat(s.getTableBucket().getTableId()).isEqualTo(tableId)); } } diff --git a/website/docs/engine-flink/reads.md b/website/docs/engine-flink/reads.md index a882156168..1fff47355c 100644 --- a/website/docs/engine-flink/reads.md +++ b/website/docs/engine-flink/reads.md @@ -236,7 +236,7 @@ The server evaluates these predicates against per-batch column statistics and sk ### Full Scan of Primary Key Tables -Fluss can perform a bounded full-table scan on a primary-key table directly via the server-side KV scan API. +Fluss can perform a bounded full-table scan on a primary-key table directly via the server-side KV scan API. 
Enable the feature by setting `client.scanner.kv.server-side.enabled = true` on the table or as a SQL hint: From 4f5e693d2b8477150817fe5190ac20fe6937dcf8 Mon Sep 17 00:00:00 2001 From: ipolyzos Date: Tue, 26 May 2026 16:32:58 +0300 Subject: [PATCH 09/13] [flink] add more tests --- .../source/FlinkTableSourceBatchITCase.java | 71 ++++++++++++ .../enumerator/FlinkSourceEnumeratorTest.java | 104 ++++++------------ 2 files changed, 104 insertions(+), 71 deletions(-) diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceBatchITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceBatchITCase.java index d2773b6b12..903a0a6d37 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceBatchITCase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceBatchITCase.java @@ -51,6 +51,7 @@ import static org.apache.fluss.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS; import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder; +import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.collectBatchRows; import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.collectRowsWithTimeout; import static org.apache.fluss.server.testutils.FlussClusterExtension.BUILTIN_DATABASE; import static org.apache.fluss.testutils.DataTestUtils.row; @@ -329,6 +330,76 @@ void testKvBatchScanReturnsAllRecords() throws Exception { assertThat(actual).containsExactlyInAnyOrderElementsOf(expected); } + @Test + void testKvBatchScanOnLogTableThrows() throws Exception { + String tableName = String.format("test_kv_batch_log_%s", RandomUtils.nextInt()); + tEnv.executeSql( + String.format( + "create table %s (" + + " id int not null," + + " name varchar)" + + " with (" + + " 'bucket.num' = '2'," + + " 'client.scanner.kv.server-side.enabled' = 
'true')", + tableName)); + // The KV scan option is meaningless for log tables; batch mode must still reject them. + assertThatThrownBy(() -> tEnv.executeSql(String.format("SELECT * FROM %s", tableName))) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessageContaining("Batch mode requires either data-lake integration") + .hasMessageContaining(ConfigOptions.TABLE_DATALAKE_ENABLED.key()) + .hasMessageContaining(ConfigOptions.CLIENT_SCANNER_KV_SERVER_SIDE_ENABLED.key()); + } + + @Test + void testKvBatchScanWithProjection() throws Exception { + String tableName = String.format("test_kv_batch_proj_%s", RandomUtils.nextInt()); + tEnv.executeSql( + String.format( + "create table %s (" + + " id int not null," + + " name varchar," + + " region varchar," + + " primary key (id) NOT ENFORCED)" + + " with (" + + " 'bucket.num' = '3'," + + " 'client.scanner.kv.server-side.enabled' = 'true')", + tableName)); + TablePath tablePath = TablePath.of(DEFAULT_DB, tableName); + try (Table table = conn.getTable(tablePath)) { + UpsertWriter upsertWriter = table.newUpsert().createWriter(); + upsertWriter.upsert(row(1, "Alice", "us-east")); + upsertWriter.upsert(row(2, "Bob", "eu-west")); + upsertWriter.upsert(row(3, "Carol", "ap-south")); + upsertWriter.flush(); + } + + // Only project two of the three columns. 
+ CloseableIterator collected = + tEnv.executeSql(String.format("SELECT id, name FROM %s", tableName)).collect(); + List expected = Arrays.asList("+I[1, Alice]", "+I[2, Bob]", "+I[3, Carol]"); + assertResultsIgnoreOrder(collected, expected, true); + } + + @Test + void testKvBatchScanOnEmptyTable() throws Exception { + String tableName = String.format("test_kv_batch_empty_%s", RandomUtils.nextInt()); + tEnv.executeSql( + String.format( + "create table %s (" + + " id int not null," + + " name varchar," + + " primary key (id) NOT ENFORCED)" + + " with (" + + " 'bucket.num' = '3'," + + " 'client.scanner.kv.server-side.enabled' = 'true')", + tableName)); + // No rows written — scan must complete naturally with an empty result set. + CloseableIterator collected = + tEnv.executeSql(String.format("SELECT * FROM %s", tableName)).collect(); + List actual = collectBatchRows(collected); + assertThat(actual).isEmpty(); + } + @Test void testLakeTableQueryOnLakeDisabledTable() throws Exception { String tableName = prepareSourceTable(new String[] {"id", "name"}, null); diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java index 9c291e5e24..bfc3988e4f 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java @@ -225,6 +225,39 @@ void testBoundedPkDisabledByDefaultThrows() throws Throwable { } } + /** + * Enabling the server-side KV scan switch on a log table (no primary key) must not bypass the + * "batch not supported" guard — the enumerator should still throw {@link + * UnsupportedOperationException} because KV scan is only meaningful for primary-key tables. 
+ */ + @Test + void testKvBatchEnabledOnLogTableStillThrows() throws Throwable { + createTable(DEFAULT_TABLE_PATH, DEFAULT_LOG_TABLE_DESCRIPTOR); + Configuration enabled = new Configuration(flussConf); + enabled.set(ConfigOptions.CLIENT_SCANNER_KV_SERVER_SIDE_ENABLED, true); + try (MockSplitEnumeratorContext context = + new MockSplitEnumeratorContext<>(1)) { + FlinkSourceEnumerator enumerator = + new FlinkSourceEnumerator( + DEFAULT_TABLE_PATH, + enabled, + false, // hasPrimaryKey = false (log table) + false, + context, + OffsetsInitializer.full(), + DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS, + false, + null, + null, + LeaseContext.DEFAULT, + false); + assertThatThrownBy(enumerator::start) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessageContaining("Batch mode requires either") + .hasMessageContaining(ConfigOptions.TABLE_DATALAKE_ENABLED.key()); + } + } + /** * With {@code kvBatchEnabled=true} and a partitioned primary-key table the enumerator must emit * one {@link KvBatchSplit} per (partition, bucket) pair — one for each auto-created partition @@ -291,77 +324,6 @@ void testBoundedPartitionedPkTableEmitsKvBatchSplits() throws Throwable { } } - /** - * When the table has data-lake enabled but no lake snapshot exists yet (freshly created table), - * and {@code kvBatchEnabled=true}, the enumerator must fall back to emitting {@link - * KvBatchSplit}s rather than log-only splits. - * - *

The server throws {@code LakeTableSnapshotNotExistException} for a fresh lake-enabled - * table, which causes {@code generateHybridLakeFlussSplits()} to return {@code null}. - */ - @Test - void testLakeEnabledNoSnapshotFallsBackToKvBatchSplits() throws Throwable { - // Create a lake-enabled PK table. No snapshot will exist yet, so - // getReadableLakeSnapshot() will throw LakeTableSnapshotNotExistException and - // generateHybridLakeFlussSplits() will return null, triggering the KV-scan fallback. - TableDescriptor lakeEnabledPkDescriptor = - TableDescriptor.builder() - .schema(DEFAULT_PK_TABLE_SCHEMA) - .distributedBy(DEFAULT_BUCKET_NUM, "id") - .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true) - .build(); - long tableId = createTable(DEFAULT_TABLE_PATH, lakeEnabledPkDescriptor); - int numSubtasks = DEFAULT_BUCKET_NUM; - - Configuration enabled = new Configuration(flussConf); - enabled.set(ConfigOptions.CLIENT_SCANNER_KV_SERVER_SIDE_ENABLED, true); - - // A lake source is required when lakeEnabled=true; use a no-op testing instance. - LakeSource noopLakeSource = - new TestingLakeSource(DEFAULT_BUCKET_NUM, Collections.emptyList()); - - try (MockSplitEnumeratorContext context = - new MockSplitEnumeratorContext<>(numSubtasks)) { - FlinkSourceEnumerator enumerator = - new FlinkSourceEnumerator( - DEFAULT_TABLE_PATH, - enabled, - true, - false, - context, - OffsetsInitializer.full(), - DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS, - false, - null, - noopLakeSource, - LeaseContext.DEFAULT, - false); - - enumerator.start(); - for (int i = 0; i < numSubtasks; i++) { - registerReader(context, enumerator, i); - } - // Drive the async callable that calls generateHybridLakeFlussSplits() and - // then falls back to generateFlussOnlyBatchSplits(true). 
- context.runNextOneTimeCallable(); - - List allAssigned = - getReadersAssignments(context).values().stream() - .flatMap(List::stream) - .collect(Collectors.toList()); - - // Must have exactly one KvBatchSplit per bucket — not log splits. - assertThat(allAssigned).hasSize(DEFAULT_BUCKET_NUM); - allAssigned.forEach(s -> assertThat(s.isKvBatchSplit()).isTrue()); - Set buckets = - allAssigned.stream() - .map(s -> s.getTableBucket().getBucket()) - .collect(Collectors.toSet()); - assertThat(buckets).hasSize(DEFAULT_BUCKET_NUM); - allAssigned.forEach( - s -> assertThat(s.getTableBucket().getTableId()).isEqualTo(tableId)); - } - } @Test void testSplitAssignmentBatchSize() throws Throwable { From 0de049aad852fa0f6e87d0c0f29c9b193d6365ac Mon Sep 17 00:00:00 2001 From: ipolyzos Date: Tue, 26 May 2026 16:42:46 +0300 Subject: [PATCH 10/13] [flink] fix formatting issue --- .../fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java index bfc3988e4f..76dcc620a6 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java @@ -324,7 +324,6 @@ void testBoundedPartitionedPkTableEmitsKvBatchSplits() throws Throwable { } } - @Test void testSplitAssignmentBatchSize() throws Throwable { long tableId = createTable(DEFAULT_TABLE_PATH, DEFAULT_PK_TABLE_DESCRIPTOR); From 113bc4e8da90c642034c6885b1c063f998fc7e90 Mon Sep 17 00:00:00 2001 From: ipolyzos Date: Wed, 27 May 2026 08:20:33 +0300 Subject: [PATCH 11/13] [flink] address copilot comments --- .../fluss/flink/source/enumerator/FlinkSourceEnumerator.java | 4 ++-- 
website/docs/engine-flink/options.md | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java index 9e109c3550..265cc0ab77 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java @@ -496,8 +496,8 @@ private void startInBatchMode() { String.format( "Batch mode requires either '%s' = 'true' (data-lake integration) " + "or '%s' = 'true' (server-side KV scan, primary-key tables only).", - ConfigOptions.TABLE_DATALAKE_ENABLED, - ConfigOptions.CLIENT_SCANNER_KV_SERVER_SIDE_ENABLED)); + ConfigOptions.TABLE_DATALAKE_ENABLED.key(), + ConfigOptions.CLIENT_SCANNER_KV_SERVER_SIDE_ENABLED.key())); } } diff --git a/website/docs/engine-flink/options.md b/website/docs/engine-flink/options.md index 1598f57644..486f8dea7e 100644 --- a/website/docs/engine-flink/options.md +++ b/website/docs/engine-flink/options.md @@ -102,7 +102,7 @@ See more details about [ALTER TABLE ... SET](engine-flink/ddl.md#set-properties) | scan.partition.discovery.interval | Duration | 1min | The time interval for the Fluss source to discover the new partitions for partitioned table while scanning. A non-positive value disables the partition discovery. The default value is 1 minute. Currently, since Fluss Admin#listPartitions(TablePath tablePath) requires a large number of requests to ZooKeeper in server, this option cannot be set too small, as a small value would cause frequent requests and increase server load. In the future, once list partitions is optimized, the default value of this parameter can be reduced. 
| | scan.kv.snapshot.lease.id | String | UUID | The lease ID used to protect acquired KV snapshots from deletion. If specified, the snapshots will be retained until either the consumer finishes processing all of them or the lease duration expires. By default, this value is set to a randomly generated UUID string if not explicitly provided. | | scan.kv.snapshot.lease.duration | Duration | 1day | The time period how long to wait before expiring the kv snapshot lease to avoid kv snapshot blocking to delete. | -| client.scanner.kv.server-side.enabled | Boolean | false | Master switch for using the server-side KV scan (FIP-17) in bounded reads of primary-key tables when no KV snapshot file is available. When false (default), bounded primary-key reads fall back to the prior behavior (log-only when lake is enabled, or fail when lake is disabled). See [Full Scan of Primary Key Tables](engine-flink/reads.md#full-scan-of-primary-key-tables-server-side-kv-scan) for details. | +| client.scanner.kv.server-side.enabled | Boolean | false | Master switch for using the server-side KV scan (FIP-17) in bounded reads of primary-key tables when no KV snapshot file is available. When false (default), bounded primary-key reads fall back to the prior behavior (log-only when lake is enabled, or fail when lake is disabled). See [Full Scan of Primary Key Tables](engine-flink/reads.md#full-scan-of-primary-key-tables) for details. | | client.scanner.log.check-crc | Boolean | true | Automatically check the CRC3 of the read records for LogScanner. This ensures no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead, so it may be disabled in cases seeking extreme performance. | | client.scanner.log.max-poll-records | Integer | 500 | The maximum number of records returned in a single call to poll() for LogScanner. Note that this config doesn't impact the underlying fetching behavior. 
The Scanner will cache the records from each fetch request and returns them incrementally from each poll. | | client.scanner.log.fetch.max-bytes | MemorySize | 16mb | The maximum amount of data the server should return for a fetch request from client. Records are fetched in batches, and if the first record batch in the first non-empty bucket of the fetch is larger than this value, the record batch will still be returned to ensure that the fetch can make progress. As such, this is not a absolute maximum. | From e3c9db5390bfe21c134d409ed29ea98622081901 Mon Sep 17 00:00:00 2001 From: ipolyzos Date: Thu, 28 May 2026 08:15:23 +0300 Subject: [PATCH 12/13] [flink] change message back to datalake enabled --- .../java/org/apache/fluss/flink/source/FlinkTableSource.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java index 47ea28a454..631c6d8176 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java @@ -440,7 +440,7 @@ public boolean isBounded() { && flussConfig.get( ConfigOptions.CLIENT_SCANNER_KV_SERVER_SIDE_ENABLED))) { throw new UnsupportedOperationException( - "Batch mode requires either data-lake integration" + "Batch mode requires either datalake enabled" + " (set '" + ConfigOptions.TABLE_DATALAKE_ENABLED.key() + "' = 'true') or server-side KV scan on a" From eec665be5434212bded23ae127c9a0f2daa93811 Mon Sep 17 00:00:00 2001 From: ipolyzos Date: Thu, 28 May 2026 09:06:50 +0300 Subject: [PATCH 13/13] [flink] fix broken test --- .../source/FlinkTableSourceBatchITCase.java | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceBatchITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceBatchITCase.java index 903a0a6d37..9cbe0fd538 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceBatchITCase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceBatchITCase.java @@ -253,7 +253,7 @@ void testScanSingleRowFilterException() throws Exception { assertThatThrownBy(() -> tEnv.explainSql(query)) .isInstanceOf(UnsupportedOperationException.class) .hasMessage( - "Batch mode requires either data-lake integration" + "Batch mode requires either datalake enabled" + " (set '" + ConfigOptions.TABLE_DATALAKE_ENABLED.key() + "' = 'true') or server-side KV scan on a" @@ -345,7 +345,7 @@ void testKvBatchScanOnLogTableThrows() throws Exception { // The KV scan option is meaningless for log tables; batch mode must still reject them. 
assertThatThrownBy(() -> tEnv.executeSql(String.format("SELECT * FROM %s", tableName))) .isInstanceOf(UnsupportedOperationException.class) - .hasMessageContaining("Batch mode requires either data-lake integration") + .hasMessageContaining("Batch mode requires either datalake enabled") .hasMessageContaining(ConfigOptions.TABLE_DATALAKE_ENABLED.key()) .hasMessageContaining(ConfigOptions.CLIENT_SCANNER_KV_SERVER_SIDE_ENABLED.key()); } @@ -566,7 +566,7 @@ void testCountPushDownForPkTable(boolean partitionTable) throws Exception { tEnv.explainSql( String.format("SELECT COUNT(address) FROM %s", tableName))) .hasMessageContaining( - "Batch mode requires either data-lake integration" + "Batch mode requires either datalake enabled" + " (set '" + ConfigOptions.TABLE_DATALAKE_ENABLED.key() + "' = 'true') or server-side KV scan on a" @@ -581,7 +581,7 @@ void testCountPushDownForPkTable(boolean partitionTable) throws Exception { "SELECT COUNT(DISTINCT address) FROM %s", tableName))) .hasMessageContaining( - "Batch mode requires either data-lake integration" + "Batch mode requires either datalake enabled" + " (set '" + ConfigOptions.TABLE_DATALAKE_ENABLED.key() + "' = 'true') or server-side KV scan on a" @@ -598,7 +598,7 @@ void testCountPushDownForPkTable(boolean partitionTable) throws Exception { tableName)) .wait()) .hasMessageContaining( - "Batch mode requires either data-lake integration" + "Batch mode requires either datalake enabled" + " (set '" + ConfigOptions.TABLE_DATALAKE_ENABLED.key() + "' = 'true') or server-side KV scan on a" @@ -659,7 +659,7 @@ void testCountPushDownForLogTable(boolean partitionTable) throws Exception { tEnv.explainSql( String.format("SELECT COUNT(address) FROM %s", tableName))) .hasMessageContaining( - "Batch mode requires either data-lake integration" + "Batch mode requires either datalake enabled" + " (set '" + ConfigOptions.TABLE_DATALAKE_ENABLED.key() + "' = 'true') or server-side KV scan on a" @@ -673,7 +673,7 @@ void 
testCountPushDownForLogTable(boolean partitionTable) throws Exception { "SELECT COUNT(DISTINCT address) FROM %s", tableName))) .hasMessageContaining( - "Batch mode requires either data-lake integration" + "Batch mode requires either datalake enabled" + " (set '" + ConfigOptions.TABLE_DATALAKE_ENABLED.key() + "' = 'true') or server-side KV scan on a" @@ -690,7 +690,7 @@ void testCountPushDownForLogTable(boolean partitionTable) throws Exception { tableName)) .wait()) .hasMessageContaining( - "Batch mode requires either data-lake integration" + "Batch mode requires either datalake enabled" + " (set '" + ConfigOptions.TABLE_DATALAKE_ENABLED.key() + "' = 'true') or server-side KV scan on a"