zoukankan      html  css  js  c++  java
  • 小记:Spark Worker 原理分析及源码分析

     
    Worker 类源码位置:org.apache.spark.deploy.worker 包(下文的 DriverRunner、ExecutorRunner 也位于该包中)
     
     
     1 /**
     2 * Source walkthrough: how the Worker launches a driver when it receives a LaunchDriver message.
     3 */
     4 case LaunchDriver(driverId, driverDesc) =>
     5   logInfo(s"Asked to launch driver $driverId")
     6  
     7 // Create the DriverRunner; its start() spawns the thread that runs the driver (see 代码1).
     8   val driver = new DriverRunner(
     9     conf,
    10     driverId,
    11     workDir,
    12     sparkHome,
    13     driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
    14     self,
    15     workerUri,
    16     securityMgr)
    17  
    18 // Register the runner in the `drivers` map, keyed by driverId.
    19   drivers(driverId) = driver
    20  
    21 // Start the driver.
    22   driver.start() // full code: see 代码1
    23  
    24 // Count the driver's cores and memory against this worker (released again in handleDriverStateChanged).
    25   coresUsed += driverDesc.cores
    26   memoryUsed += driverDesc.mem
    27  
     
    代码1:DriverRunner.start()
     1 /** Starts a thread to run and manage the driver. */
     2 private[worker] def start() = {
     3  
     4   // How DriverRunner works:
     5   // spawn a dedicated Java thread named "DriverRunner for <driverId>".
     6   new Thread("DriverRunner for " + driverId) {
     7     override def run() {
     8       var shutdownHook: AnyRef = null
     9       try {
    10         shutdownHook = ShutdownHookManager.addShutdownHook { () =>
    11           logInfo(s"Worker shutting down, killing driver $driverId")
    12           kill()
    13         }
    14         // The hook above kills the driver if the worker shuts down while it runs; it is removed in `finally`.
    15         // prepare driver jars and run driver
    16         // Step 1: create the DriverRunner's working directory.
    17         // Step 2: download the user's application jar (e.g. built with Maven for Java, or exported as a jar for Scala).
    18         // Step 3: build the ProcessBuilder and run the driver; the driver's exit code is returned.
    19         val exitCode = prepareAndRunDriver()// full code: see 代码2
    20  
    21  
    22         // set final state depending on if forcibly killed and process exit code
    23         // Exit code 0 => FINISHED (checked first, even if `killed` is set); otherwise KILLED if `killed`, else FAILED.
    24         finalState = if (exitCode == 0) {
    25           Some(DriverState.FINISHED)
    26         } else if (killed) {
    27           Some(DriverState.KILLED)
    28         } else {
    29           Some(DriverState.FAILED)
    30         }
    31       } catch { // any Exception: kill the driver and record ERROR together with the cause
    32         case e: Exception =>
    33           kill()
    34           finalState = Some(DriverState.ERROR)
    35           finalException = Some(e)
    36       } finally {
    37         if (shutdownHook != null) {
    38           ShutdownHookManager.removeShutdownHook(shutdownHook)
    39         }
    40       }
    41       // finalState is always Some here: the try path assigns it at line 24, the catch path at line 34;
    42       // a non-Exception Throwable escapes run() before reaching the send below, so .get is safe.
    43       // notify worker of final driver state, possible exception
    44       // This DriverRunner thread sends a DriverStateChanged event to the Worker that owns it.
    45       worker.send(DriverStateChanged(driverId, finalState.get, finalException))// full code: see 代码3
    46     }
    47   }.start()
    48 }
    49  
    View Code
     
     
    代码2:DriverRunner.prepareAndRunDriver()
     1 private[worker] def prepareAndRunDriver(): Int = {
     2   val driverDir = createWorkingDirectory()// Step 1: create the DriverRunner's working directory
     3   val localJarFilename = downloadUserJar(driverDir)// Step 2: download the user-uploaded jar; yields its local file name
     4  
     5   // Map the {{WORKER_URL}} / {{USER_JAR}} placeholders to workerUrl / the local jar; other arguments pass through unchanged.
     6   def substituteVariables(argument: String): String = argument match {
     7     case "{{WORKER_URL}}" => workerUrl
     8     case "{{USER_JAR}}" => localJarFilename
     9     case other => other
    10   }
    11  
    12  
    13   // TODO: If we add ability to submit multiple jars they should also be added here
    14  
    15   // Step 3: build the ProcessBuilder from the driver's launch command, its memory size,
    16   // the sparkHome path and the placeholder substitution above.
    17   val builder = CommandUtils.buildProcessBuilder(driverDesc.command, securityManager,
    18     driverDesc.mem, sparkHome.getAbsolutePath, substituteVariables)
    19  
    20   // Run the driver; its exit code is returned. NOTE(review): `supervise` presumably enables restart-on-failure — confirm in runDriver.
    21   runDriver(builder, driverDir, driverDesc.supervise)
    22 }
    23  
    View Code
     
    代码3:Worker 处理 DriverStateChanged 消息
    1 // When the driver finishes, the DriverRunner thread sends its final state to this Worker;
    2 // the Worker then forwards the DriverStateChanged message to the Master.
    3 case driverStateChanged @ DriverStateChanged(driverId, state, exception) =>
    4   handleDriverStateChanged(driverStateChanged)// full code: see 代码4
    5  
     
    代码4:Worker.handleDriverStateChanged() 与 LaunchExecutor 消息处理
      1 private[worker] def handleDriverStateChanged(driverStateChanged: DriverStateChanged): Unit = {
      2   val driverId = driverStateChanged.driverId
      3   val exception = driverStateChanged.exception
      4   val state = driverStateChanged.state
      5   state match { // log the new state at a level matching its outcome
      6     case DriverState.ERROR =>
      7       logWarning(s"Driver $driverId failed with unrecoverable exception: ${exception.getOrElse("<no exception recorded>")}")
      8     case DriverState.FAILED =>
      9       logWarning(s"Driver $driverId exited with failure")
     10     case DriverState.FINISHED =>
     11       logInfo(s"Driver $driverId exited successfully")
     12     case DriverState.KILLED =>
     13       logInfo(s"Driver $driverId was killed by user")
     14     case _ =>
     15       logDebug(s"Driver $driverId changed state to $state")
     16   }
     17  
     18 // Forward the DriverStateChanged message to the Master, which updates the driver's state there.
     19   sendToMaster(driverStateChanged)
     20 // Remove the driver from the active `drivers` map. An unknown driverId is logged rather than
     21 // crashing the handler (Option.get on the removal result would throw NoSuchElementException).
     22   drivers.remove(driverId) match {
     23     case Some(driver) =>
     24       // Record it in `finishedDrivers`, trim that map if needed, and release its memory and cores.
     25       finishedDrivers(driverId) = driver
     26       trimFinishedDriversIfNecessary()
     27       memoryUsed -= driver.driverDesc.mem
     28       coresUsed -= driver.driverDesc.cores
     29     case None =>
     30       logWarning(s"Asked to remove unknown driver: $driverId")
     31   }
     32 }
     33  
     34 /**
     35 * Source walkthrough: how the Worker launches an executor (LaunchExecutor message); requests from a master other than activeMasterUrl are only logged.
     36 */
     37 case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
     38   if (masterUrl != activeMasterUrl) {
     39     logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
     40   } else {
     41     try {
     42       logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
     43  
     44  
     45  
     46       // Create the executor's working directory
     47       // i.e. <workDir>/<appId>/<execId> on this worker.
     48       val executorDir = new File(workDir, appId + "/" + execId)
     49       if (!executorDir.mkdirs()) {
     50         throw new IOException("Failed to create directory " + executorDir)
     51       }
     52       // NOTE(review): File.mkdirs() also returns false when the directory already exists,
     53       // so a reused appId/execId pair would fail here — confirm this is intended.
     54       // Create local dirs for the executor. These are passed to the executor via the
     55       // SPARK_EXECUTOR_DIRS environment variable, and deleted by the Worker when the
     56       // application finishes.
     57       val appLocalDirs = appDirectories.getOrElse(appId,
     58         Utils.getOrCreateLocalRootDirs(conf).map { dir =>
     59           val appDir = Utils.createDirectory(dir, namePrefix = "executor")
     60           Utils.chmod700(appDir)
     61           appDir.getAbsolutePath()
     62         }.toSeq)
     63       appDirectories(appId) = appLocalDirs
     64  
     65       // Create the ExecutorRunner that manages this executor; the last argument is its starting state (RUNNING).
     66       val manager = new ExecutorRunner(
     67         appId,
     68         execId,
     69         appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
     70         cores_,
     71         memory_,
     72         self,
     73         workerId,
     74         host,
     75         webUi.boundPort,
     76         publicAddress,
     77         sparkHome,
     78         executorDir,
     79         workerUri,
     80         conf,
     81         appLocalDirs, ExecutorState.RUNNING)
     82  
     83       // Register the runner in the local `executors` map under the key "<appId>/<execId>".
     84       executors(appId + "/" + execId) = manager
     85  
     86       // Start the ExecutorRunner.
     87       manager.start()// full code: see 代码5
     88  
     89       // Add the executor's CPU cores and memory to this worker's usage counters.
     90       coresUsed += cores_
     91       memoryUsed += memory_
     92       // NOTE(review): if sendToMaster below throws, the catch removes the runner but does not roll back coresUsed/memoryUsed.
     93       // Report an ExecutorStateChanged event (carrying the runner's current state) to the Master so it can update its records.
     94       sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None))
     95     } catch { // on failure: kill and unregister the runner if it was registered, then report FAILED to the Master
     96       case e: Exception =>
     97         logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e)
     98         if (executors.contains(appId + "/" + execId)) {
     99           executors(appId + "/" + execId).kill()
    100           executors -= appId + "/" + execId
    101         }
    102         sendToMaster(ExecutorStateChanged(appId, execId, ExecutorState.FAILED,
    103           Some(e.toString), None))
    104     }
    105   }
    106  
    View Code
     
    代码5:ExecutorRunner.start()
     1 private[worker] def start() {
     2   // Run the executor on its own thread, then register a shutdown hook for when the worker shuts down.
     3   // Create a dedicated Java thread named "ExecutorRunner for <fullId>".
     4   workerThread = new Thread("ExecutorRunner for " + fullId) {
     5     override def run() { fetchAndRunExecutor() }// full code: see 代码6
     6   }
     7   workerThread.start()
     8   // Shutdown hook: on worker shutdown, mark a still-RUNNING executor as FAILED and call killProcess.
     9   shutdownHook = ShutdownHookManager.addShutdownHook { () =>
    10     // It's possible that we arrive here before calling `fetchAndRunExecutor`, then `state` will
    11     // be `ExecutorState.RUNNING`. In this case, we should set `state` to `FAILED`.
    12     if (state == ExecutorState.RUNNING) {
    13       state = ExecutorState.FAILED
    14     }
    15     killProcess(Some("Worker shutting down")) }
    16 }
    17  
     
     
    代码6:ExecutorRunner.fetchAndRunExecutor()
     1 /**
     2 * Download and run the executor described in our ApplicationDescription: start it as a child process, redirect its stdout/stderr to files in executorDir, wait for it to exit, and report the outcome to the Worker.
     3 */
     4 private def fetchAndRunExecutor() {
     5   try {
     6     // Launch the process
     7  
     8     // Build the ProcessBuilder from appDesc.command, the executor memory and the sparkHome path.
     9     val builder = CommandUtils.buildProcessBuilder(appDesc.command, new SecurityManager(conf),
    10       memory, sparkHome.getAbsolutePath, substituteVariables)
    11     val command = builder.command()
    12     val formattedCommand = command.asScala.mkString("\"", "\" \"", "\"") // render as "arg1" "arg2" ... for logging
    13     logInfo(s"Launch command: $formattedCommand")
    14  
    15  
    16     builder.directory(executorDir)
    17     builder.environment.put("SPARK_EXECUTOR_DIRS", appLocalDirs.mkString(File.pathSeparator))
    18     // In case we are running this from within the Spark Shell, avoid creating a "scala"
    19     // parent process for the executor command
    20     builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")
    21  
    22  
    23     // Add webUI log urls
    24     val baseUrl =
    25       if (conf.getBoolean("spark.ui.reverseProxy", false)) {
    26         s"/proxy/$workerId/logPage/?appId=$appId&executorId=$execId&logType="
    27       } else {
    28         s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
    29       }
    30     builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
    31     builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")
    32  
    33  
    34     process = builder.start()
    35  
    36     // Redirect the executor's output to files: the process's InputStream goes to
    37     // <executorDir>/stdout and its ErrorStream to <executorDir>/stderr;
    38     // stderr is first seeded with a header showing the launch command.
    39     val header = "Spark Executor Command: %s\n%s\n\n".format(
    40       formattedCommand, "=" * 40)
    41  
    42  
    43     // Redirect its stdout and stderr to files
    44     val stdout = new File(executorDir, "stdout")
    45     stdoutAppender = FileAppender(process.getInputStream, stdout, conf)
    46  
    47  
    48     val stderr = new File(executorDir, "stderr")
    49     Files.write(header, stderr, StandardCharsets.UTF_8)
    50     stderrAppender = FileAppender(process.getErrorStream, stderr, conf)
    51  
    52  
    53     // Wait for it to exit; executor may exit with code 0 (when driver instructs it to shutdown)
    54     // or with nonzero exit code
    55     // Process.waitFor() blocks until the executor process (already started by builder.start() above) exits.
    56     val exitCode = process.waitFor()
    57  
    58     // The process has exited: record the EXITED state and its exit code.
    59     state = ExecutorState.EXITED
    60     val message = "Command exited with code " + exitCode
    61  
    62     // Notify the owning Worker with an ExecutorStateChanged message.
    63     worker.send(ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode)))// full code: see 代码7
    64   } catch { // interrupted => KILLED; any other Exception => FAILED; both call killProcess
    65     case interrupted: InterruptedException =>
    66       logInfo("Runner thread for executor " + fullId + " interrupted")
    67       state = ExecutorState.KILLED
    68       killProcess(None)
    69     case e: Exception =>
    70       logError("Error running executor", e)
    71       state = ExecutorState.FAILED
    72       killProcess(Some(e.toString))
    73   }
    74 }
    View Code
     
    代码7:Worker 处理 ExecutorStateChanged 消息
    // The Worker receives ExecutorStateChanged from an ExecutorRunner and hands it to handleExecutorStateChanged, which forwards it to the Master.
    case executorStateChanged @ ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
      handleExecutorStateChanged(executorStateChanged)// full code: see 代码8
     
     
    代码8:Worker.handleExecutorStateChanged()
     1 private[worker] def handleExecutorStateChanged(executorStateChanged: ExecutorStateChanged):
     2   Unit = {
     3 // Handle an ExecutorStateChanged message reported to this Worker by an ExecutorRunner.
     4 // Always forward the message to the Master as well, whatever the new state is.
     5   sendToMaster(executorStateChanged)
     6   val state = executorStateChanged.state
     7  
     8 // Only when the executor is in a finished state (per ExecutorState.isFinished) is local bookkeeping cleaned up.
     9   if (ExecutorState.isFinished(state)) {
    10     val appId = executorStateChanged.appId
    11     val fullId = appId + "/" + executorStateChanged.execId
    12     val message = executorStateChanged.message
    13     val exitStatus = executorStateChanged.exitStatus
    14     executors.get(fullId) match {
    15       case Some(executor) =>
    16         logInfo("Executor " + fullId + " finished with state " + state +
    17           message.map(" message " + _).getOrElse("") +
    18           exitStatus.map(" exitStatus " + _).getOrElse(""))
    19  
    20         // Move the executor from the active `executors` map to `finishedExecutors`.
    21         executors -= fullId
    22         finishedExecutors(fullId) = executor
    23         trimFinishedExecutorsIfNecessary()
    24  
    25         // Release the CPU cores and memory the executor was using.
    26         coresUsed -= executor.cores
    27         memoryUsed -= executor.memory
    28       case None =>
    29         logInfo("Unknown Executor " + fullId + " finished with state " + state +
    30           message.map(" message " + _).getOrElse("") +
    31           exitStatus.map(" exitStatus " + _).getOrElse(""))
    32     }
    33     maybeCleanupApplication(appId) // NOTE(review): runs for known and unknown executors alike; presumably cleans up app dirs once nothing is left — confirm
    34   }
    35 }
    View Code
     
     
  • 相关阅读:
    【转】批处理第三方命令行工具汇总(2014-11-10更新)
    爬虫问题汇总 + 解决
    ocs的沟通平台
    DSCP 与IP 优先级IP优先级
    IPSec协议
    GRE 协议简介
    使用VLC创建组播流
    华为olt ma5680t常用命令详解
    Python统计列表中的重复项出现的次数的方法
    Kafka
  • 原文地址:https://www.cnblogs.com/yzqyxq/p/12343636.html
Copyright © 2011-2022 走看看