  • A Brief Analysis of Apache Spark 1.0.0 (Part 9): Data Storage, Starting Up Communication

    Main components: BlockManager, BlockManagerMaster, BlockManagerWorker, BlockManagerMasterActor, BlockManagerSlaveActor, …

    The storage-module components are instantiated in SparkEnv, on the Driver and on every Worker alike:

    val blockManagerMaster = new BlockManagerMaster(registerOrLookup(
      "BlockManagerMaster",
      new BlockManagerMasterActor(isLocal, conf, listenerBus)), conf)
    
    val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster,
      serializer, conf, securityManager, mapOutputTracker)
    
    val connectionManager = blockManager.connectionManager
    
    val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)
    
    val cacheManager = new CacheManager(blockManager)

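    Note that the BlockManagerMaster is constructed with the ActorRef returned by registerOrLookup. registerOrLookup checks whether the current process is the driver: if so, it creates the actor locally; otherwise, it reads the driver's host and port from the configuration and looks up the remote actor running on the driver. In other words, every process has a BlockManagerMaster, but only the driver runs the BlockManagerMasterActor; all other processes hold only an ActorRef to it.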

    def registerOrLookup(name: String, newActor: => Actor): ActorRef = {
      if (isDriver) {
        // Driver: create the actor locally. newActor is by-name, so it is only evaluated here.
        logInfo("Registering " + name)
        actorSystem.actorOf(Props(newActor), name = name)
      } else {
        // Non-driver: build the path of the actor living on the driver and resolve it remotely
        val driverHost: String = conf.get("spark.driver.host", "localhost")
        val driverPort: Int = conf.getInt("spark.driver.port", 7077)
        Utils.checkHost(driverHost, "Expected hostname")
        val url = s"akka.tcp://spark@$driverHost:$driverPort/user/$name"
        val timeout = AkkaUtils.lookupTimeout(conf)
        logInfo(s"Connecting to $name: $url")
        // Blocks until the ActorRef is resolved or the lookup times out
        Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)
      }
    }
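    To make the branch concrete, here is a stdlib-only Scala sketch of the decision registerOrLookup makes. It is not Spark code: Akka is left out, and the hypothetical names ActorPlan, CreateLocal, LookupRemote and planFor merely describe what would happen. The config keys, their defaults and the akka.tcp URL format are copied from the excerpt above; the configuration is modelled, as an assumption, by a plain Map[String, String].

```scala
// Hypothetical description of what registerOrLookup would do (no Akka involved)
sealed trait ActorPlan
final case class CreateLocal(name: String) extends ActorPlan  // driver: actorSystem.actorOf(...)
final case class LookupRemote(url: String) extends ActorPlan  // others: actorSelection(url).resolveOne(...)

object RegisterOrLookupSketch {
  def planFor(isDriver: Boolean, conf: Map[String, String], name: String): ActorPlan =
    if (isDriver) {
      CreateLocal(name)
    } else {
      // Same keys and defaults as the Spark excerpt
      val driverHost = conf.getOrElse("spark.driver.host", "localhost")
      val driverPort = conf.getOrElse("spark.driver.port", "7077").toInt
      LookupRemote(s"akka.tcp://spark@$driverHost:$driverPort/user/$name")
    }
}
```

    For example, planFor(false, Map("spark.driver.host" -> "10.0.0.5", "spark.driver.port" -> "51000"), "BlockManagerMaster") yields LookupRemote("akka.tcp://spark@10.0.0.5:51000/user/BlockManagerMaster"), while on the driver the same name yields CreateLocal("BlockManagerMaster"). Because newActor is a by-name parameter in the real method, the non-driver branch never constructs a BlockManagerMasterActor.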

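    Next, the BlockManager is instantiated, and its class body calls initialize(), defined below. initialize() does three things. First, it calls registerBlockManager on the BlockManagerMaster to register this BlockManager with the driver, passing its own slaveActor as the third argument. Second, it calls startBlockManagerWorker on the BlockManagerWorker companion object to start the BlockManagerWorker. Third, unless heartbeats are disabled for testing, it schedules a periodic heartBeat() task.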

    initialize()  // called from the BlockManager class body

    /**
     * Initialize the BlockManager. Register to the BlockManagerMaster, and start the
     * BlockManagerWorker actor.
     */
    private def initialize() {
      master.registerBlockManager(blockManagerId, maxMemory, slaveActor)
      BlockManagerWorker.startBlockManagerWorker(this)
      if (!BlockManager.getDisableHeartBeatsForTesting(conf)) {
        // First heartbeat immediately, then every heartBeatFrequency milliseconds
        heartBeatTask = actorSystem.scheduler.schedule(0.seconds, heartBeatFrequency.milliseconds) {
          Utils.tryOrExit { heartBeat() }
        }
      }
    }
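    The order of the three steps, and the "first heartbeat now, then periodically" schedule, can be sketched without Akka. This is a hypothetical stand-in (InitSketch is not a Spark class) that swaps Akka's scheduler for java.util.concurrent's ScheduledExecutorService; the registration and worker-start calls are only recorded as strings.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for BlockManager.initialize(): same order of steps, no Akka.
class InitSketch(heartBeatFrequencyMs: Long, disableHeartBeats: Boolean) {
  val steps = ArrayBuffer.empty[String] // order of the startup steps
  val beats = new AtomicInteger(0)      // heartbeats sent so far
  val firstBeat = new CountDownLatch(1) // released by the first heartbeat
  private val scheduler = Executors.newSingleThreadScheduledExecutor()

  private def heartBeat(): Unit = {
    beats.incrementAndGet()
    firstBeat.countDown()
  }

  def initialize(): Unit = {
    steps += "registerBlockManager"    // master.registerBlockManager(...)
    steps += "startBlockManagerWorker" // BlockManagerWorker.startBlockManagerWorker(this)
    if (!disableHeartBeats) {
      // Roughly like schedule(0.seconds, heartBeatFrequency.milliseconds)
      scheduler.scheduleAtFixedRate(new Runnable {
        def run(): Unit = heartBeat()
      }, 0L, heartBeatFrequencyMs, TimeUnit.MILLISECONDS)
    }
  }

  def stop(): Unit = scheduler.shutdownNow()
}
```

    One design note: with ScheduledExecutorService, an exception thrown by a periodic task silently suppresses all later runs, which is one reason a wrapper in the spirit of Utils.tryOrExit (fail loudly instead of quietly stopping heartbeats) matters. The sketch has no such wrapper.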

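    First, consider slaveActor. As defined below, it is a BlockManagerSlaveActor created by the actorSystem under a unique name built from BlockManager.ID_GENERATOR: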

    val slaveActor = actorSystem.actorOf(Props(new BlockManagerSlaveActor(this, mapOutputTracker)),
        name = "BlockManagerActor" + BlockManager.ID_GENERATOR.next)
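    Akka rejects a second top-level actor with the same name in one ActorSystem, so the counter suffix keeps several BlockManagers in the same process from colliding. A minimal sketch of the naming scheme, assuming a counter that starts at 0 (IdGeneratorSketch and SlaveActorNames are hypothetical names, not Spark's):

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for BlockManager.ID_GENERATOR: a thread-safe counter
class IdGeneratorSketch {
  private val id = new AtomicInteger(0)
  def next: Int = id.getAndIncrement()
}

object SlaveActorNames {
  private val generator = new IdGeneratorSketch
  // Mirrors name = "BlockManagerActor" + BlockManager.ID_GENERATOR.next
  def nextName(): String = "BlockManagerActor" + generator.next
}
```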

    Back in registerBlockManager, tell is called to send a RegisterBlockManager message:

    /** Register the BlockManager's id with the driver. */
    def registerBlockManager(blockManagerId: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
      logInfo("Trying to register BlockManager")
      tell(RegisterBlockManager(blockManagerId, maxMemSize, slaveActor))
      logInfo("Registered BlockManager")
    }

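    tell sends a one-way message to the master actor, i.e. the BlockManagerMasterActor running on the driver. Despite the "one-way" wording, it is implemented with askDriverWithReply: it waits for the reply and throws a SparkException unless the reply is true.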

    /** Send a one-way message to the master actor, to which we expect it to reply with true. */
    private def tell(message: Any) {
      if (!askDriverWithReply[Boolean](message)) {
        throw new SparkException("BlockManagerMasterActor returned false, expected true.")
      }
    }
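    The "tell built on ask" pattern can be sketched with scala.concurrent alone. In this hypothetical sketch, the ask function stands in for messaging the master actor and returns a Future with its reply; MasterClientSketch and SketchSparkException are illustrative names, and there is no retry logic beyond a single Await with a timeout.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Stand-in for SparkException
class SketchSparkException(msg: String) extends Exception(msg)

// Hypothetical client: `ask` plays the role of sending a message to the master actor
class MasterClientSketch(ask: Any => Future[Any], timeout: FiniteDuration) {
  // Blocking ask-and-wait, in the spirit of askDriverWithReply[T]
  def askDriverWithReply[T](message: Any): T =
    Await.result(ask(message), timeout).asInstanceOf[T]

  // "One-way" for the caller, but it still waits and insists on a true reply
  def tell(message: Any): Unit = {
    if (!askDriverWithReply[Boolean](message)) {
      throw new SketchSparkException("BlockManagerMasterActor returned false, expected true.")
    }
  }
}
```

    A master that always answers true lets tell return normally; one that answers false makes tell throw, which is how a failed registration surfaces on the registering side.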

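    On the driver, the receive method of BlockManagerMasterActor handles the message: it calls register to perform the registration and then replies true to the sender.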

    def receive = {
      case RegisterBlockManager(blockManagerId, maxMemSize, slaveActor) =>
        register(blockManagerId, maxMemSize, slaveActor)
        sender ! true

      case UpdateBlockInfo(
        blockManagerId, blockId, storageLevel, deserializedSize, size, tachyonSize) =>
        // TODO: Ideally we want to handle all the message replies in receive instead of in the
        // individual private methods.
        updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size, tachyonSize)

      case GetLocations(blockId) =>
        sender ! getLocations(blockId)

      case GetLocationsMultipleBlockIds(blockIds) =>
        sender ! getLocationsMultipleBlockIds(blockIds)

      case GetPeers(blockManagerId, size) =>
        sender ! getPeers(blockManagerId, size)

      case GetMemoryStatus =>
        sender ! memoryStatus

      case GetStorageStatus =>
        sender ! storageStatus

      case GetBlockStatus(blockId, askSlaves) =>
        sender ! blockStatus(blockId, askSlaves)

      case GetMatchingBlockIds(filter, askSlaves) =>
        sender ! getMatchingBlockIds(filter, askSlaves)

      case RemoveRdd(rddId) =>
        sender ! removeRdd(rddId)

      case RemoveShuffle(shuffleId) =>
        sender ! removeShuffle(shuffleId)

      case RemoveBroadcast(broadcastId, removeFromDriver) =>
        sender ! removeBroadcast(broadcastId, removeFromDriver)

      case RemoveBlock(blockId) =>
        removeBlockFromWorkers(blockId)
        sender ! true

      case RemoveExecutor(execId) =>
        removeExecutor(execId)
        sender ! true

      case StopBlockManagerMaster =>
        logInfo("Stopping BlockManagerMaster")
        sender ! true
        if (timeoutCheckingTask != null) {
          timeoutCheckingTask.cancel()
        }
        context.stop(self)

      case ExpireDeadHosts =>
        expireDeadHosts()

      case HeartBeat(blockManagerId) =>
        sender ! heartBeat(blockManagerId)

      case other =>
        logWarning("Got unknown message: " + other)
    }
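    The receive method is a dispatch on message type, with each case either replying to the sender or not. A stdlib-only sketch of that shape follows; the message classes are hypothetical, simplified cousins of the ones above, Some(reply) stands for `sender ! reply`, and None means no reply (as in the ExpireDeadHosts and unknown-message cases). The heartbeat answer here is a simplification: it just says whether the executor is registered.

```scala
import scala.collection.mutable

// Hypothetical, simplified messages modelled on the case names above
sealed trait MasterMessage
final case class RegisterBM(executorId: String, maxMemSize: Long) extends MasterMessage
final case class GetLocationsMsg(blockId: String) extends MasterMessage
final case class HeartBeatMsg(executorId: String) extends MasterMessage

class MasterActorSketch {
  private val registered = mutable.Map.empty[String, Long]       // executorId -> max memory
  private val locations = mutable.Map.empty[String, Seq[String]] // blockId -> executorIds

  def addLocation(blockId: String, executorId: String): Unit =
    locations(blockId) = locations.getOrElse(blockId, Seq.empty[String]) :+ executorId

  // Like Actor.receive: Some(reply) stands for `sender ! reply`; None means no reply.
  def receive(message: Any): Option[Any] = message match {
    case RegisterBM(execId, mem) =>
      registered(execId) = mem
      Some(true)
    case GetLocationsMsg(blockId) =>
      Some(locations.getOrElse(blockId, Seq.empty[String]))
    case HeartBeatMsg(execId) =>
      Some(registered.contains(execId)) // simplification: "is this executor known?"
    case other =>
      println("Got unknown message: " + other) // stands in for logWarning
      None
  }
}
```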

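    register first checks whether this BlockManagerId is already known. If it is not, register maps the executorId to the BlockManagerId (logging an error and exiting the process if that executor already has a different BlockManager registered), then adds a new BlockManagerInfo, which holds the slaveActor, to blockManagerInfo. In either case it posts a SparkListenerBlockManagerAdded event to the listenerBus.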

    private def register(id: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
      if (!blockManagerInfo.contains(id)) {
        blockManagerIdByExecutor.get(id.executorId) match {
          case Some(manager) =>
            // A block manager of the same executor already exists.
            // This should never happen. Let's just quit.
            logError("Got two different block manager registrations on " + id.executorId)
            System.exit(1)
          case None =>
            blockManagerIdByExecutor(id.executorId) = id
        }
        blockManagerInfo(id) =
          new BlockManagerInfo(id, System.currentTimeMillis(), maxMemSize, slaveActor)
      }
      listenerBus.post(SparkListenerBlockManagerAdded(id, maxMemSize))
    }
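    The bookkeeping in register boils down to two maps and an event log. Here is a minimal sketch with hypothetical types (BMId, BMInfo, RegistrySketch), where the slave actor is reduced to a String and the listener bus to a buffer of event strings. As a deliberate change for testability, a duplicate executor registration throws instead of calling System.exit(1).

```scala
import scala.collection.mutable

// Hypothetical, simplified ids and per-manager info
final case class BMId(executorId: String, host: String, port: Int)
final case class BMInfo(id: BMId, registeredAtMs: Long, maxMem: Long, slaveRef: String)

class RegistrySketch {
  val blockManagerInfo = mutable.HashMap.empty[BMId, BMInfo]
  val blockManagerIdByExecutor = mutable.HashMap.empty[String, BMId]
  val events = mutable.ArrayBuffer.empty[String] // stands in for listenerBus.post(...)

  def register(id: BMId, maxMemSize: Long, slaveRef: String): Unit = {
    if (!blockManagerInfo.contains(id)) {
      blockManagerIdByExecutor.get(id.executorId) match {
        case Some(_) =>
          // Spark logs an error and calls System.exit(1); this sketch throws instead
          throw new IllegalStateException(
            "Got two different block manager registrations on " + id.executorId)
        case None =>
          blockManagerIdByExecutor(id.executorId) = id
      }
      blockManagerInfo(id) = BMInfo(id, System.currentTimeMillis(), maxMemSize, slaveRef)
    }
    // Posted even when the same id registers again
    events += s"BlockManagerAdded(${id.executorId}, $maxMemSize)"
  }
}
```

    Registering the same id twice is therefore harmless (one entry, two events), while a different id for an already-registered executor is treated as fatal.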

    Back in initialize(), the second statement, BlockManagerWorker.startBlockManagerWorker(this), instantiates a BlockManagerWorker:

    def startBlockManagerWorker(manager: BlockManager) {
      blockManagerWorker = new BlockManagerWorker(manager)
    }

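    In its class body, BlockManagerWorker registers onBlockMessageReceive as the callback of the BlockManager's ConnectionManager, so that remote requests to get or put blocks are handled there: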

    blockManager.connectionManager.onReceiveMessage(onBlockMessageReceive)
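    The pattern here is plain callback registration: the connection layer knows nothing about blocks and simply hands every incoming message to whatever handler was registered, returning the handler's optional reply. A stdlib-only sketch, loosely mirroring GetBlock (reply with data) and PutBlock (store, no reply); Msg, PeerId, the string protocol and both classes are hypothetical, not Spark's Message or ConnectionManagerId.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for a network message and the sender's id
final case class Msg(body: String)
final case class PeerId(host: String, port: Int)

class ConnectionManagerSketch {
  // Single registered handler; by default every message is ignored
  private var handler: (Msg, PeerId) => Option[Msg] = (_, _) => None

  def onReceiveMessage(callback: (Msg, PeerId) => Option[Msg]): Unit = {
    handler = callback
  }

  // Simulates a message arriving from a peer; returns the reply that would be sent back
  def deliver(msg: Msg, from: PeerId): Option[Msg] = handler(msg, from)
}

class BlockWorkerSketch(cm: ConnectionManagerSketch, store: mutable.Map[String, String]) {
  // Registered in the constructor, as in BlockManagerWorker's class body
  cm.onReceiveMessage(onBlockMessageReceive)

  // Toy protocol: "GET:<blockId>" or "PUT:<blockId>:<data>"
  private def onBlockMessageReceive(msg: Msg, from: PeerId): Option[Msg] =
    msg.body.split(":", 3) match {
      case Array("GET", blockId) => store.get(blockId).map(Msg(_))
      case Array("PUT", blockId, data) =>
        store(blockId) = data
        None
      case _ => None
    }
}
```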

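    Finally, the figure below shows how the storage-module components relate to one another.

    [Figure: relationships among the storage-module components]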

    END

  • Original article: https://www.cnblogs.com/kevingu/p/4761844.html