zoukankan      html  css  js  c++  java
  • Spark作业执行原理(二)——划分调度阶段

            Spark调度阶段的划分是由DAGScheduler实现,DAGScheduler会从最后一个RDD出发,根据RDD的lineage使用广度优先算法遍历整个依赖树(总共使用了两次,一次是遍历区分ResultStage范围;另一次则是遍历获取ShuffleMapStage划分依据,用来划分每个ShuffleMapStage范围),从而划分调度阶段,调度阶段的划分依据是以是否进行shuffle操作进行的。

            真正的stage划分代码,是从handleJobSubmitted方法中根据最后一个RDD实例化ResultStage对象开始,实例化过程中,finalRDD使用getParentStages找出其依赖的祖先RDD是否存在Shuffle操作,如果没有存在Shuffle操作,则本次作业只有一个ResultStage;如果存在Shuffle操作,则本次作业除了一个ResultStage之外,还至少一个ShuffleMapStage。

    handleJobSubmitted部分源码:

    private[scheduler] def handleJobSubmitted(jobId:Int, finalRDD:RDD[_], func:(TaskContext, Iterator[_]) => _, partitions:Array[Int], callSite:CallSite, listener:JobListener, properties:Properties){
        //定义一个ResultStage类型对象,用于存储DAG划分出来的最后一个Stage
        val finalStage:ResultStage = null
        try{
            finalStage = new ResultStage(finalRDD, func, partitions, jobId, callSite)
        }catch{...}
     
        //根据最后一个阶段生成作业
        val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
        clearCacheLocs()
        
        ...
     
        //提交作业
        submitStage(finalStage)
        submitWaitingStages()
    }

            上面代码在实例化ResultStage时,传入了一个finalRDD,其实这个finalRDD会被传到getParentStagesAndId的方法中,在该方法中调用getParentStages,生成最后一个调度阶段finalStage(这里第一次使用广度优先算法)。

    private def getParentStages(rdd:RDD[_], firstJobId:Int):List[Stage] = {
        val parents = new HashSet[Stage]     //   parents是一个元素类型为Stage的HashSet集合
        val visited = new HashSet[RDD[_]]    //用于存放已经访问过的RDD
     
        //存放非ShuffleDependency的RDD
        val waitingForVisit = new Stack[RDD[_]]
     
        //广度优先遍历,根据当前所依赖的RDD类型,进行不同的操作
        def visit(r:RDD[_]){
            if(!visited(r)){
                visited += r    //将当前RDD标记为已访问,即存放到visited的HashSet集合里面
                for (dep <- r.dependencies){
                    //当前RDD所依赖的父RDD类型为ShuffleDepedency时,需要向前遍历,获取ShuffleMapStage
                    case shufDep:ShuffleDependency[_, _, _] =>
                        parents += getShuffleMapStage(shufDep, firstJobId)
                    case _ =>
                        waitingForVisit.push(dep.rdd)
                }
            }
        }
     
        waitingFoVisit.push(rdd)    
        //开始遍历Stack中的rdd
        while(waitingForVisit.nonEmpty){
            visit(waitingForVisit.pop())
        }
        parents.toList    //返回parents
        
    }

            上面代码显示,如果当前遍历的RDD,其所依赖的父RDD的类型是ShuffleDependency类型时,需要往前遍历,找出所有ShuffleMapStage(或者说找出所有划分ShuffleMapStage的依据——RDD),该算法也是用到了广度优先遍历算法,跟getParentStage类似,具体由getAncestorShuffleDependencies方法实现。

    getAncestorShuffleDependencies方法部分源码:

    private def getAncestorShuffleDependencies(rdd:RDD[_]):Stack[ShuffleDependency[_, _, _]] = {
        val parents = new Stack[ShuffleDependency[_, _, _]]
        val visited = new HashSet[RDD[_]]
     
        //用于存放非ShuffleDependency类型的RDD
        val waitingForVisit = new Stack[RDD[_]]
        def visit(r:RDD[_]){
            if(!visited(r)){
                visited += r    //标记当前rdd已经被访问过,即加入visited 中
                for(dep <- r.dependencies){
                    case shufDep:ShuffleDependency[_, _, _] =>
                        if(!shuffleToMapStage.contains(shufDep.shuffleId)){
                            parents.push(shufDep)    //shuffle依据放进Stack中
                        }
                    case _ =>    //不操作
                }
            }
        }
     
        //向前遍历依赖树,获取所有的类型为ShuffleDependency的RDD,作为划分阶段的依据
        waitingForVisit.push(rdd)
        while(waitingForVisit.nonEmpty){
            visit(waitingForVisit.pop())
        }
        parents    //返回parents
    }

            getAncestorShuffleDependencies方法其实只是找出了ShuffleDependency类型的RDD,而这些RDD就是划分各个ShuffleMapStage的依据。

            当所有阶段的划分操作完成后,这些阶段就会建立起依赖关系。该依赖关系是通过调度阶段属性parents:List[Stage]来定义,通过该属性可以获取当前阶段所有祖先阶段,可以根据这些信息按顺序提交调度阶段进行运行。

    下面是一张Spark调度阶段的Stage划分图:

    Spark调度阶段Stage划分流程:

    1. 在SparkContext中触发提交作业时,会调用DAGScheduler的handleJobSubmitted方法,在该方法中会先找到最后一个RDD(即RDD7),并调用getParentStages方法;
    2. 在getParentStages方法中判断RDD7所依赖的父RDD是否存在Shuffle操作,上图RDD6属于ShuffleDepedency类型,则对RDD6进行下一步操作;
    3. 通过getAncestorShuffleDependencies方法,对RDD6进行向前遍历,寻找所有的划分依据,向前遍历,发现只有RDD4,所以RDD3->RDD4被划分成一个ShuffleMapStage0,RDD5->RDD6被划分成ShuffleMapStage1;
    4. 最后,剩下的生成ResultStage2,一共3个阶段,在提交阶段按顺序运行。
  • 相关阅读:
    SecureCRT 连接 虚拟机Linux 命令
    如何使用secureCRT连接vmware中的虚拟主机?
    SecureCRT8.1+SecureCRT_keygen完成注册
    常用python机器学习库总结
    Torch7在Ubuntu下的安装与配置
    朴素贝叶斯算法 & 应用实例
    编写MR代码中,JAVA注意事项
    march.
    Docker CentOS 7.2镜像systemd问题解决办法
    Docker 基础命令 简要入门
  • 原文地址:https://www.cnblogs.com/SysoCjs/p/11355800.html
Copyright © 2011-2022 走看看