Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -1123,6 +1123,19 @@ public InlineElement getDescription() {
"Whether only overwrite dynamic partition when overwriting a partitioned table with "
+ "dynamic partition columns. Works only when the table has partition keys.");

/**
 * Boolean option {@code sort-compact.skip-overwrite-conflict-detection}; defaults to {@code
 * false}.
 *
 * <p>Per its description, a value of {@code true} makes the sort compact commit skip the check
 * for concurrent writes between the snapshot it read and the snapshot it commits on. The value
 * is exposed through {@code sortCompactSkipOverwriteConflictDetection()}.
 */
public static final ConfigOption<Boolean> SORT_COMPACT_SKIP_OVERWRITE_CONFLICT_DETECTION =
key("sort-compact.skip-overwrite-conflict-detection")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether to skip overwrite conflict detection during sort compact commit. "
+ "Default is false (detection enabled). If set to true, sort compact will not check "
+ "for concurrent writes between the read snapshot and the commit snapshot. "
+ "WARNING: Skipping this detection may cause sort compact to silently overwrite "
+ "data written by concurrent jobs, leading to data loss and semantic errors "
+ "(e.g., the sort order may be broken because unsorted data from concurrent writes "
+ "replaces the sorted compaction output).");

public static final ConfigOption<String> PARTITION_EXPIRATION_STRATEGY =
key("partition.expiration-strategy")
.stringType()
Expand Down Expand Up @@ -3298,6 +3311,10 @@ public boolean dynamicPartitionOverwrite() {
return options.get(DYNAMIC_PARTITION_OVERWRITE);
}

/**
 * Reads the {@link #SORT_COMPACT_SKIP_OVERWRITE_CONFLICT_DETECTION} option from the table
 * options.
 *
 * @return the configured value of {@code sort-compact.skip-overwrite-conflict-detection}
 */
public boolean sortCompactSkipOverwriteConflictDetection() {
    final boolean skipDetection = options.get(SORT_COMPACT_SKIP_OVERWRITE_CONFLICT_DETECTION);
    return skipDetection;
}

/**
 * Reads the {@code PARTITION_EXPIRATION_TIME} option from the table options.
 *
 * @return the configured value, exactly as returned by the options lookup
 */
