zoukankan      html  css  js  c++  java
  • Flink -- Failover

     

    JobManager failover

     

    LeaderLatch

    /**
     * Swaps in the new leadership flag (via getAndSet) and notifies the registered
     * LeaderLatchListeners only when the flag actually changes: notLeader() on a
     * true -> false transition, isLeader() on a false -> true transition. Setting the
     * same value again notifies no listener. Finally wakes up every thread waiting on this
     * object's monitor, which is held here because the method is synchronized.
     *
     * @param newValue whether this latch now holds leadership
     */
    private synchronized void setLeadership(boolean newValue)
    {
    boolean oldValue = hasLeadership.getAndSet(newValue);

    if ( oldValue && !newValue ) // was leader before, is not leader now -> leadership lost
    { // Lost leadership, was true, now false
    listeners.forEach(new Function<LeaderLatchListener, Void>()
    {
    @Override
    public Void apply(LeaderLatchListener listener)
    {
    listener.notLeader();
    return null;
    }
    });
    }
    else if ( !oldValue && newValue )
    { // Gained leadership, was false, now true
    listeners.forEach(new Function<LeaderLatchListener, Void>()
    {
    @Override
    public Void apply(LeaderLatchListener input)
    {
    input.isLeader();
    return null;
    }
    });
    }

    // wake up every thread blocked in wait() on this latch's monitor
    notifyAll();
    }
     

    ZooKeeperLeaderElectionService

    /**
     * LeaderLatchListener callback: this contender has gained leadership.
     * Under {@code lock}, issues a fresh random leader session ID and grants leadership
     * to the leader contender with that ID.
     * NOTE(review): the blank lines below presumably mark code elided by the article.
     */
    @Override
    public void isLeader() {
    synchronized (lock) {
    issuedLeaderSessionID = UUID.randomUUID();


    leaderContender.grantLeadership(issuedLeaderSessionID);
    }
    }

    /**
     * LeaderLatchListener callback: this contender has lost leadership.
     * Under {@code lock}, clears both the issued and the confirmed leader session IDs
     * and revokes leadership from the leader contender.
     * NOTE(review): the blank lines below presumably mark code elided by the article.
     */
    @Override
    public void notLeader() {
    synchronized (lock) {
    issuedLeaderSessionID = null;
    confirmedLeaderSessionID = null;



    leaderContender.revokeLeadership();
    }
    }

    可以看到,isLeader 和 notLeader 只是分别调用 leaderContender.grantLeadership 和 leaderContender.revokeLeadership (同时维护 leader session ID)

     

    而 JobManager 实现了 LeaderContender 接口,所以最终调用的是 JobManager 中的这两个方法:

    revokeLeadership

    // In revokeLeadership: suspend every running job and drop it from currentJobs;
    // one future per job is returned.
    val newFuturesToComplete = cancelAndClearEverything(
    new Exception("JobManager is no longer the leader."))

     

    在 cancelAndClearEverything 中,关键的是 suspend executionGraph:停止 job 的执行,但并不会把 job 从 HA job store 中删除,这样其他的 JobManager 成为 leader 后还能恢复并重新提交它

    * The SUSPENDED state is a local terminal state which stops the execution of the job but does
    * not remove the job from the HA job store so that it can be recovered by another JobManager.
    /**
     * Suspends every job in `currentJobs` and then clears `currentJobs`.
     *
     * For each job, a future running on the actor's dispatcher suspends the job's
     * ExecutionGraph with `cause` and, unless the client listens DETACHED, sends the client
     * a `Failure(JobExecutionException)`. Suspending stops the execution but keeps the job in
     * the HA job store, so that another JobManager can recover it.
     *
     * @param cause reason handed to `ExecutionGraph.suspend` and attached to the client failure
     * @return one future per job; `currentJobs` is cleared right away, without waiting for them
     */
    private def cancelAndClearEverything(cause: Throwable)
    : Seq[Future[Unit]] = {
    val futures = for ((jobID, (eg, jobInfo)) <- currentJobs) yield {
    future {
    eg.suspend(cause) //suspend Execution Graph

    if (jobInfo.listeningBehaviour != ListeningBehaviour.DETACHED) {
    jobInfo.client ! decorateMessage(
    Failure(new JobExecutionException(jobID, "All jobs are cancelled and cleared.", cause)))
    }
    }(context.dispatcher)
    }

    currentJobs.clear()

    futures.toSeq
    }

     

    grantLeadership

    // In grantLeadership: once jobRecoveryTimeout has elapsed, this actor sends itself a
    // RecoverAllJobs message (scheduled on the actor's dispatcher).
    context.system.scheduler.scheduleOnce(
    jobRecoveryTimeout,
    self,
    decorateMessage(RecoverAllJobs))(
    context.dispatcher)

    主要是在 jobRecoveryTimeout 之后给自己发送 RecoverAllJobs 消息,以恢复所有的 job

    // Recovers every job persisted in the submitted-job-graph store. The work runs in a
    // Future on the actor's dispatcher: read all submitted job graphs, then, only if this
    // JobManager still holds leadership, send itself one RecoverSubmittedJob per graph.
    // NOTE(review): any Throwable is caught and only logged; nothing here retries recovery.
    case RecoverAllJobs =>
    future {
    try {
    // The ActorRef, which is part of the submitted job graph can only be
    // de-serialized in the scope of an actor system.
    akka.serialization.JavaSerializer.currentSystem.withValue(
    context.system.asInstanceOf[ExtendedActorSystem]) {

    log.info(s"Attempting to recover all jobs.")

    val jobGraphs = submittedJobGraphs.recoverJobGraphs().asScala //read all submitted jobs from the submittedJobGraphs store, i.e. also from ZooKeeper

    if (!leaderElectionService.hasLeadership()) {
    // we've lost leadership. mission: abort.
    log.warn(s"Lost leadership during recovery. Aborting recovery of ${jobGraphs.size} " +
    s"jobs.")
    } else {
    log.info(s"Re-submitting ${jobGraphs.size} job graphs.")

    jobGraphs.foreach{
    submittedJobGraph =>
    self ! decorateMessage(RecoverSubmittedJob(submittedJobGraph)) //recover each job via a RecoverSubmittedJob message
    }
    }
    }
    } catch {
    case t: Throwable => log.error("Fatal error: Failed to recover jobs.", t)
    }
    }(context.dispatcher)

     

    在 RecoverAllJobs 中,对每个 job 会给自己发送 RecoverSubmittedJob 消息,其处理如下:

    // Resubmits a recovered job graph with isRecovery = true, unless a job with the same
    // JobID is already in currentJobs (then the request is only logged and ignored).
    case RecoverSubmittedJob(submittedJobGraph) =>
    if (!currentJobs.contains(submittedJobGraph.getJobId)) {
    submitJob(
    submittedJobGraph.getJobGraph(),
    submittedJobGraph.getJobInfo(),
    isRecovery = true)
    }
    else {
    log.info(s"Ignoring job recovery for ${submittedJobGraph.getJobId}, " +
    s"because it is already submitted.")
    }

    其实就是重新 submit job (前提是该 job 还不在 currentJobs 中),注意这里传入的是 isRecovery = true

    在 submit job 时,如果 isRecovery = true,会执行下面的操作,从最近一次 checkpoint 恢复状态;后续具体的流程参考 Checkpoint 篇

    // In submitJob, only for a recovered job: restore the latest checkpointed state
    // into the job's ExecutionGraph.
    if (isRecovery) {
    executionGraph.restoreLatestCheckpointedState()
    }

     

    TaskManager Failover

    JobManager 内部通过 Akka death watch (或 ResourceManager 的通知) 发现 TaskManager 已经 dead 后,会调用:

    /**
        * Handler to be executed when a task manager terminates
        * (Akka DeathWatch or notification from the ResourceManager).
        *
        * If the TaskManager is still registered with the InstanceManager, it is unregistered
        * (flagged as terminated) and this actor stops watching it. Calls for a TaskManager
        * that is not registered are ignored.
        *
        * @param taskManager The ActorRef of the taskManager
        */
      private def handleTaskManagerTerminated(taskManager: ActorRef): Unit = {
        if (instanceManager.isRegistered(taskManager)) {
          log.info(s"Task manager ${taskManager.path} terminated.")
    
          // `true` = terminated: the TaskManager is also recorded in the InstanceManager's dead hosts
          instanceManager.unregisterTaskManager(taskManager, true)
          context.unwatch(taskManager)
        }
      }

    其中 instanceManager.unregisterTaskManager 的实现如下:

    /**
    * Unregisters the TaskManager with the given {@link ActorRef}. Unregistering means to mark
    * the given instance as dead and notify {@link InstanceListener} about the dead instance.
    *
    * <p>The instance is removed from the by-connection, by-ID and by-resource lookup maps, and
    * its slots are subtracted from the number of alive task slots. Does nothing if no instance
    * is registered for the given ActorRef.
    *
    * @param instanceID TaskManager which is about to be marked dead.
    * @param terminated whether the TaskManager actor has terminated; if so, its actor is also
    *                   added to {@code deadHosts}.
    */
    public void unregisterTaskManager(ActorRef instanceID, boolean terminated){
        Instance instance = registeredHostsByConnection.get(instanceID);
        
        if (instance != null){
            // NOTE(review): removal uses the instance's own actor ref, assumed to be the same
            // ActorRef that was used for the lookup above
            ActorRef host = instance.getActorGateway().actor();
            
            registeredHostsByConnection.remove(host);
            registeredHostsById.remove(instance.getId());
            registeredHostsByResource.remove(instance.getResourceId());
            
            if (terminated) {
                deadHosts.add(instance.getActorGateway().actor());
            }
            
            // releases all slots of the instance, failing the tasks still running in them
            instance.markDead();
            
            totalNumberOfAliveTaskSlots -= instance.getTotalNumberOfSlots();
            
            notifyDeadInstance(instance);
        }
    }

     

    instance.markDead()

    /**
     * Marks this instance as dead and releases every slot it had allocated.
     *
     * <p>Calling this on an instance that is already dead does nothing. Otherwise, while
     * holding {@code instanceLock}, it sets the dead flag, drops the slot availability
     * listener, snapshots the allocated slots and clears the allocated and available slots.
     * The snapshotted slots are released only after the lock has been given up.
     */
    public void markDead() {
    
        // create a copy of the slots to avoid concurrent modification exceptions
        List<Slot> slots;
        
        synchronized (instanceLock) {
        if (isDead) {
            // already dead: nothing left to release
            return;
        }
        isDead = true;
        
        // no more notifications for the slot releasing
        this.slotAvailabilityListener = null;
        
        slots = new ArrayList<Slot>(allocatedSlots);
        
        allocatedSlots.clear();
            availableSlots.clear();
        }
        
        /*
        * releaseSlot must not own the instanceLock in order to avoid dead locks where a slot
        * owning the assignment group lock wants to give itself back to the instance which requires
        * the instance lock
        */
        for (Slot slot : slots) {
            slot.releaseSlot();
        }
    }

     

    SimpleSlot.releaseSlot

    @Override 
    public void releaseSlot() { 
    
        if (!isCanceled()) { 
    
            // kill all tasks currently running in this slot 
            Execution exec = this.executedTask; 
            if (exec != null && !exec.isFinished()) { 
                exec.fail(new Exception( 
                        "The slot in which the task was executed has been released. Probably loss of TaskManager " 
                                + getInstance())); 
            } 
    
            // release directly (if we are directly allocated), 
            // otherwise release through the parent shared slot 
            if (getParent() == null) { 
                // we have to give back the slot to the owning instance 
                if (markCancelled()) { 
                    getInstance().returnAllocatedSlot(this); 
                } 
            } else { 
                // we have to ask our parent to dispose us 
                getParent().releaseChild(this); 
            }
    
    }

     

    Execution.fail

    /**
     * Fails this execution with the given cause by delegating to processFail.
     * NOTE(review): the meaning of the second argument ({@code false}) is not visible here;
     * presumably it is a flag such as "isCallback". Confirm in processFail.
     *
     * @param t the failure cause
     */
    public void fail(Throwable t) {
    processFail(t, false);
    }

     

    Execution.processFail

    先将 Execution 的状态设为 FAILED:

    // In processFail: move the execution from its current state to FAILED, recording the cause
    transitionState(current, FAILED, t)
    /**
     * Moves this execution from {@code currentState} to {@code targetState} with a
     * compare-and-set on the state field. On success, records the timestamp of the new state
     * and notifies the ExecutionVertex of the transition. A Throwable from that notification is
     * logged and swallowed, so it cannot undo the state change.
     *
     * @return true if the state was changed, false if the state was not {@code currentState}
     */
    private boolean transitionState(ExecutionState currentState, ExecutionState targetState, Throwable error) { 
    
        if (STATE_UPDATER.compareAndSet(this, currentState, targetState)) {
            markTimestamp(targetState); 
    
            try {
                vertex.notifyStateTransition(attemptId, targetState, error);
            }
            catch (Throwable t) {
                LOG.error("Error while notifying execution graph of execution state transition.", t);
            }
            return true;
        } else {
            return false;
        }
    }

    状态设置成功后,会调用 vertex.notifyStateTransition,进而调用 ExecutionGraph.notifyExecutionChange:

    // In ExecutionVertex.notifyStateTransition: forward the transition to the ExecutionGraph
    getExecutionGraph().notifyExecutionChange(getJobvertexId(), subTaskIndex, executionId, newState, error);
    /**
     * Broadcasts an execution state change and escalates failures.
     *
     * <p>If any execution listeners are registered, sends each of them an
     * ExecutionStateChanged message carrying the stringified error, or null if there is none.
     * If the new state is FAILED, the whole ExecutionGraph is failed with the same error.
     * NOTE(review): the result of getJobVertex(vertexId) is dereferenced without a null check
     * when listeners exist; this assumes vertexId always belongs to this graph.
     */
    void notifyExecutionChange(JobVertexID vertexId, int subtask, ExecutionAttemptID executionID, ExecutionState
                            newExecutionState, Throwable error)
    {
        ExecutionJobVertex vertex = getJobVertex(vertexId);
    
        if (executionListenerActors.size() > 0) {
            String message = error == null ? null : ExceptionUtils.stringifyException(error);
            ExecutionGraphMessages.ExecutionStateChanged actorMessage =
                    new ExecutionGraphMessages.ExecutionStateChanged(jobID, vertexId,  vertex.getJobVertex().getName(),
                                                                    vertex.getParallelism(), subtask,
                                                                    executionID, newExecutionState,
                                                                    System.currentTimeMillis(), message);
    
            for (ActorGateway listener : executionListenerActors) {
                listener.tell(actorMessage);
            }
        }
    
        // see what this means for us. currently, the first FAILED state means -> FAILED
        if (newExecutionState == ExecutionState.FAILED) {
            fail(error);
        }
    }

    主要就是将 ExecutionGraphMessages.ExecutionStateChanged 消息发送给所有注册的 listener

    listener 是 JobManager 在提交 job 时注册的 (仅当 listeningBehaviour 为 EXECUTION_RESULT_AND_STATE_CHANGES 时):

         // In JobManager job submission: register the client as execution and job-status
         // listener only if it asked for state changes (EXECUTION_RESULT_AND_STATE_CHANGES).
         if (jobInfo.listeningBehaviour == ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES) {
              // the sender wants to be notified about state changes
              val gateway = new AkkaActorGateway(jobInfo.client, leaderSessionID.orNull)
    
              executionGraph.registerExecutionListener(gateway)
              executionGraph.registerJobStatusListener(gateway)
          }

    而在 client 端,

    JobClientActor 收到这些消息后,只是 log 并打印出来:
    // JobClientActor: execution and job status change messages are only logged and printed
    if (message instanceof ExecutionGraphMessages.ExecutionStateChanged) {
        logAndPrintMessage((ExecutionGraphMessages.ExecutionStateChanged) message);
    } else if (message instanceof ExecutionGraphMessages.JobStatusChanged) {
        logAndPrintMessage((ExecutionGraphMessages.JobStatusChanged) message);
    }

     

    注意,这里如果 newExecutionState == ExecutionState.FAILED,会调用 ExecutionGraph.fail
    就像注释说的,第一个进入 FAILED 状态的 Execution,就意味着整个 job 失败

    /**
     * Fails the whole job with the given cause.
     *
     * <p>Loops over the current job status until one of these cases applies:
     * <ul>
     *   <li>already FAILING, SUSPENDED or globally terminal: nothing to do;</li>
     *   <li>RESTARTING: the job goes straight to FAILED, run cleanup is performed and the
     *       threads waiting on {@code progressLock} are woken up;</li>
     *   <li>otherwise: the job goes to FAILING, the cause is remembered and every job vertex
     *       is cancelled (or, if there are no vertices, the job goes directly to FAILED).</li>
     * </ul>
     * If a state transition does not succeed (presumably because the state changed
     * concurrently), the loop re-reads the state and tries again.
     *
     * @param t the failure cause
     */
    public void fail(Throwable t) {
        while (true) {
            JobStatus current = state;
            // stay in these states
            if (current == JobStatus.FAILING ||
                current == JobStatus.SUSPENDED ||
                current.isGloballyTerminalState()) {
                return;
            } else if (current == JobStatus.RESTARTING && transitionState(current, JobStatus.FAILED, t)) {
                synchronized (progressLock) {
                    postRunCleanup();
                    progressLock.notifyAll();
                    return;
                }
            } else if (transitionState(current, JobStatus.FAILING, t)) { //move the job status to JobStatus.FAILING
                this.failureCause = t;
    
                if (!verticesInCreationOrder.isEmpty()) {
                    // cancel all. what is failed will not cancel but stay failed
                    for (ExecutionJobVertex ejv : verticesInCreationOrder) {
                        ejv.cancel();
                    }
                } else {
                    // set the state of the job to failed
                    transitionState(JobStatus.FAILING, JobStatus.FAILED, t); // no vertices to wait for, fail right away
                }
    
                return;
            }
    
        }
    }

    可以看到,通常情况下这里会直接把 job 状态设为 FAILING,并对所有的 ExecutionJobVertex 调用 cancel (如果 job 正处于 RESTARTING,则直接设为 FAILED)

     

    接着,Execution 会从 ExecutionGraph 中 deregister 自己:

    // In Execution.processFail: remove this execution from the ExecutionGraph
    vertex.getExecutionGraph().deregisterExecution(this);
    // inside ExecutionGraph.deregisterExecution(exec): drop it from the current executions by attempt ID
    Execution contained = currentExecutions.remove(exec.getAttemptId());

     

    最后,调用 vertex.executionFailed,它会转到 ExecutionJobVertex.vertexFailed:

    // In Execution.processFail: finally report the failure to the ExecutionVertex
    vertex.executionFailed(t);
    /**
     * Reports the failure of this vertex's execution to its ExecutionJobVertex,
     * identified by this vertex's subtask index.
     */
    void executionFailed(Throwable t) {
        jobVertex.vertexFailed(subTaskIndex, t);
    }

     

    ExecutionJobVertex
    
    /**
     * Called when one subtask of this job vertex has failed. The error itself is not used
     * here; the subtask is simply counted as having reached a final state.
     */
    void vertexFailed(int subtask, Throwable error) {
        subtaskInFinalState(subtask);
    }
    
    /**
     * Records that the given subtask has reached a final state. Each subtask is counted at
     * most once.
     *
     * <p>When this is the last subtask of the vertex to finish, the method runs the job
     * vertex's finalizeOnMaster hook and fails the whole graph if the hook throws. It then
     * wakes up threads waiting on {@code stateMonitor} and tells the ExecutionGraph that this
     * job vertex is in its final state.
     */
    private void subtaskInFinalState(int subtask) {
        synchronized (stateMonitor) {
            if (!finishedSubtasks[subtask]) {
                finishedSubtasks[subtask] = true;
                
                if (numSubtasksInFinalState+1 == parallelism) { //check whether every subtask of this vertex has now finished
                    
                    // call finalizeOnMaster hook
                    try {
                        getJobVertex().finalizeOnMaster(getGraph().getUserClassLoader());
                    }
                    catch (Throwable t) {
                        getGraph().fail(t);
                    }
    
                    numSubtasksInFinalState++;
                    
                    // we are in our final state
                    stateMonitor.notifyAll();
                    
                    // tell the graph
                    graph.jobVertexInFinalState();
                } else {
                    numSubtasksInFinalState++;
                }
            }
        }
    }

    graph.jobVertexInFinalState()

    void jobVertexInFinalState() {
            numFinishedJobVertices++;
    
            if (numFinishedJobVertices == verticesInCreationOrder.size()) { //是否所有JobVertices都已经finished
    
                // we are done, transition to the final state
                JobStatus current;
                while (true) {
                    current = this.state;
    
                    if (current == JobStatus.RUNNING) {
                        if (transitionState(current, JobStatus.FINISHED)) {
                            postRunCleanup();
                            break;
                        }
                    }
                    else if (current == JobStatus.CANCELLING) {
                        if (transitionState(current, JobStatus.CANCELED)) {
                            postRunCleanup();
                            break;
                        }
                    }
                    else if (current == JobStatus.FAILING) {
                        boolean allowRestart = !(failureCause instanceof SuppressRestartsException);
    
                        if (allowRestart && restartStrategy.canRestart() && transitionState(current, JobStatus.RESTARTING)) {
                            restartStrategy.restart(this);
                            break;
                        } else if ((!allowRestart || !restartStrategy.canRestart()) && transitionState(current, JobStatus.FAILED, failureCause)) {
                            postRunCleanup();
                            break;
                        }
                    }
                    else if (current == JobStatus.SUSPENDED) {
                        // we've already cleaned up when entering the SUSPENDED state
                        break;
                    }
                    else if (current.isGloballyTerminalState()) {
                        LOG.warn("Job has entered globally terminal state without waiting for all " +
                            "job vertices to reach final state.");
                        break;
                    }
                    else {
                        fail(new Exception("ExecutionGraph went into final state from state " + current));
                        break;
                    }
                }
                // done transitioning the state
    
                // also, notify waiters
                progressLock.notifyAll();
            }
        }
    }

    如果 Job 状态是 JobStatus.FAILING,并且满足 restart 的条件 (failureCause 不是 SuppressRestartsException,且 restartStrategy.canRestart()),就会 transitionState(current, JobStatus.RESTARTING),然后调用

    // FAILING -> RESTARTING succeeded: hand this ExecutionGraph to the configured restart strategy
    restartStrategy.restart(this);

    restart 策略是可以配置的,但无论哪种策略,最终都会调用到:

    // every restart strategy eventually calls:
    executionGraph.restart();
    /**
     * Restarts the job after it has been moved to RESTARTING.
     *
     * <p>The following steps run while holding {@code progressLock}:
     * <ul>
     *   <li>If the job was meanwhile CANCELED, FAILED or SUSPENDED, the restart is aborted
     *       and only logged.</li>
     *   <li>Otherwise the job must be RESTARTING and a scheduler must exist.</li>
     *   <li>The current executions are cleared.</li>
     *   <li>Each co-location group's constraints are reset (once per group), and every job
     *       vertex is reset for a new execution.</li>
     *   <li>All state timestamps except RESTARTING's are cleared, the finished-vertex counter
     *       is reset, and the job moves RESTARTING -> CREATED.</li>
     *   <li>If a checkpoint coordinator exists, the latest checkpointed state is restored.
     *       If nothing was restored, a configured savepoint restore path is used instead.</li>
     * </ul>
     * The graph is then scheduled again, after {@code progressLock} has been released.
     *
     * <p>Any Throwable raised along the way, including the IllegalStateExceptions above,
     * fails the whole job via fail(Throwable).
     */
    public void restart() {
        try {
            synchronized (progressLock) {
                JobStatus current = state;
    
                if (current == JobStatus.CANCELED) {
                    LOG.info("Canceled job during restart. Aborting restart.");
                    return;
                } else if (current == JobStatus.FAILED) {
                    LOG.info("Failed job during restart. Aborting restart.");
                    return;
                } else if (current == JobStatus.SUSPENDED) {
                    LOG.info("Suspended job during restart. Aborting restart.");
                    return;
                } else if (current != JobStatus.RESTARTING) {
                    throw new IllegalStateException("Can only restart job from state restarting.");
                }
    
                if (scheduler == null) {
                    throw new IllegalStateException("The execution graph has not been scheduled before - scheduler is null.");
                }
    
                this.currentExecutions.clear();
    
                // co-location groups shared by several job vertices are reset only once
                Collection<CoLocationGroup> colGroups = new HashSet<>();
    
                for (ExecutionJobVertex jv : this.verticesInCreationOrder) {
    
                    CoLocationGroup cgroup = jv.getCoLocationGroup();
                    if(cgroup != null && !colGroups.contains(cgroup)){
                        cgroup.resetConstraints();
                        colGroups.add(cgroup);
                    }
    
                    jv.resetForNewExecution();
                }
    
                for (int i = 0; i < stateTimestamps.length; i++) {
                    if (i != JobStatus.RESTARTING.ordinal()) {
                        // Only clear the non restarting state in order to preserve when the job was
                        // restarted. This is needed for the restarting time gauge
                        stateTimestamps[i] = 0;
                    }
                }
                numFinishedJobVertices = 0;
                transitionState(JobStatus.RESTARTING, JobStatus.CREATED);
    
                // if we have checkpointed state, reload it into the executions
                if (checkpointCoordinator != null) {
                    boolean restored = checkpointCoordinator
                            .restoreLatestCheckpointedState(getAllVertices(), false, false); //reload the latest checkpoint and its state
    
                    // TODO(uce) Temporary work around to restore initial state on
                    // failure during recovery. Will be superseded by FLINK-3397.
                    if (!restored && savepointCoordinator != null) {
                        String savepointPath = savepointCoordinator.getSavepointRestorePath();
                        if (savepointPath != null) {
                            savepointCoordinator.restoreSavepoint(getAllVertices(), savepointPath);
                        }
                    }
                }
            }
    
            scheduleForExecution(scheduler); //schedule the ExecutionGraph again, i.e. resubmit it (outside progressLock)
        }
        catch (Throwable t) {
            fail(t);
        }
    }
  • 相关阅读:
    抽象类存在的意义
    抽象类的特征
    抽象类的使用
    抽象类的概述
    引用类型作为方法参数和返回值
    继承的特点
    目前Java水平以及理解自我反思---01
    继承后- 构造器的特点
    指针函数
    C数组灵活多变的访问形式
  • 原文地址:https://www.cnblogs.com/fxjwind/p/6099193.html
Copyright © 2011-2022 走看看