zoukankan      html  css  js  c++  java
  • 小记--------spark的worker原理分析及源码分析

     
    Worker类源码位置: org.apache.spark.deploy.worker
     
     
    /**
    *启动driver的源码分析
    */
    case LaunchDriver(driverId, driverDesc) =>
      logInfo(s"Asked to launch driver $driverId")
     
    //创建DriverRunner线程
      val driver = new DriverRunner(
        conf,
        driverId,
        workDir,
        sparkHome,
        driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
        self,
        workerUri,
        securityMgr)
     
    //把DriverRunner线程加入Drivers的hashset中
      drivers(driverId) = driver
     
    //启动driver
      driver.start() //详细代码见:代码1
     
     
      coresUsed += driverDesc.cores
      memoryUsed += driverDesc.mem
     
     
    代码1
    /** Starts a thread to run and manage the driver. */
    private[worker] def start() = {
     
      //DriverRunner机制分析
      //启动一个java线程
      new Thread("DriverRunner for " + driverId) {
        override def run() {
          var shutdownHook: AnyRef = null
          try {
            shutdownHook = ShutdownHookManager.addShutdownHook { () =>
              logInfo(s"Worker shutting down, killing driver $driverId")
              kill()
            }
     
            // prepare driver jars and run driver
            // 在此处进行第一步:创建DriverRunner的工作目录
            // 第二步,下载用户上传的jar(我们编写完的spark应用程序,如果是java,用maven打个jar包,如果是scala,那么会用export将它导出为jar包)
            //第三步 构建ProcessBuilder
            val exitCode = prepareAndRunDriver()//详细代码见:代码2
     
     
            // set final state depending on if forcibly killed and process exit code
            // 对driver的退出状态做一些处理
            finalState = if (exitCode == 0) {
              Some(DriverState.FINISHED)
            } else if (killed) {
              Some(DriverState.KILLED)
            } else {
              Some(DriverState.FAILED)
            }
          } catch {
            case e: Exception =>
              kill()
              finalState = Some(DriverState.ERROR)
              finalException = Some(e)
          } finally {
            if (shutdownHook != null) {
              ShutdownHookManager.removeShutdownHook(shutdownHook)
            }
          }
     
     
          // notify worker of final driver state, possible exception
            // 这个DriverRunner这个线程,向它所属的worker的actor,发送一个DriverStateChanged的事件 
          worker.send(DriverStateChanged(driverId, finalState.get, finalException))//详细代码见:代码3
        }
      }.start()
    }
     
     
     
    代码2
    private[worker] def prepareAndRunDriver(): Int = {
      val driverDir = createWorkingDirectory()//创建DriverRunner的工作目录
      val localJarFilename = downloadUserJar(driverDir)//第二步,下载用户上传的jar
     
     
      def substituteVariables(argument: String): String = argument match {
        case "{{WORKER_URL}}" => workerUrl
        case "{{USER_JAR}}" => localJarFilename
        case other => other
      }
     
     
      // TODO: If we add ability to submit multiple jars they should also be added here
     
      // 构建ProcessBuilder
      // 传入了driver的启动命令,需要的内存大小等信息
      val builder = CommandUtils.buildProcessBuilder(driverDesc.command, securityManager,
        driverDesc.mem, sparkHome.getAbsolutePath, substituteVariables)
     
     
      runDriver(builder, driverDir, driverDesc.supervise)
    }
     
     
    代码3
    //driver执行完以后,driverrunner线程会发送一个状态给worker
    //然后worker实际上会将DriverStateChanged消息发送给Master
    case driverStateChanged @ DriverStateChanged(driverId, state, exception) =>
      handleDriverStateChanged(driverStateChanged)//详细代码见:代码4
     
     
    代码4
    private[worker] def handleDriverStateChanged(driverStateChanged: DriverStateChanged): Unit = {
      val driverId = driverStateChanged.driverId
      val exception = driverStateChanged.exception
      val state = driverStateChanged.state
      state match {
        case DriverState.ERROR =>
          logWarning(s"Driver $driverId failed with unrecoverable exception: ${exception.get}")
        case DriverState.FAILED =>
          logWarning(s"Driver $driverId exited with failure")
        case DriverState.FINISHED =>
          logInfo(s"Driver $driverId exited successfully")
        case DriverState.KILLED =>
          logInfo(s"Driver $driverId was killed by user")
        case _ =>
          logDebug(s"Driver $driverId changed state to $state")
      }
     
    //worker把DriverStateChanged消息发送给Master
    // Master会对状态进行修改
      sendToMaster(driverStateChanged)
     
    //将driver从本地缓存中移除
      val driver = drivers.remove(driverId).get
     
    //将driver加入完成driver的队列
      finishedDrivers(driverId) = driver
      trimFinishedDriversIfNecessary()
     
    //将driver的内存和CPU进行释放
      memoryUsed -= driver.driverDesc.mem
      coresUsed -= driver.driverDesc.cores
    }
     
    /**
    *启动Executor的源码分析
    */
    case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
      if (masterUrl != activeMasterUrl) {
        logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
      } else {
        try {
          logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
     
     
     
          // Create the executor's working directory    
          // 创建executor本地工作目录
          val executorDir = new File(workDir, appId + "/" + execId)
          if (!executorDir.mkdirs()) {
            throw new IOException("Failed to create directory " + executorDir)
          }
     
     
          // Create local dirs for the executor. These are passed to the executor via the
          // SPARK_EXECUTOR_DIRS environment variable, and deleted by the Worker when the
          // application finishes.
          val appLocalDirs = appDirectories.getOrElse(appId,
            Utils.getOrCreateLocalRootDirs(conf).map { dir =>
              val appDir = Utils.createDirectory(dir, namePrefix = "executor")
              Utils.chmod700(appDir)
              appDir.getAbsolutePath()
            }.toSeq)
          appDirectories(appId) = appLocalDirs
     
            //创建ExecutorRunner
          val manager = new ExecutorRunner(
            appId,
            execId,
            appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
            cores_,
            memory_,
            self,
            workerId,
            host,
            webUi.boundPort,
            publicAddress,
            sparkHome,
            executorDir,
            workerUri,
            conf,
            appLocalDirs, ExecutorState.RUNNING)
     
        //把executorRunner加入本地缓存
          executors(appId + "/" + execId) = manager
     
        //启动ExecutorRunner
          manager.start()//详细代码:见代码5
     
        //加上Executor需要使用的CPU 内存的资源
          coresUsed += cores_
          memoryUsed += memory_
     
        //向master返回一个ExecutorStateChanged事件,用于master修改状态
          sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None))
        } catch {
          case e: Exception =>
            logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e)
            if (executors.contains(appId + "/" + execId)) {
              executors(appId + "/" + execId).kill()
              executors -= appId + "/" + execId
            }
            sendToMaster(ExecutorStateChanged(appId, execId, ExecutorState.FAILED,
              Some(e.toString), None))
        }
      }
     
     
    代码5
    private[worker] def start() {
     
        //创建一个java线程
      workerThread = new Thread("ExecutorRunner for " + fullId) {
        override def run() { fetchAndRunExecutor() }//详细代码见代码6
      }
      workerThread.start()
      // Shutdown hook that kills actors on shutdown.
      shutdownHook = ShutdownHookManager.addShutdownHook { () =>
        // It's possible that we arrive here before calling `fetchAndRunExecutor`, then `state` will
        // be `ExecutorState.RUNNING`. In this case, we should set `state` to `FAILED`.
        if (state == ExecutorState.RUNNING) {
          state = ExecutorState.FAILED
        }
        killProcess(Some("Worker shutting down")) }
    }
     
     
     
    代码6
    /**
    * Download and run the executor described in our ApplicationDescription
    */
    private def fetchAndRunExecutor() {
      try {
        // Launch the process
     
        //封装一个ProcessBuilder
        val builder = CommandUtils.buildProcessBuilder(appDesc.command, new SecurityManager(conf),
          memory, sparkHome.getAbsolutePath, substituteVariables)
        val command = builder.command()
        val formattedCommand = command.asScala.mkString(""", "" "", """)
        logInfo(s"Launch command: $formattedCommand")
     
     
        builder.directory(executorDir)
        builder.environment.put("SPARK_EXECUTOR_DIRS", appLocalDirs.mkString(File.pathSeparator))
        // In case we are running this from within the Spark Shell, avoid creating a "scala"
        // parent process for the executor command
        builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")
     
     
        // Add webUI log urls
        val baseUrl =
          if (conf.getBoolean("spark.ui.reverseProxy", false)) {
            s"/proxy/$workerId/logPage/?appId=$appId&executorId=$execId&logType="
          } else {
            s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
          }
        builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
        builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")
     
     
        process = builder.start()
     
        //重定向到输出流文件(将是stdout和stderr)
        //将executor的InputStream和ErrorStream,输出的信息
        //分贝重定向到本地工作目录的stdout文件,和stderr文件中
        val header = "Spark Executor Command: %s
    %s
    
    ".format(
          formattedCommand, "=" * 40)
     
     
        // Redirect its stdout and stderr to files
        val stdout = new File(executorDir, "stdout")
        stdoutAppender = FileAppender(process.getInputStream, stdout, conf)
     
     
        val stderr = new File(executorDir, "stderr")
        Files.write(header, stderr, StandardCharsets.UTF_8)
        stderrAppender = FileAppender(process.getErrorStream, stderr, conf)
     
     
        // Wait for it to exit; executor may exit with code 0 (when driver instructs it to shutdown)
        // or with nonzero exit code
        // 调用Proess的waitFor()方法,启动executor进程
        val exitCode = process.waitFor()
     
        // executor执行完之后拿到返回值状态
        state = ExecutorState.EXITED
        val message = "Command exited with code " + exitCode
       
     //向ExecutorRunner线程所属的Worker actor,发送ExecutorStateChanged消息
        worker.send(ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode)))//详细代码见:代码7
      } catch {
        case interrupted: InterruptedException =>
          logInfo("Runner thread for executor " + fullId + " interrupted")
          state = ExecutorState.KILLED
          killProcess(None)
        case e: Exception =>
          logError("Error running executor", e)
          state = ExecutorState.FAILED
          killProcess(Some(e.toString))
      }
    }
     
    代码7
    //向master发送executorstatechanged事件
    case executorStateChanged @ ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
      handleExecutorStateChanged(executorStateChanged)//详细代码见:代码8
     
     
    代码8
    private[worker] def handleExecutorStateChanged(executorStateChanged: ExecutorStateChanged):
      Unit = {
     
    // 直接向master也发送一个executorstatechanged消息
      sendToMaster(executorStateChanged)
      val state = executorStateChanged.state
     
    // 如果executor状态是finished
      if (ExecutorState.isFinished(state)) {
        val appId = executorStateChanged.appId
        val fullId = appId + "/" + executorStateChanged.execId
        val message = executorStateChanged.message
        val exitStatus = executorStateChanged.exitStatus
        executors.get(fullId) match {
          case Some(executor) =>
            logInfo("Executor " + fullId + " finished with state " + state +
              message.map(" message " + _).getOrElse("") +
              exitStatus.map(" exitStatus " + _).getOrElse(""))
     
            // 将executor从内存中移除
            executors -= fullId
            finishedExecutors(fullId) = executor
            trimFinishedExecutorsIfNecessary()
     
            // 释放executor占用的内存和CPU资源
            coresUsed -= executor.cores
            memoryUsed -= executor.memory
          case None =>
            logInfo("Unknown Executor " + fullId + " finished with state " + state +
              message.map(" message " + _).getOrElse("") +
              exitStatus.map(" exitStatus " + _).getOrElse(""))
        }
        maybeCleanupApplication(appId)
      }
    }
     
  • 相关阅读:
    一行代码搞定Dubbo接口调用
    测试周期内测试进度报告规范
    jq 一个强悍的json格式化查看工具
    浅析Docker容器的应用场景
    HDU 4432 Sum of divisors (水题,进制转换)
    HDU 4431 Mahjong (DFS,暴力枚举,剪枝)
    CodeForces 589B Layer Cake (暴力)
    CodeForces 589J Cleaner Robot (DFS,或BFS)
    CodeForces 589I Lottery (暴力,水题)
    CodeForces 589D Boulevard (数学,相遇)
  • 原文地址:https://www.cnblogs.com/yzqyxq/p/12054358.html
Copyright © 2011-2022 走看看