zoukankan      html  css  js  c++  java
  • Spark作业执行原理(三)——提交调度阶段

            在上一篇划分调度阶段中的handleJobSubmitted方法中,提到finalStage的生成,在生成finalStage的同时,建立起所有Stage的依赖关系,然后通过finalStage生成一个作业实例,在该作业实例中按照顺序提交调度阶段进行执行,在执行过程中监听总线获取作业、阶段执行的情况。

    回顾handleJobSubmitted方法中部分源码:

    //根据最后一个阶段生成作业
        val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
        clearCacheLocs()
        
        ...
     
        //提交作业
        submitStage(finalStage)
        submitWaitingStages()

            作业的提交阶段从submitStage方法开始,在submitStage方法中调用getMissingParentStages获取finalStage的父调度阶段,如果不存在父调度阶段,则使用submitMissingTasks方法提交执行;如果存在,则把父调度阶段放进waitingStages列表中,通过递归的方式调用submitStage方法。通过这样的逻辑,就可以根据stage的依赖关系,从最前面的stage开始执行作业,一直到最后一个。

    submitStage方法部分源码:

    private def submitStage(stage: Stage){
        val jobId = activeJobForStage(stage)
        if(jobId.isDefined){
            logDebug("submitStage("+stage+")")
            if(!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)){
                //获取父调度阶段,但并不是通过调度阶段的依赖关系,而是通过Stage的判断依据来获取父调度阶段
                val missing = getMissingParentStages(stage).sortBy(_.id)
                if(missing.isEmpty){
                    //如果不存在父调度阶段,调用submitMissingTasks()提交
                    logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
                    submitMissingTasks(stage, jobId.get)
                }else{
                    //如果存在父调度阶段,将当前阶段放进等待列表,同时递归调用submitStage方法,直至找到最前面的没有父调度阶段的Stage
                    for(parent <- missing){
                        submitStage(parent)
                    }
                    waitingStages += stage
                }
            }
        }else{
            abortStage(stage, "No active job for stage " + stage.id, None)
        }
    }

            鉴于递归的逻辑,当最开始的调度阶段完成后,相继提交后续调度阶段,但注意一个问题,调度当前阶段时,必须依赖父调度阶段的状态,显然,父调度阶段的成功与否直接影响后续阶段的调度,所以,在调度后续阶段前,先判断当前调度阶段所依赖的父调度阶段的结果是否可用(即运行是否成功):如果可用,则提交当前调度阶段;如果不可用,则尝试提交结果不可用的父调度阶段。至于什么时候进行是否可用判断呢?这个工作交给是在ShuffleMapTask完成时(即已经交给executor执行了)进行,DAGScheduler会检查调度阶段的所有任务是否都完成:如果执行失败,则重新提交该阶段;如果所有任务成功,则扫描等待调度阶段列表,检查列表中的阶段的父调度阶段是否存在未完成,如果不存在,则表明该调度阶段准备就绪,生成实例并提交运行。

    提交调度阶段流运行顺序:

    为了方便理解,对上一篇最后的图添加了一个stage。

    1. 在submitStage方法中,先创建作业实例,然后判断该调度阶段是否存在父调度阶段,由于ResultStage3有两个父调度阶段ShuffleMapStage0和ShuffleMapStage2,所以ResultStage3会先放进waitingStages中;
    2. 然后递归调用submitStage,发现ShuffleMapStage0没有父调度阶段,而ShuffleMapStage2有一个父调度阶段ShuffleMapStage1,所以ShuffleMapStage2会被放进waitingStages中,再之,ShuffleMapStage1也没有父调度阶段,则ShuffleMapStage0和ShuffleMapStage1会被放到执行列表中,作为第一次调度使用submitMissingTasks方法,提交运行;
    3. Executor执行完成时会发送消息,通知DAGScheduler更新状态并检查运行情况,如果发现有任务执行失败,则重新提交调度阶段;如果所有任务执行成功,则继续提交下一次调度阶段。这里进入第二次调度阶段,首先扫描等待队列的stage是否有父调度阶段没有完成,显然ResultStage3还有ShuffleMapStage2没有完成,所以ResultStage3继续放在等待队列,ShuffleMapStage2则没有父调度阶段,可以放在运行队列中,作为第二次调度提交;
    4. 此时,ShuffleMapStage2执行完毕,ResultStage3已经没有父调度阶段,可以作为第三次调度提交。
  • 相关阅读:
    此查询使用的不是 ANSI 外部联接运算符
    centos重启命令
    updatePanel 加载完成后回调JS
    建站推荐十个免费的CMS内容管理系统(Php+mysql)
    [转]最值得拥有的免费Bootstrap后台管理模板
    Got a packet bigger than 'max_allowed_packet' bytes”
    ECshop商城程序常见的96个小问题汇总
    linux 命令
    mysql 存储过程
    千万级记录的Discuz论坛导致MySQL CPU 100%的优化笔记
  • 原文地址:https://www.cnblogs.com/SysoCjs/p/11355900.html
Copyright © 2011-2022 走看看