
[AMORO-4166] [Improvement]: Implement heap-based flush mechanism for SortedPosDeleteWriter to prevent OOM.#4167

Open
slfan1989 wants to merge 5 commits into apache:master from slfan1989:amoro-4166

Conversation

@slfan1989
Contributor

Why are the changes needed?

Close #4166.

SortedPosDeleteWriter currently only flushes buffered position deletes based on a record count threshold, which can cause OutOfMemoryError (OOM) in memory-constrained environments when processing large-scale delete operations.

There was a TODO comment in the code indicating the need for a heap memory-based flush policy:

// TODO Flush buffer based on the policy that checking whether whole heap memory size exceed the
// threshold.

This PR implements the heap-based flush mechanism to prevent OOM issues by monitoring JVM heap usage and triggering flush when memory pressure is detected, while maintaining backward compatibility.

Brief change log

Core Implementation:

  • SortedPosDeleteWriter:
    • Added HeapUsageProvider interface to monitor JVM heap memory (max/used)
    • Implemented shouldFlushByHeap() method with safety guards (minimum records, valid ratio check)
    • Modified delete() logic to flush when records >= recordsNumThreshold || shouldFlushByHeap()
    • Added multiple constructor overloads to support new parameters while maintaining backward compatibility

Configuration:

  • TableProperties: Added three new table properties:
    • pos-delete.flush.heap.ratio (default: 0.8) - Heap usage ratio threshold
    • pos-delete.flush.records (default: Long.MAX_VALUE) - Record count threshold
    • pos-delete.flush.heap.min-records (default: 1000) - Minimum records before heap flush
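Putting the change log and configuration together, the flush decision might look roughly like this. The guard logic follows the PR description (minimum record count, valid ratio check); the standalone class `HeapFlushCheck` is illustrative, not the actual `SortedPosDeleteWriter` code:

```java
// Sketch of the heap-based flush decision described above. The guards mirror
// the PR description; the wrapper class name is hypothetical.
public class HeapFlushCheck {
    private final double heapUsageRatioThreshold; // pos-delete.flush.heap.ratio, default 0.8
    private final long heapFlushMinRecords;       // pos-delete.flush.heap.min-records, default 1000

    public HeapFlushCheck(double heapUsageRatioThreshold, long heapFlushMinRecords) {
        this.heapUsageRatioThreshold = heapUsageRatioThreshold;
        this.heapFlushMinRecords = heapFlushMinRecords;
    }

    public boolean shouldFlushByHeap(long records, long usedMemory, long maxMemory) {
        if (records < heapFlushMinRecords) {
            return false; // too few buffered records to be worth a flush
        }
        if (heapUsageRatioThreshold <= 0d || heapUsageRatioThreshold >= 1d) {
            return false; // a ratio outside (0, 1) disables heap-based flushing
        }
        if (maxMemory <= 0) {
            return false; // heap size unknown; never divide by a non-positive max
        }
        return (double) usedMemory / maxMemory >= heapUsageRatioThreshold;
    }
}
```

With these guards, the `records >= recordsNumThreshold || shouldFlushByHeap()` condition in `delete()` keeps the old record-count behavior while adding the heap safety valve.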

How was this patch tested?

  • Added test cases that check the changes thoroughly, including negative and positive cases where possible
    • Added TestSortedPosDeleteWriterHeapFlush with mock HeapUsageProvider to test heap-based flush logic
    • Test case 1: Verify flush is triggered when heap usage exceeds threshold
    • Test case 2: Verify heap flush is disabled when ratio is set to 0

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? JavaDocs

@github-actions bot added module:mixed-spark Spark module for Mixed Format module:mixed-hive Hive module for Mixed Format labels Apr 5, 2026
Contributor

@czy006 czy006 left a comment


PR #4167 Code Review

Thanks for this improvement! The overall approach is sound and addresses the OOM risk of SortedPosDeleteWriter in memory-constrained environments. A few areas need attention, though:

Critical issues

  1. Silent change of default behavior: the backward-compatible constructors default to a heap ratio of 0.8, implicitly enabling heap-based flush for all existing users
  2. Performance overhead: shouldFlushByHeap() queries JVM memory metrics on every delete(), which may introduce safepoint overhead
  3. Insufficient test coverage: boundary-condition tests are missing

Suggested improvements

  1. Constructor bloat: 7 constructors form a telescoping-constructor anti-pattern
  2. Code duplication: UnkeyedUpsertSparkWriter duplicates identical property-reading code across 3 Spark versions
  3. usedMemory() accuracy: totalMemory() - freeMemory() is only a rough estimate

See the inline comments below for details.

partitionKey,
DEFAULT_RECORDS_NUM_THRESHOLD);
DEFAULT_RECORDS_NUM_THRESHOLD,
TableProperties.POS_DELETE_FLUSH_HEAP_RATIO_DEFAULT,
Contributor


[Critical] Silent change of default behavior

The backward-compatible constructors pass POS_DELETE_FLUSH_HEAP_RATIO_DEFAULT = 0.8d by default, so every code path still using the old constructors implicitly enables heap-based flush.

Previously, recordsNumThreshold = Long.MAX_VALUE meant a flush would never be triggered by record count. With the new default ratio of 0.8, a flush fires whenever heap usage exceeds 80%. For data-processing jobs whose JVM heap usage normally sits around 80%, this can lead to:

  • a large number of small delete files
  • degraded query performance

Suggestion: the backward-compatible constructors should pass 0d (heap flush disabled), enabling the feature only when the user configures it explicitly. For example:

// Preserve the old behavior: heap flush disabled
this(..., recordsNumThreshold, 0d, 0);

Affected legacy code paths include:

  • the properties-free constructor of MixedTreeNodePosDeleteWriter
  • test files such as TestTableTracer, TestTaskWriter, MixedDataTestHelpers, and TestOrphanFileCleanIceberg


@Override
public long usedMemory() {
return RUNTIME.totalMemory() - RUNTIME.freeMemory();
Contributor


[Improvement] Limitations of the usedMemory() calculation

Runtime.totalMemory() - Runtime.freeMemory() is only a rough estimate of JVM heap usage:

  • totalMemory() is the heap currently allocated by the JVM (not the maximum heap), and early in the JVM's lifetime it is far smaller than maxMemory()
  • freeMemory() is the unused portion of that allocated heap

This means that while the JVM heap has not yet fully expanded, usedMemory() under-reports, so the heap flush will not trigger in time, which is precisely when protection is needed most.

Suggestion: document this limitation in a comment, or consider using MemoryPoolMXBean to obtain more precise Eden/Old generation usage data.
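A minimal sketch of what a more accurate provider could look like, using `MemoryMXBean.getHeapMemoryUsage()` from `java.lang.management` (a coarser but simpler relative of `MemoryPoolMXBean`); the class name is hypothetical:

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;

// Hypothetical HeapUsageProvider implementation backed by MemoryMXBean instead
// of Runtime. getHeapMemoryUsage() reports used bytes directly, so the value
// does not depend on how far the JVM has expanded the heap so far.
public class MxBeanHeapUsageProvider {
    private static final MemoryMXBean MEMORY = ManagementFactory.getMemoryMXBean();

    public long usedMemory() {
        return MEMORY.getHeapMemoryUsage().getUsed();
    }

    public long maxMemory() {
        MemoryUsage heap = MEMORY.getHeapMemoryUsage();
        // getMax() is -1 when the max heap size is undefined; fall back to the
        // committed size so the flush-ratio check still has a sane denominator
        return heap.getMax() > 0 ? heap.getMax() : heap.getCommitted();
    }
}
```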

// TODO Flush buffer based on the policy that checking whether whole heap memory size exceed the
// threshold.
if (records >= recordsNumThreshold) {
if (records >= recordsNumThreshold || shouldFlushByHeap()) {
Contributor


[Critical] Performance overhead of calling shouldFlushByHeap() on every delete()

The current implementation runs shouldFlushByHeap() on every delete() call, and that method invokes Runtime.totalMemory() and Runtime.freeMemory(). On JDK versions before 14, these calls can trigger safepoint operations, so invoking them at high frequency introduces non-negligible overhead.

Suggestion: add a sampling mechanism so the JVM memory metrics are not queried on every call:

// Only check the heap when records is a multiple of heapFlushMinRecords
if (records >= recordsNumThreshold || 
    (records % Math.max(1, heapFlushMinRecords) == 0 && shouldFlushByHeap())) {
    flushDeletes();
}

Alternatively, keep a counter and check heap usage only once every N deletes.
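The counter variant could be sketched as follows; names like `SampledHeapCheck` and `CHECK_INTERVAL` are illustrative, not part of the PR:

```java
import java.util.function.BooleanSupplier;

// Illustrative counter-based sampler: the expensive JVM heap query runs at most
// once every CHECK_INTERVAL delete() calls; all other calls return false cheaply.
public class SampledHeapCheck {
    private static final int CHECK_INTERVAL = 1024; // tune per workload
    private int callsSinceLastCheck = 0;

    public boolean shouldFlushByHeap(BooleanSupplier expensiveHeapCheck) {
        if (++callsSinceLastCheck < CHECK_INTERVAL) {
            return false; // skip the Runtime query on most calls
        }
        callsSinceLastCheck = 0;
        return expensiveHeapCheck.getAsBoolean(); // only here do we touch JVM metrics
    }
}
```

The trade-off is detection latency: memory pressure building between samples goes unnoticed for up to CHECK_INTERVAL deletes, which is usually acceptable given how coarse the heap estimate already is.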

}
};

interface HeapUsageProvider {
Contributor


[Nitpick] Visibility of the HeapUsageProvider interface

HeapUsageProvider is currently package-private (no modifier). If it later needs to be used from other packages (for example, a custom implementation in the Spark module), it may have to become public.

That said, package-private is reasonable for the current design: it limits the API surface, and the tests access it from the same package. This is just a heads-up.

temp.delete();
}

@Test
Contributor


[Improvement] Insufficient test coverage

There are currently only 2 test cases; the following boundary conditions are untested:

  1. heapUsageRatioThreshold = 1.0: the PR documentation says >= 1 disables heap flush, but no test verifies it
  2. maxMemory() <= 0: verify that no flush is triggered
  3. heapFlushMinRecords = 0: boundary behavior
  4. Repeated flush cycles: after a flush, records resets to 0, then accumulates back to heapFlushMinRecords and triggers again
  5. recordsNumThreshold and heap flush firing at the same time: verify precedence
  6. Negative heapUsageRatioThreshold: verify the disable logic

Suggestion: add these test cases so that every branch of shouldFlushByHeap() is covered.
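Two of these boundary cases could be covered with a mock provider along these lines; the helper `shouldFlush` re-states the guard logic from the PR description for demonstration and is not the writer's real API:

```java
// Illustrative boundary checks using a mock heap provider, in the spirit of the
// PR's TestSortedPosDeleteWriterHeapFlush. All names here are hypothetical.
public class HeapFlushBoundary {
    interface HeapUsageProvider {
        long usedMemory();
        long maxMemory();
    }

    static boolean shouldFlush(HeapUsageProvider heap, double ratio, long records, long minRecords) {
        if (records < minRecords || ratio <= 0d || ratio >= 1d) {
            return false; // guards: minimum records and a valid (0, 1) ratio
        }
        long max = heap.maxMemory();
        return max > 0 && (double) heap.usedMemory() / max >= ratio;
    }

    static HeapUsageProvider fixed(long used, long max) {
        return new HeapUsageProvider() {
            public long usedMemory() { return used; }
            public long maxMemory() { return max; }
        };
    }
}
```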

this.format = format;
this.schema = schema;
this.writer = writer;
this.heapUsageRatioThreshold =
Contributor


[Nitpick] Property-reading code duplicated verbatim across three Spark versions

These 15 lines of property-reading logic are identical in the v3.3/v3.4/v3.5 versions of UnkeyedUpsertSparkWriter (the same code appears once in each of the 3 files).

Although this is an inherent pattern of Amoro's multi-Spark-version architecture, consider extracting the logic into a shared utility method to reduce future maintenance cost. For example:

// In a common module
public class FlushConfig {
    public static FlushConfig fromProperties(Map<String, String> properties) { ... }
}

Not a blocker, but worth considering.
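Fleshed out, the suggested helper might look like this; `FlushConfig` does not exist in Amoro today, and in real code the property-key strings would come from the `TableProperties` constants rather than being inlined:

```java
import java.util.Map;

// Hypothetical shared helper for the duplicated property-reading logic. The
// property keys and defaults mirror the ones this PR adds to TableProperties.
public final class FlushConfig {
    public final double heapRatio;    // pos-delete.flush.heap.ratio
    public final long flushRecords;   // pos-delete.flush.records
    public final long heapMinRecords; // pos-delete.flush.heap.min-records

    private FlushConfig(double heapRatio, long flushRecords, long heapMinRecords) {
        this.heapRatio = heapRatio;
        this.flushRecords = flushRecords;
        this.heapMinRecords = heapMinRecords;
    }

    public static FlushConfig fromProperties(Map<String, String> properties) {
        return new FlushConfig(
            Double.parseDouble(properties.getOrDefault("pos-delete.flush.heap.ratio", "0.8")),
            Long.parseLong(properties.getOrDefault("pos-delete.flush.records",
                String.valueOf(Long.MAX_VALUE))),
            Long.parseLong(properties.getOrDefault("pos-delete.flush.heap.min-records", "1000")));
    }
}
```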

@codecov-commenter

codecov-commenter commented Apr 6, 2026

Codecov Report

❌ Patch coverage is 58.73016% with 26 lines in your changes missing coverage. Please review.
✅ Project coverage is 29.63%. Comparing base (fe98a6c) to head (30808b6).
⚠️ Report is 13 commits behind head on master.

Files with missing lines Patch % Lines
.../apache/amoro/io/writer/SortedPosDeleteWriter.java 66.66% 7 Missing and 3 partials ⚠️
...e/io/writer/AdaptHiveGenericTaskWriterBuilder.java 0.00% 9 Missing ⚠️
.../amoro/io/writer/MixedTreeNodePosDeleteWriter.java 72.72% 2 Missing and 1 partial ⚠️
.../amoro/optimizing/MixedIcebergRewriteExecutor.java 0.00% 2 Missing ⚠️
...moro/hive/optimizing/MixedHiveRewriteExecutor.java 0.00% 2 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master    #4167      +/-   ##
============================================
- Coverage     29.93%   29.63%   -0.30%     
- Complexity     4229     4243      +14     
============================================
  Files           675      677       +2     
  Lines         53990    54739     +749     
  Branches       6838     6965     +127     
============================================
+ Hits          16161    16224      +63     
- Misses        36636    37306     +670     
- Partials       1193     1209      +16     
Flag Coverage Δ
core 29.63% <58.73%> (-0.30%) ⬇️

Flags with carried forward coverage won't be shown.



Labels

module:mixed-hive Hive module for Mixed Format module:mixed-spark Spark module for Mixed Format

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Improvement]: Implement heap-based flush mechanism for SortedPosDeleteWriter to prevent OOM

3 participants