zoukankan      html  css  js  c++  java
  • Spark源码分析之二:Job的调度模型与运行反馈

     在《Spark源码分析之Job提交运行总流程概述》一文中,我们提到了,Job提交与运行的第一阶段Stage划分与提交,可以分为三个阶段:

            1、Job的调度模型与运行反馈;

            2、Stage划分;

            3、Stage提交:对应TaskSet的生成。

            今天,我们就结合源码来分析下第一个小阶段:Job的调度模型与运行反馈。

            首先由DAGScheduler负责将Job提交到事件队列eventProcessLoop中,等待调度执行。入口方法为DAGScheduler的runJon()方法。代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. /** 
    2.    * Run an action job on the given RDD and pass all the results to the resultHandler function as 
    3.    * they arrive. 
    4.    * 
    5.    * @param rdd target RDD to run tasks on 
    6.    * @param func a function to run on each partition of the RDD 
    7.    * @param partitions set of partitions to run on; some jobs may not want to compute on all 
    8.    *   partitions of the target RDD, e.g. for operations like first() 
    9.    * @param callSite where in the user program this job was called 
    10.    * @param resultHandler callback to pass each result to 
    11.    * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name 
    12.    * 
    13.    * @throws Exception when the job fails 
    14.    */  
    15.   def runJob[T, U](  
    16.       rdd: RDD[T],  
    17.       func: (TaskContext, Iterator[T]) => U,  
    18.       partitions: Seq[Int],  
    19.       callSite: CallSite,  
    20.       resultHandler: (Int, U) => Unit,  
    21.       properties: Properties): Unit = {  
    22.         
    23.     // 开始时间  
    24.     val start = System.nanoTime  
    25.       
    26.     // 调用submitJob()方法,提交Job,返回JobWaiter  
    27.     // rdd为最后一个rdd,即target RDD to run tasks on  
    28.     // func为该rdd上每个分区需要执行的函数,a function to run on each partition of the RDD  
    29.     // partitions为该rdd上需要执行操作的分区集合,set of partitions to run on  
    30.     // callSite为用户程序job被调用的地方,where in the user program this job was called  
    31.     val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)  
    32.       
    33.     // JobWaiter调用awaitResult()方法等待结果  
    34.     waiter.awaitResult() match {  
    35.       case JobSucceeded => // Job运行成功  
    36.         logInfo("Job %d finished: %s, took %f s".format  
    37.           (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))  
    38.       case JobFailed(exception: Exception) =>// Job运行失败  
    39.         logInfo("Job %d failed: %s, took %f s".format  
    40.           (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))  
    41.         // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.  
    42.         val callerStackTrace = Thread.currentThread().getStackTrace.tail  
    43.         exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)  
    44.         throw exception  
    45.     }  
    46.   }  

            runJob()方法就做了三件事:

            首先,获取开始时间,方便最后计算Job执行时间;

            其次,调用submitJob()方法,提交Job,返回JobWaiter类型的对象waiter;

            最后,waiter调用JobWaiter的awaitResult()方法等待Job运行结果,这个运行结果就俩:JobSucceeded代表成功,JobFailed代表失败。

            awaitResult()方法通过轮询标志位_jobFinished,如果为false,则调用this.wait()继续等待,否则说明Job运行完成,返回JobResult,其代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. def awaitResult(): JobResult = synchronized {  
    2.       
    3.     // 循环,如果标志位_jobFinished为false,则一直循环,否则退出,返回JobResult  
    4.     while (!_jobFinished) {  
    5.       this.wait()  
    6.     }  
    7.     return jobResult  
    8.   }  

            而这个标志位_jobFinished是在Task运行完成后,如果已完成Task数目等于总Task数目时,或者整个Job运行失败时设置的,随着标志位的设置,Job运行结果jobResult也同步进行设置,代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. // 任务运行完成  
    2.   override def taskSucceeded(index: Int, result: Any): Unit = synchronized {  
    3.     if (_jobFinished) {  
    4.       throw new UnsupportedOperationException("taskSucceeded() called on a finished JobWaiter")  
    5.     }  
    6.     resultHandler(index, result.asInstanceOf[T])  
    7.     finishedTasks += 1  
    8.     // 已完成Task数目是否等于总Task数目  
    9.     if (finishedTasks == totalTasks) {  
    10.       // 设置标志位_jobFinished为ture  
    11.       _jobFinished = true  
    12.       // 作业运行结果为成功  
    13.       jobResult = JobSucceeded  
    14.       this.notifyAll()  
    15.     }  
    16.   }  
    17.   
    18.   // 作业失败  
    19.   override def jobFailed(exception: Exception): Unit = synchronized {  
    20.     // 设置标志位_jobFinished为ture  
    21.     _jobFinished = true  
    22.     // 作业运行结果为失败  
    23.     jobResult = JobFailed(exception)  
    24.     this.notifyAll()  
    25.   }  



            接下来,看看submitJob()方法,代码定义如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. /** 
    2.    * Submit an action job to the scheduler. 
    3.    * 
    4.    * @param rdd target RDD to run tasks on 
    5.    * @param func a function to run on each partition of the RDD 
    6.    * @param partitions set of partitions to run on; some jobs may not want to compute on all 
    7.    *   partitions of the target RDD, e.g. for operations like first() 
    8.    * @param callSite where in the user program this job was called 
    9.    * @param resultHandler callback to pass each result to 
    10.    * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name 
    11.    * 
    12.    * @return a JobWaiter object that can be used to block until the job finishes executing 
    13.    *         or can be used to cancel the job. 
    14.    * 
    15.    * @throws IllegalArgumentException when partitions ids are illegal 
    16.    */  
    17.   def submitJob[T, U](  
    18.       rdd: RDD[T],  
    19.       func: (TaskContext, Iterator[T]) => U,  
    20.       partitions: Seq[Int],  
    21.       callSite: CallSite,  
    22.       resultHandler: (Int, U) => Unit,  
    23.       properties: Properties): JobWaiter[U] = {  
    24.       
    25.     // Check to make sure we are not launching a task on a partition that does not exist.  
    26.     // 检测rdd分区以确保我们不会在一个不存在的partition上launch一个task  
    27.     val maxPartitions = rdd.partitions.length  
    28.     partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>  
    29.       throw new IllegalArgumentException(  
    30.         "Attempting to access a non-existent partition: " + p + ". " +  
    31.           "Total number of partitions: " + maxPartitions)  
    32.     }  
    33.   
    34.     // 为Job生成一个jobId,jobId为AtomicInteger类型,getAndIncrement()确保了原子操作性,每次生成后都自增  
    35.     val jobId = nextJobId.getAndIncrement()  
    36.       
    37.     // 如果partitions大小为0,即没有需要执行任务的分区,快速返回  
    38.     if (partitions.size == 0) {  
    39.       // Return immediately if the job is running 0 tasks  
    40.       return new JobWaiter[U](this, jobId, 0, resultHandler)  
    41.     }  
    42.   
    43.     assert(partitions.size > 0)  
    44.       
    45.     // func转化下,否则JobSubmitted无法接受这个func参数,T转变为_  
    46.     val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]  
    47.       
    48.     // 创建一个JobWaiter对象  
    49.     val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)  
    50.       
    51.     // eventProcessLoop加入一个JobSubmitted事件到事件队列中  
    52.     eventProcessLoop.post(JobSubmitted(  
    53.       jobId, rdd, func2, partitions.toArray, callSite, waiter,  
    54.       SerializationUtils.clone(properties)))  
    55.       
    56.     // 返回JobWaiter  
    57.     waiter  
    58.   }  

            submitJob()方法一共做了5件事情:

            第一,数据检测,检测rdd分区以确保我们不会在一个不存在的partition上launch一个task,并且,如果partitions大小为0,即没有需要执行任务的分区,快速返回;

            第二,为Job生成一个jobId,该jobId为AtomicInteger类型,getAndIncrement()确保了原子操作性,每次生成后都自增;

            第三,将func转化下,否则JobSubmitted无法接受这个func参数,T转变为_;

            第四,创建一个JobWaiter对象waiter,该对象会在方法结束时返回给上层方法,以用来监测Job运行结果;

            第五,将一个JobSubmitted事件加入到事件队列eventProcessLoop中,等待工作线程轮询调度(速度很快)。

            这里,我们有必要研究下事件队列eventProcessLoop,eventProcessLoop为DAGSchedulerEventProcessLoop类型的,在DAGScheduler初始化时被定义并赋值,代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. // 创建DAGSchedulerEventProcessLoop类型的成员变量eventProcessLoop  
    2.   private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)  

            DAGSchedulerEventProcessLoop继承自EventLoop,我们先来看看这个EventLoop的定义。

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. /** 
    2.  * An event loop to receive events from the caller and process all events in the event thread. It 
    3.  * will start an exclusive event thread to process all events. 
    4.  * EventLoop用来接收来自调用者的事件并在event thread中除了所有的事件。它将开启一个专门的事件处理线程处理所有的事件。 
    5.  * 
    6.  * Note: The event queue will grow indefinitely. So subclasses should make sure `onReceive` can 
    7.  * handle events in time to avoid the potential OOM. 
    8.  */  
    9. private[spark] abstract class EventLoop[E](name: String) extends Logging {  
    10.     
    11.   // LinkedBlockingDeque类型的事件队列,队列元素为E类型  
    12.   private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()  
    13.   
    14.   // 标志位  
    15.   private val stopped = new AtomicBoolean(false)  
    16.   
    17.   // 事件处理线程  
    18.   private val eventThread = new Thread(name) {  
    19.     // 设置为后台线程  
    20.     setDaemon(true)  
    21.   
    22.     override def run(): Unit = {  
    23.       try {  
    24.         // 如果标志位stopped没有被设置为true,一直循环  
    25.         while (!stopped.get) {  
    26.           // 从事件队列中take一条事件  
    27.           val event = eventQueue.take()  
    28.           try {  
    29.             // 调用onReceive()方法进行处理  
    30.             onReceive(event)  
    31.           } catch {  
    32.             case NonFatal(e) => {  
    33.               try {  
    34.                 onError(e)  
    35.               } catch {  
    36.                 case NonFatal(e) => logError("Unexpected error in " + name, e)  
    37.               }  
    38.             }  
    39.           }  
    40.         }  
    41.       } catch {  
    42.         case ie: InterruptedException => // exit even if eventQueue is not empty  
    43.         case NonFatal(e) => logError("Unexpected error in " + name, e)  
    44.       }  
    45.     }  
    46.   
    47.   }  
    48.   
    49.   def start(): Unit = {  
    50.     if (stopped.get) {  
    51.       throw new IllegalStateException(name + " has already been stopped")  
    52.     }  
    53.     // Call onStart before starting the event thread to make sure it happens before onReceive  
    54.     onStart()  
    55.     eventThread.start()  
    56.   }  
    57.   
    58.   def stop(): Unit = {  
    59.     if (stopped.compareAndSet(false, true)) {  
    60.       eventThread.interrupt()  
    61.       var onStopCalled = false  
    62.       try {  
    63.         eventThread.join()  
    64.         // Call onStop after the event thread exits to make sure onReceive happens before onStop  
    65.         onStopCalled = true  
    66.         onStop()  
    67.       } catch {  
    68.         case ie: InterruptedException =>  
    69.           Thread.currentThread().interrupt()  
    70.           if (!onStopCalled) {  
    71.             // ie is thrown from `eventThread.join()`. Otherwise, we should not call `onStop` since  
    72.             // it's already called.  
    73.             onStop()  
    74.           }  
    75.       }  
    76.     } else {  
    77.       // Keep quiet to allow calling `stop` multiple times.  
    78.     }  
    79.   }  
    80.   
    81.   /** 
    82.    * Put the event into the event queue. The event thread will process it later. 
    83.    * 将事件加入到时间队列。事件线程过会会处理它。 
    84.    */  
    85.   def post(event: E): Unit = {  
    86.     // 将事件加入到待处理队列  
    87.     eventQueue.put(event)  
    88.   }  
    89.   
    90.   /** 
    91.    * Return if the event thread has already been started but not yet stopped. 
    92.    */  
    93.   def isActive: Boolean = eventThread.isAlive  
    94.   
    95.   /** 
    96.    * Invoked when `start()` is called but before the event thread starts. 
    97.    */  
    98.   protected def onStart(): Unit = {}  
    99.   
    100.   /** 
    101.    * Invoked when `stop()` is called and the event thread exits. 
    102.    */  
    103.   protected def onStop(): Unit = {}  
    104.   
    105.   /** 
    106.    * Invoked in the event thread when polling events from the event queue. 
    107.    * 
    108.    * Note: Should avoid calling blocking actions in `onReceive`, or the event thread will be blocked 
    109.    * and cannot process events in time. If you want to call some blocking actions, run them in 
    110.    * another thread. 
    111.    */  
    112.   protected def onReceive(event: E): Unit  
    113.   
    114.   /** 
    115.    * Invoked if `onReceive` throws any non fatal error. Any non fatal error thrown from `onError` 
    116.    * will be ignored. 
    117.    */  
    118.   protected def onError(e: Throwable): Unit  
    119.   
    120. }  

            我们可以看到,EventLoop实际上就是一个任务队列及其对该队列一系列操作的封装。在它内部,首先定义了一个LinkedBlockingDeque类型的事件队列,队列元素为E类型,其中DAGSchedulerEventProcessLoop存储的则是DAGSchedulerEvent类型的事件,代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. // LinkedBlockingDeque类型的事件队列,队列元素为E类型  
    2.   private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()  

            并提供了一个后台线程,专门对事件队列里的事件进行监控,并调用onReceive()方法进行处理,代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. // 事件处理线程  
    2.   private val eventThread = new Thread(name) {  
    3.     // 设置为后台线程  
    4.     setDaemon(true)  
    5.   
    6.     override def run(): Unit = {  
    7.       try {  
    8.         // 如果标志位stopped没有被设置为true,一直循环  
    9.         while (!stopped.get) {  
    10.           // 从事件队列中take一条事件  
    11.           val event = eventQueue.take()  
    12.           try {  
    13.             // 调用onReceive()方法进行处理  
    14.             onReceive(event)  
    15.           } catch {  
    16.             case NonFatal(e) => {  
    17.               try {  
    18.                 onError(e)  
    19.               } catch {  
    20.                 case NonFatal(e) => logError("Unexpected error in " + name, e)  
    21.               }  
    22.             }  
    23.           }  
    24.         }  
    25.       } catch {  
    26.         case ie: InterruptedException => // exit even if eventQueue is not empty  
    27.         case NonFatal(e) => logError("Unexpected error in " + name, e)  
    28.       }  
    29.     }  
    30.   
    31.   }  

            那么如何向队列中添加事件呢?调用其post()方法,传入事件即可。如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. /** 
    2.    * Put the event into the event queue. The event thread will process it later. 
    3.    * 将事件加入到时间队列。事件线程过会会处理它。 
    4.    */  
    5.   def post(event: E): Unit = {  
    6.     // 将事件加入到待处理队列  
    7.     eventQueue.put(event)  
    8.   }  

            言归正传,上面提到,submitJob()方法利用eventProcessLoop的post()方法加入一个JobSubmitted事件到事件队列中,那么DAGSchedulerEventProcessLoop对于JobSubmitted事件是如何处理的呢?我们看它的onReceive()方法,源码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. /** 
    2.    * The main event loop of the DAG scheduler. 
    3.    * DAGScheduler中事件主循环 
    4.    */  
    5.   override def onReceive(event: DAGSchedulerEvent): Unit = {  
    6.     val timerContext = timer.time()  
    7.     try {  
    8.       // 调用doOnReceive()方法,将DAGSchedulerEvent类型的event传递进去  
    9.       doOnReceive(event)  
    10.     } finally {  
    11.       timerContext.stop()  
    12.     }  
    13.   }  

            继续看doOnReceive()方法,代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. // 事件处理调度函数  
    2.   private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {  
    3.       
    4.     // 如果是JobSubmitted事件,调用dagScheduler.handleJobSubmitted()方法处理  
    5.     case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>  
    6.       dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)  
    7.   
    8.     // 如果是MapStageSubmitted事件,调用dagScheduler.handleMapStageSubmitted()方法处理  
    9.     case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>  
    10.       dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)  
    11.   
    12.     case StageCancelled(stageId) =>  
    13.       dagScheduler.handleStageCancellation(stageId)  
    14.   
    15.     case JobCancelled(jobId) =>  
    16.       dagScheduler.handleJobCancellation(jobId)  
    17.   
    18.     case JobGroupCancelled(groupId) =>  
    19.       dagScheduler.handleJobGroupCancelled(groupId)  
    20.   
    21.     case AllJobsCancelled =>  
    22.       dagScheduler.doCancelAllJobs()  
    23.   
    24.     case ExecutorAdded(execId, host) =>  
    25.       dagScheduler.handleExecutorAdded(execId, host)  
    26.   
    27.     case ExecutorLost(execId) =>  
    28.       dagScheduler.handleExecutorLost(execId, fetchFailed = false)  
    29.   
    30.     case BeginEvent(task, taskInfo) =>  
    31.       dagScheduler.handleBeginEvent(task, taskInfo)  
    32.   
    33.     case GettingResultEvent(taskInfo) =>  
    34.       dagScheduler.handleGetTaskResult(taskInfo)  
    35.   
    36.     case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>  
    37.       dagScheduler.handleTaskCompletion(completion)  
    38.   
    39.     case TaskSetFailed(taskSet, reason, exception) =>  
    40.       dagScheduler.handleTaskSetFailed(taskSet, reason, exception)  
    41.   
    42.     case ResubmitFailedStages =>  
    43.       dagScheduler.resubmitFailedStages()  
    44.   }  

            对于JobSubmitted事件,我们通过调用DAGScheduler的handleJobSubmitted()方法来处理。

            好了,到这里,第一阶段Job的调度模型与运行反馈大体已经分析完了,至于后面的第二、第三阶段,留待后续博文继续分析吧~

    博客原地址:http://blog.csdn.net/lipeng_bigdata/article/details/50667966

  • 相关阅读:
    【转】CUDA5/CentOS6.4
    【转】centos 6.4 samba 安装配置
    【转】Install MATLAB 2013a on CentOS 6.4 x64 with mode silent
    【转】Getting xrdp to work on CentOS 6.4
    【VLFeat】使用matlab版本计算HOG
    Unofficial Windows Binaries for Python Extension Packages
    March 06th, 2018 Week 10th Tuesday
    March 05th, 2018 Week 10th Monday
    March 04th, 2018 Week 10th Sunday
    March 03rd, 2018 Week 9th Saturday
  • 原文地址:https://www.cnblogs.com/jirimutu01/p/5274454.html
Copyright © 2011-2022 走看看