  • Spark RDD DAG Analysis

    1. Principles

    Directed acyclic graph (DAG): a directed graph in which it is impossible to start from any vertex, follow a sequence of edges, and return to that same vertex.
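
    As a minimal sketch of this definition (the graph below is made up for illustration), a directed graph is a DAG exactly when no vertex can reach itself:

        // adjacency map: vertex -> vertices reachable in one edge
        def isDag(graph: Map[String, List[String]]): Boolean = {
          // collect every vertex reachable from `from` by following edges
          def reachable(from: String, seen: Set[String]): Set[String] =
            graph.getOrElse(from, Nil).foldLeft(seen) { (acc, next) =>
              if (acc(next)) acc else reachable(next, acc + next)
            }
          // a DAG iff no vertex appears in its own reachable set
          graph.keys.forall(v => !reachable(v, Set.empty).contains(v))
        }

        println(isDag(Map("A" -> List("B"), "B" -> List("C"))))                   // true
        println(isDag(Map("A" -> List("B"), "B" -> List("C"), "C" -> List("A")))) // false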

    In Spark, the scheduler arranges the computations of a job into such a graph: each vertex is a computation step (the RDD produced by an operation) and each edge represents a dependency between steps.

    The DAG lets Spark optimize the execution flow, for example by pipelining consecutive single-node (narrow) operations into one task, and by splitting the flow into stages at every step that involves a shuffle.

    The key part of DAG generation is stage division. Stages are divided according to RDD dependencies: a wide (shuffle) dependency triggers a stage split, as the sketch below illustrates.
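
    To see the split rule from user code (a small sketch; the data and app name are arbitrary), the public RDD.dependencies API shows which operations introduce a ShuffleDependency and therefore a stage boundary:

        import org.apache.spark.{ShuffleDependency, SparkConf, SparkContext}

        object DependencyKinds {
          def main(args: Array[String]): Unit = {
            val sc = new SparkContext(
              new SparkConf().setAppName("DependencyKinds").setMaster("local[*]"))
            val pairs = sc.parallelize(Seq("a" -> 1, "b" -> 2))
            val mapped  = pairs.mapValues(_ + 1)    // narrow dependency: pipelined, no split
            val reduced = pairs.reduceByKey(_ + _)  // wide dependency: stage boundary

            // `dependencies` is the same parent list the DAGScheduler walks (see source below)
            println(mapped.dependencies.exists(_.isInstanceOf[ShuffleDependency[_, _, _]]))  // false
            println(reduced.dependencies.exists(_.isInstanceOf[ShuffleDependency[_, _, _]])) // true
            sc.stop()
          }
        }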

    The source code for DAG generation lives in the DAGScheduler.scala class:

    /**
     * The high-level scheduling layer that implements stage-oriented scheduling. It computes a DAG of
     * stages for each job, keeps track of which RDDs and stage outputs are materialized, and finds a
     * minimal schedule to run the job. It then submits stages as TaskSets to an underlying
     * TaskScheduler implementation that runs them on the cluster. A TaskSet contains fully independent
     * tasks that can run right away based on the data that's already on the cluster (e.g. map output
     * files from previous stages), though it may fail if this data becomes unavailable.
     *
     * Spark stages are created by breaking the RDD graph at shuffle boundaries. RDD operations with
     * "narrow" dependencies, like map() and filter(), are pipelined together into one set of tasks
     * in each stage, but operations with shuffle dependencies require multiple stages (one to write a
     * set of map output files, and another to read those files after a barrier). In the end, every
     * stage will have only shuffle dependencies on other stages, and may compute multiple operations
     * inside it. The actual pipelining of these operations happens in the RDD.compute() functions of
     * various RDDs (MappedRDD, FilteredRDD, etc).
     *
     * In addition to coming up with a DAG of stages, the DAGScheduler also determines the preferred
     * locations to run each task on, based on the current cache status, and passes these to the
     * low-level TaskScheduler. Furthermore, it handles failures due to shuffle output files being
     * lost, in which case old stages may need to be resubmitted. Failures *within* a stage that are
     * not caused by shuffle file loss are handled by the TaskScheduler, which will retry each task
     * a small number of times before cancelling the whole stage.
     *
     * When looking through this code, there are several key concepts:
     *
     *  - Jobs (represented by [[ActiveJob]]) are the top-level work items submitted to the scheduler.
     *    For example, when the user calls an action, like count(), a job will be submitted through
     *    submitJob. Each Job may require the execution of multiple stages to build intermediate data.
     *
     *  - Stages ([[Stage]]) are sets of tasks that compute intermediate results in jobs, where each
     *    task computes the same function on partitions of the same RDD. Stages are separated at shuffle
     *    boundaries, which introduce a barrier (where we must wait for the previous stage to finish to
     *    fetch outputs). There are two types of stages: [[ResultStage]], for the final stage that
     *    executes an action, and [[ShuffleMapStage]], which writes map output files for a shuffle.
     *    Stages are often shared across multiple jobs, if these jobs reuse the same RDDs.
     *
     *  - Tasks are individual units of work, each sent to one machine.
     *
     *  - Cache tracking: the DAGScheduler figures out which RDDs are cached to avoid recomputing them
     *    and likewise remembers which shuffle map stages have already produced output files to avoid
     *    redoing the map side of a shuffle.
     *
     *  - Preferred locations: the DAGScheduler also computes where to run each task in a stage based
     *    on the preferred locations of its underlying RDDs, or the location of cached or shuffle data.
     *
     *  - Cleanup: all data structures are cleared when the running jobs that depend on them finish,
     *    to prevent memory leaks in a long-running application.
     *
     * To recover from failures, the same stage might need to run multiple times, which are called
     * "attempts". If the TaskScheduler reports that a task failed because a map output file from a
     * previous stage was lost, the DAGScheduler resubmits that lost stage. This is detected through a
     * CompletionEvent with FetchFailed, or an ExecutorLost event. The DAGScheduler will wait a small
     * amount of time to see whether other nodes or tasks fail, then resubmit TaskSets for any lost
     * stage(s) that compute the missing tasks. As part of this process, we might also have to create
     * Stage objects for old (finished) stages where we previously cleaned up the Stage object. Since
     * tasks from the old attempt of a stage could still be running, care must be taken to map any
     * events received in the correct Stage object.
     *
     * Here's a checklist to use when making or reviewing changes to this class:
     *
     *  - All data structures should be cleared when the jobs involving them end to avoid indefinite
     *    accumulation of state in long-running programs.
     *
     *  - When adding a new data structure, update `DAGSchedulerSuite.assertDataStructuresEmpty` to
     *    include the new structure. This will help to catch memory leaks.
     */
    private[spark]
    class DAGScheduler(
        private[scheduler] val sc: SparkContext,
        private[scheduler] val taskScheduler: TaskScheduler,
        listenerBus: LiveListenerBus,
        mapOutputTracker: MapOutputTrackerMaster,
        blockManagerMaster: BlockManagerMaster,
        env: SparkEnv,
        clock: Clock = new SystemClock())
      extends Logging {
        
        /**
       * Get or create the list of parent stages for a given RDD.  The new Stages will be created with
       * the provided firstJobId.
       */
      private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
        getShuffleDependencies(rdd).map { shuffleDep =>
          getOrCreateShuffleMapStage(shuffleDep, firstJobId)
        }.toList
      }
      /**
       * Returns shuffle dependencies that are immediate parents of the given RDD.
       * This function will not return more distant ancestors.  For example, if C has a shuffle
       * dependency on B which has a shuffle dependency on A:
       *
       * A <-- B <-- C
       *
       * calling this function with rdd C will only return the B <-- C dependency.
       *
       * This function is scheduler-visible for the purpose of unit testing.
       */
      private[scheduler] def getShuffleDependencies(
          rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
        val parents = new HashSet[ShuffleDependency[_, _, _]]
        val visited = new HashSet[RDD[_]]
        val waitingForVisit = new Stack[RDD[_]]
        waitingForVisit.push(rdd)
        while (waitingForVisit.nonEmpty) {
          val toVisit = waitingForVisit.pop()
          if (!visited(toVisit)) {
            visited += toVisit
            toVisit.dependencies.foreach {
              case shuffleDep: ShuffleDependency[_, _, _] =>
                parents += shuffleDep
              case dependency =>
                waitingForVisit.push(dependency.rdd)
            }
          }
        }
        parents
      }
    

    2. Worked Example

        import org.apache.spark.{SparkConf, SparkContext}
        import org.apache.spark.rdd.RDD

        val conf = new SparkConf().setAppName("Demo1").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)
        val lines = sc.textFile("/Users/jordan95225/IdeaProjects/MyLearning/Spark/src/main/resources/data.txt")
        // operation 1: split each line into words
        val words: RDD[String] = lines.flatMap(line => line.split(" "))
        // operation 2: pair each word with an initial count of 1
        val pairs: RDD[(String, Int)] = words.map(word => (word, 1))
        // operation 3: sum the counts per word (introduces a shuffle)
        val wordCounts: RDD[(String, Int)] = pairs.reduceByKey(_ + _)
        wordCounts.collect().foreach(println)
        sc.stop()
    

    Before the program runs, the DAG scheduler first treats the whole flow as a single stage containing the three operations and five RDDs (the file read, flatMap, map, and the local and shuffle phases of reduceByKey). It then traces the lineage backwards: between the ShuffledRDD and the MapPartitionsRDD it finds a shuffle, so it cuts the graph there, producing two stages. Tracing further back it finds no more shuffles, so all earlier RDDs fall into the same stage. Once the traversal completes, the resulting DAG contains two stages.
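
    A quick way to verify the split is RDD.toDebugString, which prints the lineage and indents at every shuffle boundary. The output below is a sketch for the WordCount pipeline above; the exact RDD ids and partition counts will vary:

        println(wordCounts.toDebugString)
        // (2) ShuffledRDD[4] at reduceByKey ...        <- Stage 1 (result stage)
        //  +-(2) MapPartitionsRDD[3] at map ...        <- Stage 0 (shuffle map stage)
        //     |  MapPartitionsRDD[2] at flatMap ...
        //     |  .../data.txt MapPartitionsRDD[1] at textFile ...
        //     |  .../data.txt HadoopRDD[0] at textFile ...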
