  • Spark Internals: Source Code Analysis of Communication Between Client, Master and Worker

    Spark's Cluster Manager supports several deployment modes:

    1. Standalone
    2. Mesos
    3. YARN
    4. EC2
    5. Local

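    After a job is submitted to the cluster, the computation model works as follows: the SparkContext defined in the Driver Program submits the work to the App Master, which schedules the compute resources and carries the computation through to completion. For a detailed explanation, see the article "Spark: The Flint and Spark of Big Data!".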

    So in Standalone mode, how do the Client, Master and Worker communicate with each other, register, and start their services?


    1. RPC between nodes - akka

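    There are many mature implementations of inter-module communication, and mature frameworks have long since freed us from raw socket programming. Broadly, they fall into two categories: message-passing mechanisms and synchronization mechanisms based on shared resources.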

    Among message-passing mechanisms, the most widely used is the Message Queue.

    A Message Queue is an application-to-application communication method.

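    Applications communicate by writing messages (application-specific data) to queues and reading them from queues, without needing a dedicated connection between them. Message passing means that programs communicate by sending data in messages rather than by calling each other directly; direct calls are typical of techniques such as remote procedure calls. Queuing means that applications communicate through queues, and using a queue removes the requirement that the sending and receiving applications run at the same time. Mature MQ products include IBM WebSphere MQ and RabbitMQ (an open-source implementation of AMQP, now maintained by Pivotal).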
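    The decoupling described above can be shown with an in-process queue. The sketch below is a minimal illustration only: it uses `java.util.concurrent.LinkedBlockingQueue` in place of a real MQ broker, and the names `QueueSketch`, `Message` and `deliver` are hypothetical. The producer finishes writing before the consumer thread even exists, so the two sides never run at the same time, yet every message still arrives in order.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable.ListBuffer

object QueueSketch {
  // Hypothetical message type standing in for "application-specific data".
  final case class Message(body: String)

  // Producer and consumer share nothing but the queue; neither holds a
  // reference to the other.
  def deliver(bodies: Seq[String]): List[String] = {
    val queue = new LinkedBlockingQueue[Option[Message]]()

    // Producer: enqueue every message, then an end-of-stream marker (None).
    // It finishes before the consumer exists, so the two sides never have
    // to be running at the same time.
    bodies.foreach(b => queue.put(Some(Message(b))))
    queue.put(None)

    // Consumer: started later on its own thread; drains messages in FIFO order.
    val received = ListBuffer.empty[String]
    val consumer = new Thread(new Runnable {
      def run(): Unit = {
        var next = queue.take()
        while (next.isDefined) {
          received += next.get.body
          next = queue.take()
        }
      }
    })
    consumer.start()
    consumer.join() // join() makes the consumer's writes visible to this thread
    received.toList
  }
}
```

    A real MQ product adds persistence and network transport on top of this idea, but the contract is the same: senders only know the queue, not the receiver.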

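    ZeroMQ also deserves a mention: a socket-based programming framework that aims to enter the Linux kernel. In the official words: "ZeroMQ is a simple, easy-to-use transport layer, a socket library that acts like a framework, making socket programming simpler, more concise, and faster. It is a message-queuing library that scales elastically across threads, cores, and host boxes."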

    ZMQ's stated goal is "to become part of the standard network protocol stack, and then to enter the Linux kernel".

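    For communication between many of its modules, Spark chose akka, the de facto actor library for Scala: a library written in Scala that simplifies writing fault-tolerant, highly scalable Actor-model applications in Java and Scala. akka has the following 5 features: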

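    1. Simple Concurrency & Distribution: Akka is designed around asynchronous communication and a distributed architecture, and offers higher-level abstractions such as Actors, Futures and STM.
    2. Resilient by Design: the system can heal itself, with supervision both locally and remotely.
    3. High Performance: up to 50,000,000 messages per second on a single machine.

      Small memory footprint: about 2,500,000 actors fit in 1 GB of memory.

    4. Elastic, Decentralized: adaptive load balancing, routing, partitioning and configuration.
    5. Extensible: can be extended with Akka extension packages.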
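    The actor model behind these features can be illustrated without Akka at all. The following is a minimal sketch, not Akka's implementation: a hypothetical `CounterActor` owns private state and a mailbox, and a single thread processes messages strictly one at a time, so the state needs no lock even when many threads send to it. The `!` method only mimics the spirit of Akka's fire-and-forget "tell".

```scala
import java.util.concurrent.LinkedBlockingQueue

object ToyActorSketch {
  sealed trait CounterMsg
  final case class Add(n: Int) extends CounterMsg
  case object Stop extends CounterMsg

  // A toy actor: private state, a mailbox, and one thread that processes
  // messages strictly one at a time. Because only that thread touches
  // `total`, no lock is needed. This is not Akka, just the underlying idea.
  final class CounterActor {
    private val mailbox = new LinkedBlockingQueue[CounterMsg]()
    private var total = 0

    private val loop = new Thread(new Runnable {
      def run(): Unit = {
        var running = true
        while (running) {
          mailbox.take() match {
            case Add(n) => total += n
            case Stop   => running = false
          }
        }
      }
    })
    loop.start()

    // Fire-and-forget send, named after Akka's `!` (tell).
    def !(msg: CounterMsg): Unit = mailbox.put(msg)

    // Ask the actor to stop, wait for its thread, then read the final state.
    def stopAndGet(): Int = {
      this ! Stop
      loop.join()
      total
    }
  }
}
```

    Usage: several threads can call `counter ! Add(1)` concurrently; since all updates are serialized through the mailbox, the final total is exact without any synchronization on `total`.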

    In Spark, the Client, Master and Worker are each actually an actor. Take the Client as an example:

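    // Excerpt from org/apache/spark/deploy/Client.scala (Spark 1.0.0); ClientArguments is in the same package.
    import akka.actor._
    import akka.pattern.ask
    import akka.remote.{AssociationErrorEvent, DisassociatedEvent, RemotingLifecycleEvent}
    import org.apache.log4j.{Level, Logger}

    import org.apache.spark.{Logging, SecurityManager, SparkConf}
    import org.apache.spark.deploy.DeployMessages._
    import org.apache.spark.deploy.master.Master
    import org.apache.spark.util.{AkkaUtils, Utils}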
    
    private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends Actor with Logging {
      var masterActor: ActorSelection = _
      val timeout = AkkaUtils.askTimeout(conf)
    
      override def preStart() = {
        masterActor = context.actorSelection(Master.toAkkaUrl(driverArgs.master))
    
        context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
    
        println(s"Sending ${driverArgs.cmd} command to ${driverArgs.master}")
    
        driverArgs.cmd match {
          case "launch" =>
            ...
            masterActor ! RequestSubmitDriver(driverDescription)
    
          case "kill" =>
            val driverId = driverArgs.driverId
            // `!` is fire-and-forget and returns Unit, so there is no future to keep.
            masterActor ! RequestKillDriver(driverId)
        }
      }
    
      override def receive = {
    
        case SubmitDriverResponse(success, driverId, message) =>
          println(message)
          if (success) pollAndReportStatus(driverId.get) else System.exit(-1)
    
        case KillDriverResponse(driverId, success, message) =>
          println(message)
          if (success) pollAndReportStatus(driverId) else System.exit(-1)
    
        case DisassociatedEvent(_, remoteAddress, _) =>
          println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.")
          System.exit(-1)
    
        case AssociationErrorEvent(cause, _, remoteAddress, _) =>
          println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.")
          println(s"Cause was: $cause")
          System.exit(-1)
      }
    }
    
    /**
     * Executable utility for starting and terminating drivers inside of a standalone cluster.
     */
    object Client {
      def main(args: Array[String]) {
        println("WARNING: This client is deprecated and will be removed in a future version of Spark.")
        println("Use ./bin/spark-submit with \"--master spark://host:port\"")
    
        val conf = new SparkConf()
        val driverArgs = new ClientArguments(args)
    
        if (!driverArgs.logLevel.isGreaterOrEqual(Level.WARN)) {
          conf.set("spark.akka.logLifecycleEvents", "true")
        }
        conf.set("spark.akka.askTimeout", "10")
        conf.set("akka.loglevel", driverArgs.logLevel.toString.replace("WARN", "WARNING"))
        Logger.getRootLogger.setLevel(driverArgs.logLevel)
    
        // TODO: See if we can initialize akka so return messages are sent back using the same TCP
        //       flow. Else, this (sadly) requires the DriverClient be routable from the Master.
        val (actorSystem, _) = AkkaUtils.createActorSystem(
          "driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))
    
        actorSystem.actorOf(Props(classOf[ClientActor], driverArgs, conf))
    
        actorSystem.awaitTermination()
      }
    }

    In preStart, the following line sends the request to submit a Driver to the Master:

    masterActor ! RequestSubmitDriver(driverDescription)

    The Master then handles this request in its receive method.

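    The ClientActor's own receive method, in turn, handles the messages the Client actor receives: the Master's responses and the remoting lifecycle events.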

    As you can see, akka makes communication between modules simple and efficient; this is one of the hallmarks of Spark's RPC.
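    The request/response exchange above can be sketched as plain functions over immutable messages. This is a simplified illustration, not Spark's code: the case classes below reuse the names `RequestSubmitDriver` and `SubmitDriverResponse` but carry hypothetical, reduced fields, the `driver-<n>` id format is made up, and the client side returns an `Either` where the real `ClientActor` calls `pollAndReportStatus` or `System.exit(-1)`.

```scala
object SubmitProtocolSketch {
  // Simplified stand-ins for Spark's DeployMessages; the real messages carry
  // more fields (e.g. a full DriverDescription) and travel between actors.
  final case class RequestSubmitDriver(jarUrl: String)
  final case class SubmitDriverResponse(success: Boolean, driverId: Option[String], message: String)

  // Master side: refuse when not ALIVE, otherwise accept and assign an id.
  // The "driver-<n>" id format is made up for this sketch.
  def masterHandle(alive: Boolean, nextId: Int, req: RequestSubmitDriver): SubmitDriverResponse =
    if (!alive) SubmitDriverResponse(false, None, "Master is not ALIVE; submission of " + req.jarUrl + " rejected")
    else {
      val id = s"driver-$nextId"
      SubmitDriverResponse(true, Some(id), s"Driver successfully submitted as $id")
    }

  // Client side, mirroring ClientActor.receive: Right(id) where the real code
  // would call pollAndReportStatus(id), Left(-1) where it would System.exit(-1).
  def clientHandle(resp: SubmitDriverResponse): Either[Int, String] = resp match {
    case SubmitDriverResponse(true, Some(id), message) =>
      println(message)
      Right(id)
    case SubmitDriverResponse(_, _, message) =>
      println(message)
      Left(-1)
  }
}
```

    Because messages are immutable case classes, each side can pattern-match on them exactly as the actors' receive methods do, and the same message types can cross process boundaries once serialized.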


    2. Client, Master and Worker: startup communication explained

    Source location: spark-1.0.0/core/src/main/scala/org/apache/spark/deploy. The main classes involved are Client.scala, Master.scala and Worker.scala.

    The communication framework among these three modules is shown in the figure below.

    Roles in Standalone mode:

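    1. Client: submits jobs to the Master.

    2. Master: receives jobs submitted by the Client, manages the Workers, and instructs Workers to launch Drivers and Executors.

    3. Worker: manages the resources of its own node, periodically sends heartbeats to the Master, and carries out the Master's commands, such as launching Drivers and Executors.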

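    In reality, the Master and Worker handle many more messages than these; the figure only shows the main messages processed when the cluster starts up and when computations are submitted to it.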
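    The main message flow among the three roles can be written down as an ordered trace. This is a hand-written model of the sequence, not Spark code; `Hop` and `startupAndSubmit` are hypothetical names, and the Master here simply picks the first Worker, whereas Spark's `schedule()` chooses among alive workers with enough free resources. The trace places `LaunchDriver` before `SubmitDriverResponse` on the assumption that, as in Spark 1.0's Master, `schedule()` runs before the reply is sent.

```scala
object StandaloneFlowSketch {
  // One message hop: sender, receiver, and the Spark message name.
  final case class Hop(from: String, to: String, msg: String)

  // Hand-written trace of the main messages during startup and driver
  // submission; it models the order of messages, not Spark's real code.
  def startupAndSubmit(workerIds: Seq[String]): List[Hop] = {
    val trace = List.newBuilder[Hop]

    // 1. Each Worker registers with the Master and gets an acknowledgement.
    for (w <- workerIds) {
      trace += Hop(w, "Master", "RegisterWorker")
      trace += Hop("Master", w, "RegisteredWorker")
    }

    // 2. The Client asks the Master to run a driver.
    trace += Hop("Client", "Master", "RequestSubmitDriver")

    // 3. The Master schedules the driver onto a Worker (here simply the first
    //    one, an assumption of this sketch) and then answers the Client.
    workerIds.headOption.foreach(w => trace += Hop("Master", w, "LaunchDriver"))
    trace += Hop("Master", "Client", "SubmitDriverResponse")

    // 4. From then on, every Worker reports periodic heartbeats.
    for (w <- workerIds) trace += Hop(w, "Master", "Heartbeat")

    trace.result()
  }
}
```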

    Next, we walk through the source code of each of these three roles.


    2.1 Client source code analysis

    Client startup:

    object Client {
      def main(args: Array[String]) {
        println("WARNING: This client is deprecated and will be removed in a future version of Spark.")
        println("Use ./bin/spark-submit with \"--master spark://host:port\"")
    
        val conf = new SparkConf()
        val driverArgs = new ClientArguments(args)
    
        if (!driverArgs.logLevel.isGreaterOrEqual(Level.WARN)) {
          conf.set("spark.akka.logLifecycleEvents", "true")
        }
        conf.set("spark.akka.askTimeout", "10")
        conf.set("akka.loglevel", driverArgs.logLevel.toString.replace("WARN", "WARNING"))
        Logger.getRootLogger.setLevel(driverArgs.logLevel)
    
        // TODO: See if we can initialize akka so return messages are sent back using the same TCP
        //       flow. Else, this (sadly) requires the DriverClient be routable from the Master.
        val (actorSystem, _) = AkkaUtils.createActorSystem(
          "driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))
        // Create a ClientActor in the actorSystem; its preStart() sends the command to the Master
        actorSystem.actorOf(Props(classOf[ClientActor], driverArgs, conf))
        // Block until the actorSystem terminates
        actorSystem.awaitTermination()
      }
    }

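    As the call actorSystem.actorOf(Props(classOf[ClientActor], driverArgs, conf)) shows, the core logic is implemented by ClientActor.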

    ClientActor extends akka.actor.Actor. For any actor, its override of receive shows which messages it needs to handle.

      override def receive = {
    
        case SubmitDriverResponse(success, driverId, message) =>
          println(message)
          if (success) pollAndReportStatus(driverId.get) else System.exit(-1)
    
        case KillDriverResponse(driverId, success, message) =>
          println(message)
          if (success) pollAndReportStatus(driverId) else System.exit(-1)
    
        case DisassociatedEvent(_, remoteAddress, _) =>
          println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.")
          System.exit(-1)
    
        case AssociationErrorEvent(cause, _, remoteAddress, _) =>
          println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.")
          println(s"Cause was: $cause")
          System.exit(-1)
      }


    2.2 Master source code analysis

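    See the comments in the code for the analysis. The excerpt is abridged: comments stand in for omitted logic, and consecutive sender ! lines under one case belong to different branches.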

    override def receive = {
      case ElectedLeader => {
        // Elected leader: first check whether this Master was previously active; if so, run recovery.
      }
      case CompleteRecovery => completeRecovery() // Remove unresponsive workers and apps, and assign a worker to every driver that has none.
      case RevokedLeadership => {
        // This Master will shut down.
      }
      case RegisterWorker(id, workerHost, workerPort, cores, memory, workerUiPort, publicAddress) => {
        // If this Master is not active, do nothing and return.
        // If this worker id has already been registered, reply with an error:
        sender ! RegisterWorkerFailed("Duplicate worker ID")
        // Otherwise register the worker; on success, reply with a success message and schedule:
        sender ! RegisteredWorker(masterUrl, masterWebUiUrl)
        schedule()
        // If registering the worker fails, tell the sender:
        sender ! RegisterWorkerFailed("Attempted to re-register worker at same address: " + workerAddress)
      }
      case RequestSubmitDriver(description) => {
        // If the Master is not active, reply with an error:
        sender ! SubmitDriverResponse(false, None, msg)
        // Otherwise create the driver and reply with a success message:
        sender ! SubmitDriverResponse(true, Some(driver.id), s"Driver successfully submitted as ${driver.id}")
      }
      case RequestKillDriver(driverId) => {
        if (state != RecoveryState.ALIVE) {
          // The Master is not active: reply with an error.
          val msg = s"Can only kill drivers in ALIVE state. Current state: $state."
          sender ! KillDriverResponse(driverId, success = false, msg)
        } else {
          logInfo("Asked to kill driver " + driverId)
          val driver = drivers.find(_.id == driverId)
          driver match {
            case Some(d) =>
              if (/* d is still in the waiting queue */) {
                // Remove it from the waiting queue and set its state to KILLED.
              } else {
                // Ask the worker running it to kill the driver; the worker reports the
                // result back to the Master with a DriverStateChanged message.
                d.worker.foreach { w => w.actor ! KillDriver(driverId) }
              }
              // Note: the driver is not necessarily dead yet; the Master has only asked the worker to kill it.
              sender ! KillDriverResponse(driverId, success = true, msg)
            case None =>
              // The driver has already been killed: reply directly.
              sender ! KillDriverResponse(driverId, success = false, msg)
          }
        }
      }
      case RequestDriverStatus(driverId) => {
        // Look up the requested driver; if found, reply with its state.
        (drivers ++ completedDrivers).find(_.id == driverId) match {
          case Some(driver) =>
            sender ! DriverStatusResponse(found = true, Some(driver.state),
              driver.worker.map(_.id), driver.worker.map(_.hostPort), driver.exception)
          case None =>
            sender ! DriverStatusResponse(found = false, None, None, None, None)
        }
      }
      case RegisterApplication(description) => {
        // If this Master is a standby, ignore the message.
        // Otherwise register the application, reply with the result, and start scheduling.
      }
      case ExecutorStateChanged(appId, execId, state, message, exitStatus) => {
        // Get the app from idToApp, then its executors, then the executor with execId.
        val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))
        execOption match {
          case Some(exec) => {
            exec.state = state
            exec.application.driver ! ExecutorUpdated(execId, state, message, exitStatus)
            if (ExecutorState.isFinished(state)) {
              val appInfo = idToApp(appId)
              // Remove this executor from the worker and app
              logInfo("Removing executor " + exec.fullId + " because it is " + state)
              appInfo.removeExecutor(exec)
              exec.worker.removeExecutor(exec)
            }
          }
        }
      }
      case DriverStateChanged(driverId, state, exception) => {
        // If the driver's state is ERROR | FINISHED | KILLED | FAILED, remove it.
      }
      case Heartbeat(workerId) => {
        // Update the worker's timestamp.
        workerInfo.lastHeartbeat = System.currentTimeMillis()
      }
      case MasterChangeAcknowledged(appId) => {
        // Set the state of the app with this appId to WAITING, in preparation for switching Masters.
      }
      case WorkerSchedulerStateResponse(workerId, executors, driverIds) => {
        // Look up the worker by workerId and set its state to ALIVE; keep the executors whose
        // app is still known (the idToApp lookup is defined), add them to their apps, and
        // record them on the worker. This can be seen as recovering the Worker on the Master side.
        idToWorker.get(workerId) match {
          case Some(worker) =>
            logInfo("Worker has been re-registered: " + workerId)
            worker.state = WorkerState.ALIVE
            val validExecutors = executors.filter(exec => idToApp.get(exec.appId).isDefined)
            for (exec <- validExecutors) {
              val app = idToApp.get(exec.appId).get
              val execInfo = app.addExecutor(worker, exec.cores, Some(exec.execId))
              worker.addExecutor(execInfo)
              execInfo.copyState(exec)
            }
            // Mark every driver as RUNNING and attach it to the worker.
            for (driverId <- driverIds) {
              drivers.find(_.id == driverId).foreach { driver =>
                driver.worker = Some(worker)
                driver.state = DriverState.RUNNING
                worker.drivers(driverId) = driver
              }
            }
        }
      }
      case DisassociatedEvent(_, address, _) => {
        // A Worker or an App has disconnected: remove the Worker and App at this address.
        // If recovery can now finish, finish it.
      }
      case RequestMasterState => {
        // Reply to the sender with the Master's state.
        sender ! MasterStateResponse(host, port, workers.toArray, apps.toArray, completedApps.toArray,
          drivers.toArray, completedDrivers.toArray, state)
      }
      case CheckForWorkerTimeOut => {
        // Remove workers that have timed out.
      }
      case RequestWebUIPort => {
        // Reply to the sender with the web UI port.
        sender ! WebUIPortResponse(webUi.boundPort)
      }
    }
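    The worker bookkeeping behind RegisterWorker, Heartbeat and CheckForWorkerTimeOut can be sketched as a small registry. This is a simplified illustration under stated assumptions: `WorkerRecord` and `Registry` are hypothetical, timestamps are passed in explicitly so the behavior is deterministic, and the real Master also tracks worker states and re-registration by address. In Spark 1.0 the Master derives its timeout from the `spark.worker.timeout` setting; here it is just a constructor parameter.

```scala
import scala.collection.mutable

object MasterBookkeepingSketch {
  // Simplified stand-in for the Master's per-worker record (hypothetical fields).
  final case class WorkerRecord(id: String, host: String, port: Int, var lastHeartbeat: Long)

  // Timestamps are passed in explicitly so the sketch is deterministic; the
  // real Master uses System.currentTimeMillis().
  final class Registry(timeoutMs: Long) {
    private val idToWorker = mutable.Map.empty[String, WorkerRecord]

    // RegisterWorker: reject a duplicate id, otherwise remember the worker.
    def register(id: String, host: String, port: Int, now: Long): Either[String, WorkerRecord] =
      if (idToWorker.contains(id)) Left("Duplicate worker ID")
      else {
        val w = WorkerRecord(id, host, port, now)
        idToWorker(id) = w
        Right(w)
      }

    // Heartbeat: refresh the timestamp of a known worker.
    def heartbeat(id: String, now: Long): Boolean = idToWorker.get(id) match {
      case Some(w) =>
        w.lastHeartbeat = now
        true
      case None => false
    }

    // CheckForWorkerTimeOut: drop workers silent for longer than timeoutMs.
    def removeTimedOut(now: Long): Seq[String] = {
      val expired = idToWorker.values.filter(w => now - w.lastHeartbeat > timeoutMs).map(_.id).toSeq.sorted
      expired.foreach(id => idToWorker.remove(id))
      expired
    }

    def aliveIds: Seq[String] = idToWorker.keys.toSeq.sorted
  }
}
```

    With a 60-second timeout, a worker that registered at t = 0 and never sent a heartbeat is removed at t = 70 s, while one that sent a heartbeat at t = 50 s survives.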



    2.3 Worker source code analysis

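    Having walked through the Client and Master source code, you can probably already work out how to analyze the way the Worker communicates with the Master. That's right, the answer lies here: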

    override def receive
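    As a starting point for reading the Worker's receive, here is a heavily simplified sketch of the Worker side of the messages seen above. All types are hypothetical stand-ins: `sentToMaster` records what the real Worker would send with `master ! ...`, `SendHeartbeat` plays the role of the timer message the Worker sends to itself, and the KILLED state is reported immediately, whereas in Spark the driver's runner reports its final state asynchronously.

```scala
import scala.collection.mutable

object WorkerReceiveSketch {
  // Messages a Worker receives (simplified).
  sealed trait ToWorker
  final case class LaunchDriver(driverId: String) extends ToWorker
  final case class KillDriver(driverId: String) extends ToWorker
  case object SendHeartbeat extends ToWorker // timer-driven message to itself

  // Messages a Worker sends to the Master (simplified).
  sealed trait ToMaster
  final case class Heartbeat(workerId: String) extends ToMaster
  final case class DriverStateChanged(driverId: String, state: String) extends ToMaster

  final class WorkerSketch(workerId: String) {
    private val running = mutable.Set.empty[String]
    // Stand-in for `master ! ...`: record what would be sent.
    val sentToMaster = mutable.ArrayBuffer.empty[ToMaster]

    def receive(msg: ToWorker): Unit = msg match {
      case LaunchDriver(id) =>
        running += id // the real Worker starts a driver process here
      case KillDriver(id) =>
        // Report KILLED only for a driver this Worker actually runs.
        if (running.remove(id)) sentToMaster += DriverStateChanged(id, "KILLED")
      case SendHeartbeat =>
        sentToMaster += Heartbeat(workerId)
    }

    def runningDrivers: Set[String] = running.toSet
  }
}
```

    This mirrors the note in the Master's RequestKillDriver handling: the Master only asks the Worker to kill a driver, and the outcome comes back later as a DriverStateChanged message.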

    References:

    Spark 1.0.0 source code.




  • Original article: https://www.cnblogs.com/gavanwanggw/p/6939681.html