zoukankan      html  css  js  c++  java
  • Spark源码分析 – Executor

    ExecutorBackend

    很简单的接口

    package org.apache.spark.executor
    /**
     * A pluggable interface used by the Executor to send updates to the cluster scheduler.
     */
    private[spark] trait ExecutorBackend {
      def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer)
    }

     

    StandaloneExecutorBackend

    维护executor, 并负责注册executor以及executor和driver之间的通信

    private[spark] class StandaloneExecutorBackend(
        driverUrl: String,
        executorId: String,
        hostPort: String,
        cores: Int)
      extends Actor
      with ExecutorBackend
      with Logging {
      var executor: Executor = null
      var driver: ActorRef = null
    
      override def preStart() {
        logInfo("Connecting to driver: " + driverUrl)
        driver = context.actorFor(driverUrl) // 创建driver actor ref, 以便于和driver通信
        driver ! RegisterExecutor(executorId, hostPort, cores) // 向driver注册executor
      }
    
      override def receive = {
        case RegisteredExecutor(sparkProperties) =>
          logInfo("Successfully registered with driver")
          // Make this host instead of hostPort ? 
          executor = new Executor(executorId, Utils.parseHostPort(hostPort)._1, sparkProperties) // 当注册成功后, 创建Executor
    
        case RegisterExecutorFailed(message) =>
          logError("Slave registration failed: " + message)
          System.exit(1)
    
        case LaunchTask(taskDesc) =>
          logInfo("Got assigned task " + taskDesc.taskId)
          if (executor == null) {
            logError("Received launchTask but executor was null")
            System.exit(1)
          } else {
            executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask) // 调用executor.launchTask,启动task
          }
    
        case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
          logError("Driver terminated or disconnected! Shutting down.")
          System.exit(1)
      }
    
      override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
        driver ! StatusUpdate(executorId, taskId, state, data) // 当task状态变化时, 报告给driver actor
      }
    }

    Executor

    对于Executor, 维护一个threadPool, 可以run多个task, 取决于core的个数
    所以对于launchTask, 就是在threadPool中挑个thread去run TaskRunner

    private[spark] class Executor(
        executorId: String,
        slaveHostname: String,
        properties: Seq[(String, String)])
      extends Logging
    {
      // Initialize Spark environment (using system properties read above)
      val env = SparkEnv.createFromSystemProperties(executorId, slaveHostname, 0, false, false)
      SparkEnv.set(env)

      // Start worker thread pool
      val threadPool = new ThreadPoolExecutor(
        1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable])
    
      def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) {
        threadPool.execute(new TaskRunner(context, taskId, serializedTask))
      }
    }

     

    TaskRunner

      class TaskRunner(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer)
        extends Runnable {
    
        override def run() {
          try {
            SparkEnv.set(env)
            Accumulators.clear()
            val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask) // 反序列化
            updateDependencies(taskFiles, taskJars)
            val task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader) // 反序列化 
            attemptedTask = Some(task)
            logInfo("Its epoch is " + task.epoch)
            env.mapOutputTracker.updateEpoch(task.epoch)
            taskStart = System.currentTimeMillis()
            val value = task.run(taskId.toInt)  // 调用task.run执行真正的逻辑
            val taskFinish = System.currentTimeMillis()
            val accumUpdates = Accumulators.values
            val result = new TaskResult(value, accumUpdates, task.metrics.getOrElse(null)) // 生成TaskResult
            val serializedResult = ser.serialize(result) // 将TaskResult序列化
            logInfo("Serialized size of result for " + taskId + " is " + serializedResult.limit)
            context.statusUpdate(taskId, TaskState.FINISHED, serializedResult) // 将任务完成和taskresult,通过statusUpdate报告给driver
            logInfo("Finished task ID " + taskId)
          } catch { // 处理各种fail, 同样也要用statusUpdate event通知driver
            case ffe: FetchFailedException => {
              val reason = ffe.toTaskEndReason
              context.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
            }
    
            case t: Throwable => {
              val serviceTime = (System.currentTimeMillis() - taskStart).toInt
              val metrics = attemptedTask.flatMap(t => t.metrics)
              for (m <- metrics) {
                m.executorRunTime = serviceTime
                m.jvmGCTime = getTotalGCTime - startGCTime
              }
              val reason = ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace, metrics)
              context.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
    
              // TODO: Should we exit the whole executor here? On the one hand, the failed task may
              // have left some weird state around depending on when the exception was thrown, but on
              // the other hand, maybe we could detect that when future tasks fail and exit then.
              logError("Exception in task ID " + taskId, t)
              //System.exit(1)
            }
          }
        }
      }
  • 相关阅读:
    GC选择之CMS 并发标记清除
    JVM内存概览与GC初步
    Shell 判断语句
    SUID SGID
    maven package
    ACL权限控制列表
    账户与密码管理
    Ubuntu与Centos在登陆安全方面的比较
    【PL/SQL Developer】动态执行表不可访问,本会话的自动统计被禁止
    【Centos7】Delete virtual bridge
  • 原文地址:https://www.cnblogs.com/fxjwind/p/3509207.html
Copyright © 2011-2022 走看看