Skip to content

Subscription: Drain TsFile batches before termination#17926

Open
VGalaxies wants to merge 4 commits into
masterfrom
fix/export-tsfile-drain-before-termination
Open

Subscription: Drain TsFile batches before termination#17926
VGalaxies wants to merge 4 commits into
masterfrom
fix/export-tsfile-drain-before-termination

Conversation

@VGalaxies

Copy link
Copy Markdown
Contributor

Motivation

When export-tsfile uses subscription/pipe to export TsFiles, a PipeTerminateEvent could mark the snapshot topic completed before the final internal TsFile batch was emitted and consumed. The client could then receive termination, clean up the topic, and miss the last batch of data.

Modifications

  • Hold PipeTerminateEvent in the subscription prefetching queue until pending batches are emitted.
  • Add a forced batch emit path for termination draining.
  • Commit the terminate event only after the prefetched queue and in-flight subscription events are empty.

Tests

  • mvn spotless:apply -pl iotdb-core/datanode
  • git diff --check
  • mvn compile -pl iotdb-core/datanode (fails in unrelated existing/generated dependency areas: missing PipePeriodicalLogReducer, relational grammar identifier(), CalcMessages, and IndexedBlockingReserveQueue API mismatches; modified files are not reported in the compilation errors)

@codecov

codecov Bot commented Jun 12, 2026

Copy link
Copy Markdown

Codecov Report

❌ Patch coverage is 0% with 82 lines in your changes missing coverage. Please review.
✅ Project coverage is 41.06%. Comparing base (72e72dd) to head (ddd0921).
⚠️ Report is 2 commits behind head on master.

Files with missing lines Patch % Lines
...scription/broker/SubscriptionPrefetchingQueue.java 0.00% 55 Missing ⚠️
...tion/event/batch/SubscriptionPipeEventBatches.java 0.00% 18 Missing ⚠️
...iption/event/batch/SubscriptionPipeEventBatch.java 0.00% 9 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #17926      +/-   ##
============================================
- Coverage     41.07%   41.06%   -0.02%     
  Complexity      318      318              
============================================
  Files          5248     5257       +9     
  Lines        363998   365040    +1042     
  Branches      47026    47187     +161     
============================================
+ Hits         149523   149899     +376     
- Misses       214475   215141     +666     

☔ View full report in Codecov by Harness.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@Caideyipi Caideyipi left a comment

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found two issues that seem worth fixing before merge:

  1. tryCommitCurrentTerminateEvent() checks prefetchingQueue.isEmpty() and inFlightEvents.isEmpty() non-atomically. poll()/pollV2() remove an event from prefetchingQueue before putting it into inFlightEvents, and both polling and prefetch execution only hold the read lock, so they can run concurrently. If terminate draining runs in that transition window, the terminate event can still be committed while the last batch has just been polled but not yet tracked as in-flight. That preserves the early-completion race this PR is trying to fix. Please make the poll transition and terminate-commit check mutually exclusive, or include an explicit "poll transition in progress" state in the commit condition.

  2. For tablet queues, a pending currentTerminateEvent is retried only from inside tryPrefetch(), but executePrefetch() calls tryPrefetch() only when states.shouldPrefetch() is true. If normal prefetching is throttled or disabled while the terminate event is waiting for the queue to drain, completion can be delayed until another poll path happens. The terminate-drain path should bypass the normal prefetch heuristic once currentTerminateEvent is set.

@VGalaxies

Copy link
Copy Markdown
Contributor Author

Addressed Caideyipi's review comments in cfdf8dacc1:

  1. Made the prefetch-to-in-flight poll transition mutually exclusive with the terminate commit check by moving the transition into a synchronized helper and making tryCommitCurrentTerminateEvent() synchronized as well. This removes the window where a polled event is no longer in prefetchingQueue but not yet visible in inFlightEvents.

  2. Updated executePrefetch() so a pending currentTerminateEvent bypasses the normal states.shouldPrefetch() heuristic and keeps retrying the drain/commit path directly.

Validation:

  • mvn spotless:apply -pl iotdb-core/datanode
  • git diff --check
  • mvn compile -pl iotdb-core/datanode still fails with the same unrelated dependency/generated-source mismatches (PipePeriodicalLogReducer, relational grammar identifier(), CalcMessages, IndexedBlockingReserveQueue, etc.); the modified file is not reported in the compilation errors.

@VGalaxies

Copy link
Copy Markdown
Contributor Author

Added an integration test for the termination-drain scenario requested in review.\n\nWhat it covers:\n- table-model snapshot topic with TsFile subscription format\n- forced TsFile parsing via start-time so data is buffered in SubscriptionPipeTsFileEventBatch\n- large TsFile batch delay/size thresholds so snapshot rows stay pending until the terminate event arrives\n- a background consumer continuously drains and commits all emitted TsFile messages\n- the test asserts the subscription auto-completes and receiver data contains the expected snapshot rows\n\nVerification:\nmvn verify -DskipUTs -Dit.test=org.apache.iotdb.subscription.it.dual.tablemodel.IoTDBSubscriptionTopicIT#testTsFileSnapshotDrainsPendingBatchBeforeTermination -DfailIfNoTests=false -Dfailsafe.failIfNoSpecifiedTests=false -pl integration-test -am -P MultiClusterIT2SubscriptionTableArchVerification -P with-integration-tests\n\nResult: BUILD SUCCESS, Tests run: 1, Failures: 0, Errors: 0, Skipped: 0.

@sonarqubecloud

Copy link
Copy Markdown

@VGalaxies

Copy link
Copy Markdown
Contributor Author

CI follow-up:\n\n- Investigated the previous dual-tree-auto-enhanced (17, HighPerformanceMode, HighPerformanceMode, ubuntu-latest, 0) failure. It failed in IoTDBPipeClusterIT.testRegisteringNewDataNodeAfterTransferringData with inconsistent row counts across DataNodes, which is unrelated to the subscription TsFile termination-drain change.\n- Pushed ddd09219b2 to address the Sonar/checkstyle findings on this PR: removed the non-primitive volatile field access pattern for currentTerminateEvent, fixed the long log line, and placed overloaded methods next to each other.\n- Local verification passed:\n - mvn spotless:apply -pl iotdb-core/datanode\n - mvn checkstyle:check -pl iotdb-core/datanode\n - mvn verify -DskipUTs -Dit.test=org.apache.iotdb.subscription.it.dual.tablemodel.IoTDBSubscriptionTopicIT#testTsFileSnapshotDrainsPendingBatchBeforeTermination -DfailIfNoTests=false -Dfailsafe.failIfNoSpecifiedTests=false -pl integration-test -am -P MultiClusterIT2SubscriptionTableArchVerification -P with-integration-tests\n- The new CI run on ddd09219b2 has already passed SonarCloud Code Analysis and subscription-table-arch-verification; the remaining large matrix jobs and Codecov are still pending.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants