zoukankan      html  css  js  c++  java
  • Spark作业执行原理(五)——执行任务

            在Spark消息通信原理(三)(https://www.cnblogs.com/SysoCjs/p/11355900.html)中第(6)点提到过,Executor是任务执行的容器,executor接收到LaunchTask消息之后(其实是GoraseGrainedExecutorBackend接收到来自DriverEndpoint的LaunchTask消息后,调用Executor的launchTasks方法),就会执行任务。执行任务时,会创建TaskRunner进程,放到thredPool中,统一由Executor进行调度。

            TaskRunner有一个run方法,方法里主要做的是:对发送过来的Task本身(ShuffleMapTask和ResultTask),以及它所依赖的jar等文件进行反序列,然后对反序列后的Task交给Task对象的run方法。由于Task是一个抽象类,具体实现交给两个子类ShuffleMapTask和ResultTask。

    TaskRunner.run方法的部分源码:

    override def run(): Unit = {
        //生成内存管理实例——taskMemoryManager,用于任务运行期间内存的管理
        val taskMemoryManager = new TaskMemoryManager(env.memoryManager, taskId)
        val deserializeStartTime = System.currentTimeMillis()
     
        Thread.currentThread.setContextClassLoader(replClassLoader)
        val ser = env.closureSerializer.newInastance()
     
        //向Driver终端发送任务运行开始消息,通知Driver对状态进行更新
        execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
        var taskStart: Long = 0
        startGCTime = computeTotalGcTime()
     
        try{
            //对任务运行时所需要的文件、jar、代码等进行反序列
            val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)
            updateDependencies(taskFiles, taskJars)
            task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)
            task.setTaskMemoryManager(taskMemoryManager)
     
            //如果任务在反序列之前被kill掉,则抛出异常
            if(killed){
                throw new TaskKilledException
            }
     
            env.mapOutputTracker.updateEpoch(task.epoch)
     
            //调用Task的runTask方法,由于Task是一个抽象类,所以具体实现交给它的子类——ShuffleMapTask和ResultTask
            taskStart = System.currentTimeMillis()
            var threwException = true
            val value = try{
                val res = task.run(
                    taskAttemptId = taskId,
                    attemptNumber = attemptNumber,
                    metricsSystem = env.metricsSystem)
                res
            }finally{...}
            ...
        }
    }

            不同的Task实体类,它们处理计算的结果的方式上会有所差别。

            对于ShuffleMapTask,计算结果会写到BlockManager之中,最终返回给DAGScheduler的是一个MapStatus对象,该对象管理了ShuffleMapTask的相关存储信息,这些存储信息并不是计算结果本身,而是运算结果到BlockManager的相关联系,这些存储信息将会成为下一阶段的任务需要获得的输入数据时的依据。ShuffleMapTask.runTask部分源码:

    override def runTask(context: TaskContext):MapStatus = {
        val deserializeStartTime = System.currentTimeMills()
        //反序列化获取RDD和RDD的依赖
        val ser = SparkEnv.get.closureSerializer.newInastance()
     
        val (rdd, dep) = ser.derialize[(RDD[_], ShuffleDependency[_, _, _])](
            ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
        _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
     
        metrics = Some(context.taskMetrics)
        var writer:ShuffleWriter[Any, Any] = null
     
        try{
            val manager = SparkEnv.get.ShuffleManager
            writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
            //首先调用rdd.iterator,如果RDD已经Cache或者Checkpoint,则直接读取结果;否则计算。
            writer.write(rdd.iterator(partition,context).asInstanceOf[Iterator[_<:Product2[Any, Any]]])
            //关闭writer,返回结果,包含数据的location和size等元数据信息
            writer.stop(success = true).get
        }catch{...}
    }

            对于ResultTask,它的计算结果以func函数的形式返回。ResultTask.runTask部分源码:

    override def runTask(context:TaskContext):U = {
        //反序列化广播变量,得到RDD
        val deserializeStartTime = System.currentTimeMillis()
        val ser = SparkEnv.get.closureSerializer.newInstance()
        val (rdd, func) = ser.deserialize[(RDD[T]. (TaskContext, Iterator[T]) => U)](
            ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
        _executorDeserializeTime = System.currentTimeMillis() - deserialiZeStartTime
        
        metrics = Some(context.taskMetrics)
        //返回
        func(context, rdd.iterator(partition, context))
    }

     

  • 相关阅读:
    CSS3 03. 3D变换、坐标系、透视perspective、transformZ、transform-style添加3D效果、backface-visibility元素背面可见、动画animation、@keyfarmes、多列布局
    CSS3 02. 边框、边框圆角、边框阴影、边框图片、渐变、线性渐变、径向渐变、背景、过渡transition、2D转换(缩放、位移、旋转、倾斜)
    CSS3 01. CSS3现状、属性选择器、伪类选择器、结构伪类、伪元素选择器、颜色、文本阴影shadow、盒子模型、私有化前缀
    HTML5 01. 布局、语义化标签、智能化表单、表单元素/标签/属性/事件、多媒体、类操作、自定义属性
    webstorm 突然不能用了?解决办法~
    jQuery
    jQuery
    jQuery
    JS-特效 ~ 05. 缓动框架兼容封装/回掉函数/兼容透明度/层级、旋转轮播图、正则表达式、验证表单注册账号、
    百问网WIFI模块驱动
  • 原文地址:https://www.cnblogs.com/SysoCjs/p/11357054.html
Copyright © 2011-2022 走看看