From c0fc1596018d5b693049201f47ae31c5ffd40424 Mon Sep 17 00:00:00 2001 From: Yongzao <532741407@qq.com> Date: Fri, 12 Jun 2026 14:44:41 +0800 Subject: [PATCH 1/3] Stabilize region-migration / cluster ITs and enable IoTV2 daily tests Fixes three flaky integration tests surfaced by the nightly Daily IT, plus re-enables and corrects the IoTV2 region-migration daily tests. Flaky-test fixes (all "result visible before the depended-on state is ready" timing issues, not product bugs): * IoTConsensusV2 3C3D testDeleteTimeSeriesReplicaConsistency (stream & batch): in Step 7 the restarted DataNode was only awaited via isAlive() (process up), not until it could serve queries, so the next iteration intermittently hit "Connection refused". Wait until the node is actually queryable. * IoTDBCustomizedClusterIT.testRepeatedlyRestartWholeClusterWithWrite: the cross-replica "SELECT last" comparison was order-sensitive and ran before the cluster converged after a full restart, causing InconsistentDataException. Add ORDER BY TIMESERIES to make the row order deterministic and retry the comparison during the brief convergence window. * Region-migration kill-point framework: in the success path checkKillPointsAllTriggered could fire before the background log-tailer thread had processed the kill-point line of the last phase, failing with "Some kill points was not triggered". Add a best-effort graceWaitForKillPointsTriggered (bounded, swallows the timeout) before the authoritative checkKillPointsAllTriggered, so it does not regress the badKillPoint test (which expects an AssertionError when a kill point never triggers). IoTV2 region-migration daily tests: * Re-enable the ConfigNode / Cluster / DataNode crash tests that were commented out pending discussion (the @Ignore'd PreCheck case is left ignored). * originalCrashDuringAddPeerDone (batch & stream): change failTest -> successTest. 
Once the add-peer phase is done the new peer already holds the data, so the migration is designed to tolerate the original (source) DataNode crashing afterwards and completes successfully; only the coordinator / destination crash scenarios should fail. All of the above were pre-validated through the per-PR Cluster IT pipeline (the region-migration suite was temporarily run at PR granularity) before restoring it to the DailyIT category. --- ...RegionOperationReliabilityITFramework.java | 31 ++++++++++++ ...onMigrateDataNodeCrashForIoTV2BatchIT.java | 19 ++++---- ...nMigrateDataNodeCrashForIoTV2StreamIT.java | 19 ++++---- ...RegionMigrateClusterCrashIoTV2BatchIT.java | 6 +-- ...ionMigrateConfigNodeCrashIoTV2BatchIT.java | 9 ++-- ...egionMigrateClusterCrashIoTV2StreamIT.java | 9 ++-- ...onMigrateConfigNodeCrashIoTV2StreamIT.java | 9 ++-- .../iotdb/db/it/IoTDBCustomizedClusterIT.java | 46 +++++++++++------- .../IoTDBIoTConsensusV23C3DBasicITBase.java | 48 +++++++++++++++++-- 9 files changed, 137 insertions(+), 59 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/IoTDBRegionOperationReliabilityITFramework.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/IoTDBRegionOperationReliabilityITFramework.java index 9b9c4ad41a1f6..4ab85264fc826 100644 --- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/IoTDBRegionOperationReliabilityITFramework.java +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/IoTDBRegionOperationReliabilityITFramework.java @@ -355,6 +355,17 @@ private void generalTestWithAllOptions( Assert.fail(); } + // The kill point is detected by a background thread tailing the node log, so the migration + // result (observed by awaitUntilSuccess above) can become visible before that thread has read + // and processed the kill-point line of the last migration phase (e.g. + // RemoveRegionLocationCache). 
Give that thread a short grace period to catch up before + // asserting, otherwise checkKillPointsAllTriggered may fail spuriously with "Some kill points + // was not triggered". This is best-effort: the authoritative assertion remains + // checkKillPointsAllTriggered, which still fails the test if a kill point genuinely never + // triggers (e.g. the badKillPoint test). + graceWaitForKillPointsTriggered(configNodeKeywords); + graceWaitForKillPointsTriggered(dataNodeKeywords); + // make sure all kill points have been triggered checkKillPointsAllTriggered(configNodeKeywords); checkKillPointsAllTriggered(dataNodeKeywords); @@ -520,6 +531,26 @@ private static void awaitKillPointsTriggered(KeySetView<String, Boolean> killPoi Awaitility.await().atMost(2, TimeUnit.MINUTES).until(killPoints::isEmpty); } + /** + * Best-effort wait for all kill points to be triggered. The kill point is detected by a + * background thread tailing the node log, so there can be a short lag between the migration + * result becoming visible and that thread processing the kill-point line. This gives it a brief + * grace period to catch up. Unlike {@link #awaitKillPointsTriggered}, it never throws: the + * authoritative check is {@link #checkKillPointsAllTriggered}, so a kill point that genuinely + * never triggers (e.g. the badKillPoint test) is still caught there as an AssertionError rather + * than masked here. + */ + private static void graceWaitForKillPointsTriggered(KeySetView<String, Boolean> killPoints) { + if (killPoints.isEmpty()) { + return; + } + try { + Awaitility.await().atMost(1, TimeUnit.MINUTES).until(killPoints::isEmpty); + } catch (ConditionTimeoutException ignored) { + // Fall through to checkKillPointsAllTriggered, which makes the real assertion. 
+ } + } + private static String buildRegionMigrateCommand(int who, int from, int to) { String result = String.format(REGION_MIGRATE_COMMAND_FORMAT, who, from, to); LOGGER.info(result); diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/datanodecrash/iotv2/batch/IoTDBRegionMigrateDataNodeCrashForIoTV2BatchIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/datanodecrash/iotv2/batch/IoTDBRegionMigrateDataNodeCrashForIoTV2BatchIT.java index ad9738144981e..ea9a40e4699e5 100644 --- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/datanodecrash/iotv2/batch/IoTDBRegionMigrateDataNodeCrashForIoTV2BatchIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/datanodecrash/iotv2/batch/IoTDBRegionMigrateDataNodeCrashForIoTV2BatchIT.java @@ -25,11 +25,10 @@ import org.apache.iotdb.it.framework.IoTDBTestRunner; import org.apache.iotdb.itbase.category.DailyIT; +import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; -// TODO: @Yongzao Dan, reopen this CI after discussion with @HxpSerein - @Category({DailyIT.class}) @RunWith(IoTDBTestRunner.class) public class IoTDBRegionMigrateDataNodeCrashForIoTV2BatchIT @@ -41,7 +40,7 @@ public class IoTDBRegionMigrateDataNodeCrashForIoTV2BatchIT private final int configNodeNum = 1; private final int dataNodeNum = 3; - // @Test + @Test public void coordinatorCrashDuringAddPeerTransition() throws Exception { failTest( 2, @@ -53,7 +52,7 @@ public void coordinatorCrashDuringAddPeerTransition() throws Exception { KillNode.COORDINATOR_DATANODE); } - // @Test + @Test public void coordinatorCrashDuringAddPeerDone() throws Exception { failTest( 2, @@ -69,9 +68,13 @@ public void coordinatorCrashDuringAddPeerDone() throws Exception { // region Original DataNode crash tests - // @Test + @Test public void 
originalCrashDuringAddPeerDone() throws Exception { - failTest( + // Once the add-peer phase is done, the new peer already holds the data, so the migration is + // designed to tolerate the original (source) DataNode crashing afterwards: it completes + // successfully and merely leaves the region files on the dead node to be cleaned up later. + // Hence this is a successTest, not a failTest. + successTest( 2, 2, 1, @@ -85,7 +88,7 @@ public void originalCrashDuringAddPeerDone() throws Exception { // region Destination DataNode crash tests - // @Test + @Test public void destinationCrashDuringCreateLocalPeer() throws Exception { failTest( 2, @@ -97,7 +100,7 @@ public void destinationCrashDuringCreateLocalPeer() throws Exception { KillNode.DESTINATION_DATANODE); } - // @Test + @Test public void destinationCrashDuringAddPeerDone() throws Exception { failTest( 2, diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/datanodecrash/iotv2/stream/IoTDBRegionMigrateDataNodeCrashForIoTV2StreamIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/datanodecrash/iotv2/stream/IoTDBRegionMigrateDataNodeCrashForIoTV2StreamIT.java index eeca6dacc1998..a8aa62f81b0ca 100644 --- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/datanodecrash/iotv2/stream/IoTDBRegionMigrateDataNodeCrashForIoTV2StreamIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/datanodecrash/iotv2/stream/IoTDBRegionMigrateDataNodeCrashForIoTV2StreamIT.java @@ -28,11 +28,10 @@ import org.apache.iotdb.itbase.category.DailyIT; import org.junit.Before; +import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; -// TODO: @Yongzao Dan, reopen this CI after discussion with @HxpSerein - @Category({DailyIT.class}) @RunWith(IoTDBTestRunner.class) public class 
IoTDBRegionMigrateDataNodeCrashForIoTV2StreamIT @@ -54,7 +53,7 @@ public void setUp() throws Exception { .setIoTConsensusV2Mode(ConsensusFactory.IOT_CONSENSUS_V2_STREAM_MODE); } - // @Test + @Test public void coordinatorCrashDuringAddPeerTransition() throws Exception { failTest( 2, @@ -66,7 +65,7 @@ public void coordinatorCrashDuringAddPeerTransition() throws Exception { KillNode.COORDINATOR_DATANODE); } - // @Test + @Test public void coordinatorCrashDuringAddPeerDone() throws Exception { failTest( 2, @@ -82,9 +81,13 @@ public void coordinatorCrashDuringAddPeerDone() throws Exception { // region Original DataNode crash tests - // @Test + @Test public void originalCrashDuringAddPeerDone() throws Exception { - failTest( + // Once the add-peer phase is done, the new peer already holds the data, so the migration is + // designed to tolerate the original (source) DataNode crashing afterwards: it completes + // successfully and merely leaves the region files on the dead node to be cleaned up later. + // Hence this is a successTest, not a failTest. 
+ successTest( 2, 2, 1, @@ -98,7 +101,7 @@ public void originalCrashDuringAddPeerDone() throws Exception { // region Destination DataNode crash tests - // @Test + @Test public void destinationCrashDuringCreateLocalPeer() throws Exception { failTest( 2, @@ -110,7 +113,7 @@ public void destinationCrashDuringCreateLocalPeer() throws Exception { KillNode.DESTINATION_DATANODE); } - // @Test + @Test public void destinationCrashDuringAddPeerDone() throws Exception { failTest( 2, diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv2/batch/IoTDBRegionMigrateClusterCrashIoTV2BatchIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv2/batch/IoTDBRegionMigrateClusterCrashIoTV2BatchIT.java index a276acc4d0123..6a74d99c09672 100644 --- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv2/batch/IoTDBRegionMigrateClusterCrashIoTV2BatchIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv2/batch/IoTDBRegionMigrateClusterCrashIoTV2BatchIT.java @@ -39,8 +39,7 @@ public void clusterCrash1() throws Exception { killClusterTest(buildSet(AddRegionPeerState.CREATE_NEW_REGION_PEER), true); } - // TODO: @Yongzao Dan, reopen this CI after discussion with @HxpSerein - // @Test + @Test public void clusterCrash2() throws Exception { killClusterTest(buildSet(AddRegionPeerState.DO_ADD_REGION_PEER), false); } @@ -60,8 +59,7 @@ public void clusterCrash6() throws Exception { killClusterTest(buildSet(RemoveRegionPeerState.REMOVE_REGION_PEER), true); } - // TODO: @Yongzao Dan, reopen this CI after discussion with @HxpSerein - // @Test + @Test public void clusterCrash7() throws Exception { killClusterTest(buildSet(RemoveRegionPeerState.DELETE_OLD_REGION_PEER), true); } diff --git 
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv2/batch/IoTDBRegionMigrateConfigNodeCrashIoTV2BatchIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv2/batch/IoTDBRegionMigrateConfigNodeCrashIoTV2BatchIT.java index bc4f477b6bd84..853d834966614 100644 --- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv2/batch/IoTDBRegionMigrateConfigNodeCrashIoTV2BatchIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv2/batch/IoTDBRegionMigrateConfigNodeCrashIoTV2BatchIT.java @@ -66,8 +66,7 @@ public void cnCrashDuringCreatePeerTest() throws Exception { KillNode.CONFIG_NODE); } - // TODO: @Yongzao Dan, reopen this CI after discussion with @HxpSerein - // @Test + @Test public void testCnCrashDuringDoAddPeer() throws Exception { successTest( 1, @@ -127,8 +126,7 @@ public void cnCrashDuringDeleteOldRegionPeerTest() throws Exception { KillNode.CONFIG_NODE); } - // TODO: @Yongzao Dan, reopen this CI after discussion with @HxpSerein - // @Test + @Test public void cnCrashDuringRemoveRegionLocationCacheTest() throws Exception { successTest( 1, @@ -140,8 +138,7 @@ public void cnCrashDuringRemoveRegionLocationCacheTest() throws Exception { KillNode.CONFIG_NODE); } - // TODO: @Yongzao Dan, reopen this CI after discussion with @HxpSerein - // @Test + @Test public void cnCrashTest() throws Exception { ConcurrentHashMap.KeySetView<String, Boolean> killConfigNodeKeywords = noKillPoints(); killConfigNodeKeywords.addAll( diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv2/stream/IoTDBRegionMigrateClusterCrashIoTV2StreamIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv2/stream/IoTDBRegionMigrateClusterCrashIoTV2StreamIT.java index 384f5e61dd76e..5f0f2fe3cee4f 100644 --- 
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv2/stream/IoTDBRegionMigrateClusterCrashIoTV2StreamIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv2/stream/IoTDBRegionMigrateClusterCrashIoTV2StreamIT.java @@ -47,14 +47,12 @@ public void setUp() throws Exception { .setIoTConsensusV2Mode(ConsensusFactory.IOT_CONSENSUS_V2_STREAM_MODE); } - // TODO: @Yongzao Dan, reopen this CI after discussion with @HxpSerein - // @Test + @Test public void clusterCrash1() throws Exception { killClusterTest(buildSet(AddRegionPeerState.CREATE_NEW_REGION_PEER), true); } - // TODO: @Yongzao Dan, reopen this CI after discussion with @HxpSerein - // @Test + @Test public void clusterCrash2() throws Exception { killClusterTest(buildSet(AddRegionPeerState.DO_ADD_REGION_PEER), false); } @@ -74,8 +72,7 @@ public void clusterCrash6() throws Exception { killClusterTest(buildSet(RemoveRegionPeerState.REMOVE_REGION_PEER), true); } - // TODO: @Yongzao Dan, reopen this CI after discussion with @HxpSerein - // @Test + @Test public void clusterCrash7() throws Exception { killClusterTest(buildSet(RemoveRegionPeerState.DELETE_OLD_REGION_PEER), true); } diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv2/stream/IoTDBRegionMigrateConfigNodeCrashIoTV2StreamIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv2/stream/IoTDBRegionMigrateConfigNodeCrashIoTV2StreamIT.java index 39b5953de4a15..f29482811f0a0 100644 --- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv2/stream/IoTDBRegionMigrateConfigNodeCrashIoTV2StreamIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv2/stream/IoTDBRegionMigrateConfigNodeCrashIoTV2StreamIT.java @@ -80,8 +80,7 @@ public void cnCrashDuringCreatePeerTest() throws Exception { 
KillNode.CONFIG_NODE); } - // TODO: @Yongzao Dan, reopen this CI after discussion with @HxpSerein - // @Test + @Test public void testCnCrashDuringDoAddPeer() throws Exception { successTest( 1, @@ -141,8 +140,7 @@ public void cnCrashDuringDeleteOldRegionPeerTest() throws Exception { KillNode.CONFIG_NODE); } - // TODO: @Yongzao Dan, reopen this CI after discussion with @HxpSerein - // @Test + @Test public void cnCrashDuringRemoveRegionLocationCacheTest() throws Exception { successTest( 1, @@ -154,8 +152,7 @@ public void cnCrashDuringRemoveRegionLocationCacheTest() throws Exception { KillNode.CONFIG_NODE); } - // TODO: @Yongzao Dan, reopen this CI after discussion with @HxpSerein - // @Test + @Test public void cnCrashTest() throws Exception { ConcurrentHashMap.KeySetView<String, Boolean> killConfigNodeKeywords = noKillPoints(); killConfigNodeKeywords.addAll( diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBCustomizedClusterIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBCustomizedClusterIT.java index 74391b99bb3c4..d0df5d5662f5b 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBCustomizedClusterIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBCustomizedClusterIT.java @@ -30,6 +30,7 @@ import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.session.Session; +import org.awaitility.Awaitility; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; @@ -64,22 +65,35 @@ public void testRepeatedlyRestartWholeClusterWithWrite() throws Exception { testRepeatedlyRestartWholeCluster( (s, i, env) -> { if (i != 0) { - ResultSet resultSet = s.executeQuery("SELECT last s1 FROM root.**"); - ResultSetMetaData metaData = resultSet.getMetaData(); - assertEquals(4, metaData.getColumnCount()); - int cnt = 0; - while (resultSet.next()) { - cnt++; - StringBuilder result = new StringBuilder(); - for (int j = 0; j < metaData.getColumnCount(); j++) { - result - 
.append(metaData.getColumnName(j + 1)) - .append(":") - .append(resultSet.getString(j + 1)) - .append(","); - } - System.out.println(result); - } + // This query is fanned out to every DataNode and the results are compared across + // replicas. Right after a restart the last cache on each coordinator is reloaded + // lazily, so the cross-replica comparison may transiently observe an inconsistent + // result (e.g. different row order) until the cluster converges. ORDER BY TIMESERIES + // makes the row order deterministic across coordinators (the root cause of the observed + // flakiness), and the retry tolerates the brief convergence window without masking a + // genuine, persistent inconsistency. + Awaitility.await() + .atMost(60, TimeUnit.SECONDS) + .pollInterval(2, TimeUnit.SECONDS) + .untilAsserted( + () -> { + try (ResultSet resultSet = + s.executeQuery("SELECT last s1 FROM root.** ORDER BY TIMESERIES ASC")) { + ResultSetMetaData metaData = resultSet.getMetaData(); + assertEquals(4, metaData.getColumnCount()); + while (resultSet.next()) { + StringBuilder result = new StringBuilder(); + for (int j = 0; j < metaData.getColumnCount(); j++) { + result + .append(metaData.getColumnName(j + 1)) + .append(":") + .append(resultSet.getString(j + 1)) + .append(","); + } + System.out.println(result); + } + } + }); } s.execute("INSERT INTO root.db1.d1 (time, s1) VALUES (1, 1)"); s.execute("INSERT INTO root.db2.d1 (time, s1) VALUES (1, 1)"); diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/IoTDBIoTConsensusV23C3DBasicITBase.java b/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/IoTDBIoTConsensusV23C3DBasicITBase.java index ec04aab39bd7b..e07e6e4ebc138 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/IoTDBIoTConsensusV23C3DBasicITBase.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/IoTDBIoTConsensusV23C3DBasicITBase.java @@ -342,11 +342,12 @@ public void 
testDeleteTimeSeriesReplicaConsistency() throws Exception { // Restart the stopped node before moving to the next iteration LOGGER.info("Restarting {}", stoppedDesc); stoppedNode.start(); - // Wait for the restarted node to rejoin - Awaitility.await() - .atMost(120, TimeUnit.SECONDS) - .pollInterval(2, TimeUnit.SECONDS) - .until(stoppedNode::isAlive); + // Wait for the restarted node to actually be able to serve queries again, not just for + // its process to be up. The next loop iteration will treat this node as a surviving node + // and connect to it, so if we only waited for isAlive() (process started) the node might + // still be in startup (RPC port not yet open / not registered), causing a spurious + // "Connection refused" failure. + waitUntilDataNodeQueryable(stoppedNode, stoppedDesc); } } @@ -356,6 +357,43 @@ public void testDeleteTimeSeriesReplicaConsistency() throws Exception { } } + /** + * Wait until the given DataNode can actually serve queries again after a restart. A node's + * process being alive ({@link DataNodeWrapper#isAlive()}) does not mean its client RPC service is + * open and it has rejoined the cluster, so we poll a real connection plus a trivial query until + * it succeeds. + */ + private void waitUntilDataNodeQueryable(DataNodeWrapper node, String nodeDesc) { + Awaitility.await() + .atMost(120, TimeUnit.SECONDS) + .pollDelay(1, TimeUnit.SECONDS) + .pollInterval(2, TimeUnit.SECONDS) + .until( + () -> { + if (!node.isAlive()) { + return false; + } + try (Connection conn = + EnvFactory.getEnv() + .getConnection( + node, + SessionConfig.DEFAULT_USER, + SessionConfig.DEFAULT_PASSWORD, + BaseEnv.TREE_SQL_DIALECT); + Statement stmt = conn.createStatement(); + ResultSet rs = stmt.executeQuery(SHOW_TIMESERIES_D1)) { + // Drain the result set to make sure the query fully executes. 
+ while (rs.next()) { + // no-op + } + return true; + } catch (Exception e) { + LOGGER.info("{} not queryable yet, retrying: {}", nodeDesc, e.getMessage()); + return false; + } + }); + } + /** * Verify that after deleting root.sg.d1.speed, only temperature and power timeseries remain, and * that data queries do not return the deleted timeseries. From 2648b6b123e877396d9a11dfd71687f35ee5e65f Mon Sep 17 00:00:00 2001 From: Yongzao <532741407@qq.com> Date: Fri, 12 Jun 2026 15:49:34 +0800 Subject: [PATCH 2/3] Address review: make the cross-replica retry actually retry InconsistentDataException is a RuntimeException thrown from getString(), and Awaitility's untilAsserted() only retries on AssertionError by default, so the retry never covered the described flaky failure (only ORDER BY TIMESERIES did). Add ignoreExceptions() so the retry genuinely tolerates a transient cross-replica inconsistency during the post-restart convergence window. --- .../iotdb/db/it/IoTDBCustomizedClusterIT.java | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBCustomizedClusterIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBCustomizedClusterIT.java index d0df5d5662f5b..13a8a8ed81202 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBCustomizedClusterIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBCustomizedClusterIT.java @@ -68,13 +68,19 @@ public void testRepeatedlyRestartWholeClusterWithWrite() throws Exception { // This query is fanned out to every DataNode and the results are compared across // replicas. Right after a restart the last cache on each coordinator is reloaded // lazily, so the cross-replica comparison may transiently observe an inconsistent - // result (e.g. different row order) until the cluster converges. 
ORDER BY TIMESERIES - // makes the row order deterministic across coordinators (the root cause of the observed - // flakiness), and the retry tolerates the brief convergence window without masking a - // genuine, persistent inconsistency. + // result until the cluster converges. ORDER BY TIMESERIES makes the row order + // deterministic across coordinators (the root cause of the observed flakiness), and the + // retry tolerates the brief convergence window (e.g. a replica that has not finished + // recovering yet) without masking a genuine, persistent inconsistency. + // + // ignoreExceptions() is required: a mismatch surfaces as InconsistentDataException + // (a RuntimeException) thrown from getString(), and untilAsserted() only retries on + // AssertionError by default, so without it the retry would not actually cover this + // failure. Awaitility.await() .atMost(60, TimeUnit.SECONDS) .pollInterval(2, TimeUnit.SECONDS) + .ignoreExceptions() .untilAsserted( () -> { try (ResultSet resultSet = From 483a4ce0de7036c7d569aa6532cd66b0b84b9f79 Mon Sep 17 00:00:00 2001 From: Yongzao <532741407@qq.com> Date: Fri, 12 Jun 2026 16:00:40 +0800 Subject: [PATCH 3/3] Address review: target InconsistentDataException in the retry Per review feedback, narrow the retry to only InconsistentDataException via ignoreExceptionsMatching, so a genuine error (e.g. a real SQLException) still fails fast instead of being retried for up to 60s. 
--- .../apache/iotdb/db/it/IoTDBCustomizedClusterIT.java | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBCustomizedClusterIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBCustomizedClusterIT.java index 13a8a8ed81202..42d91eb74092b 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBCustomizedClusterIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBCustomizedClusterIT.java @@ -25,6 +25,7 @@ import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper; import org.apache.iotdb.it.framework.IoTDBTestRunner; import org.apache.iotdb.itbase.category.DailyIT; +import org.apache.iotdb.itbase.exception.InconsistentDataException; import org.apache.iotdb.rpc.IoTDBConnectionException; import org.apache.iotdb.rpc.StatementExecutionException; import org.apache.iotdb.rpc.TSStatusCode; @@ -73,14 +74,15 @@ public void testRepeatedlyRestartWholeClusterWithWrite() throws Exception { // retry tolerates the brief convergence window (e.g. a replica that has not finished // recovering yet) without masking a genuine, persistent inconsistency. // - // ignoreExceptions() is required: a mismatch surfaces as InconsistentDataException - // (a RuntimeException) thrown from getString(), and untilAsserted() only retries on - // AssertionError by default, so without it the retry would not actually cover this - // failure. + // ignoreExceptionsMatching(InconsistentDataException) is required: a mismatch surfaces + // as InconsistentDataException (a RuntimeException) thrown from getString(), and + // untilAsserted() only retries on AssertionError by default, so without it the retry + // would not actually cover this failure. We match only InconsistentDataException so a + // genuine error (e.g. a real SQLException) still fails fast instead of being retried. 
Awaitility.await() .atMost(60, TimeUnit.SECONDS) .pollInterval(2, TimeUnit.SECONDS) - .ignoreExceptions() + .ignoreExceptionsMatching(e -> e instanceof InconsistentDataException) .untilAsserted( () -> { try (ResultSet resultSet =