Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,17 @@ private void generalTestWithAllOptions(
Assert.fail();
}

// The kill point is detected by a background thread tailing the node log, so the migration
// result (observed by awaitUntilSuccess above) can become visible before that thread has read
// and processed the kill-point line of the last migration phase (e.g.
// RemoveRegionLocationCache). Give that thread a short grace period to catch up before
// asserting, otherwise checkKillPointsAllTriggered may fail spuriously with "Some kill points
// was not triggered". This is best-effort: the authoritative assertion remains
// checkKillPointsAllTriggered, which still fails the test if a kill point genuinely never
// triggers (e.g. the badKillPoint test).
graceWaitForKillPointsTriggered(configNodeKeywords);
graceWaitForKillPointsTriggered(dataNodeKeywords);

// make sure all kill points have been triggered
checkKillPointsAllTriggered(configNodeKeywords);
checkKillPointsAllTriggered(dataNodeKeywords);
Expand Down Expand Up @@ -520,6 +531,26 @@ private static void awaitKillPointsTriggered(KeySetView<String, Boolean> killPoi
Awaitility.await().atMost(2, TimeUnit.MINUTES).until(killPoints::isEmpty);
}

/**
* Best-effort wait for all kill points to be triggered. The kill point is detected by a
* background thread tailing the node log, so there can be a short lag between the migration
* result becoming visible and that thread processing the kill-point line. This gives it a brief
* grace period to catch up. Unlike {@link #awaitKillPointsTriggered}, it never throws: the
* authoritative check is {@link #checkKillPointsAllTriggered}, so a kill point that genuinely
* never triggers (e.g. the badKillPoint test) is still caught there as an AssertionError rather
* than masked here.
*/
private static void graceWaitForKillPointsTriggered(KeySetView<String, Boolean> killPoints) {
if (killPoints.isEmpty()) {
return;
}
try {
Awaitility.await().atMost(1, TimeUnit.MINUTES).until(killPoints::isEmpty);
} catch (ConditionTimeoutException ignored) {
// Fall through to checkKillPointsAllTriggered, which makes the real assertion.
}
}

