
The Scheduler in Spark

The scheduler is split into two parts: the TaskScheduler (and its implementations) and the DAGScheduler.

TaskScheduler: responsible for running and scheduling the tasks of each stage handed to it.

DAGScheduler: responsible for resolving the dependencies within a job, cutting the RDD dependency graph into stages, and handing those stages to the TaskScheduler for execution.
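
As a concrete illustration (not part of the original analysis), the sketch below shows a small word-count job; the master setting and file paths are placeholders. Under the usual RDD semantics, reduceByKey introduces a shuffle dependency, so the DAGScheduler would cut this job into a ShuffleMapStage followed by a final ResultStage, and the TaskScheduler would then run the tasks of each stage.

import org.apache.spark.{SparkConf, SparkContext}

object TwoStageExample {
  def main(args: Array[String]): Unit = {
    // local[2] and the input/output paths are placeholder values for this sketch
    val sc = new SparkContext(new SparkConf().setAppName("two-stage-example").setMaster("local[2]"))
    sc.textFile("input.txt")                 // narrow transformations stay inside one stage
      .flatMap(_.split("\\s+"))
      .map(word => (word, 1))
      .reduceByKey(_ + _)                    // shuffle dependency: a new stage starts here
      .saveAsTextFile("counts-out")          // the action triggers SparkContext.runJob
    sc.stop()
  }
}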

Instance creation

TaskScheduler instance creation:

The analysis of scheduler creation here focuses on Spark on YARN.

After the ApplicationMaster starts, it calls startUserClass() to launch a thread that runs the user-defined Spark program.

The first argument passed in is the master name, for example yarn-cluster.

Inside the user-defined Spark program, a SparkContext instance is created.

SparkContext.createTaskScheduler inspects the master string; for yarn-cluster it creates a YarnClusterScheduler instance. That scheduler is the TaskScheduler instance used from here on.

def this(sc: SparkContext) = this(sc, new Configuration())

YarnClusterScheduler itself extends TaskSchedulerImpl:

def this(sc: SparkContext) = this(sc, sc.conf.getInt("spark.task.maxFailures", 4))

The SchedulerBackend reference held by the TaskScheduler is then created; for yarn-cluster this is a CoarseGrainedSchedulerBackend:

val backend = new CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
scheduler.initialize(backend)
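
The wiring order above (create the scheduler, create the backend, call scheduler.initialize(backend), then start) can be summarized with the stub types below. This is a structural sketch only, not the real Spark classes.

trait StubSchedulerBackend { def start(): Unit }
trait StubTaskScheduler {
  def initialize(backend: StubSchedulerBackend): Unit
  def start(): Unit
}

class StubBackend extends StubSchedulerBackend {
  def start(): Unit = println("backend started (would register with the cluster manager)")
}

class StubScheduler extends StubTaskScheduler {
  private var backend: StubSchedulerBackend = _
  def initialize(b: StubSchedulerBackend): Unit = { backend = b }  // keep the backend reference, build pools
  def start(): Unit = backend.start()                              // starting the scheduler starts the backend
}

object Wiring extends App {
  val scheduler = new StubScheduler
  val backend   = new StubBackend
  scheduler.initialize(backend)   // mirrors scheduler.initialize(backend) above
  scheduler.start()
}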



DAGScheduler instance creation:

class DAGScheduler(
    taskSched: TaskScheduler,
    mapOutputTracker: MapOutputTrackerMaster,
    blockManagerMaster: BlockManagerMaster,
    env: SparkEnv)
  extends Logging {

  def this(taskSched: TaskScheduler) {
    this(taskSched, SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
      SparkEnv.get.blockManager.master, SparkEnv.get)
  }

  taskSched.setDAGScheduler(this)


Analysis of the scheduling flow

1. An action, such as saveAsHadoopFile, is invoked on an RDD.

2. The action calls SparkContext.runJob.

3. SparkContext.runJob calls DAGScheduler.runJob, which calls submitJob and then waits for the job to finish.
Inside waiter.awaitResult(), _jobFinished indicates whether the job has completed: it becomes true once the job finishes (or an exception occurs) and is false until then. _jobFinished is driven through the resultHandler path: each successful task increments finishedTasks, and when finishedTasks equals totalTasks the job counts as finished.

def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: String,
    allowLocal: Boolean,
    resultHandler: (Int, U) => Unit,
    properties: Properties = null)
{
  val waiter = submitJob(rdd, func, partitions, callSite, allowLocal, resultHandler, properties)
  waiter.awaitResult() match {
    case JobSucceeded => {}
    case JobFailed(exception: Exception, _) =>
      logInfo("Failed to run " + callSite)
      throw exception
  }
}
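
To make the waiting logic described in step 3 concrete, here is a minimal JobWaiter-style class (an illustration only, not Spark's own JobWaiter): taskSucceeded counts completions and awaitResult blocks until every task has finished or a failure is reported.

class SimpleJobWaiter[T](totalTasks: Int, resultHandler: (Int, T) => Unit) {
  private var finishedTasks = 0
  private var failure: Option[Exception] = None

  def taskSucceeded(index: Int, result: T): Unit = synchronized {
    resultHandler(index, result)
    finishedTasks += 1
    if (finishedTasks == totalTasks) notifyAll()   // the job is finished; wake awaitResult()
  }

  def jobFailed(e: Exception): Unit = synchronized {
    failure = Some(e)
    notifyAll()
  }

  // Mirrors waiter.awaitResult(): returns normally on success, rethrows on failure.
  def awaitResult(): Unit = synchronized {
    while (finishedTasks < totalTasks && failure.isEmpty) wait()
    failure.foreach(e => throw e)
  }
}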


4. DAGScheduler.submitJob is called.
Partial code: a JobWaiter instance is created and passed along, a JobSubmitted event message is sent, and the waiter instance is returned. JobWaiter is an implementation of JobListener.

val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
eventProcessActor ! JobSubmitted(
  jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties)
waiter


5. The DAGScheduler handles the JobSubmitted event message; received events are dispatched through processEvent.

def receive = {
  case event: DAGSchedulerEvent =>
    logTrace("Got event of type " + event.getClass.getName)
    if (!processEvent(event)) {
      submitWaitingStages()
    } else {
      resubmissionTask.cancel()
      context.stop(self)
    }
}
}))


6. The part of processEvent that handles the JobSubmitted event:

case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
  var finalStage: Stage = null
  try {

A Stage instance is created: its stage id comes from incrementing nextStageId, and its task count equals the number of partitions. Starting from the job's final RDD, getParentStages walks the parent RDDs: a parent reached through a shuffle dependency becomes a new ShuffleMapStage, while parents reached through narrow dependencies stay inside the current stage and the walk recurses into their parents. If no shuffle dependency is found, no new stage is created; otherwise one new stage is created per shuffle dependency. This is how the DAG of dependencies is handled.

    finalStage = newStage(rdd, partitions.size, None, jobId, Some(callSite))
  } catch {
    case e: Exception =>
      logWarning("Creating new stage failed due to exception - job: " + jobId, e)
      listener.jobFailed(e)
      return false
  }

An ActiveJob instance is created. numFinished is set to 0, meaning no task of the job has finished yet, and a finished array sized to the number of tasks is created with every element set to false. The JobWaiter is passed into ActiveJob as its listener.

  val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)

Any cached TaskLocations are cleared:

  clearCacheLocs()
  logInfo("Got job " + job.jobId + " (" + callSite + ") with " + partitions.length +
    " output partitions (allowLocal=" + allowLocal + ")")
  logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
  logInfo("Parents of final stage: " + finalStage.parents)
  logInfo("Missing parents: " + getMissingParentStages(finalStage))

If allowLocal was passed to runJob as true, there is no shuffle dependency, and there is exactly one partition (i.e. only one task), the job is run locally: runLocallyWithinThread spawns a thread to execute it.

  if (allowLocal && finalStage.parents.size == 0 && partitions.length == 1) {
    // Compute very short actions like first() or take() with no parent stages locally.
    listenerBus.post(SparkListenerJobStart(job, Array(), properties))

The job is executed through the func held by ActiveJob, which was defined when the RDD action was invoked; for saveAsHadoopFile (saveAsHadoopDataset) this is the internal writeToFile function. Once the function completes, the JobWaiter.taskSucceeded method of the waiter created above is called.

    runLocally(job)
  } else {

Otherwise (multiple partitions, i.e. multiple tasks, or a shuffle is involved):

    idToActiveJob(jobId) = job
    activeJobs += job
    resultStageToJob(finalStage) = job
    listenerBus.post(SparkListenerJobStart(job, jobIdToStageIds(jobId).toArray, properties))

DAGScheduler.submitStage is called:

    submitStage(finalStage)
  }
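
The stage-cutting rule described above (a shuffle dependency starts a new parent stage, narrow dependencies stay inside the current stage) can be illustrated with a toy dependency graph; the types below are simplified stand-ins, not Spark's RDD or Stage classes.

object StageCutSketch extends App {
  case class ToyDep(parent: ToyRdd, isShuffle: Boolean)
  case class ToyRdd(name: String, deps: Seq[ToyDep] = Nil)

  // Returns the parent RDDs at which new (shuffle map) stages would begin.
  def parentStageRoots(rdd: ToyRdd): Set[ToyRdd] =
    rdd.deps.flatMap { dep =>
      if (dep.isShuffle) Set(dep.parent)          // shuffle boundary: a new parent stage
      else parentStageRoots(dep.parent)           // narrow: keep walking within this stage
    }.toSet

  // words -> pairs (narrow) -> reduced (shuffle): one parent ShuffleMapStage plus the final stage
  val words   = ToyRdd("words")
  val pairs   = ToyRdd("pairs",   Seq(ToyDep(words, isShuffle = false)))
  val reduced = ToyRdd("reduced", Seq(ToyDep(pairs, isShuffle = true)))
  println(parentStageRoots(reduced).map(_.name))  // Set(pairs)
}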


7. DAGScheduler.submitStage: a recursive function. If a stage has parent stages (the shuffle case), it is put into the waiting set and only runs once its parent stages have completed.

private def submitStage(stage: Stage) {
  val jobId = activeJobForStage(stage)
  if (jobId.isDefined) {
    logDebug("submitStage(" + stage + ")")

If an RDD this stage depends on has not finished yet, the current stage must wait for that dependency to complete before it can run.

    if (!waiting(stage) && !running(stage) && !failed(stage)) {

Based on the stage's RDD dependencies, check whether new stages must be created: a ShuffleDependency produces a new ShuffleMapStage. While walking up to the parent RDDs, narrow dependencies are followed recursively; if no shuffle dependency is found no new stage is created, otherwise one stage per shuffle dependency. This is the DAG-dependency handling again.

      val missing = getMissingParentStages(stage).sortBy(_.id)
      logDebug("missing: " + missing)

If there is no shuffle dependency among the RDDs, i.e. every dependency is a NarrowDependency, the whole chain runs within this stage on the map side:

      if (missing == Nil) {
        logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
        submitMissingTasks(stage, jobId.get)
        running += stage
      } else {

If the RDD does have shuffle dependencies, the parent RDDs' stages are submitted first; this is the recursive call.

        for (parent <- missing) {
          submitStage(parent)
        }
        waiting += stage
      }
    }
  } else {
    abortStage(stage, "No active job for stage " + stage.id)
  }
}
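
A toy sketch of this submission order (illustrative types only): a stage whose parents are not yet available is parked in waiting after its parents are submitted, and a stage with no missing parents is submitted immediately.

object SubmitStageSketch extends App {
  import scala.collection.mutable

  case class ToyStage(id: Int, parents: Seq[ToyStage], available: Boolean = false)

  val waiting = mutable.Set[ToyStage]()
  val running = mutable.Set[ToyStage]()

  def submitStage(stage: ToyStage): Unit = {
    if (!waiting(stage) && !running(stage)) {
      val missing = stage.parents.filterNot(_.available)     // like getMissingParentStages
      if (missing.isEmpty) {
        println(s"submitting tasks for stage ${stage.id}")   // like submitMissingTasks
        running += stage
      } else {
        missing.foreach(submitStage)                         // parents are submitted first
        waiting += stage                                     // revisited once a parent completes
      }
    }
  }

  val shuffleStage = ToyStage(0, Nil)
  val finalStage   = ToyStage(1, Seq(shuffleStage))
  submitStage(finalStage)   // submits stage 0; stage 1 waits for it
}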


8. The flow of DAGScheduler.submitMissingTasks:

private def submitMissingTasks(stage: Stage, jobId: Int) {
  logDebug("submitMissingTasks(" + stage + ")")
  // Get our pending tasks and remember them in our pendingTasks entry
  val myPending = pendingTasks.getOrElseUpdate(stage, new HashSet)
  myPending.clear()
  var tasks = ArrayBuffer[Task[_]]()

If the stage is a shuffle map stage, iterate over all of its partitions and, from each partition and its TaskLocations, create a ShuffleMapTask and add it to the task list.

  if (stage.isShuffleMap) {
    for (p <- 0 until stage.numPartitions if stage.outputLocs(p) == Nil) {
      val locs = getPreferredLocs(stage.rdd, p)
      tasks += new ShuffleMapTask(stage.id, stage.rdd, stage.shuffleDep.get, p, locs)
    }
  } else {

Otherwise the stage is not a shuffle stage: it is the stage that returns results directly once it finishes, so ResultTask instances are created. Because these are ResultTasks, the user-defined func must be passed in, i.e. how the returned result is to be handled.

    // This is a final stage; figure out its job's missing partitions
    val job = resultStageToJob(stage)
    for (id <- 0 until job.numPartitions if !job.finished(id)) {
      val partition = job.partitions(id)
      val locs = getPreferredLocs(stage.rdd, partition)
      tasks += new ResultTask(stage.id, stage.rdd, job.func, partition, locs, id)
    }
  }

  val properties = if (idToActiveJob.contains(jobId)) {
    idToActiveJob(stage.jobId).properties
  } else {
    // this stage will be assigned to "default" pool
    null
  }

  // must be run listener before possible NotSerializableException
  // should be "StageSubmitted" first and then "JobEnded"
  listenerBus.post(SparkListenerStageSubmitted(stageToInfos(stage), properties))

  if (tasks.size > 0) {
    // Preemptively serialize a task to make sure it can be serialized. We are catching this
    // exception here because it would be fairly hard to catch the non-serializable exception
    // down the road, where we have several different implementations for local scheduler and
    // cluster schedulers.
    try {
      SparkEnv.get.closureSerializer.newInstance().serialize(tasks.head)
    } catch {
      case e: NotSerializableException =>
        abortStage(stage, "Task not serializable: " + e.toString)
        running -= stage
        return
    }

    logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
    myPending ++= tasks
    logDebug("New pending tasks: " + myPending)

A TaskSet instance is created from the list of tasks to run in this stage, along with the stage id, attempt id, job id and scheduling properties, and handed to the TaskScheduler implementation (YarnClusterScheduler, i.e. TaskSchedulerImpl) via submitTasks:

    taskSched.submitTasks(
      new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
    stageToInfos(stage).submissionTime = Some(System.currentTimeMillis())
  } else {
    logDebug("Stage " + stage + " is actually done; %b %d %d".format(
      stage.isAvailable, stage.numAvailableOutputs, stage.numPartitions))
    running -= stage
  }
}
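
The fail-fast serializability check above can be reproduced outside Spark with plain Java serialization (a hedged sketch; Spark itself uses its configured closure serializer): serializing one task up front surfaces a NotSerializableException at submission time instead of later on an executor.

import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object SerializableCheck {
  def ensureSerializable(obj: AnyRef): Unit = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(obj)   // throws NotSerializableException if the object graph is not serializable
    } catch {
      case e: NotSerializableException =>
        throw new IllegalArgumentException("Task not serializable: " + e, e)   // analogous to abortStage
    } finally {
      out.close()
    }
  }
}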


9. Flow of TaskSchedulerImpl.submitTasks:
From the TaskSet passed in, the list of tasks to run is obtained and a TaskSetManager instance is created. The manager is added to the schedulableBuilder (FIFOSchedulableBuilder / FairSchedulableBuilder) queue. The TaskSetManager instance itself is analyzed further below.

override def submitTasks(taskSet: TaskSet) {
  val tasks = taskSet.tasks
  logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
  this.synchronized {
    val manager = new TaskSetManager(this, taskSet, maxTaskFailures)
    activeTaskSets(taskSet.id) = manager
    schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
    taskSetTaskIds(taskSet.id) = new HashSet[Long]()

A timer periodically checks whether the submitted tasks have actually been launched. Once tasks have been assigned and are running, the timer cancels itself; until then it keeps logging a warning.

    if (!isLocal && !hasReceivedTask) {
      starvationTimer.scheduleAtFixedRate(new TimerTask() {
        override def run() {
          if (!hasLaunchedTask) {
            logWarning("Initial job has not accepted any resources; " +
              "check your cluster UI to ensure that workers are registered " +
              "and have sufficient memory")
          } else {
            this.cancel()
          }
        }
      }, STARVATION_TIMEOUT, STARVATION_TIMEOUT)
    }
    hasReceivedTask = true
  }

Scheduling is then kicked off through the SchedulerBackend implementation, CoarseGrainedSchedulerBackend.reviveOffers:

  backend.reviveOffers()
}
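
The schedulableBuilder mentioned above is either a FIFOSchedulableBuilder (the default) or a FairSchedulableBuilder; which one gets built is controlled by the spark.scheduler.mode setting. A hedged configuration sketch:

import org.apache.spark.SparkConf

object SchedulerModeConf {
  val conf = new SparkConf()
    .setAppName("scheduler-mode-example")
    .set("spark.scheduler.mode", "FAIR")    // default is FIFO
    .set("spark.task.maxFailures", "4")     // matches the default read by TaskSchedulerImpl above
}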


9.1 Creation of the TaskSetManager instance:

private[spark] class TaskSetManager(
    sched: TaskSchedulerImpl,
    val taskSet: TaskSet,
    val maxTaskFailures: Int,
    clock: Clock = SystemClock)
  extends Schedulable with Logging
...........................
for (i <- (0 until numTasks).reverse) {
  addPendingTask(i)
}

The definition of addPendingTask (here readding is passed as false):


private def addPendingTask(index: Int, readding: Boolean = false) {
  // Utility method that adds `index` to a list only if readding=false or it's not already there

The internally defined addTo helper:

  def addTo(list: ArrayBuffer[Int]) {
    if (!readding || !list.contains(index)) {
      list += index
    }
  }

  var hadAliveLocations = false

Iterate over the task's preferred locations and, depending on the locality level of each TaskLocation, add the task to the corresponding pending container.

  for (loc <- tasks(index).preferredLocations) {
    for (execId <- loc.executorId) {

Check whether the executor is listed in TaskSchedulerImpl.activeExecutorIds, the set of live executors on the workers. When the first RDD (stage) runs, activeExecutorIds is still empty; only after a task of the first stage has run on an executor is that executor id added to activeExecutorIds. So this branch does not fire for the first stage, but for the second stage it maximizes the chance that tasks run PROCESS_LOCAL.

      if (sched.isExecutorAlive(execId)) {
        addTo(pendingTasksForExecutor.getOrElseUpdate(execId, new ArrayBuffer))
        hadAliveLocations = true
      }
    }
    if (sched.hasExecutorsAliveOnHost(loc.host)) {

If the host is present in TaskSchedulerImpl.executorsByHost, the task is added to pendingTasksForHost. executorsByHost is populated when each worker registers: the worker sends a RegisterExecutor event message to CoarseGrainedSchedulerBackend.DriverActor, and through makeOffers() --> TaskSchedulerImpl.resourceOffers the host is added to executorsByHost.

      addTo(pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer))

YarnClusterScheduler.getRackForHost is called to obtain the host's rack, and the task is also added to that rack's pending container.

      for (rack <- sched.getRackForHost(loc.host)) {
        addTo(pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer))
      }
      hadAliveLocations = true
    }
  }

If neither of the two cases above added the task to a container, it goes into pendingTasksWithNoPrefs:

  if (!hadAliveLocations) {
    // Even though the task might've had preferred locations, all of those hosts or executors
    // are dead; put it in the no-prefs list so we can schedule it elsewhere right away.
    addTo(pendingTasksWithNoPrefs)
  }

When the TaskSetManager instance is created, every task index is also added to the allPendingTasks container:

  if (!readding) {
    allPendingTasks += index  // No point scanning this whole list to find the old task there
  }
}
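
As a simplified picture of the bookkeeping addPendingTask performs (illustrative types only, mirroring the field names above): each task index lands in a per-executor, per-host and per-rack bucket when live locations exist, in a no-preference list otherwise, and always in the overall list.

import scala.collection.mutable.{ArrayBuffer, HashMap}

class PendingBuckets {
  val pendingTasksForExecutor = new HashMap[String, ArrayBuffer[Int]]
  val pendingTasksForHost     = new HashMap[String, ArrayBuffer[Int]]
  val pendingTasksForRack     = new HashMap[String, ArrayBuffer[Int]]
  val pendingTasksWithNoPrefs = new ArrayBuffer[Int]
  val allPendingTasks         = new ArrayBuffer[Int]

  def add(index: Int, execId: Option[String], host: Option[String], rack: Option[String]): Unit = {
    execId.foreach(e => pendingTasksForExecutor.getOrElseUpdate(e, new ArrayBuffer) += index)
    host.foreach(h => pendingTasksForHost.getOrElseUpdate(h, new ArrayBuffer) += index)
    rack.foreach(r => pendingTasksForRack.getOrElseUpdate(r, new ArrayBuffer) += index)
    if (execId.isEmpty && host.isEmpty) pendingTasksWithNoPrefs += index   // no live preferred location
    allPendingTasks += index
  }
}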


.............................

The set of selectable locality levels is then computed:

val myLocalityLevels = computeValidLocalityLevels()
val localityWaits = myLocalityLevels.map(getLocalityWait)  // Time to wait at each level

The definition of computeValidLocalityLevels follows; based on which locality pending containers are non-empty, it produces the locality levels available to this stage's tasks.

private def computeValidLocalityLevels(): Array[TaskLocality.TaskLocality] = {
  import TaskLocality.{PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY}
  val levels = new ArrayBuffer[TaskLocality.TaskLocality]
  if (!pendingTasksForExecutor.isEmpty && getLocalityWait(PROCESS_LOCAL) != 0) {
    levels += PROCESS_LOCAL
  }
  if (!pendingTasksForHost.isEmpty && getLocalityWait(NODE_LOCAL) != 0) {
    levels += NODE_LOCAL
  }
  if (!pendingTasksForRack.isEmpty && getLocalityWait(RACK_LOCAL) != 0) {
    levels += RACK_LOCAL
  }
  levels += ANY
  logDebug("Valid locality levels for " + taskSet + ": " + levels.mkString(","))
  levels.toArray
}
}

The definition of getLocalityWait follows. It defines how long task scheduling may wait at a given locality level: if the scheduler has spent longer than the configured wait at that level, the locality search is widened to the next level.

private def getLocalityWait(level: TaskLocality.TaskLocality): Long = {
  val defaultWait = conf.get("spark.locality.wait", "3000")
  level match {
    case TaskLocality.PROCESS_LOCAL =>
      conf.get("spark.locality.wait.process", defaultWait).toLong
    case TaskLocality.NODE_LOCAL =>
      conf.get("spark.locality.wait.node", defaultWait).toLong
    case TaskLocality.RACK_LOCAL =>
      conf.get("spark.locality.wait.rack", defaultWait).toLong
    case TaskLocality.ANY =>
      0L
  }
}
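
These waits map to ordinary Spark configuration keys; a hedged example of setting them (values in milliseconds, matching the defaults read above):

import org.apache.spark.SparkConf

object LocalityWaitConf {
  val conf = new SparkConf()
    .set("spark.locality.wait", "3000")           // default wait per locality level
    .set("spark.locality.wait.process", "3000")   // PROCESS_LOCAL
    .set("spark.locality.wait.node", "3000")      // NODE_LOCAL
    .set("spark.locality.wait.rack", "3000")      // RACK_LOCAL
}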


10. The scheduling flow of SchedulerBackend.reviveOffers():
The SchedulerBackend implementation is CoarseGrainedSchedulerBackend.

override def reviveOffers() {
  driverActor ! ReviveOffers
}

The code above sends a message to CoarseGrainedSchedulerBackend's internal DriverActor, which handles the ReviveOffers event:

case ReviveOffers =>
  makeOffers()
................
def makeOffers() {

See launchTasks and resourceOffers below.

  launchTasks(scheduler.resourceOffers(
    executorHost.toArray.map { case (id, host) => new WorkerOffer(id, host, freeCores(id)) }))
}

This calls TaskSchedulerImpl.resourceOffers, passing an array of WorkerOffers built from the (executor id, host) pairs of the registered worker executors.
def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
  SparkEnv.set(sc.env)

  // Mark each slave as alive and remember its hostname
  for (o <- offers) {
    executorIdToHost(o.executorId) = o.host

This part runs mainly at worker registration time, when the host is not yet present in executorsByHost.

    if (!executorsByHost.contains(o.host)) {
      executorsByHost(o.host) = new HashSet[String]()
      executorGained(o.executorId, o.host)
    }
  }

offers holds one entry per registered worker executor; for each worker, a task slot list is created sized to its available CPU cores.

  // Build a list of tasks to assign to each worker
  val tasks = offers.map(o => new ArrayBuffer[TaskDescription](o.cores))

availableCpus is the number of assignable cores per worker; task assignment is bounded by the cores each worker can offer.

  val availableCpus = offers.map(o => o.cores).toArray

Get every TaskSetManager queued in the pool:

  val sortedTaskSets = rootPool.getSortedTaskSetQueue()
  for (taskSet <- sortedTaskSets) {
    logDebug("parentName: %s, name: %s, runningTasks: %s".format(
      taskSet.parent.name, taskSet.name, taskSet.runningTasks))
  }

Compute the task locality level; launchedTask = false means the locality level needs to be relaxed.

  // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
  // of locality levels so that it gets a chance to launch local tasks on all of them.
  var launchedTask = false

The locality computation is a nested iteration: the outer loop takes one TaskSet at a time from the list, and the inner loop walks the locality levels starting from PROCESS_LOCAL.

  for (taskSet <- sortedTaskSets; maxLocality <- TaskLocality.values) {
    do {
      launchedTask = false

Iterate over every worker; from each worker, pick a task of the TaskSet at an allowed locality level and produce a TaskDescription.

      for (i <- 0 until offers.size) {

Get the executor id and host of the worker being offered:

        val execId = offers(i).executorId
        val host = offers(i).host

TaskSetManager.resourceOffer chooses a locality level (never wider than the maxLocality passed in) and produces at most one task per call:

        for (task <- taskSet.resourceOffer(execId, host, availableCpus(i), maxLocality)) {

Each produced task is added to the tasks list built above:

          tasks(i) += task
          val tid = task.taskId
          taskIdToTaskSetId(tid) = taskSet.taskSet.id
          taskSetTaskIds(taskSet.taskSet.id) += tid
          taskIdToExecutorId(tid) = execId

The current executor id is recorded in activeExecutorIds. When several dependent stages run, the second stage's submitTasks creates a TaskSetManager that uses activeExecutorIds to populate pendingTasksForExecutor, i.e. the PROCESS_LOCAL pending tasks.

          activeExecutorIds += execId

The executor's host is recorded in the executorsByHost container:

          executorsByHost(host) += execId

The worker's available CPU core count is decremented by one, which keeps one core per running task:

          availableCpus(i) -= 1

This flag decides whether to keep assigning at the current locality level: while it is true, maxLocality is not relaxed and the remaining tasks are assigned starting from the next worker.

          launchedTask = true
        }
      }
    } while (launchedTask)
  }

  if (tasks.size > 0) {

hasLaunchedTask is set to true, meaning task assignment has happened; the starvation-check timer mentioned above then cancels itself.

    hasLaunchedTask = true
  }
  return tasks
}
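
The effect of the inner do/while sweep (at most one task per worker per pass, one core per task) can be seen in this standalone toy sketch; the offers and task counts are made up for illustration.

object OfferSketch extends App {
  case class Offer(executorId: String, var freeCores: Int)

  val offers     = Seq(Offer("exec-1", 2), Offer("exec-2", 2))
  val assigned   = offers.map(_ => scala.collection.mutable.ArrayBuffer[Int]())
  val totalTasks = 5
  var nextTask   = 0

  var launchedTask = true
  while (launchedTask) {                            // keep sweeping until a pass assigns nothing
    launchedTask = false
    for (i <- offers.indices) {
      if (nextTask < totalTasks && offers(i).freeCores > 0) {
        assigned(i) += nextTask                     // "launch" one task on this worker this pass
        nextTask += 1
        offers(i).freeCores -= 1                    // one CPU core per task, as in availableCpus(i) -= 1
        launchedTask = true
      }
    }
  }
  println(assigned)   // List(ArrayBuffer(0, 2), ArrayBuffer(1, 3)); task 4 waits for free cores
}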



10.1 Flow of TaskSetManager.resourceOffer

def resourceOffer(
    execId: String,
    host: String,
    availableCpus: Int,
    maxLocality: TaskLocality.TaskLocality)
  : Option[TaskDescription] =
{

If the number of finished tasks is still below the total number of tasks, and the available CPU cores are at least the configured number of cores per task (default 1):

  if (tasksSuccessful < numTasks && availableCpus >= CPUS_PER_TASK) {
    val curTime = clock.getTime()

The time since the last task launch is compared, starting from currentLocalityIndex, against the configured wait for that locality level; if the wait has been exceeded, the index advances to the next level, and so on up to ANY. See the description of this method further below.

    var allowedLocality = getAllowedLocalityLevel(curTime)

If the resulting locality level is wider than the maxLocality passed in, it is capped at the passed-in maximum:

    if (allowedLocality > maxLocality) {
      allowedLocality = maxLocality   // We're not allowed to search for farther-away tasks
    }

findTask looks in the pending lists for the given locality and returns the task's index into TaskSet.tasks.

    findTask(execId, host, allowedLocality) match {
      case Some((index, taskLocality)) => {
        // Found a task; do some bookkeeping and return a task description
        val task = tasks(index)
        val taskId = sched.newTaskId()
        // Figure out whether this should count as a preferred launch
        logInfo("Starting task %s:%d as TID %s on executor %s: %s (%s)".format(
          taskSet.id, index, taskId, execId, host, taskLocality))
        // Do various bookkeeping
        copiesRunning(index) += 1
        val info = new TaskInfo(taskId, index, curTime, execId, host, taskLocality)
        taskInfos(taskId) = info
        taskAttempts(index) = info :: taskAttempts(index)

The index of the locality level actually used for this task is looked up and stored back:

        // Update our locality level for delay scheduling
        currentLocalityIndex = getLocalityIndex(taskLocality)

This assignment's time becomes the last launch time:

        lastLaunchTime = curTime
        // Serialize and return the task
        val startTime = clock.getTime()
        // We rely on the DAGScheduler to catch non-serializable closures and RDDs, so in here
        // we assume the task can be serialized without exceptions.
        val serializedTask = Task.serializeWithDependencies(
          task, sched.sc.addedFiles, sched.sc.addedJars, ser)
        val timeTaken = clock.getTime() - startTime
        addRunningTask(taskId)
        logInfo("Serialized task %s:%d as %d bytes in %d ms".format(
          taskSet.id, index, serializedTask.limit, timeTaken))
        val taskName = "task %s:%d".format(taskSet.id, index)

If this is the task's first attempt, a BeginEvent is sent through DAGScheduler.taskStarted:

        if (taskAttempts(index).size == 1)
          taskStarted(task, info)
        return Some(new TaskDescription(taskId, execId, taskName, index, serializedTask))
      }
      case _ =>
    }
  }
  None
}

Based on the configured timeouts: if the time between this task assignment and the previous one exceeds the locality wait configured for the current level, the locality level is relaxed by one step, and the comparison repeats until a level still within its wait (or ANY) is found.

private def getAllowedLocalityLevel(curTime: Long): TaskLocality.TaskLocality = {
  while (curTime - lastLaunchTime >= localityWaits(currentLocalityIndex) &&
      currentLocalityIndex < myLocalityLevels.length - 1)
  {

The index is incremented by one, i.e. the current locality level is relaxed by one step.

    // Jump to the next locality level, and remove our waiting time for the current one since
    // we don't want to count it again on the next one
    lastLaunchTime += localityWaits(currentLocalityIndex)
    currentLocalityIndex += 1
  }
  myLocalityLevels(currentLocalityIndex)
}
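
A standalone worked sketch of this escalation (assumed levels and a 3000 ms wait per level, mirroring the defaults above):

object DelaySchedulingSketch extends App {
  val levels        = Array("PROCESS_LOCAL", "NODE_LOCAL", "RACK_LOCAL", "ANY")
  val localityWaits = Array(3000L, 3000L, 3000L, 0L)

  var currentLocalityIndex = 0
  var lastLaunchTime = 0L

  def allowedLevel(curTime: Long): String = {
    while (curTime - lastLaunchTime >= localityWaits(currentLocalityIndex) &&
           currentLocalityIndex < levels.length - 1) {
      lastLaunchTime += localityWaits(currentLocalityIndex)   // credit the time already waited
      currentLocalityIndex += 1                               // relax to the next level
    }
    levels(currentLocalityIndex)
  }

  println(allowedLevel(1000))   // PROCESS_LOCAL: only 1 s since the last launch
  println(allowedLevel(7000))   // RACK_LOCAL: 7 s of waiting covers two 3 s windows
}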


Handling of the BeginEvent in the DAGScheduler:

case BeginEvent(task, taskInfo) =>
  for (
    job <- idToActiveJob.get(task.stageId);
    stage <- stageIdToStage.get(task.stageId);
    stageInfo <- stageToInfos.get(stage)
  ) {
    if (taskInfo.serializedSize > TASK_SIZE_TO_WARN * 1024 &&
        !stageInfo.emittedTaskSizeWarning) {
      stageInfo.emittedTaskSizeWarning = true
      logWarning(("Stage %d (%s) contains a task of very large " +
        "size (%d KB). The maximum recommended task size is %d KB.").format(
        task.stageId, stageInfo.name, taskInfo.serializedSize / 1024, TASK_SIZE_TO_WARN))
    }
  }
  listenerBus.post(SparkListenerTaskStart(task, taskInfo))


11. Flow of CoarseGrainedSchedulerBackend.launchTasks
Task launch is carried out by sending a LaunchTask event message.

def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
  for (task <- tasks.flatten) {
    freeCores(task.executorId) -= 1

A LaunchTask event is sent to the actor the worker registered with:

    executorActor(task.executorId) ! LaunchTask(task)
  }
}


12. Starting the task. Since this is the on-YARN mode, the worker-side actor is CoarseGrainedExecutorBackend. The handling code is as follows:

case LaunchTask(taskDesc) =>
  logInfo("Got assigned task " + taskDesc.taskId)
  if (executor == null) {
    logError("Received LaunchTask command but executor was null")
    System.exit(1)
  } else {
    executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)
  }
.............................

The task is then run through the Executor. Message handling by the other actors, the details of task execution, and the shuffle are analyzed later; no further detail here.

A small gripe about Scala: it is convenient to write, but somewhat of a pain to read.
Original article: https://www.cnblogs.com/yxwkf/p/5128183.html