zoukankan      html  css  js  c++  java
  • Spark源码分析之六:Task调度(二)

    话说在《Spark源码分析之五:Task调度(一)》一文中,我们对Task调度分析到了DriverEndpoint的makeOffers()方法。这个方法针对接收到的ReviveOffers事件进行处理。代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. // Make fake resource offers on all executors  
    2.     // 在所有的executors上提供假的资源(抽象的资源,也就是资源的对象信息,我是这么理解的)  
    3.     private def makeOffers() {  
    4.       // Filter out executors under killing  
    5.       // 过滤掉under killing的executors  
    6.       val activeExecutors = executorDataMap.filterKeys(executorIsAlive)  
    7.         
    8.       // 利用activeExecutors中executorData的executorHost、freeCores,构造workOffers,即资源  
    9.       val workOffers = activeExecutors.map { case (id, executorData) =>  
    10.         // 创建WorkerOffer对象  
    11.         new WorkerOffer(id, executorData.executorHost, executorData.freeCores)  
    12.       }.toSeq  
    13.         
    14.       // 调用scheduler的resourceOffers()方法,分配资源,并调用launchTasks()方法,启动tasks  
    15.       // 这个scheduler就是TaskSchedulerImpl  
    16.       launchTasks(scheduler.resourceOffers(workOffers))  
    17.     }  

            代码逻辑很简单,一共分为三步:

            第一,从executorDataMap中过滤掉under killing的executors,得到activeExecutors;

            第二,利用activeExecutors中executorData的executorHost、freeCores,获取workOffers,即资源;

            第三,调用scheduler的resourceOffers()方法,分配资源,并调用launchTasks()方法,启动tasks:这个scheduler就是TaskSchedulerImpl。

            我们逐个进行分析,首先看看这个executorDataMap,其定义如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. private val executorDataMap = new HashMap[String, ExecutorData]  

            它是CoarseGrainedSchedulerBackend掌握的集群中executor的数据集合,key为String类型的executorId,value为ExecutorData类型的executor详细信息。ExecutorData包含的主要内容如下:

            1、executorEndpoint:RpcEndpointRef类型,RPC终端的引用,用于数据通信;

            2、executorAddress:RpcAddress类型,RPC地址,用于数据通信;

            3、executorHost:String类型,executor的主机;

            4、freeCores:Int类型,可用处理器cores;

            5、totalCores:Int类型,处理器cores总数;

            6、logUrlMap:Map[String, String]类型,日志url映射集合。

            这样,通过executorDataMap这个集合我们就能知道集群当前executor的负载情况,方便资源分析并调度任务。那么executorDataMap内的数据是何时及如何更新的呢?go on,继续分析。
            对于第一步中,过滤掉under killing的executors,其实现是对executorDataMap中的所有executor调用executorIsAlive()方法中,判断是否在executorsPendingToRemove和executorsPendingLossReason两个数据结构中,这两个数据结构中的executors,都是即将移除或者已丢失的executor。

            第二步,在过滤掉已失效或者马上要失效的executor后,利用activeExecutors中executorData的executorHost、freeCores,构造workOffers,即资源,这个workOffers更简单,是一个WorkerOffer对象,它代表了系统的可利用资源。WorkerOffer代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. /** 
    2.  * Represents free resources available on an executor. 
    3.  */  
    4. private[spark]  
    5. case class WorkerOffer(executorId: String, host: String, cores: Int)  

            而最重要的第三步,先是调用scheduler.resourceOffers(workOffers),即TaskSchedulerImpl的resourceOffers()方法,然后再调用launchTasks()方法将tasks加载到executor上去执行。

            我们先看下TaskSchedulerImpl的resourceOffers()方法。代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. /** 
    2.    * Called by cluster manager to offer resources on slaves. We respond by asking our active task 
    3.    * sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so 
    4.    * that tasks are balanced across the cluster. 
    5.    * 
    6.    * 被集群manager调用以提供slaves上的资源。我们通过按照优先顺序询问活动task集中的task来回应。 
    7.    * 我们通过循环的方式将task调度到每个节点上以便tasks在集群中可以保持大致的均衡。 
    8.    */  
    9.   def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {  
    10.       
    11.     // Mark each slave as alive and remember its hostname  
    12.     // Also track if new executor is added  
    13.     // 标记每个slave节点为alive活跃的,并且记住它的主机名  
    14.     // 同时也追踪是否有executor被加入  
    15.     var newExecAvail = false  
    16.       
    17.     // 循环offers,WorkerOffer为包含executorId、host、cores的结构体,代表集群中的可用executor资源  
    18.     for (o <- offers) {  
    19.         
    20.       // 利用HashMap存储executorId->host映射的集合  
    21.       executorIdToHost(o.executorId) = o.host  
    22.         
    23.       // Number of tasks running on each executor  
    24.       // 每个executor上运行的task的数目,这里如果之前没有的话,初始化为0  
    25.       executorIdToTaskCount.getOrElseUpdate(o.executorId, 0)  
    26.         
    27.       // 每个host上executors的集合  
    28.       // 这个executorsByHost被用来计算host活跃性,反过来我们用它来决定在给定的主机上何时实现数据本地性  
    29.       if (!executorsByHost.contains(o.host)) {// 如果executorsByHost中不存在对应的host  
    30.           
    31.         // executorsByHost中添加一条记录,key为host,value为new HashSet[String]()  
    32.         executorsByHost(o.host) = new HashSet[String]()  
    33.           
    34.         // 发送一个ExecutorAdded事件,并由DAGScheduler的handleExecutorAdded()方法处理  
    35.         // eventProcessLoop.post(ExecutorAdded(execId, host))  
    36.         // 调用DAGScheduler的executorAdded()方法处理  
    37.         executorAdded(o.executorId, o.host)  
    38.           
    39.         // 新的slave加入时,标志位newExecAvail设置为true  
    40.         newExecAvail = true  
    41.       }  
    42.         
    43.       // 更新hostsByRack  
    44.       for (rack <- getRackForHost(o.host)) {  
    45.         hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host  
    46.       }  
    47.     }  
    48.   
    49.     // Randomly shuffle offers to avoid always placing tasks on the same set of workers.  
    50.     // 随机shuffle offers以避免总是把任务放在同一组workers上执行  
    51.     val shuffledOffers = Random.shuffle(offers)  
    52.       
    53.     // Build a list of tasks to assign to each worker.  
    54.     // 构造一个task列表,以分配到每个worker  
    55.     val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))  
    56.       
    57.     // 可以使用的cpu资源  
    58.     val availableCpus = shuffledOffers.map(o => o.cores).toArray  
    59.       
    60.     // 获得排序好的task集合  
    61.     // 先调用Pool.getSortedTaskSetQueue()方法  
    62.     // 还记得这个Pool吗,就是调度器中的调度池啊  
    63.     val sortedTaskSets = rootPool.getSortedTaskSetQueue  
    64.       
    65.     // 循环每个taskSet  
    66.     for (taskSet <- sortedTaskSets) {  
    67.       // 记录日志  
    68.       logDebug("parentName: %s, name: %s, runningTasks: %s".format(  
    69.         taskSet.parent.name, taskSet.name, taskSet.runningTasks))  
    70.         
    71.       // 如果存在新的活跃的executor(新的slave节点被添加时)  
    72.       if (newExecAvail) {  
    73.         // 调用executorAdded()方法  
    74.         taskSet.executorAdded()  
    75.       }  
    76.     }  
    77.   
    78.     // Take each TaskSet in our scheduling order, and then offer it each node in increasing order  
    79.     // of locality levels so that it gets a chance to launch local tasks on all of them.  
    80.     // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY  
    81.     var launchedTask = false  
    82.       
    83.     // 按照位置本地性规则调度每个TaskSet,最大化实现任务的本地性  
    84.     // 位置本地性规则的顺序是:PROCESS_LOCAL(同进程)、NODE_LOCAL(同节点)、NO_PREF、RACK_LOCAL(同机架)、ANY(任何)  
    85.     for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {  
    86.       do {  
    87.         // 调用resourceOfferSingleTaskSet()方法进行任务集调度  
    88.         launchedTask = resourceOfferSingleTaskSet(  
    89.             taskSet, maxLocality, shuffledOffers, availableCpus, tasks)  
    90.       } while (launchedTask)  
    91.     }  
    92.   
    93.     // 设置标志位hasLaunchedTask  
    94.     if (tasks.size > 0) {  
    95.       hasLaunchedTask = true  
    96.     }  
    97.       
    98.     return tasks  
    99.   }  

             首先来看下它的主体流程。如下:

            1、设置标志位newExecAvail为false,这个标志位是在新的slave被添加时被设置的一个标志,下面在计算任务的本地性规则时会用到;

            2、循环offers,WorkerOffer为包含executorId、host、cores的结构体,代表集群中的可用executor资源:

                2.1、更新executorIdToHost,executorIdToHost为利用HashMap存储executorId->host映射的集合;

                2.2、更新executorIdToTaskCount,executorIdToTaskCount为每个executor上运行的task的数目集合,这里如果之前没有的话,初始化为0;

                2.3、如果新的slave加入:

                    2.3.1、executorsByHost中添加一条记录,key为host,value为new HashSet[String]();

                    2.3.2、发送一个ExecutorAdded事件,并由DAGScheduler的handleExecutorAdded()方法处理;

                    2.3.3、新的slave加入时,标志位newExecAvail设置为true;

                2.4、更新hostsByRack;

            3、随机shuffle offers(集群中可用executor资源)以避免总是把任务放在同一组workers上执行;

            4、构造一个task列表,以分配到每个worker,针对每个executor按照其上的cores数目构造一个cores数目大小的ArrayBuffer,实现最大程度并行化;

            5、获取可以使用的cpu资源availableCpus;

            6、调用Pool.getSortedTaskSetQueue()方法获得排序好的task集合,即sortedTaskSets;

            7、循环sortedTaskSets中每个taskSet:

                   7.1、如果存在新加入的slave,则调用taskSet的executorAdded()方法,动态调整位置策略级别,这么做很容易理解,新的slave节点加入了,那么随之而来的是数据有可能存在于它上面,那么这时我们就需要重新调整任务本地性规则;

            8、循环sortedTaskSets,按照位置本地性规则调度每个TaskSet,最大化实现任务的本地性:

                  8.1、对每个taskSet,调用resourceOfferSingleTaskSet()方法进行任务集调度;

            9、设置标志位hasLaunchedTask,并返回tasks。

            接下来,我们详细解释下其中的每个步骤。

            第1步不用讲,只是设置标志位newExecAvail为false,并且记住这个标志位是在新的slave被添加时被设置的一个标志,下面在计算任务的本地性规则时会用到;

            第2步是集群中的可用executor资源offers的循环处理,更新一些数据结构,并且,在新的slave加入时,标志位newExecAvail设置为true,并且发送一个ExecutorAdded事件,交由DAGScheduler的handleExecutorAdded()方法处理。我们来看下DAGScheduler的这个方法:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. private[scheduler] def handleExecutorAdded(execId: String, host: String) {  
    2.     // remove from failedEpoch(execId) ?  
    3.     if (failedEpoch.contains(execId)) {  
    4.       logInfo("Host added was in lost list earlier: " + host)  
    5.       failedEpoch -= execId  
    6.     }  
    7.     submitWaitingStages()  
    8.   }  

            很简单,先将对应host从failedEpoch中移除,failedEpoch存储的是系统探测到的失效节点的集合,存储的是execId->host的对应关系。接下来便是调用submitWaitingStages()方法提交等待的stages。这个方法我们之前分析过,这里不再赘述。但是存在一个疑点,之前stage都已提交了,这里为什么还要提交一遍呢?留待以后再寻找答案吧。

            第3步随机shuffle offers以避免总是把任务放在同一组workers上执行,这也没什么特别好讲的,为了避免所谓的热点问题而采取的一种随机策略而已。

            第4步也是,构造一个task列表,以分配到每个worker,针对每个executor,创建一个ArrayBuffer,存储的类型为TaskDescription,大小为executor的cores,即最大程度并行化,充分利用executor的cores。

            第5步就是获取到上述executor集合中cores集合availableCpus,即可以使用的cpu资源;

            下面我们重点分析下第6步,它是调用Pool.getSortedTaskSetQueue()方法,获得排序好的task集合。还记得这个Pool吗?它就是上篇文章《Spark源码分析之五:Task调度(一)》里讲到的调度器的中的调度池啊,我们看下它的getSortedTaskSetQueue()方法。代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. override def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager] = {  
    2.       
    3.     // 创建一个ArrayBuffer,存储TaskSetManager  
    4.     var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]  
    5.       
    6.     // schedulableQueue为Pool中的一个调度队列,里面存储的是TaskSetManager  
    7.     // 在TaskScheduler的submitTasks()方法中,通过层层调用,最终通过Pool的addSchedulable()方法将之前生成的TaskSetManager加入到schedulableQueue中  
    8.     // 而TaskSetManager包含具体的tasks  
    9.     // taskSetSchedulingAlgorithm为调度算法,包括FIFO和FAIR两种  
    10.     // 这里针对调度队列,<span style="font-family: Arial, Helvetica, sans-serif;">按照调度算法对其排序,</span>生成一个序列sortedSchedulableQueue,  
    11.     val sortedSchedulableQueue =  
    12.       schedulableQueue.asScala.toSeq.sortWith(taskSetSchedulingAlgorithm.comparator)  
    13.       
    14.     // 循环sortedSchedulableQueue中所有的TaskSetManager,通过其getSortedTaskSetQueue来填充sortedTaskSetQueue  
    15.     for (schedulable <- sortedSchedulableQueue) {  
    16.       sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue  
    17.     }  
    18.       
    19.     // 返回sortedTaskSetQueue  
    20.     sortedTaskSetQueue  
    21.   }  

            首先,创建一个ArrayBuffer,用来存储TaskSetManager,然后,对Pool中已经存储好的TaskSetManager,即schedulableQueue队列,按照taskSetSchedulingAlgorithm调度规则或算法来排序,得到sortedSchedulableQueue,并循环其内的TaskSetManager,通过其getSortedTaskSetQueue()方法来填充sortedTaskSetQueue,最后返回。TaskSetManager的getSortedTaskSetQueue()方法也很简单,追加ArrayBuffer[TaskSetManager]即可,如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. override def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = {  
    2.     var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]()  
    3.     sortedTaskSetQueue += this  
    4.     sortedTaskSetQueue  
    5.   }  

            我们着重来讲解下这个调度准则或算法taskSetSchedulingAlgorithm,其定义如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. // 调度准则,包括FAIR和FIFO两种  
    2.   var taskSetSchedulingAlgorithm: SchedulingAlgorithm = {  
    3.     schedulingMode match {  
    4.       case SchedulingMode.FAIR =>  
    5.         new FairSchedulingAlgorithm()  
    6.       case SchedulingMode.FIFO =>  
    7.         new FIFOSchedulingAlgorithm()  
    8.     }  
    9.   }  

            它包括两种,FAIR和FIFO,下面我们以FIFO为例来讲解。代码在SchedulingAlgorithm.scala中,如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {  
    2.   // 比较函数  
    3.   override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {  
    4.     val priority1 = s1.priority  
    5.     val priority2 = s2.priority  
    6.       
    7.     // 先比较priority,即优先级  
    8.     // priority相同的话,再比较stageId  
    9.     // 前者小于后者的话,返回true,否则为false  
    10.     var res = math.signum(priority1 - priority2)  
    11.     if (res == 0) {  
    12.       val stageId1 = s1.stageId  
    13.       val stageId2 = s2.stageId  
    14.       res = math.signum(stageId1 - stageId2)  
    15.     }  
    16.     if (res < 0) {  
    17.       true  
    18.     } else {  
    19.       false  
    20.     }  
    21.   }  
    22. }  

            很简单,就是先比较两个TaskSetManagerder的优先级priority,优先级相同再比较stageId。而这个priority在TaskSet生成时,就是jobId,也就是FIFO是先按照Job的顺序再按照Stage的顺序进行顺序调度,一个Job完了再调度另一个Job,Job内是按照Stage的顺序进行调度。关于priority生成的代码如下所示:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. // 利用taskScheduler.submitTasks()提交task  
    2.       // jobId即为TaskSet的priority  
    3.       taskScheduler.submitTasks(new TaskSet(  
    4.         tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))  

            比较复杂的是FairSchedulingAlgorithm,代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {  
    2.   override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {  
    3.       
    4.     val minShare1 = s1.minShare  
    5.     val minShare2 = s2.minShare  
    6.     val runningTasks1 = s1.runningTasks  
    7.     val runningTasks2 = s2.runningTasks  
    8.     val s1Needy = runningTasks1 < minShare1  
    9.     val s2Needy = runningTasks2 < minShare2  
    10.     val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0).toDouble  
    11.     val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0).toDouble  
    12.     val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble  
    13.     val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble  
    14.     var compare: Int = 0  
    15.   
    16.     // 前者的runningTasks<minShare而后者相反的的话,返回true;  
    17.     // runningTasks为正在运行的tasks数目,minShare为最小共享cores数;  
    18.     // 前面两个if判断的意思是两个TaskSetManager中,如果其中一个正在运行的tasks数目小于最小共享cores数,则优先调度该TaskSetManager  
    19.     if (s1Needy && !s2Needy) {  
    20.       return true  
    21.     } else if (!s1Needy && s2Needy) {// 前者的runningTasks>=minShare而后者相反的的话,返回true  
    22.       return false  
    23.     } else if (s1Needy && s2Needy) {  
    24.       // 如果两者的正在运行的tasks数目都比最小共享cores数小的话,再比较minShareRatio  
    25.       // minShareRatio为正在运行的tasks数目与最小共享cores数的比率  
    26.       compare = minShareRatio1.compareTo(minShareRatio2)  
    27.     } else {  
    28.       // 最后比较taskToWeightRatio,即权重使用率,weight代表调度池对资源获取的权重,越大需要越多的资源  
    29.       compare = taskToWeightRatio1.compareTo(taskToWeightRatio2)  
    30.     }  
    31.   
    32.     if (compare < 0) {  
    33.       true  
    34.     } else if (compare > 0) {  
    35.       false  
    36.     } else {  
    37.       s1.name < s2.name  
    38.     }  
    39.   }  
    40. }  

            它的调度逻辑主要如下:

            1、优先看正在运行的tasks数目是否小于最小共享cores数,如果两者只有一个小于,则优先调度小于的那个,原因是既然正在运行的Tasks数目小于共享cores数,说明该节点资源比较充足,应该优先利用;

            2、如果不是只有一个的正在运行的tasks数目是否小于最小共享cores数的话,则再判断正在运行的tasks数目与最小共享cores数的比率;

            3、最后再比较权重使用率,即正在运行的tasks数目与该TaskSetManager的权重weight的比,weight代表调度池对资源获取的权重,越大需要越多的资源。

            到此为止,获得了排序好的task集合,我们来到了第7步:如果存在新加入的slave,则调用taskSet的executorAdded()方法,即TaskSetManager的executorAdded()方法,代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. def executorAdded() {  
    2.     recomputeLocality()  
    3.   }  

            没说的,继续追踪,看recomputeLocality()方法。代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. // 重新计算位置  
    2.   def recomputeLocality() {  
    3.     // 首先获取之前的位置Level  
    4.     // currentLocalityIndex为有效位置策略级别中的索引,默认为0  
    5.     val previousLocalityLevel = myLocalityLevels(currentLocalityIndex)  
    6.       
    7.     // 计算有效的位置Level  
    8.     myLocalityLevels = computeValidLocalityLevels()  
    9.       
    10.     // 获得位置策略级别的等待时间  
    11.     localityWaits = myLocalityLevels.map(getLocalityWait)  
    12.       
    13.     // 设置当前使用的位置策略级别的索引  
    14.     currentLocalityIndex = getLocalityIndex(previousLocalityLevel)  
    15.   }  

            首先说下这个currentLocalityIndex,它的定义为:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. var currentLocalityIndex = 0    // Index of our current locality level in validLocalityLevels  

            它是有效位置策略级别中的索引,指示当前的位置信息。也就是我们上一个task被launched所使用的Locality Level。

            接下来看下myLocalityLevels,它是任务集TaskSet中应该使用哪种位置Level的数组,在TaskSetManager对象实例化时即被初始化,变量定义如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. // Figure out which locality levels we have in our TaskSet, so we can do delay scheduling  
    2.   // 确定在我们的任务集TaskSet中应该使用哪种位置Level,以便我们做延迟调度  
    3.   var myLocalityLevels = computeValidLocalityLevels()  

            computeValidLocalityLevels()方法为计算该TaskSet使用的位置策略的方法,代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. /** 
    2.    * Compute the locality levels used in this TaskSet. Assumes that all tasks have already been 
    3.    * added to queues using addPendingTask. 
    4.    * 计算该TaskSet使用的位置策略。假设所有的任务已经通过addPendingTask()被添加入队列 
    5.    */  
    6.   private def computeValidLocalityLevels(): Array[TaskLocality.TaskLocality] = {  
    7.     // 引入任务位置策略  
    8.     import TaskLocality.{PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY}  
    9.       
    10.     // 创建ArrayBuffer类型的levels,存储TaskLocality  
    11.     val levels = new ArrayBuffer[TaskLocality.TaskLocality]  
    12.       
    13.     // 如果pendingTasksForExecutor不为空,且PROCESS_LOCAL级别中TaskSetManager等待分配下一个任务的时间不为零,且  
    14.     // 如果pendingTasksForExecutor中每个executorId在sched的executorIdToTaskCount中存在  
    15.     // executorIdToTaskCount为每个executor上运行的task的数目集合  
    16.     if (!pendingTasksForExecutor.isEmpty && getLocalityWait(PROCESS_LOCAL) != 0 &&  
    17.         pendingTasksForExecutor.keySet.exists(sched.isExecutorAlive(_))) {  
    18.       levels += PROCESS_LOCAL  
    19.     }  
    20.       
    21.     // 如果pendingTasksForHost不为空,且NODE_LOCAL级别中TaskSetManager等待分配下一个任务的时间不为零,且  
    22.     // 如果pendingTasksForHost中每个host在sched的executorsByHost中存在  
    23.     // executorsByHost为每个host上executors的集合  
    24.     if (!pendingTasksForHost.isEmpty && getLocalityWait(NODE_LOCAL) != 0 &&  
    25.         pendingTasksForHost.keySet.exists(sched.hasExecutorsAliveOnHost(_))) {  
    26.       levels += NODE_LOCAL  
    27.     }  
    28.       
    29.     // 如果存在没有位置信息的task,则添加NO_PREF级别  
    30.     if (!pendingTasksWithNoPrefs.isEmpty) {  
    31.       levels += NO_PREF  
    32.     }  
    33.       
    34.     // 同样处理RACK_LOCAL级别  
    35.     if (!pendingTasksForRack.isEmpty && getLocalityWait(RACK_LOCAL) != 0 &&  
    36.         pendingTasksForRack.keySet.exists(sched.hasHostAliveOnRack(_))) {  
    37.       levels += RACK_LOCAL  
    38.     }  
    39.       
    40.     // 最后加上一个ANY级别  
    41.     levels += ANY  
    42.     logDebug("Valid locality levels for " + taskSet + ": " + levels.mkString(", "))  
    43.       
    44.     // 返回  
    45.     levels.toArray  
    46.   }  

            这里,我们先看下其中几个比较重要的数据结构。在TaskSetManager中,存在如下几个数据结构:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. // 每个executor上即将被执行的tasks的映射集合  
    2.   private val pendingTasksForExecutor = new HashMap[String, ArrayBuffer[Int]]  
    3.   
    4. // 每个host上即将被执行的tasks的映射集合  
    5.   private val pendingTasksForHost = new HashMap[String, ArrayBuffer[Int]]  
    6.   
    7. // 每个rack上即将被执行的tasks的映射集合  
    8.   private val pendingTasksForRack = new HashMap[String, ArrayBuffer[Int]]  
    9.   
    10. // Set containing pending tasks with no locality preferences.  
    11.   // 存储所有没有位置信息的即将运行tasks的index索引的集合  
    12.   var pendingTasksWithNoPrefs = new ArrayBuffer[Int]  
    13.   
    14. // Set containing all pending tasks (also used as a stack, as above).  
    15.   // 存储所有即将运行tasks的index索引的集合  
    16.   val allPendingTasks = new ArrayBuffer[Int]  

            这些数据结构,存储了task与不同位置的载体的对应关系。在TaskSetManager对象被构造时,有如下代码被执行:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. // Add all our tasks to the pending lists. We do this in reverse order  
    2.   // of task index so that tasks with low indices get launched first.  
    3.   // 将所有的tasks添加到pending列表。我们用倒序的任务索引一遍较低索引的任务可以被优先加载  
    4.   for (i <- (0 until numTasks).reverse) {  
    5.     addPendingTask(i)  
    6.   }  

            它对TaskSetManager中的tasks的索引倒序处理。addPendingTask()方法如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. /** Add a task to all the pending-task lists that it should be on. */  
    2.   // 添加一个任务的索引到所有相关的pending-task索引列表  
    3.   private def addPendingTask(index: Int) {  
    4.     // Utility method that adds `index` to a list only if it's not already there  
    5.     // 定义了一个如果索引不存在添加索引至列表的工具方法  
    6.     def addTo(list: ArrayBuffer[Int]) {  
    7.       if (!list.contains(index)) {  
    8.         list += index  
    9.       }  
    10.     }  
    11.   
    12.     // 遍历task的优先位置  
    13.     for (loc <- tasks(index).preferredLocations) {  
    14.       loc match {  
    15.         case e: ExecutorCacheTaskLocation => // 如果为ExecutorCacheTaskLocation  
    16.           // 添加任务索引index至pendingTasksForExecutor列表  
    17.           addTo(pendingTasksForExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer))  
    18.         case e: HDFSCacheTaskLocation => {// 如果为HDFSCacheTaskLocation  
    19.             
    20.           // 调用sched(即TaskSchedulerImpl)的getExecutorsAliveOnHost()方法,获得指定Host上的Alive Executors  
    21.           val exe = sched.getExecutorsAliveOnHost(loc.host)  
    22.           exe match {  
    23.             case Some(set) => {  
    24.               // 循环host上的每个Alive Executor,添加任务索引index至pendingTasksForExecutor列表  
    25.               for (e <- set) {  
    26.                 addTo(pendingTasksForExecutor.getOrElseUpdate(e, new ArrayBuffer))  
    27.               }  
    28.               logInfo(s"Pending task $index has a cached location at ${e.host} " +  
    29.                 ", where there are executors " + set.mkString(","))  
    30.             }  
    31.             case None => logDebug(s"Pending task $index has a cached location at ${e.host} " +  
    32.                 ", but there are no executors alive there.")  
    33.           }  
    34.         }  
    35.         case _ => Unit  
    36.       }  
    37.         
    38.       // 添加任务索引index至pendingTasksForHost列表  
    39.       addTo(pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer))  
    40.         
    41.       // 根据获得任务优先位置host获得机架rack,循环,添加任务索引index至pendingTasksForRack列表  
    42.       for (rack <- sched.getRackForHost(loc.host)) {  
    43.         addTo(pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer))  
    44.       }  
    45.     }  
    46.   
    47.     // 如果task没有位置属性,则将任务的索引index添加到pendingTasksWithNoPrefs,pendingTasksWithNoPrefs为存储所有没有位置信息的即将运行tasks的index索引的集合  
    48.     if (tasks(index).preferredLocations == Nil) {  
    49.       addTo(pendingTasksWithNoPrefs)  
    50.     }  
    51.   
    52.     // 将任务的索引index加入到allPendingTasks,allPendingTasks为存储所有即将运行tasks的index索引的集合  
    53.     allPendingTasks += index  // No point scanning this whole list to find the old task there  
    54.   }  

            鉴于上面注释很清晰,这里,我们只说下重点,它是根据task的preferredLocations,来决定该往哪个数据结构存储的。最终,将task的位置信息,存储到不同的数据结构中,方便后续任务调度的处理。

            同时,在TaskSetManager中TaskSchedulerImpl类型的变量中,还存在着如下几个数据结构:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. // Number of tasks running on each executor  
    2.   // 每个executor上正在运行的tasks的数目  
    3.   private val executorIdToTaskCount = new HashMap[String, Int]  
    4.   
    5.   // The set of executors we have on each host; this is used to compute hostsAlive, which  
    6.   // in turn is used to decide when we can attain data locality on a given host  
    7.   // 每个host上executors的集合  
    8.   // 这个executorsByHost被用来计算host活跃性,反过来我们用它来决定在给定的主机上何时实现数据本地性  
    9.   protected val executorsByHost = new HashMap[String, HashSet[String]]  
    10.   
    11.   // 每个rack上hosts的映射关系  
    12.   protected val hostsByRack = new HashMap[String, HashSet[String]]  

            它反映了当前集群中executor、host、rack的对应关系。而在computeValidLocalityLevels()方法中,根据task的位置属性和当前集群中executor、host、rack的对应关系,依靠上面这两组数据结构,就能很方便的确定该TaskSet的TaskLocality Level,详细流程不再赘述,读者可自行阅读代码。

            这里,我们只说下getLocalityWait()方法,它是获取Locality级别对应TaskSetManager等待分配下一个任务的时间,代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. // 获取Locality级别对应TaskSetManager等待分配下一个任务的时间  
    2.   private def getLocalityWait(level: TaskLocality.TaskLocality): Long = {  
    3.     // 默认等待时间,取自参数spark.locality.wait,默认为3s  
    4.     val defaultWait = conf.get("spark.locality.wait", "3s")  
    5.       
    6.     // 根据不同的TaskLocality,取不同的参数,设置TaskLocality等待时间  
    7.     // PROCESS_LOCAL取参数spark.locality.wait.process  
    8.     // NODE_LOCAL取参数spark.locality.wait.node  
    9.     // RACK_LOCAL取参数spark.locality.wait.rack  
    10.     val localityWaitKey = level match {  
    11.       case TaskLocality.PROCESS_LOCAL => "spark.locality.wait.process"  
    12.       case TaskLocality.NODE_LOCAL => "spark.locality.wait.node"  
    13.       case TaskLocality.RACK_LOCAL => "spark.locality.wait.rack"  
    14.       case _ => null  
    15.     }  
    16.   
    17.     if (localityWaitKey != null) {  
    18.       conf.getTimeAsMs(localityWaitKey, defaultWait)  
    19.     } else {  
    20.       0L  
    21.     }  
    22.   }  

            不同的Locality级别对应取不同的参数。为什么要有这个Locality级别对应TaskSetManager等待分配下一个任务的时间呢?我们先留个小小的疑问。

            回到recomputeLocality()方法,接下来便是调用computeValidLocalityLevels()这个方法,计算当前最新的有效的位置策略Level,为什么要再次计算呢?主要就是新的slave节点加入,我们需要重新评估下集群中task位置偏好与当前集群executor、host、rack等整体资源的关系,起到了一个位置策略级别动态调整的一个效果。

            然后,便是获得位置策略级别的等待时间localityWaits、设置当前使用的位置策略级别的索引currentLocalityIndex,不再赘述。

            好了,第7步就分析完了,有些细节留到以后再归纳整理吧。

            接着分析第8步,循环sortedTaskSets,按照位置本地性规则调度每个TaskSet,最大化实现任务的本地性,也就是对每个taskSet,调用resourceOfferSingleTaskSet()方法进行任务集调度。显然,我们需要首先看下resourceOfferSingleTaskSet()这个方法。代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. private def resourceOfferSingleTaskSet(  
    2.       taskSet: TaskSetManager,  
    3.       maxLocality: TaskLocality,  
    4.       shuffledOffers: Seq[WorkerOffer],  
    5.       availableCpus: Array[Int],  
    6.       tasks: Seq[ArrayBuffer[TaskDescription]]) : Boolean = {  
    7.       
    8.     // 标志位launchedTask初始化为false,用它来标记是否有task被成功分配或者launched  
    9.     var launchedTask = false  
    10.       
    11.     // 循环shuffledOffers,即每个可用executor  
    12.     for (i <- 0 until shuffledOffers.size) {  
    13.         
    14.       // 获取其executorId和host  
    15.       val execId = shuffledOffers(i).executorId  
    16.       val host = shuffledOffers(i).host  
    17.         
    18.       // 如果executor上可利用cpu数目大于每个task需要的数目,则继续task分配  
    19.       // CPUS_PER_TASK为参数spark.task.cpus配置的值,未配置的话默认为1  
    20.       if (availableCpus(i) >= CPUS_PER_TASK) {  
    21.         try {  
    22.           
    23.           // 调用TaskSetManager的resourceOffer()方法,处理返回的每个TaskDescription  
    24.           for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {  
    25.               
    26.             // 分配task成功  
    27.             // 将task加入到tasks对应位置  
    28.             // 注意,tasks为一个空的,根据shuffledOffers和其可用cores生成的有一定结构的列表  
    29.             tasks(i) += task  
    30.               
    31.             // 更新taskIdToTaskSetManager、taskIdToExecutorId、executorIdToTaskCount、  
    32.             // executorsByHost、availableCpus等数据结构  
    33.             val tid = task.taskId  
    34.             taskIdToTaskSetManager(tid) = taskSet // taskId与TaskSetManager的映射关系  
    35.             taskIdToExecutorId(tid) = execId // taskId与ExecutorId的映射关系  
    36.             executorIdToTaskCount(execId) += 1// executor上正在运行的task数目加1  
    37.             executorsByHost(host) += execId// host上对应的executor的映射关系  
    38.             availableCpus(i) -= CPUS_PER_TASK// 可以Cpu cores减少相应数目  
    39.               
    40.             // 确保availableCpus(i)不小于0  
    41.             assert(availableCpus(i) >= 0)  
    42.               
    43.             // 标志位launchedTask设置为true  
    44.             launchedTask = true  
    45.           }  
    46.         } catch {  
    47.           case e: TaskNotSerializableException =>  
    48.             logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")  
    49.             // Do not offer resources for this task, but don't throw an error to allow other  
    50.             // task sets to be submitted.  
    51.             return launchedTask  
    52.         }  
    53.       }  
    54.     }  
    55.     return launchedTask  
    56.   }  

            该方法的主体流程如下:

            1、标志位launchedTask初始化为false,用它来标记是否有task被成功分配或者launched;

            2、循环shuffledOffers,即每个可用executor:

                 2.1、获取其executorId和host;

                 2.2、如果executor上可利用cpu数目大于每个task需要的数目,则继续task分配;

                 2.3、调用TaskSetManager的resourceOffer()方法,处理返回的每个TaskDescription:

                    2.3.1、分配task成功,将task加入到tasks对应位置(注意,tasks为一个空的,根据shuffledOffers和其可用cores生成的有一定结构的列表);

                    2.3.2、更新taskIdToTaskSetManager、taskIdToExecutorId、executorIdToTaskCount、executorsByHost、availableCpus等数据结构;

                    2.3.3、确保availableCpus(i)不小于0;

                    2.3.4、标志位launchedTask设置为true;

           3、返回launchedTask。

            其他都好说,我们只看下TaskSetManager的resourceOffer()方法。代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. /** 
    2.    * Respond to an offer of a single executor from the scheduler by finding a task 
    3.    * 
    4.    * NOTE: this function is either called with a maxLocality which 
    5.    * would be adjusted by delay scheduling algorithm or it will be with a special 
    6.    * NO_PREF locality which will be not modified 
    7.    * 
    8.    * @param execId the executor Id of the offered resource 
    9.    * @param host  the host Id of the offered resource 
    10.    * @param maxLocality the maximum locality we want to schedule the tasks at 
    11.    */  
    12.   @throws[TaskNotSerializableException]  
    13.   def resourceOffer(  
    14.       execId: String,  
    15.       host: String,  
    16.       maxLocality: TaskLocality.TaskLocality)  
    17.     : Option[TaskDescription] =  
    18.   {  
    19.     if (!isZombie) {  
    20.       
    21.       // 当前时间  
    22.       val curTime = clock.getTimeMillis()  
    23.   
    24.       // 确定可以被允许的位置策略:allowedLocality  
    25.       var allowedLocality = maxLocality  
    26.   
    27.       // 如果maxLocality不为TaskLocality.NO_PREF  
    28.       if (maxLocality != TaskLocality.NO_PREF) {  
    29.         // 获取被允许的Locality,主要是看等待时间  
    30.         allowedLocality = getAllowedLocalityLevel(curTime)  
    31.           
    32.         // 如果allowedLocality大于maxLocality,将maxLocality赋值给allowedLocality  
    33.         if (allowedLocality > maxLocality) {  
    34.           // We're not allowed to search for farther-away tasks  
    35.           allowedLocality = maxLocality  
    36.         }  
    37.       }  
    38.   
    39.       // 出列task,即分配task  
    40.       dequeueTask(execId, host, allowedLocality) match {  
    41.         case Some((index, taskLocality, speculative)) => {  
    42.             
    43.           // 找到对应的task  
    44.           // Found a task; do some bookkeeping and return a task description  
    45.           val task = tasks(index)  
    46.           val taskId = sched.newTaskId()  
    47.           // Do various bookkeeping  
    48.           // 更新copiesRunning  
    49.           copiesRunning(index) += 1  
    50.           val attemptNum = taskAttempts(index).size  
    51.             
    52.           // 创建TaskInfo  
    53.           val info = new TaskInfo(taskId, index, attemptNum, curTime,  
    54.             execId, host, taskLocality, speculative)  
    55.             
    56.           // 更新taskInfos  
    57.           taskInfos(taskId) = info  
    58.             
    59.           // 更新taskAttempts  
    60.           taskAttempts(index) = info :: taskAttempts(index)  
    61.           // Update our locality level for delay scheduling  
    62.           // NO_PREF will not affect the variables related to delay scheduling  
    63.             
    64.           // 设置currentLocalityIndex、lastLaunchTime  
    65.           if (maxLocality != TaskLocality.NO_PREF) {  
    66.             currentLocalityIndex = getLocalityIndex(taskLocality)  
    67.             lastLaunchTime = curTime  
    68.           }  
    69.             
    70.           // Serialize and return the task  
    71.           // 开始时间  
    72.           val startTime = clock.getTimeMillis()  
    73.             
    74.           // 序列化task,得到serializedTask  
    75.           val serializedTask: ByteBuffer = try {  
    76.             Task.serializeWithDependencies(task, sched.sc.addedFiles, sched.sc.addedJars, ser)  
    77.           } catch {  
    78.             // If the task cannot be serialized, then there's no point to re-attempt the task,  
    79.             // as it will always fail. So just abort the whole task-set.  
    80.             case NonFatal(e) =>  
    81.               val msg = s"Failed to serialize task $taskId, not attempting to retry it."  
    82.               logError(msg, e)  
    83.               abort(s"$msg Exception during serialization: $e")  
    84.               throw new TaskNotSerializableException(e)  
    85.           }  
    86.           if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 &&  
    87.               !emittedTaskSizeWarning) {  
    88.             emittedTaskSizeWarning = true  
    89.             logWarning(s"Stage ${task.stageId} contains a task of very large size " +  
    90.               s"(${serializedTask.limit / 1024} KB). The maximum recommended task size is " +  
    91.               s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.")  
    92.           }  
    93.             
    94.           // 添加running task  
    95.           addRunningTask(taskId)  
    96.   
    97.           // We used to log the time it takes to serialize the task, but task size is already  
    98.           // a good proxy to task serialization time.  
    99.           // val timeTaken = clock.getTime() - startTime  
    100.           val taskName = s"task ${info.id} in stage ${taskSet.id}"  
    101.           logInfo(s"Starting $taskName (TID $taskId, $host, partition ${task.partitionId}," +  
    102.             s"$taskLocality, ${serializedTask.limit} bytes)")  
    103.   
    104.           // 调用DagScheduler的taskStarted()方法,标记Task已启动  
    105.           sched.dagScheduler.taskStarted(task, info)  
    106.             
    107.           // 返回TaskDescription,其中包含taskId、attemptNumber、execId、index、serializedTask等重要信息  
    108.           // attemptNumber是推测执行原理必须使用的,即拖后腿的任务可以执行多份,谁先完成用谁的结果  
    109.           return Some(new TaskDescription(taskId = taskId, attemptNumber = attemptNum, execId,  
    110.             taskName, index, serializedTask))  
    111.         }  
    112.         case _ =>  
    113.       }  
    114.     }  
    115.     None  
    116.   }  

            resourceOffer()方法的处理流程大体如下:

            1、记录当前时间;

            2、 确定可以被允许的位置策略:allowedLocality;

            3、出列task,即分配task;

                3.1、如果找到对应的task,即task可以被分配:

                  3.1.1、完成获得taskId、更新copiesRunning、获得attemptNum、创建TaskInfo、更新taskInfos、更新taskAttempts、设置currentLocalityIndex、lastLaunchTime等基础数据结构的更新;

                  3.1.2、序列化task,得到serializedTask;

                  3.1.3、添加running task;

                  3.1.4、调用DagScheduler的taskStarted()方法,标记Task已启动;

                  3.1.5、返回TaskDescription,其中包含taskId、attemptNumber、execId、index、serializedTask等重要信息,attemptNumber是推测执行原理必须使用的,即拖后腿的任务可以执行多份,谁先完成用谁的结果。

            首先说下这个allowedLocality,如果maxLocality不为TaskLocality.NO_PREF,我们需要调用getAllowedLocalityLevel(),传入当前时间,得到allowedLocality,getAllowedLocalityLevel()方法逻辑比较简单,代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. /** 
    2.    * Get the level we can launch tasks according to delay scheduling, based on current wait time. 
    3.    * 基于当前的等待是时间,得到我们可以调度task的级别 
    4.    */  
    5.   private def getAllowedLocalityLevel(curTime: Long): TaskLocality.TaskLocality = {  
    6.     // Remove the scheduled or finished tasks lazily  
    7.     // 判断task是否可以被调度  
    8.     def tasksNeedToBeScheduledFrom(pendingTaskIds: ArrayBuffer[Int]): Boolean = {  
    9.       var indexOffset = pendingTaskIds.size  
    10.       // 循环  
    11.       while (indexOffset > 0) {  
    12.         // 索引递减  
    13.         indexOffset -= 1  
    14.           
    15.         // 获得task索引  
    16.         val index = pendingTaskIds(indexOffset)  
    17.           
    18.         // 如果对应task不存在任何运行实例,且未执行成功,可以调度,返回true  
    19.         if (copiesRunning(index) == 0 && !successful(index)) {  
    20.           return true  
    21.         } else {  
    22.           
    23.           // 从pendingTaskIds中移除  
    24.           pendingTaskIds.remove(indexOffset)  
    25.         }  
    26.       }  
    27.       false  
    28.     }  
    29.     // Walk through the list of tasks that can be scheduled at each location and returns true  
    30.     // if there are any tasks that still need to be scheduled. Lazily cleans up tasks that have  
    31.     // already been scheduled.  
    32.     def moreTasksToRunIn(pendingTasks: HashMap[String, ArrayBuffer[Int]]): Boolean = {  
    33.       val emptyKeys = new ArrayBuffer[String]  
    34.         
    35.       // 循环pendingTasks  
    36.       val hasTasks = pendingTasks.exists {  
    37.         case (id: String, tasks: ArrayBuffer[Int]) =>  
    38.             
    39.           // 判断task是否可以被调度  
    40.           if (tasksNeedToBeScheduledFrom(tasks)) {  
    41.             true  
    42.           } else {  
    43.             emptyKeys += id  
    44.             false  
    45.           }  
    46.       }  
    47.       // The key could be executorId, host or rackId  
    48.       // 移除数据  
    49.       emptyKeys.foreach(id => pendingTasks.remove(id))  
    50.       hasTasks  
    51.     }  
    52.       
    53.     // 从当前索引currentLocalityIndex开始,循环myLocalityLevels  
    54.     while (currentLocalityIndex < myLocalityLevels.length - 1) {  
    55.         
    56.       // 是否存在待调度task,根据不同的Locality Level,调用moreTasksToRunIn()方法从不同的数据结构中获取,  
    57.       // NO_PREF直接看pendingTasksWithNoPrefs是否为空  
    58.       val moreTasks = myLocalityLevels(currentLocalityIndex) match {  
    59.         case TaskLocality.PROCESS_LOCAL => moreTasksToRunIn(pendingTasksForExecutor)  
    60.         case TaskLocality.NODE_LOCAL => moreTasksToRunIn(pendingTasksForHost)  
    61.         case TaskLocality.NO_PREF => pendingTasksWithNoPrefs.nonEmpty  
    62.         case TaskLocality.RACK_LOCAL => moreTasksToRunIn(pendingTasksForRack)  
    63.       }  
    64.       if (!moreTasks) {// 不存在可以被调度的task  
    65.         // This is a performance optimization: if there are no more tasks that can  
    66.         // be scheduled at a particular locality level, there is no point in waiting  
    67.         // for the locality wait timeout (SPARK-4939).  
    68.         // 记录lastLaunchTime  
    69.         lastLaunchTime = curTime  
    70.         logDebug(s"No tasks for locality level ${myLocalityLevels(currentLocalityIndex)}, " +  
    71.           s"so moving to locality level ${myLocalityLevels(currentLocalityIndex + 1)}")  
    72.           
    73.         // 位置策略索引加1  
    74.         currentLocalityIndex += 1  
    75.       } else if (curTime - lastLaunchTime >= localityWaits(currentLocalityIndex)) {  
    76.         // Jump to the next locality level, and reset lastLaunchTime so that the next locality  
    77.         // wait timer doesn't immediately expire  
    78.           
    79.         // 更新localityWaits  
    80.         lastLaunchTime += localityWaits(currentLocalityIndex)  
    81.           
    82.         // 位置策略索引加1  
    83.         currentLocalityIndex += 1  
    84.         logDebug(s"Moving to ${myLocalityLevels(currentLocalityIndex)} after waiting for " +  
    85.           s"${localityWaits(currentLocalityIndex)}ms")  
    86.       } else {  
    87.         
    88.         // 返回当前位置策略级别  
    89.         return myLocalityLevels(currentLocalityIndex)  
    90.       }  
    91.     }  
    92.       
    93.     // 返回当前位置策略级别  
    94.     myLocalityLevels(currentLocalityIndex)  
    95.   }  

            在确定allowedLocality后,我们就需要调用dequeueTask()方法,出列task,进行调度。代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. /** 
    2.    * Dequeue a pending task for a given node and return its index and locality level. 
    3.    * Only search for tasks matching the given locality constraint. 
    4.    * 
    5.    * @return An option containing (task index within the task set, locality, is speculative?) 
    6.    */  
    7.   private def dequeueTask(execId: String, host: String, maxLocality: TaskLocality.Value)  
    8.     : Option[(Int, TaskLocality.Value, Boolean)] =  
    9.   {  
    10.     // 首先调用dequeueTaskFromList()方法,对PROCESS_LOCAL级别的task进行调度  
    11.     for (index <- dequeueTaskFromList(execId, getPendingTasksForExecutor(execId))) {  
    12.       return Some((index, TaskLocality.PROCESS_LOCAL, false))  
    13.     }  
    14.   
    15.     // PROCESS_LOCAL未调度到task的话,再调度NODE_LOCAL级别  
    16.     if (TaskLocality.isAllowed(maxLocality, TaskLocality.NODE_LOCAL)) {  
    17.       for (index <- dequeueTaskFromList(execId, getPendingTasksForHost(host))) {  
    18.         return Some((index, TaskLocality.NODE_LOCAL, false))  
    19.       }  
    20.     }  
    21.   
    22.     // NODE_LOCAL未调度到task的话,再调度NO_PREF级别  
    23.     if (TaskLocality.isAllowed(maxLocality, TaskLocality.NO_PREF)) {  
    24.       // Look for noPref tasks after NODE_LOCAL for minimize cross-rack traffic  
    25.       for (index <- dequeueTaskFromList(execId, pendingTasksWithNoPrefs)) {  
    26.         return Some((index, TaskLocality.PROCESS_LOCAL, false))  
    27.       }  
    28.     }  
    29.   
    30.     // NO_PREF未调度到task的话,再调度RACK_LOCAL级别  
    31.     if (TaskLocality.isAllowed(maxLocality, TaskLocality.RACK_LOCAL)) {  
    32.       for {  
    33.         rack <- sched.getRackForHost(host)  
    34.         index <- dequeueTaskFromList(execId, getPendingTasksForRack(rack))  
    35.       } {  
    36.         return Some((index, TaskLocality.RACK_LOCAL, false))  
    37.       }  
    38.     }  
    39.   
    40.     // 最好是ANY级别的调度  
    41.     if (TaskLocality.isAllowed(maxLocality, TaskLocality.ANY)) {  
    42.       for (index <- dequeueTaskFromList(execId, allPendingTasks)) {  
    43.         return Some((index, TaskLocality.ANY, false))  
    44.       }  
    45.     }  
    46.   
    47.     // find a speculative task if all others tasks have been scheduled  
    48.     // 如果所有的class都被调度的话,寻找一个speculative task,同MapReduce的推测执行原理的思想  
    49.     dequeueSpeculativeTask(execId, host, maxLocality).map {  
    50.       case (taskIndex, allowedLocality) => (taskIndex, allowedLocality, true)}  
    51.   }  

            很简单,按照PROCESS_LOCAL、NODE_LOCAL、NO_PREF、RACK_LOCAL、ANY的顺序进行调度。最后,如果所有的class都被调度的话,寻找一个speculative task,同MapReduce的推测执行原理的思想。

            至此,我们得到了TaskDescription,也就知道了哪个Task需要在哪个节点上执行,而Task调度也就全讲完了。

            题外话:

            要透彻的、清晰的讲解一个复杂的流程,是很费力的,短短几篇文章也是远远不够的。Task调度这两篇文章,重在叙述一个完整的流程,同时讲解部分细节。在这两篇文章的叙述中,肯定会有很多细节没讲清楚、讲透彻,甚至会有些理解错误的地方,希望高手不吝赐教,以免继续误导大家。

            针对部分细节,和对流程的深入理解,我以后还会陆续推出博文,进行详细讲解,并归纳总结,谢谢大家!

    博客原地址:http://blog.csdn.net/lipeng_bigdata/article/details/50699939

  • 相关阅读:
    hive0.13.1安装-mysql server作为hive的metastore
    hadoop2.2集群部署教程连接
    hadoop2.4.1伪分布模式部署
    spring cloud (四、服务消费者demo_consumer)
    spring cloud (三、服务提供者demo_provider)
    spring cloud (二、服务注册安全demo_eureka)
    spring cloud (一、服务注册demo_eureka)
    maven里面pom文件的各标签介绍
    如何删除github里面的项目
    用过的工具列表及作用
  • 原文地址:https://www.cnblogs.com/jirimutu01/p/5274459.html
Copyright © 2011-2022 走看看