zoukankan      html  css  js  c++  java
  • 小记：Spark Worker 原理分析及源码分析

     
    Worker 类源码位置：org.apache.spark.deploy.worker 包（下文还引用了 DriverRunner 与 ExecutorRunner 的相关源码）
     
     
    /**
     * Source analysis: launching a driver.
     *
     * Worker handler for a LaunchDriver message: builds a DriverRunner for the requested driver,
     * registers it in `drivers`, starts it, and adds the driver's cores and memory to this
     * Worker's usage counters.
     */
    case LaunchDriver(driverId, driverDesc) =>
      logInfo(s"Asked to launch driver $driverId")

    // Create the DriverRunner; its start() spawns the thread that runs the driver (see 代码1 / Code 1).
    // The driver command is passed through Worker.maybeUpdateSSLSettings first.
      val driver = new DriverRunner(
        conf,
        driverId,
        workDir,
        sparkHome,
        driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
        self,
        workerUri,
        securityMgr)

    // Register the runner in `drivers`, keyed by driverId.
      drivers(driverId) = driver

    // Start the driver.
      driver.start() // details: see 代码1 (Code 1)


    // Reserve the driver's CPU cores and memory on this Worker.
      coresUsed += driverDesc.cores
      memoryUsed += driverDesc.mem
     
     
    代码1：DriverRunner.start()
    /**
     * Starts a thread to run and manage the driver.
     *
     * The thread (named "DriverRunner for <driverId>") does the following:
     *   - registers a shutdown hook that kills the driver when the JVM shuts down;
     *   - runs the driver via prepareAndRunDriver() (see 代码2 / Code 2);
     *   - maps the outcome to a final DriverState;
     *   - removes the hook again;
     *   - reports the final state to the Worker as a DriverStateChanged message (see 代码3 / Code 3).
     */
    private[worker] def start() = {

      // DriverRunner mechanism: all of the work below happens on a separate Java thread.
      new Thread("DriverRunner for " + driverId) {
        override def run() {
          var shutdownHook: AnyRef = null
          try {
            shutdownHook = ShutdownHookManager.addShutdownHook { () =>
              logInfo(s"Worker shutting down, killing driver $driverId")
              kill()
            }

            // prepare driver jars and run driver
            // prepareAndRunDriver (see 代码2 / Code 2):
            //   step 1: create the driver's working directory;
            //   step 2: download the user's application jar (e.g. built with Maven for Java,
            //           or exported as a jar for Scala);
            //   step 3: build the ProcessBuilder and run the driver; the result is its exit code.
            val exitCode = prepareAndRunDriver() // details: see 代码2 (Code 2)


            // set final state depending on if forcibly killed and process exit code
            // Exit code 0 => FINISHED; otherwise KILLED if the `killed` flag is set, else FAILED.
            finalState = if (exitCode == 0) {
              Some(DriverState.FINISHED)
            } else if (killed) {
              Some(DriverState.KILLED)
            } else {
              Some(DriverState.FAILED)
            }
          } catch {
            // Any Exception: kill the driver and record ERROR together with the exception.
            case e: Exception =>
              kill()
              finalState = Some(DriverState.ERROR)
              finalException = Some(e)
          } finally {
            // The hook is only needed while the driver is running.
            if (shutdownHook != null) {
              ShutdownHookManager.removeShutdownHook(shutdownHook)
            }
          }


          // notify worker of final driver state, possible exception
          // This DriverRunner thread sends a DriverStateChanged message to the Worker it belongs to.
          worker.send(DriverStateChanged(driverId, finalState.get, finalException)) // details: see 代码3 (Code 3)
        }
      }.start()
    }
     
     
     
    代码2：DriverRunner.prepareAndRunDriver()
    /**
     * Prepares and runs the driver (see 代码2 / Code 2).
     *
     * Creates the driver's working directory and downloads the user jar into it. It then builds
     * the launch ProcessBuilder (placeholders substituted, driver memory applied) and hands it to
     * runDriver.
     *
     * @return whatever runDriver returns; the caller treats it as the driver's exit code
     */
    private[worker] def prepareAndRunDriver(): Int = {
      // Step 1: the driver's working directory.
      val workingDir = createWorkingDirectory()
      // Step 2: fetch the user's application jar into that directory.
      val userJar = downloadUserJar(workingDir)

      // Fills in the {{WORKER_URL}} / {{USER_JAR}} placeholders of the launch command;
      // every other argument passes through unchanged.
      def substituteVariables(arg: String): String = arg match {
        case "{{WORKER_URL}}" => workerUrl
        case "{{USER_JAR}}"   => userJar
        case untouched        => untouched
      }

      // TODO: If we add ability to submit multiple jars they should also be added here

      // Step 3: build the ProcessBuilder from the driver's command and memory requirement.
      val launcher = CommandUtils.buildProcessBuilder(
        driverDesc.command,
        securityManager,
        driverDesc.mem,
        sparkHome.getAbsolutePath,
        substituteVariables)

      runDriver(launcher, workingDir, driverDesc.supervise)
    }
     
     
    代码3：Worker 处理 DriverStateChanged 消息
    // Worker-side handler (see 代码3 / Code 3): when a driver finishes, its DriverRunner thread
    // sends DriverStateChanged to this Worker. Handling it, including forwarding the message to
    // the Master, is delegated to handleDriverStateChanged.
    case driverStateChanged @ DriverStateChanged(driverId, state, exception) =>
      handleDriverStateChanged(driverStateChanged) // details: see 代码4 (Code 4)
     
     
    代码4：Worker.handleDriverStateChanged()
    /**
     * Handles a DriverStateChanged message sent by a DriverRunner (see 代码3 / Code 3).
     *
     * Steps:
     *   - logs the new state;
     *   - forwards the message to the Master, which updates the driver's state;
     *   - moves the driver from `drivers` to `finishedDrivers`;
     *   - releases the memory and cores that were reserved for it.
     *
     * An unknown driver id, or an ERROR report without an attached exception, is logged. It does
     * not throw NoSuchElementException (from Option.get) inside the message handler.
     */
    private[worker] def handleDriverStateChanged(driverStateChanged: DriverStateChanged): Unit = {
      val driverId = driverStateChanged.driverId
      val exception = driverStateChanged.exception
      val state = driverStateChanged.state
      state match {
        case DriverState.ERROR =>
          val cause = exception.map(_.toString).getOrElse("<no exception attached>")
          logWarning(s"Driver $driverId failed with unrecoverable exception: $cause")
        case DriverState.FAILED =>
          logWarning(s"Driver $driverId exited with failure")
        case DriverState.FINISHED =>
          logInfo(s"Driver $driverId exited successfully")
        case DriverState.KILLED =>
          logInfo(s"Driver $driverId was killed by user")
        case _ =>
          logDebug(s"Driver $driverId changed state to $state")
      }

      // Forward the state change to the Master so it can update its record of the driver.
      sendToMaster(driverStateChanged)

      drivers.remove(driverId) match {
        case Some(driver) =>
          // Keep the runner in the finished-driver history, trimming the history if necessary.
          finishedDrivers(driverId) = driver
          trimFinishedDriversIfNecessary()
          // Release the driver's memory and CPU cores.
          memoryUsed -= driver.driverDesc.mem
          coresUsed -= driver.driverDesc.cores
        case None =>
          // Nothing is registered under this id (e.g. a duplicate report), so nothing to release.
          logWarning(s"Received DriverStateChanged for unknown driver $driverId")
      }
    }
     
    /**
     * Source analysis: launching an executor.
     *
     * Worker handler for LaunchExecutor. A request from a master URL other than activeMasterUrl
     * is rejected with a warning. Otherwise the handler:
     *   - creates the executor's working directory and the application's local directories;
     *   - builds, registers and starts an ExecutorRunner;
     *   - adds the executor's cores and memory to the usage counters;
     *   - reports the runner's state to the Master.
     * On any Exception, the runner (if already registered) is killed and removed, and FAILED is
     * reported to the Master.
     */
    case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
      if (masterUrl != activeMasterUrl) {
        logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
      } else {
        try {
          logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))



          // Create the executor's working directory: <workDir>/<appId>/<execId>.
          // mkdirs() returning false (which includes "already exists") is treated as an error.
          val executorDir = new File(workDir, appId + "/" + execId)
          if (!executorDir.mkdirs()) {
            throw new IOException("Failed to create directory " + executorDir)
          }


          // Create local dirs for the executor. These are passed to the executor via the
          // SPARK_EXECUTOR_DIRS environment variable, and deleted by the Worker when the
          // application finishes.
          // They are created once per application, with permissions set to 700, and reused
          // through appDirectories by later executors of the same application.
          val appLocalDirs = appDirectories.getOrElse(appId,
            Utils.getOrCreateLocalRootDirs(conf).map { dir =>
              val appDir = Utils.createDirectory(dir, namePrefix = "executor")
              Utils.chmod700(appDir)
              appDir.getAbsolutePath()
            }.toSeq)
          appDirectories(appId) = appLocalDirs

            // Create the ExecutorRunner. The last argument, ExecutorState.RUNNING, is presumably
            // the initial state.
          val manager = new ExecutorRunner(
            appId,
            execId,
            appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
            cores_,
            memory_,
            self,
            workerId,
            host,
            webUi.boundPort,
            publicAddress,
            sparkHome,
            executorDir,
            workerUri,
            conf,
            appLocalDirs, ExecutorState.RUNNING)

        // Register the runner in `executors` under the key "<appId>/<execId>".
          executors(appId + "/" + execId) = manager

        // Start the ExecutorRunner.
          manager.start() // details: see 代码5 (Code 5)

        // Account for the CPU cores and memory used by this executor.
          coresUsed += cores_
          memoryUsed += memory_

        // Report the executor's current state to the Master so it can update its bookkeeping.
          sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None))
        } catch {
          case e: Exception =>
            logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e)
            if (executors.contains(appId + "/" + execId)) {
              executors(appId + "/" + execId).kill()
              executors -= appId + "/" + execId
            }
            sendToMaster(ExecutorStateChanged(appId, execId, ExecutorState.FAILED,
              Some(e.toString), None))
        }
      }
     
     
    代码5：ExecutorRunner.start()
    /**
     * Starts the executor runner (see 代码5 / Code 5).
     *
     * Spawns the "ExecutorRunner for <fullId>" thread that runs fetchAndRunExecutor()
     * (see 代码6 / Code 6), then registers a shutdown hook.
     */
    private[worker] def start() {

        // Create a Java thread whose run() launches and monitors the executor.
      workerThread = new Thread("ExecutorRunner for " + fullId) {
        override def run() { fetchAndRunExecutor() } // details: see 代码6 (Code 6)
      }
      workerThread.start()
      // Shutdown hook, registered after the thread has been started. On JVM shutdown it marks a
      // still-RUNNING executor as FAILED and then calls killProcess.
      shutdownHook = ShutdownHookManager.addShutdownHook { () =>
        // It's possible that we arrive here before calling `fetchAndRunExecutor`, then `state` will
        // be `ExecutorState.RUNNING`. In this case, we should set `state` to `FAILED`.
        if (state == ExecutorState.RUNNING) {
          state = ExecutorState.FAILED
        }
        killProcess(Some("Worker shutting down")) }
    }
     
     
     
    代码6：ExecutorRunner.fetchAndRunExecutor()
    /**
     * Download and run the executor described in our ApplicationDescription.
     *
     * Steps:
     *   - builds the executor's launch command and starts it as a child process in
     *     `executorDir`;
     *   - redirects the process's stdout/stderr to files named "stdout" and "stderr" in that
     *     directory;
     *   - waits for the process to exit;
     *   - reports the exit code to the Worker as an ExecutorStateChanged message
     *     (see 代码7 / Code 7).
     *
     * If the runner thread is interrupted, the state becomes KILLED; any other Exception marks
     * it FAILED. In both cases killProcess is called.
     */
    private def fetchAndRunExecutor(): Unit = {
      try {
        // Launch the process: build a ProcessBuilder from the application's command and the
        // executor's memory.
        val builder = CommandUtils.buildProcessBuilder(appDesc.command, new SecurityManager(conf),
          memory, sparkHome.getAbsolutePath, substituteVariables)
        val command = builder.command()
        // Render the command as "arg1" "arg2" ... (every argument wrapped in double quotes)
        // for the log line and the stderr header.
        val formattedCommand = command.asScala.mkString("\"", "\" \"", "\"")
        logInfo(s"Launch command: $formattedCommand")

        builder.directory(executorDir)
        builder.environment.put("SPARK_EXECUTOR_DIRS", appLocalDirs.mkString(File.pathSeparator))
        // In case we are running this from within the Spark Shell, avoid creating a "scala"
        // parent process for the executor command
        builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")

        // Add webUI log urls: a proxied relative URL when spark.ui.reverseProxy is enabled,
        // otherwise a direct URL to this worker's web UI.
        val baseUrl =
          if (conf.getBoolean("spark.ui.reverseProxy", false)) {
            s"/proxy/$workerId/logPage/?appId=$appId&executorId=$execId&logType="
          } else {
            s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
          }
        builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
        builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")

        process = builder.start()

        // Header written at the top of the stderr file: the launch command, then a line of '='.
        val header = "Spark Executor Command: %s\n%s\n\n".format(
          formattedCommand, "=" * 40)

        // Redirect its stdout and stderr to files: the process's InputStream (its stdout) goes
        // to executorDir/stdout and its ErrorStream goes to executorDir/stderr.
        val stdout = new File(executorDir, "stdout")
        stdoutAppender = FileAppender(process.getInputStream, stdout, conf)

        val stderr = new File(executorDir, "stderr")
        Files.write(header, stderr, StandardCharsets.UTF_8)
        stderrAppender = FileAppender(process.getErrorStream, stderr, conf)

        // Wait for it to exit; executor may exit with code 0 (when driver instructs it to shutdown)
        // or with nonzero exit code.
        // The process was already started by builder.start(); waitFor() only blocks until it
        // exits.
        val exitCode = process.waitFor()
        state = ExecutorState.EXITED
        val message = "Command exited with code " + exitCode

        // Notify the Worker this runner belongs to (see 代码7 / Code 7).
        worker.send(ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode)))
      } catch {
        case interrupted: InterruptedException =>
          logInfo("Runner thread for executor " + fullId + " interrupted")
          state = ExecutorState.KILLED
          killProcess(None)
        case e: Exception =>
          logError("Error running executor", e)
          state = ExecutorState.FAILED
          killProcess(Some(e.toString))
      }
    }
     
    代码7：Worker 处理 ExecutorStateChanged 消息
    // Worker-side handler (see 代码7 / Code 7): the ExecutorRunner thread sends
    // ExecutorStateChanged to this Worker. handleExecutorStateChanged forwards it to the Master
    // and, for finished executors, releases their resources.
    case executorStateChanged @ ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
      handleExecutorStateChanged(executorStateChanged) // details: see 代码8 (Code 8)
     
     
    代码8：Worker.handleExecutorStateChanged()
    /**
     * Handles an ExecutorStateChanged message (see 代码8 / Code 8).
     *
     * The message is always forwarded to the Master first. If the new state is a finished state:
     *   - a known executor is moved from `executors` to `finishedExecutors`, and its cores and
     *     memory are released;
     *   - an unknown executor id is only logged;
     *   - in either case, maybeCleanupApplication is then called for the executor's application.
     */
    private[worker] def handleExecutorStateChanged(executorStateChanged: ExecutorStateChanged):
      Unit = {
      sendToMaster(executorStateChanged)

      val newState = executorStateChanged.state
      if (ExecutorState.isFinished(newState)) {
        val appId = executorStateChanged.appId
        val executorKey = appId + "/" + executorStateChanged.execId
        // Optional " message ..." and " exitStatus ..." suffixes shared by both log lines.
        val suffix =
          executorStateChanged.message.map(" message " + _).getOrElse("") +
            executorStateChanged.exitStatus.map(" exitStatus " + _).getOrElse("")

        executors.get(executorKey) match {
          case None =>
            logInfo("Unknown Executor " + executorKey + " finished with state " + newState +
              suffix)
          case Some(runner) =>
            logInfo("Executor " + executorKey + " finished with state " + newState + suffix)
            // Move the runner from the live map into the finished-executor history.
            executors -= executorKey
            finishedExecutors(executorKey) = runner
            trimFinishedExecutorsIfNecessary()
            // Give the executor's CPU cores and memory back to this Worker.
            coresUsed -= runner.cores
            memoryUsed -= runner.memory
        }
        maybeCleanupApplication(appId)
      }
    }
     
  • 相关阅读:
    Odoo Documentation : Fields
    Odoo models.py BaseModel
    Odoo Documentation : Environment
    Odoo Documentation : Recordsets
    Odoo中使用的部分表名及用途
    Odoo启动过程
    6779. Can you answer these queries VII
    1874 素数和最大
    3150 Pibonacci数
    2817 Tangent的愤怒
  • 原文地址:https://www.cnblogs.com/yzqyxq/p/12054358.html
Copyright © 2011-2022 走看看