zoukankan      html  css  js  c++  java
  • Spark里边:Worker源代码分析和架构

    首先由Spark图表理解Worker于Spark中的作用和地位:


    Worker所起的作用有下面几个:

    1. 接受Master的指令,启动或者杀掉Executor

    2. 接受Master的指令,启动或者杀掉Driver

    3. 报告Executor/Driver的状态到Master

    4. 心跳到Master。心跳超时则Master觉得Worker已经挂了不能工作了

    5. 向GUI报告Worker的状态


    说白了,Worker就是整个集群真正干活的。首先看一下Worker重要的数据结构:

      val executors = new HashMap[String, ExecutorRunner]
      val finishedExecutors = new HashMap[String, ExecutorRunner]
      val drivers = new HashMap[String, DriverRunner]
      val finishedDrivers = new HashMap[String, DriverRunner]

    这些Hash Map存储了名字和实体时间的相应关系,方便通过名字直接找到实体进行调用。

    看一下怎样启动Executor:

    case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
          if (masterUrl != activeMasterUrl) {
            logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
          } else {
            try {
              logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
              val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
                self, workerId, host,
                appDesc.sparkHome.map(userSparkHome => new File(userSparkHome)).getOrElse(sparkHome),
                workDir, akkaUrl, ExecutorState.RUNNING)
              executors(appId + "/" + execId) = manager
              manager.start()
              coresUsed += cores_
              memoryUsed += memory_
              masterLock.synchronized {
                master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
              }
            } catch {
              case e: Exception => {
                logError("Failed to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
                if (executors.contains(appId + "/" + execId)) {
                  executors(appId + "/" + execId).kill()
                  executors -= appId + "/" + execId
                }
                masterLock.synchronized {
                  master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None)
                }
              }
            }


    1行到3行是验证该命令是否发自一个合法的Master。7到10行定义了一个ExecutorRunner,实际上系统并没有一个类叫做Executor。我们所说的Executor实际上是由ExecutorRunner实现的,这个名字起得也比較贴切。11行将新建的executor放到上面提到的Hash Map中。

    然后12行启动这个Executor。13行和14行将如今已经使用的core和memory进行的统计。15到17行实际上是向Master报告Executor的状态。这里须要加锁。

    假设在这过程中有异常抛出,那么须要check是否是executor已经加到Hash Map中,假设有则首先停止它。然后从Hash Map中删除它。而且向Master report Executor是FAILED的。Master会又一次启动新的Executor。


    接下来看一下Driver的Hash Map的使用。通过KillDriver:

        case KillDriver(driverId) => {
          logInfo(s"Asked to kill driver $driverId")
          drivers.get(driverId) match {
            case Some(runner) =>
              runner.kill()
            case None =>
              logError(s"Asked to kill unknown driver $driverId")
          }
        }

    这个KillDirver的命令实际上由Master发出的。而Master实际上接收了Client的kill driver的命令。这个也能够看出Scala语言的简洁性。




    版权声明:本文博主原创文章,博客,未经同意不得转载。

  • 相关阅读:
    历史书单
    《Tornado介绍》—— 读后总结
    基于TensorFlow的深度学习系列教程 2——常量Constant
    深度学习Tensorflow生产环境部署(下·模型部署篇)
    深度学习Tensorflow生产环境部署(上·环境准备篇)
    Crontab和sudo中无法使用TensorFlow ImportError libcublas.so.9.0
    基于TensorFlow的深度学习系列教程 1——Hello World!
    想要接触人工智能吗?先要学会如何阅读论文
    《数据挖掘与数据化运营实战 思路、方法、技巧与应用》—— 读书笔记
    《新参者》—— 读后总结
  • 原文地址:https://www.cnblogs.com/blfshiye/p/4809409.html
Copyright © 2011-2022 走看看