diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java index 897bebcf686c..8b9a5102e97f 100644 --- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java @@ -1123,6 +1123,19 @@ public InlineElement getDescription() { "Whether only overwrite dynamic partition when overwriting a partitioned table with " + "dynamic partition columns. Works only when the table has partition keys."); + public static final ConfigOption<Boolean> SORT_COMPACT_SKIP_OVERWRITE_CONFLICT_DETECTION = + key("sort-compact.skip-overwrite-conflict-detection") + .booleanType() + .defaultValue(false) + .withDescription( + "Whether to skip overwrite conflict detection during sort compact commit. " + "Default is false (detection enabled). If set to true, sort compact will not check " + "for concurrent writes between the read snapshot and the commit snapshot. " + "WARNING: Skipping this detection may cause sort compact to silently overwrite " + "data written by concurrent jobs, leading to data loss and semantic errors " + "(e.g., the sort order may be broken because unsorted data from concurrent writes " + "replaces the sorted compaction output)."); + public static final ConfigOption<String> PARTITION_EXPIRATION_STRATEGY = key("partition.expiration-strategy") .stringType() @@ -3298,6 +3311,10 @@ public boolean dynamicPartitionOverwrite() { return options.get(DYNAMIC_PARTITION_OVERWRITE); } + public boolean sortCompactSkipOverwriteConflictDetection() { + return options.get(SORT_COMPACT_SKIP_OVERWRITE_CONFLICT_DETECTION); + } + public Duration partitionExpireTime() { return options.get(PARTITION_EXPIRATION_TIME); } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java index 31fb3c52cab6..b0cbed9b95c8 100644 ---
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java @@ -55,11 +55,15 @@ public interface FileStoreCommit extends AutoCloseable { * on the user-defined statement, the partition might not include all partition keys. Also * note that this partition does not necessarily equal to the partitions of the newly added * key-values. This is just the partition to be cleaned up. + * @param baseSnapshotId If non-null, the DELETE list is built from this snapshot instead of the + * latest, and concurrent writes between this snapshot and the latest are detected as + * conflicts. Used by sort compact to prevent data loss. */ int overwritePartition( Map<String, String> partition, ManifestCommittable committable, - Map<String, String> properties); + Map<String, String> properties, + @Nullable Long baseSnapshotId); /** * Drop multiple partitions. The {@link Snapshot.CommitKind} of generated snapshot is {@link diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index 74a27760fc90..b23d11e0a484 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -412,7 +412,8 @@ public int overwritePartition( Map<String, String> partition, ManifestCommittable committable, - Map<String, String> properties) { + Map<String, String> properties, + @Nullable Long baseSnapshotId) { LOG.info( "Ready to overwrite to table {}, number of commit messages: {}", tableName, @@ -500,7 +501,8 @@ public int overwritePartition( changes.appendIndexFiles, committable.identifier(), committable.watermark(), - committable.properties()); + committable.properties(), + baseSnapshotId); generatedSnapshot += 1; } @@ -743,15 +745,53 @@ private int tryOverwritePartition( List<IndexManifestEntry> indexFiles, long identifier, @Nullable Long
watermark, - Map<String, String> properties) { - return tryCommit( - latestSnapshot -> - scanner.readOverwriteChanges( - options.bucket(), - changes, - indexFiles, + Map<String, String> properties, + @Nullable Long baseSnapshotId) { + // When baseSnapshotId is provided, detect concurrent writes between the base and latest + if (baseSnapshotId != null && !options.sortCompactSkipOverwriteConflictDetection()) { + Snapshot latestSnapshot = snapshotManager.latestSnapshot(); + if (latestSnapshot != null && latestSnapshot.id() > baseSnapshotId) { + List<BinaryRow> changedPartitions = + changes.stream() + .map(ManifestEntry::partition) + .distinct() + .collect(Collectors.toList()); + List<ManifestEntry> incrementalChanges = + readIncrementalChanges( + snapshotManager.snapshot(baseSnapshotId), latestSnapshot, - partitionFilter), + changedPartitions); + Collection<ManifestEntry> mergedIncremental = + FileEntry.mergeEntries(incrementalChanges); + boolean hasNetNewAdds = + mergedIncremental.stream() + .anyMatch( + e -> + e.kind() == FileKind.ADD + && (partitionFilter == null + || partitionFilter.test( + e.partition()))); + if (hasNetNewAdds) { + throw new RuntimeException( + String.format( + "Sort compact conflict detected for table %s: new data was written to " + + "the overwritten partitions between snapshot %d and %d. " + + "Please retry the sort compact or ensure no concurrent " + + "writes to these partitions.", + tableName, baseSnapshotId, latestSnapshot.id())); + } + } + } + + return tryCommit( + latestSnapshot -> { + Snapshot baseSnapshot = + baseSnapshotId != null + ?
snapshotManager.snapshot(baseSnapshotId) + : latestSnapshot; + return scanner.readOverwriteChanges( + options.bucket(), changes, indexFiles, baseSnapshot, partitionFilter); + }, identifier, watermark, properties, diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableCommit.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableCommit.java index a73771f2185d..17ee6fde0c40 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableCommit.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableCommit.java @@ -30,6 +30,13 @@ public interface InnerTableCommit extends StreamTableCommit, BatchTableCommit { /** Overwrite writing, same as the 'INSERT OVERWRITE T PARTITION (...)' semantics of SQL. */ InnerTableCommit withOverwrite(@Nullable Map<String, String> staticPartition); + /** + * Set the base snapshot ID for overwrite conflict detection. When set, the overwrite DELETE + * list is built from this snapshot instead of the latest, and concurrent writes between this + * snapshot and the latest are detected as conflicts. + */ + InnerTableCommit withOverwriteBaseSnapshot(@Nullable Long snapshotId); + /** * If this is set to true, when there is no new data, no snapshot will be generated. By default, * empty commit is ignored.
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java index 0311c9bbe4d8..55ab39b585c9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java @@ -89,6 +89,7 @@ public class TableCommitImpl implements InnerTableCommit { private final ThreadPoolExecutor fileCheckExecutor; @Nullable private Map overwritePartition = null; + @Nullable private Long overwriteBaseSnapshotId = null; private boolean batchCommitted = false; private boolean expireForEmptyCommit = true; @@ -146,6 +147,12 @@ public TableCommitImpl withOverwrite(@Nullable Map overwritePart return this; } + @Override + public TableCommitImpl withOverwriteBaseSnapshot(@Nullable Long snapshotId) { + this.overwriteBaseSnapshotId = snapshotId; + return this; + } + @Override public TableCommitImpl ignoreEmptyCommit(boolean ignoreEmptyCommit) { commit.ignoreEmptyCommit(ignoreEmptyCommit); @@ -262,7 +269,10 @@ public void commitMultiple(List committables, boolean check } int newSnapshots = commit.overwritePartition( - overwritePartition, committable, Collections.emptyMap()); + overwritePartition, + committable, + Collections.emptyMap(), + overwriteBaseSnapshotId); maintain( committable.identifier(), maintainExecutor, diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java index 09595d91889c..4c7e0db079fc 100644 --- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java +++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java @@ -249,7 +249,8 @@ public List overwriteData( null, Collections.emptyList(), (commit, committable) -> - commit.overwritePartition(partition, committable, Collections.emptyMap())); + commit.overwritePartition( + partition, committable, Collections.emptyMap(), null)); } 
public Snapshot dropPartitions(List> partitions) { diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java index 580662607941..a7dc70b2b62d 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java @@ -154,13 +154,19 @@ public void testMultiPartitions(boolean cleanEmptyDirs) throws Exception { Map partitionSpec = new HashMap<>(); partitionSpec.put("dt", "0401"); commit.overwritePartition( - partitionSpec, new ManifestCommittable(commitIdentifier++), Collections.emptyMap()); + partitionSpec, + new ManifestCommittable(commitIdentifier++), + Collections.emptyMap(), + null); // step 3: generate snapshot 3 by cleaning partition dt=0402/hr=10 partitionSpec.put("dt", "0402"); partitionSpec.put("hr", "8"); commit.overwritePartition( - partitionSpec, new ManifestCommittable(commitIdentifier++), Collections.emptyMap()); + partitionSpec, + new ManifestCommittable(commitIdentifier++), + Collections.emptyMap(), + null); commit.close(); // step 4: generate snapshot 4 by cleaning dt=0402/hr=12/bucket-0 diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java index 71eb081de89f..ea543adcbd64 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java @@ -39,6 +39,7 @@ import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.manifest.ManifestFile; import org.apache.paimon.manifest.ManifestFileMeta; +import org.apache.paimon.manifest.ManifestList; import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction; import org.apache.paimon.operation.commit.ConflictDetection; import 
org.apache.paimon.operation.commit.RetryCommitResult; @@ -1224,6 +1225,219 @@ private List kvMapToKvList(Map map) { .collect(Collectors.toList()); } + @Test + public void testOverwriteWithBaseSnapshotPinsDeleteList() throws Exception { + TestAppendFileStore store = TestAppendFileStore.createAppendStore(tempDir, new HashMap<>()); + BinaryRow partition = gen.getPartition(gen.next()); + + // Commit snapshot 1: files A, B + CommitMessageImpl msg1 = + store.writeDataFiles(partition, 0, Arrays.asList("A.parquet", "B.parquet")); + store.commit(msg1); + long baseSnapshotId = store.snapshotManager().latestSnapshotId(); + + // Concurrent write: commit snapshot 2 with file C (simulates write after sort compact read) + CommitMessageImpl msg2 = + store.writeDataFiles(partition, 0, Collections.singletonList("C.parquet")); + store.commit(msg2); + long latestSnapshotId = store.snapshotManager().latestSnapshotId(); + assertThat(latestSnapshotId).isEqualTo(baseSnapshotId + 1); + + // Overwrite with baseSnapshotId should detect conflict (file C added after base) + FileStoreCommitImpl commit = store.newCommit(); + ManifestCommittable committable = new ManifestCommittable(100L); + CommitMessageImpl sortedFiles = + store.writeDataFiles(partition, 0, Arrays.asList("X.parquet", "Y.parquet")); + committable.addFileCommittable(sortedFiles); + + assertThatThrownBy( + () -> + commit.overwritePartition( + Collections.emptyMap(), + committable, + Collections.emptyMap(), + baseSnapshotId)) + .isInstanceOf(RuntimeException.class) + .hasMessageContaining("Sort compact conflict detected"); + commit.close(); + } + + @Test + public void testOverwriteWithBaseSnapshotSucceedsWithoutConcurrentWrite() throws Exception { + TestAppendFileStore store = TestAppendFileStore.createAppendStore(tempDir, new HashMap<>()); + BinaryRow partition = gen.getPartition(gen.next()); + + // Commit snapshot 1: files A, B + CommitMessageImpl msg1 = + store.writeDataFiles(partition, 0, Arrays.asList("A.parquet", 
"B.parquet")); + store.commit(msg1); + long baseSnapshotId = store.snapshotManager().latestSnapshotId(); + + // No concurrent write happens; overwrite with baseSnapshotId should succeed + FileStoreCommitImpl commit = store.newCommit(); + ManifestCommittable committable = new ManifestCommittable(100L); + CommitMessageImpl sortedFiles = + store.writeDataFiles(partition, 0, Arrays.asList("X.parquet", "Y.parquet")); + committable.addFileCommittable(sortedFiles); + + int snapshots = + commit.overwritePartition( + Collections.emptyMap(), + committable, + Collections.emptyMap(), + baseSnapshotId); + assertThat(snapshots).isGreaterThan(0); + + Snapshot latest = store.snapshotManager().latestSnapshot(); + assertThat(latest.commitKind()).isEqualTo(Snapshot.CommitKind.OVERWRITE); + commit.close(); + } + + @Test + public void testOverwriteWithNullBaseSnapshotFallsBackToCurrentBehavior() throws Exception { + TestAppendFileStore store = TestAppendFileStore.createAppendStore(tempDir, new HashMap<>()); + BinaryRow partition = gen.getPartition(gen.next()); + + // Commit snapshot 1: files A, B + CommitMessageImpl msg1 = + store.writeDataFiles(partition, 0, Arrays.asList("A.parquet", "B.parquet")); + store.commit(msg1); + + // Concurrent write: commit snapshot 2 with file C + CommitMessageImpl msg2 = + store.writeDataFiles(partition, 0, Collections.singletonList("C.parquet")); + store.commit(msg2); + + // Overwrite with null baseSnapshotId should succeed (current behavior, no conflict + // detection) + FileStoreCommitImpl commit = store.newCommit(); + ManifestCommittable committable = new ManifestCommittable(100L); + CommitMessageImpl sortedFiles = + store.writeDataFiles(partition, 0, Arrays.asList("X.parquet", "Y.parquet")); + committable.addFileCommittable(sortedFiles); + + int snapshots = + commit.overwritePartition( + Collections.emptyMap(), committable, Collections.emptyMap(), null); + assertThat(snapshots).isGreaterThan(0); + + Snapshot latest = 
store.snapshotManager().latestSnapshot(); + assertThat(latest.commitKind()).isEqualTo(Snapshot.CommitKind.OVERWRITE); + commit.close(); + } + + @Test + public void testOverwriteWithBaseSnapshotAllowsCompactionBetweenSnapshots() throws Exception { + TestAppendFileStore store = TestAppendFileStore.createAppendStore(tempDir, new HashMap<>()); + BinaryRow partition = gen.getPartition(gen.next()); + + // Commit snapshot 1: files A, B + CommitMessageImpl msg1 = + store.writeDataFiles(partition, 0, Arrays.asList("A.parquet", "B.parquet")); + store.commit(msg1); + long baseSnapshotId = store.snapshotManager().latestSnapshotId(); + + // Simulate a compaction between base and latest: compaction replaces A+B with AB + // Compaction produces DELETE A, DELETE B, ADD AB — net change after merge is zero new files + // This should NOT be treated as a concurrent write conflict + ManifestCommittable compactCommittable = new ManifestCommittable(50L); + store.newCommit().commit(compactCommittable, false); + + // Overwrite with baseSnapshotId; no actual new data files were added + FileStoreCommitImpl commit = store.newCommit(); + ManifestCommittable committable = new ManifestCommittable(100L); + CommitMessageImpl sortedFiles = + store.writeDataFiles(partition, 0, Arrays.asList("X.parquet", "Y.parquet")); + committable.addFileCommittable(sortedFiles); + + int snapshots = + commit.overwritePartition( + Collections.emptyMap(), + committable, + Collections.emptyMap(), + baseSnapshotId); + assertThat(snapshots).isGreaterThan(0); + commit.close(); + } + + @Test + public void testOverwriteSkipsConflictDetectionWhenConfigured() throws Exception { + Map opts = new HashMap<>(); + opts.put(CoreOptions.SORT_COMPACT_SKIP_OVERWRITE_CONFLICT_DETECTION.key(), "true"); + TestAppendFileStore store = TestAppendFileStore.createAppendStore(tempDir, opts); + BinaryRow partition = gen.getPartition(gen.next()); + + // Commit snapshot 1: files A, B + CommitMessageImpl msg1 = + store.writeDataFiles(partition, 
0, Arrays.asList("A.parquet", "B.parquet")); + store.commit(msg1); + long baseSnapshotId = store.snapshotManager().latestSnapshotId(); + + // Concurrent write: commit snapshot 2 with file C (simulates write after sort compact read) + CommitMessageImpl msg2 = + store.writeDataFiles(partition, 0, Collections.singletonList("C.parquet")); + store.commit(msg2); + + // Overwrite with baseSnapshotId should succeed (conflict detection skipped) + FileStoreCommitImpl commit = store.newCommit(); + ManifestCommittable committable = new ManifestCommittable(100L); + CommitMessageImpl sortedFiles = + store.writeDataFiles(partition, 0, Arrays.asList("X.parquet", "Y.parquet")); + committable.addFileCommittable(sortedFiles); + + int snapshots = + commit.overwritePartition( + Collections.emptyMap(), + committable, + Collections.emptyMap(), + baseSnapshotId); + assertThat(snapshots).isGreaterThan(0); + + Snapshot latest = store.snapshotManager().latestSnapshot(); + assertThat(latest.commitKind()).isEqualTo(Snapshot.CommitKind.OVERWRITE); + + ManifestList manifestList = store.manifestListFactory().create(); + ManifestFile manifestFile = store.manifestFileFactory().create(); + + // Delta manifests: the overwrite commit itself. + // DELETE A, B (pinned to baseSnapshot 1) and ADD X, Y (sort compact output). + List deltaEntries = + manifestList.readDeltaManifests(latest).stream() + .flatMap(meta -> manifestFile.read(meta.fileName()).stream()) + .collect(Collectors.toList()); + List deltaDeletes = + deltaEntries.stream() + .filter(e -> e.kind() == FileKind.DELETE) + .map(e -> e.file().fileName()) + .collect(Collectors.toList()); + List deltaAdds = + deltaEntries.stream() + .filter(e -> e.kind() == FileKind.ADD) + .map(e -> e.file().fileName()) + .collect(Collectors.toList()); + assertThat(deltaDeletes).containsExactlyInAnyOrder("A.parquet", "B.parquet"); + assertThat(deltaAdds).containsExactlyInAnyOrder("X.parquet", "Y.parquet"); + + // Base manifests: carried forward from prior snapshots. 
+ // A, B (snapshot 1) and C (concurrent write, snapshot 2) are all ADDs in base. + List baseEntries = + manifestList.read(latest.baseManifestList(), latest.baseManifestListSize()).stream() + .flatMap(meta -> manifestFile.read(meta.fileName()).stream()) + .collect(Collectors.toList()); + List baseFileNames = + baseEntries.stream().map(e -> e.file().fileName()).collect(Collectors.toList()); + assertThat(baseFileNames).containsExactlyInAnyOrder("A.parquet", "B.parquet", "C.parquet"); + assertThat(baseEntries).allMatch(e -> e.kind() == FileKind.ADD); + + // Net visible files: base ADDs minus delta DELETEs plus delta ADDs = C, X, Y + List visibleFiles = + store.newScan().plan().files().stream() + .map(e -> e.file().fileName()) + .collect(Collectors.toList()); + assertThat(visibleFiles).containsExactlyInAnyOrder("C.parquet", "X.parquet", "Y.parquet"); + commit.close(); + } + private void logData(Supplier> supplier, String name) { if (!LOG.isDebugEnabled()) { return; diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/TestCommitThread.java b/paimon-core/src/test/java/org/apache/paimon/operation/TestCommitThread.java index cdd2d8fba11f..085b8a804950 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/TestCommitThread.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/TestCommitThread.java @@ -172,7 +172,8 @@ private void doOverwrite() throws Exception { commit.overwritePartition( TestKeyValueGenerator.toPartitionMap(partition, MULTI_PARTITIONED), committable, - Collections.emptyMap())); + Collections.emptyMap(), + null)); } private void doFinalCompact() { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java index 289802e2ba6b..b148ece9002a 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java +++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java @@ -20,6 +20,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.CoreOptions.OrderType; +import org.apache.paimon.Snapshot; import org.apache.paimon.flink.FlinkConnectorOptions; import org.apache.paimon.flink.sink.SortCompactSinkBuilder; import org.apache.paimon.flink.sorter.TableSortInfo; @@ -135,10 +136,14 @@ public void build() throws Exception { fileStoreTable.rowType(), sortInfo); + Snapshot readSnapshot = fileStoreTable.snapshotManager().latestSnapshot(); + Long readSnapshotId = readSnapshot == null ? null : readSnapshot.id(); + new SortCompactSinkBuilder(fileStoreTable) .forCompact(true) .forRowData(sorter.sort()) .overwrite() + .withOverwriteBaseSnapshot(readSnapshotId) .build(); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java index 87d12924533a..f28acea5d5fa 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java @@ -82,6 +82,7 @@ public class FlinkSinkBuilder { private DataStream input; @Nullable protected Map overwritePartition; + @Nullable protected Long overwriteBaseSnapshotId; @Nullable private Integer parallelism; @Nullable private TableSortInfo tableSortInfo; @@ -131,6 +132,12 @@ public FlinkSinkBuilder overwrite(Map overwritePartition) { return this; } + /** Set the base snapshot for overwrite conflict detection (used by sort compact). */ + public FlinkSinkBuilder withOverwriteBaseSnapshot(@Nullable Long snapshotId) { + this.overwriteBaseSnapshotId = snapshotId; + return this; + } + /** Set sink parallelism. 
*/ public FlinkSinkBuilder parallelism(@Nullable Integer parallelism) { this.parallelism = parallelism; @@ -259,14 +266,19 @@ public static DataStream mapToInternalRow( protected DataStreamSink buildDynamicBucketSink( DataStream input, boolean globalIndex) { - return compactSink && !globalIndex - // todo support global index sort compact - ? new DynamicBucketCompactSink(table, overwritePartition).build(input, parallelism) - : globalIndex - ? new GlobalDynamicBucketSink(table, overwritePartition) - .build(input, parallelism) - : new RowDynamicBucketSink(table, overwritePartition) - .build(input, parallelism); + if (compactSink && !globalIndex) { + DynamicBucketCompactSink sink = new DynamicBucketCompactSink(table, overwritePartition); + sink.setOverwriteBaseSnapshotId(overwriteBaseSnapshotId); + return sink.build(input, parallelism); + } else if (globalIndex) { + GlobalDynamicBucketSink sink = new GlobalDynamicBucketSink(table, overwritePartition); + sink.setOverwriteBaseSnapshotId(overwriteBaseSnapshotId); + return sink.build(input, parallelism); + } else { + RowDynamicBucketSink sink = new RowDynamicBucketSink(table, overwritePartition); + sink.setOverwriteBaseSnapshotId(overwriteBaseSnapshotId); + return sink.build(input, parallelism); + } } protected DataStreamSink buildForFixedBucket(DataStream input) { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java index 7b7cb18bb819..4fb3e3344be2 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java @@ -40,12 +40,17 @@ public abstract class FlinkWriteSink extends FlinkSink { private static final long serialVersionUID = 1L; @Nullable protected final Map overwritePartition; + @Nullable protected Long 
overwriteBaseSnapshotId; public FlinkWriteSink(FileStoreTable table, @Nullable Map overwritePartition) { super(table, overwritePartition != null); this.overwritePartition = overwritePartition; } + public void setOverwriteBaseSnapshotId(@Nullable Long overwriteBaseSnapshotId) { + this.overwriteBaseSnapshotId = overwriteBaseSnapshotId; + } + @Override protected Committer.Factory createCommitterFactory() { // If checkpoint is enabled for streaming job, we have to @@ -57,6 +62,7 @@ protected Committer.Factory createCommitterFac table, table.newCommit(context.commitUser()) .withOverwrite(overwritePartition) + .withOverwriteBaseSnapshot(overwriteBaseSnapshotId) .ignoreEmptyCommit(!context.streamingCheckpointEnabled()), context); } diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java index 7f8c89dcbacf..12cfdb16d6da 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java @@ -20,6 +20,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.CoreOptions.OrderType; +import org.apache.paimon.Snapshot; import org.apache.paimon.append.AppendCompactCoordinator; import org.apache.paimon.append.AppendCompactTask; import org.apache.paimon.append.cluster.IncrementalClusterManager; @@ -636,9 +637,12 @@ private void sortCompactUnAwareBucketTable( .reduce(Dataset::union) .orElse(null); if (datasetForWrite != null) { + Snapshot readSnapshot = table.snapshotManager().latestSnapshot(); + Long readSnapshotId = readSnapshot == null ? 
null : readSnapshot.id(); + PaimonSparkWriter writer = PaimonSparkWriter.apply(table); - // Use dynamic partition overwrite writer.writeBuilder().withOverwrite(); + writer.withOverwriteBaseSnapshot(readSnapshotId); writer.commit(writer.write(datasetForWrite)); } } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala index 26ca12197c70..b83d6dd1c7d7 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala @@ -93,6 +93,13 @@ case class PaimonSparkWriter( tableForWrite.newBatchWriteBuilder() } + var overwriteBaseSnapshotId: java.lang.Long = _ + + def withOverwriteBaseSnapshot(snapshotId: java.lang.Long): PaimonSparkWriter = { + this.overwriteBaseSnapshotId = snapshotId + this + } + def writeOnly(): PaimonSparkWriter = { PaimonSparkWriter(table.copy(singletonMap(WRITE_ONLY.key(), "true"))) } @@ -422,6 +429,11 @@ case class PaimonSparkWriter( writeBuilder } val tableCommit = finalWriteBuilder.newCommit() + tableCommit match { + case innerTableCommit: InnerTableCommit => + innerTableCommit.withOverwriteBaseSnapshot(overwriteBaseSnapshotId) + case _ => + } try { tableCommit.commit(commitMessages.toList.asJava) } catch { diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala index a44158d83207..d8599c305aad 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala @@ -19,6 +19,7 @@ 
package org.apache.paimon.spark.procedure import org.apache.paimon.Snapshot.CommitKind +import org.apache.paimon.data.GenericRow import org.apache.paimon.fs.Path import org.apache.paimon.spark.PaimonSparkTestBase import org.apache.paimon.spark.utils.SparkProcedureUtils @@ -1339,6 +1340,71 @@ abstract class CompactProcedureTestBase extends PaimonSparkTestBase with StreamT } } + test("Paimon Procedure: sort compact detects concurrent write conflict") { + withTable("T") { + spark.sql(s""" + |CREATE TABLE T (a INT, b INT) + |TBLPROPERTIES ('bucket'='-1') + |""".stripMargin) + + spark.sql("INSERT INTO T VALUES (1, 1)") + spark.sql("INSERT INTO T VALUES (2, 2)") + + val table = loadTable("T") + val baseSnapshotId = table.snapshotManager().latestSnapshotId() + + // Simulate concurrent write after base snapshot + spark.sql("INSERT INTO T VALUES (3, 3)") + + val latestSnapshotId = table.snapshotManager().latestSnapshotId() + Assertions.assertThat(latestSnapshotId).isGreaterThan(baseSnapshotId) + + val overwritePartition = new java.util.HashMap[String, String]() + + // Generate commit messages via BatchWriteBuilder + val builder = table.newBatchWriteBuilder().withOverwrite(overwritePartition) + val batchWrite = builder.newWrite() + batchWrite.write(GenericRow.of(Integer.valueOf(99), Integer.valueOf(99))) + val messages = batchWrite.prepareCommit() + batchWrite.close() + + // Attempt overwrite with baseSnapshotId: should detect conflict + val user = UUID.randomUUID().toString + val tableCommit = table + .newCommit(user) + .withOverwrite(overwritePartition) + .withOverwriteBaseSnapshot(java.lang.Long.valueOf(baseSnapshotId)) + + Assertions + .assertThatThrownBy(() => tableCommit.commit(messages)) + .isInstanceOf(classOf[RuntimeException]) + .hasMessageContaining("Sort compact conflict detected") + tableCommit.close() + } + } + + test("Paimon Procedure: sort compact succeeds with overwrite base snapshot") { + withTable("T") { + spark.sql(s""" + |CREATE TABLE T (a INT, b INT) + 
|TBLPROPERTIES ('bucket'='-1') + |""".stripMargin) + + spark.sql("INSERT INTO T VALUES (2, 2)") + spark.sql("INSERT INTO T VALUES (1, 1)") + + checkAnswer( + spark.sql("CALL sys.compact(table => 'T', order_strategy => 'order', order_by => 'a')"), + Row(true) :: Nil) + + val table = loadTable("T") + Assertions + .assertThat(table.latestSnapshot().get().commitKind()) + .isEqualTo(CommitKind.OVERWRITE) + checkAnswer(spark.sql("SELECT * FROM T ORDER BY a"), Seq(Row(1, 1), Row(2, 2))) + } + } + def checkSnapshot(table: FileStoreTable): Unit = { Assertions .assertThat(table.latestSnapshot().get().commitKind().toString)