Subscription: Drain TsFile batches before termination #17926
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## master #17926 +/- ##
============================================
- Coverage 41.07% 41.06% -0.02%
Complexity 318 318
============================================
Files 5248 5257 +9
Lines 363998 365040 +1042
Branches 47026 47187 +161
============================================
+ Hits 149523 149899 +376
- Misses 214475 215141 +666
|
Caideyipi left a comment
I found two issues that seem worth fixing before merge:
- `tryCommitCurrentTerminateEvent()` checks `prefetchingQueue.isEmpty()` and `inFlightEvents.isEmpty()` non-atomically. `poll()`/`pollV2()` remove an event from `prefetchingQueue` before putting it into `inFlightEvents`, and both polling and prefetch execution only hold the read lock, so they can run concurrently. If terminate draining runs in that transition window, the terminate event can still be committed while the last batch has just been polled but not yet tracked as in-flight. That preserves the early-completion race this PR is trying to fix. Please make the poll transition and terminate-commit check mutually exclusive, or include an explicit "poll transition in progress" state in the commit condition.
- For tablet queues, a pending `currentTerminateEvent` is retried only from inside `tryPrefetch()`, but `executePrefetch()` calls `tryPrefetch()` only when `states.shouldPrefetch()` is true. If normal prefetching is throttled or disabled while the terminate event is waiting for the queue to drain, completion can be delayed until another poll path happens. The terminate-drain path should bypass the normal prefetch heuristic once `currentTerminateEvent` is set.
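To make the two suggestions concrete, here is a minimal sketch of one possible shape for the fix. It is not the IoTDB implementation: the class `TerminateDrainSketch`, the dedicated `transitionLock`, the `prefetchEnabled` flag (standing in for `states.shouldPrefetch()`), and the `String` event type are all assumptions made for illustration. Only the names `prefetchingQueue`, `inFlightEvents`, `currentTerminateEvent`, `poll()`, `executePrefetch()`, `tryPrefetch()`, and `tryCommitCurrentTerminateEvent()` come from the comment above.

```java
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for a prefetching queue; not the real IoTDB class.
class TerminateDrainSketch {
  private final Queue<String> prefetchingQueue = new ConcurrentLinkedQueue<>();
  private final Set<String> inFlightEvents = ConcurrentHashMap.newKeySet();
  // Assumed dedicated lock: guards the queue -> in-flight hand-off and the commit check.
  private final ReentrantLock transitionLock = new ReentrantLock();
  private volatile String currentTerminateEvent;
  private volatile boolean terminated;
  private volatile boolean prefetchEnabled = true; // stands in for states.shouldPrefetch()

  void enqueue(String event) { prefetchingQueue.add(event); }
  void setTerminateEvent(String event) { currentTerminateEvent = event; }
  void setPrefetchEnabled(boolean enabled) { prefetchEnabled = enabled; }
  boolean isTerminated() { return terminated; }

  // Issue 1: removal from the queue and in-flight tracking happen under one lock,
  // so the commit check can never observe an event that is in neither collection.
  String poll() {
    transitionLock.lock();
    try {
      String event = prefetchingQueue.poll();
      if (event != null) {
        inFlightEvents.add(event);
      }
      return event;
    } finally {
      transitionLock.unlock();
    }
  }

  // Called when the consumer commits an event it previously polled.
  void ack(String event) { inFlightEvents.remove(event); }

  // Commits the terminate event only when nothing is queued or in flight.
  boolean tryCommitCurrentTerminateEvent() {
    transitionLock.lock();
    try {
      if (currentTerminateEvent == null
          || !prefetchingQueue.isEmpty()
          || !inFlightEvents.isEmpty()) {
        return false;
      }
      currentTerminateEvent = null;
      terminated = true;
      return true;
    } finally {
      transitionLock.unlock();
    }
  }

  // Issue 2: a pending terminate event is retried even when prefetching is throttled.
  void executePrefetch() {
    if (currentTerminateEvent != null) {
      tryCommitCurrentTerminateEvent();
      return;
    }
    if (prefetchEnabled) {
      tryPrefetch();
    }
  }

  private void tryPrefetch() { /* normal prefetch work would go here */ }
}
```

A dedicated lock is only one option. The alternative mentioned in the comment, an explicit "poll transition in progress" state included in the commit condition, would avoid serializing pollers at the cost of a slightly more involved check.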
|
Addressed Caideyipi's review comments in
Validation:
|
|
Added an integration test for the termination-drain scenario requested in review.

What it covers:
- table-model snapshot topic with TsFile subscription format
- forced TsFile parsing via start-time so data is buffered in SubscriptionPipeTsFileEventBatch
- large TsFile batch delay/size thresholds so snapshot rows stay pending until the terminate event arrives
- a background consumer continuously drains and commits all emitted TsFile messages
- the test asserts the subscription auto-completes and receiver data contains the expected snapshot rows

Verification:
|
|
CI follow-up:

- Investigated the previous



Motivation
When export-tsfile uses subscription/pipe to export TsFiles, a PipeTerminateEvent could mark the snapshot topic completed before the final internal TsFile batch was emitted and consumed. The client could then receive termination, clean up the topic, and miss the last batch of data.
Modifications
Tests
- mvn spotless:apply -pl iotdb-core/datanode
- git diff --check
- mvn compile -pl iotdb-core/datanode (fails in unrelated existing/generated dependency areas: missing PipePeriodicalLogReducer, relational grammar identifier(), CalcMessages, and IndexedBlockingReserveQueue API mismatches; modified files are not reported in the compilation errors)