zoukankan      html  css  js  c++  java
  • Spark技术内幕:Client,Master和Worker 通信源码解析

    http://blog.csdn.net/anzhsoft/article/details/30802603

    Spark的Cluster Manager可以有几种部署模式:

    1. Standlone
    2. Mesos
    3. YARN
    4. EC2
    5. Local

    在向集群提交计算任务后,系统的运算模型就是Driver Program定义的SparkContext向APP Master提交,有APP Master进行计算资源的调度并最终完成计算。具体阐述可以阅读《Spark:大数据的电花火石! 》。

    那么Standalone模式下,Client,Master和Worker是如何进行通信,注册并开启服务的呢?


    1. node之间的RPC - akka

    模块间通信有很多成熟的实现,现在很多成熟的Framework已经早已经让我们摆脱原始的Socket编程了。简单归类,可以归纳为基于消息的传递和基于资源共享的同步机制。

    基于消息的传递的机制应用比较广泛的有Message Queue。Message Queue, 是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消 息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。其中较为成熟的MQ产品有IBM WEBSPHERE MQ和RabbitMQ(AMQP的开源实现,现在由Pivotal维护)。

    还有不得不提的是ZeroMQ,一个致力于进入Linux内核的基于Socket的编程框架。官方的说法: “ZeroMQ是一个简单好用的传输层,像框架一样的一个socket library,它使得Socket编程更加简单、简洁和性能更高。是一个消息处理队列库,可在多个线程、内核和主机盒之间弹性伸缩。ZMQ的明确目标是“成为标准网络协议栈的一部分,之后进入Linux内核”。

    Spark在很多模块之间的通信选择是Scala原生支持的akka,一个用 Scala 编写的库,用于简化编写容错的、高可伸缩性的 Java 和 Scala 的 Actor 模型应用。akka有以下5个特性:

    1. 易于构建并行和分布式应用 (Simple Concurrency & Distribution):  Akka在设计时采用了异步通讯和分布式架构,并对上层进行抽象,如Actors、Futures ,STM等。
    2. 可靠性(Resilient by Design): 系统具备自愈能力,在本地/远程都有监护。
    3. 高性能(High Performance):在单机中每秒可发送50,000,000个消息。内存占用小,1GB内存中可保存2,500,000个actors。
    4. 弹性,无中心(Elastic — Decentralized):自适应的负责均衡,路由,分区,配置
    5. 可扩展(Extensible):可以使用Akka 扩展包进行扩展。

    在Spark中的Client,Master和Worker实际上都是一个actor,拿Client来说:

    [java] view plaincopy在CODE上查看代码片派生到我的代码片
    1. import akka.actor._  
    2. import akka.pattern.ask  
    3. import akka.remote.{AssociationErrorEvent, DisassociatedEvent, RemotingLifecycleEvent}  
    4.   
    5. private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends Actor with Logging {  
    6.   var masterActor: ActorSelection = _  
    7.   val timeout = AkkaUtils.askTimeout(conf)  
    8.   
    9.   override def preStart() = {  
    10.     masterActor = context.actorSelection(Master.toAkkaUrl(driverArgs.master))  
    11.   
    12.     context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])  
    13.   
    14.     println(s"Sending ${driverArgs.cmd} command to ${driverArgs.master}")  
    15.   
    16.     driverArgs.cmd match {  
    17.       case "launch" =>  
    18.         ...  
    19.         masterActor ! RequestSubmitDriver(driverDescription)  
    20.   
    21.       case "kill" =>  
    22.         val driverId = driverArgs.driverId  
    23.         val killFuture = masterActor ! RequestKillDriver(driverId)  
    24.     }  
    25.   }  
    26.   
    27.   override def receive = {  
    28.   
    29.     case SubmitDriverResponse(success, driverId, message) =>  
    30.       println(message)  
    31.       if (success) pollAndReportStatus(driverId.get) else System.exit(-1)  
    32.   
    33.     case KillDriverResponse(driverId, success, message) =>  
    34.       println(message)  
    35.       if (success) pollAndReportStatus(driverId) else System.exit(-1)  
    36.   
    37.     case DisassociatedEvent(_, remoteAddress, _) =>  
    38.       println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.")  
    39.       System.exit(-1)  
    40.   
    41.     case AssociationErrorEvent(cause, _, remoteAddress, _) =>  
    42.       println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.")  
    43.       println(s"Cause was: $cause")  
    44.       System.exit(-1)  
    45.   }  
    46. }  
    47.   
    48. /** 
    49.  * Executable utility for starting and terminating drivers inside of a standalone cluster. 
    50.  */  
    51. object Client {  
    52.   def main(args: Array[String]) {  
    53.     println("WARNING: This client is deprecated and will be removed in a future version of Spark.")  
    54.     println("Use ./bin/spark-submit with "--master spark://host:port"")  
    55.   
    56.     val conf = new SparkConf()  
    57.     val driverArgs = new ClientArguments(args)  
    58.   
    59.     if (!driverArgs.logLevel.isGreaterOrEqual(Level.WARN)) {  
    60.       conf.set("spark.akka.logLifecycleEvents""true")  
    61.     }  
    62.     conf.set("spark.akka.askTimeout""10")  
    63.     conf.set("akka.loglevel", driverArgs.logLevel.toString.replace("WARN""WARNING"))  
    64.     Logger.getRootLogger.setLevel(driverArgs.logLevel)  
    65.   
    66.     // TODO: See if we can initialize akka so return messages are sent back using the same TCP  
    67.     //       flow. Else, this (sadly) requires the DriverClient be routable from the Master.  
    68.     val (actorSystem, _) = AkkaUtils.createActorSystem(  
    69.       "driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))  
    70.   
    71.     actorSystem.actorOf(Props(classOf[ClientActor], driverArgs, conf))  
    72.   
    73.     actorSystem.awaitTermination()  
    74.   }  
    75. }  

    其中第19行的含义就是向Master提交Driver的请求,

    [java] view plaincopy在CODE上查看代码片派生到我的代码片
    1. masterActor ! RequestSubmitDriver(driverDescription)  

    而Master将在receive里处理这个请求。当然了27行到44行的是处理Client Actor收到的消息。

    可以看出,通过akka,可以非常简单高效的处理模块间的通信,这可以说是Spark RPC的一大特色。


    2. Client,Master和Workerq启动通信详解

    源码位置:spark-1.0.0coresrcmainscalaorgapachesparkdeploy。主要涉及的类:Client.scala, Master.scala和Worker.scala。这三大模块之间的通信框架如下图。

    Standalone模式下存在的角色:

    1. Client:负责提交作业到Master。

    2. Master:接收Client提交的作业,管理Worker,并命令Worker启动Driver和Executor。

    3. Worker:负责管理本节点的资源,定期向Master汇报心跳,接收Master的命令,比如启动Driver和Executor。

    实际上,Master和Worker要处理的消息要比这多得多,本图只是反映了集群启动和向集群提交运算时候的主要消息处理。

    接下来将分别走读这三大角色的源码。


    2.1 Client源码解析

    Client启动:

    [java] view plaincopy在CODE上查看代码片派生到我的代码片
    1. object Client {  
    2.   def main(args: Array[String]) {  
    3.     println("WARNING: This client is deprecated and will be removed in a future version of Spark.")  
    4.     println("Use ./bin/spark-submit with "--master spark://host:port"")  
    5.   
    6.     val conf = new SparkConf()  
    7.     val driverArgs = new ClientArguments(args)  
    8.   
    9.     if (!driverArgs.logLevel.isGreaterOrEqual(Level.WARN)) {  
    10.       conf.set("spark.akka.logLifecycleEvents""true")  
    11.     }  
    12.     conf.set("spark.akka.askTimeout""10")  
    13.     conf.set("akka.loglevel", driverArgs.logLevel.toString.replace("WARN""WARNING"))  
    14.     Logger.getRootLogger.setLevel(driverArgs.logLevel)  
    15.   
    16.     // TODO: See if we can initialize akka so return messages are sent back using the same TCP  
    17.     //       flow. Else, this (sadly) requires the DriverClient be routable from the Master.  
    18.     val (actorSystem, _) = AkkaUtils.createActorSystem(  
    19.       "driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))  
    20.     // 使用ClientActor初始化actorSystem  
    21.     actorSystem.actorOf(Props(classOf[ClientActor], driverArgs, conf))  
    22.     //启动并等待actorSystem的结束  
    23.     actorSystem.awaitTermination()  
    24.   }  
    25. }  

    从行21可以看出,核心实现是由ClientActor实现的。Client的Actor是akka.Actor的一个扩展。对于Actor,从它对recevie的override就可以看出它需要处理的消息。

    [java] view plaincopy在CODE上查看代码片派生到我的代码片
    1. override def receive = {  
    2.   
    3.   case SubmitDriverResponse(success, driverId, message) =>  
    4.     println(message)  
    5.     if (success) pollAndReportStatus(driverId.get) else System.exit(-1)  
    6.   
    7.   case KillDriverResponse(driverId, success, message) =>  
    8.     println(message)  
    9.     if (success) pollAndReportStatus(driverId) else System.exit(-1)  
    10.   
    11.   case DisassociatedEvent(_, remoteAddress, _) =>  
    12.     println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.")  
    13.     System.exit(-1)  
    14.   
    15.   case AssociationErrorEvent(cause, _, remoteAddress, _) =>  
    16.     println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.")  
    17.     println(s"Cause was: $cause")  
    18.     System.exit(-1)  
    19. }  


    2.2 Master的源码分析

    源码分析详见注释。

    [java] view plaincopy在CODE上查看代码片派生到我的代码片
    1. override def receive = {  
    2.    case ElectedLeader => {  
    3.      // 被选为Master,首先判断是否该Master原来为active,如果是那么进行Recovery。  
    4.    }  
    5.    case CompleteRecovery => completeRecovery() // 删除没有响应的worker和app,并且将所有没有worker的Driver分配worker  
    6.    case RevokedLeadership => {  
    7.      // Master将关闭。  
    8.    }  
    9.    case RegisterWorker(id, workerHost, workerPort, cores, memory, workerUiPort, publicAddress) =>  
    10.    {        
    11.      // 如果该Master不是active,不做任何操作,返回  
    12.      // 如果注册过该worker id,向sender返回错误  
    13.      sender ! RegisterWorkerFailed("Duplicate worker ID")  
    14.      // 注册worker,如果worker注册成功则返回成功的消息并且进行调度  
    15.      sender ! RegisteredWorker(masterUrl, masterWebUiUrl)  
    16.      schedule()  
    17.      // 如果worker注册失败,发送消息到sender  
    18.      sender ! RegisterWorkerFailed("Attempted to re-register worker at same address: " + workerAddress)  
    19.    }  
    20.    case RequestSubmitDriver(description) => {  
    21.        // 如果master不是active,返回错误  
    22.        sender ! SubmitDriverResponse(false, None, msg)  
    23.        // 否则创建driver,返回成功的消息  
    24.        sender ! SubmitDriverResponse(true, Some(driver.id), s"Driver successfully submitted as ${driver.id}")  
    25.      }  
    26.    }  
    27.    case RequestKillDriver(driverId) => {  
    28.      if (state != RecoveryState.ALIVE) {  
    29.        // 如果master不是active,返回错误  
    30.        val msg = s"Can only kill drivers in ALIVE state. Current state: $state."  
    31.        sender ! KillDriverResponse(driverId, success = false, msg)  
    32.      } else {  
    33.        logInfo("Asked to kill driver " + driverId)  
    34.        val driver = drivers.find(_.id == driverId)  
    35.        driver match {  
    36.          case Some(d) =>  
    37.              //如果driver仍然在等待队列,从等待队列删除并且更新driver状态为KILLED  
    38.            } else {  
    39.              // 通知worker kill driver id的driver。结果会由workder发消息给master ! DriverStateChanged  
    40.              d.worker.foreach { w => w.actor ! KillDriver(driverId) }  
    41.            }  
    42.            // 注意,此时driver不一定被kill,master只是通知了worker去kill driver。  
    43.            sender ! KillDriverResponse(driverId, success = true, msg)  
    44.          case None =>  
    45.            // driver已经被kill,直接返回结果  
    46.            sender ! KillDriverResponse(driverId, success = false, msg)  
    47.        }  
    48.      }  
    49.    }  
    50.    case RequestDriverStatus(driverId) => {  
    51.      // 查找请求的driver,如果找到则返回driver的状态  
    52.      (drivers ++ completedDrivers).find(_.id == driverId) match {  
    53.        case Some(driver) =>  
    54.          sender ! DriverStatusResponse(found = true, Some(driver.state),  
    55.            driver.worker.map(_.id), driver.worker.map(_.hostPort), driver.exception)  
    56.        case None =>  
    57.          sender ! DriverStatusResponse(found = false, None, None, None, None)  
    58.      }  
    59.    }  
    60.    case RegisterApplication(description) => {  
    61.        //如果是standby,那么忽略这个消息  
    62.        //否则注册application;返回结果并且开始调度  
    63.    }  
    64.    case ExecutorStateChanged(appId, execId, state, message, exitStatus) => {  
    65.      // 通过idToApp获得app,然后通过app获得executors,从而通过execId获得executor  
    66.      val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))  
    67.      execOption match {  
    68.        case Some(exec) => {  
    69.          exec.state = state  
    70.          exec.application.driver ! ExecutorUpdated(execId, state, message, exitStatus)  
    71.          if (ExecutorState.isFinished(state)) {  
    72.            val appInfo = idToApp(appId)  
    73.            // Remove this executor from the worker and app  
    74.            logInfo("Removing executor " + exec.fullId + " because it is " + state)  
    75.            appInfo.removeExecutor(exec)  
    76.            exec.worker.removeExecutor(exec)  
    77.           }  
    78.      }  
    79.    }  
    80.    case DriverStateChanged(driverId, state, exception) => {  
    81.      //  如果Driver的state为ERROR | FINISHED | KILLED | FAILED, 删除它。  
    82.    }  
    83.    case Heartbeat(workerId) => {  
    84.      // 更新worker的时间戳 workerInfo.lastHeartbeat = System.currentTimeMillis()  
    85.    }  
    86.    case MasterChangeAcknowledged(appId) => {  
    87.      //  将appId的app的状态置为WAITING,为切换Master做准备。  
    88.      }  
    89.    case WorkerSchedulerStateResponse(workerId, executors, driverIds) => {  
    90.      // 通过workerId查找到worker,那么worker的state置为ALIVE,  
    91.      // 并且查找状态为idDefined的executors,并且将这些executors都加入到app中,  
    92.      // 然后保存这些app到worker中。可以理解为Worker在Master端的Recovery  
    93.      idToWorker.get(workerId) match {  
    94.        case Some(worker) =>  
    95.          logInfo("Worker has been re-registered: " + workerId)  
    96.          worker.state = WorkerState.ALIVE  
    97.   
    98.          val validExecutors = executors.filter(exec => idToApp.get(exec.appId).isDefined)  
    99.          for (exec <- validExecutors) {  
    100.            val app = idToApp.get(exec.appId).get  
    101.            val execInfo = app.addExecutor(worker, exec.cores, Some(exec.execId))  
    102.            worker.addExecutor(execInfo)  
    103.            execInfo.copyState(exec)  
    104.          }  
    105.          // 将所有的driver设置为RUNNING然后加入到worker中。  
    106.          for (driverId <- driverIds) {  
    107.            drivers.find(_.id == driverId).foreach { driver =>  
    108.              driver.worker = Some(worker)  
    109.              driver.state = DriverState.RUNNING  
    110.              worker.drivers(driverId) = driver  
    111.            }  
    112.          }  
    113.      }  
    114.    }  
    115.    case DisassociatedEvent(_, address, _) => {  
    116.      // 这个请求是Worker或者是App发送的。删除address对应的Worker和App  
    117.      // 如果Recovery可以结束,那么结束Recovery        
    118.    }  
    119.    case RequestMasterState => {  
    120.      //向sender返回master的状态  
    121.      sender ! MasterStateResponse(host, port, workers.toArray, apps.toArray, completedApps.toArray, drivers.toArray, completedDrivers.toArray, state)  
    122.    }  
    123.    case CheckForWorkerTimeOut => {  
    124.      //删除超时的Worker  
    125.    }  
    126.    case RequestWebUIPort => {  
    127.      //向sender返回web ui的端口号  
    128.      sender ! WebUIPortResponse(webUi.boundPort)  
    129.    }  
    130.  }  


    2.3 Worker 源码解析

    通过对Client和Master的源码解析,相信你也知道如何去分析Worker是如何和Master进行通信的了,没错,答案就在下面:

    [java] view plaincopy在CODE上查看代码片派生到我的代码片
    1. override def receive  

  • 相关阅读:
    【带权并查集】How Many Answers Are Wrong HDU
    【带权并查集+离散化】Parity game POJ
    【并查集】Supermarket POJ
    【并查集】P3958 奶酪
    【并查集-判环】Is It A Tree? POJ
    【最短路/线性差分约束】Layout POJ
    【最短路-负环】Extended Traffic LightOJ
    【最短路】Subway POJ
    【最短路-判负环 Floyd】Wormholes POJ
    [JZOJ]1293.气象牛[区间DP]
  • 原文地址:https://www.cnblogs.com/bluejoe/p/5115905.html
Copyright © 2011-2022 走看看