private static String buildRegionMigrateCommand(int who, int from, int to) {
String result = String.format(REGION_MIGRATE_COMMAND_FORMAT, who, from, to);
LOGGER.info(result);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,10 @@
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.DailyIT;

import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

// TODO: @Yongzao Dan, reopen this CI after discussion with @HxpSerein

@Category({DailyIT.class})
@RunWith(IoTDBTestRunner.class)
public class IoTDBRegionMigrateDataNodeCrashForIoTV2BatchIT
Expand All @@ -41,7 +40,7 @@ public class IoTDBRegionMigrateDataNodeCrashForIoTV2BatchIT
private final int configNodeNum = 1;
private final int dataNodeNum = 3;

// @Test
@Test
public void coordinatorCrashDuringAddPeerTransition() throws Exception {
failTest(
2,
Expand All @@ -53,7 +52,7 @@ public void coordinatorCrashDuringAddPeerTransition() throws Exception {
KillNode.COORDINATOR_DATANODE);
}

// @Test
@Test
public void coordinatorCrashDuringAddPeerDone() throws Exception {
failTest(
2,
Expand All @@ -69,9 +68,13 @@ public void coordinatorCrashDuringAddPeerDone() throws Exception {

// region Original DataNode crash tests

// @Test
@Test
public void originalCrashDuringAddPeerDone() throws Exception {
failTest(
// Once the add-peer phase is done, the new peer already holds the data, so the migration is
// designed to tolerate the original (source) DataNode crashing afterwards: it completes
// successfully and merely leaves the region files on the dead node to be cleaned up later.
// Hence this is a successTest, not a failTest.
successTest(
2,
2,
1,
Expand All @@ -85,7 +88,7 @@ public void originalCrashDuringAddPeerDone() throws Exception {

// region Destination DataNode crash tests

// @Test
@Test
public void destinationCrashDuringCreateLocalPeer() throws Exception {
failTest(
2,
Expand All @@ -97,7 +100,7 @@ public void destinationCrashDuringCreateLocalPeer() throws Exception {
KillNode.DESTINATION_DATANODE);
}

// @Test
@Test
public void destinationCrashDuringAddPeerDone() throws Exception {
failTest(
2,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,10 @@
import org.apache.iotdb.itbase.category.DailyIT;

import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

// TODO: @Yongzao Dan, reopen this CI after discussion with @HxpSerein

@Category({DailyIT.class})
@RunWith(IoTDBTestRunner.class)
public class IoTDBRegionMigrateDataNodeCrashForIoTV2StreamIT
Expand All @@ -54,7 +53,7 @@ public void setUp() throws Exception {
.setIoTConsensusV2Mode(ConsensusFactory.IOT_CONSENSUS_V2_STREAM_MODE);
}

// @Test
@Test
public void coordinatorCrashDuringAddPeerTransition() throws Exception {
failTest(
2,
Expand All @@ -66,7 +65,7 @@ public void coordinatorCrashDuringAddPeerTransition() throws Exception {
KillNode.COORDINATOR_DATANODE);
}

// @Test
@Test
public void coordinatorCrashDuringAddPeerDone() throws Exception {
failTest(
2,
Expand All @@ -82,9 +81,13 @@ public void coordinatorCrashDuringAddPeerDone() throws Exception {

// region Original DataNode crash tests

// @Test
@Test
public void originalCrashDuringAddPeerDone() throws Exception {
failTest(
// Once the add-peer phase is done, the new peer already holds the data, so the migration is
// designed to tolerate the original (source) DataNode crashing afterwards: it completes
// successfully and merely leaves the region files on the dead node to be cleaned up later.
// Hence this is a successTest, not a failTest.
successTest(
2,
2,
1,
Expand All @@ -98,7 +101,7 @@ public void originalCrashDuringAddPeerDone() throws Exception {

// region Destination DataNode crash tests

// @Test
@Test
public void destinationCrashDuringCreateLocalPeer() throws Exception {
failTest(
2,
Expand All @@ -110,7 +113,7 @@ public void destinationCrashDuringCreateLocalPeer() throws Exception {
KillNode.DESTINATION_DATANODE);
}

// @Test
@Test
public void destinationCrashDuringAddPeerDone() throws Exception {
failTest(
2,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@ public void clusterCrash1() throws Exception {
killClusterTest(buildSet(AddRegionPeerState.CREATE_NEW_REGION_PEER), true);
}

// TODO: @Yongzao Dan, reopen this CI after discussion with @HxpSerein
// @Test
@Test
public void clusterCrash2() throws Exception {
killClusterTest(buildSet(AddRegionPeerState.DO_ADD_REGION_PEER), false);
}
Expand All @@ -60,8 +59,7 @@ public void clusterCrash6() throws Exception {
killClusterTest(buildSet(RemoveRegionPeerState.REMOVE_REGION_PEER), true);
}

// TODO: @Yongzao Dan, reopen this CI after discussion with @HxpSerein
// @Test
@Test
public void clusterCrash7() throws Exception {
killClusterTest(buildSet(RemoveRegionPeerState.DELETE_OLD_REGION_PEER), true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,7 @@ public void cnCrashDuringCreatePeerTest() throws Exception {
KillNode.CONFIG_NODE);
}

// TODO: @Yongzao Dan, reopen this CI after discussion with @HxpSerein
// @Test
@Test
public void testCnCrashDuringDoAddPeer() throws Exception {
successTest(
1,
Expand Down Expand Up @@ -127,8 +126,7 @@ public void cnCrashDuringDeleteOldRegionPeerTest() throws Exception {
KillNode.CONFIG_NODE);
}

// TODO: @Yongzao Dan, reopen this CI after discussion with @HxpSerein
// @Test
@Test
public void cnCrashDuringRemoveRegionLocationCacheTest() throws Exception {
successTest(
1,
Expand All @@ -140,8 +138,7 @@ public void cnCrashDuringRemoveRegionLocationCacheTest() throws Exception {
KillNode.CONFIG_NODE);
}

// TODO: @Yongzao Dan, reopen this CI after discussion with @HxpSerein
// @Test
@Test
public void cnCrashTest() throws Exception {
ConcurrentHashMap.KeySetView<String, Boolean> killConfigNodeKeywords = noKillPoints();
killConfigNodeKeywords.addAll(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,12 @@ public void setUp() throws Exception {
.setIoTConsensusV2Mode(ConsensusFactory.IOT_CONSENSUS_V2_STREAM_MODE);
}

// TODO: @Yongzao Dan, reopen this CI after discussion with @HxpSerein
// @Test
@Test
public void clusterCrash1() throws Exception {
killClusterTest(buildSet(AddRegionPeerState.CREATE_NEW_REGION_PEER), true);
}

// TODO: @Yongzao Dan, reopen this CI after discussion with @HxpSerein
// @Test
@Test
public void clusterCrash2() throws Exception {
killClusterTest(buildSet(AddRegionPeerState.DO_ADD_REGION_PEER), false);
}
Expand All @@ -74,8 +72,7 @@ public void clusterCrash6() throws Exception {
killClusterTest(buildSet(RemoveRegionPeerState.REMOVE_REGION_PEER), true);
}

// TODO: @Yongzao Dan, reopen this CI after discussion with @HxpSerein
// @Test
@Test
public void clusterCrash7() throws Exception {
killClusterTest(buildSet(RemoveRegionPeerState.DELETE_OLD_REGION_PEER), true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,7 @@ public void cnCrashDuringCreatePeerTest() throws Exception {
KillNode.CONFIG_NODE);
}

// TODO: @Yongzao Dan, reopen this CI after discussion with @HxpSerein
// @Test
@Test
public void testCnCrashDuringDoAddPeer() throws Exception {
successTest(
1,
Expand Down Expand Up @@ -141,8 +140,7 @@ public void cnCrashDuringDeleteOldRegionPeerTest() throws Exception {
KillNode.CONFIG_NODE);
}

// TODO: @Yongzao Dan, reopen this CI after discussion with @HxpSerein
// @Test
@Test
public void cnCrashDuringRemoveRegionLocationCacheTest() throws Exception {
successTest(
1,
Expand All @@ -154,8 +152,7 @@ public void cnCrashDuringRemoveRegionLocationCacheTest() throws Exception {
KillNode.CONFIG_NODE);
}

// TODO: @Yongzao Dan, reopen this CI after discussion with @HxpSerein
// @Test
@Test
public void cnCrashTest() throws Exception {
ConcurrentHashMap.KeySetView<String, Boolean> killConfigNodeKeywords = noKillPoints();
killConfigNodeKeywords.addAll(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.DailyIT;
import org.apache.iotdb.itbase.exception.InconsistentDataException;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.session.Session;

import org.awaitility.Awaitility;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -64,22 +66,42 @@ public void testRepeatedlyRestartWholeClusterWithWrite() throws Exception {
testRepeatedlyRestartWholeCluster(
(s, i, env) -> {
if (i != 0) {
ResultSet resultSet = s.executeQuery("SELECT last s1 FROM root.**");
ResultSetMetaData metaData = resultSet.getMetaData();
assertEquals(4, metaData.getColumnCount());
int cnt = 0;
while (resultSet.next()) {
cnt++;
StringBuilder result = new StringBuilder();
for (int j = 0; j < metaData.getColumnCount(); j++) {
result
.append(metaData.getColumnName(j + 1))
.append(":")
.append(resultSet.getString(j + 1))
.append(",");
}
System.out.println(result);
}
// This query is fanned out to every DataNode and the results are compared across
// replicas. Right after a restart the last cache on each coordinator is reloaded
// lazily, so the cross-replica comparison may transiently observe an inconsistent
// result until the cluster converges. ORDER BY TIMESERIES makes the row order
// deterministic across coordinators (the root cause of the observed flakiness), and the
// retry tolerates the brief convergence window (e.g. a replica that has not finished
// recovering yet) without masking a genuine, persistent inconsistency.
//
// ignoreExceptionsMatching(InconsistentDataException) is required: a mismatch surfaces
// as InconsistentDataException (a RuntimeException) thrown from getString(), and
// untilAsserted() only retries on AssertionError by default, so without it the retry
// would not actually cover this failure. We match only InconsistentDataException so a
// genuine error (e.g. a real SQLException) still fails fast instead of being retried.
Awaitility.await()
.atMost(60, TimeUnit.SECONDS)
.pollInterval(2, TimeUnit.SECONDS)
.ignoreExceptionsMatching(e -> e instanceof InconsistentDataException)
.untilAsserted(
() -> {
try (ResultSet resultSet =
s.executeQuery("SELECT last s1 FROM root.** ORDER BY TIMESERIES ASC")) {
ResultSetMetaData metaData = resultSet.getMetaData();
assertEquals(4, metaData.getColumnCount());
while (resultSet.next()) {
StringBuilder result = new StringBuilder();
for (int j = 0; j < metaData.getColumnCount(); j++) {
result
.append(metaData.getColumnName(j + 1))
.append(":")
.append(resultSet.getString(j + 1))
.append(",");
}
System.out.println(result);
}
}
});
}
s.execute("INSERT INTO root.db1.d1 (time, s1) VALUES (1, 1)");
s.execute("INSERT INTO root.db2.d1 (time, s1) VALUES (1, 1)");
Expand Down
Loading
Loading