  • Spark Notes 7: DAGScheduler

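    As the earlier notes on SparkContext and RDD showed, the actual computation is always carried out by calling the runJob method of DAGScheduler, which makes it a very important class. Before reading its implementation, it helps to know a little about the actor model: http://en.wikipedia.org/wiki/Actor_model http://www.slideshare.net/YungLinHo/introduction-to-actor-model-and-akka A rough idea of how the actor model is implemented is enough. It is also worth reading up on DAG concepts and papers first: http://en.wikipedia.org/wiki/Directed_acyclic_graph    http://www.netlib.org/utk/people/JackDongarra/PAPERS/DAGuE_technical_report.pdf 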


    ===========================Job submission flow======================================
    DAGScheduler::submitJob   -- every action ends up calling submitJob
        -> send: JobSubmitted   -- it posts a message to the DAGSchedulerEventProcessActor (because the machine submitting the job may not be the master?)
            -> handleJobSubmitted   -- the DAGScheduler handles the received message
                -> newStage   -- creates the final stage
                -> new ActiveJob   -- creates the ActiveJob record for this job
                -> [runLocally]   -- a simple job is executed directly on the local machine, when:
                    localExecutionEnabled && allowLocal && finalStage.parents.isEmpty && partitions.length == 1
                    -> runLocally(job)   -- don't block the DAGScheduler event loop or other concurrent jobs
                        -> runLocallyWithinThread(job)   -- runs the local job in a new thread so the DAGScheduler thread is not blocked
                            -> TaskContext(job.finalStage.id, job.partitions(0), 0, runningLocally = true)
                            -> result = job.func(taskContext, rdd.iterator(split, taskContext))   -- runs the job
                            -> job.listener.taskSucceeded(0, result)   -- reports the result to the job listener
                            -> listenerBus.post(SparkListenerJobEnd(job.jobId, jobResult))   -- announces that the job has ended
                -> submitStage(finalStage)   -- submits the stage, but first recursively submits any missing parents
                    -> activeJobForStage   -- finds the earliest-created active job that needs the stage, by looking it up in jobIdToActiveJob
                    -> getMissingParentStages   -- if the stage depends on a shuffle map stage whose output is not yet available, that parent stage is missing
                        -> waitingForVisit.push(stage.rdd)
                        -> waitingForVisit.pop()
                        -> getShuffleMapStage
                            -> registerShuffleDependencies   -- registers the shuffles of all ancestors in shuffleToMapStage and mapOutputTracker
                                -> getAncestorShuffleDependencies   -- returns a stack holding the ancestor shuffle dependencies
                                -> newOrUsedStage   -- creates a shuffle map stage for the RDD; if the shuffle is already registered, the old output locations are restored into the new stage
                                    -> mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId) or
                                    -> mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.size)
                                -> shuffleToMapStage(currentShufDep.shuffleId) = stage   -- recorded in the DAGScheduler's shuffleToMapStage hash map
                            -> newOrUsedStage   -- creates the shuffle map stage for the current RDD
                            -> shuffleToMapStage(shuffleDep.shuffleId) = stage   -- recorded in the DAGScheduler's shuffleToMapStage hash map
                        -> NarrowDependency -> waitingForVisit.push(narrowDep.rdd)   -- narrow dependencies are not analysed; the RDD is pushed on the stack so that its own parents get visited
                    -> submitMissingTasks   -- called when the stage's parents are available and we can now run its tasks, i.e. the stage has no missing dependencies left
                        -> stage.pendingTasks.clear()   -- clears the set of tasks still pending for this stage
                        -> partitionsToCompute = ?   -- first figure out the indexes of partition ids to compute:
                            find the partitions that still need to run; a shuffle map stage has more partitions to run
                        -> runningStages += stage   -- updates the record of running stages
                        -> listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))   -- notifies listeners that the stage was submitted
                        -> Broadcast binary for the task (taskBinary), used to dispatch tasks to executors. The serialized copy of the
                            RDD is broadcast and deserialized for each task, which means each task gets a different copy of the RDD.
                            This is necessary in Hadoop, where the JobConf/Configuration object is not thread-safe.
                            -> // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
                            -> // For ResultTask, serialize and broadcast (rdd, func).
                        -> new ShuffleMapTask(stage.id, taskBinary, part, locs)   -- creates the tasks
                        -> new ResultTask(stage.id, taskBinary, part, locs, id)
                        -> Preemptively serialize a task to make sure it can be serialized, so that a serialization failure is caught here.
                        -> stage.pendingTasks ++= tasks
                        -> taskScheduler.submitTasks   -- hands the tasks over to the taskScheduler
                    -> submitStage(parent)   -- (recursive) if the stage has missing parent stages, each of those parents is submitted first
    ======================end=========================================
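
    The recursion in the trace above can be sketched in a few lines. This is a toy model and not Spark's code: ToyStage, ToyScheduler, and the available flag are hypothetical names, a stage is reduced to an id plus its parent stages, and the walk over RDD dependencies done by getMissingParentStages is replaced by a simple filter. It only shows the control flow of submitStage: a stage with no missing parents is started, otherwise its missing parents are submitted first and the stage itself is parked as waiting.

```scala
import scala.collection.mutable

object ToyStageSubmission {
  // Hypothetical stand-in for a stage: an id, its parent stages, and a flag
  // saying whether its output is already available.
  final case class ToyStage(id: Int, parents: List[ToyStage], var available: Boolean = false)

  class ToyScheduler {
    val waiting = mutable.LinkedHashSet[Int]() // stages parked until their parents finish
    val running = mutable.ArrayBuffer[Int]()   // stages whose tasks were submitted, in order

    // Simplification: a parent is "missing" when its output is not available.
    def missingParents(stage: ToyStage): List[ToyStage] =
      stage.parents.filterNot(_.available).sortBy(_.id)

    def submitStage(stage: ToyStage): Unit = {
      if (!waiting.contains(stage.id) && !running.contains(stage.id)) {
        val missing = missingParents(stage)
        if (missing.isEmpty) {
          running += stage.id            // corresponds to submitMissingTasks
        } else {
          missing.foreach(submitStage)   // recurse into the parents first
          waiting += stage.id
        }
      }
    }
  }
}
```

    With stages 3 -> 2 -> {0, 1} and stage 1 already available, calling submitStage on stage 3 starts only stage 0 and leaves stages 2 and 3 waiting.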
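    The DAG scheduler computes a DAG of stages for each job, keeps track of which RDDs and stage outputs are materialized, and looks for a minimal (the notes originally said "optimal (?)") schedule to run the job. It submits each stage as a TaskSet to the TaskScheduler, which runs the tasks on the cluster.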
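
    Before a stage can be turned into a TaskSet, the scheduler has to decide which partitions still need a task (the partitionsToCompute step in the trace). A minimal sketch of that decision follows, under the assumption that a shuffle map stage recomputes every partition without a registered output location and a result stage recomputes every output the job has not received yet. All types here (ToyShuffleMapStage, ToyResultStage) are hypothetical simplifications, not Spark's classes.

```scala
object ToyPartitionsToCompute {
  // Hypothetical, simplified views of a stage.
  sealed trait ToyStage
  // outputLocs(i) lists the hosts that already hold map output for partition i.
  final case class ToyShuffleMapStage(outputLocs: Vector[List[String]]) extends ToyStage
  // finished(i) is true once the job has received the result for output i.
  final case class ToyResultStage(finished: Vector[Boolean]) extends ToyStage

  // Assumed rule: only partitions without an output (or result) get a new task.
  def partitionsToCompute(stage: ToyStage): Seq[Int] = stage match {
    case ToyShuffleMapStage(outputLocs) =>
      outputLocs.indices.filter(i => outputLocs(i).isEmpty)
    case ToyResultStage(finished) =>
      finished.indices.filter(i => !finished(i))
  }
}
```

    In this model a resubmitted stage creates tasks only for the partitions whose output is missing.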
    /**
    * The high-level scheduling layer that implements stage-oriented scheduling. It computes a DAG of
    * stages for each job, keeps track of which RDDs and stage outputs are materialized, and finds a
    * minimal schedule to run the job. It then submits stages as TaskSets to an underlying
    * TaskScheduler implementation that runs them on the cluster.
    *
    * In addition to coming up with a DAG of stages, this class also determines the preferred
    * locations to run each task on, based on the current cache status, and passes these to the
    * low-level TaskScheduler. Furthermore, it handles failures due to shuffle output files being
    * lost, in which case old stages may need to be resubmitted. Failures *within* a stage that are
    * not caused by shuffle file loss are handled by the TaskScheduler, which will retry each task
    * a small number of times before cancelling the whole stage.
    *
    */
    package org.apache.spark.scheduler

    private[spark]
    class DAGScheduler(
        private[scheduler] val sc: SparkContext,
        private[scheduler] val taskScheduler: TaskScheduler,
        listenerBus: LiveListenerBus,
        mapOutputTracker: MapOutputTrackerMaster,
        blockManagerMaster: BlockManagerMaster,
        env: SparkEnv,
        clock: Clock = SystemClock)
      extends Logging {

    State machine (how the actor responds to messages):
    private[scheduler] class DAGSchedulerEventProcessActor(dagScheduler: DAGScheduler)
      extends Actor with Logging {

      override def preStart() {
        // set DAGScheduler for taskScheduler to ensure eventProcessActor is always
        // valid when the messages arrive
        dagScheduler.taskScheduler.setDAGScheduler(dagScheduler)
      }

      /**
      * The main event loop of the DAG scheduler.
      */
      def receive = {
        case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
          dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,
            listener, properties)

        case StageCancelled(stageId) =>
          dagScheduler.handleStageCancellation(stageId)

        case JobCancelled(jobId) =>
          dagScheduler.handleJobCancellation(jobId)

        case JobGroupCancelled(groupId) =>
          dagScheduler.handleJobGroupCancelled(groupId)

        case AllJobsCancelled =>
          dagScheduler.doCancelAllJobs()

        case ExecutorAdded(execId, host) =>
          dagScheduler.handleExecutorAdded(execId, host)

        case ExecutorLost(execId) =>
          dagScheduler.handleExecutorLost(execId)

        case BeginEvent(task, taskInfo) =>
          dagScheduler.handleBeginEvent(task, taskInfo)

        case GettingResultEvent(taskInfo) =>
          dagScheduler.handleGetTaskResult(taskInfo)

        case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
          dagScheduler.handleTaskCompletion(completion)

        case TaskSetFailed(taskSet, reason) =>
          dagScheduler.handleTaskSetFailed(taskSet, reason)

        case ResubmitFailedStages =>
          dagScheduler.resubmitFailedStages()
      }
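
    The receive block is essentially a dispatch table: messages arrive one at a time and each one is routed to a handler, so the handlers never run concurrently. The sketch below illustrates that pattern with a plain JDK blocking queue and a single thread instead of an Akka actor. That substitution, and every name in it (ToyEvent, ToyJobSubmitted, ToyJobCancelled, ToyStop, Loop), is an assumption made for illustration; it is not how Spark implements the actor.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable

object ToyEventLoop {
  // Hypothetical event types, standing in for the DAGSchedulerEvent messages.
  sealed trait ToyEvent
  final case class ToyJobSubmitted(jobId: Int) extends ToyEvent
  final case class ToyJobCancelled(jobId: Int) extends ToyEvent
  case object ToyStop extends ToyEvent

  class Loop {
    private val queue = new LinkedBlockingQueue[ToyEvent]()
    // Mutated only by the loop thread, so it needs no lock.
    private val activeJobs = mutable.LinkedHashSet[Int]()

    private val thread = new Thread("toy-event-loop") {
      override def run(): Unit = {
        var done = false
        while (!done) {
          // take() blocks until some thread has posted an event.
          queue.take() match {
            case ToyJobSubmitted(id) => activeJobs += id
            case ToyJobCancelled(id) => activeJobs -= id
            case ToyStop             => done = true
          }
        }
      }
    }
    thread.setDaemon(true)

    def start(): Unit = thread.start()

    // Any thread may call post; the event is handled later on the loop thread.
    def post(event: ToyEvent): Unit = queue.put(event)

    // Posts ToyStop, waits for the loop thread to exit, then returns the active job ids.
    def stopAndGet(): List[Int] = {
      post(ToyStop)
      thread.join()
      activeJobs.toList
    }
  }
}
```

    Because all state changes happen on the loop thread, callers only need a thread-safe queue; joining the thread before reading activeJobs makes the final read safe as well.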

    Important fields:
    private val nextStageId = new AtomicInteger(0)
    private[scheduler] val nextJobId = new AtomicInteger(0)
    private[scheduler] val jobIdToStageIds = new HashMap[Int, HashSet[Int]]
    private[scheduler] val stageIdToStage = new HashMap[Int, Stage]
    private[scheduler] val shuffleToMapStage = new HashMap[Int, Stage]
    private[scheduler] val jobIdToActiveJob = new HashMap[Int, ActiveJob]
    // Stages we need to run whose parents aren't done
    private[scheduler] val waitingStages = new HashSet[Stage]
    // Stages we are running right now
    private[scheduler] val runningStages = new HashSet[Stage]
    // Stages that must be resubmitted due to fetch failures
    private[scheduler] val failedStages = new HashSet[Stage]
    private[scheduler] val activeJobs = new HashSet[ActiveJob]
    // Contains the locations that each RDD's partitions are cached on
    private val cacheLocs = new HashMap[Int, Array[Seq[TaskLocation]]]
    private val dagSchedulerActorSupervisor =
      env.actorSystem.actorOf(Props(new DAGSchedulerActorSupervisor(this)))
    // A closure serializer that we reuse.
    // This is only safe because DAGScheduler runs in a single thread.
    private val closureSerializer = SparkEnv.get.closureSerializer.newInstance()

    private[scheduler] var eventProcessActor: ActorRef = _

    private[scheduler] def handleJobSubmitted(jobId: Int,
        finalRDD: RDD[_],
        func: (TaskContext, Iterator[_]) => _,
        partitions: Array[Int],
        allowLocal: Boolean,
        callSite: CallSite,
        listener: JobListener,
        properties: Properties = null)
    {

    /** Submits stage, but first recursively submits any missing parents. */
    private def submitStage(stage: Stage) {

    /** Called when stage's parents are available and we can now do its task. */
    private def submitMissingTasks(stage: Stage, jobId: Int) {

    /** Finds the earliest-created active job that needs the stage */
    // TODO: Probably should actually find among the active jobs that need this
    // stage the one with the highest priority (highest-priority pool, earliest created).
    // That should take care of at least part of the priority inversion problem with
    // cross-job dependencies.
    private def activeJobForStage(stage: Stage): Option[Int] = {
      val jobsThatUseStage: Array[Int] = stage.jobIds.toArray.sorted
      jobsThatUseStage.find(jobIdToActiveJob.contains)
    }
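
    activeJobForStage relies on job ids being handed out by the increasing counter nextJobId, so the smallest id among the jobs that use a stage belongs to the earliest-created job. The same two lines can be tried in isolation with stand-in types. ToyStage and the String-valued map below are hypothetical and used only for illustration; the lookup logic is copied from the method above.

```scala
object ToyActiveJobLookup {
  // Hypothetical stand-in: a stage only carries the ids of the jobs that need it.
  final case class ToyStage(jobIds: Set[Int])

  // Same logic as activeJobForStage, with the map passed in explicitly.
  def activeJobForStage(
      stage: ToyStage,
      jobIdToActiveJob: scala.collection.Map[Int, String]): Option[Int] = {
    // Sorting ascending puts the earliest-created job id first.
    val jobsThatUseStage: Array[Int] = stage.jobIds.toArray.sorted
    // The first id that is still registered as active wins.
    jobsThatUseStage.find(jobIdToActiveJob.contains)
  }
}
```

    For a stage used by jobs 7, 3, and 5 where only jobs 5 and 7 are still active, the lookup returns Some(5); if none of them is active it returns None.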

    /**
    * Types of events that can be handled by the DAGScheduler. The DAGScheduler uses an event queue
    * architecture where any thread can post an event (e.g. a task finishing or a new job being
    * submitted) but there is a single "logic" thread that reads these events and takes decisions.
    * This greatly simplifies synchronization.
    */
    private[scheduler] sealed trait DAGSchedulerEvent
    /**
    * Asynchronously passes SparkListenerEvents to registered SparkListeners.
    *
    * Until start() is called, all posted events are only buffered. Only after this listener bus
    * has started will events be actually propagated to all attached listeners. This listener bus
    * is stopped when it receives a SparkListenerShutdown event, which is posted using stop().
    */
    private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
    /**
    * A SparkListenerEvent bus that relays events to its listeners
    */
    private[spark] trait SparkListenerBus extends Logging {

      // SparkListeners attached to this event bus
      protected val sparkListeners = new ArrayBuffer[SparkListener]
        with mutable.SynchronizedBuffer[SparkListener]

      def addListener(listener: SparkListener) {
        sparkListeners += listener
      }

      /**
      * Post an event to all attached listeners.
      * This does nothing if the event is SparkListenerShutdown.
      */
      def postToAll(event: SparkListenerEvent) {

      /**
      * Apply the given function to all attached listeners, catching and logging any exception.
      */
      private def foreachListener(f: SparkListener => Unit): Unit = {
        sparkListeners.foreach { listener =>
          try {
            f(listener)
          } catch {
            case e: Exception =>
              logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e)
          }
        }
      }

    }
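
    Two behaviours described in the comments above are easy to try out in a small model: events posted before start() are only buffered, and an exception thrown by one listener does not stop delivery to the others. The sketch below is a simplified, synchronous stand-in, whereas the comment on LiveListenerBus describes asynchronous delivery. ToyListener, Bus, and the errors buffer are hypothetical names, and events are plain strings.

```scala
import scala.collection.mutable.ArrayBuffer

object ToyListenerBus {
  // Hypothetical listener interface with a single callback.
  trait ToyListener { def onEvent(event: String): Unit }

  class Bus {
    private val listeners = ArrayBuffer[ToyListener]()
    private val buffered = ArrayBuffer[String]()
    private var started = false
    // Stands in for logError: one entry per listener failure.
    val errors = ArrayBuffer[String]()

    def addListener(listener: ToyListener): Unit = synchronized { listeners += listener }

    // Before start() events are only buffered; afterwards they are delivered at once.
    def post(event: String): Unit = synchronized {
      if (started) postToAll(event) else buffered += event
    }

    // Flushes everything that was buffered, in posting order.
    def start(): Unit = synchronized {
      started = true
      buffered.foreach(postToAll)
      buffered.clear()
    }

    private def postToAll(event: String): Unit =
      foreachListener(_.onEvent(event))

    // A failing listener is recorded and skipped; the remaining listeners still run.
    private def foreachListener(f: ToyListener => Unit): Unit =
      listeners.foreach { listener =>
        try f(listener)
        catch {
          case e: Exception =>
            errors += s"${listener.getClass.getName} threw: ${e.getMessage}"
        }
      }
  }
}
```

    Catching the exception per listener is the design choice that matters here: a buggy listener costs one log line per event instead of breaking the whole bus.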











  • Original post: https://www.cnblogs.com/zwCHAN/p/4245262.html