
[AMORO-3853] Support Java 17 and removing reflective/internal dependencies in Flink modules #4124

Open
xxubai wants to merge 10 commits into apache:master from xxubai:java17-experimental
Conversation


@xxubai xxubai commented Mar 17, 2026

Why are the changes needed?

The mixed Flink modules still had several Java 17 blockers, including hard-coded build/toolchain settings, runtime failures caused by JDK proxy and module encapsulation, and brittle reflection against Flink private internals.

This patch makes the mixed Flink Java 17 path buildable, testable, and packageable again.
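Strong module encapsulation, one of the blockers mentioned above, is easy to reproduce in isolation. The sketch below is a standalone demo (not Amoro code) showing the InaccessibleObjectException that a setAccessible(true) call against a JDK-internal field raises on Java 17 unless the module is explicitly opened with --add-opens:

```java
import java.lang.reflect.Field;
import java.lang.reflect.InaccessibleObjectException;

public class EncapsulationDemo {
    public static void main(String[] args) {
        try {
            // Reflect into a java.base internal, the same pattern the old
            // fallback code used against Flink/JDK internals.
            Field value = String.class.getDeclaredField("value");
            value.setAccessible(true); // fails on JDK 16+ by default
            System.out.println("accessible");
        } catch (InaccessibleObjectException e) {
            // JDK 17 strong encapsulation: java.base does not open
            // java.lang to the unnamed module.
            System.out.println("blocked: " + e.getClass().getSimpleName());
        } catch (NoSuchFieldException e) {
            System.out.println("no such field");
        }
    }
}
```

Replacing such reflection with public APIs, as this patch does, removes the need for `--add-opens` flags at build time and at runtime.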

Closes #3853.

Brief change log

  • Align Java 17 build settings across affected modules and remove hard-coded JDK/toolchain constraints.
  • Reduce Flink-side dependence on private/internal reflection by switching to public APIs or explicit wrappers where possible.
  • Smooth the path for future dependency upgrades
  • Prepare for upgrading the Iceberg version and supporting Iceberg V3

How was this patch tested?

  • Add test cases that check the changes thoroughly, including negative and positive cases where possible
  • Add screenshots for manual tests if appropriate
  • Run tests locally before making a pull request

Test commands:

  • ./mvnw -nsu -pl amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common -am test -DfailIfNoTests=false
  • ./mvnw -nsu -pl amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common,amoro-format-mixed/amoro-mixed-flink/v1.17/amoro-mixed-flink-1.17,amoro-format-mixed/amoro-mixed-flink/v1.18/amoro-mixed-flink-1.18 -am -DskipTests package

Documentation

  • Does this pull request introduce a new feature? (yes / no)
    • no
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
    • not applicable

@github-actions github-actions bot added type:docs Improvements or additions to documentation module:mixed-flink Flink module for Mixed Format module:ams-server Ams server module module:mixed-trino trino module for Mixed Format type:infra type:build module:common labels Mar 17, 2026
@xxubai xxubai force-pushed the java17-experimental branch from 2ea94d3 to b6c7e1c Compare March 17, 2026 16:47
@xxubai xxubai marked this pull request as ready for review March 18, 2026 06:16
@xxubai xxubai changed the title [Draft] Support Java 17 and removing reflective/internal dependencies in Flink modules [AMORO-3853] Support Java 17 and removing reflective/internal dependencies in Flink modules Mar 18, 2026
watermarkOutput.emitPeriodicWatermark();
}
}
}
A reviewer (Contributor) commented:
The reflection fallback block (lines 548-560) can be removed entirely.

SplitLocalOutput (the private inner class Flink returns from createOutputForSplit) is still registered in WatermarkOutputMultiplexer at the point releaseOutputForSplit is called — the split is not unregistered until internal.releaseOutputForSplit(splitId) returns. So calling watermarkOutput.emitPeriodicWatermark() at the reader level already covers this split: it iterates all currently-registered split outputs and computes the combined minimum watermark.

Suggested simplification:

private void emitPeriodicWatermark(SourceOutput<T> splitOutput) {
  if (splitOutput == null) {
    return;
  }

  if (splitOutput instanceof SourceOutputWithWatermarks) {
    ((SourceOutputWithWatermarks<T>) splitOutput).emitPeriodicWatermark();
    return;
  }

  // splitOutput is an internal Flink type (e.g. SplitLocalOutput from
  // ProgressiveTimestampsAndWatermarks) that does not expose
  // SourceOutputWithWatermarks publicly. The reader-level call below is
  // semantically equivalent: it iterates all registered split outputs
  // (this split is still registered until internal.releaseOutputForSplit()
  // returns) and flushes the combined periodic watermark.
  watermarkOutput.emitPeriodicWatermark();
}

This removes the setAccessible(true) call which is exactly the JDK-17 blocker this PR aims to fix.

xxubai (Contributor, author) replied:
Thanks, good point. I removed the reflection fallback and simplified it to use watermarkOutput.emitPeriodicWatermark() directly

@xxubai xxubai force-pushed the java17-experimental branch from e77692e to 7907aae Compare April 2, 2026 08:30
@github-actions github-actions bot removed the module:ams-server Ams server module label Apr 2, 2026
@czy006 czy006 left a comment


cc @xxubai


  private String generateRocksDBPath(FunctionContext context, String tableName) {
-   String tmpPath = getTmpDirectoryFromTMContainer(context);
+   String tmpPath = getTmpDirectory(context);
A reviewer (Contributor) commented:

The new getTmpDirectory() silently changes where RocksDB stores its files.

The original implementation read taskmanager.tmp.dirs via TaskManagerRuntimeInfo —
the directories explicitly configured for I/O-heavy workloads, often pointing to
large dedicated disks. The replacement falls back to System.getProperty("java.io.tmpdir"),
which is typically /tmp on the host OS. In a production cluster this can cause:

  • Disk exhaustion on a small system partition
  • RocksDB files landing on the wrong disk tier

The job-parameter approach context.getJobParameter("java.io.tmpdir", null) also
requires every user to manually thread a new job parameter through their submission
config — a silent breaking change for existing deployments.

The reflection was the only blocker here; the surrounding logic was correct.
A cleaner fix that avoids reflection and preserves semantics:

  1. Extend BasicLookupFunction to RichTableFunction (or keep it as is but accept
    a tmp-dir supplier injected at construction time).
  2. In open(), call getRuntimeContext().getTaskManagerRuntimeInfo().getTmpDirectories()
    — this is a fully public API since Flink 1.14, no reflection needed.

A workaround that tries to recover the tmp directories from
context.getMetricGroup().getAllVariables() is not viable: the metric
variables do not carry the TaskManager tmp-dir configuration, so the code
would still end up falling back to System.getProperty("java.io.tmpdir").

The cleanest solution with zero reflection is to convert BasicLookupFunction
to extend RichFunction directly and access getRuntimeContext() in open():

  @Override
  public void open(Configuration parameters) throws Exception {
    String[] dirs = getRuntimeContext()
        .getTaskManagerRuntimeInfo()
        .getTmpDirectories();
    this.rocksDBTmpDir = dirs[ThreadLocalRandom.current().nextInt(dirs.length)];
    ...
  }

This is a public, stable API — no setAccessible required — and fully restores
the original behavior.
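For illustration, the selection semantics described above can be isolated into a small standalone helper (TmpDirs and pick are hypothetical names, not part of the patch): prefer the explicitly configured TaskManager tmp directories, and fall back to java.io.tmpdir only when none are configured.

```java
import java.util.concurrent.ThreadLocalRandom;

public final class TmpDirs {
    private TmpDirs() {}

    // Pick one of the explicitly configured tmp dirs at random so RocksDB
    // instances spread across the configured disks; fall back to the JVM
    // default only when nothing is configured.
    public static String pick(String[] configuredDirs) {
        if (configuredDirs == null || configuredDirs.length == 0) {
            return System.getProperty("java.io.tmpdir");
        }
        return configuredDirs[ThreadLocalRandom.current().nextInt(configuredDirs.length)];
    }

    public static void main(String[] args) {
        System.out.println(pick(new String[] {"/data1/tmp"}));
        System.out.println(pick(null).equals(System.getProperty("java.io.tmpdir")));
    }
}
```

Wiring this to getRuntimeContext().getTaskManagerRuntimeInfo().getTmpDirectories() in open() then preserves the pre-patch disk placement without any reflection.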

}

@Override
public void initializeState(StateInitializationContext context) throws Exception {
A reviewer (Contributor) commented:

FlinkInputSplit implements Serializable and already has a public TypeInformation.
Use Flink's public TypeSerializer instead:

import org.apache.flink.api.java.typeutils.TypeExtractor;
...

  TypeSerializer<FlinkInputSplit> serializer =
      TypeExtractor.getForClass(FlinkInputSplit.class)
                   .createSerializer(getRuntimeContext().getExecutionConfig());
  inputSplitsState = context.getOperatorStateStore()
      .getListState(new ListStateDescriptor<>("splits", serializer));

Or, if a simple Kryo-based fallback is acceptable:

new ListStateDescriptor<>("splits", FlinkInputSplit.class)



Development

Successfully merging this pull request may close these issues.

[Improvement]: Support to build amoro on java 17

3 participants