diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java index 8535a881a8..2ecfca09df 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java @@ -52,6 +52,9 @@ import javax.annotation.Nullable; +import java.util.Collections; +import java.util.List; + import static org.apache.fluss.config.ConfigOptions.CLIENT_SCANNER_IO_TMP_DIR; import static org.apache.fluss.flink.utils.FlinkConnectorOptionsUtils.getClientScannerIoTmpDir; @@ -236,6 +239,14 @@ public SplitEnumerator createEnumerator( public SplitEnumerator restoreEnumerator( SplitEnumeratorContext splitEnumeratorContext, SourceEnumeratorState sourceEnumeratorState) { + List remainingHybridLakeFlussSplits = + sourceEnumeratorState.getRemainingHybridLakeFlussSplits(); + // A fresh null means lake splits are not initialized yet. When restoring, null means + // nothing is pending, so normalize it here to avoid generating lake splits later. 
+ if (remainingHybridLakeFlussSplits == null) { + remainingHybridLakeFlussSplits = Collections.emptyList(); + } + return new FlinkSourceEnumerator( tablePath, flussConf, @@ -244,7 +255,7 @@ public SplitEnumerator restoreEnumerator splitEnumeratorContext, sourceEnumeratorState.getAssignedBuckets(), sourceEnumeratorState.getAssignedPartitions(), - sourceEnumeratorState.getRemainingHybridLakeFlussSplits(), + remainingHybridLakeFlussSplits, offsetsInitializer, scanPartitionDiscoveryIntervalMs, splitPerAssignmentBatchSize, diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java index e04989cbb2..546ca2b3c0 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java @@ -134,6 +134,20 @@ public class FlinkSourceEnumerator /** Buckets that have been assigned to readers. */ private final Set assignedTableBuckets; + /** + * Remaining lake snapshot and hybrid lake/Fluss splits to assign. + * + *

<p>The field has three states: + * + * <ul> + *
  <li>{@code null}: lake split initialization has not run yet, or the source has no lake + * (non-lake table) so initialization will never run. + *
  <li>empty list: lake split initialization has run and nothing is left to assign, or this + * enumerator was restored from a checkpoint that recorded no pending splits (for example, it was + * started in Fluss-only (non-lake) mode) and must not initialize lake splits after restore. + *
  <li>non-empty list: lake split initialization has run and these splits still need to be + * assigned. + * </ul>
+ */ @Nullable private List pendingHybridLakeFlussSplits; private final long scanPartitionDiscoveryIntervalMs; @@ -511,18 +525,15 @@ private void startInBatchMode() { private void startInStreamModeForNonPartitionedTable() { if (lakeSource != null) { - context.callAsync( - () -> { - // firstly, try to generate hybrid lake splits, - List splits = generateHybridLakeFlussSplits(); - // splits is null, - // we'll fall back to normal fluss splits generation logic - if (splits == null) { - splits = this.initNonPartitionedSplits(); - } - return splits; - }, - this::handleSplitsAdd); + // Generate lake splits synchronously so that they are available before the + // first checkpoint. This is consistent with the partitioned-table path in + // start(). + List splits = generateHybridLakeFlussSplits(); + if (splits == null) { + // no lake snapshot, fall back to normal Fluss splits + splits = this.initNonPartitionedSplits(); + } + handleSplitsAdd(splits, null); } else { // init bucket splits and assign context.callAsync(this::initNonPartitionedSplits, this::handleSplitsAdd); @@ -869,9 +880,8 @@ private List getLogSplit( /** Return the hybrid lake and fluss splits. Return null if no lake snapshot. */ @Nullable private List generateHybridLakeFlussSplits() { - // still have pending lake fluss splits, - // should be restored from checkpoint, shouldn't - // list splits again + // Restored from checkpoint with pending lake splits — return them directly + // without re-generating. 
if (pendingHybridLakeFlussSplits != null) { LOG.info("Still have pending lake fluss splits, shouldn't list splits again."); return pendingHybridLakeFlussSplits; diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/state/FlussSourceEnumeratorStateSerializer.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/state/FlussSourceEnumeratorStateSerializer.java index 121041df13..996313e607 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/state/FlussSourceEnumeratorStateSerializer.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/state/FlussSourceEnumeratorStateSerializer.java @@ -270,12 +270,15 @@ private List deserializeRemainingHybridLakeFlussSplits( if (in.readBoolean()) { int numSplits = in.readInt(); List splits = new ArrayList<>(numSplits); + int version = in.readInt(); + if (numSplits == 0) { + return splits; + } SourceSplitSerializer sourceSplitSerializer = new SourceSplitSerializer( checkNotNull( lakeSource, "lake source must not be null when there are hybrid lake splits.")); - int version = in.readInt(); for (int i = 0; i < numSplits; i++) { int splitSizeInBytes = in.readInt(); byte[] splitBytes = new byte[splitSizeInBytes]; diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java index 6d239f965b..184e1b6e71 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java @@ -25,6 +25,8 @@ import org.apache.fluss.flink.FlinkConnectorOptions; import org.apache.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit; import 
org.apache.fluss.flink.lake.split.LakeSnapshotSplit; +import org.apache.fluss.flink.source.FlinkSource; +import org.apache.fluss.flink.source.deserializer.RowDataDeserializationSchema; import org.apache.fluss.flink.source.event.PartitionBucketsUnsubscribedEvent; import org.apache.fluss.flink.source.event.PartitionsRemovedEvent; import org.apache.fluss.flink.source.reader.LeaseContext; @@ -32,6 +34,7 @@ import org.apache.fluss.flink.source.split.LogSplit; import org.apache.fluss.flink.source.split.SnapshotSplit; import org.apache.fluss.flink.source.split.SourceSplitBase; +import org.apache.fluss.flink.source.state.SourceEnumeratorState; import org.apache.fluss.flink.utils.FlinkTestBase; import org.apache.fluss.lake.source.LakeSource; import org.apache.fluss.lake.source.LakeSplit; @@ -53,8 +56,10 @@ import org.apache.flink.api.connector.source.ReaderInfo; import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.api.connector.source.SplitEnumerator; import org.apache.flink.api.connector.source.SplitsAssignment; import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext; +import org.apache.flink.table.data.RowData; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; @@ -210,6 +215,100 @@ void testInvalidSplitAssignmentBatchSize() throws Exception { } } + @Test + void testRestoreFlussOnlySourceWithLakeSourceDoesNotGenerateLakeSplits(@TempDir Path tempDir) + throws Throwable { + long tableId = + createTable(DEFAULT_TABLE_PATH, DEFAULT_AUTO_PARTITIONED_LOG_TABLE_DESCRIPTOR); + ZooKeeperClient zooKeeperClient = FLUSS_CLUSTER_EXTENSION.getZooKeeperClient(); + Map partitionNameByIds = + waitUntilPartitions(zooKeeperClient, DEFAULT_TABLE_PATH); + Long partitionId = partitionNameByIds.keySet().stream().sorted().findFirst().get(); + String partitionName = partitionNameByIds.get(partitionId); + + LakeTableSnapshot lakeTableSnapshot = + new LakeTableSnapshot( + 0, + 
ImmutableMap.of( + new TableBucket(tableId, partitionId, 0), 50L, + new TableBucket(tableId, partitionId, 1), 50L, + new TableBucket(tableId, partitionId, 2), 50L)); + LakeTableHelper lakeTableHelper = new LakeTableHelper(zooKeeperClient, tempDir.toString()); + lakeTableHelper.registerLakeTableSnapshotV1(tableId, lakeTableSnapshot); + + ResolvedPartitionSpec partitionSpec = + ResolvedPartitionSpec.fromPartitionName( + Collections.singletonList("name"), partitionName); + LakeSource lakeSource = + new TestingLakeSource( + DEFAULT_BUCKET_NUM, + Collections.singletonList( + new PartitionInfo( + partitionId, partitionSpec, DEFAULT_REMOTE_DATA_DIR))); + + SourceEnumeratorState checkpointState; + try (MockSplitEnumeratorContext context = + new MockSplitEnumeratorContext<>(1); + SplitEnumerator enumerator = + new FlinkSource( + flussConf, + DEFAULT_TABLE_PATH, + false, + true, + DEFAULT_LOG_TABLE_SCHEMA.getRowType(), + null, + null, + OffsetsInitializer.timestamp(1000L), + 0L, + new RowDataDeserializationSchema(), + streaming, + null, + LeaseContext.DEFAULT) + .createEnumerator(context)) { + checkpointState = enumerator.snapshotState(1L); + assertThat(checkpointState.getRemainingHybridLakeFlussSplits()).isNull(); + } + + try (MockSplitEnumeratorContext context = + new MockSplitEnumeratorContext<>(DEFAULT_BUCKET_NUM); + SplitEnumerator restoredEnumerator = + new FlinkSource( + flussConf, + DEFAULT_TABLE_PATH, + false, + true, + DEFAULT_LOG_TABLE_SCHEMA.getRowType(), + null, + null, + OffsetsInitializer.full(), + 0L, + new RowDataDeserializationSchema(), + streaming, + null, + lakeSource, + LeaseContext.DEFAULT) + .restoreEnumerator(context, checkpointState)) { + assertThat(restoredEnumerator.snapshotState(1L).getRemainingHybridLakeFlussSplits()) + .isEmpty(); + + restoredEnumerator.start(); + context.runNextOneTimeCallable(); + context.runNextOneTimeCallable(); + + for (int i = 0; i < DEFAULT_BUCKET_NUM; i++) { + registerReader(context, restoredEnumerator, i); + } + + List 
assignedSplits = + getReadersAssignments(context).values().stream() + .flatMap(List::stream) + .collect(Collectors.toList()); + assertThat(assignedSplits).isNotEmpty(); + assertThat(assignedSplits).allMatch(split -> split instanceof LogSplit); + assertThat(assignedSplits).noneMatch(split -> split instanceof LakeSnapshotSplit); + } + } + @Test void testPkTableWithSnapshotSplits() throws Throwable { long tableId = createTable(DEFAULT_TABLE_PATH, DEFAULT_PK_TABLE_DESCRIPTOR); @@ -852,7 +951,7 @@ void testPartitionsExpiredInFlussButExistInLake( // --------------------- private void registerReader( MockSplitEnumeratorContext context, - FlinkSourceEnumerator enumerator, + SplitEnumerator enumerator, int readerId) { context.registerReader(new ReaderInfo(readerId, "location " + readerId)); enumerator.addReader(readerId); diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/state/SourceEnumeratorStateSerializerTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/state/SourceEnumeratorStateSerializerTest.java index 5273d12d16..fa90eda96f 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/state/SourceEnumeratorStateSerializerTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/state/SourceEnumeratorStateSerializerTest.java @@ -169,4 +169,26 @@ void testInconsistentLakeSourceSerde() throws Exception { serializer.deserialize(serializer.getVersion(), serialized); assertThat(deserializedSourceEnumeratorState).isEqualTo(sourceEnumeratorState); } + + @Test + void testEmptyPendingSplitsCheckpointSerdeWithoutLakeSource() throws Exception { + FlussSourceEnumeratorStateSerializer serializer = + new FlussSourceEnumeratorStateSerializer(null); + + SourceEnumeratorState sourceEnumeratorState = + new SourceEnumeratorState( + Collections.emptySet(), + Collections.emptyMap(), + Collections.emptyList(), + LeaseContext.DEFAULT.getKvSnapshotLeaseId()); 
+ + byte[] serialized = serializer.serialize(sourceEnumeratorState); + SourceEnumeratorState deserializedSourceEnumeratorState = + serializer.deserialize(serializer.getVersion(), serialized); + + assertThat(deserializedSourceEnumeratorState).isEqualTo(sourceEnumeratorState); + assertThat(deserializedSourceEnumeratorState.getRemainingHybridLakeFlussSplits()) + .isNotNull() + .isEmpty(); + } }