diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index 220ca4cdc44b..05fa2a821ad6 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -128,6 +128,12 @@ MemorySize Memory page size for caching. + +
chain-table.chain-partition-keys
+ (none) + String + Partition keys that participate in chain logic. Must be a contiguous suffix of the table's partition keys. Comma-separated. If not set, all partition keys participate in chain. +
chain-table.enabled
false @@ -374,6 +380,12 @@ Integer Percentage flexibility while comparing sorted run size for changelog mode table. If the candidate sorted run(s) size is 1% smaller than the next sorted run's size, then include next sorted run into this candidate set. + +
compaction.skip-expired-partitions
+ false + Boolean + Whether to skip compacting partitions that are already expired according to 'partition.expiration-time'. Only effective when 'partition.expiration-time' is set and 'partition.expiration-strategy' is 'values-time'. +
compaction.total-size-threshold
(none) @@ -1187,12 +1199,6 @@ String When a batch job queries from a chain table, if a partition does not exist in the main branch, the reader will try to get this partition from chain snapshot branch. - -
chain-table.chain-partition-keys
- (none) - String - Partition keys that participate in chain logic. Must be a contiguous suffix of the table's partition keys. Comma-separated. If not set, all partition keys participate in chain. -
scan.file-creation-time-millis
(none) diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java index 4dc495c12b05..00e2d6b480b4 100644 --- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java @@ -1123,6 +1123,23 @@ public InlineElement getDescription() { "Whether only overwrite dynamic partition when overwriting a partitioned table with " + "dynamic partition columns. Works only when the table has partition keys."); + /** The strategy for partition expiration. */ + public enum PartitionExpireStrategy { + VALUES_TIME("values-time"), + UPDATE_TIME("update-time"); + + private final String value; + + PartitionExpireStrategy(String value) { + this.value = value; + } + + @Override + public String toString() { + return value; + } + } + public static final ConfigOption PARTITION_EXPIRATION_STRATEGY = key("partition.expiration-strategy") .stringType() @@ -1168,6 +1185,16 @@ public InlineElement getDescription() { + "By default, all partitions to be expired will be expired together, which may cause a risk of out-of-memory. " + "Use this parameter to divide partition expiration process and mitigate memory pressure."); + public static final ConfigOption COMPACTION_SKIP_EXPIRED_PARTITIONS = + key("compaction.skip-expired-partitions") + .booleanType() + .defaultValue(false) + .withDescription( + "Whether to skip compacting partitions that are already expired " + + "according to 'partition.expiration-time'. " + + "Only effective when 'partition.expiration-time' is set " + + "and 'partition.expiration-strategy' is 'values-time'."); + public static final ConfigOption PARTITION_TIMESTAMP_FORMATTER = key("partition.timestamp-formatter") .stringType() @@ -3310,6 +3337,10 @@ public String partitionExpireStrategy() { return options.get(PARTITION_EXPIRATION_STRATEGY); } + public boolean compactionSkipExpiredPartitions() { + return options.get(COMPACTION_SKIP_EXPIRED_PARTITIONS); + } + @Nullable public String dataFileExternalPaths() { return options.get(DATA_FILE_EXTERNAL_PATHS); diff --git a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionExpireStrategy.java b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionExpireStrategy.java index 0921a65697b3..4371fe86facb 100644 --- a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionExpireStrategy.java +++ b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionExpireStrategy.java @@ -29,6 +29,7 @@ import javax.annotation.Nullable; +import java.io.Serializable; import java.time.LocalDateTime; import java.util.ArrayList; import java.util.LinkedHashMap; @@ -37,7 +38,9 @@ import java.util.Optional; /** Strategy for partition expiration. */ -public abstract class PartitionExpireStrategy { +public abstract class PartitionExpireStrategy implements Serializable { + + private static final long serialVersionUID = 1L; protected final List partitionKeys; protected final String partitionDefaultName; diff --git a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionTimeExtractor.java b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionTimeExtractor.java index 0016619e2162..afc58c851317 100644 --- a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionTimeExtractor.java +++ b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionTimeExtractor.java @@ -20,6 +20,7 @@ import javax.annotation.Nullable; +import java.io.Serializable; import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; @@ -43,7 +44,9 @@ import static java.time.temporal.ChronoField.YEAR; /** Time extractor to extract time from partition values. */ -public class PartitionTimeExtractor { +public class PartitionTimeExtractor implements Serializable { + + private static final long serialVersionUID = 1L; private static final DateTimeFormatter TIMESTAMP_FORMATTER = new DateTimeFormatterBuilder() diff --git a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionUpdateTimeExpireStrategy.java b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionUpdateTimeExpireStrategy.java index 3cb7a405d2a5..628f4c3733fd 100644 --- a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionUpdateTimeExpireStrategy.java +++ b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionUpdateTimeExpireStrategy.java @@ -34,6 +34,8 @@ */ public class PartitionUpdateTimeExpireStrategy extends PartitionExpireStrategy { + private static final long serialVersionUID = 1L; + public PartitionUpdateTimeExpireStrategy(CoreOptions options, RowType partitionType) { super(partitionType, options.partitionDefaultName()); } diff --git a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionValuesTimeExpireStrategy.java b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionValuesTimeExpireStrategy.java index 94e26d6f3748..6238817da4e1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionValuesTimeExpireStrategy.java +++ b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionValuesTimeExpireStrategy.java @@ -45,6 +45,8 @@ */ public class PartitionValuesTimeExpireStrategy extends PartitionExpireStrategy { + private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(PartitionValuesTimeExpireStrategy.class); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java index d4edb5cbaac0..83486a862466 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java @@ -25,6 +25,7 @@ import org.apache.paimon.manifest.PartitionEntry; import org.apache.paimon.options.Options; import org.apache.paimon.partition.PartitionPredicate; +import org.apache.paimon.partition.PartitionValuesTimeExpireStrategy; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.source.ReadBuilder; import org.apache.paimon.table.system.CompactBucketsTable; @@ -141,6 +142,26 @@ public DataStreamSource build() { }); dataStream = new DataStreamSource<>(filterStream); } + CoreOptions coreOptions = table.coreOptions(); + if (coreOptions.compactionSkipExpiredPartitions() + && coreOptions.partitionExpireTime() != null + && CoreOptions.PartitionExpireStrategy.VALUES_TIME + .toString() + .equals(coreOptions.partitionExpireStrategy())) { + RowType partitionType = table.schema().logicalPartitionType(); + Duration expireTime = coreOptions.partitionExpireTime(); + PartitionValuesTimeExpireStrategy expireStrategy = + new PartitionValuesTimeExpireStrategy(coreOptions, partitionType); + SingleOutputStreamOperator filterStream = + dataStream.filter( + rowData -> { + LocalDateTime expireDateTime = + LocalDateTime.now().minus(expireTime); + BinaryRow partition = deserializeBinaryRow(rowData.getBinary(1)); + return !expireStrategy.isExpired(expireDateTime, partition); + }); + dataStream = new DataStreamSource<>(filterStream); + } Integer parallelism = Options.fromMap(table.options()).get(FlinkConnectorOptions.SCAN_PARALLELISM); if (parallelism != null) { diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java index cb7cf87f60b3..0b3edcf8f0f8 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java @@ -62,6 +62,8 @@ import org.junit.jupiter.params.provider.ValueSource; import java.time.Duration; +import java.time.LocalDate; +import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -926,6 +928,127 @@ public void testDataEvolutionTableCompact() throws Exception { assertThat(value).isEqualTo(30000); } + @Test + @Timeout(60) + public void testSkipExpiredPartitions() throws Exception { + // Use a date far in the past (expired) and today's date (not expired) + String expiredDt = + LocalDate.now().minusDays(30).format(DateTimeFormatter.ofPattern("yyyyMMdd")); + String activeDt = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyyMMdd")); + + Map tableOptions = new HashMap<>(); + tableOptions.put(CoreOptions.WRITE_ONLY.key(), "true"); + tableOptions.put(CoreOptions.PARTITION_EXPIRATION_TIME.key(), "7 d"); + tableOptions.put( + CoreOptions.PARTITION_EXPIRATION_STRATEGY.key(), + CoreOptions.PartitionExpireStrategy.VALUES_TIME.toString()); + tableOptions.put(CoreOptions.PARTITION_TIMESTAMP_FORMATTER.key(), "yyyyMMdd"); + + FileStoreTable table = + prepareTable( + Collections.singletonList("dt"), + Arrays.asList("dt", "k"), + Collections.emptyList(), + tableOptions); + + // Write two batches to each partition so each has multiple files + writeData( + rowData(1, 100, 15, BinaryString.fromString(expiredDt)), + rowData(1, 100, 15, BinaryString.fromString(activeDt))); + + writeData( + rowData(2, 100, 15, BinaryString.fromString(expiredDt)), + rowData(2, 100, 15, BinaryString.fromString(activeDt))); + + checkLatestSnapshot(table, 2, Snapshot.CommitKind.APPEND); + + CompactAction action = + createAction( + CompactAction.class, + "compact", + "--warehouse", + warehouse, + "--database", + database, + "--table", + tableName, + "--table_conf", + CoreOptions.COMPACTION_SKIP_EXPIRED_PARTITIONS.key() + "=true"); + StreamExecutionEnvironment env = streamExecutionEnvironmentBuilder().batchMode().build(); + action.withStreamExecutionEnvironment(env).build(); + env.execute(); + + checkLatestSnapshot(table, 3, Snapshot.CommitKind.COMPACT); + + List splits = table.newSnapshotReader().read().dataSplits(); + for (DataSplit split : splits) { + String dt = split.partition().getString(0).toString(); + if (dt.equals(activeDt)) { + // active partition should be compacted into 1 file + assertThat(split.dataFiles().size()).isEqualTo(1); + } else { + // expired partition should be skipped, still has 2 files + assertThat(split.dataFiles().size()).isEqualTo(2); + } + } + } + + @Test + @Timeout(60) + public void testNotSkipExpiredPartitionsByDefault() throws Exception { + String expiredDt = + LocalDate.now().minusDays(30).format(DateTimeFormatter.ofPattern("yyyyMMdd")); + String activeDt = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyyMMdd")); + + Map tableOptions = new HashMap<>(); + tableOptions.put(CoreOptions.WRITE_ONLY.key(), "true"); + tableOptions.put(CoreOptions.PARTITION_EXPIRATION_TIME.key(), "7 d"); + tableOptions.put( + CoreOptions.PARTITION_EXPIRATION_STRATEGY.key(), + CoreOptions.PartitionExpireStrategy.VALUES_TIME.toString()); + tableOptions.put(CoreOptions.PARTITION_TIMESTAMP_FORMATTER.key(), "yyyyMMdd"); + // COMPACTION_SKIP_EXPIRED_PARTITIONS is not set, default is false + + FileStoreTable table = + prepareTable( + Collections.singletonList("dt"), + Arrays.asList("dt", "k"), + Collections.emptyList(), + tableOptions); + + writeData( + rowData(1, 100, 15, BinaryString.fromString(expiredDt)), + rowData(1, 100, 15, BinaryString.fromString(activeDt))); + + writeData( + rowData(2, 100, 15, BinaryString.fromString(expiredDt)), + rowData(2, 100, 15, BinaryString.fromString(activeDt))); + + checkLatestSnapshot(table, 2, Snapshot.CommitKind.APPEND); + + CompactAction action = + createAction( + CompactAction.class, + "compact", + "--warehouse", + warehouse, + "--database", + database, + "--table", + tableName); + StreamExecutionEnvironment env = streamExecutionEnvironmentBuilder().batchMode().build(); + action.withStreamExecutionEnvironment(env).build(); + env.execute(); + + checkLatestSnapshot(table, 3, Snapshot.CommitKind.COMPACT); + + // both expired and active partitions should be compacted into 1 file + List splits = table.newSnapshotReader().read().dataSplits(); + for (DataSplit split : splits) { + assertThat(split.dataFiles().size()).isEqualTo(1); + } + } + private void setFirstRowId(List commitables, long firstRowId) { commitables.forEach( c -> {