diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 220ca4cdc44b..05fa2a821ad6 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -128,6 +128,12 @@
MemorySize |
Memory page size for caching. |
+        <tr>
+            <td><h5>chain-table.chain-partition-keys</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>Partition keys that participate in chain logic. Must be a contiguous suffix of the table's partition keys. Comma-separated. If not set, all partition keys participate in chain.</td>
+        </tr>
chain-table.enabled |
false |
@@ -374,6 +380,12 @@
Integer |
Percentage flexibility while comparing sorted run size for changelog mode table. If the candidate sorted run(s) size is 1% smaller than the next sorted run's size, then include next sorted run into this candidate set. |
+        <tr>
+            <td><h5>compaction.skip-expired-partitions</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Whether to skip compacting partitions that are already expired according to 'partition.expiration-time'. Only effective when 'partition.expiration-time' is set and 'partition.expiration-strategy' is 'values-time'.</td>
+        </tr>
compaction.total-size-threshold |
(none) |
@@ -1187,12 +1199,6 @@
String |
When a batch job queries from a chain table, if a partition does not exist in the main branch, the reader will try to get this partition from chain snapshot branch. |
-        <tr>
-            <td><h5>chain-table.chain-partition-keys</h5></td>
-            <td style="word-wrap: break-word;">(none)</td>
-            <td>String</td>
-            <td>Partition keys that participate in chain logic. Must be a contiguous suffix of the table's partition keys. Comma-separated. If not set, all partition keys participate in chain.</td>
-        </tr>
scan.file-creation-time-millis |
(none) |
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 4dc495c12b05..00e2d6b480b4 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1123,6 +1123,23 @@ public InlineElement getDescription() {
"Whether only overwrite dynamic partition when overwriting a partitioned table with "
+ "dynamic partition columns. Works only when the table has partition keys.");
+ /** The strategy for partition expiration. */
+ public enum PartitionExpireStrategy {
+ VALUES_TIME("values-time"),
+ UPDATE_TIME("update-time");
+
+ private final String value;
+
+ PartitionExpireStrategy(String value) {
+ this.value = value;
+ }
+
+ @Override
+ public String toString() {
+ return value;
+ }
+ }
+
public static final ConfigOption<String> PARTITION_EXPIRATION_STRATEGY =
key("partition.expiration-strategy")
.stringType()
@@ -1168,6 +1185,16 @@ public InlineElement getDescription() {
+ "By default, all partitions to be expired will be expired together, which may cause a risk of out-of-memory. "
+ "Use this parameter to divide partition expiration process and mitigate memory pressure.");
+ public static final ConfigOption<Boolean> COMPACTION_SKIP_EXPIRED_PARTITIONS =
+ key("compaction.skip-expired-partitions")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Whether to skip compacting partitions that are already expired "
+ + "according to 'partition.expiration-time'. "
+ + "Only effective when 'partition.expiration-time' is set "
+ + "and 'partition.expiration-strategy' is 'values-time'.");
+
public static final ConfigOption<String> PARTITION_TIMESTAMP_FORMATTER =
key("partition.timestamp-formatter")
.stringType()
@@ -3310,6 +3337,10 @@ public String partitionExpireStrategy() {
return options.get(PARTITION_EXPIRATION_STRATEGY);
}
+ public boolean compactionSkipExpiredPartitions() {
+ return options.get(COMPACTION_SKIP_EXPIRED_PARTITIONS);
+ }
+
@Nullable
public String dataFileExternalPaths() {
return options.get(DATA_FILE_EXTERNAL_PATHS);
diff --git a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionExpireStrategy.java b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionExpireStrategy.java
index 0921a65697b3..4371fe86facb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionExpireStrategy.java
+++ b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionExpireStrategy.java
@@ -29,6 +29,7 @@
import javax.annotation.Nullable;
+import java.io.Serializable;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.LinkedHashMap;
@@ -37,7 +38,9 @@
import java.util.Optional;
/** Strategy for partition expiration. */
-public abstract class PartitionExpireStrategy {
+public abstract class PartitionExpireStrategy implements Serializable {
+
+ private static final long serialVersionUID = 1L;
protected final List<String> partitionKeys;
protected final String partitionDefaultName;
diff --git a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionTimeExtractor.java b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionTimeExtractor.java
index 0016619e2162..afc58c851317 100644
--- a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionTimeExtractor.java
+++ b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionTimeExtractor.java
@@ -20,6 +20,7 @@
import javax.annotation.Nullable;
+import java.io.Serializable;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
@@ -43,7 +44,9 @@
import static java.time.temporal.ChronoField.YEAR;
/** Time extractor to extract time from partition values. */
-public class PartitionTimeExtractor {
+public class PartitionTimeExtractor implements Serializable {
+
+ private static final long serialVersionUID = 1L;
private static final DateTimeFormatter TIMESTAMP_FORMATTER =
new DateTimeFormatterBuilder()
diff --git a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionUpdateTimeExpireStrategy.java b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionUpdateTimeExpireStrategy.java
index 3cb7a405d2a5..628f4c3733fd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionUpdateTimeExpireStrategy.java
+++ b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionUpdateTimeExpireStrategy.java
@@ -34,6 +34,8 @@
*/
public class PartitionUpdateTimeExpireStrategy extends PartitionExpireStrategy {
+ private static final long serialVersionUID = 1L;
+
public PartitionUpdateTimeExpireStrategy(CoreOptions options, RowType partitionType) {
super(partitionType, options.partitionDefaultName());
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionValuesTimeExpireStrategy.java b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionValuesTimeExpireStrategy.java
index 94e26d6f3748..6238817da4e1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionValuesTimeExpireStrategy.java
+++ b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionValuesTimeExpireStrategy.java
@@ -45,6 +45,8 @@
*/
public class PartitionValuesTimeExpireStrategy extends PartitionExpireStrategy {
+ private static final long serialVersionUID = 1L;
+
private static final Logger LOG =
LoggerFactory.getLogger(PartitionValuesTimeExpireStrategy.class);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
index d4edb5cbaac0..83486a862466 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
@@ -25,6 +25,7 @@
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.partition.PartitionValuesTimeExpireStrategy;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.system.CompactBucketsTable;
@@ -141,6 +142,26 @@ public DataStreamSource<RowData> build() {
});
dataStream = new DataStreamSource<>(filterStream);
}
+ CoreOptions coreOptions = table.coreOptions();
+ if (coreOptions.compactionSkipExpiredPartitions()
+ && coreOptions.partitionExpireTime() != null
+ && CoreOptions.PartitionExpireStrategy.VALUES_TIME
+ .toString()
+ .equals(coreOptions.partitionExpireStrategy())) {
+ RowType partitionType = table.schema().logicalPartitionType();
+ Duration expireTime = coreOptions.partitionExpireTime();
+ PartitionValuesTimeExpireStrategy expireStrategy =
+ new PartitionValuesTimeExpireStrategy(coreOptions, partitionType);
+ SingleOutputStreamOperator<RowData> filterStream =
+ dataStream.filter(
+ rowData -> {
+ LocalDateTime expireDateTime =
+ LocalDateTime.now().minus(expireTime);
+ BinaryRow partition = deserializeBinaryRow(rowData.getBinary(1));
+ return !expireStrategy.isExpired(expireDateTime, partition);
+ });
+ dataStream = new DataStreamSource<>(filterStream);
+ }
Integer parallelism =
Options.fromMap(table.options()).get(FlinkConnectorOptions.SCAN_PARALLELISM);
if (parallelism != null) {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
index cb7cf87f60b3..0b3edcf8f0f8 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
@@ -62,6 +62,8 @@
import org.junit.jupiter.params.provider.ValueSource;
import java.time.Duration;
+import java.time.LocalDate;
+import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -926,6 +928,127 @@ public void testDataEvolutionTableCompact() throws Exception {
assertThat(value).isEqualTo(30000);
}
+ @Test
+ @Timeout(60)
+ public void testSkipExpiredPartitions() throws Exception {
+ // Use a date far in the past (expired) and today's date (not expired)
+ String expiredDt =
+ LocalDate.now().minusDays(30).format(DateTimeFormatter.ofPattern("yyyyMMdd"));
+ String activeDt = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyyMMdd"));
+
+ Map<String, String> tableOptions = new HashMap<>();
+ tableOptions.put(CoreOptions.WRITE_ONLY.key(), "true");
+ tableOptions.put(CoreOptions.PARTITION_EXPIRATION_TIME.key(), "7 d");
+ tableOptions.put(
+ CoreOptions.PARTITION_EXPIRATION_STRATEGY.key(),
+ CoreOptions.PartitionExpireStrategy.VALUES_TIME.toString());
+ tableOptions.put(CoreOptions.PARTITION_TIMESTAMP_FORMATTER.key(), "yyyyMMdd");
+
+ FileStoreTable table =
+ prepareTable(
+ Collections.singletonList("dt"),
+ Arrays.asList("dt", "k"),
+ Collections.emptyList(),
+ tableOptions);
+
+ // Write two batches to each partition so each has multiple files
+ writeData(
+ rowData(1, 100, 15, BinaryString.fromString(expiredDt)),
+ rowData(1, 100, 15, BinaryString.fromString(activeDt)));
+
+ writeData(
+ rowData(2, 100, 15, BinaryString.fromString(expiredDt)),
+ rowData(2, 100, 15, BinaryString.fromString(activeDt)));
+
+ checkLatestSnapshot(table, 2, Snapshot.CommitKind.APPEND);
+
+ CompactAction action =
+ createAction(
+ CompactAction.class,
+ "compact",
+ "--warehouse",
+ warehouse,
+ "--database",
+ database,
+ "--table",
+ tableName,
+ "--table_conf",
+ CoreOptions.COMPACTION_SKIP_EXPIRED_PARTITIONS.key() + "=true");
+ StreamExecutionEnvironment env = streamExecutionEnvironmentBuilder().batchMode().build();
+ action.withStreamExecutionEnvironment(env).build();
+ env.execute();
+
+ checkLatestSnapshot(table, 3, Snapshot.CommitKind.COMPACT);
+
+ List<DataSplit> splits = table.newSnapshotReader().read().dataSplits();
+ for (DataSplit split : splits) {
+ String dt = split.partition().getString(0).toString();
+ if (dt.equals(activeDt)) {
+ // active partition should be compacted into 1 file
+ assertThat(split.dataFiles().size()).isEqualTo(1);
+ } else {
+ // expired partition should be skipped, still has 2 files
+ assertThat(split.dataFiles().size()).isEqualTo(2);
+ }
+ }
+ }
+
+ @Test
+ @Timeout(60)
+ public void testNotSkipExpiredPartitionsByDefault() throws Exception {
+ String expiredDt =
+ LocalDate.now().minusDays(30).format(DateTimeFormatter.ofPattern("yyyyMMdd"));
+ String activeDt = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyyMMdd"));
+
+ Map<String, String> tableOptions = new HashMap<>();
+ tableOptions.put(CoreOptions.WRITE_ONLY.key(), "true");
+ tableOptions.put(CoreOptions.PARTITION_EXPIRATION_TIME.key(), "7 d");
+ tableOptions.put(
+ CoreOptions.PARTITION_EXPIRATION_STRATEGY.key(),
+ CoreOptions.PartitionExpireStrategy.VALUES_TIME.toString());
+ tableOptions.put(CoreOptions.PARTITION_TIMESTAMP_FORMATTER.key(), "yyyyMMdd");
+ // COMPACTION_SKIP_EXPIRED_PARTITIONS is not set, default is false
+
+ FileStoreTable table =
+ prepareTable(
+ Collections.singletonList("dt"),
+ Arrays.asList("dt", "k"),
+ Collections.emptyList(),
+ tableOptions);
+
+ writeData(
+ rowData(1, 100, 15, BinaryString.fromString(expiredDt)),
+ rowData(1, 100, 15, BinaryString.fromString(activeDt)));
+
+ writeData(
+ rowData(2, 100, 15, BinaryString.fromString(expiredDt)),
+ rowData(2, 100, 15, BinaryString.fromString(activeDt)));
+
+ checkLatestSnapshot(table, 2, Snapshot.CommitKind.APPEND);
+
+ CompactAction action =
+ createAction(
+ CompactAction.class,
+ "compact",
+ "--warehouse",
+ warehouse,
+ "--database",
+ database,
+ "--table",
+ tableName);
+ StreamExecutionEnvironment env = streamExecutionEnvironmentBuilder().batchMode().build();
+ action.withStreamExecutionEnvironment(env).build();
+ env.execute();
+
+ checkLatestSnapshot(table, 3, Snapshot.CommitKind.COMPACT);
+
+ // both expired and active partitions should be compacted into 1 file
+ List<DataSplit> splits = table.newSnapshotReader().read().dataSplits();
+ for (DataSplit split : splits) {
+ assertThat(split.dataFiles().size()).isEqualTo(1);
+ }
+ }
+
private void setFirstRowId(List commitables, long firstRowId) {
commitables.forEach(
c -> {