zoukankan      html  css  js  c++  java
  • Spark源码分析之五:Task调度(一)

    在前四篇博文中,我们分析了Job提交运行总流程的第一阶段Stage划分与提交,它又被细化为三个分阶段:

            1、Job的调度模型与运行反馈;

            2、Stage划分;

            3、Stage提交:对应TaskSet的生成。

            Stage划分与提交阶段主要是由DAGScheduler完成的,而DAGScheduler负责Job的逻辑调度,主要职责也即DAG图的分解,按照RDD间是否为shuffle dependency,将整个Job划分为一个个stage,并将每个stage转化为tasks的集合--TaskSet。

            接下来我们要讲的第二阶段Task调度与执行,则是Spark中Job的物理调度,它实际上分为两个主要阶段:

            1、Task调度;

            2、Task运行。

            下面,我们分析下Task的调度。我们知道,在第一阶段的末尾,stage被提交后,每个stage被转化为一组task的集合--TaskSet,而紧接着,则调用taskScheduler.submitTasks()提交这些tasks,而TaskScheduler的主要职责,则是负责Job物理调度阶段--Task调度。TaskScheduler为scala中的一个trait,你可以简单的把它理解为Java中的接口,目前它仅仅有一个实现类TaskSchedulerImpl。

            TaskScheduler负责低层次任务的调度,每个TaskScheduler为一个特定的SparkContext调度tasks。这些调度器获取到由DAGScheduler为每个stage提交至他们的一组Tasks,并负责将这些tasks发送到集群,以执行它们,在它们失败时重试,并减轻掉队情况(类似MapReduce的推测执行原理吧,在这里留个疑问)。这些调度器返回一些事件events给DAGScheduler。其源码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. /** 
    2.  * Low-level task scheduler interface, currently implemented exclusively by 
    3.  * [[org.apache.spark.scheduler.TaskSchedulerImpl]]. 
    4.  * This interface allows plugging in different task schedulers. Each TaskScheduler schedules tasks 
    5.  * for a single SparkContext. These schedulers get sets of tasks submitted to them from the 
    6.  * DAGScheduler for each stage, and are responsible for sending the tasks to the cluster, running 
    7.  * them, retrying if there are failures, and mitigating stragglers. They return events to the 
    8.  * DAGScheduler. 
    9.  *  
    10.  */  
    11. private[spark] trait TaskScheduler {  
    12.   
    13.   private val appId = "spark-application-" + System.currentTimeMillis  
    14.   
    15.   def rootPool: Pool  
    16.   
    17.   def schedulingMode: SchedulingMode  
    18.   
    19.   def start(): Unit  
    20.   
    21.   // Invoked after system has successfully initialized (typically in spark context).  
    22.   // Yarn uses this to bootstrap allocation of resources based on preferred locations,  
    23.   // wait for slave registrations, etc.  
    24.   def postStartHook() { }  
    25.   
    26.   // Disconnect from the cluster.  
    27.   def stop(): Unit  
    28.   
    29.   // Submit a sequence of tasks to run.  
    30.   def submitTasks(taskSet: TaskSet): Unit  
    31.   
    32.   // Cancel a stage.  
    33.   def cancelTasks(stageId: Int, interruptThread: Boolean)  
    34.   
    35.   // Set the DAG scheduler for upcalls. This is guaranteed to be set before submitTasks is called.  
    36.   def setDAGScheduler(dagScheduler: DAGScheduler): Unit  
    37.   
    38.   // Get the default level of parallelism to use in the cluster, as a hint for sizing jobs.  
    39.   def defaultParallelism(): Int  
    40.   
    41.   /** 
    42.    * Update metrics for in-progress tasks and let the master know that the BlockManager is still 
    43.    * alive. Return true if the driver knows about the given block manager. Otherwise, return false, 
    44.    * indicating that the block manager should re-register. 
    45.    */  
    46.   def executorHeartbeatReceived(execId: String, taskMetrics: Array[(Long, TaskMetrics)],  
    47.     blockManagerId: BlockManagerId): Boolean  
    48.   
    49.   /** 
    50.    * Get an application ID associated with the job. 
    51.    * 
    52.    * @return An application ID 
    53.    */  
    54.   def applicationId(): String = appId  
    55.   
    56.   /** 
    57.    * Process a lost executor 
    58.    */  
    59.   def executorLost(executorId: String, reason: ExecutorLossReason): Unit  
    60.   
    61.   /** 
    62.    * Get an application's attempt ID associated with the job. 
    63.    * 
    64.    * @return An application's Attempt ID 
    65.    */  
    66.   def applicationAttemptId(): Option[String]  
    67.   
    68. }  

            通过源码我们可以知道,TaskScheduler提供了实例化与销毁时必要的start()和stop()方法,并提供了提交Tasks与取消Tasks的submitTasks()和cancelTasks()方法,并且通过executorHeartbeatReceived()周期性的接收executor的心跳,更新运行中tasks的元信息,并让master知晓BlockManager仍然存活。

            好了,结合源码,我们一步步来看吧。

            首先,在DAGScheduler的submitMissingTasks()方法的最后,每个stage生成一组tasks后,即调用用TaskScheduler的submitTasks()方法提交task,代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. // 利用taskScheduler.submitTasks()提交task  
    2.       taskScheduler.submitTasks(new TaskSet(  
    3.         tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))  
    4.       // 记录提交时间  
    5.       stage.latestInfo.submissionTime = Some(clock.getTimeMillis())  

            那么我们先来看下TaskScheduler的submitTasks()方法,在其实现类TaskSchedulerImpl中,代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. override def submitTasks(taskSet: TaskSet) {  
    2.       
    3.     // 获取TaskSet中的tasks  
    4.     val tasks = taskSet.tasks  
    5.     logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")  
    6.       
    7.     // 使用synchronized进行同步  
    8.     this.synchronized {  
    9.         
    10.       // 创建TaskSetManager  
    11.       val manager = createTaskSetManager(taskSet, maxTaskFailures)  
    12.         
    13.       // 获取taskSet对应的stageId  
    14.       val stage = taskSet.stageId  
    15.         
    16.       // taskSetsByStageIdAndAttempt存储的是stageId->[taskSet.stageAttemptId->TaskSetManager]  
    17.       // 更新taskSetsByStageIdAndAttempt,将上述对应关系存入  
    18.       val stageTaskSets =  
    19.         taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])  
    20.       stageTaskSets(taskSet.stageAttemptId) = manager  
    21.         
    22.       // 查看是否存在冲突的taskSet,如果存在,抛出IllegalStateException异常  
    23.       val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>  
    24.         ts.taskSet != taskSet && !ts.isZombie  
    25.       }  
    26.       if (conflictingTaskSet) {  
    27.         throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +  
    28.           s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")  
    29.       }  
    30.         
    31.       // 将TaskSetManager添加到schedulableBuilder中  
    32.       schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)  
    33.   
    34.       // 如果不是本地任务,且不再接受任务  
    35.       if (!isLocal && !hasReceivedTask) {  
    36.         starvationTimer.scheduleAtFixedRate(new TimerTask() {  
    37.           override def run() {  
    38.             if (!hasLaunchedTask) {  
    39.               logWarning("Initial job has not accepted any resources; " +  
    40.                 "check your cluster UI to ensure that workers are registered " +  
    41.                 "and have sufficient resources")  
    42.             } else {  
    43.               this.cancel()  
    44.             }  
    45.           }  
    46.         }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)  
    47.       }  
    48.         
    49.       // 设置标志位hasReceivedTask为true  
    50.       hasReceivedTask = true  
    51.     }  
    52.       
    53.     // 最后调用SchedulerBackend的reviveOffers()  
    54.     backend.reviveOffers()  
    55.   }  

            该方法首先从入参TaskSet中获取tasks;

            接下来,在synchronized同步代码块内,主要完成以下几件事:

            1、创建TaskSetManager,TaskSetManager主要用来干什么呢,后面我们会分析;

            2、通过taskSet获取stageId;

            3、更新数据结构taskSetsByStageIdAndAttempt,将映射关系stageId->[taskSet.stageAttemptId->TaskSetManager]存入,这里的TaskSetManager就是上面创建的TaskSetManager,taskSet.stageAttemptId是怎么赋值的呢?为了保证叙述的完整性,还是先留个小小的疑问吧;

            4、查看是否存在冲突的taskSet,如果存在,抛出IllegalStateException异常;

            5、将TaskSetManager添加到schedulableBuilder中;

            6、最后调用SchedulerBackend的reviveOffers()。

            下面慢慢分析上述流程,首先这个TaskSetManager是干什么呢?通过名字可以简单的推论出,它是TaskSet的管理者,主要在TaskSchedulerImpl中调度同一个TaskSet中的tasks。该类追踪每个task,当它们失败时重试(直到限制的最大次数),并通过延迟调度处理位置感知调度。该类最主要的接口就是resourceOffer()方法,该方法会询问TaskSet,它是否想要在一个节点上运行一个task,并在TaskSet中的task状态变更时通知它(比如完成等)。

            现在再来看下taskSet的stageAttemptId,在DAGScheduler的submitMissingTasks()方法中调用TaskScheduler的submitTasks()方法提交task,构造TaskSet对象时,赋值给TaskSet的stageAttemptId字段的是stage.latestInfo.attemptId。而Stage的latestInfo是这样定义的:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. /** Returns the StageInfo for the most recent attempt for this stage. */  
    2.   // 返回该stage的最新尝试attempt的StageInfo  
    3.   def latestInfo: StageInfo = _latestInfo  

            即它是由_latestInfo来赋值的,那么_latestInfo呢?代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. /** 
    2.    * Pointer to the [StageInfo] object for the most recent attempt. This needs to be initialized 
    3.    * here, before any attempts have actually been created, because the DAGScheduler uses this 
    4.    * StageInfo to tell SparkListeners when a job starts (which happens before any stage attempts 
    5.    * have been created). 
    6.    * 指向stage最新一次尝试的StageInfo对象。 
    7.    * 在任何尝试实际发生之前,都需要在这里被初始化,因为当一个Job启动时(任何stage尝试发生时)DAGScheduler使用 
    8.    * 这个StageInfo告诉SparkListeners。 
    9.    */  
    10.   private var _latestInfo: StageInfo = StageInfo.fromStage(this, nextAttemptId)  

            具体初始化过程我们不做过多讨论,我们只要知道,StageInfo中存在一个成员变量attemptId即可,而这个成员变量就是上面我们所说的taskSet的stageAttemptId。而StageInfo中attemptId的值,则是由Stage中nextAttemptId的值确定的,其定义如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. /** The ID to use for the next new attempt for this stage. */  
    2.   // 该stage下一次新尝试的id  
    3.   private var nextAttemptId: Int = 0  

             而它值的变化是怎么样的呢?答案就在Stage的makeNewStageAttempt()方法中,代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. /** Creates a new attempt for this stage by creating a new StageInfo with a new attempt ID. */  
    2.   // 通过用一个最新的nextAttemptId创建的StageInfo对象来创建该stage的最新的一次尝试  
    3.   def makeNewStageAttempt(  
    4.       numPartitionsToCompute: Int,  
    5.       taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty): Unit = {  
    6.       
    7.     // 构造_latestInfo  
    8.     _latestInfo = StageInfo.fromStage(  
    9.       this, nextAttemptId, Some(numPartitionsToCompute), taskLocalityPreferences)  
    10.       
    11.     // nextAttemptId自增  
    12.     nextAttemptId += 1  
    13.   }  

            什么时候调用makeNewStageAttempt()方法呢?还记得《Spark源码分析之Stage提交》一文的最后,真正提交stage的方法submitMissingTasks()中第6步,标记新的stage attempt,并发送一个SparkListenerStageSubmitted事件吗,代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. // 标记新的stage attempt  
    2.     stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)  
    3.     // 发送一个SparkListenerStageSubmitted事件  
    4.     listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))  

            也就是说,在每次提交stage时,即会调用该方法,创建一个新的_latestInfo对象,并对nextAttemptId进行自增。

            好了,言归正传,继续往下看。第5步便是将TaskSetManager添加到schedulableBuilder中,那么这里就有两个问题:

            1、schedulableBuilder是什么?

            2、为什么要将TaskSetManager添加到schedulableBuilder中呢?

            我们首先看下schedulableBuilder的定义及初始化。其定义代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. var schedulableBuilder: SchedulableBuilder = null  

            而它的初始化则是在TaskSchedulerImpl的initialize()方法中。如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. // 初始化  
    2.   def initialize(backend: SchedulerBackend) {  
    3.     // 赋值SchedulerBackend  
    4.     this.backend = backend  
    5.       
    6.     // temporarily set rootPool name to empty  
    7.     // 临时将rootPool的名字设置为空  
    8.     rootPool = new Pool("", schedulingMode, 0, 0)  
    9.       
    10.     // 调度构造器,分两种,FIFO和FAIR  
    11.     schedulableBuilder = {  
    12.       schedulingMode match {  
    13.         case SchedulingMode.FIFO =>  
    14.           new FIFOSchedulableBuilder(rootPool)  
    15.         case SchedulingMode.FAIR =>  
    16.           new FairSchedulableBuilder(rootPool, conf)  
    17.       }  
    18.     }  
    19.     schedulableBuilder.buildPools()  
    20.   }  

            这个方法同时也初始化了TaskSchedulerImpl中SchedulerBackend类型的backend对象,这个对象在最后一步会用到,我们稍后再说。

            继续看schedulableBuilder,通过代码我们就能知道,这个schedulableBuilder是调度构造器,分FIFO和FAIR两种。至于这两种构造器的含义和区别,我们以后再分析。下面看下SchedulableBuilder的源码:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. /** 
    2.  * An interface to build Schedulable tree 
    3.  * buildPools: build the tree nodes(pools) 
    4.  * addTaskSetManager: build the leaf nodes(TaskSetManagers) 
    5.  */  
    6. private[spark] trait SchedulableBuilder {  
    7.   def rootPool: Pool  
    8.   
    9.   def buildPools()  
    10.   
    11.   def addTaskSetManager(manager: Schedulable, properties: Properties)  
    12. }  

            从上面的英文注释我们就能知道,SchedulableBuilder是一个构造调度树的接口,它提供了一个成员变量Pool类型的rootPool和两个主要方法:
            1、buildPools()方法:构造调度树节点(调度池);

            2、addTaskSetManager()方法:构造叶子节点(TaskSetManagers)。

            下面,我们以FIFOSchedulableBuilder为例,简单说下。FIFOSchedulableBuilder中buildPools()是个空方法,没什么可说的,我们主要分析下它的buildPools()方法,代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. override def addTaskSetManager(manager: Schedulable, properties: Properties) {  
    2.     rootPool.addSchedulable(manager)  
    3.   }  

            可以看到,它实际上是调用的Pool的addSchedulable()方法。继续追踪:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. override def addSchedulable(schedulable: Schedulable) {  
    2.     require(schedulable != null)  
    3.       
    4.     // 将schedulable加入到schedulableQueue队列,队列为ConcurrentLinkedQueue类型  
    5.     schedulableQueue.add(schedulable)  
    6.       
    7.     // 将schedulable的name与schedulable的对应关系添加到schedulableNameToSchedulable集合,集合为ConcurrentHashMap类型  
    8.     schedulableNameToSchedulable.put(schedulable.name, schedulable)  
    9.       
    10.     // 将this赋值给schedulable的parent,即形成schedulable为this子节点(即截至目前时点的叶子节点)的树形结构  
    11.     schedulable.parent = this  
    12.   }  

            而翻看TaskSetManager的源码可以知道,TaskSetManager就实现了Schedulable这个trait(特质,类似java的接口),也就意味着TaskSetManager是可以被调度的,这也就回答了上面的问题2。

            好了,我们继续看最后一步,调用SchedulerBackend的reviveOffers()。问题又来了,问题不断啊。

            1、SchedulerBackend是什么?

            2、SchedulerBackend如何被初始化?

            3、SchedulerBackend的reviveOffers()到底做了什么?

            带着问题去学习终究是好的,它让我们有了暂时的目标。下面,我们一步步来分析。

            SchedulerBackend是Spark中一个可插拔组件,可插拔意味着它可以有多种实现方式,后续我们会概略讲讲。按照字面意思,它就是调度器的一个后台服务或者实现,其主要作用就是在物理机器或者说worker就绪后,能够提供其上的资源并将tasks加载到那些机器或者worker上。

            上文中我们已经预先提到过,在TaskSchedulerImpl的initialize()方法初始化schedulableBuilder时,同时也初始化了SchedulerBackend,即:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. // 赋值SchedulerBackend  
    2.     this.backend = backend  

            这个SchedulerBackend是被传递进来的,那么这时我们就要追溯到TaskSchedulerImpl实例化的时候了。在Spark应用环境的初始化时,其上下文信息SparkContext中存在以下代码:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. // Create and start the scheduler  
    2.     val (sched, ts) = SparkContext.createTaskScheduler(this, master)  
    3.     _schedulerBackend = sched  
    4.     _taskScheduler = ts  

            createTaskScheduler()方法主要就是根据给定的Master URL创建一个TaskScheduler。大致代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. /** 
    2.    * Create a task scheduler based on a given master URL. 
    3.    * Return a 2-tuple of the scheduler backend and the task scheduler. 
    4.    * 根据给定的Master URL创建一个TaskScheduler。 
    5.    */  
    6.   private def createTaskScheduler(  
    7.       sc: SparkContext,  
    8.       master: String): (SchedulerBackend, TaskScheduler) = {  
    9.     import SparkMasterRegex._  
    10.   
    11.     // When running locally, don't try to re-execute tasks on failure.  
    12.     val MAX_LOCAL_TASK_FAILURES = 1  
    13.   
    14.     master match {  
    15.       case "local" =>  
    16.         val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)  
    17.         val backend = new LocalBackend(sc.getConf, scheduler, 1)  
    18.         scheduler.initialize(backend)  
    19.         (backend, scheduler)  
    20.   
    21.       case LOCAL_N_REGEX(threads) =>  
    22.         def localCpuCount: Int = Runtime.getRuntime.availableProcessors()  
    23.         // local[*] estimates the number of cores on the machine; local[N] uses exactly N threads.  
    24.         val threadCount = if (threads == "*") localCpuCount else threads.toInt  
    25.         if (threadCount <= 0) {  
    26.           throw new SparkException(s"Asked to run locally with $threadCount threads")  
    27.         }  
    28.         val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)  
    29.         val backend = new LocalBackend(sc.getConf, scheduler, threadCount)  
    30.         scheduler.initialize(backend)  
    31.         (backend, scheduler)  
    32.   
    33.       case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>  
    34.         def localCpuCount: Int = Runtime.getRuntime.availableProcessors()  
    35.         // local[*, M] means the number of cores on the computer with M failures  
    36.         // local[N, M] means exactly N threads with M failures  
    37.         val threadCount = if (threads == "*") localCpuCount else threads.toInt  
    38.         val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true)  
    39.         val backend = new LocalBackend(sc.getConf, scheduler, threadCount)  
    40.         scheduler.initialize(backend)  
    41.         (backend, scheduler)  
    42.   
    43.       // Standalone模式  
    44.       case SPARK_REGEX(sparkUrl) =>  
    45.         
    46.         // 初始化TaskSchedulerImpl实例scheduler  
    47.         val scheduler = new TaskSchedulerImpl(sc)  
    48.         val masterUrls = sparkUrl.split(",").map("spark://" + _)  
    49.           
    50.         // 初始化一个SparkDeploySchedulerBackend实例backend  
    51.         val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)  
    52.           
    53.         // 调用TaskSchedulerImpl的initialize()方法,  
    54.         // 为其成员变量SchedulerBackend赋值SparkDeploySchedulerBackend  
    55.         scheduler.initialize(backend)  
    56.         // 返回backend和scheduler  
    57.         (backend, scheduler)  
    58.   
    59.       case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>  
    60.         // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.  
    61.         val memoryPerSlaveInt = memoryPerSlave.toInt  
    62.         if (sc.executorMemory > memoryPerSlaveInt) {  
    63.           throw new SparkException(  
    64.             "Asked to launch cluster with %d MB RAM / worker but requested %d MB/worker".format(  
    65.               memoryPerSlaveInt, sc.executorMemory))  
    66.         }  
    67.   
    68.         val scheduler = new TaskSchedulerImpl(sc)  
    69.         val localCluster = new LocalSparkCluster(  
    70.           numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt, sc.conf)  
    71.         val masterUrls = localCluster.start()  
    72.         val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)  
    73.         scheduler.initialize(backend)  
    74.         backend.shutdownCallback = (backend: SparkDeploySchedulerBackend) => {  
    75.           localCluster.stop()  
    76.         }  
    77.         (backend, scheduler)  
    78.   
    79.       case "yarn-standalone" | "yarn-cluster" =>  
    80.         if (master == "yarn-standalone") {  
    81.           logWarning(  
    82.             ""yarn-standalone" is deprecated as of Spark 1.0. Use "yarn-cluster" instead.")  
    83.         }  
    84.         val scheduler = try {  
    85.           val clazz = Utils.classForName("org.apache.spark.scheduler.cluster.YarnClusterScheduler")  
    86.           val cons = clazz.getConstructor(classOf[SparkContext])  
    87.           cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]  
    88.         } catch {  
    89.           // TODO: Enumerate the exact reasons why it can fail  
    90.           // But irrespective of it, it means we cannot proceed !  
    91.           case e: Exception => {  
    92.             throw new SparkException("YARN mode not available ?", e)  
    93.           }  
    94.         }  
    95.         val backend = try {  
    96.           val clazz =  
    97.             Utils.classForName("org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend")  
    98.           val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])  
    99.           cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]  
    100.         } catch {  
    101.           case e: Exception => {  
    102.             throw new SparkException("YARN mode not available ?", e)  
    103.           }  
    104.         }  
    105.         scheduler.initialize(backend)  
    106.         (backend, scheduler)  
    107.   
    108.       case "yarn-client" =>  
    109.         val scheduler = try {  
    110.           val clazz = Utils.classForName("org.apache.spark.scheduler.cluster.YarnScheduler")  
    111.           val cons = clazz.getConstructor(classOf[SparkContext])  
    112.           cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]  
    113.   
    114.         } catch {  
    115.           case e: Exception => {  
    116.             throw new SparkException("YARN mode not available ?", e)  
    117.           }  
    118.         }  
    119.   
    120.         val backend = try {  
    121.           val clazz =  
    122.             Utils.classForName("org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend")  
    123.           val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])  
    124.           cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]  
    125.         } catch {  
    126.           case e: Exception => {  
    127.             throw new SparkException("YARN mode not available ?", e)  
    128.           }  
    129.         }  
    130.   
    131.         scheduler.initialize(backend)  
    132.         (backend, scheduler)  
    133.   
    134.       case MESOS_REGEX(mesosUrl) =>  
    135.         MesosNativeLibrary.load()  
    136.         val scheduler = new TaskSchedulerImpl(sc)  
    137.         val coarseGrained = sc.conf.getBoolean("spark.mesos.coarse", defaultValue = true)  
    138.         val backend = if (coarseGrained) {  
    139.           new CoarseMesosSchedulerBackend(scheduler, sc, mesosUrl, sc.env.securityManager)  
    140.         } else {  
    141.           new MesosSchedulerBackend(scheduler, sc, mesosUrl)  
    142.         }  
    143.         scheduler.initialize(backend)  
    144.         (backend, scheduler)  
    145.   
    146.       case SIMR_REGEX(simrUrl) =>  
    147.         val scheduler = new TaskSchedulerImpl(sc)  
    148.         val backend = new SimrSchedulerBackend(scheduler, sc, simrUrl)  
    149.         scheduler.initialize(backend)  
    150.         (backend, scheduler)  
    151.   
    152.       case zkUrl if zkUrl.startsWith("zk://") =>  
    153.         logWarning("Master URL for a multi-master Mesos cluster managed by ZooKeeper should be " +  
    154.           "in the form mesos://zk://host:port. Current Master URL will stop working in Spark 2.0.")  
    155.         createTaskScheduler(sc, "mesos://" + zkUrl)  
    156.   
    157.       case _ =>  
    158.         throw new SparkException("Could not parse Master URL: '" + master + "'")  
    159.     }  
    160.   }  

            可以看出,它是根据Spark的部署模式来确定创建何种TaskScheduler及SchedulerBackend的。我们就以常见的Standalone模式来说明,关键代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. // Standalone模式  
    2.       case SPARK_REGEX(sparkUrl) =>  
    3.         
    4.         // 初始化TaskSchedulerImpl实例scheduler  
    5.         val scheduler = new TaskSchedulerImpl(sc)  
    6.         val masterUrls = sparkUrl.split(",").map("spark://" + _)  
    7.           
    8.         // 初始化一个SparkDeploySchedulerBackend实例backend  
    9.         val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)  
    10.           
    11.         // 调用TaskSchedulerImpl的initialize()方法,  
    12.         // 为其成员变量SchedulerBackend赋值SparkDeploySchedulerBackend  
    13.         scheduler.initialize(backend)  
    14.         // 返回backend和scheduler  
    15.         (backend, scheduler)  

            Standalone模式模式中,TaskScheduler的实现为TaskSchedulerImpl,而SchedulerBackend的实现为SparkDeploySchedulerBackend,并且在TaskScheduler生成后,随即调用其initialize()方法完成了初始化,也就确定了SchedulableBuilder和SchedulerBackend。

            至此,前两个是什么以及如何初始化的问题我们都已得到答案,下面再看最后一个关于做什么的问题:SchedulerBackend的reviveOffers()到底做了什么?还是以Standalone模式来说明。SparkDeploySchedulerBackend中没有提供此方法,我们只能寄希望于其父类CoarseGrainedSchedulerBackend,果不其然,在CoarseGrainedSchedulerBackend中我们找到了reviveOffers()方法。但是,代码很简单:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. override def reviveOffers() {  
    2.     // 调用driverEndpoint的send()方法,发送一个ReviveOffers消息  
    3.     driverEndpoint.send(ReviveOffers)  
    4.   }  

            我们继续看driverEndpoint是什么鬼。driverEndpoint是RPC中driver端Endpoint的引用,其类型为RpcEndpointRef。在CoarseGrainedSchedulerBackend启动时的start()方法中,对driverEndpoint进行了赋值:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. // TODO (prashant) send conf instead of properties  
    2.     driverEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint(properties))  

            这个RpcEnv只是一个抽象类,它有两种实现,一个是基于AKKa的AkkaRpcEnv,另外一个则是基于Netty的NettyRpcEnv,默认的实现是Netty。通过下述RpcEnv的代码即可看出:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. private def getRpcEnvFactory(conf: SparkConf): RpcEnvFactory = {  
    2.       
    3.     // 两种实现方式:  
    4.     // akka:org.apache.spark.rpc.akka.AkkaRpcEnvFactory  
    5.     // netty:org.apache.spark.rpc.netty.NettyRpcEnvFactory  
    6.     val rpcEnvNames = Map(  
    7.       "akka" -> "org.apache.spark.rpc.akka.AkkaRpcEnvFactory",  
    8.       "netty" -> "org.apache.spark.rpc.netty.NettyRpcEnvFactory")  
    9.       
    10.     // 通过参数spark.rpc配置,默认为netty  
    11.     val rpcEnvName = conf.get("spark.rpc", "netty")  
    12.     val rpcEnvFactoryClassName = rpcEnvNames.getOrElse(rpcEnvName.toLowerCase, rpcEnvName)  
    13.     Utils.classForName(rpcEnvFactoryClassName).newInstance().asInstanceOf[RpcEnvFactory]  
    14.   }  

            下面,我们就看下Netty的概要实现,在NettyRpcEnv的setupEndpoint()方法中:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = {  
    2.       
    3.     // 调用Dispatcher的registerRpcEndpoint()方法完成注册  
    4.     dispatcher.registerRpcEndpoint(name, endpoint)  
    5.   }  

            它是通过dispatcher来完成endpoint注册的,name为“CoarseGrainedScheduler”,RpcEndpoint为CoarseGrainedSchedulerBackend中通过createDriverEndpoint()方法创建的DriverEndpoint对象。代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. protected def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {  
    2.     new DriverEndpoint(rpcEnv, properties)  
    3.   }  

            那么这个DriverEndpoint是什么类呢?我们发现它继承自ThreadSafeRpcEndpoint,继而继承RpcEndpoint这个类。这里,我们只要知道这个RpcEndpoint是进程间消息传递调用的一个端点,定义了消息触发的函数。当一个消息到来时,方法调用顺序为  onStart, receive, onStop。它的生命周期为constructor -> onStart -> receive* -> onStop。

            为什么要用RpcEndpoint呢?很简单,Task的调度与执行是在一个分布式集群上进行的,自然需要进程间的通讯。

            继续分析,那么上面提到的driverEndpoint是如何赋值的呢?我们继续看Dispatcher的registerRpcEndpoint()方法,因为最终是由它向上返回RpcEndpointRef来完成driverEndpoint的赋值的。代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. // 注册RpcEndpoint  
    2.   // name为“Master”,endpoint为Master对象  
    3.   def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {  
    4.       
    5.     // 创建RpcEndpointAddress  
    6.     val addr = RpcEndpointAddress(nettyEnv.address, name)  
    7.       
    8.     // 创建NettyRpcEndpointRef  
    9.     val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)  
    10.       
    11.     // 同步代码块  
    12.     synchronized {  
    13.       if (stopped) {  
    14.         throw new IllegalStateException("RpcEnv has been stopped")  
    15.       }  
    16.         
    17.       // ConcurrentHashMap的putIfAbsent()方法确保不会重复创建EndpointData  
    18.       if (endpoints.putIfAbsent(name, new EndpointData(name, endpoint, endpointRef)) != null) {  
    19.         throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")  
    20.       }  
    21.         
    22.         
    23.       val data = endpoints.get(name)  
    24.       endpointRefs.put(data.endpoint, data.ref)  
    25.       receivers.offer(data)  // for the OnStart message  
    26.     }  
    27.     endpointRef  
    28.   }  

            返回的RpcEndpointRef为NettyRpcEndpointRef类型,而RpcEndpointRef则是一个远程RpcEndpoint的引用,通过它可以给远程RpcEndpoint发送消息,可以是同步可以是异步,它映射一个地址。这么看来,我们在远端(ps:另外的机器或者进程)注册了一个RpcEndpoint,即DriverEndpoint,而在本地端(当前机器或者进程)则持有一个RpcEndpoint的引用,即NettyRpcEndpointRef,可以由它来往远端发送消息,那么发送的是什么消息呢?我们现在返回CoarseGrainedSchedulerBackend中的reviveOffers()方法,发现发送的是ReviveOffers消息。这里只是发送,具体处理还要看远端的RpcEndpoint,即DriverEndpoint。通过上面我们可以知道,RpcEndpoint的服务流程为onStart()-->receive()--> onStop(),每当消息来临时,DriverEndpoint都会调用receive()方法来处理。关键代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. // 如果是ReviveOffers事件,则调用makeOffers()方法  
    2.       case ReviveOffers =>  
    3.         makeOffers()  

            继续追踪其makeOffers()方法,代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. // Make fake resource offers on all executors  
    2.     // 在所有的executors上提供假的资源(抽象的资源,也就是资源的对象信息,我是这么理解的)  
    3.     private def makeOffers() {  
    4.       // Filter out executors under killing  
    5.       // 过滤掉under killing的executors  
    6.       val activeExecutors = executorDataMap.filterKeys(executorIsAlive)  
    7.         
    8.       // 获取workOffers,即资源  
    9.       val workOffers = activeExecutors.map { case (id, executorData) =>  
    10.         // 创建WorkerOffer对象  
    11.         new WorkerOffer(id, executorData.executorHost, executorData.freeCores)  
    12.       }.toSeq  
    13.         
    14.       // 调用scheduler的resourceOffers()方法,分配资源  
    15.       // 调用launchTasks()方法,启动tasks  
    16.       launchTasks(scheduler.resourceOffers(workOffers))  
    17.     }  

            好了,留个尾巴,明天再继续分析吧~

    博客原地址:http://blog.csdn.net/lipeng_bigdata/article/details/50687992

  • 相关阅读:
    GNU make manual 翻译(九十九)
    GNU make manual 翻译( 九十五)
    Shell的 for 循环小例子
    makefile中对目录遍历的小例子
    GNU make manual 翻译(九十三)
    GNU make manual 翻译( 一百)
    GNU make manual 翻译( 九十七)
    GNU make manual 翻译( 九十八)
    mapserver4.8.3 的readme.win32的中文翻译文件
    遥控器编程
  • 原文地址:https://www.cnblogs.com/jirimutu01/p/5274458.html
Copyright © 2011-2022 走看看