[ES版本]
5.5.0
[分析过程]
RecoveryState 中的 Stage 枚举定义了 Recovery 的 6 个阶段：
public class RecoveryState implements ToXContent, Streamable {

    /**
     * Stages of a shard recovery. Six stages are declared, carrying byte ids 0 through 5
     * in declaration order.
     */
    public enum Stage {
        /**
         * Initial stage.
         */
        INIT((byte) 0),
        /**
         * recovery of lucene files, either reusing local ones or copying new ones
         */
        INDEX((byte) 1),
        /**
         * potentially running check index (i.e. an optional index verification step)
         */
        VERIFY_INDEX((byte) 2),
        /**
         * starting up the engine, replaying the translog
         */
        TRANSLOG((byte) 3),
        /**
         * performing final task after all translog ops have been done
         */
        FINALIZE((byte) 4),
        /**
         * Recovery has completed.
         */
        DONE((byte) 5);
        ...
    }
    ...
}
ShardRoutingState 枚举定义了分片（shard）的 4 种路由状态：
/**
 * Routing state of a shard copy. Four states are declared, carrying byte ids 1 through 4.
 */
public enum ShardRoutingState {
    /**
     * The shard is not assigned to any node.
     */
    UNASSIGNED((byte) 1),
    /**
     * The shard is initializing (probably recovering from either a peer shard
     * or gateway).
     */
    INITIALIZING((byte) 2),
    /**
     * The shard is started.
     */
    STARTED((byte) 3),
    /**
     * The shard is in the process of being relocated.
     */
    RELOCATING((byte) 4);
    ...
}
相关的处理逻辑位于 PeerRecoverySourceService：它接收来自其他 peer 分片的 recovery 请求，并启动从本节点的源分片（source shard）到目标分片（target shard）的恢复过程：
/** * The source recovery accepts recovery requests from other peer shards and start the recovery process from this * source shard to the target shard. */ public class PeerRecoverySourceService extends AbstractComponent implements IndexEventListener { public static class Actions { public static final String START_RECOVERY = "internal:index/shard/recovery/start_recovery"; } private final TransportService transportService; private final IndicesService indicesService; private final RecoverySettings recoverySettings; private final ClusterService clusterService; private final OngoingRecoveries ongoingRecoveries = new OngoingRecoveries(); @Inject public PeerRecoverySourceService(Settings settings, TransportService transportService, IndicesService indicesService, RecoverySettings recoverySettings, ClusterService clusterService) { super(settings); this.transportService = transportService; this.indicesService = indicesService; this.clusterService = clusterService; this.recoverySettings = recoverySettings; transportService.registerRequestHandler(Actions.START_RECOVERY, StartRecoveryRequest::new, ThreadPool.Names.GENERIC, new StartRecoveryTransportRequestHandler()); } //在分片关闭前,要把所有正在recovery的动作中止掉 @Override public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, Settings indexSettings) { if (indexShard != null) { ongoingRecoveries.cancel(indexShard, "shard is closed"); } } private RecoveryResponse recover(StartRecoveryRequest request) throws IOException { final IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex()); final IndexShard shard = indexService.getShard(request.shardId().id()); // starting recovery from that our (the source) shard state is marking the shard to be in recovery mode as well, otherwise // the index operations will not be routed to it properly //先判断目的节点是否正常存在集群中 RoutingNode node = clusterService.state().getRoutingNodes().node(request.targetNode().getId()); //如果不在集群,则推迟进行recovery if (node == 
null) { logger.debug("delaying recovery of {} as source node {} is unknown", request.shardId(), request.targetNode()); throw new DelayRecoveryException("source node does not have the node [" + request.targetNode() + "] in its state yet.."); } ShardRouting routingEntry = shard.routingEntry(); //是主分片并且当前分片非迁移状态 // 或者 //是主分片且处于迁移状态,但是目标节点与正在迁移的目标节点不一致 if (request.isPrimaryRelocation() && (routingEntry.relocating() == false || routingEntry.relocatingNodeId().equals(request.targetNode().getId()) == false)) { logger.debug("delaying recovery of {} as source shard is not marked yet as relocating to {}", request.shardId(), request.targetNode()); throw new DelayRecoveryException("source shard is not marked yet as relocating to [" + request.targetNode() + "]"); } ShardRouting targetShardRouting = node.getByShardId(request.shardId()); //节点上未获取到目标分片 if (targetShardRouting == null) { logger.debug("delaying recovery of {} as it is not listed as assigned to target node {}", request.shardId(), request.targetNode()); throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node"); } //目标分片非初始化状态 if (!targetShardRouting.initializing()) { logger.debug("delaying recovery of {} as it is not listed as initializing on the target node {}. 
known shards state is [{}]", request.shardId(), request.targetNode(), targetShardRouting.state()); throw new DelayRecoveryException("source node has the state of the target shard to be [" + targetShardRouting.state() + "], expecting to be [initializing]"); } //请求中未携带分配的ID,需要重新构造请求 if (request.targetAllocationId() == null) { // ES versions < 5.4.0 do not send targetAllocationId as part of recovery request, just assume that we have the correct id request = new StartRecoveryRequest(request.shardId(), targetShardRouting.allocationId().getId(), request.sourceNode(), request.targetNode(), request.metadataSnapshot(), request.isPrimaryRelocation(), request.recoveryId()); } ` //请求中携带的目标分配ID与分片的分配唯一标识ID不一致 if (request.targetAllocationId().equals(targetShardRouting.allocationId().getId()) == false) { logger.debug("delaying recovery of {} due to target allocation id mismatch (expected: [{}], but was: [{}])", request.shardId(), request.targetAllocationId(), targetShardRouting.allocationId().getId()); throw new DelayRecoveryException("source node has the state of the target shard to have allocation id [" + targetShardRouting.allocationId().getId() + "], expecting to be [" + request.targetAllocationId() + "]"); } //往正在recovery的列表中增加一个新的recovery RecoverySourceHandler handler = ongoingRecoveries.addNewRecovery(request, shard); logger.trace("[{}][{}] starting recovery to {}", request.shardId().getIndex().getName(), request.shardId().id(), request.targetNode()); try { return handler.recoverToTarget(); } finally { ongoingRecoveries.remove(shard, handler); } } ... }
StartRecoveryTransportRequestHandler（在构造函数中注册为 START_RECOVERY 动作的处理器）的 messageReceived 方法负责从 transport channel 接收启动 recovery 的请求，并执行 recover 操作。
<未完待续>