Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -129,17 +129,31 @@ public void testTsFileDecompositionWithMods() {

executeNonQueryWithRetry(senderEnv, "FLUSH");

HashSet<String> expectedResults = new HashSet<>();
expectedResults.add(
"t1,t1,t1,t1,1,1.0,1,1970-01-01T00:00:00.001Z,1,1.0,1970-01-01,1,1970-01-01T00:00:00.001Z,");

TestUtils.assertDataEventuallyOnEnv(
senderEnv,
TableModelUtils.getQuerySql("table1"),
TableModelUtils.generateHeaderResults(),
expectedResults,
"sg1");

TestUtils.assertDataEventuallyOnEnv(
senderEnv,
"SELECT s4 FROM table1 WHERE time >= 2 AND time <= 4",
"s4,",
Collections.emptySet(),
"sg1");

executeNonQueryWithRetry(
senderEnv,
String.format(
"CREATE PIPE test_pipe WITH SOURCE ('mods.enable'='true', 'capture.table'='true') WITH CONNECTOR('ip'='%s', 'port'='%s', 'username'='root', 'format'='tablet')",
"CREATE PIPE test_pipe WITH SOURCE ('mods.enable'='true', 'capture.table'='true', 'inclusion'='data.insert,data.delete') WITH CONNECTOR('ip'='%s', 'port'='%s', 'username'='root', 'format'='tablet')",
receiverEnv.getDataNodeWrapperList().get(0).getIp(),
receiverEnv.getDataNodeWrapperList().get(0).getPort()));

HashSet<String> expectedResults = new HashSet<>();
expectedResults.add(
"t1,t1,t1,t1,1,1.0,1,1970-01-01T00:00:00.001Z,1,1.0,1970-01-01,1,1970-01-01T00:00:00.001Z,");

TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
TableModelUtils.getQuerySql("table1"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.apache.iotdb.db.queryengine.plan.analyze;

import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.exception.IoTDBException;
Expand All @@ -39,7 +38,8 @@
import org.apache.iotdb.commons.schema.table.TsTable;
import org.apache.iotdb.commons.schema.table.column.TsTableColumnSchema;
import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionGroupsByTimeReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionGroupsByTimeResp;
import org.apache.iotdb.db.i18n.DataNodeQueryMessages;
import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
Expand Down Expand Up @@ -320,18 +320,34 @@

try (final ConfigNodeClient configNodeClient =
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
// TODO: may use time and db/table to filter
final TRegionRouteMapResp latestRegionRouteMap = configNodeClient.getLatestRegionRouteMap();
final Set<TRegionReplicaSet> replicaSets = new HashSet<>();
latestRegionRouteMap.getRegionRouteMap().entrySet().stream()
.filter(e -> e.getKey().getType() == TConsensusGroupType.DataRegion)
.forEach(e -> replicaSets.add(e.getValue()));
node.setReplicaSets(replicaSets);
node.setReplicaSets(fetchDeleteReplicaSets(configNodeClient, node));
} catch (final IoTDBRuntimeException e) {
throw e;
} catch (final Exception e) {
throw new IoTDBRuntimeException(e, TSStatusCode.CAN_NOT_CONNECT_CONFIGNODE.getStatusCode());
}
}

static Set<TRegionReplicaSet> fetchDeleteReplicaSets(
final ConfigNodeClient configNodeClient, final Delete node) throws Exception {

Check warning on line 332 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeUtils.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Replace generic exceptions with specific library exceptions or a custom exception.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ6vZMjC38fUmrCr7Pyh&open=AZ6vZMjC38fUmrCr7Pyh&pullRequest=17896
final Set<TRegionReplicaSet> replicaSets = new HashSet<>();
for (final TableDeletionEntry tableDeletionEntry : node.getTableDeletionEntries()) {
final TGetRegionGroupsByTimeResp resp =
configNodeClient.getRegionGroupsByTime(
new TGetRegionGroupsByTimeReq(
node.getDatabaseName(),
tableDeletionEntry.getStartTime(),
tableDeletionEntry.getEndTime()));
if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
throw new IoTDBRuntimeException(resp.getStatus());
}
if (resp.isSetRegionReplicaSets()) {
replicaSets.addAll(resp.getRegionReplicaSets());
}
}
return replicaSets;
}

