zoukankan      html  css  js  c++  java
  • Spark源码学习1.1——DAGScheduler.scala

        本文以Spark1.1.0版本为基础。

        经过前一段时间的学习,基本上能够对Spark的工作流程有一个了解,但是具体的细节还是需要阅读源码,而且后续的科研过程中也肯定要修改源码的,所以最近开始Spark的源码的学习。首先以重要文件为基础分别分析,然后再整体的分析。

    (一)DAGScheduler.scala文件的主要功能

        DAGScheduler是划分Job为stage的调度,它是在作业所需要的数据已经被分为RDD之后执行的。DAGScheduler将Job划分为DAG图,以stage为图的结点,明确了各个stage之间的依赖关系,然后依次提交stage和Job。

    (二)DAGScheduler.scala中的类和方法

    1、initializeEventProcessActor()

        该方法初始化eventProcessActor,也就是说,它可以阻塞线程直到supervisor启动,而在这整个过程中eventProcessActor都不会空闲。

    2、executorHeartbeatReceived(execId,taskMetrics,blockManagerId)

        该方法有三个参数,是用来接收心跳信息的。心跳信息是各个节点向master提供当前状态的,通知master当前节点是正在运行任务的,而运行任务的单位正是executor。

        关于executor的方法还有:executorLost()处理executor执行失败,executorAdd()处理executor添加和tasksetFailed()设置任务失败。

    3、getcacheLocs(RDD[_])

        该方法查询本地缓存,检查本地存在哪些RDD,RDD的本地性是关系到stage分配的重要因素,在后文的函数中会实现。

        关于Cache的方法还有:clearcacheLocs()清除缓存。

    4、getShuffleMapStage(ShuffleDependence[_,_,_],JobId)

        该方法是stage建立的关键,它明确stage的建立方法是以shuffle为边界的,如果出现wide依赖关系,那么就是shuffle的边界,就可以建立新的stage了,建立stage的方法为newOrUsedStage。

    5、newStage(RDD[_],numTask,shuffleDependence,JobId,callsite)和newOrUsedStage(RDD[_],numTask,shuffleDependence,JobId,callsite)

        这两个方法以同样的参数建立Stage,不同的是前者是建立无依赖关系的stage,而后者是由父系stage建立的有一定依赖关系的stage。

    6、getParentStage(RDD[_],JobId)

        该方法根据参数中的RDD数组(也就是一个stage中的所有RDD),查询当前RDD所属的Stage。

    7、registerShuffleDependencies(shuffleDependencies[_,_,_],JobId)

        该方法扫描依赖关系,利用getAncestorShuffleDependencies返回的RDD关系栈将依赖关系加入到Stage中。

    8、getAncestorShuffleDependencies(RDD[_])

        该方法建立RDD之间的依赖关系。

    9、getMissingParentStages(stage)

        该方法在getShuffleMapStage的基础上确定Stage的父系Stage,利用的也是RDD之间的依赖关系。

    10、updataJobIdStageIdMaps(JobId,stage)

        该方法用来更新Stage中的StageId和JobId。

    11、cleanupStateForJobAndIndependentStages(Job)

        该方法清除一个活动的Job的所有状态,以及其所属的不再需要的stage的状态。

    12、submit[T,U](RDD[T],func,partitions,callsite,allowlocal,resulthandler,properties)

        向任务调度器提交一个Job,并同时差生一个Jobwaiter,这个Jobwaiter可以用来保证Job执行时其他Job是阻塞的,也可以用来取消Job。

    13、runJob[T,U](RDD[T],func,partitions,callsite,allowlocal,resulthandler,properties)

        这个方法其实只是对Job提交之后的一个返回信息的处理,如果submitJob方法没有返回异常,就表示执行正确了,否则报错并加入日志。

    14、runApproxiateJob[T,U,R](RDD[T],func,partitions,evaluator,callsite,timeout,properties)

        该方法执行当前Job的下一个Job。

    15、取消任务或者Stage的相关方法:doCancelAllJobs(),cancelStage(stageId)

    16、重新提交失败的或者等待的Stage的方法:resubmitFailedStages(),submitWaitingStages()

    17、runLocally(Job)和runLocallyWithinThread(job)

        前者建立新的线程,启动本地执行;后者在线程中实际执行Job。

    18、handle***()

        这一些以handle开头的方法特点是进行了多次判定。handleJobSubmitted()方法对已经提交的Job进行操作,找到最后一个的Stage,检查Job执行的本地性等,然后将Job中的最后一个Stage提交,提交方法为submitStage()。submitStage()会根据最后一个Stage依次提交父系Stage,这其中需要考虑到丢失的task。handleTaskCompletion()方法在于处理Task执行后的各种状况,失败、阻塞或者失去联系,重新提交Task。详细的代码涉及诸多方法和类。

    19、getPreferredLocsInternal(RDD,partition,visited)

        根据Cache和输入RDD的位置和依赖关系递归查询最适合的位置。

  • 相关阅读:
    J
    I
    uva122 二叉树的实现和层次遍历(bfs)
    A
    HDU 波峰
    2239: 童年的圣诞树
    1734: 堆(DFS)
    1731: 矩阵(前缀和)
    1733: 旋转图像(模拟)
    1728: 社交网络(概率问题 组合数/排列数)
  • 原文地址:https://www.cnblogs.com/zx247549135/p/4124820.html
Copyright © 2011-2022 走看看