zoukankan      html  css  js  c++  java
  • Spark作业执行原理(四)——提交任务

    首先熟悉一下该阶段的一些重要方法的调用关系图:

            在上一篇提交调度阶段中,提到:当该阶段不存在父调度阶段时,就会调用DAGScheduler的submitMissingTasks方法。这个方法就是触发任务的提交。在方法中,会根据调度阶段Partition个数拆分对应的个数的任务,一个partition对应一个task,每一个stage里的所有task组成一个TaskSet,将会被提交到TaskScheduler进行处理。对于ResultStage,生成ResultTask;对于ShuffleMapStage,生成ShuffleMapTask。DAGScheduler的submitMissingTasks方法的部分源码:

    private def submitMissingTasks(stage: Stage, jobId:Int){
        ...
        //生成TaskSet对象
        val tasks:Sqg[Task[_]] = try{
            stage match{
                //对于ShuffleMapStage,生成ShuffleMapTask
                case stage: ShuffleMapStage =>
                    partitionsToCompute.map{ id =>
                        val locs = taskIdToLocations(id)
                        val part = stage.rdd.partitions(id)
                        new ShuffleMapTask(stage.id, stage.latesInfo.attemptId, taskBinary, part, locs, stage.internalAccumulators)
                    }
                //对于ResultStage,生成ResultTask
                case stage: ResultStage =>
                    val job = stage.resultOfJob.get
                    partitionsToCompute.map{id =>
                        val p: Int = job.partitions(id)
                        val part = stage.rdd.partitions(p)
                        val locs = taskIdToLocations(id)
                        new ResultTask(stage.id, stage.latesInfo.attemptId, taskBinary, part, locs, stage.internalAccumulators)
                    }
            }
        }catch{...}
     
        if(tasks.size > 0){
            //将tasks以任务集TaskSet的方式提交给TaskScheduler
            stage.pendingPartitions ++= tasks.map(_.partitionId)
            //TaskScheduler引用(指向TaskSchedulerImpl实例)调用submitTasks方法
            taskScheduler.submitTasks(new TaskSet(tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
            stage.lastestInfo.submissionTime = Some(clock.getTimeMillis())
        }else{
            //如果调度阶段中不存在任务标记,则表示该调度阶段已经完成
            markStageAsFinished(stage, None)
            ...
        }
    }

            在submitMissingTasks方法中,做了两件事:

    1. 根据Stage的不同,分别生成ShuffleMapTask和ResultTask;
    2. 将生成的Tasks以TaskSet的形式发送给TaskScheduler进行处理;

            进入TaskScheduler的submitTasks方法(具体有TaskSchedulerImpl实现)中,构建一个TaskSetManager实例,用于管理整个TaskSet的生命周期,而该TaskSetManager会被放到系统的调度池中,根据系统设置的调度算法进行调度。

    TaskSchedulerImpl的submitTasks方法部分源码:

    override def submitTasks(taskSet: TaskSet){
        val tasks = taskSet.tasks
        this.synchronized{
            //创建TaskSetManager实例,使用了同步限制
            val manager = createTaskSetManager(taskSet, maxTaskFailures)
            val stage = taskSet.stageId
            val stageTaskSets = taskSetByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
            stageTaskSets(taskSet.stageAttemptId) = manager
        }
     
        val conflictingTaskSet = stageTaskSets.exists{case (_, ts) => 
            ts.taskSet != taskSet && !ts.isZombie
        }
     
        //将TaskSetManager放进调度池中,由系统统一调配,因为TaskSetManager属于应用级别,所以支持两种调度机制:FIFO和FAIR
        schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
        ...
     
        //调用调度器后台进程SparkDeploSchedulerBackend的reviveOffers方法,进行资源分配的一些操作
        //SparkDeploSchedulerBackend是DriverEndpoint的进程
        backend.reviveOffers()
    }

            SparkDeploySchedulerBackend的reviveOffers方法继承于CoarseGrainedSchedulerBackend,该方法会向DriverEndpoint终端点发送消息,调用CoarseGrainedSchedulerBackend的makeOffers方法。在makeOffers方法中做了三件事:

    1. 获取收集集群中可用的Executor;
    2. 将Executor发送给TaskScheduler,进行资源的分配;
    3. 等待资源分配完成,提交到launchTasks方法中。

    CoarseGrainedSchedulerBackend的makeOffers的部分源码:

    private def makeOffers(){
        //收集集群中可用的Executor
        val activeExecutors = executorDataMap.filterKeys(!executorsPendingToRemove.contains(_))
        
        val workOffers = activeExecutors.map{case (id, executorData) =>
            new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
        }.toSeq
     
        //调用resourceOffers,对资源进行分配,并将返回值提交给launchTasks
        launchTasks(scheduler.resourceOffers(workOffers))
    }

            TaskSchedulerImpl的resourceOffers方法有一个很重要的步骤——资源分配,分配过程中,会根据调度策略对TaskSetManger进行排序(参考:https://www.cnblogs.com/SysoCjs/p/11357009.html,然后依次对这些TaskSetManger按照就近原则分配资源,顺序依次为:PROCESS_LOCAL,NODE_LOCAL,NO_PREF,PACK_LOCAL和ANY。resourceOffers部分源码:

    def resourceOffers(offers:Seq[WorkerOffer]) : Seq[Seq[TaskDescripetion]] = 
        synchronized{
            //标记变量,用于标记是否有新的Executor加入
            var newExecAvail = false
            //记录传入的Executor信息
            for(o <- offers){
                executorIdToHost(o.executorId) = o.host
                executorIdToTaskCount.getOrElseUpdate(o.executorId, 0)
                if(!executorsByHost.contains(o.host)){
                    executorsByHost(o.host) = new HashSet[String]()
                    executorAdded(o.executorId, o.host)
                    newExecAvail = true
                }
            }
            for(rack <- getRackForHost(o.host)){
                hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
            }
     
            //将任务随机分配Executor
            val shuffledOffers = Random.shuffle(offers)
     
            //用于存储已经分配好资源的任务
            val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
            val availableCpus = shuffledOffers.map(o => o.cores).toArray
     
            //获取按照调度策略排序好的TaskSetManager
            val sortedTaskSets = rootPool.getSortedTaskSetQueue  //使用调度排序算法
     
            //如果有新加入的Executor,需要重新计算数据本地性
            for(taskSet <- sortedTaskSets){
                if(newExecAvail){
                    taskSet.executorAdded()
                }
            }
            
            //为排好的TaskSetManager列表进行资源分配,分配原则:就近原则
            val launchedTask = false
            for(taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels){
                do{
                    launchedTask = resourceOfferSingleTaskSet(taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
                }while(launchedTask)
            }
            if(tasks.size > 0){
                hasLaunchedTask = true
            }
     
            //返回
            return tasks
        }

            最后,CoarseGrainedSchedulerBackend的launchTasks方法将任务一个个发送到Worker节点上的CoarseGrainedExecutorBackend,通过Executor来执行任务。

    提交调度阶段中任务的运行顺序:

    1. 第一次调度的是ShuffleMapStage0和ShuffleMapStage1,调度阶段发生在DAGScheduler的submitMissingTasks方法中,根据partition个数拆分任务。假设每个Stage都有两个Partition,那么ShuffleMapStage0的TaskSet0可以表示为:ShuffleMapStage(0,0)和ShuffleMapStage(0,1)的集合,ShuffleMapStage1同理;
    2. TaskScheduler收到两个任务集TaskSet0和TaskSet1,在submitTasks中,分别创建TaskSetManager0和TaskManager1,对任务集进行管理,并将TaskSetManager放进系统的调度池中;
    3. 接着到了TaskScheduler的resourceOffers对任务进行资源分配,到该步骤每个任务均分配到运行代码、数据分片和资源等,借助launchTasks方法将任务分发到Worker节点去执行;
    4. 第一次调度执行完毕,依次到了ShuffleMapStage2和ResultStage3,步骤跟上面三步一样,不同的是,ResultStage3生成的任务类型是ResultTask。
  • 相关阅读:
    HDU-2037 今年暑假不AC
    HDU-3348 coins
    CodeForces-985C Liebig's Barrels
    CSU-2034 Column Addition
    -----------------******Java API提供了几个常用包:
    DOM4j解析、修改、删除、增加、保存XML的方法
    Dom解析XML(添加,删除,修改,保存)
    ------------------------------------日期之间的转换
    -----------------------------------String类的常用方法
    ------------ 异常笔记
  • 原文地址:https://www.cnblogs.com/SysoCjs/p/11357026.html
Copyright © 2011-2022 走看看