diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnAsyncRequestType.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnAsyncRequestType.java index e5d1515d27ec..225d43419132 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnAsyncRequestType.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnAsyncRequestType.java @@ -39,6 +39,7 @@ public enum CnToDnAsyncRequestType { CREATE_SCHEMA_REGION, DELETE_REGION, RESET_PEER_LIST, + NOTIFY_REGION_MIGRATION, UPDATE_REGION_ROUTE_MAP, CHANGE_REGION_LEADER, diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java index 1bae93c73997..59f99fe57cb0 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java @@ -71,6 +71,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TInactiveTriggerInstanceReq; import org.apache.iotdb.mpp.rpc.thrift.TInvalidateCacheReq; import org.apache.iotdb.mpp.rpc.thrift.TInvalidateMatchedSchemaCacheReq; +import org.apache.iotdb.mpp.rpc.thrift.TNotifyRegionMigrationReq; import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq; import org.apache.iotdb.mpp.rpc.thrift.TPushConsumerGroupMetaReq; import org.apache.iotdb.mpp.rpc.thrift.TPushMultiPipeMetaReq; @@ -245,6 +246,11 @@ protected void initActionMapBuilder() { CnToDnAsyncRequestType.UPDATE_REGION_ROUTE_MAP, (req, client, handler) -> client.updateRegionCache((TRegionRouteReq) req, (DataNodeTSStatusRPCHandler) handler)); + actionMapBuilder.put( + CnToDnAsyncRequestType.NOTIFY_REGION_MIGRATION, + (req, client, handler) -> + client.notifyRegionMigration( + (TNotifyRegionMigrationReq) req, (DataNodeTSStatusRPCHandler) handler)); actionMapBuilder.put( CnToDnAsyncRequestType.CHANGE_REGION_LEADER, (req, client, handler) -> diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java index dc8f14cfaa9f..6e2a9dc97ccd 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java @@ -202,6 +202,7 @@ public static DataNodeAsyncRequestRPCHandler buildHandler( case STOP_REPAIR_DATA: case LOAD_CONFIGURATION: case SET_SYSTEM_STATUS: + case NOTIFY_REGION_MIGRATION: case UPDATE_REGION_ROUTE_MAP: case INVALIDATE_SCHEMA_CACHE: case INVALIDATE_MATCHED_SCHEMA_CACHE: diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java index 5ae4a303037f..69f89550c17f 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java @@ -70,6 +70,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TDropTriggerInstanceReq; import org.apache.iotdb.mpp.rpc.thrift.TInactiveTriggerInstanceReq; import org.apache.iotdb.mpp.rpc.thrift.TInvalidateCacheReq; +import org.apache.iotdb.mpp.rpc.thrift.TNotifyRegionMigrationReq; import org.apache.iotdb.mpp.rpc.thrift.TPushConsumerGroupMetaReq; import org.apache.iotdb.mpp.rpc.thrift.TPushConsumerGroupMetaResp; import org.apache.iotdb.mpp.rpc.thrift.TPushMultiPipeMetaReq; @@ -522,6 +523,20 @@ private TCreateDataRegionReq genCreateDataRegionReq( return req; } + public List notifyRegionMigrationToAllDataNodes( + TConsensusGroupId consensusGroupId, boolean isStart) { + final Map dataNodeLocationMap = + configManager.getNodeManager().getRegisteredDataNodeLocations(); + final TNotifyRegionMigrationReq request = + new TNotifyRegionMigrationReq(consensusGroupId, isStart); + + final DataNodeAsyncRequestContext clientHandler = + new DataNodeAsyncRequestContext<>( + CnToDnAsyncRequestType.NOTIFY_REGION_MIGRATION, request, dataNodeLocationMap); + CnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequestWithRetry(clientHandler); + return clientHandler.getResponseList(); + } + public void persistRegionGroup(CreateRegionGroupsPlan createRegionGroupsPlan) { // Persist the allocation result try { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/NotifyRegionMigrationProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/NotifyRegionMigrationProcedure.java new file mode 100644 index 000000000000..0160e9116f94 --- /dev/null +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/NotifyRegionMigrationProcedure.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.procedure.impl.region; + +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; +import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException; +import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils; +import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; +import org.apache.iotdb.confignode.procedure.exception.ProcedureException; +import org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException; +import org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException; +import org.apache.iotdb.confignode.procedure.state.NotifyRegionMigrationState; +import org.apache.iotdb.confignode.procedure.store.ProcedureType; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Objects; + +/** A procedure that notifies all DNs of the ongoing region migration procedure. */ +public class NotifyRegionMigrationProcedure + extends RegionOperationProcedure { + private static final Logger LOGGER = + LoggerFactory.getLogger(NotifyRegionMigrationProcedure.class); + + private boolean isStart; + + public NotifyRegionMigrationProcedure() { + super(); + } + + public NotifyRegionMigrationProcedure(TConsensusGroupId consensusGroupId, boolean isStart) { + super(consensusGroupId); + this.isStart = isStart; + } + + @Override + protected Flow executeFromState(ConfigNodeProcedureEnv env, NotifyRegionMigrationState state) + throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { + if (regionId == null) { + return Flow.NO_MORE_STATE; + } + try { + LOGGER.info( + "[pid{}][NotifyRegionMigration] started, region id is {}.", getProcId(), regionId); + env.notifyRegionMigrationToAllDataNodes(regionId, isStart); + } catch (Exception e) { + LOGGER.error("[pid{}][NotifyRegionMigration] state {} failed", getProcId(), state, e); + return Flow.NO_MORE_STATE; + } + LOGGER.info("[pid{}][NotifyRegionMigration] state {} complete", getProcId(), state); + return Flow.HAS_MORE_STATE; + } + + @Override + protected void rollbackState( + ConfigNodeProcedureEnv configNodeProcedureEnv, NotifyRegionMigrationState state) + throws IOException, InterruptedException, ProcedureException {} + + @Override + protected NotifyRegionMigrationState getState(int stateId) { + return NotifyRegionMigrationState.values()[stateId]; + } + + @Override + protected int getStateId(NotifyRegionMigrationState state) { + return state.ordinal(); + } + + @Override + protected NotifyRegionMigrationState getInitialState() { + return NotifyRegionMigrationState.INIT; + } + + @Override + public void serialize(DataOutputStream stream) throws IOException { + stream.writeShort(ProcedureType.NOTIFY_REGION_MIGRATION_PROCEDURE.getTypeCode()); + super.serialize(stream); + ThriftCommonsSerDeUtils.serializeTConsensusGroupId(regionId, stream); + stream.writeBoolean(isStart); + } + + @Override + public void deserialize(ByteBuffer byteBuffer) { + super.deserialize(byteBuffer); + try { + regionId = ThriftCommonsSerDeUtils.deserializeTConsensusGroupId(byteBuffer); + isStart = (byteBuffer.get() != (byte) 0); + } catch (ThriftSerDeException e) { + LOGGER.error("Error in deserialize {}", this.getClass(), e); + } + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof NotifyRegionMigrationProcedure)) { + return false; + } + NotifyRegionMigrationProcedure procedure = (NotifyRegionMigrationProcedure) obj; + return this.regionId.equals(procedure.regionId) && this.isStart == procedure.isStart; + } + + @Override + public int hashCode() { + return Objects.hash(regionId, isStart); + } + + @Override + public String toString() { + return "NotifyRegionMigrationProcedure{" + + "regionId=" + + regionId + + ", isStart=" + + isStart + + '}'; + } +} diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RegionMigrateProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RegionMigrateProcedure.java index 951d521784b6..df0483bbe424 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RegionMigrateProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RegionMigrateProcedure.java @@ -84,6 +84,7 @@ protected Flow executeFromState(ConfigNodeProcedureEnv env, RegionTransitionStat regionId, handler.simplifiedLocation(originalDataNode), handler.simplifiedLocation(destDataNode)); + addChildProcedure(new NotifyRegionMigrationProcedure(regionId, true)); setNextState(RegionTransitionState.ADD_REGION_PEER); break; case ADD_REGION_PEER: @@ -125,6 +126,7 @@ protected Flow executeFromState(ConfigNodeProcedureEnv env, RegionTransitionStat CommonDateTimeUtils.convertMillisecondToDurationStr( System.currentTimeMillis() - getSubmittedTime()), DateTimeUtils.convertLongToDate(getSubmittedTime(), "ms")); + addChildProcedure(new NotifyRegionMigrationProcedure(regionId, false)); return Flow.NO_MORE_STATE; default: throw new ProcedureException("Unsupported state: " + state.name()); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/NotifyRegionMigrationState.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/NotifyRegionMigrationState.java new file mode 100644 index 000000000000..1b964621e5b7 --- /dev/null +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/NotifyRegionMigrationState.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.procedure.state; + +public enum NotifyRegionMigrationState { + INIT, +} diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java index e5d43137facf..fc88b54f3d56 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java @@ -40,6 +40,7 @@ import org.apache.iotdb.confignode.procedure.impl.pipe.task.StopPipeProcedureV2; import org.apache.iotdb.confignode.procedure.impl.region.AddRegionPeerProcedure; import org.apache.iotdb.confignode.procedure.impl.region.CreateRegionGroupsProcedure; +import org.apache.iotdb.confignode.procedure.impl.region.NotifyRegionMigrationProcedure; import org.apache.iotdb.confignode.procedure.impl.region.ReconstructRegionProcedure; import org.apache.iotdb.confignode.procedure.impl.region.RegionMigrateProcedure; import org.apache.iotdb.confignode.procedure.impl.region.RemoveRegionPeerProcedure; @@ -120,6 +121,10 @@ public Procedure create(ByteBuffer buffer) throws IOException { break; case RECONSTRUCT_REGION_PROCEDURE: procedure = new ReconstructRegionProcedure(); + break; + case NOTIFY_REGION_MIGRATION_PROCEDURE: + procedure = new NotifyRegionMigrationProcedure(); + break; case DELETE_TIMESERIES_PROCEDURE: procedure = new DeleteTimeSeriesProcedure(false); break; @@ -318,6 +323,8 @@ public static ProcedureType getProcedureType(Procedure procedure) { return ProcedureType.DELETE_TIMESERIES_PROCEDURE; } else if (procedure instanceof ReconstructRegionProcedure) { return ProcedureType.RECONSTRUCT_REGION_PROCEDURE; + } else if (procedure instanceof NotifyRegionMigrationProcedure) { + return ProcedureType.NOTIFY_REGION_MIGRATION_PROCEDURE; } else if (procedure instanceof CreateTriggerProcedure) { return ProcedureType.CREATE_TRIGGER_PROCEDURE; } else if (procedure instanceof DropTriggerProcedure) { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java index 82b281867569..48ccca42d44a 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java @@ -39,6 +39,7 @@ public enum ProcedureType { RECONSTRUCT_REGION_PROCEDURE((short) 203), ADD_REGION_PEER_PROCEDURE((short) 204), REMOVE_REGION_PEER_PROCEDURE((short) 205), + NOTIFY_REGION_MIGRATION_PROCEDURE((short) 206), @TestOnly CREATE_MANY_DATABASES_PROCEDURE((short) 250), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java index c5e92c754ac5..7613651d0199 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java @@ -207,6 +207,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TLoadCommandReq; import org.apache.iotdb.mpp.rpc.thrift.TLoadResp; import org.apache.iotdb.mpp.rpc.thrift.TMaintainPeerReq; +import org.apache.iotdb.mpp.rpc.thrift.TNotifyRegionMigrationReq; import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq; import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp; import org.apache.iotdb.mpp.rpc.thrift.TPushConsumerGroupMetaReq; @@ -2145,6 +2146,12 @@ public TRegionMigrateResult getRegionMaintainResult(long taskId) throws TExcepti return RegionMigrateService.getInstance().getRegionMaintainResult(taskId); } + @Override + public TSStatus notifyRegionMigration(TNotifyRegionMigrationReq req) throws TException { + RegionMigrateService.getInstance().notifyRegionMigration(req); + return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); + } + private TSStatus createNewRegion(ConsensusGroupId regionId, String storageGroup) { return regionManager.createNewRegion(regionId, storageGroup); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java index 04d38f25f893..0af4d91adfed 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java @@ -56,6 +56,7 @@ import org.apache.iotdb.db.queryengine.plan.scheduler.FragInstanceDispatchResult; import org.apache.iotdb.db.queryengine.plan.scheduler.IScheduler; import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement; +import org.apache.iotdb.db.service.RegionMigrateService; import org.apache.iotdb.db.storageengine.StorageEngine; import org.apache.iotdb.db.storageengine.dataregion.flush.MemTableFlushTask; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; @@ -182,6 +183,8 @@ public void start() { } shouldRemoveFileFromLoadingSet = true; + final long startTimeMs = System.currentTimeMillis(); + if (node.isTsFileEmpty()) { LOGGER.info("Load skip TsFile {}, because it has no data.", filePath); } else if (!node.needDecodeTsFile( @@ -226,6 +229,13 @@ public void start() { } } + if (RegionMigrateService.getInstance().getLastNotifyTime() > startTimeMs) { + LOGGER.warn( + "LoadTsFileScheduler: Region migration started or ended during loading TsFile {}, will convert to insertion to avoid data loss", + filePath); + isLoadSingleTsFileSuccess = false; + } + if (isLoadSingleTsFileSuccess) { node.clean(); LOGGER.info( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java index 056df802c138..05328ea91b94 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java @@ -41,6 +41,7 @@ import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl; import org.apache.iotdb.db.protocol.thrift.impl.DataNodeRegionManager; import org.apache.iotdb.mpp.rpc.thrift.TMaintainPeerReq; +import org.apache.iotdb.mpp.rpc.thrift.TNotifyRegionMigrationReq; import org.apache.iotdb.mpp.rpc.thrift.TRegionMigrateResult; import org.apache.iotdb.mpp.rpc.thrift.TResetPeerListReq; import org.apache.iotdb.rpc.TSStatusCode; @@ -53,6 +54,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; public class RegionMigrateService implements IService { @@ -72,6 +74,9 @@ public class RegionMigrateService implements IService { // where different asynchronous tasks are submitted to the same datanode within a single procedure private static final ConcurrentHashMap taskResultMap = new ConcurrentHashMap<>(); + + private static final AtomicLong lastNotifyTime = new AtomicLong(Long.MIN_VALUE); + private static final TRegionMigrateResult unfinishedResult = new TRegionMigrateResult(); private RegionMigrateService() {} @@ -80,6 +85,19 @@ public static RegionMigrateService getInstance() { return Holder.INSTANCE; } + public void notifyRegionMigration(TNotifyRegionMigrationReq req) { + lastNotifyTime.set(System.currentTimeMillis()); + if (req.isIsStart()) { + LOGGER.info("Region {} is notified to begin migrating", req.getRegionId()); + } else { + LOGGER.info("Region {} is notified to finish migrating", req.getRegionId()); + } + } + + public long getLastNotifyTime() { + return lastNotifyTime.get(); + } + /** * Submit AddRegionPeerTask * diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift index 4b25e08d1f6b..bef964966508 100644 --- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift +++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift @@ -52,6 +52,11 @@ struct TRegionMigrateResult { 4: required common.TRegionMaintainTaskStatus taskStatus } +struct TNotifyRegionMigrationReq { + 1: required common.TConsensusGroupId regionId + 2: required bool isStart +} + struct TCreatePeerReq { 1: required common.TConsensusGroupId regionId 2: required list regionLocations @@ -781,6 +786,11 @@ service IDataNodeRPCService { */ TRegionMigrateResult getRegionMaintainResult(i64 taskId) + /** + * Notify the DataNode of the beginning or ending the migration of the specified RegionGroup + */ + common.TSStatus notifyRegionMigration(TNotifyRegionMigrationReq req) + /** * Config node will clean DataNode cache, the Data node will not accept read/write request when disabled * @param data node location