zoukankan      html  css  js  c++  java
  • 一个Spark job的生命历程

    一个job的生命历程
    dagScheduler.runJob //(1)
    --> submitJob ( eventProcessLoop.post(JobSubmitted,***) //(2)
        --> eventProcessLoop //(3)
            --> onReceive(event: DAGSchedulerEvent) //(4)
                --> doOnReceive(event: DAGSchedulerEvent) //(5)
                    --> case JobSubmitted //(6)
                        --> dagScheduler.handleJobSubmitted //(7)
                            --> finalStage =createResultStage(finalRDD, func, partitions, jobId, callSite) //(8)    
                            --> job = new ActiveJob(jobId, finalStage, callSite, listener, properties) //(9)
                            --> jobIdToActiveJob(jobId) = job //(10)
                            --> activeJobs += job //(11)
                            --> finalStage.setActiveJob(job) //(12)
                            --> stageIds = jobIdToStageIds(jobId).toArray //(13)
                            --> stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo)) //(14)
                            --> listenerBus.post(SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties)) //(15)
                            --> submitStage(finalStage) //(16)
                                --> getMissingParentStages(stage).sortBy(_.id) //(17)
                                    --> finalStage = getOrCreateShuffleMapStage(dependency, jobId) //(18)
                                        --> createShuffleMapStage(dep, firstJobId) //(19)
                                            -->stage = new ShuffleMapStage(id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep)
                                    --> job = new ActiveJob(jobId, finalStage, callSite, listener, properties) //(20)                    
                                    --> submitStage(finalStage)  //(21)//划分和提交stage算法精髓
                                        --> submitMissingTasks(stage, jobId.get) //(22)
                                            --> submitWaitingChildStages(stage) //(23)
                                    --> markMapStageJobAsFinished(job, mapOutputTracker.getStatistics(dependency))  //(24)
    (1)所有的action算子都会触发一个job的调度,经过多次不同的runjob重载后停在这里调度 submitJob
    (2)调用eventProcessLoop方法,并发送 JobSubmitted 消息给DAGSchedulerEventProcessLoop(DAGScheduler的循环响应函数体)
    (3)eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
    (4)onReceive 函数是接受 DAGSchedulerEventProcessLoop DAG调度程序的事件接受函数
    (5)doOnReceive 实际是步骤4的事件处理函数
    (6)根据步骤2的发送事件,触发 JobSubmitted 这个事件响应
    (7)dagScheduler 的核心入口
    (8)使用触发的job的最后一个RDD创建一个 finalstage,并且放入内存缓存中 stageIdToStage
    (9)使用 finalStage 创建一个job。这个job最后一个stage就是final stage
    (10)(11)(12)(13)(14)(15)把 job 加入各种内存缓存中,其实就是各个数据结构
    (16)提交fianlStage。总是从最后开始往前推测。
    (17)获取当前stage的父stage。stage的划分算法,主要在这里。waitingForVisit = new Stack[RDD[_]]。栈结构,从最后的stage往前的stage 放进栈中,实现先进后出。符合程序调用顺序。
    (18)获取最后一个stage,finalstage
    (19)生成一个 ShuffleMapStage
    (20)利用finalestage 生成一个job
    (21)划分和提交stage算法精髓,划分好stage之后全部放在waiting stage 数据结构中
    (22)提交所有在 waiting stage 中的stage,从stage0...finalstage
    (23)检查等待的阶段,现在有资格重新提交。提交依赖于给定父级阶段的阶段。当父阶段完成时调用成功
    (24)所有的stage划分完并提交结束
    ------------------------------------------------------------------------------
    stage划分算法非常重要,精通spark,必须对stage划分算法很清晰,知道自己编写的spark程序被划分为几个job,每个job被划分为几个stage,
    每个stage包含了哪些代码,只有知道每个stage包括哪些代码后。在线上,如果发现某个stage执行特别慢,或者某个stage一直报错,才能针对
    特定的stage包含的代码排查问题,或性能调优。
    stage划分算法总结:
    1.从finalstage倒推(通过 栈 数据结构实现)
    2.通过宽依赖,进行stage的划分
    3.通过递归,优先提交父stage
    ------------------------------------------------------------------------------
    /**
    * 获取某个stage的父stage
    * 对于一个stage,如果它的最后一个RDD的所有依赖都是窄依赖,将不会创建新的stage
    * 如果其RDD会依赖某个RDD,用宽依赖的RDD创建一个新的stage,并立即返回这个stage
    * @type {[type]}
    */
    private def getMissingParentStages(stage: Stage): List[Stage] = {
        val missing = new HashSet[Stage]
        val visited = new HashSet[RDD[_]]
        val waitingForVisit = new Stack[RDD[_]]
        
        def visit(rdd: RDD[_]) {
          if (!visited(rdd)) {
            visited += rdd
            val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)
            if (rddHasUncachedPartitions) {
                //遍历RDD的依赖,对于每种具有shuffle的操作,如reduceByKey,groupByKey,countByKey,底层对应了3个RDD:
                //Map
              for (dep <- rdd.dependencies) {
                dep match {
                    //如果是宽依赖
                  case shufDep: ShuffleDependency[_, _, _] =>
                      //使用宽依赖的RDD创建一个 ShuffleMapStage,并且将isShuffleMap 设置为true,
                      //默认最后一个stage不是shuffle不是ShuffleMapStage,但是finalstage之前所有的stage都是ShuffleMapStage
                    val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)
                    if (!mapStage.isAvailable) {
                      missing += mapStage
                    }
                  
                    //如果是窄依赖
                  case narrowDep: NarrowDependency[_] =>
                  //将依赖的RDD放入栈中
                    waitingForVisit.push(narrowDep.rdd)
                }
              }
            }
          }
        }
        //
        waitingForVisit.push(stage.rdd)
        while (waitingForVisit.nonEmpty) {
        //
          visit(waitingForVisit.pop())
        }
        missing.toList
      }
     
    -------------------------------------------------------------------------------------------------------------------------------
    taskScheduler
    -->taskSchedulerImpl (standalone模式)
        -->SparkDeploySchedulerBackend (负责创建AppClient, 向master注册Application)
    在TaskSchedulerImpl中,对一个单独的taskset的任务进行调度.这个类负责追踪每一个taskset,如果task失败的话
    会负责重试spark,直到超过重试次数,并且会通知延迟调度,为这个taskSet处理本地化机制.它的主要接口是
    resourceOffer,在这个接口中,taskset会希望在一个节点上运行一个任务,并且接受任务的状态改变消息,
    来知道它负责的task的状态改变了.
    override def submitTasks(taskSet: TaskSet) {
        val tasks = taskSet.tasks //获取ttaskSet的task列表
        logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
        this.synchronized {
          //每个taskSet都会创建一个manager,用于管理每个taskSet,并设定最大失败次数 maxTaskFailures
          val manager = createTaskSetManager(taskSet, maxTaskFailures)
          val stage = taskSet.stageId
          //尝试连接task,如果task失败,会负责重试spark,直到超过重试次数,并且会通知延迟调度
          val stageTaskSets =
            taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
          //通过 manager 获得活着的taskSet
          stageTaskSets(taskSet.stageAttemptId) = manager
          val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
            ts.taskSet != taskSet && !ts.isZombie
          }
          if (conflictingTaskSet) {
            throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
              s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
          }
          //利用已选择的调度器schedulableBuilder,把一个taskSet的manager加入调度管理池中
          /*
          def initialize(backend: SchedulerBackend) {
            this.backend = backend
            schedulableBuilder = {
              schedulingMode match {
                case SchedulingMode.FIFO =>
                  new FIFOSchedulableBuilder(rootPool)
                case SchedulingMode.FAIR =>
                  new FairSchedulableBuilder(rootPool, conf)
                case _ =>
                  throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " +
                  s"$schedulingMode")
              }
            }
            schedulableBuilder.buildPools()
          }*/
          schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
          
          if (!isLocal && !hasReceivedTask) {
            starvationTimer.scheduleAtFixedRate(new TimerTask() {
              override def run() {
                if (!hasLaunchedTask) {
                  logWarning("Initial job has not accepted any resources; " +
                    "check your cluster UI to ensure that workers are registered " +
                    "and have sufficient resources")
                } else {
                  this.cancel()
                }
              }
            }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
          }
          hasReceivedTask = true
        }
        /**
          * 创建 taskScheduler 的时候,就是为 taskSchedulerImpl 创建一个 SparkDeploySchedulerBackend .
          * 它负责创建AppClient,向master注册Application
          */
        backend.reviveOffers()
      }
  • 相关阅读:
    【原创】(九)Linux内存管理
    【原创】(八)Linux内存管理
    【原创】(六)Linux内存管理
    【原创】(四)Linux内存模型之Sparse Memory Model
    2019年总结
    被低估的.NET(下)-2019 中国.NET 开发者峰会
    《.NET内存管理宝典》阅读指南
    《 .NET并发编程实战》扩展阅读
    《 .NET并发编程实战》阅读指南
    《 .NET并发编程实战》阅读指南
  • 原文地址:https://www.cnblogs.com/liangjf/p/8322420.html
Copyright © 2011-2022 走看看