[AMORO-4166] [Improvement]: Implement heap-based flush mechanism for SortedPosDeleteWriter to prevent OOM. #4167
slfan1989 wants to merge 5 commits into apache:master
Conversation
…SortedPosDeleteWriter to prevent OOM.
czy006 left a comment
**PR #4167 code review**

Thanks for this improvement! The overall approach is sound and addresses the OOM risk of `SortedPosDeleteWriter` in memory-constrained environments. A few areas need attention:

**Critical issues**

- Silent change of default behavior: the backward-compatible constructors pass a default heap ratio of `0.8`, implicitly enabling heap-based flush for all existing users.
- Performance overhead: `shouldFlushByHeap()` queries JVM memory metrics on every `delete()` call, which may introduce safepoint overhead.
- Insufficient test coverage: boundary-condition tests are missing.

**Suggestions**

- Constructor bloat: 7 constructors form a telescoping-constructor anti-pattern.
- Code duplication: `UnkeyedUpsertSparkWriter` contains identical property-reading code in 3 Spark versions.
- `usedMemory()` precision: `totalMemory() - freeMemory()` is a rough estimate.

See the inline comments below for details.
```diff
  partitionKey,
- DEFAULT_RECORDS_NUM_THRESHOLD);
+ DEFAULT_RECORDS_NUM_THRESHOLD,
+ TableProperties.POS_DELETE_FLUSH_HEAP_RATIO_DEFAULT,
```
**[Critical] Silent change of default behavior**

The backward-compatible constructors pass `POS_DELETE_FLUSH_HEAP_RATIO_DEFAULT = 0.8d` by default, which implicitly enables heap-based flush on every code path that still uses the old constructors.

Previously, `recordsNumThreshold = Long.MAX_VALUE` meant a flush would never be triggered by record count. With the new default `ratio = 0.8`, a flush is triggered whenever heap usage exceeds 80%. For data-processing jobs whose JVM heap usage normally hovers around 80%, this can lead to:

- a large number of small delete files
- degraded query performance

Suggestion: the backward-compatible constructors should pass `0d` (heap flush disabled), enabling heap flush only when the user configures it explicitly. For example:

```java
// Preserve the old behavior: heap flush disabled
this(..., recordsNumThreshold, 0d, 0);
```

Affected legacy code paths include the no-properties constructor of `MixedTreeNodePosDeleteWriter`, and test files such as `TestTableTracer`, `TestTaskWriter`, `MixedDataTestHelpers`, and `TestOrphanFileCleanIceberg`.
```java
@Override
public long usedMemory() {
  return RUNTIME.totalMemory() - RUNTIME.freeMemory();
}
```
**[Improvement] Limitations of the usedMemory() calculation**

`Runtime.totalMemory() - Runtime.freeMemory()` is a rough estimate of JVM heap usage:

- `totalMemory()` is the heap size the JVM has currently allocated (not the maximum heap); early in the JVM's lifetime it is far smaller than `maxMemory()`.
- `freeMemory()` is the unused portion of that allocated heap.

This means that while the heap has not yet fully expanded, `usedMemory()` returns a value that is too small and the heap flush will not trigger in time, which is exactly when protection is needed most.

Suggestion: document this limitation in a comment, or consider using `MemoryPoolMXBean` to obtain more precise Eden/Old generation usage data.
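As a minimal sketch of the `java.lang.management` alternative (the class name is illustrative, not from the PR; `MemoryMXBean` aggregates the heap pools, while the suggested `MemoryPoolMXBean` would give per-pool Eden/Old detail):

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;

// Hypothetical sketch: read heap usage through the java.lang.management API,
// which exposes an explicit heap maximum, instead of the
// Runtime.totalMemory() - Runtime.freeMemory() arithmetic.
public class MxBeanHeapUsage {
  private static final MemoryMXBean MEMORY = ManagementFactory.getMemoryMXBean();

  public static long usedMemory() {
    return MEMORY.getHeapMemoryUsage().getUsed();
  }

  public static long maxMemory() {
    MemoryUsage heap = MEMORY.getHeapMemoryUsage();
    // getMax() returns -1 when the maximum is undefined; fall back to the
    // currently committed size in that case.
    return heap.getMax() > 0 ? heap.getMax() : heap.getCommitted();
  }
}
```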
```diff
  // TODO Flush buffer based on the policy that checking whether whole heap memory size exceed the
  // threshold.
- if (records >= recordsNumThreshold) {
+ if (records >= recordsNumThreshold || shouldFlushByHeap()) {
```
**[Critical] Performance overhead of calling shouldFlushByHeap() on every delete()**

The current implementation executes `shouldFlushByHeap()` on every `delete()` call, which in turn calls `Runtime.totalMemory()` and `Runtime.freeMemory()`. On JDK versions before 14, these methods may trigger safepoint operations, so high-frequency calls introduce non-negligible overhead.

Suggestion: add a sampling mechanism so that JVM memory metrics are not queried on every call:

```java
// Only check the heap when records is a multiple of heapFlushMinRecords
if (records >= recordsNumThreshold
    || (records % Math.max(1, heapFlushMinRecords) == 0 && shouldFlushByHeap())) {
  flushDeletes();
}
```

Alternatively, use a counter and check heap usage only once every N deletes.
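The counter-based variant can be sketched standalone like this (class, field, and constant names are hypothetical; only `recordsNumThreshold` and the sampling idea come from the discussion above):

```java
// Hypothetical sketch: probe JVM heap usage only once every SAMPLE_INTERVAL
// deletes instead of on every call, bounding the cost of the memory queries.
public class SampledHeapCheck {
  private static final int SAMPLE_INTERVAL = 1024; // assumed sampling period

  private final double heapUsageRatioThreshold;
  private long records = 0;

  public SampledHeapCheck(double heapUsageRatioThreshold) {
    this.heapUsageRatioThreshold = heapUsageRatioThreshold;
  }

  /** Returns true when the buffered deletes should be flushed. */
  public boolean shouldFlush(long recordsNumThreshold) {
    records++;
    if (records >= recordsNumThreshold) {
      return true; // record-count policy, unchanged
    }
    // Heap policy: only query JVM memory metrics on every SAMPLE_INTERVAL-th record.
    if (records % SAMPLE_INTERVAL == 0) {
      Runtime rt = Runtime.getRuntime();
      double used = (double) (rt.totalMemory() - rt.freeMemory());
      return used / rt.maxMemory() >= heapUsageRatioThreshold;
    }
    return false;
  }
}
```

The trade-off is that a sudden heap spike between samples goes unnoticed for up to `SAMPLE_INTERVAL` records, which is usually acceptable given the buffer grows by one position delete per call.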
```java
  }
};

interface HeapUsageProvider {
```
**[Nitpick] HeapUsageProvider interface visibility**

`HeapUsageProvider` is currently package-private (no modifier). If it later needs to be used from other packages (e.g., a custom implementation in a Spark module), it may need to become `public`.

That said, package-private is reasonable for the current design: it limits the API surface, and tests access it from the same package. This is just a heads-up.
```java
  temp.delete();
}

@Test
```
**[Improvement] Insufficient test coverage**

There are currently only 2 test cases; the following boundary conditions are untested:

- `heapUsageRatioThreshold = 1.0`: the PR documentation says `>= 1` disables heap flush, but no test verifies this
- `maxMemory() <= 0`: should verify that no flush is triggered
- `heapFlushMinRecords = 0`: boundary behavior
- multiple flush cycles: after a flush, `records` resets to 0, accumulates back up to `heapFlushMinRecords`, and triggers again
- `recordsNumThreshold` and heap flush triggering simultaneously: verify precedence
- negative `heapUsageRatioThreshold`: verify the disable logic

Please add tests for these cases so that every branch of `shouldFlushByHeap()` is covered.
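To make a few of the missing cases concrete, here is a sketch against a fixed-value fake provider. The `HeapUsageProvider` shape and the guard rules (ratio `<= 0` or `>= 1` disables, non-positive `maxMemory()` never triggers) follow the PR description; the checker below is a standalone approximation of the real `shouldFlushByHeap()`, written for illustration only:

```java
// Hypothetical boundary-test sketch with a deterministic fake HeapUsageProvider.
public class HeapFlushBoundaryTest {

  public interface HeapUsageProvider {
    long maxMemory();
    long usedMemory();
  }

  /** Fixed-value provider for deterministic tests. */
  public static HeapUsageProvider fixed(long max, long used) {
    return new HeapUsageProvider() {
      @Override public long maxMemory() { return max; }
      @Override public long usedMemory() { return used; }
    };
  }

  /** Approximation of the guard logic described in the PR. */
  public static boolean shouldFlushByHeap(HeapUsageProvider heap, double ratioThreshold) {
    if (ratioThreshold <= 0d || ratioThreshold >= 1d) {
      return false; // invalid or disabling ratio
    }
    long max = heap.maxMemory();
    if (max <= 0) {
      return false; // unknown heap limit: never trigger
    }
    return (double) heap.usedMemory() / max >= ratioThreshold;
  }
}
```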
```java
this.format = format;
this.schema = schema;
this.writer = writer;
this.heapUsageRatioThreshold =
```
**[Nitpick] Identical property-reading code duplicated across three Spark versions**

These 15 lines of property-reading logic are identical in the v3.3/v3.4/v3.5 versions of `UnkeyedUpsertSparkWriter` (the same code appears once in each of the 3 files).

While this duplication is inherent to Amoro's multi-Spark-version architecture, consider extracting the logic into a shared utility to reduce future maintenance cost. For example:

```java
// In a common module
public class FlushConfig {
  public static FlushConfig fromProperties(Map<String, String> properties) { ... }
}
```

This is not a blocker, but worth considering.
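A possible shape for that shared helper, sketched under the assumption that the three property keys and defaults are exactly those listed in the PR description (the `FlushConfig` class itself is hypothetical):

```java
import java.util.Map;

// Hypothetical shared helper: parse the three flush-related table properties
// once, so the three Spark versions can share one implementation.
public class FlushConfig {
  public final long recordsNumThreshold;
  public final double heapUsageRatioThreshold;
  public final long heapFlushMinRecords;

  private FlushConfig(long recordsNumThreshold, double heapUsageRatioThreshold,
      long heapFlushMinRecords) {
    this.recordsNumThreshold = recordsNumThreshold;
    this.heapUsageRatioThreshold = heapUsageRatioThreshold;
    this.heapFlushMinRecords = heapFlushMinRecords;
  }

  /** Defaults follow the PR description: MAX_VALUE records, 0.8 ratio, 1000 min-records. */
  public static FlushConfig fromProperties(Map<String, String> properties) {
    long records = Long.parseLong(properties.getOrDefault(
        "pos-delete.flush.records", String.valueOf(Long.MAX_VALUE)));
    double ratio = Double.parseDouble(properties.getOrDefault(
        "pos-delete.flush.heap.ratio", "0.8"));
    long minRecords = Long.parseLong(properties.getOrDefault(
        "pos-delete.flush.heap.min-records", "1000"));
    return new FlushConfig(records, ratio, minRecords);
  }
}
```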
**Codecov Report**

❌ Patch coverage is

Additional details and impacted files

```
@@             Coverage Diff              @@
##             master    #4167      +/-  ##
============================================
- Coverage     29.93%   29.63%    -0.30%
- Complexity     4229     4243       +14
============================================
  Files           675      677        +2
  Lines         53990    54739      +749
  Branches       6838     6965      +127
============================================
+ Hits          16161    16224       +63
- Misses        36636    37306      +670
- Partials       1193     1209       +16
```

Flags with carried forward coverage won't be shown. View full report in Codecov by Sentry.
Why are the changes needed?
Close #4166.
`SortedPosDeleteWriter` currently only flushes buffered position deletes based on a record count threshold, which can cause OutOfMemoryError (OOM) in memory-constrained environments when processing large-scale delete operations. There was a TODO comment in the code indicating the need for a heap memory-based flush policy.
This PR implements the heap-based flush mechanism to prevent OOM issues by monitoring JVM heap usage and triggering flush when memory pressure is detected, while maintaining backward compatibility.
Brief change log
Core Implementation:
- Added a `HeapUsageProvider` interface to monitor JVM heap memory (max/used)
- Added a `shouldFlushByHeap()` method with safety guards (minimum records, valid ratio check)
- Updated the `delete()` logic to flush when `records >= recordsNumThreshold || shouldFlushByHeap()`

Configuration:

- `pos-delete.flush.heap.ratio` (default: 0.8): heap usage ratio threshold
- `pos-delete.flush.records` (default: Long.MAX_VALUE): record count threshold
- `pos-delete.flush.heap.min-records` (default: 1000): minimum records before a heap-based flush

How was this patch tested?

- Added `TestSortedPosDeleteWriterHeapFlush` with a mock `HeapUsageProvider` to test the heap-based flush logic

Documentation