Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,12 @@
<td>MemorySize</td>
<td>Memory page size for caching.</td>
</tr>
<tr>
<td><h5>chain-table.chain-partition-keys</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Partition keys that participate in chain logic. Must be a contiguous suffix of the table's partition keys. Comma-separated. If not set, all partition keys participate in chain.</td>
</tr>
<tr>
<td><h5>chain-table.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
Expand Down Expand Up @@ -374,6 +380,12 @@
<td>Integer</td>
<td>Percentage flexibility while comparing sorted run size for changelog mode table. If the candidate sorted run(s) size is 1% smaller than the next sorted run's size, then include next sorted run into this candidate set.</td>
</tr>
<tr>
<td><h5>compaction.skip-expired-partitions</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to skip compacting partitions that are already expired according to 'partition.expiration-time'. Only effective when 'partition.expiration-time' is set and 'partition.expiration-strategy' is 'values-time'.</td>
</tr>
<tr>
<td><h5>compaction.total-size-threshold</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down Expand Up @@ -1187,12 +1199,6 @@
<td>String</td>
<td>When a batch job queries from a chain table, if a partition does not exist in the main branch, the reader will try to get this partition from chain snapshot branch.</td>
</tr>
<tr>
<td><h5>chain-table.chain-partition-keys</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Partition keys that participate in chain logic. Must be a contiguous suffix of the table's partition keys. Comma-separated. If not set, all partition keys participate in chain.</td>
</tr>
<tr>
<td><h5>scan.file-creation-time-millis</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down
31 changes: 31 additions & 0 deletions paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -1123,6 +1123,23 @@ public InlineElement getDescription() {
"Whether only overwrite dynamic partition when overwriting a partitioned table with "
+ "dynamic partition columns. Works only when the table has partition keys.");

/** The strategy for partition expiration, identified by its configuration string. */
public enum PartitionExpireStrategy {
    VALUES_TIME("values-time"),
    UPDATE_TIME("update-time");

    /** Configuration string form of this strategy (as written in table options). */
    private final String configValue;

    PartitionExpireStrategy(String configValue) {
        this.configValue = configValue;
    }

    /** Returns the configuration string form, e.g. {@code "values-time"}. */
    @Override
    public String toString() {
        return configValue;
    }
}

public static final ConfigOption<String> PARTITION_EXPIRATION_STRATEGY =
key("partition.expiration-strategy")
.stringType()
Expand Down Expand Up @@ -1168,6 +1185,16 @@ public InlineElement getDescription() {
+ "By default, all partitions to be expired will be expired together, which may cause a risk of out-of-memory. "
+ "Use this parameter to divide partition expiration process and mitigate memory pressure.");

/**
 * Whether a compaction job skips partitions that are already expired according to
 * 'partition.expiration-time'. Per the description, it only takes effect when
 * 'partition.expiration-time' is set and 'partition.expiration-strategy' is 'values-time'.
 */
public static final ConfigOption<Boolean> COMPACTION_SKIP_EXPIRED_PARTITIONS =
key("compaction.skip-expired-partitions")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether to skip compacting partitions that are already expired "
+ "according to 'partition.expiration-time'. "
+ "Only effective when 'partition.expiration-time' is set "
+ "and 'partition.expiration-strategy' is 'values-time'.");

public static final ConfigOption<String> PARTITION_TIMESTAMP_FORMATTER =
key("partition.timestamp-formatter")
.stringType()
Expand Down Expand Up @@ -3310,6 +3337,10 @@ public String partitionExpireStrategy() {
return options.get(PARTITION_EXPIRATION_STRATEGY);
}

/** Returns the value of {@code compaction.skip-expired-partitions} (default {@code false}). */
public boolean compactionSkipExpiredPartitions() {
return options.get(COMPACTION_SKIP_EXPIRED_PARTITIONS);
}

