  • Spark Source Code Reading (1): Stage Division

    In Spark, a job is created by an action. So how is a job divided into stages? The usual answer is: by wide versus narrow dependencies. Let's dig into the source code and see how that actually happens.
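
    To keep a concrete job in mind (an illustrative example of mine, not from the original article): map produces a narrow dependency, while reduceByKey introduces a wide (shuffle) dependency, so the job below should be split into two stages at the shuffle boundary.

    val sc = new SparkContext(new SparkConf().setAppName("stage-demo").setMaster("local[2]"))

    val words  = sc.parallelize(Seq("a", "b", "a", "c"))
    val pairs  = words.map(w => (w, 1))     // narrow dependency: stays in the first stage
    val counts = pairs.reduceByKey(_ + _)   // wide (shuffle) dependency: starts a new stage
    counts.count()                          // the action that triggers runJob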

    An action such as count passes through several layers of runJob calls and finally reaches this one:

    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
    dagScheduler is an instance of DAGScheduler, so all the work from here on is handed to DAGScheduler.
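
    For reference, the chain starts in the RDD API itself; count in this era of Spark is essentially the one-liner below (quoted from memory, so treat it as a sketch rather than verbatim source).

    // RDD.count: run a job that sizes each partition's iterator, then sum the sizes
    def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum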

    dagScheduler.runJob mainly calls the submitJob function.

    submitJob's main work is to submit a job to the job scheduler and create a JobWaiter object. This JobWaiter blocks until the job completes or is cancelled.
    The submission itself actually posts the job to a DAGSchedulerEventProcessLoop, inside which the job's execution is handled. The main code is below. So how does DAGSchedulerEventProcessLoop process it?

    val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
    eventProcessLoop.post(JobSubmitted(
      jobId, rdd, func2, partitions.toArray, callSite, waiter,
      SerializationUtils.clone(properties)))
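
    On the calling side, DAGScheduler.runJob is what actually blocks on the waiter; roughly (a sketch of the 1.x code, abbreviated and not guaranteed verbatim):

    val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
    waiter.awaitResult() match {
      case JobSucceeded =>
        logInfo("Job %d finished: %s".format(waiter.jobId, callSite.shortForm))
      case JobFailed(exception: Exception) =>
        logInfo("Job %d failed: %s".format(waiter.jobId, callSite.shortForm))
        throw exception
    }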
    

      

    DAGSchedulerEventProcessLoop receives the event and then checks which kind it is. As you can see here,

    many other operations are also posted into this loop. Why is it done this way? (Decoupling?)

    override def onReceive(event: DAGSchedulerEvent): Unit = {
      val timerContext = timer.time()
      try {
        doOnReceive(event)
      } finally {
        timerContext.stop()
      }
    }
    

      

    private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
      case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
        dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)

      case StageCancelled(stageId) =>
        dagScheduler.handleStageCancellation(stageId)

      case JobCancelled(jobId) =>
        dagScheduler.handleJobCancellation(jobId)

      ...
    }
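
    The events being matched are plain case classes extending a common sealed trait, which is what makes this dispatch a simple pattern match. Sketched below (field lists follow the JobSubmitted case above; not guaranteed verbatim):

    private[scheduler] sealed trait DAGSchedulerEvent

    private[scheduler] case class JobSubmitted(
        jobId: Int,
        finalRDD: RDD[_],
        func: (TaskContext, Iterator[_]) => _,
        partitions: Array[Int],
        callSite: CallSite,
        listener: JobListener,
        properties: Properties = null)
      extends DAGSchedulerEvent

    private[scheduler] case class StageCancelled(stageId: Int) extends DAGSchedulerEvent

    private[scheduler] case class JobCancelled(jobId: Int) extends DAGSchedulerEvent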
    

      

    The parent class of DAGSchedulerEventProcessLoop, EventLoop, starts its own thread, so the processing above runs on a separate thread.

    private val eventThread = new Thread(name) {
      setDaemon(true)

      override def run(): Unit = {
        try {
          while (!stopped.get) {
            ...
          }
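
    To make the author's "decoupling?" guess concrete, here is a minimal event loop of my own (an illustrative sketch, not Spark code): producers post events into a blocking queue and return immediately, while a single daemon thread drains the queue and runs the handler. The payoff is exactly decoupling: callers like submitJob never execute scheduling logic on their own thread, and all scheduler state is mutated from one thread, so no locking is needed.

    import java.util.concurrent.LinkedBlockingDeque
    import java.util.concurrent.atomic.AtomicBoolean

    abstract class MiniEventLoop[E](name: String) {
      private val eventQueue = new LinkedBlockingDeque[E]()
      private val stopped = new AtomicBoolean(false)

      private val eventThread = new Thread(name) {
        setDaemon(true)
        override def run(): Unit = {
          try {
            while (!stopped.get) {
              onReceive(eventQueue.take())  // take() blocks until an event arrives
            }
          } catch {
            case _: InterruptedException => // stop() interrupts a blocked take(); just exit
          }
        }
      }

      def start(): Unit = eventThread.start()

      def stop(): Unit = { stopped.set(true); eventThread.interrupt() }

      // post never blocks the caller; handling happens later on eventThread
      def post(event: E): Unit = eventQueue.put(event)

      protected def onReceive(event: E): Unit
    }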
    

      

    Note, though, that all the actual processing still happens in DAGScheduler. Next let's dive into handleJobSubmitted.
    The full code is given here. The main logic is to use the final RDD to generate the ResultStage, then create an ActiveJob, record the related bookkeeping, and hand over to submitStage(finalStage).

    private[scheduler] def handleJobSubmitted(jobId: Int,
        finalRDD: RDD[_],
        func: (TaskContext, Iterator[_]) => _,
        partitions: Array[Int],
        callSite: CallSite,
        listener: JobListener,
        properties: Properties) {
      var finalStage: ResultStage = null
      try {
        // New stage creation may throw an exception if, for example, jobs are run on a
        // HadoopRDD whose underlying HDFS files have been deleted.
        finalStage = newResultStage(finalRDD, partitions.length, jobId, callSite)
      } catch {
        case e: Exception =>
          logWarning("Creating new stage failed due to exception - job: " + jobId, e)
          listener.jobFailed(e)
          return
      }
      if (finalStage != null) {
        val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)
        clearCacheLocs()
        logInfo("Got job %s (%s) with %d output partitions".format(
          job.jobId, callSite.shortForm, partitions.length))
        logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")")
        logInfo("Parents of final stage: " + finalStage.parents)
        logInfo("Missing parents: " + getMissingParentStages(finalStage))
        val jobSubmissionTime = clock.getTimeMillis()
        jobIdToActiveJob(jobId) = job
        activeJobs += job
        finalStage.resultOfJob = Some(job)
        val stageIds = jobIdToStageIds(jobId).toArray
        val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
        listenerBus.post(
          SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
        submitStage(finalStage)
      }
      submitWaitingStages()
    }
    

      

    From here we follow two threads: (1) newResultStage and (2) submitStage.

    (1) newResultStage
    Here a ResultStage is generated. Creating this stage requires information about its parent stages, which is obtained via getParentStagesAndId.

    private def newResultStage(
        rdd: RDD[_],
        numTasks: Int,
        jobId: Int,
        callSite: CallSite): ResultStage = {
      val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, jobId)
      val stage: ResultStage = new ResultStage(id, rdd, numTasks, parentStages, jobId, callSite)

      stageIdToStage(id) = stage
      updateJobIdStageIdMaps(jobId, stage)
      stage
    }
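
    getParentStagesAndId itself is a thin helper. In this version of Spark it is essentially the following (quoted from memory, treat it as a sketch): it delegates to getParentStages and pairs the result with a freshly allocated stage id, taken after the parents so that parent stages get smaller ids than their children.

    private def getParentStagesAndId(rdd: RDD[_], firstJobId: Int): (List[Stage], Int) = {
      val parentStages = getParentStages(rdd, firstJobId)
      // allocate this stage's id only after the parents have been created
      val id = nextStageId.getAndIncrement()
      (parentStages, id)
    }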
    

      

    Inside, the work is done by getParentStages. It uses a stack, waitingForVisit, to hold RDDs not yet visited, starting from the final RDD, and then looks at each RDD's dependencies.
    If a dependency is a ShuffleDependency:

    private def getParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
      val parents = new HashSet[Stage]
      val visited = new HashSet[RDD[_]]
      // We are manually maintaining a stack here to prevent StackOverflowError
      // caused by recursively visiting
      val waitingForVisit = new Stack[RDD[_]]
      def visit(r: RDD[_]) {
        if (!visited(r)) {
          visited += r
          // Kind of ugly: need to register RDDs with the cache here since
          // we can't do it in its constructor because # of partitions is unknown
          for (dep <- r.dependencies) {
            dep match {
              case shufDep: ShuffleDependency[_, _, _] =>
                parents += getShuffleMapStage(shufDep, firstJobId)
              case _ =>
                waitingForVisit.push(dep.rdd)
            }
          }
        }
      }
      waitingForVisit.push(rdd)
      while (waitingForVisit.nonEmpty) {
        visit(waitingForVisit.pop())
      }
      parents.toList
    }
    

      


    The check here is whether an RDD's dependency is a shuffle one. Take an operation like reduceByKey: if RDD1 is the RDD before the transformation and RDD2 the one after,
    then RDD2's dependency is a ShuffleDependency, and that ShuffleDependency object itself contains an RDD, namely RDD1.
    When the dependency is a ShuffleDependency, the parent stage is obtained via getShuffleMapStage.
    So the rough idea of this code is: first push the final RDD onto the stack, waitingForVisit, then fetch a stage for every ShuffleDependency encountered.
    Stages are thus divided along ShuffleDependency boundaries. But it seems we have not yet seen how the intermediate RDDs are placed into a single stage.
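
    You can observe this boundary directly from the RDD API (an illustrative example of mine, not from the article):

    val rdd1 = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
    val rdd2 = rdd1.reduceByKey(_ + _)

    // rdd2's single dependency is a ShuffleDependency whose .rdd points back at rdd1
    println(rdd2.dependencies.head)  // e.g. org.apache.spark.ShuffleDependency@...
    println(rdd2.toDebugString)      // prints the lineage; indentation changes at shuffle boundaries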

    Let's go deeper into getShuffleMapStage:

    private def getShuffleMapStage(
        shuffleDep: ShuffleDependency[_, _, _],
        firstJobId: Int): ShuffleMapStage = {
      shuffleToMapStage.get(shuffleDep.shuffleId) match {
        case Some(stage) => stage
        case None =>
          // We are going to register ancestor shuffle dependencies
          registerShuffleDependencies(shuffleDep, firstJobId)
          // Then register current shuffleDep
          val stage = newOrUsedShuffleStage(shuffleDep, firstJobId)
          shuffleToMapStage(shuffleDep.shuffleId) = stage

          stage
      }
    }
    

      

    If a ShuffleMapStage has already been generated for this ShuffleDependency, it is fetched directly; otherwise it needs to be registered first.

    Digging into the registerShuffleDependencies function:

    private def registerShuffleDependencies(shuffleDep: ShuffleDependency[_, _, _], firstJobId: Int) {
      // Get the ShuffleDependencies upstream of this one that have not yet been
      // registered in shuffleToMapStage
      val parentsWithNoMapStage = getAncestorShuffleDependencies(shuffleDep.rdd)
      while (parentsWithNoMapStage.nonEmpty) {
        val currentShufDep = parentsWithNoMapStage.pop()
        val stage = newOrUsedShuffleStage(currentShufDep, firstJobId)
        shuffleToMapStage(currentShufDep.shuffleId) = stage
      }
    }
    

      


    So what is newOrUsedShuffleStage for? It generates a stage and registers it in shuffleToMapStage.
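
    The article stops before showing it, but roughly (my paraphrase of this era's source, with one part elided, not a verbatim quote): it builds a ShuffleMapStage over shuffleDep.rdd, and then either recovers the previously computed map outputs for this shuffleId from the MapOutputTracker (the stage may have run already under another job) or registers the shuffle as brand new.

    private def newOrUsedShuffleStage(
        shuffleDep: ShuffleDependency[_, _, _],
        firstJobId: Int): ShuffleMapStage = {
      val rdd = shuffleDep.rdd
      val numTasks = rdd.partitions.length
      val stage = newShuffleMapStage(rdd, numTasks, shuffleDep, firstJobId, rdd.creationSite)
      if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
        // The shuffle already ran: copy the known map output locations into the stage
        ...
      } else {
        // A brand-new shuffle: register it with the MapOutputTracker
        mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
      }
      stage
    }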



    (2) submitStage
