From 67316b26f1dc1cf76828b8b89a15af806df2d9cd Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Thu, 11 Jun 2026 15:01:39 +0800 Subject: [PATCH 1/3] Fix pipe schema snapshot database creation --- .../thrift/IoTDBDataNodeReceiver.java | 41 ++++++++++++++++++- 1 file changed, 39 insertions(+), 2 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java index e6c8ddefc997..d43840e8e238 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java @@ -44,6 +44,7 @@ import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferFileSealReqV2; import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferSliceReq; import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant; +import org.apache.iotdb.commons.utils.PathUtils; import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema; import org.apache.iotdb.db.auth.AuthorityChecker; import org.apache.iotdb.db.conf.IoTDBConfig; @@ -101,6 +102,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement; +import org.apache.iotdb.db.queryengine.plan.statement.metadata.DatabaseSchemaStatement; import org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement; import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache; import org.apache.iotdb.db.storageengine.load.active.ActiveLoadPathHelper; @@ -625,8 +627,16 @@ static LoadTsFileStatement buildLoadTsFileStatementForSync( private TSStatus loadSchemaSnapShot( final Map parameters, final List fileAbsolutePaths) throws IllegalPathException, IOException { - final PartialPath databasePath = - PartialPath.getQualifiedDatabasePartialPath(parameters.get(ColumnHeaderConstant.DATABASE)); + final String databaseName = parameters.get(ColumnHeaderConstant.DATABASE); + final PartialPath databasePath = PartialPath.getQualifiedDatabasePartialPath(databaseName); + + if (!PathUtils.isTableModelDatabase(databaseName)) { + final TSStatus createDatabaseStatus = createSchemaSnapshotDatabaseIfNecessary(databasePath); + if (createDatabaseStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return createDatabaseStatus; + } + } + final SRStatementGenerator generator = SchemaRegionSnapshotParser.translate2Statements( Paths.get(fileAbsolutePaths.get(0)), @@ -697,6 +707,33 @@ private TSStatus loadSchemaSnapShot( return PipeReceiverStatusHandler.getPriorStatus(results); } + private TSStatus createSchemaSnapshotDatabaseIfNecessary(final PartialPath databasePath) { + final DatabaseSchemaStatement statement = + new DatabaseSchemaStatement(DatabaseSchemaStatement.DatabaseSchemaStatementType.CREATE); + statement.setDatabasePath(databasePath); + + final TSStatus status = executeStatementAndClassifyExceptions(statement); + if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return status; + } + + if (status.getCode() == TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) { + return Objects.equals( + status.getMessage(), + databasePath.getFullPath() + " has already been created as database") + ? RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS) + : new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode()) + .setMessage(status.getMessage()); + } + + if (status.getCode() == TSStatusCode.DATABASE_CONFLICT.getStatusCode()) { + return new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode()) + .setMessage(status.getMessage()); + } + + return status; + } + private TPipeTransferResp handleTransferSchemaPlan(final PipeTransferPlanNodeReq req) { // We may be able to skip the alter logical view's exception parsing because // the "AlterLogicalViewNode" is itself idempotent From f4358de71fb5529fff5aa829e7c8c26a015b0fd2 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Thu, 11 Jun 2026 15:31:21 +0800 Subject: [PATCH 2/3] Fix legacy pipe receiver database conflict handling --- .../receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgent.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgent.java index c4c3986259f5..feeabc698fff 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgent.java @@ -175,8 +175,7 @@ private boolean registerDatabase( false, false); if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode() - && result.status.code != TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode() - && result.status.code != TSStatusCode.DATABASE_CONFLICT.getStatusCode()) { + && result.status.code != TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) { LOGGER.error( DataNodePipeMessages.CREATE_DATABASE_ERROR_STATEMENT_RESULT_STATUS, statement, From 1ba4d7518f578b5235b4b7e00858acbd227cb153 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Thu, 11 Jun 2026 17:39:33 +0800 Subject: [PATCH 3/3] Fix pipe enriched config statement execution --- .../iotdb/db/queryengine/plan/Coordinator.java | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java index bd1512aa428a..06dadecce798 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java @@ -153,6 +153,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.rewrite.StatementRewriteFactory; import org.apache.iotdb.db.queryengine.plan.statement.IConfigStatement; import org.apache.iotdb.db.queryengine.plan.statement.Statement; +import org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement; import org.apache.iotdb.db.utils.SetThreadName; import org.apache.thrift.TBase; @@ -399,13 +400,19 @@ private IQueryExecution createQueryExecutionForTreeModel( long startTime) { queryContext.setTimeOut(timeOut); queryContext.setStartTime(startTime); - if (statement instanceof IConfigStatement) { - queryContext.setQueryType(((IConfigStatement) statement).getQueryType()); + final Statement configStatement = + statement instanceof PipeEnrichedStatement + && ((PipeEnrichedStatement) statement).getInnerStatement() + instanceof IConfigStatement + ? ((PipeEnrichedStatement) statement).getInnerStatement() + : statement; + if (configStatement instanceof IConfigStatement) { + queryContext.setQueryType(((IConfigStatement) configStatement).getQueryType()); return new ConfigExecution( queryContext, - statement.getType(), + configStatement.getType(), executor, - statement.accept(new TreeConfigTaskVisitor(), queryContext)); + configStatement.accept(new TreeConfigTaskVisitor(), queryContext)); } TreeModelPlanner treeModelPlanner = new TreeModelPlanner(