@Nullable
public String dataFileExternalPaths() {
return options.get(DATA_FILE_EXTERNAL_PATHS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import javax.annotation.Nullable;

import java.io.Serializable;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.LinkedHashMap;
Expand All @@ -37,7 +38,9 @@
import java.util.Optional;

/** Strategy for partition expiration. */
public abstract class PartitionExpireStrategy {
public abstract class PartitionExpireStrategy implements Serializable {

private static final long serialVersionUID = 1L;

protected final List<String> partitionKeys;
protected final String partitionDefaultName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import javax.annotation.Nullable;

import java.io.Serializable;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
Expand All @@ -43,7 +44,9 @@
import static java.time.temporal.ChronoField.YEAR;

/** Time extractor to extract time from partition values. */
public class PartitionTimeExtractor {
public class PartitionTimeExtractor implements Serializable {

private static final long serialVersionUID = 1L;

private static final DateTimeFormatter TIMESTAMP_FORMATTER =
new DateTimeFormatterBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
*/
public class PartitionUpdateTimeExpireStrategy extends PartitionExpireStrategy {

private static final long serialVersionUID = 1L;

public PartitionUpdateTimeExpireStrategy(CoreOptions options, RowType partitionType) {
super(partitionType, options.partitionDefaultName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
*/
public class PartitionValuesTimeExpireStrategy extends PartitionExpireStrategy {

private static final long serialVersionUID = 1L;

private static final Logger LOG =
LoggerFactory.getLogger(PartitionValuesTimeExpireStrategy.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.partition.PartitionValuesTimeExpireStrategy;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.system.CompactBucketsTable;
Expand Down Expand Up @@ -141,6 +142,26 @@ public DataStreamSource<RowData> build() {
});
dataStream = new DataStreamSource<>(filterStream);
}
CoreOptions coreOptions = table.coreOptions();
if (coreOptions.compactionSkipExpiredPartitions()
&& coreOptions.partitionExpireTime() != null
&& CoreOptions.PartitionExpireStrategy.VALUES_TIME
.toString()
.equals(coreOptions.partitionExpireStrategy())) {
RowType partitionType = table.schema().logicalPartitionType();
Duration expireTime = coreOptions.partitionExpireTime();
PartitionValuesTimeExpireStrategy expireStrategy =
new PartitionValuesTimeExpireStrategy(coreOptions, partitionType);
SingleOutputStreamOperator<RowData> filterStream =
dataStream.filter(
rowData -> {
LocalDateTime expireDateTime =
LocalDateTime.now().minus(expireTime);
BinaryRow partition = deserializeBinaryRow(rowData.getBinary(1));
return !expireStrategy.isExpired(expireDateTime, partition);
});
dataStream = new DataStreamSource<>(filterStream);
}
Integer parallelism =
Options.fromMap(table.options()).get(FlinkConnectorOptions.SCAN_PARALLELISM);
if (parallelism != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@
import org.junit.jupiter.params.provider.ValueSource;

import java.time.Duration;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -926,6 +928,127 @@ public void testDataEvolutionTableCompact() throws Exception {
assertThat(value).isEqualTo(30000);
}

/**
 * With 'compaction.skip-expired-partitions'=true, a dedicated compact job should leave the
 * expired partition (per 'partition.expiration-time' under the 'values-time' strategy)
 * uncompacted, while the active partition is compacted into a single file.
 */
@Test
@Timeout(60)
public void testSkipExpiredPartitions() throws Exception {
// Use a date far in the past (expired) and today's date (not expired)
String expiredDt =
LocalDate.now().minusDays(30).format(DateTimeFormatter.ofPattern("yyyyMMdd"));
String activeDt = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyyMMdd"));

// write-only=true disables compaction during writes, so files accumulate until the
// explicit compact action below; expiration window is 7 days with values-time strategy.
Map<String, String> tableOptions = new HashMap<>();
tableOptions.put(CoreOptions.WRITE_ONLY.key(), "true");
tableOptions.put(CoreOptions.PARTITION_EXPIRATION_TIME.key(), "7 d");
tableOptions.put(
CoreOptions.PARTITION_EXPIRATION_STRATEGY.key(),
CoreOptions.PartitionExpireStrategy.VALUES_TIME.toString());
tableOptions.put(CoreOptions.PARTITION_TIMESTAMP_FORMATTER.key(), "yyyyMMdd");

FileStoreTable table =
prepareTable(
Collections.singletonList("dt"),
Arrays.asList("dt", "k"),
Collections.emptyList(),
tableOptions);

// Write two batches to each partition so each has multiple files
writeData(
rowData(1, 100, 15, BinaryString.fromString(expiredDt)),
rowData(1, 100, 15, BinaryString.fromString(activeDt)));

writeData(
rowData(2, 100, 15, BinaryString.fromString(expiredDt)),
rowData(2, 100, 15, BinaryString.fromString(activeDt)));

checkLatestSnapshot(table, 2, Snapshot.CommitKind.APPEND);

// Run the compact action with skip-expired-partitions enabled via --table_conf.
CompactAction action =
createAction(
CompactAction.class,
"compact",
"--warehouse",
warehouse,
"--database",
database,
"--table",
tableName,
"--table_conf",
CoreOptions.COMPACTION_SKIP_EXPIRED_PARTITIONS.key() + "=true");
StreamExecutionEnvironment env = streamExecutionEnvironmentBuilder().batchMode().build();
action.withStreamExecutionEnvironment(env).build();
env.execute();

checkLatestSnapshot(table, 3, Snapshot.CommitKind.COMPACT);

// Per-partition file counts prove which partitions were compacted.
List<DataSplit> splits = table.newSnapshotReader().read().dataSplits();
for (DataSplit split : splits) {
String dt = split.partition().getString(0).toString();
if (dt.equals(activeDt)) {
// active partition should be compacted into 1 file
assertThat(split.dataFiles().size()).isEqualTo(1);
} else {
// expired partition should be skipped, still has 2 files
assertThat(split.dataFiles().size()).isEqualTo(2);
}
}
}

/**
 * Without 'compaction.skip-expired-partitions' (default false), a dedicated compact job
 * compacts every partition — including ones already past 'partition.expiration-time' —
 * so both partitions should end up with a single file.
 */
@Test
@Timeout(60)
public void testNotSkipExpiredPartitionsByDefault() throws Exception {
// One partition dated 30 days ago (past the 7-day expiration) and one dated today.
String expiredDt =
LocalDate.now().minusDays(30).format(DateTimeFormatter.ofPattern("yyyyMMdd"));
String activeDt = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyyMMdd"));

Map<String, String> tableOptions = new HashMap<>();
tableOptions.put(CoreOptions.WRITE_ONLY.key(), "true");
tableOptions.put(CoreOptions.PARTITION_EXPIRATION_TIME.key(), "7 d");
tableOptions.put(
CoreOptions.PARTITION_EXPIRATION_STRATEGY.key(),
CoreOptions.PartitionExpireStrategy.VALUES_TIME.toString());
tableOptions.put(CoreOptions.PARTITION_TIMESTAMP_FORMATTER.key(), "yyyyMMdd");
// COMPACTION_SKIP_EXPIRED_PARTITIONS is not set, default is false

FileStoreTable table =
prepareTable(
Collections.singletonList("dt"),
Arrays.asList("dt", "k"),
Collections.emptyList(),
tableOptions);

// Two append commits give each partition two data files before compaction.
writeData(
rowData(1, 100, 15, BinaryString.fromString(expiredDt)),
rowData(1, 100, 15, BinaryString.fromString(activeDt)));

writeData(
rowData(2, 100, 15, BinaryString.fromString(expiredDt)),
rowData(2, 100, 15, BinaryString.fromString(activeDt)));

checkLatestSnapshot(table, 2, Snapshot.CommitKind.APPEND);

// Compact action with no --table_conf override: skip behavior stays disabled.
CompactAction action =
createAction(
CompactAction.class,
"compact",
"--warehouse",
warehouse,
"--database",
database,
"--table",
tableName);
StreamExecutionEnvironment env = streamExecutionEnvironmentBuilder().batchMode().build();
action.withStreamExecutionEnvironment(env).build();
env.execute();

checkLatestSnapshot(table, 3, Snapshot.CommitKind.COMPACT);

// both expired and active partitions should be compacted into 1 file
List<DataSplit> splits = table.newSnapshotReader().read().dataSplits();
for (DataSplit split : splits) {
assertThat(split.dataFiles().size()).isEqualTo(1);
}
}

private void setFirstRowId(List<CommitMessage> commitables, long firstRowId) {
commitables.forEach(
c -> {
Expand Down
Loading