Skip to content

Comments

(do not merge) [FLINK-38613][tests] demo#27635

Draft
rkhachatryan wants to merge 1 commit intoapache:masterfrom
rkhachatryan:f38613
Draft

(do not merge) [FLINK-38613][tests] demo#27635
rkhachatryan wants to merge 1 commit intoapache:masterfrom
rkhachatryan:f38613

Conversation

@rkhachatryan
Copy link
Contributor

@rkhachatryan rkhachatryan commented Feb 18, 2026

No description provided.

@flinkbot
Copy link
Collaborator

flinkbot commented Feb 18, 2026

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build


final List<NumberSequenceSplit> splits =
splitNumberRange(from, to, enumContext.currentParallelism());
splitNumberRange(from, to, 10 /* enumContext.currentParallelism() */);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the analysis. I reproduced and investigated it further.

The root cause is: on restore, the available splits come from the checkpoint state, not from the new parallelism. For example, first run with parallelism 3 → NumberSequenceSource creates 3 splits, all assigned immediately, checkpoint saves 0 remaining splits. Restore with parallelism 7 → restoreEnumerator() gets 0 splits from checkpoint, so 4 extra subtasks get NoMoreSplits and finish.

I like your Option 3 that mentioned in https://issues.apache.org/jira/browse/FLINK-38613, but scoped to the test only — no production code changes. Subclassed DataGeneratorSource and overrode createEnumerator() to always create at least MAX_SLOTS splits:

private static class TestDataGeneratorSource extends DataGeneratorSource<Long> {
    TestDataGeneratorSource() {
        super(index -> index, Long.MAX_VALUE, RateLimiterStrategy.perSecond(5000), Types.LONG);
    }

    @Override
    public SplitEnumerator<NumberSequenceSplit, Collection<NumberSequenceSplit>>
            createEnumerator(SplitEnumeratorContext<NumberSequenceSplit> enumContext) {
        NumberSequenceSource source =
                new NumberSequenceSource(0, Long.MAX_VALUE - 1) {
                    @Override
                    protected List<NumberSequenceSplit> splitNumberRange(
                            long from, long to, int numSplits) {
                        return super.splitNumberRange(
                                from, to, Math.max(numSplits, MAX_SLOTS));
                    }
                };
        return source.createEnumerator(enumContext);
    }
}

WDYT?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants