  • [Original] Spark Worker Source Code Analysis (Part 2)

    This post continues from the previous one:
    Spark Worker Source Code Analysis (Part 1): http://www.cnblogs.com/yourarebest/p/5300202.html

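    4. The receive method
    The receive method handles the following 14 cases. The numbers match the comments in the code:

    (1) The worker has registered with the master successfully
    (2) The worker sends a heartbeat to the master; the message is ignored if the worker is not yet registered with a master
    (3) Cleaning up the worker's work directory
    (4) The master has changed
    (5) Worker registration failed
    (6) The worker is asked to reconnect
    (7) Launching an executor
    (8) An executor's state has changed
    (9) Killing an executor
    (10) Launching a driver
    (11) Killing a driver
    (12) A driver's state has changed
    (13) Re-registering the worker with the master
    (14) An application has finished
    Interactions between the worker and the master: (1)(2)(4)(5)(6)(13)
    Interactions between the worker and a driver: (10)(11)(12)
    Interactions between the worker and executors: (3)(7)(8)(9)(14). Note that (3) and (14) belong here because completing them depends closely on the executors.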
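
    The grouping above can be sketched with a small, self-contained partial function. This is only an illustration of the dispatch style: the message classes below are simplified, hypothetical stand-ins rather than Spark's real deploy messages, and `peerOf` is a helper invented here to mirror the master/driver/executor grouping.

```scala
// Simplified stand-ins for Spark's deploy messages (hypothetical, not the real classes).
case class RegisteredWorker(masterUrl: String)
case object SendHeartbeat
case class LaunchExecutor(appId: String, execId: Int)
case class LaunchDriver(driverId: String)
case class ApplicationFinished(appId: String)

object ReceiveSketch {
  // Which peer a message concerns, mirroring the grouping in the text (assumed helper).
  def peerOf(msg: Any): String = msg match {
    case _: RegisteredWorker | SendHeartbeat        => "master"
    case _: LaunchDriver                            => "driver"
    case _: LaunchExecutor | _: ApplicationFinished => "executor"
    case _                                          => "unknown"
  }

  // A receive-style partial function: it is defined only for the cases it lists.
  val receive: PartialFunction[Any, Unit] = {
    case RegisteredWorker(url)         => println(s"registered with $url")
    case SendHeartbeat                 => println("heartbeat")
    case LaunchExecutor(appId, execId) => println(s"launch executor $appId/$execId")
  }
}
```

    Because `receive` is a `PartialFunction[Any, Unit]`, a caller can check `isDefinedAt` before applying it, so messages without a matching case can be detected instead of raising a `MatchError`.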



    <code>
    override def receive: PartialFunction[Any, Unit] = {
        // (1) The worker registered successfully
        case RegisteredWorker(masterRef, masterWebUiUrl) =>
          logInfo("Successfully registered with master " + masterRef.address.toSparkURL)
          registered = true
          changeMaster(masterRef, masterWebUiUrl)
          // A daemon thread sends a heartbeat message every 15 s
          forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
            override def run(): Unit = Utils.tryLogNonFatalError {
              self.send(SendHeartbeat)
            }
          }, 0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS)
          // If cleanup is enabled
          if (CLEANUP_ENABLED) {
            logInfo(s"Worker cleanup enabled; old application directories will be deleted in: $workDir")
            forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
              override def run(): Unit = Utils.tryLogNonFatalError {
                // A daemon thread cleans up the application directories every 30 min
                self.send(WorkDirCleanup)
              }
            }, CLEANUP_INTERVAL_MILLIS, CLEANUP_INTERVAL_MILLIS, TimeUnit.MILLISECONDS)
          }
        // (2) The worker sends a heartbeat to the master; ignored if not yet registered with a master
        case SendHeartbeat =>
          if (connected) { sendToMaster(Heartbeat(workerId, self)) }
        // (3) Clean up the worker's work directory
        case WorkDirCleanup =>
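          // Run the directory cleanup on a separate thread (in a future) so it does not tie up
          // the worker's RpcEndpoint; copy the app ids so the cleanup thread can use them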
          val appIds = executors.values.map(_.appId).toSet
          val cleanupFuture = concurrent.future {
            val appDirs = workDir.listFiles()
            if (appDirs == null) {
              throw new IOException("ERROR: Failed to list files in " + appDirs)
            }
            appDirs.filter { dir =>
              // The directory may be in use by an application: check whether the app is still running
              val appIdFromDir = dir.getName
              val isAppStillRunning = appIds.contains(appIdFromDir)
              dir.isDirectory && !isAppStillRunning &&
              !Utils.doesDirectoryContainAnyNewFiles(dir, APP_DATA_RETENTION_SECONDS)
            }.foreach { dir =>
              logInfo(s"Removing directory: ${dir.getPath}")
              Utils.deleteRecursively(dir)
            }
          }(cleanupThreadExecutor)
          cleanupFuture.onFailure {
            case e: Throwable =>
              logError("App dir cleanup failed: " + e.getMessage, e)
          }(cleanupThreadExecutor)
        // (4) The master has changed
        case MasterChanged(masterRef, masterWebUiUrl) =>
          logInfo("Master has changed, new master is at " + masterRef.address.toSparkURL)
          changeMaster(masterRef, masterWebUiUrl)
          val execs = executors.values.
            map(e => new ExecutorDescription(e.appId, e.execId, e.cores, e.state))
          masterRef.send(WorkerSchedulerStateResponse(workerId, execs.toList, drivers.keys.toSeq))
        // (5) Worker registration failed
        case RegisterWorkerFailed(message) =>
          if (!registered) {
            logError("Worker registration failed: " + message)
            System.exit(1)
          }
        // (6) The worker is asked to reconnect
        case ReconnectWorker(masterUrl) =>
          logInfo(s"Master with url $masterUrl requested this worker to reconnect.")
          // Register the worker with the masters again
          registerWithMaster()
        // (7) Launch an executor
        case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
          if (masterUrl != activeMasterUrl) {
            logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
          } else {
            try {
              logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
              // Create the executor's working directory
              val executorDir = new File(workDir, appId + "/" + execId)
              if (!executorDir.mkdirs()) {
                throw new IOException("Failed to create directory " + executorDir)
              }
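              // Create local directories for the executor. They are passed to the executor through
              // the SPARK_EXECUTOR_DIRS environment variable and deleted when the application finishes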
              val appLocalDirs = appDirectories.get(appId).getOrElse {
                Utils.getOrCreateLocalRootDirs(conf).map { dir =>
                  val appDir = Utils.createDirectory(dir, namePrefix = "executor")
                  Utils.chmod700(appDir)
                  appDir.getAbsolutePath()
                }.toSeq
              }
              appDirectories(appId) = appLocalDirs
              val manager = new ExecutorRunner(
                appId,
                execId,
                appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
                cores_,
                memory_,
                self,
                workerId,
                host,
                webUi.boundPort,
                publicAddress,
                sparkHome,
                executorDir,
                workerUri,
                conf,
                appLocalDirs, ExecutorState.LOADING)
              executors(appId + "/" + execId) = manager
              manager.start()
              coresUsed += cores_
              memoryUsed += memory_
              sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None))
            } catch {
              case e: Exception => {
                logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e)
                if (executors.contains(appId + "/" + execId)) {
                  executors(appId + "/" + execId).kill()
                  executors -= appId + "/" + execId
                }
                sendToMaster(ExecutorStateChanged(appId, execId, ExecutorState.FAILED,
                  Some(e.toString), None))
              }
            }
          }
        // (8) An executor's state has changed
        case executorStateChanged @ ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
          handleExecutorStateChanged(executorStateChanged)
        // (9) Kill an executor
        case KillExecutor(masterUrl, appId, execId) =>
          if (masterUrl != activeMasterUrl) {
            logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor " + execId)
          } else {
            val fullId = appId + "/" + execId
            executors.get(fullId) match {
              case Some(executor) =>
                logInfo("Asked to kill executor " + fullId)
                executor.kill()
              case None =>
                logInfo("Asked to kill unknown executor " + fullId)
            }
          }
        // (10) Launch a driver
        case LaunchDriver(driverId, driverDesc) => {
          logInfo(s"Asked to launch driver $driverId")
          val driver = new DriverRunner(
            conf,
            driverId,
            workDir,
            sparkHome,
            driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
            self,
            workerUri,
            securityMgr)
          drivers(driverId) = driver
          driver.start()
          coresUsed += driverDesc.cores
          memoryUsed += driverDesc.mem
        }
        // (11) Kill a driver
        case KillDriver(driverId) => {
          logInfo(s"Asked to kill driver $driverId")
          drivers.get(driverId) match {
            case Some(runner) =>
              runner.kill()
            case None =>
              logError(s"Asked to kill unknown driver $driverId")
          }
        }
        // (12) A driver's state has changed
        case driverStateChanged @ DriverStateChanged(driverId, state, exception) => {
          handleDriverStateChanged(driverStateChanged)
        }
        // (13) Re-register the worker with the master
        case ReregisterWithMaster =>
          reregisterWithMaster()
        // (14) An application has finished
        case ApplicationFinished(id) =>
          finishedApps += id
          // Delete the local files that the finished application created while it ran
          maybeCleanupApplication(id)
      }
    </code>
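
    The filter in the WorkDirCleanup branch, case (3), combines three conditions: the entry is a directory, its application is no longer running, and it contains no recently modified files. A minimal sketch of that decision as a pure function follows. `DirInfo` and `shouldRemove` are hypothetical names introduced here, and the cutoff arithmetic is an assumption about how a retention check of this kind is typically computed, not a copy of `Utils.doesDirectoryContainAnyNewFiles`.

```scala
object CleanupSketch {
  // Hypothetical, filesystem-free view of one entry under the worker's work directory.
  case class DirInfo(name: String, isDirectory: Boolean, newestFileModifiedMs: Long)

  // True when the entry may be deleted: it is a directory, its app is not running,
  // and none of its files was modified within the retention window (assumed semantics).
  def shouldRemove(
      dir: DirInfo,
      runningAppIds: Set[String],
      nowMs: Long,
      retentionSeconds: Long): Boolean = {
    val cutoffMs = nowMs - retentionSeconds * 1000L
    val hasNewFiles = dir.newestFileModifiedMs > cutoffMs
    dir.isDirectory && !runningAppIds.contains(dir.name) && !hasNewFiles
  }
}
```

    Keeping the decision free of file-system calls makes it easy to check each condition separately. In the real code the set of running app ids is copied before the future starts, so the cleanup thread does not read the worker's mutable state.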

  • Original article: https://www.cnblogs.com/yourarebest/p/5300219.html