zoukankan      html  css  js  c++  java
  • Flink – submitJob

    JobManager 的 submitJob 逻辑如下:

    /**
       * Submits a job to the job manager. The job is registered at the libraryCacheManager which
       * creates the job's class loader. The job graph is appended to the corresponding execution
       * graph and the execution vertices are queued for scheduling.
       *
       * The first part runs synchronously:
       *  - library registration,
       *  - restart strategy and metric group selection,
       *  - ExecutionGraph construction,
       *  - listener registration.
       * If any of it throws, the library registration and the `currentJobs` entry are removed, the
       * ExecutionGraph (if already assigned) is failed, and the clients receive a `JobResultFailure`.
       *
       * The remaining work runs afterwards in a future on `context.dispatcher`:
       *  - checkpoint recovery or savepoint restore,
       *  - persisting the JobGraph in `submittedJobGraphs`,
       *  - sending `JobSubmitSuccess`,
       *  - scheduling.
       * `JobSubmitSuccess` is sent before the job is scheduled. The job is only scheduled if this
       * JobManager currently holds leadership.
       *
       * @param jobGraph representing the Flink job; a null graph is reported to the clients as a
       *                 `JobSubmissionException` wrapped in a `JobResultFailure`
       * @param jobInfo the job info, including the clients to notify
       * @param isRecovery Flag indicating whether this is a recovery or initial submission
       */
      private def submitJob(jobGraph: JobGraph, jobInfo: JobInfo, isRecovery: Boolean = false): Unit = {
        if (jobGraph == null) {
          jobInfo.notifyClients(
            decorateMessage(JobResultFailure(
              new SerializedThrowable(
                new JobSubmissionException(null, "JobGraph must not be null.")))))
        }
        else {
          val jobId = jobGraph.getJobID
          val jobName = jobGraph.getName
          // Holds either the already registered graph or the newly built one. It is read in the
          // catch block below to fail a partially set-up graph.
          var executionGraph: ExecutionGraph = null
    
          try {
            // Important: We need to make sure that the library registration is the first action,
            // because this makes sure that the uploaded jar files are removed in case of
            // unsuccessful submission (the catch block below calls libraryCacheManager.unregisterJob).
            // NOTE(review): this `try` has no catch/finally clause, so it handles nothing and any
            // exception simply propagates to the outer catch. A catch clause may have been elided
            // from this excerpt; confirm against the actual source.
            try {
              libraryCacheManager.registerJob(jobGraph.getJobID, jobGraph.getUserJarBlobKeys,
                jobGraph.getClasspaths)
            }
            // Class loader for the job's registered user jars and classpaths.
            // NOTE(review): never reassigned in this method; could be a `val`.
            var userCodeLoader = libraryCacheManager.getClassLoader(jobGraph.getJobID)
    
            // Restart strategy: the one configured in the job's ExecutionConfig (deserialized with
            // the user code class loader). If none is configured, or the factory yields null, the
            // JobManager-wide default from restartStrategyFactory is used.
            val restartStrategy =
              Option(jobGraph.getSerializedExecutionConfig()
                .deserializeValue(userCodeLoader)
                .getRestartStrategy())
                .map(RestartStrategyFactory.createRestartStrategy)
                .filter(p => p != null) match {
                case Some(strategy) => strategy
                case None => restartStrategyFactory.createRestartStrategy()
              }
    
            // Per-job metric group added to the JobManager metric group if there is one;
            // otherwise an UnregisteredMetricsGroup.
            // NOTE(review): `case null` is only reachable if the typed pattern `jobGroup: Any`
            // does not match null. Confirm this; `Option(group.addJob(jobGraph))` would state the
            // intent explicitly.
            val jobMetrics = jobManagerMetricGroup match {
              case Some(group) =>
                group.addJob(jobGraph) match {
                  case (jobGroup:Any) => jobGroup
                  case null => new UnregisteredMetricsGroup()
                }
              case None =>
                new UnregisteredMetricsGroup()
            }
    
            // total number of slots currently reported by the scheduler
            val numSlots = scheduler.getTotalNumberOfSlots()
    
            // see if there already exists an ExecutionGraph for the corresponding job ID; if so,
            // reuse it (and mark its job info as active) instead of registering a new one
            val registerNewGraph = currentJobs.get(jobGraph.getJobID) match {
              case Some((graph, currentJobInfo)) =>
                executionGraph = graph
                currentJobInfo.setLastActive()
                false
              case None =>
                true
            }
    
            // Build the ExecutionGraph. An existing graph for this job ID (if any) is passed in
            // as the first argument.
            executionGraph = ExecutionGraphBuilder.buildGraph(
              executionGraph,
              jobGraph,
              flinkConfiguration,
              futureExecutor,
              ioExecutor,
              userCodeLoader,
              checkpointRecoveryFactory,
              Time.of(timeout.length, timeout.unit),
              restartStrategy,
              jobMetrics,
              numSlots,
              log.logger)
            
            // only a newly created graph has to be added to currentJobs
            if (registerNewGraph) {
              currentJobs.put(jobGraph.getJobID, (executionGraph, jobInfo))
            }
    
            // get notified about job status changes: register this JobManager (self) as a job
            // status listener
            executionGraph.registerJobStatusListener(
              new StatusListenerMessenger(self, leaderSessionID.orNull))
    
            // clients that asked for EXECUTION_RESULT_AND_STATE_CHANGES are registered as both
            // execution listener and job status listener
            jobInfo.clients foreach {
              // the sender wants to be notified about state changes
              case (client, ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES) =>
                val listener  = new StatusListenerMessenger(client, leaderSessionID.orNull)
                executionGraph.registerExecutionListener(listener)
                executionGraph.registerJobStatusListener(listener)
              case _ => // do nothing
            }
    
          } catch {
            // Submission failed. This handler logs the error, undoes the library registration and
            // the currentJobs entry, fails the graph if one was assigned, and reports the error to
            // the clients.
            // NOTE(review): currentJobs.remove also drops an entry that existed before this call
            // (registerNewGraph == false); confirm that this is intended.
            case t: Throwable =>
              log.error(s"Failed to submit job $jobId ($jobName)", t)
    
              libraryCacheManager.unregisterJob(jobId)
              currentJobs.remove(jobId)
    
              if (executionGraph != null) {
                executionGraph.fail(t)
              }
    
              // wrap anything that is not already a JobExecutionException
              val rt: Throwable = if (t.isInstanceOf[JobExecutionException]) {
                t
              } else {
                new JobExecutionException(jobId, s"Failed to submit job $jobId ($jobName)", t)
              }
    
              // notify the clients that the submission failed
              jobInfo.notifyClients(
                decorateMessage(JobResultFailure(new SerializedThrowable(rt))))
              // stop here: nothing below (recovery, persisting, scheduling) runs for a failed submission
              return
          }
    
          // Everything above prepared the ExecutionGraph synchronously; everything below runs
          // asynchronously.
          // execute the recovery/writing the jobGraph into the SubmittedJobGraphStore asynchronously
          // because it is a blocking operation
          future {
            try {
              if (isRecovery) {
                // this is a recovery of a master failure (this master takes over):
                // restore the latest checkpointed state
                executionGraph.restoreLatestCheckpointedState(false, false)
              }
              else {
                // load a savepoint only if this is not starting from a newer checkpoint
                // as part of an master failure recovery
                val savepointSettings = jobGraph.getSavepointRestoreSettings
                // restore from the configured savepoint, if any
                if (savepointSettings.restoreSavepoint()) {
                  try {
                    val savepointPath = savepointSettings.getRestorePath()
                    val allowNonRestored = savepointSettings.allowNonRestoredState()
    
                    log.info(s"Starting job from savepoint '$savepointPath'" +
                      (if (allowNonRestored) " (allowing non restored state)" else "") + ".")
    
                      // load the savepoint as a checkpoint into the system
                      val savepoint: CompletedCheckpoint = SavepointLoader.loadAndValidateSavepoint(
                        jobId,
                        executionGraph.getAllVertices,
                        savepointPath,
                        executionGraph.getUserClassLoader,
                        allowNonRestored)
    
                    executionGraph.getCheckpointCoordinator.getCheckpointStore
                      .addCheckpoint(savepoint)
    
                    // Reset the checkpoint ID counter
                    val nextCheckpointId: Long = savepoint.getCheckpointID + 1
                    log.info(s"Reset the checkpoint ID to $nextCheckpointId")
                    executionGraph.getCheckpointCoordinator.getCheckpointIdCounter
                      .setCount(nextCheckpointId)
    
                    executionGraph.restoreLatestCheckpointedState(true, allowNonRestored)
                  } catch {
                    // A savepoint restore failure is reported to the clients and rethrown as a
                    // SuppressRestartsException, which tryRestartOrFail treats as "no restart".
                    case e: Exception =>
                      jobInfo.notifyClients(
                        decorateMessage(JobResultFailure(new SerializedThrowable(e))))
                      throw new SuppressRestartsException(e)
                  }
                }
    
                try {
                  // persist the JobGraph in submittedJobGraphs (e.g. a
                  // ZooKeeperSubmittedJobGraphStore in an HA setup)
                  submittedJobGraphs.putJobGraph(new SubmittedJobGraph(jobGraph, jobInfo))
                } catch {
                  case t: Throwable =>
                    // Don't restart the execution if this fails. Otherwise, the
                    // job graph will skip ZooKeeper in case of HA.
                    jobInfo.notifyClients(
                      decorateMessage(JobResultFailure(new SerializedThrowable(t))))
                    throw new SuppressRestartsException(t)
                }
              }
    
              // Report a successful submission to the clients. This happens before scheduling;
              // a scheduling failure is handled by the catch below, which fails the graph.
              jobInfo.notifyClients(
                decorateMessage(JobSubmitSuccess(jobGraph.getJobID)))
    
              if (leaderElectionService.hasLeadership) {
                // There is a small chance that multiple job managers schedule the same job if
                // they try to recover at the same time. This will eventually be noticed, but can not be
                // ruled out from the beginning.
    
                // NOTE: Scheduling the job for execution is a separate action from the job submission.
                // The success of submitting the job must be independent from the success of scheduling
                // the job.
                log.info(s"Scheduling job $jobId ($jobName).")
    
                // start scheduling
                executionGraph.scheduleForExecution(scheduler)
              } else {
                // Remove the job graph. Otherwise it will be lingering around and possibly removed from
                // ZooKeeper by this JM.
                self ! decorateMessage(RemoveJob(jobId, removeJobFromStateBackend = false))
    
                log.warn(s"Submitted job $jobId, but not leader. The other leader needs to recover " +
                  "this. I am not scheduling the job for execution.")
              }
            } catch {
              // Any failure in the asynchronous part fails the ExecutionGraph. An error raised
              // while doing so is only logged.
              case t: Throwable => try {
                executionGraph.fail(t)
              } catch {
                case tt: Throwable =>
                  log.error("Error while marking ExecutionGraph as failed.", tt)
              }
            }
          }(context.dispatcher)
        }
      }

    可以看到,在 executionGraph 开始调度(scheduleForExecution)之前,就已经通知客户端(clients)提交成功了。

     

    当 job 出现故障时,会调用到 tryRestartOrFail:

    /**
     * Moves a FAILING or RESTARTING job either into RESTARTING, triggering the restart strategy,
     * or into FAILED.
     *
     * <p>A restart is attempted only when both of these hold:
     * <ul>
     *   <li>the failure cause is not a {@code SuppressRestartsException};</li>
     *   <li>{@code restartStrategy.canRestart()} returns true.</li>
     * </ul>
     * Otherwise the job is moved to FAILED, the reasons are logged, and {@code postRunCleanup()}
     * is called.
     *
     * @return true if this call performed the RESTARTING or FAILED transition; false if the job
     *         was not in FAILING/RESTARTING, or if its state changed concurrently (callers may
     *         re-check the state and retry)
     */
    private boolean tryRestartOrFail() {
        final JobStatus observed = state;

        // Only FAILING or RESTARTING jobs can be restarted or failed from here.
        if (observed != JobStatus.FAILING && observed != JobStatus.RESTARTING) {
            return false;
        }

        synchronized (progressLock) {
            final boolean causePermitsRestart = !(failureCause instanceof SuppressRestartsException);
            // Queried unconditionally, even when the failure cause already forbids a restart.
            final boolean strategyPermitsRestart = restartStrategy.canRestart();

            if (causePermitsRestart && strategyPermitsRestart) {
                if (!transitionState(observed, JobStatus.RESTARTING)) {
                    // the state was changed concurrently; this operation cannot complete
                    return false;
                }
                restartStrategy.restart(this);
                return true;
            }

            if (!transitionState(observed, JobStatus.FAILED, failureCause)) {
                // the state was changed concurrently; this operation cannot complete
                return false;
            }

            final List<String> blockers = new ArrayList<>(2);
            if (!causePermitsRestart) {
                blockers.add("a type of SuppressRestartsException was thrown");
            }
            if (!strategyPermitsRestart) {
                blockers.add("the restart strategy prevented it");
            }

            LOG.info("Could not restart the job {} ({}) because {}.", getJobName(), getJobID(),
                StringUtils.join(blockers, " and "), failureCause);
            postRunCleanup();
            return true;
        }
    }

     

    有两处会调用到tryRestartOrFail

    1. ExecutionGraph.jobVertexInFinalState

    void jobVertexInFinalState() {
        synchronized (progressLock) {
            if (numFinishedJobVertices >= verticesInCreationOrder.size()) {
                throw new IllegalStateException("All vertices are already finished, cannot transition vertex to finished.");
            }
    
            numFinishedJobVertices++;
    
            if (numFinishedJobVertices == verticesInCreationOrder.size()) { //当所有的vertices都已经finished
    
                // we are done, transition to the final state
                JobStatus current;
                while (true) {
                    current = this.state;
    
                    if (current == JobStatus.RUNNING) {
                        if (transitionState(current, JobStatus.FINISHED)) {
                            postRunCleanup();
                            break;
                        }
                    }
                    else if (current == JobStatus.CANCELLING) {
                        if (transitionState(current, JobStatus.CANCELED)) {
                            postRunCleanup();
                            break;
                        }
                    }
                    else if (current == JobStatus.FAILING) {
                        if (tryRestartOrFail()) { //如果failing,调用tryRestartOrFail
                            break;
                        }
                        // concurrent job status change, let's check again
                    }


    2. 显式地调用 ExecutionGraph.fail(当前状态为 RESTARTING 时):

    } else if (current == JobStatus.RESTARTING) {
        this.failureCause = t;
    
        if (tryRestartOrFail()) {
            return;
        }
        // concurrent job status change, let's check again
    }

     

    上面调用到restartStrategy.restart(this);

    restartStrategy有很多种,我们先看看

    FixedDelayRestartStrategy

     

    /**
     * Counts one more restart attempt, then asynchronously runs the restart task returned by
     * {@code ExecutionGraphRestarter.restartWithDelay(executionGraph, delayBetweenRestartAttempts)}
     * on the graph's future executor.
     *
     * @param executionGraph the graph to restart
     */
    @Override
        public void restart(final ExecutionGraph executionGraph) {
            currentRestartAttempt += 1;
            FlinkFuture.supplyAsync(
                ExecutionGraphRestarter.restartWithDelay(executionGraph, delayBetweenRestartAttempts),
                executionGraph.getFutureExecutor());
        }

    异步地调用 ExecutionGraphRestarter.restartWithDelay,

    最终调用到

    executionGraph.restart();
    /**
     * Restarts the job.
     *
     * <p>The restart proceeds as follows:
     * <ol>
     *   <li>clear the current executions;</li>
     *   <li>reset every execution job vertex, resetting each co-location group once;</li>
     *   <li>clear the per-state timestamps except the RESTARTING one;</li>
     *   <li>move the job from RESTARTING to CREATED;</li>
     *   <li>restore the latest checkpointed state if a checkpoint coordinator exists;</li>
     *   <li>schedule the job again.</li>
     * </ol>
     *
     * <p>If the RESTARTING to CREATED transition fails, the restart is aborted without restoring
     * state or rescheduling. That transition fails when the job's state was changed concurrently
     * while the delayed restart was pending.
     *
     * <p>Any throwable raised while restarting is logged and the job is failed via {@code fail(t)}.
     */
    public void restart() {
        try {
            synchronized (progressLock) {
                this.currentExecutions.clear();

                // each co-location group is reset only once, even if several vertices share it
                Collection<CoLocationGroup> colGroups = new HashSet<>();

                for (ExecutionJobVertex jv : this.verticesInCreationOrder) {

                    CoLocationGroup cgroup = jv.getCoLocationGroup();
                    if (cgroup != null && !colGroups.contains(cgroup)) {
                        cgroup.resetConstraints();
                        colGroups.add(cgroup);
                    }

                    jv.resetForNewExecution();
                }

                for (int i = 0; i < stateTimestamps.length; i++) {
                    if (i != JobStatus.RESTARTING.ordinal()) {
                        // Only clear the non restarting state in order to preserve when the job was
                        // restarted. This is needed for the restarting time gauge
                        stateTimestamps[i] = 0;
                    }
                }
                numFinishedJobVertices = 0;

                // The restart runs asynchronously after the restart strategy's delay. If the state
                // was changed in the meantime (presumably a cancel/fail while the restart was
                // pending), the transition fails. In that case the job must not be restored or
                // scheduled again.
                if (!transitionState(JobStatus.RESTARTING, JobStatus.CREATED)) {
                    LOG.info("Job is no longer in state RESTARTING. Aborting restart.");
                    return;
                }

                // if we have checkpointed state, reload it into the executions
                if (checkpointCoordinator != null) {
                    checkpointCoordinator.restoreLatestCheckpointedState(getAllVertices(), false, false);
                }
            }

            // schedule the job again
            scheduleForExecution(slotProvider);
        }
        catch (Throwable t) {
            LOG.warn("Failed to restart the job.", t);
            fail(t);
        }
    }

     

    关于重启策略,

    参考https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/restart_strategies.html

    If checkpointing is not enabled, the “no restart” strategy is used. If checkpointing is activated and the restart strategy has not been configured, the fixed-delay strategy is used with Integer.MAX_VALUE restart attempts.

     

    StreamingJobGraphGenerator
    private void configureCheckpointing() {
            CheckpointConfig cfg = streamGraph.getCheckpointConfig();
    
            long interval = cfg.getCheckpointInterval();
            if (interval > 0) {
                // check if a restart strategy has been set, if not then set the FixedDelayRestartStrategy
                if (streamGraph.getExecutionConfig().getRestartStrategy() == null) {
                    // if the user enabled checkpointing, the default number of exec retries is infinite.
                    streamGraph.getExecutionConfig().setRestartStrategy(
                        RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, DEFAULT_RESTART_DELAY));
                }
            }
    当开启 checkpoint 且用户没有设置重启策略时,默认使用 fixedDelayRestart,最多重启 Integer.MAX_VALUE 次。
  • 相关阅读:
    Android图像格式类及图像转换方法
    Java实现文件的加密与解密
    Google最新截屏案例详解
    Android应用程序模拟手机按键
    Android浮动小球与开机自启动
    Android手机截屏
    Android图片浏览器之图片删除
    Android图片浏览器之缩略图
    MFC实现Gif动画制作工具
    QRadioButton分组且无边框的简单实现
  • 原文地址:https://www.cnblogs.com/fxjwind/p/6386201.html
Copyright © 2011-2022 走看看