zoukankan      html  css  js  c++  java
  • Spark源码分析之五:Task调度(一)

    在前四篇博文中,我们分析了Job提交运行总流程的第一阶段Stage划分与提交,它又被细化为三个分阶段:

            1、Job的调度模型与运行反馈;

            2、Stage划分;

            3、Stage提交:对应TaskSet的生成。

            Stage划分与提交阶段主要是由DAGScheduler完成的,而DAGScheduler负责Job的逻辑调度,主要职责也即DAG图的分解,按照RDD间是否为shuffle dependency,将整个Job划分为一个个stage,并将每个stage转化为tasks的集合--TaskSet。

            接下来我们要讲的第二阶段Task调度与执行,则是Spark中Job的物理调度,它实际上分为两个主要阶段:

            1、Task调度;

            2、Task运行。

            下面,我们分析下Task的调度。我们知道,在第一阶段的末尾,stage被提交后,每个stage被转化为一组task的集合--TaskSet,而紧接着,则调用taskScheduler.submitTasks()提交这些tasks,而TaskScheduler的主要职责,则是负责Job物理调度阶段--Task调度。TaskScheduler为scala中的一个trait,你可以简单的把它理解为Java中的接口,目前它仅仅有一个实现类TaskSchedulerImpl。

            TaskScheduler负责低层次任务的调度,每个TaskScheduler为一个特定的SparkContext调度tasks。这些调度器获取到由DAGScheduler为每个stage提交至他们的一组Tasks,并负责将这些tasks发送到集群,以执行它们,在它们失败时重试,并减轻掉队情况(类似MapReduce的推测执行原理吧,在这里留个疑问)。这些调度器返回一些事件events给DAGScheduler。其源码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. /** 
    2.  * Low-level task scheduler interface, currently implemented exclusively by 
    3.  * [[org.apache.spark.scheduler.TaskSchedulerImpl]]. 
    4.  * This interface allows plugging in different task schedulers. Each TaskScheduler schedules tasks 
    5.  * for a single SparkContext. These schedulers get sets of tasks submitted to them from the 
    6.  * DAGScheduler for each stage, and are responsible for sending the tasks to the cluster, running 
    7.  * them, retrying if there are failures, and mitigating stragglers. They return events to the 
    8.  * DAGScheduler. 
    9.  *  
    10.  */  
    11. private[spark] trait TaskScheduler {  
    12.   
    13.   private val appId = "spark-application-" + System.currentTimeMillis  
    14.   
    15.   def rootPool: Pool  
    16.   
    17.   def schedulingMode: SchedulingMode  
    18.   
    19.   def start(): Unit  
    20.   
    21.   // Invoked after system has successfully initialized (typically in spark context).  
    22.   // Yarn uses this to bootstrap allocation of resources based on preferred locations,  
    23.   // wait for slave registrations, etc.  
    24.   def postStartHook() { }  
    25.   
    26.   // Disconnect from the cluster.  
    27.   def stop(): Unit  
    28.   
    29.   // Submit a sequence of tasks to run.  
    30.   def submitTasks(taskSet: TaskSet): Unit  
    31.   
    32.   // Cancel a stage.  
    33.   def cancelTasks(stageId: Int, interruptThread: Boolean)  
    34.   
    35.   // Set the DAG scheduler for upcalls. This is guaranteed to be set before submitTasks is called.  
    36.   def setDAGScheduler(dagScheduler: DAGScheduler): Unit  
    37.   
    38.   // Get the default level of parallelism to use in the cluster, as a hint for sizing jobs.  
    39.   def defaultParallelism(): Int  
    40.   
    41.   /** 
    42.    * Update metrics for in-progress tasks and let the master know that the BlockManager is still 
    43.    * alive. Return true if the driver knows about the given block manager. Otherwise, return false, 
    44.    * indicating that the block manager should re-register. 
    45.    */  
    46.   def executorHeartbeatReceived(execId: String, taskMetrics: Array[(Long, TaskMetrics)],  
    47.     blockManagerId: BlockManagerId): Boolean  
    48.   
    49.   /** 
    50.    * Get an application ID associated with the job. 
    51.    * 
    52.    * @return An application ID 
    53.    */  
    54.   def applicationId(): String = appId  
    55.   
    56.   /** 
    57.    * Process a lost executor 
    58.    */  
    59.   def executorLost(executorId: String, reason: ExecutorLossReason): Unit  
    60.   
    61.   /** 
    62.    * Get an application's attempt ID associated with the job. 
    63.    * 
    64.    * @return An application's Attempt ID 
    65.    */  
    66.   def applicationAttemptId(): Option[String]  
    67.   
    68. }  

            通过源码我们可以知道,TaskScheduler提供了实例化与销毁时必要的start()和stop()方法,并提供了提交Tasks与取消Tasks的submitTasks()和cancelTasks()方法,并且通过executorHeartbeatReceived()周期性的接收executor的心跳,更新运行中tasks的元信息,并让master知晓BlockManager仍然存活。

            好了,结合源码,我们一步步来看吧。

            首先,在DAGScheduler的submitMissingTasks()方法的最后,每个stage生成一组tasks后,即调用用TaskScheduler的submitTasks()方法提交task,代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. // 利用taskScheduler.submitTasks()提交task  
    2.       taskScheduler.submitTasks(new TaskSet(  
    3.         tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))  
    4.       // 记录提交时间  
    5.       stage.latestInfo.submissionTime = Some(clock.getTimeMillis())  

            那么我们先来看下TaskScheduler的submitTasks()方法,在其实现类TaskSchedulerImpl中,代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. override def submitTasks(taskSet: TaskSet) {  
    2.       
    3.     // 获取TaskSet中的tasks  
    4.     val tasks = taskSet.tasks  
    5.     logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")  
    6.       
    7.     // 使用synchronized进行同步  
    8.     this.synchronized {  
    9.         
    10.       // 创建TaskSetManager  
    11.       val manager = createTaskSetManager(taskSet, maxTaskFailures)  
    12.         
    13.       // 获取taskSet对应的stageId  
    14.       val stage = taskSet.stageId  
    15.         
    16.       // taskSetsByStageIdAndAttempt存储的是stageId->[taskSet.stageAttemptId->TaskSetManager]  
    17.       // 更新taskSetsByStageIdAndAttempt,将上述对应关系存入  
    18.       val stageTaskSets =  
    19.         taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])  
    20.       stageTaskSets(taskSet.stageAttemptId) = manager  
    21.         
    22.       // 查看是否存在冲突的taskSet,如果存在,抛出IllegalStateException异常  
    23.       val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>  
    24.         ts.taskSet != taskSet && !ts.isZombie  
    25.       }  
    26.       if (conflictingTaskSet) {  
    27.         throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +  
    28.           s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")  
    29.       }  
    30.         
    31.       // 将TaskSetManager添加到schedulableBuilder中  
    32.       schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)  
    33.   
    34.       // 如果不是本地任务,且不再接受任务  
    35.       if (!isLocal && !hasReceivedTask) {  
    36.         starvationTimer.scheduleAtFixedRate(new TimerTask() {  
    37.           override def run() {  
    38.             if (!hasLaunchedTask) {  
    39.               logWarning("Initial job has not accepted any resources; " +  
    40.                 "check your cluster UI to ensure that workers are registered " +  
    41.                 "and have sufficient resources")  
    42.             } else {  
    43.               this.cancel()  
    44.             }  
    45.           }  
    46.         }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)  
    47.       }  
    48.         
    49.       // 设置标志位hasReceivedTask为true  
    50.       hasReceivedTask = true  
    51.     }  
    52.       
    53.     // 最后调用SchedulerBackend的reviveOffers()  
    54.     backend.reviveOffers()  
    55.   }  

            该方法首先从入参TaskSet中获取tasks;

            接下来,在synchronized同步代码块内,主要完成以下几件事:

            1、创建TaskSetManager,TaskSetManager主要用来干什么呢,后面我们会分析;

            2、通过taskSet获取stageId;

            3、更新数据结构taskSetsByStageIdAndAttempt,将映射关系stageId->[taskSet.stageAttemptId->TaskSetManager]存入,这里的TaskSetManager就是上面创建的TaskSetManager,taskSet.stageAttemptId是怎么赋值的呢?为了保证叙述的完整性,还是先留个小小的疑问吧;

            4、查看是否存在冲突的taskSet,如果存在,抛出IllegalStateException异常;

            5、将TaskSetManager添加到schedulableBuilder中;

            6、最后调用SchedulerBackend的reviveOffers()。

            下面慢慢分析上述流程,首先这个TaskSetManager是干什么呢?通过名字可以简单的推论出,它是TaskSet的管理者,主要在TaskSchedulerImpl中调度同一个TaskSet中的tasks。该类追踪每个task,当它们失败时重试(直到限制的最大次数),并通过延迟调度处理位置感知调度。该类最主要的接口就是resourceOffer()方法,该方法会询问TaskSet,它是否想要在一个节点上运行一个task,并在TaskSet中的task状态变更时通知它(比如完成等)。

            现在再来看下taskSet的stageAttemptId,在DAGScheduler的submitMissingTasks()方法中调用TaskScheduler的submitTasks()方法提交task,构造TaskSet对象时,赋值给TaskSet的stageAttemptId字段的是stage.latestInfo.attemptId。而Stage的latestInfo是这样定义的:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. /** Returns the StageInfo for the most recent attempt for this stage. */  
    2.   // 返回该stage的最新尝试attempt的StageInfo  
    3.   def latestInfo: StageInfo = _latestInfo  

            即它是由_latestInfo来赋值的,那么_latestInfo呢?代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. /** 
    2.    * Pointer to the [StageInfo] object for the most recent attempt. This needs to be initialized 
    3.    * here, before any attempts have actually been created, because the DAGScheduler uses this 
    4.    * StageInfo to tell SparkListeners when a job starts (which happens before any stage attempts 
    5.    * have been created). 
    6.    * 指向stage最新一次尝试的StageInfo对象。 
    7.    * 在任何尝试实际发生之前,都需要在这里被初始化,因为当一个Job启动时(任何stage尝试发生时)DAGScheduler使用 
    8.    * 这个StageInfo告诉SparkListeners。 
    9.    */  
    10.   private var _latestInfo: StageInfo = StageInfo.fromStage(this, nextAttemptId)  

            具体初始化过程我们不做过多讨论,我们只要知道,StageInfo中存在一个成员变量attemptId即可,而这个成员变量就是上面我们所说的taskSet的stageAttemptId。而StageInfo中attemptId的值,则是由Stage中nextAttemptId的值确定的,其定义如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. /** The ID to use for the next new attempt for this stage. */  
    2.   // 该stage下一次新尝试的id  
    3.   private var nextAttemptId: Int = 0  

             而它值的变化是怎么样的呢?答案就在Stage的makeNewStageAttempt()方法中,代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. /** Creates a new attempt for this stage by creating a new StageInfo with a new attempt ID. */  
    2.   // 通过用一个最新的nextAttemptId创建的StageInfo对象来创建该stage的最新的一次尝试  
    3.   def makeNewStageAttempt(  
    4.       numPartitionsToCompute: Int,  
    5.       taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty): Unit = {  
    6.       
    7.     // 构造_latestInfo  
    8.     _latestInfo = StageInfo.fromStage(  
    9.       this, nextAttemptId, Some(numPartitionsToCompute), taskLocalityPreferences)  
    10.       
    11.     // nextAttemptId自增  
    12.     nextAttemptId += 1  
    13.   }  

            什么时候调用makeNewStageAttempt()方法呢?还记得《Spark源码分析之Stage提交》一文的最后,真正提交stage的方法submitMissingTasks()中第6步,标记新的stage attempt,并发送一个SparkListenerStageSubmitted事件吗,代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. // 标记新的stage attempt  
    2.     stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)  
    3.     // 发送一个SparkListenerStageSubmitted事件  
    4.     listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))  

            也就是说,在每次提交stage时,即会调用该方法,创建一个新的_latestInfo对象,并对nextAttemptId进行自增。

            好了,言归正传,继续往下看。第5步便是将TaskSetManager添加到schedulableBuilder中,那么这里就有两个问题:

            1、schedulableBuilder是什么?

            2、为什么要将TaskSetManager添加到schedulableBuilder中呢?

            我们首先看下schedulableBuilder的定义及初始化。其定义代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. var schedulableBuilder: SchedulableBuilder = null  

            而它的初始化则是在TaskSchedulerImpl的initialize()方法中。如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. // 初始化  
    2.   def initialize(backend: SchedulerBackend) {  
    3.     // 赋值SchedulerBackend  
    4.     this.backend = backend  
    5.       
    6.     // temporarily set rootPool name to empty  
    7.     // 临时将rootPool的名字设置为空  
    8.     rootPool = new Pool("", schedulingMode, 0, 0)  
    9.       
    10.     // 调度构造器,分两种,FIFO和FAIR  
    11.     schedulableBuilder = {  
    12.       schedulingMode match {  
    13.         case SchedulingMode.FIFO =>  
    14.           new FIFOSchedulableBuilder(rootPool)  
    15.         case SchedulingMode.FAIR =>  
    16.           new FairSchedulableBuilder(rootPool, conf)  
    17.       }  
    18.     }  
    19.     schedulableBuilder.buildPools()  
    20.   }  

            这个方法同时也初始化了TaskSchedulerImpl中SchedulerBackend类型的backend对象,这个对象在最后一步会用到,我们稍后再说。

            继续看schedulableBuilder,通过代码我们就能知道,这个schedulableBuilder是调度构造器,分FIFO和FAIR两种。至于这两种构造器的含义和区别,我们以后再分析。下面看下SchedulableBuilder的源码:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. /** 
    2.  * An interface to build Schedulable tree 
    3.  * buildPools: build the tree nodes(pools) 
    4.  * addTaskSetManager: build the leaf nodes(TaskSetManagers) 
    5.  */  
    6. private[spark] trait SchedulableBuilder {  
    7.   def rootPool: Pool  
    8.   
    9.   def buildPools()  
    10.   
    11.   def addTaskSetManager(manager: Schedulable, properties: Properties)  
    12. }  

            从上面的英文注释我们就能知道,SchedulableBuilder是一个构造调度树的接口,它提供了一个成员变量Pool类型的rootPool和两个主要方法:
            1、buildPools()方法:构造调度树节点(调度池);

            2、addTaskSetManager()方法:构造叶子节点(TaskSetManagers)。

            下面,我们以FIFOSchedulableBuilder为例,简单说下。FIFOSchedulableBuilder中buildPools()是个空方法,没什么可说的,我们主要分析下它的buildPools()方法,代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. override def addTaskSetManager(manager: Schedulable, properties: Properties) {  
    2.     rootPool.addSchedulable(manager)  
    3.   }  

            可以看到,它实际上是调用的Pool的addSchedulable()方法。继续追踪:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. override def addSchedulable(schedulable: Schedulable) {  
    2.     require(schedulable != null)  
    3.       
    4.     // 将schedulable加入到schedulableQueue队列,队列为ConcurrentLinkedQueue类型  
    5.     schedulableQueue.add(schedulable)  
    6.       
    7.     // 将schedulable的name与schedulable的对应关系添加到schedulableNameToSchedulable集合,集合为ConcurrentHashMap类型  
    8.     schedulableNameToSchedulable.put(schedulable.name, schedulable)  
    9.       
    10.     // 将this赋值给schedulable的parent,即形成schedulable为this子节点(即截至目前时点的叶子节点)的树形结构  
    11.     schedulable.parent = this  
    12.   }  

            而翻看TaskSetManager的源码可以知道,TaskSetManager就实现了Schedulable这个trait(特质,类似java的接口),也就意味着TaskSetManager是可以被调度的,这也就回答了上面的问题2。

            好了,我们继续看最后一步,调用SchedulerBackend的reviveOffers()。问题又来了,问题不断啊。

            1、SchedulerBackend是什么?

            2、SchedulerBackend如何被初始化?

            3、SchedulerBackend的reviveOffers()到底做了什么?

            带着问题去学习终究是好的,它让我们有了暂时的目标。下面,我们一步步来分析。

            SchedulerBackend是Spark中一个可插拔组件,可插拔意味着它可以有多种实现方式,后续我们会概略讲讲。按照字面意思,它就是调度器的一个后台服务或者实现,其主要作用就是在物理机器或者说worker就绪后,能够提供其上的资源并将tasks加载到那些机器或者worker上。

            上文中我们已经预先提到过,在TaskSchedulerImpl的initialize()方法初始化schedulableBuilder时,同时也初始化了SchedulerBackend,即:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. // 赋值SchedulerBackend  
    2.     this.backend = backend  

            这个SchedulerBackend是被传递进来的,那么这时我们就要追溯到TaskSchedulerImpl实例化的时候了。在Spark应用环境的初始化时,其上下文信息SparkContext中存在以下代码:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. // Create and start the scheduler  
    2.     val (sched, ts) = SparkContext.createTaskScheduler(this, master)  
    3.     _schedulerBackend = sched  
    4.     _taskScheduler = ts  

            createTaskScheduler()方法主要就是根据给定的Master URL创建一个TaskScheduler。大致代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. /** 
    2.    * Create a task scheduler based on a given master URL. 
    3.    * Return a 2-tuple of the scheduler backend and the task scheduler. 
    4.    * 根据给定的Master URL创建一个TaskScheduler。 
    5.    */  
    6.   private def createTaskScheduler(  
    7.       sc: SparkContext,  
    8.       master: String): (SchedulerBackend, TaskScheduler) = {  
    9.     import SparkMasterRegex._  
    10.   
    11.     // When running locally, don't try to re-execute tasks on failure.  
    12.     val MAX_LOCAL_TASK_FAILURES = 1  
    13.   
    14.     master match {  
    15.       case "local" =>  
    16.         val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)  
    17.         val backend = new LocalBackend(sc.getConf, scheduler, 1)  
    18.         scheduler.initialize(backend)  
    19.         (backend, scheduler)  
    20.   
    21.       case LOCAL_N_REGEX(threads) =>  
    22.         def localCpuCount: Int = Runtime.getRuntime.availableProcessors()  
    23.         // local[*] estimates the number of cores on the machine; local[N] uses exactly N threads.  
    24.         val threadCount = if (threads == "*") localCpuCount else threads.toInt  
    25.         if (threadCount <= 0) {  
    26.           throw new SparkException(s"Asked to run locally with $threadCount threads")  
    27.         }  
    28.         val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)  
    29.         val backend = new LocalBackend(sc.getConf, scheduler, threadCount)  
    30.         scheduler.initialize(backend)  
    31.         (backend, scheduler)  
    32.   
    33.       case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>  
    34.         def localCpuCount: Int = Runtime.getRuntime.availableProcessors()  
    35.         // local[*, M] means the number of cores on the computer with M failures  
    36.         // local[N, M] means exactly N threads with M failures  
    37.         val threadCount = if (threads == "*") localCpuCount else threads.toInt  
    38.         val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true)  
    39.         val backend = new LocalBackend(sc.getConf, scheduler, threadCount)  
    40.         scheduler.initialize(backend)  
    41.         (backend, scheduler)  
    42.   
    43.       // Standalone模式  
    44.       case SPARK_REGEX(sparkUrl) =>  
    45.         
    46.         // 初始化TaskSchedulerImpl实例scheduler  
    47.         val scheduler = new TaskSchedulerImpl(sc)  
    48.         val masterUrls = sparkUrl.split(",").map("spark://" + _)  
    49.           
    50.         // 初始化一个SparkDeploySchedulerBackend实例backend  
    51.         val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)  
    52.           
    53.         // 调用TaskSchedulerImpl的initialize()方法,  
    54.         // 为其成员变量SchedulerBackend赋值SparkDeploySchedulerBackend  
    55.         scheduler.initialize(backend)  
    56.         // 返回backend和scheduler  
    57.         (backend, scheduler)  
    58.   
    59.       case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>  
    60.         // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.  
    61.         val memoryPerSlaveInt = memoryPerSlave.toInt  
    62.         if (sc.executorMemory > memoryPerSlaveInt) {  
    63.           throw new SparkException(  
    64.             "Asked to launch cluster with %d MB RAM / worker but requested %d MB/worker".format(  
    65.               memoryPerSlaveInt, sc.executorMemory))  
    66.         }  
    67.   
    68.         val scheduler = new TaskSchedulerImpl(sc)  
    69.         val localCluster = new LocalSparkCluster(  
    70.           numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt, sc.conf)  
    71.         val masterUrls = localCluster.start()  
    72.         val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)  
    73.         scheduler.initialize(backend)  
    74.         backend.shutdownCallback = (backend: SparkDeploySchedulerBackend) => {  
    75.           localCluster.stop()  
    76.         }  
    77.         (backend, scheduler)  
    78.   
    79.       case "yarn-standalone" | "yarn-cluster" =>  
    80.         if (master == "yarn-standalone") {  
    81.           logWarning(  
    82.             ""yarn-standalone" is deprecated as of Spark 1.0. Use "yarn-cluster" instead.")  
    83.         }  
    84.         val scheduler = try {  
    85.           val clazz = Utils.classForName("org.apache.spark.scheduler.cluster.YarnClusterScheduler")  
    86.           val cons = clazz.getConstructor(classOf[SparkContext])  
    87.           cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]  
    88.         } catch {  
    89.           // TODO: Enumerate the exact reasons why it can fail  
    90.           // But irrespective of it, it means we cannot proceed !  
    91.           case e: Exception => {  
    92.             throw new SparkException("YARN mode not available ?", e)  
    93.           }  
    94.         }  
    95.         val backend = try {  
    96.           val clazz =  
    97.             Utils.classForName("org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend")  
    98.           val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])  
    99.           cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]  
    100.         } catch {  
    101.           case e: Exception => {  
    102.             throw new SparkException("YARN mode not available ?", e)  
    103.           }  
    104.         }  
    105.         scheduler.initialize(backend)  
    106.         (backend, scheduler)  
    107.   
    108.       case "yarn-client" =>  
    109.         val scheduler = try {  
    110.           val clazz = Utils.classForName("org.apache.spark.scheduler.cluster.YarnScheduler")  
    111.           val cons = clazz.getConstructor(classOf[SparkContext])  
    112.           cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]  
    113.   
    114.         } catch {  
    115.           case e: Exception => {  
    116.             throw new SparkException("YARN mode not available ?", e)  
    117.           }  
    118.         }  
    119.   
    120.         val backend = try {  
    121.           val clazz =  
    122.             Utils.classForName("org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend")  
    123.           val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])  
    124.           cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]  
    125.         } catch {  
    126.           case e: Exception => {  
    127.             throw new SparkException("YARN mode not available ?", e)  
    128.           }  
    129.         }  
    130.   
    131.         scheduler.initialize(backend)  
    132.         (backend, scheduler)  
    133.   
    134.       case MESOS_REGEX(mesosUrl) =>  
    135.         MesosNativeLibrary.load()  
    136.         val scheduler = new TaskSchedulerImpl(sc)  
    137.         val coarseGrained = sc.conf.getBoolean("spark.mesos.coarse", defaultValue = true)  
    138.         val backend = if (coarseGrained) {  
    139.           new CoarseMesosSchedulerBackend(scheduler, sc, mesosUrl, sc.env.securityManager)  
    140.         } else {  
    141.           new MesosSchedulerBackend(scheduler, sc, mesosUrl)  
    142.         }  
    143.         scheduler.initialize(backend)  
    144.         (backend, scheduler)  
    145.   
    146.       case SIMR_REGEX(simrUrl) =>  
    147.         val scheduler = new TaskSchedulerImpl(sc)  
    148.         val backend = new SimrSchedulerBackend(scheduler, sc, simrUrl)  
    149.         scheduler.initialize(backend)  
    150.         (backend, scheduler)  
    151.   
    152.       case zkUrl if zkUrl.startsWith("zk://") =>  
    153.         logWarning("Master URL for a multi-master Mesos cluster managed by ZooKeeper should be " +  
    154.           "in the form mesos://zk://host:port. Current Master URL will stop working in Spark 2.0.")  
    155.         createTaskScheduler(sc, "mesos://" + zkUrl)  
    156.   
    157.       case _ =>  
    158.         throw new SparkException("Could not parse Master URL: '" + master + "'")  
    159.     }  
    160.   }  

            可以看出,它是根据Spark的部署模式来确定创建何种TaskScheduler及SchedulerBackend的。我们就以常见的Standalone模式来说明,关键代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. // Standalone模式  
    2.       case SPARK_REGEX(sparkUrl) =>  
    3.         
    4.         // 初始化TaskSchedulerImpl实例scheduler  
    5.         val scheduler = new TaskSchedulerImpl(sc)  
    6.         val masterUrls = sparkUrl.split(",").map("spark://" + _)  
    7.           
    8.         // 初始化一个SparkDeploySchedulerBackend实例backend  
    9.         val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)  
    10.           
    11.         // 调用TaskSchedulerImpl的initialize()方法,  
    12.         // 为其成员变量SchedulerBackend赋值SparkDeploySchedulerBackend  
    13.         scheduler.initialize(backend)  
    14.         // 返回backend和scheduler  
    15.         (backend, scheduler)  

            Standalone模式模式中,TaskScheduler的实现为TaskSchedulerImpl,而SchedulerBackend的实现为SparkDeploySchedulerBackend,并且在TaskScheduler生成后,随即调用其initialize()方法完成了初始化,也就确定了SchedulableBuilder和SchedulerBackend。

            至此,前两个是什么以及如何初始化的问题我们都已得到答案,下面再看最后一个关于做什么的问题:SchedulerBackend的reviveOffers()到底做了什么?还是以Standalone模式来说明。SparkDeploySchedulerBackend中没有提供此方法,我们只能寄希望于其父类CoarseGrainedSchedulerBackend,果不其然,在CoarseGrainedSchedulerBackend中我们找到了reviveOffers()方法。但是,代码很简单:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. override def reviveOffers() {  
    2.     // 调用driverEndpoint的send()方法,发送一个ReviveOffers消息  
    3.     driverEndpoint.send(ReviveOffers)  
    4.   }  

            我们继续看driverEndpoint是什么鬼。driverEndpoint是RPC中driver端Endpoint的引用,其类型为RpcEndpointRef。在CoarseGrainedSchedulerBackend启动时的start()方法中,对driverEndpoint进行了赋值:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. // TODO (prashant) send conf instead of properties  
    2.     driverEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint(properties))  

            这个RpcEnv只是一个抽象类,它有两种实现,一个是基于AKKa的AkkaRpcEnv,另外一个则是基于Netty的NettyRpcEnv,默认的实现是Netty。通过下述RpcEnv的代码即可看出:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. private def getRpcEnvFactory(conf: SparkConf): RpcEnvFactory = {  
    2.       
    3.     // 两种实现方式:  
    4.     // akka:org.apache.spark.rpc.akka.AkkaRpcEnvFactory  
    5.     // netty:org.apache.spark.rpc.netty.NettyRpcEnvFactory  
    6.     val rpcEnvNames = Map(  
    7.       "akka" -> "org.apache.spark.rpc.akka.AkkaRpcEnvFactory",  
    8.       "netty" -> "org.apache.spark.rpc.netty.NettyRpcEnvFactory")  
    9.       
    10.     // 通过参数spark.rpc配置,默认为netty  
    11.     val rpcEnvName = conf.get("spark.rpc", "netty")  
    12.     val rpcEnvFactoryClassName = rpcEnvNames.getOrElse(rpcEnvName.toLowerCase, rpcEnvName)  
    13.     Utils.classForName(rpcEnvFactoryClassName).newInstance().asInstanceOf[RpcEnvFactory]  
    14.   }  

            下面,我们就看下Netty的概要实现,在NettyRpcEnv的setupEndpoint()方法中:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = {  
    2.       
    3.     // 调用Dispatcher的registerRpcEndpoint()方法完成注册  
    4.     dispatcher.registerRpcEndpoint(name, endpoint)  
    5.   }  

            它是通过dispatcher来完成endpoint注册的,name为“CoarseGrainedScheduler”,RpcEndpoint为CoarseGrainedSchedulerBackend中通过createDriverEndpoint()方法创建的DriverEndpoint对象。代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. protected def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {  
    2.     new DriverEndpoint(rpcEnv, properties)  
    3.   }  

            那么这个DriverEndpoint是什么类呢?我们发现它继承自ThreadSafeRpcEndpoint,继而继承RpcEndpoint这个类。这里,我们只要知道这个RpcEndpoint是进程间消息传递调用的一个端点,定义了消息触发的函数。当一个消息到来时,方法调用顺序为  onStart, receive, onStop。它的生命周期为constructor -> onStart -> receive* -> onStop。

            为什么要用RpcEndpoint呢?很简单,Task的调度与执行是在一个分布式集群上进行的,自然需要进程间的通讯。

            继续分析,那么上面提到的driverEndpoint是如何赋值的呢?我们继续看Dispatcher的registerRpcEndpoint()方法,因为最终是由它向上返回RpcEndpointRef来完成driverEndpoint的赋值的。代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. // 注册RpcEndpoint  
    2.   // name为“Master”,endpoint为Master对象  
    3.   def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {  
    4.       
    5.     // 创建RpcEndpointAddress  
    6.     val addr = RpcEndpointAddress(nettyEnv.address, name)  
    7.       
    8.     // 创建NettyRpcEndpointRef  
    9.     val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)  
    10.       
    11.     // 同步代码块  
    12.     synchronized {  
    13.       if (stopped) {  
    14.         throw new IllegalStateException("RpcEnv has been stopped")  
    15.       }  
    16.         
    17.       // ConcurrentHashMap的putIfAbsent()方法确保不会重复创建EndpointData  
    18.       if (endpoints.putIfAbsent(name, new EndpointData(name, endpoint, endpointRef)) != null) {  
    19.         throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")  
    20.       }  
    21.         
    22.         
    23.       val data = endpoints.get(name)  
    24.       endpointRefs.put(data.endpoint, data.ref)  
    25.       receivers.offer(data)  // for the OnStart message  
    26.     }  
    27.     endpointRef  
    28.   }  

            返回的RpcEndpointRef为NettyRpcEndpointRef类型,而RpcEndpointRef则是一个远程RpcEndpoint的引用,通过它可以给远程RpcEndpoint发送消息,可以是同步可以是异步,它映射一个地址。这么看来,我们在远端(ps:另外的机器或者进程)注册了一个RpcEndpoint,即DriverEndpoint,而在本地端(当前机器或者进程)则持有一个RpcEndpoint的引用,即NettyRpcEndpointRef,可以由它来往远端发送消息,那么发送的是什么消息呢?我们现在返回CoarseGrainedSchedulerBackend中的reviveOffers()方法,发现发送的是ReviveOffers消息。这里只是发送,具体处理还要看远端的RpcEndpoint,即DriverEndpoint。通过上面我们可以知道,RpcEndpoint的服务流程为onStart()-->receive()--> onStop(),每当消息来临时,DriverEndpoint都会调用receive()方法来处理。关键代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. // 如果是ReviveOffers事件,则调用makeOffers()方法  
    2.       case ReviveOffers =>  
    3.         makeOffers()  

            继续追踪其makeOffers()方法,代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. // Make fake resource offers on all executors  
    2.     // 在所有的executors上提供假的资源(抽象的资源,也就是资源的对象信息,我是这么理解的)  
    3.     private def makeOffers() {  
    4.       // Filter out executors under killing  
    5.       // 过滤掉under killing的executors  
    6.       val activeExecutors = executorDataMap.filterKeys(executorIsAlive)  
    7.         
    8.       // 获取workOffers,即资源  
    9.       val workOffers = activeExecutors.map { case (id, executorData) =>  
    10.         // 创建WorkerOffer对象  
    11.         new WorkerOffer(id, executorData.executorHost, executorData.freeCores)  
    12.       }.toSeq  
    13.         
    14.       // 调用scheduler的resourceOffers()方法,分配资源  
    15.       // 调用launchTasks()方法,启动tasks  
    16.       launchTasks(scheduler.resourceOffers(workOffers))  
    17.     }  

            好了,留个尾巴,明天再继续分析吧~

    博客原地址:http://blog.csdn.net/lipeng_bigdata/article/details/50687992

  • 相关阅读:
    Unity做AR
    Linux怎么安装vim编译器
    Linux命令之tar
    Linux命令之ln
    Linux命令之grep
    Linux命令之less
    Linux命令之cd
    Linux命令之ll
    Linux命令之cp
    Linux命令之rm
  • 原文地址:https://www.cnblogs.com/jirimutu01/p/5274458.html
Copyright © 2011-2022 走看看