zoukankan      html  css  js  c++  java
  • Spark作业执行原理(六)——获取执行结果

            对于Executor的计算结果,会根据结果的大小使用不同的处理策略:

    1. 计算结果在(0,128MB-200KB)区间内:通过Netty直接发送给Driver终端;
    2. 计算结果在[128MB, 1GB]区间内:将结果以taskId为编号存入到BlockManager中,然后通过Netty把编号发送给Driver终端;阈值可通过Netty框架传输参数设置spark.akka.frameSize,默认值是128MB,200KB是Netty预留空间reservedSizeBytes的值;
    3. 计算结果在(1GB,∞)区间内:直接丢弃,可通过spark.driver.maxResultSize配置;

           任务执行完成之后,TaskRunner将任务的执行结果发送给DriverEndpoint,DriverEndpoint接收到信息后,交给TaskSchedulerImpl的statusUpdate方法进行处理,该方法根据不同的任务状态有不同的结果获取策略:

    1. 如果状态类型是TaskState.FINISHED,那么进一步调用TaskResultGetter的enqueueSuccessfulTask。enqueueSuccessfulTask会判断类型,如果IndirectTaskResult,则 需要通过blockid远程来获取结果(sparkEnv.blockManager.getRemoteBytes(blockId));如果是DirectTaskResult,则无需远程获取结果;
    2. 如果状态类型是:TaskState.FAILED、TaskState.KILLED或TaskState.LOST,则调用TaskResultGetter的enqueueFailedTask,特别地,对于TaskState.LOST,还需要将其所在的Executor标记为failed,并根据更新后的Executor重新调度

            如果任务是ShuffleMapTask,那么它需要将结果通过某种机制告诉下游调度阶段,事实上,对于ShuffleMapTask,其结果是一个MapStatus对象,序列化之后存入DirectTaskResult或者IndirectTaskResult中。而DAGScheduler的handleTaskCompletion方法获取这个结果,并把MapStatus注册到MapOutputTrackerMaster中。

    case smt:ShuffleMapTask =>
        val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
        updateAccumulators(event)
        ...
        mapOutputTracker.registerMapOutputs(
            shuffleStage.shuffleDep.shuffleId,
            shuffleStage.outputLocs.map(list => if(list.isEmpty) null else list.head),
            changeEpoch = true)

            对于ResultTask,判断作业是否完成,如果完成,则标记改作业完成,清除作业依赖,并发送消息给系统监听总线,告知作业执行完毕。

    case rt:ResultTask[_, _] =>
        val resultStage = stage.asInstanceOf[ResultStage]
        resultStage.resultOfJob match{
            case Some(job) =>
                if(!job.finished(rt.outputId)){
                    updateAccumulators(event)
                    job.finished(rt.outputId) = true
                    job.numFinished += 1
     
                    if(job.numFinished == job.numPartitions){
                        //标记改作业完成
                        markStageAsFinished(resultStage)
                        //清除依赖资源
                        cleanupStateForJobAndIndepentStates(job)
                        //发送消息给系统监听总线,告知作业执行完毕
                        listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobSucceeded))
                    }
                }
        }
  • 相关阅读:
    Siri
    ArcSDE和Geodatabase10.1抢先版谍照介绍(3)——ArcToolbox工具和其他功能
    ‘马太’效应
    Linux琐屑下Resin JSP MySQL的布置和设置配备陈设2
    平安防御:分级防御对Linux服务器的攻击
    Linux无法解析域名的处理活动
    Ubuntu Linux Server的用户僻静后果分析
    大概要领确保确保Linux体系中SSH的安适性
    无错版Vsftpd Mysql Pam设置虚拟用户要领
    Redhat Linux AS4下的LAMP与Discuz装置1
  • 原文地址:https://www.cnblogs.com/SysoCjs/p/11466061.html
Copyright © 2011-2022 走看看