zoukankan      html  css  js  c++  java
  • Spark作业执行原理(六)——获取执行结果

            对于Executor的计算结果,会根据结果的大小使用不同的处理策略:

    1. 计算结果在(0,128MB-200KB)区间内:通过Netty直接发送给Driver终端;
    2. 计算结果在[128MB, 1GB]区间内:将结果以taskId为编号存入到BlockManager中,然后通过Netty把编号发送给Driver终端;阈值可通过Netty框架传输参数设置spark.akka.frameSize,默认值是128MB,200KB是Netty预留空间reservedSizeBytes的值;
    3. 计算结果在(1GB,∞)区间内:直接丢弃,可通过spark.driver.maxResultSize配置;

           任务执行完成之后,TaskRunner将任务的执行结果发送给DriverEndpoint,DriverEndpoint接收到信息后,交给TaskSchedulerImpl的statusUpdate方法进行处理,该方法根据不同的任务状态有不同的结果获取策略:

    1. 如果状态类型是TaskState.FINISHED,那么进一步调用TaskResultGetter的enqueueSuccessfulTask。enqueueSuccessfulTask会判断类型,如果IndirectTaskResult,则 需要通过blockid远程来获取结果(sparkEnv.blockManager.getRemoteBytes(blockId));如果是DirectTaskResult,则无需远程获取结果;
    2. 如果状态类型是:TaskState.FAILED、TaskState.KILLED或TaskState.LOST,则调用TaskResultGetter的enqueueFailedTask,特别地,对于TaskState.LOST,还需要将其所在的Executor标记为failed,并根据更新后的Executor重新调度

            如果任务是ShuffleMapTask,那么它需要将结果通过某种机制告诉下游调度阶段,事实上,对于ShuffleMapTask,其结果是一个MapStatus对象,序列化之后存入DirectTaskResult或者IndirectTaskResult中。而DAGScheduler的handleTaskCompletion方法获取这个结果,并把MapStatus注册到MapOutputTrackerMaster中。

    case smt:ShuffleMapTask =>
        val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
        updateAccumulators(event)
        ...
        mapOutputTracker.registerMapOutputs(
            shuffleStage.shuffleDep.shuffleId,
            shuffleStage.outputLocs.map(list => if(list.isEmpty) null else list.head),
            changeEpoch = true)

            对于ResultTask,判断作业是否完成,如果完成,则标记改作业完成,清除作业依赖,并发送消息给系统监听总线,告知作业执行完毕。

    case rt:ResultTask[_, _] =>
        val resultStage = stage.asInstanceOf[ResultStage]
        resultStage.resultOfJob match{
            case Some(job) =>
                if(!job.finished(rt.outputId)){
                    updateAccumulators(event)
                    job.finished(rt.outputId) = true
                    job.numFinished += 1
     
                    if(job.numFinished == job.numPartitions){
                        //标记改作业完成
                        markStageAsFinished(resultStage)
                        //清除依赖资源
                        cleanupStateForJobAndIndepentStates(job)
                        //发送消息给系统监听总线,告知作业执行完毕
                        listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobSucceeded))
                    }
                }
        }
  • 相关阅读:
    SQL带参数拼接
    ASP.NET+ashx+jQuery动态添加删除表格
    ASP.NET中常用重置数据的方法
    多表联合查询
    zTree在Asp.Net中的使用
    ASP.NET中常用方法
    DropDownList
    (转)一个form表单实现提交多个action
    svn简单用法
    每日三问
  • 原文地址:https://www.cnblogs.com/SysoCjs/p/11466061.html
Copyright © 2011-2022 走看看