@SuppressWarnings("java:S3655") // optional is checked
public static String getDatabaseName(final Delete node, final MPPQueryContext queryContext) {
final String databaseName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,40 @@

package org.apache.iotdb.db.queryengine.plan.analyze;

import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.ComparisonExpression;
import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.Expression;
import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.Identifier;
import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.LongLiteral;
import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.QualifiedName;
import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.Table;
import org.apache.iotdb.commons.schema.table.TsTable;
import org.apache.iotdb.commons.schema.table.column.TimeColumnSchema;
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionGroupsByTimeReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionGroupsByTimeResp;
import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Delete;
import org.apache.iotdb.db.storageengine.dataregion.modification.DeletionPredicate;
import org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEntry;
import org.apache.iotdb.rpc.TSStatusCode;

import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.read.common.TimeRange;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

public class AnalyzeUtilsTest {

Expand All @@ -52,4 +72,58 @@ public void testParseDeletePredicateWithRenamedTimeColumn() {
assertEquals(Long.MIN_VALUE, entries.get(0).getStartTime());
assertEquals(100, entries.get(0).getEndTime());
}

@Test
public void testFetchDeleteReplicaSetsOnlyQueriesTargetDatabaseRegions() throws Exception {
final Delete delete = new Delete(new Table(QualifiedName.of("table1")));
delete.setDatabaseName("root.db1");
delete.setTableDeletionEntries(
Arrays.asList(
new TableDeletionEntry(new DeletionPredicate("table1"), new TimeRange(10, 20)),
new TableDeletionEntry(new DeletionPredicate("table1"), new TimeRange(30, 40))));

final TRegionReplicaSet regionReplicaSet1 = dataRegionReplicaSet(1);
final TRegionReplicaSet regionReplicaSet2 = dataRegionReplicaSet(2);
final TGetRegionGroupsByTimeResp resp1 =
successRegionGroupsResp(Collections.singleton(regionReplicaSet1));
final TGetRegionGroupsByTimeResp resp2 =
successRegionGroupsResp(new HashSet<>(Arrays.asList(regionReplicaSet1, regionReplicaSet2)));
final ConfigNodeClient configNodeClient = Mockito.mock(ConfigNodeClient.class);
Mockito.when(
configNodeClient.getRegionGroupsByTime(Mockito.any(TGetRegionGroupsByTimeReq.class)))
.thenReturn(resp1, resp2);

final Set<TRegionReplicaSet> result =
AnalyzeUtils.fetchDeleteReplicaSets(configNodeClient, delete);

assertEquals(2, result.size());
assertTrue(result.contains(regionReplicaSet1));
assertTrue(result.contains(regionReplicaSet2));

final ArgumentCaptor<TGetRegionGroupsByTimeReq> reqCaptor =
ArgumentCaptor.forClass(TGetRegionGroupsByTimeReq.class);
Mockito.verify(configNodeClient, Mockito.times(2)).getRegionGroupsByTime(reqCaptor.capture());
Mockito.verify(configNodeClient, Mockito.never()).getLatestRegionRouteMap();

final List<TGetRegionGroupsByTimeReq> requests = reqCaptor.getAllValues();
assertEquals("root.db1", requests.get(0).getDatabase());
assertEquals(10, requests.get(0).getStartTime());
assertEquals(20, requests.get(0).getEndTime());
assertEquals("root.db1", requests.get(1).getDatabase());
assertEquals(30, requests.get(1).getStartTime());
assertEquals(40, requests.get(1).getEndTime());
}

private static TGetRegionGroupsByTimeResp successRegionGroupsResp(
final Set<TRegionReplicaSet> replicaSets) {
final TGetRegionGroupsByTimeResp resp =
new TGetRegionGroupsByTimeResp(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
resp.setRegionReplicaSets(replicaSets);
return resp;
}

private static TRegionReplicaSet dataRegionReplicaSet(final int regionId) {
return new TRegionReplicaSet(
new TConsensusGroupId(TConsensusGroupType.DataRegion, regionId), Collections.emptyList());
}
}
Loading