Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[To dev/1.3] Load & Region Migrate: Notify all DNs before and after RM #15131

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public enum CnToDnAsyncRequestType {
CREATE_SCHEMA_REGION,
DELETE_REGION,
RESET_PEER_LIST,
NOTIFY_REGION_MIGRATION,
UPDATE_REGION_ROUTE_MAP,
CHANGE_REGION_LEADER,

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import org.apache.iotdb.mpp.rpc.thrift.TInactiveTriggerInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TInvalidateCacheReq;
import org.apache.iotdb.mpp.rpc.thrift.TInvalidateMatchedSchemaCacheReq;
import org.apache.iotdb.mpp.rpc.thrift.TNotifyRegionMigrationReq;
import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
import org.apache.iotdb.mpp.rpc.thrift.TPushConsumerGroupMetaReq;
import org.apache.iotdb.mpp.rpc.thrift.TPushMultiPipeMetaReq;
Expand Down Expand Up @@ -245,6 +246,11 @@ protected void initActionMapBuilder() {
CnToDnAsyncRequestType.UPDATE_REGION_ROUTE_MAP,
(req, client, handler) ->
client.updateRegionCache((TRegionRouteReq) req, (DataNodeTSStatusRPCHandler) handler));
actionMapBuilder.put(
CnToDnAsyncRequestType.NOTIFY_REGION_MIGRATION,
(req, client, handler) ->
client.notifyRegionMigration(
(TNotifyRegionMigrationReq) req, (DataNodeTSStatusRPCHandler) handler));
actionMapBuilder.put(
CnToDnAsyncRequestType.CHANGE_REGION_LEADER,
(req, client, handler) ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ public static DataNodeAsyncRequestRPCHandler<?> buildHandler(
case STOP_REPAIR_DATA:
case LOAD_CONFIGURATION:
case SET_SYSTEM_STATUS:
case NOTIFY_REGION_MIGRATION:
case UPDATE_REGION_ROUTE_MAP:
case INVALIDATE_SCHEMA_CACHE:
case INVALIDATE_MATCHED_SCHEMA_CACHE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import org.apache.iotdb.mpp.rpc.thrift.TDropTriggerInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TInactiveTriggerInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TInvalidateCacheReq;
import org.apache.iotdb.mpp.rpc.thrift.TNotifyRegionMigrationReq;
import org.apache.iotdb.mpp.rpc.thrift.TPushConsumerGroupMetaReq;
import org.apache.iotdb.mpp.rpc.thrift.TPushConsumerGroupMetaResp;
import org.apache.iotdb.mpp.rpc.thrift.TPushMultiPipeMetaReq;
Expand Down Expand Up @@ -522,6 +523,20 @@ private TCreateDataRegionReq genCreateDataRegionReq(
return req;
}

public List<TSStatus> notifyRegionMigrationToAllDataNodes(
TConsensusGroupId consensusGroupId, boolean isStart) {
final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
configManager.getNodeManager().getRegisteredDataNodeLocations();
final TNotifyRegionMigrationReq request =
new TNotifyRegionMigrationReq(consensusGroupId, isStart);

final DataNodeAsyncRequestContext<TNotifyRegionMigrationReq, TSStatus> clientHandler =
new DataNodeAsyncRequestContext<>(
CnToDnAsyncRequestType.NOTIFY_REGION_MIGRATION, request, dataNodeLocationMap);
CnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequestWithRetry(clientHandler);
return clientHandler.getResponseList();
}

public void persistRegionGroup(CreateRegionGroupsPlan createRegionGroupsPlan) {
// Persist the allocation result
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.confignode.procedure.impl.region;

import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException;
import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
import org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
import org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException;
import org.apache.iotdb.confignode.procedure.state.NotifyRegionMigrationState;
import org.apache.iotdb.confignode.procedure.store.ProcedureType;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Objects;

/** A procedure that notifies all DNs of the ongoing region migration procedure. */
public class NotifyRegionMigrationProcedure
extends RegionOperationProcedure<NotifyRegionMigrationState> {
private static final Logger LOGGER =
LoggerFactory.getLogger(NotifyRegionMigrationProcedure.class);

private boolean isStart;

public NotifyRegionMigrationProcedure() {
super();
}

public NotifyRegionMigrationProcedure(TConsensusGroupId consensusGroupId, boolean isStart) {
super(consensusGroupId);
this.isStart = isStart;
}

@Override
protected Flow executeFromState(ConfigNodeProcedureEnv env, NotifyRegionMigrationState state)
throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
if (regionId == null) {
return Flow.NO_MORE_STATE;
}
try {
LOGGER.info(
"[pid{}][NotifyRegionMigration] started, region id is {}.", getProcId(), regionId);
env.notifyRegionMigrationToAllDataNodes(regionId, isStart);
} catch (Exception e) {
LOGGER.error("[pid{}][NotifyRegionMigration] state {} failed", getProcId(), state, e);
return Flow.NO_MORE_STATE;
}
LOGGER.info("[pid{}][NotifyRegionMigration] state {} complete", getProcId(), state);
return Flow.HAS_MORE_STATE;
}

@Override
protected void rollbackState(
ConfigNodeProcedureEnv configNodeProcedureEnv, NotifyRegionMigrationState state)
throws IOException, InterruptedException, ProcedureException {}

@Override
protected NotifyRegionMigrationState getState(int stateId) {
return NotifyRegionMigrationState.values()[stateId];
}

@Override
protected int getStateId(NotifyRegionMigrationState state) {
return state.ordinal();
}

@Override
protected NotifyRegionMigrationState getInitialState() {
return NotifyRegionMigrationState.INIT;
}

@Override
public void serialize(DataOutputStream stream) throws IOException {
stream.writeShort(ProcedureType.NOTIFY_REGION_MIGRATION_PROCEDURE.getTypeCode());
super.serialize(stream);
ThriftCommonsSerDeUtils.serializeTConsensusGroupId(regionId, stream);
stream.writeBoolean(isStart);
}

@Override
public void deserialize(ByteBuffer byteBuffer) {
super.deserialize(byteBuffer);
try {
regionId = ThriftCommonsSerDeUtils.deserializeTConsensusGroupId(byteBuffer);
isStart = (byteBuffer.get() != (byte) 0);
} catch (ThriftSerDeException e) {
LOGGER.error("Error in deserialize {}", this.getClass(), e);
}
}

@Override
public boolean equals(Object obj) {
if (!(obj instanceof NotifyRegionMigrationProcedure)) {
return false;
}
NotifyRegionMigrationProcedure procedure = (NotifyRegionMigrationProcedure) obj;
return this.regionId.equals(procedure.regionId) && this.isStart == procedure.isStart;
}

@Override
public int hashCode() {
return Objects.hash(regionId, isStart);
}

@Override
public String toString() {
return "NotifyRegionMigrationProcedure{"
+ "regionId="
+ regionId
+ ", isStart="
+ isStart
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ protected Flow executeFromState(ConfigNodeProcedureEnv env, RegionTransitionStat
regionId,
handler.simplifiedLocation(originalDataNode),
handler.simplifiedLocation(destDataNode));
addChildProcedure(new NotifyRegionMigrationProcedure(regionId, true));
setNextState(RegionTransitionState.ADD_REGION_PEER);
break;
case ADD_REGION_PEER:
Expand Down Expand Up @@ -125,6 +126,7 @@ protected Flow executeFromState(ConfigNodeProcedureEnv env, RegionTransitionStat
CommonDateTimeUtils.convertMillisecondToDurationStr(
System.currentTimeMillis() - getSubmittedTime()),
DateTimeUtils.convertLongToDate(getSubmittedTime(), "ms"));
addChildProcedure(new NotifyRegionMigrationProcedure(regionId, false));
return Flow.NO_MORE_STATE;
default:
throw new ProcedureException("Unsupported state: " + state.name());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.confignode.procedure.state;

public enum NotifyRegionMigrationState {
INIT,
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.iotdb.confignode.procedure.impl.pipe.task.StopPipeProcedureV2;
import org.apache.iotdb.confignode.procedure.impl.region.AddRegionPeerProcedure;
import org.apache.iotdb.confignode.procedure.impl.region.CreateRegionGroupsProcedure;
import org.apache.iotdb.confignode.procedure.impl.region.NotifyRegionMigrationProcedure;
import org.apache.iotdb.confignode.procedure.impl.region.ReconstructRegionProcedure;
import org.apache.iotdb.confignode.procedure.impl.region.RegionMigrateProcedure;
import org.apache.iotdb.confignode.procedure.impl.region.RemoveRegionPeerProcedure;
Expand Down Expand Up @@ -120,6 +121,10 @@ public Procedure create(ByteBuffer buffer) throws IOException {
break;
case RECONSTRUCT_REGION_PROCEDURE:
procedure = new ReconstructRegionProcedure();
break;
case NOTIFY_REGION_MIGRATION_PROCEDURE:
procedure = new NotifyRegionMigrationProcedure();
break;
case DELETE_TIMESERIES_PROCEDURE:
procedure = new DeleteTimeSeriesProcedure(false);
break;
Expand Down Expand Up @@ -318,6 +323,8 @@ public static ProcedureType getProcedureType(Procedure<?> procedure) {
return ProcedureType.DELETE_TIMESERIES_PROCEDURE;
} else if (procedure instanceof ReconstructRegionProcedure) {
return ProcedureType.RECONSTRUCT_REGION_PROCEDURE;
} else if (procedure instanceof NotifyRegionMigrationProcedure) {
return ProcedureType.NOTIFY_REGION_MIGRATION_PROCEDURE;
} else if (procedure instanceof CreateTriggerProcedure) {
return ProcedureType.CREATE_TRIGGER_PROCEDURE;
} else if (procedure instanceof DropTriggerProcedure) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public enum ProcedureType {
RECONSTRUCT_REGION_PROCEDURE((short) 203),
ADD_REGION_PEER_PROCEDURE((short) 204),
REMOVE_REGION_PEER_PROCEDURE((short) 205),
NOTIFY_REGION_MIGRATION_PROCEDURE((short) 206),
@TestOnly
CREATE_MANY_DATABASES_PROCEDURE((short) 250),

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@
import org.apache.iotdb.mpp.rpc.thrift.TLoadCommandReq;
import org.apache.iotdb.mpp.rpc.thrift.TLoadResp;
import org.apache.iotdb.mpp.rpc.thrift.TMaintainPeerReq;
import org.apache.iotdb.mpp.rpc.thrift.TNotifyRegionMigrationReq;
import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;
import org.apache.iotdb.mpp.rpc.thrift.TPushConsumerGroupMetaReq;
Expand Down Expand Up @@ -2145,6 +2146,12 @@ public TRegionMigrateResult getRegionMaintainResult(long taskId) throws TExcepti
return RegionMigrateService.getInstance().getRegionMaintainResult(taskId);
}

@Override
public TSStatus notifyRegionMigration(TNotifyRegionMigrationReq req) throws TException {
RegionMigrateService.getInstance().notifyRegionMigration(req);
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}

private TSStatus createNewRegion(ConsensusGroupId regionId, String storageGroup) {
return regionManager.createNewRegion(regionId, storageGroup);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.iotdb.db.queryengine.plan.scheduler.FragInstanceDispatchResult;
import org.apache.iotdb.db.queryengine.plan.scheduler.IScheduler;
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
import org.apache.iotdb.db.service.RegionMigrateService;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.storageengine.dataregion.flush.MemTableFlushTask;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
Expand Down Expand Up @@ -182,6 +183,8 @@ public void start() {
}
shouldRemoveFileFromLoadingSet = true;

final long startTimeMs = System.currentTimeMillis();

if (node.isTsFileEmpty()) {
LOGGER.info("Load skip TsFile {}, because it has no data.", filePath);
} else if (!node.needDecodeTsFile(
Expand Down Expand Up @@ -226,6 +229,13 @@ public void start() {
}
}

if (RegionMigrateService.getInstance().getLastNotifyTime() > startTimeMs) {
LOGGER.warn(
"LoadTsFileScheduler: Region migration started or ended during loading TsFile {}, will convert to insertion to avoid data loss",
filePath);
isLoadSingleTsFileSuccess = false;
}

if (isLoadSingleTsFileSuccess) {
node.clean();
LOGGER.info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
import org.apache.iotdb.db.protocol.thrift.impl.DataNodeRegionManager;
import org.apache.iotdb.mpp.rpc.thrift.TMaintainPeerReq;
import org.apache.iotdb.mpp.rpc.thrift.TNotifyRegionMigrationReq;
import org.apache.iotdb.mpp.rpc.thrift.TRegionMigrateResult;
import org.apache.iotdb.mpp.rpc.thrift.TResetPeerListReq;
import org.apache.iotdb.rpc.TSStatusCode;
Expand All @@ -53,6 +54,7 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

public class RegionMigrateService implements IService {
Expand All @@ -72,6 +74,9 @@ public class RegionMigrateService implements IService {
// where different asynchronous tasks are submitted to the same datanode within a single procedure
private static final ConcurrentHashMap<Long, TRegionMigrateResult> taskResultMap =
new ConcurrentHashMap<>();

private static final AtomicLong lastNotifyTime = new AtomicLong(Long.MIN_VALUE);

private static final TRegionMigrateResult unfinishedResult = new TRegionMigrateResult();

private RegionMigrateService() {}
Expand All @@ -80,6 +85,19 @@ public static RegionMigrateService getInstance() {
return Holder.INSTANCE;
}

public void notifyRegionMigration(TNotifyRegionMigrationReq req) {
lastNotifyTime.set(System.currentTimeMillis());
if (req.isIsStart()) {
LOGGER.info("Region {} is notified to begin migrating", req.getRegionId());
} else {
LOGGER.info("Region {} is notified to finish migrating", req.getRegionId());
}
}

public long getLastNotifyTime() {
return lastNotifyTime.get();
}

/**
* Submit AddRegionPeerTask
*
Expand Down
Loading
Loading