  • Spark Notes 12: Executor, the task's final destination

    Spark's Executor is the container that runs tasks. The concept is similar to Java's Executor.

    ===================start executor runs task============================
    ->CoarseGrainedExecutorBackend::receiveWithLogging --receives messages sent by CoarseGrainedSchedulerBackend
    ->case LaunchTask(data) =>  --handles the launch-task message
    ->val taskDesc = ser.deserialize[TaskDescription](data.value) --deserializes the received TaskDescription
    ->executor.launchTask(this, taskDesc.taskId, taskDesc.name, taskDesc.serializedTask) --calls the executor's launchTask method

      
    ->Executor::launchTask(    --the method the Executor uses to run a task
    ->val tr = new TaskRunner(context, taskId, taskName, serializedTask)  --creates a new TaskRunner so the task can run on its own thread
    ->override def run() {
    ->val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)  --splits the serialized task into its files, JARs, and task bytes
    ->updateDependencies(taskFiles, taskJars)  //Download any missing dependencies if we receive a new set of files
    //and JARs from the SparkContext. Also adds any new JARs we fetched to the class loader. (Updates and completes the dependencies.)
    ->for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp)   --fetches dependency files that are new or have a newer timestamp
    ->Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf, env.securityManager)
    ->for ((name, timestamp) <- newJars if currentJars.getOrElse(name, -1L) < timestamp)  --fetches dependency JARs that are new or have a newer timestamp
    ->Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf, env.securityManager)
    ->val url = new File(SparkFiles.getRootDirectory, localName).toURI.toURL
    ->urlClassLoader.addURL(url)
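    ->task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)  --deserializes the task
    ->val value = task.run(taskId.toInt)  --calls the task's run function directly
    ->val valueBytes = resultSer.serialize(value) --serializes the task result
    ->val directResult = new DirectTaskResult(valueBytes, accumUpdates, task.metrics.orNull)  --builds the task result to send back
    ->val serializedDirectResult = ser.serialize(directResult)   --serializes the result to send back
    ->execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)  --sends the result back (serializedResult is serializedDirectResult unless the result is too large to send directly)
                                 ->driver ! StatusUpdate(executorId, taskId, state, data) --the CoarseGrainedExecutorBackend implementation of statusUpdate, which messages the driver's CoarseGrainedSchedulerBackend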
    ->env.shuffleMemoryManager.releaseMemoryForThisThread() // Release memory used by this thread for shuffles
    ->env.blockManager.memoryStore.releaseUnrollMemoryForThisThread()// Release memory used by this thread for unrolling blocks
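    ->runningTasks.remove(taskId)  --last step of TaskRunner.run (in its finally block): unregisters the finished task
    ->runningTasks.put(taskId, tr)  --back in launchTask: registers the new TaskRunner
    ->threadPool.execute(tr)  --submits the TaskRunner to the thread pool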
      ===========================end======================
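
    The dependency check in the trace fetches a file or JAR only when it is unknown or carries a newer timestamp than the copy already recorded. The following is a minimal sketch of that rule, not Spark's code: DependencyTracker and its fetch callback are hypothetical stand-ins (fetch replaces Utils.fetchFile so the sketch runs without Spark), and the step that records the timestamp after a fetch is an assumption about the bookkeeping.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the executor's dependency bookkeeping.
// `fetch` replaces Utils.fetchFile so the sketch needs no Spark classes.
class DependencyTracker(fetch: String => Unit) {
  // name -> timestamp of the copy already fetched
  private val currentFiles = mutable.HashMap[String, Long]()

  // Fetches every entry that is unknown or newer than the recorded copy,
  // records its timestamp, and returns the fetched names in sorted order.
  def update(newFiles: Map[String, Long]): List[String] = synchronized {
    val fetched = mutable.ArrayBuffer[String]()
    for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) {
      fetch(name)
      currentFiles(name) = timestamp // assumed bookkeeping step
      fetched += name
    }
    fetched.toList.sorted
  }
}
```

    Using -1L as the default means a name that has never been seen always counts as older than any real timestamp, so it is fetched on first use and skipped afterwards until its timestamp changes.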

    /**
    * Spark executor used with Mesos, YARN, and the standalone scheduler.
    */
    private[spark] class Executor(
    executorId: String,
    slaveHostname: String,
    properties: Seq[(String, String)],
    isLocal: Boolean = false)
    extends Logging
    {
    Important fields:
    // Maintains the list of running tasks.
    private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]
    Sends the status of running tasks to the driver through heartbeats:
    def startDriverHeartbeater() {
    At last we see the familiar executor (a thread pool). This is where the task finally gets executed.
    def launchTask(
    context: ExecutorBackend, taskId: Long, taskName: String, serializedTask: ByteBuffer) {
    val tr = new TaskRunner(context, taskId, taskName, serializedTask)
    runningTasks.put(taskId, tr)
    threadPool.execute(tr)
    }
    class TaskRunner(
    execBackend: ExecutorBackend, val taskId: Long, taskName: String, serializedTask: ByteBuffer)
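
    The launchTask pattern above (wrap the task in a runner, register it in a concurrent map, hand it to a thread pool, unregister it when it finishes) can be sketched without Spark as follows. This is an illustration under stated assumptions, not the Executor itself: MiniExecutor, MiniTaskRunner, the report callback (standing in for ExecutorBackend.statusUpdate), and the string status values are all hypothetical.

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}

// Hypothetical miniature of the Executor: only the task registry and the pool.
class MiniExecutor {
  // Maintains the set of running tasks, keyed by task id.
  private val runningTasks = new ConcurrentHashMap[Long, MiniTaskRunner]
  private val threadPool = Executors.newCachedThreadPool()

  // `report` stands in for ExecutorBackend.statusUpdate.
  class MiniTaskRunner(report: (Long, String) => Unit, val taskId: Long, body: () => Any)
      extends Runnable {
    override def run(): Unit = {
      try {
        val value = body()                  // run the task
        report(taskId, s"FINISHED:$value")  // send the result back
      } catch {
        case e: Exception => report(taskId, s"FAILED:${e.getMessage}")
      } finally {
        runningTasks.remove(taskId)         // always unregister the task
      }
    }
  }

  def launchTask(report: (Long, String) => Unit, taskId: Long, body: () => Any): Unit = {
    val tr = new MiniTaskRunner(report, taskId, body)
    runningTasks.put(taskId, tr)
    threadPool.execute(tr)
  }

  def runningCount: Int = runningTasks.size

  // Stops accepting tasks and waits briefly for submitted ones to finish.
  def shutdown(): Unit = {
    threadPool.shutdown()
    threadPool.awaitTermination(5, TimeUnit.SECONDS)
  }
}
```

    Removing the entry in a finally block keeps the registry accurate even when the task body throws, which matches the position of runningTasks.remove(taskId) at the end of the trace.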

    CoarseGrainedExecutorBackend is the direct counterpart of the CoarseGrainedSchedulerBackend class on the driver side. The two communicate directly to deliver tasks and send results back.
    private[spark] class CoarseGrainedExecutorBackend(
    driverUrl: String,
    executorId: String,
    hostPort: String,
    cores: Int,
    sparkProperties: Seq[(String, String)])
    extends Actor with ActorLogReceive with ExecutorBackend with Logging {
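
    The exchange between the two backends can be pictured as a pair of message handlers: the executor side reacts to LaunchTask and the driver side reacts to StatusUpdate. The sketch below replaces the Akka actors with plain method calls so that it runs on its own. The message names follow the trace, but their fields are simplified, and MiniSchedulerBackend, MiniExecutorBackend, and the "taskId:text" payload format are hypothetical.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets.UTF_8

// Simplified message types named after those in the trace (fields are assumptions).
sealed trait Message
case class LaunchTask(data: ByteBuffer) extends Message
case class StatusUpdate(executorId: String, taskId: Long, state: String, data: ByteBuffer)
    extends Message

// Driver-side stand-in: records the status updates it receives.
class MiniSchedulerBackend {
  var received: List[StatusUpdate] = Nil
  def receive(msg: Message): Unit = msg match {
    case s: StatusUpdate => received = received :+ s
    case _               => () // ignore anything else
  }
}

// Executor-side stand-in: handles LaunchTask and reports the result to the driver.
class MiniExecutorBackend(executorId: String, driver: MiniSchedulerBackend) {
  def receive(msg: Message): Unit = msg match {
    case LaunchTask(data) =>
      // "Deserialize" the hypothetical payload format "taskId:text".
      val Array(id, text) = UTF_8.decode(data).toString.split(":", 2)
      val result = text.toUpperCase // "run" the task
      driver.receive(
        StatusUpdate(executorId, id.toLong, "FINISHED", ByteBuffer.wrap(result.getBytes(UTF_8))))
    case _ => () // ignore anything else
  }
}
```

    In the original code the send is asynchronous (driver ! StatusUpdate(...)), so the direct method call here only illustrates the direction of each message, not the concurrency model.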






  • Original article: https://www.cnblogs.com/zwCHAN/p/4247690.html