(do not merge) [FLINK-38613][tests] demo#27635
(do not merge) [FLINK-38613][tests] demo#27635rkhachatryan wants to merge 1 commit intoapache:masterfrom
Conversation
|
|
||
| final List<NumberSequenceSplit> splits = | ||
| splitNumberRange(from, to, enumContext.currentParallelism()); | ||
| splitNumberRange(from, to, 10 /* enumContext.currentParallelism() */); |
There was a problem hiding this comment.
Thanks for the analysis. I reproduced and investigated it further.
The root cause is: on restore, the available splits come from the checkpoint state, not from the new parallelism. For example, first run with parallelism 3 → NumberSequenceSource creates 3 splits, all assigned immediately, checkpoint saves 0 remaining splits. Restore with parallelism 7 → restoreEnumerator() gets 0 splits from checkpoint, so 4 extra subtasks get NoMoreSplits and finish.
I like your Option 3 that mentioned in https://issues.apache.org/jira/browse/FLINK-38613, but scoped to the test only — no production code changes. Subclassed DataGeneratorSource and overrode createEnumerator() to always create at least MAX_SLOTS splits:
private static class TestDataGeneratorSource extends DataGeneratorSource<Long> {
TestDataGeneratorSource() {
super(index -> index, Long.MAX_VALUE, RateLimiterStrategy.perSecond(5000), Types.LONG);
}
@Override
public SplitEnumerator<NumberSequenceSplit, Collection<NumberSequenceSplit>>
createEnumerator(SplitEnumeratorContext<NumberSequenceSplit> enumContext) {
NumberSequenceSource source =
new NumberSequenceSource(0, Long.MAX_VALUE - 1) {
@Override
protected List<NumberSequenceSplit> splitNumberRange(
long from, long to, int numSplits) {
return super.splitNumberRange(
from, to, Math.max(numSplits, MAX_SLOTS));
}
};
return source.createEnumerator(enumContext);
}
}WDYT?
No description provided.