[AMORO-3853] Support Java 17 and removing reflective/internal dependencies in Flink modules#4124
xxubai wants to merge 10 commits into apache:master from
Conversation
Force-pushed from 2ea94d3 to b6c7e1c
      watermarkOutput.emitPeriodicWatermark();
    }
  }
}
The reflection fallback block (lines 548-560) can be removed entirely.
SplitLocalOutput (the private inner class Flink returns from createOutputForSplit) is still registered in WatermarkOutputMultiplexer at the point releaseOutputForSplit is called — the split is not unregistered until internal.releaseOutputForSplit(splitId) returns. So calling watermarkOutput.emitPeriodicWatermark() at the reader level already covers this split: it iterates all currently-registered split outputs and computes the combined minimum watermark.
Suggested simplification:
private void emitPeriodicWatermark(SourceOutput<T> splitOutput) {
if (splitOutput == null) {
return;
}
if (splitOutput instanceof SourceOutputWithWatermarks) {
((SourceOutputWithWatermarks<T>) splitOutput).emitPeriodicWatermark();
return;
}
// splitOutput is an internal Flink type (e.g. SplitLocalOutput from
// ProgressiveTimestampsAndWatermarks) that does not expose
// SourceOutputWithWatermarks publicly. The reader-level call below is
// semantically equivalent: it iterates all registered split outputs
// (this split is still registered until internal.releaseOutputForSplit()
// returns) and flushes the combined periodic watermark.
watermarkOutput.emitPeriodicWatermark();
}

This removes the setAccessible(true) call, which is exactly the JDK-17 blocker this PR aims to fix.
Thanks, good point. I removed the reflection fallback and simplified it to call watermarkOutput.emitPeriodicWatermark() directly.
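For reference, the JDK-17 failure mode being removed here is easy to reproduce with plain reflection. A minimal sketch, with java.lang.String standing in for any class whose module does not open its package (on JDK 16+ the setAccessible call throws InaccessibleObjectException; on older JDKs it merely warns or succeeds):

```java
import java.lang.reflect.Field;

public class Main {
    public static void main(String[] args) throws Exception {
        // java.base does not open java.lang, so deep reflection into
        // String's private fields is denied by default on JDK 16+.
        Field value = String.class.getDeclaredField("value");
        try {
            value.setAccessible(true); // the pattern this PR removes
            System.out.println("allowed");
        } catch (RuntimeException e) {
            // InaccessibleObjectException on JDK 16+ (JEP 396/403)
            System.out.println("blocked: " + e.getClass().getSimpleName());
        }
    }
}
```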
Force-pushed from e77692e to 7907aae
  private String generateRocksDBPath(FunctionContext context, String tableName) {
-   String tmpPath = getTmpDirectoryFromTMContainer(context);
+   String tmpPath = getTmpDirectory(context);
The new getTmpDirectory() silently changes where RocksDB stores its files.
The original implementation read taskmanager.tmp.dirs via TaskManagerRuntimeInfo —
the directories explicitly configured for I/O-heavy workloads, often pointing to
large dedicated disks. The replacement falls back to System.getProperty("java.io.tmpdir"),
which is typically /tmp on the host OS. In a production cluster this can cause:
- Disk exhaustion on a small system partition
- RocksDB files landing on the wrong disk tier
The job-parameter approach context.getJobParameter("java.io.tmpdir", null) also
requires every user to manually thread a new job parameter through their submission
config — a silent breaking change for existing deployments.
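To make the fallback concrete: java.io.tmpdir is whatever the JVM was launched with, and on a typical Linux TaskManager that is /tmp unless overridden with -Djava.io.tmpdir. A quick check, nothing Flink-specific:

```java
public class Main {
    public static void main(String[] args) {
        // The JVM-level default the new code falls back to. On most Linux
        // hosts this resolves to /tmp — a system partition, not the disks
        // configured via taskmanager.tmp.dirs.
        String tmp = System.getProperty("java.io.tmpdir");
        System.out.println(tmp);
    }
}
```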
The reflection was the only blocker here; the surrounding logic was correct.
A cleaner fix that avoids reflection and preserves semantics:
- Extend BasicLookupFunction to RichTableFunction (or keep it as is but accept
  a tmp-dir supplier injected at construction time).
- In open(), call getRuntimeContext().getTaskManagerRuntimeInfo().getTmpDirectories()
— this is a fully public API since Flink 1.14, no reflection needed.
A first attempt through FunctionContext's public surface turns out not to be viable (the metric group does not expose the TaskManager tmp dirs):
private static String getTmpDirectory(FunctionContext context) {
// FunctionContext wraps a RuntimeContext; access it via the public API
// introduced in FLINK-17165 (Flink 1.14+).
try {
String[] dirs = context.getMetricGroup()
.getAllVariables()
.entrySet()
.stream()
// ... not viable this way
} catch (...) { }
return System.getProperty("java.io.tmpdir");
}
Actually the cleanest solution with zero reflection is to convert BasicLookupFunction
to extend RichFunction directly and access getRuntimeContext() in open():
@Override
public void open(Configuration parameters) throws Exception {
String[] dirs = getRuntimeContext()
.getTaskManagerRuntimeInfo()
.getTmpDirectories();
this.rocksDBTmpDir = dirs[ThreadLocalRandom.current().nextInt(dirs.length)];
...
}
This is a public, stable API — no setAccessible required — and fully restores
the original behavior.
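The random pick in that open() sketch is plain Java; a self-contained illustration follows, where the hard-coded array stands in for getRuntimeContext().getTaskManagerRuntimeInfo().getTmpDirectories() and pickRocksDBPath is a hypothetical helper, not existing Amoro code:

```java
import java.nio.file.Paths;
import java.util.concurrent.ThreadLocalRandom;

public class Main {
    // Hypothetical helper mirroring the suggested open(): choose one of the
    // TaskManager-configured tmp dirs at random so that concurrent lookup
    // tasks spread their RocksDB instances across the configured disks.
    static String pickRocksDBPath(String[] tmpDirs, String tableName) {
        String base = tmpDirs[ThreadLocalRandom.current().nextInt(tmpDirs.length)];
        return Paths.get(base, "rocksdb-lookup", tableName).toString();
    }

    public static void main(String[] args) {
        // Stand-in for the dirs returned by getTmpDirectories()
        String[] dirs = {"/data1/flink-tmp", "/data2/flink-tmp"};
        System.out.println(pickRocksDBPath(dirs, "orders"));
    }
}
```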
}

@Override
public void initializeState(StateInitializationContext context) throws Exception {
FlinkInputSplit implements Serializable and already has a public TypeInformation.
Use Flink's public TypeSerializer instead:
import org.apache.flink.api.java.typeutils.TypeExtractor;
...
TypeSerializer<FlinkInputSplit> serializer =
TypeExtractor.getForClass(FlinkInputSplit.class)
.createSerializer(getRuntimeContext().getExecutionConfig());
inputSplitsState = context.getOperatorStateStore()
.getListState(new ListStateDescriptor<>("splits", serializer));
Or, if a simple Kryo-based fallback is acceptable:
new ListStateDescriptor<>("splits", FlinkInputSplit.class)
Why are the changes needed?
The mixed Flink modules still had several Java 17 blockers, including hard-coded build/toolchain settings, runtime failures caused by JDK proxy and module encapsulation, and brittle reflection against Flink private internals.
This patch makes the mixed Flink Java 17 path buildable, testable, and packageable again.
Close #3853.
Brief change log
How was this patch tested?
Test commands:
./mvnw -nsu -pl amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common -am test -DfailIfNoTests=false
./mvnw -nsu -pl amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common,amoro-format-mixed/amoro-mixed-flink/v1.17/amoro-mixed-flink-1.17,amoro-format-mixed/amoro-mixed-flink/v1.18/amoro-mixed-flink-1.18 -am -DskipTests package
Documentation