public Duration partitionExpireTime() {
    final Duration expireTime = options.get(PARTITION_EXPIRATION_TIME);
    return expireTime;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,15 @@ public interface FileStoreCommit extends AutoCloseable {
* on the user-defined statement, the partition might not include all partition keys. Also
* note that this partition is not necessarily equal to the partitions of the newly added
* key-values. This is just the partition to be cleaned up.
* @param baseSnapshotId If non-null, the DELETE list is built from this snapshot instead of the
* latest, and concurrent writes between this snapshot and the latest are detected as
* conflicts. Used by sort compact to prevent data loss.
*/
int overwritePartition(
Map<String, String> partition,
ManifestCommittable committable,
Map<String, String> properties);
Map<String, String> properties,
@Nullable Long baseSnapshotId);

/**
* Drop multiple partitions. The {@link Snapshot.CommitKind} of generated snapshot is {@link
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,8 @@ private <T extends FileEntry> boolean containsFileDeletionOrDeletionVectors(
public int overwritePartition(
Map<String, String> partition,
ManifestCommittable committable,
Map<String, String> properties) {
Map<String, String> properties,
@Nullable Long baseSnapshotId) {
LOG.info(
"Ready to overwrite to table {}, number of commit messages: {}",
tableName,
Expand Down Expand Up @@ -500,7 +501,8 @@ public int overwritePartition(
changes.appendIndexFiles,
committable.identifier(),
committable.watermark(),
committable.properties());
committable.properties(),
baseSnapshotId);
generatedSnapshot += 1;
}

Expand Down Expand Up @@ -743,15 +745,53 @@ private int tryOverwritePartition(
List<IndexManifestEntry> indexFiles,
long identifier,
@Nullable Long watermark,
Map<String, String> properties) {
return tryCommit(
latestSnapshot ->
scanner.readOverwriteChanges(
options.bucket(),
changes,
indexFiles,
Map<String, String> properties,
@Nullable Long baseSnapshotId) {
// When baseSnapshotId is provided, detect concurrent writes between the base and latest
if (baseSnapshotId != null && !options.sortCompactSkipOverwriteConflictDetection()) {
Snapshot latestSnapshot = snapshotManager.latestSnapshot();
if (latestSnapshot != null && latestSnapshot.id() > baseSnapshotId) {
List<BinaryRow> changedPartitions =
changes.stream()
.map(ManifestEntry::partition)
.distinct()
.collect(Collectors.toList());
List<SimpleFileEntry> incrementalChanges =
readIncrementalChanges(
snapshotManager.snapshot(baseSnapshotId),
latestSnapshot,
partitionFilter),
changedPartitions);
Collection<SimpleFileEntry> mergedIncremental =
Comment on lines +760 to +764
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Critical] 这里把 readIncrementalChanges(...) 调成了当前类方法,但实际定义在 scanner 上;同时 Collection<SimpleFileEntry> 缺少 java.util.Collection import。这两处都会导致新增冲突检测逻辑无法编译。

建议改为 scanner.readIncrementalChanges(...),并补充 java.util.Collection import。

— gpt-5.4 via Qwen Code /review

FileEntry.mergeEntries(incrementalChanges);
boolean hasNetNewAdds =
mergedIncremental.stream()
.anyMatch(
e ->
e.kind() == FileKind.ADD
&& (partitionFilter == null
|| partitionFilter.test(
e.partition())));
if (hasNetNewAdds) {
throw new RuntimeException(
String.format(
"Sort compact conflict detected for table %s: new data was written to "
+ "the overwritten partitions between snapshot %d and %d. "
+ "Please retry the sort compact or ensure no concurrent "
+ "writes to these partitions.",
tableName, baseSnapshotId, latestSnapshot.id()));
}
}
}

return tryCommit(
latestSnapshot -> {
Snapshot baseSnapshot =
baseSnapshotId != null
? snapshotManager.snapshot(baseSnapshotId)
: latestSnapshot;
return scanner.readOverwriteChanges(
options.bucket(), changes, indexFiles, baseSnapshot, partitionFilter);
},
identifier,
watermark,
properties,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ public interface InnerTableCommit extends StreamTableCommit, BatchTableCommit {
/** Overwrite writing, same as the 'INSERT OVERWRITE T PARTITION (...)' semantics of SQL. */
InnerTableCommit withOverwrite(@Nullable Map<String, String> staticPartition);

/**
 * Set the base snapshot ID for overwrite conflict detection. When set, the overwrite DELETE
 * list is built from this snapshot instead of the latest, and concurrent writes between this
 * snapshot and the latest are detected as conflicts.
 *
 * @param snapshotId the snapshot the overwrite should be based on, or {@code null} to base it
 *     on the latest snapshot
 * @return a commit on which further configuration calls can be chained
 */
InnerTableCommit withOverwriteBaseSnapshot(@Nullable Long snapshotId);

/**
* If this is set to true, when there is no new data, no snapshot will be generated. By default,
* empty commit is ignored.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public class TableCommitImpl implements InnerTableCommit {
private final ThreadPoolExecutor fileCheckExecutor;

@Nullable private Map<String, String> overwritePartition = null;
// Base snapshot id for overwrite commits; assigned by withOverwriteBaseSnapshot and forwarded to
// commit.overwritePartition. Stays null unless a caller sets it.
@Nullable private Long overwriteBaseSnapshotId = null;
private boolean batchCommitted = false;
private boolean expireForEmptyCommit = true;

Expand Down Expand Up @@ -146,6 +147,12 @@ public TableCommitImpl withOverwrite(@Nullable Map<String, String> overwritePart
return this;
}

/**
 * Stores the base snapshot id that is later handed to the overwrite commit.
 *
 * @param snapshotId base snapshot id, may be {@code null}
 * @return this instance, for call chaining
 */
@Override
public TableCommitImpl withOverwriteBaseSnapshot(@Nullable Long snapshotId) {
    // No local variable shadows the field, so the unqualified assignment targets it.
    overwriteBaseSnapshotId = snapshotId;
    return this;
}

@Override
public TableCommitImpl ignoreEmptyCommit(boolean ignoreEmptyCommit) {
commit.ignoreEmptyCommit(ignoreEmptyCommit);
Expand Down Expand Up @@ -262,7 +269,10 @@ public void commitMultiple(List<ManifestCommittable> committables, boolean check
}
int newSnapshots =
commit.overwritePartition(
overwritePartition, committable, Collections.emptyMap());
overwritePartition,
committable,
Collections.emptyMap(),
overwriteBaseSnapshotId);
maintain(
committable.identifier(),
maintainExecutor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,8 @@ public List<Snapshot> overwriteData(
null,
Collections.emptyList(),
(commit, committable) ->
commit.overwritePartition(partition, committable, Collections.emptyMap()));
commit.overwritePartition(
partition, committable, Collections.emptyMap(), null));
}

public Snapshot dropPartitions(List<Map<String, String>> partitions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,13 +154,19 @@ public void testMultiPartitions(boolean cleanEmptyDirs) throws Exception {
Map<String, String> partitionSpec = new HashMap<>();
partitionSpec.put("dt", "0401");
commit.overwritePartition(
partitionSpec, new ManifestCommittable(commitIdentifier++), Collections.emptyMap());
partitionSpec,
new ManifestCommittable(commitIdentifier++),
Collections.emptyMap(),
null);

// step 3: generate snapshot 3 by cleaning partition dt=0402/hr=10
partitionSpec.put("dt", "0402");
partitionSpec.put("hr", "8");
commit.overwritePartition(
partitionSpec, new ManifestCommittable(commitIdentifier++), Collections.emptyMap());
partitionSpec,
new ManifestCommittable(commitIdentifier++),
Collections.emptyMap(),
null);
commit.close();

// step 4: generate snapshot 4 by cleaning dt=0402/hr=12/bucket-0
Expand Down
Loading
Loading