  • A Brief Look at Apache Spark-1.0.0 (9): Data Storage (Startup Communication)

    Main components: BlockManager, BlockManagerMaster, BlockManagerWorker, BlockManagerMasterActor, BlockManagerSlaveActor, etc.

    All storage-related components, on the Driver as well as on the Workers, are instantiated in SparkEnv:

    val blockManagerMaster = new BlockManagerMaster(registerOrLookup(
      "BlockManagerMaster",
      new BlockManagerMasterActor(isLocal, conf, listenerBus)), conf)
    
    val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster,
      serializer, conf, securityManager, mapOutputTracker)
    
    val connectionManager = blockManager.connectionManager
    
    val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)
    
    val cacheManager = new CacheManager(blockManager)

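    Note that the BlockManagerMaster constructor is given the result of registerOrLookup. registerOrLookup checks whether the current process is the driver: if it is, it creates the BlockManagerMasterActor; otherwise it reads the driver's host and port (spark.driver.host and spark.driver.port) and looks up the remote actor already running on the driver. Because newActor is a by-name parameter, the actor object is only constructed on the driver. In other words, every process gets a BlockManagerMaster object, but only the driver actually runs the BlockManagerMasterActor; all other processes hold only an ActorRef to it.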

    def registerOrLookup(name: String, newActor: => Actor): ActorRef = {
      if (isDriver) {
        // Driver: create the actor locally under /user/<name>
        logInfo("Registering " + name)
        actorSystem.actorOf(Props(newActor), name = name)
      } else {
        // Non-driver: resolve the actor running on the driver by its Akka URL
        val driverHost: String = conf.get("spark.driver.host", "localhost")
        val driverPort: Int = conf.getInt("spark.driver.port", 7077)
        Utils.checkHost(driverHost, "Expected hostname")
        val url = s"akka.tcp://spark@$driverHost:$driverPort/user/$name"
        val timeout = AkkaUtils.lookupTimeout(conf)
        logInfo(s"Connecting to $name: $url")
        Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)
      }
    }
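To make the driver/non-driver split concrete, here is a minimal, Akka-free sketch of the same decision. It is only an illustration under stated assumptions: LocalRef and RemoteRef are hypothetical stand-ins for ActorRef, the configuration is a plain Map rather than SparkConf, and Utils.checkHost and the actual remote lookup are left out. It keeps the original defaults (localhost and 7077) and the original URL shape.

```scala
// Hypothetical stand-ins for ActorRef (assumption: no Akka involved).
sealed trait Ref
final case class LocalRef(name: String) extends Ref   // actor created in this process
final case class RemoteRef(url: String) extends Ref   // reference to the driver's actor

// Builds the driver URL with the same keys, defaults and shape as the original.
def driverUrl(conf: Map[String, String], name: String): String = {
  val driverHost = conf.getOrElse("spark.driver.host", "localhost")
  val driverPort = conf.get("spark.driver.port").map(_.toInt).getOrElse(7077)
  s"akka.tcp://spark@$driverHost:$driverPort/user/$name"
}

// Mirrors the branch in registerOrLookup: create on the driver, look up elsewhere.
def registerOrLookup(isDriver: Boolean, conf: Map[String, String], name: String): Ref =
  if (isDriver) LocalRef(name)
  else RemoteRef(driverUrl(conf, name))
```

For example, on a non-driver process with an empty configuration, the master would be resolved at akka.tcp://spark@localhost:7077/user/BlockManagerMaster.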

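    Next, BlockManager is instantiated, and its class body calls initialize(), defined below. First, blockManagerMaster calls registerBlockManager to register this BlockManager with the driver, passing its own slaveActor as the third argument. Then startBlockManagerWorker is called on the BlockManagerWorker companion object to start a BlockManagerWorker. Finally, unless heartbeats are disabled for testing, a periodic heartBeat task is scheduled.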

    initialize()

    /**
     * Initialize the BlockManager. Register to the BlockManagerMaster, and start the
     * BlockManagerWorker actor.
     */
    private def initialize() {
      master.registerBlockManager(blockManagerId, maxMemory, slaveActor)
      BlockManagerWorker.startBlockManagerWorker(this)
      if (!BlockManager.getDisableHeartBeatsForTesting(conf)) {
        heartBeatTask = actorSystem.scheduler.schedule(0.seconds, heartBeatFrequency.milliseconds) {
          Utils.tryOrExit { heartBeat() }
        }
      }
    }
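The ordering inside initialize() (register first, then start the worker, then start heartbeats unless they are disabled) can be sketched without Spark. In this sketch the three steps are passed in as hypothetical callbacks, and a JDK ScheduledExecutorService stands in for actorSystem.scheduler; Utils.tryOrExit is not modeled.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, ScheduledFuture, TimeUnit}

// Toy BlockManager that only mirrors the order of steps in initialize().
// Assumption: callbacks replace master.registerBlockManager,
// BlockManagerWorker.startBlockManagerWorker and heartBeat().
class ToyBlockManager(
    register: () => Unit,
    startWorker: () => Unit,
    heartBeat: () => Unit,
    disableHeartBeats: Boolean,
    heartBeatFrequencyMs: Long) {

  private val scheduler: ScheduledExecutorService = Executors.newSingleThreadScheduledExecutor()
  @volatile var heartBeatTask: Option[ScheduledFuture[_]] = None

  def initialize(): Unit = {
    register()    // 1. register with the master on the driver
    startWorker() // 2. start handling remote block requests
    if (!disableHeartBeats) {
      // 3. first heartbeat immediately, then every heartBeatFrequencyMs
      heartBeatTask = Some(scheduler.scheduleAtFixedRate(
        () => heartBeat(), 0L, heartBeatFrequencyMs, TimeUnit.MILLISECONDS))
    }
  }

  // Cancels the periodic task and releases the scheduler thread.
  def stop(): Unit = {
    heartBeatTask.foreach(_.cancel(false))
    scheduler.shutdownNow()
  }
}
```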

    Let us first look at slaveActor. As defined below, it is a BlockManagerSlaveActor created by actorSystem:

    val slaveActor = actorSystem.actorOf(Props(new BlockManagerSlaveActor(this, mapOutputTracker)),
        name = "BlockManagerActor" + BlockManager.ID_GENERATOR.next)
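Since top-level actor names must be unique within an ActorSystem, the numeric suffix presumably exists so that several BlockManagers in one JVM (in local mode, for instance) do not collide. Below is a small sketch of that naming scheme; IdGenerator and SlaveActorNames are hypothetical re-creations, assumed to behave like a thread-safe counter starting at 1, not Spark's own classes.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical thread-safe id source (assumption: returns 1, 2, 3, ...).
class IdGenerator {
  private val id = new AtomicInteger(0)
  def next: Int = id.incrementAndGet()
}

object SlaveActorNames {
  val ID_GENERATOR = new IdGenerator
  // Same scheme as the original: fixed prefix plus the next id.
  def slaveActorName(): String = "BlockManagerActor" + ID_GENERATOR.next
}
```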

    Back in registerBlockManager, tell is called to send a RegisterBlockManager message:

    /** Register the BlockManager's id with the driver. */
    def registerBlockManager(blockManagerId: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
      logInfo("Trying to register BlockManager")
      tell(RegisterBlockManager(blockManagerId, maxMemSize, slaveActor))
      logInfo("Registered BlockManager")
    }

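    tell sends a message to the master actor, i.e. the BlockManagerMasterActor running on the driver. Although its doc comment calls it a one-way message, it is built on askDriverWithReply: it waits for a Boolean reply and throws a SparkException unless the reply is true.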

    /** Send a one-way message to the master actor, to which we expect it to reply with true. */
    private def tell(message: Any) {
      if (!askDriverWithReply[Boolean](message)) {
        throw new SparkException("BlockManagerMasterActor returned false, expected true.")
      }
    }
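Because tell is built on a request/reply round trip, its behavior can be sketched with plain Scala futures. This is a simplified model under stated assumptions: the master is represented by a function from message to reply, askDriverWithReply here is a hypothetical single-attempt version (the real one goes through Akka's ask pattern), and SparkException is a local stand-in class.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Local stand-in for org.apache.spark.SparkException.
class SparkException(msg: String) extends Exception(msg)

// Hypothetical askDriverWithReply: run the "master" asynchronously and block for the reply.
def askDriverWithReply[T](master: Any => Any, message: Any, timeout: FiniteDuration): T =
  Await.result(Future(master(message)), timeout).asInstanceOf[T]

// Same contract as the original tell: succeed only if the master answers true.
def tell(master: Any => Any, message: Any, timeout: FiniteDuration = 1.second): Unit =
  if (!askDriverWithReply[Boolean](master, message, timeout)) {
    throw new SparkException("BlockManagerMasterActor returned false, expected true.")
  }
```

The design point this illustrates: the caller of registerBlockManager does not proceed until the driver has acknowledged the registration, so "Registered BlockManager" is only logged after a true reply.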

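    The receive method is defined in BlockManagerMasterActor. On a RegisterBlockManager message it calls register and then replies true to the sender, which is the reply tell is waiting for: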

    def receive = {
        case RegisterBlockManager(blockManagerId, maxMemSize, slaveActor) =>
          register(blockManagerId, maxMemSize, slaveActor)
          sender ! true
    
        case UpdateBlockInfo(
          blockManagerId, blockId, storageLevel, deserializedSize, size, tachyonSize) =>
          // TODO: Ideally we want to handle all the message replies in receive instead of in the
          // individual private methods.
          updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size, tachyonSize)
    
        case GetLocations(blockId) =>
          sender ! getLocations(blockId)
    
        case GetLocationsMultipleBlockIds(blockIds) =>
          sender ! getLocationsMultipleBlockIds(blockIds)
    
        case GetPeers(blockManagerId, size) =>
          sender ! getPeers(blockManagerId, size)
    
        case GetMemoryStatus =>
          sender ! memoryStatus
    
        case GetStorageStatus =>
          sender ! storageStatus
    
        case GetBlockStatus(blockId, askSlaves) =>
          sender ! blockStatus(blockId, askSlaves)
    
        case GetMatchingBlockIds(filter, askSlaves) =>
          sender ! getMatchingBlockIds(filter, askSlaves)
    
        case RemoveRdd(rddId) =>
          sender ! removeRdd(rddId)
    
        case RemoveShuffle(shuffleId) =>
          sender ! removeShuffle(shuffleId)
    
        case RemoveBroadcast(broadcastId, removeFromDriver) =>
          sender ! removeBroadcast(broadcastId, removeFromDriver)
    
        case RemoveBlock(blockId) =>
          removeBlockFromWorkers(blockId)
          sender ! true
    
        case RemoveExecutor(execId) =>
          removeExecutor(execId)
          sender ! true
    
        case StopBlockManagerMaster =>
          logInfo("Stopping BlockManagerMaster")
          sender ! true
          if (timeoutCheckingTask != null) {
            timeoutCheckingTask.cancel()
          }
          context.stop(self)
    
        case ExpireDeadHosts =>
          expireDeadHosts()
    
        case HeartBeat(blockManagerId) =>
          sender ! heartBeat(blockManagerId)
    
        case other =>
          logWarning("Got unknown message: " + other)
    }
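The structure of receive (match on the message type, compute a result, reply with sender ! result) can be illustrated with a toy master. Only a few message names are reused; payloads are simplified to Strings, the reply is returned as an Option instead of being sent to sender, and ToyMaster, including its HeartBeat handling, is a hypothetical simplification rather than Spark's implementation.

```scala
import scala.collection.mutable

// Simplified message protocol (assumption: String ids instead of BlockManagerId/BlockId).
sealed trait MasterMessage
final case class RegisterBlockManager(bmId: String, maxMemSize: Long) extends MasterMessage
final case class GetLocations(blockId: String) extends MasterMessage
final case class HeartBeat(bmId: String) extends MasterMessage
case object ExpireDeadHosts extends MasterMessage

class ToyMaster {
  private val registered = mutable.Set.empty[String]
  private val locations = mutable.Map.empty[String, Seq[String]]

  // Test helper standing in for UpdateBlockInfo.
  def addLocation(blockId: String, bmId: String): Unit =
    locations(blockId) = locations.getOrElse(blockId, Seq.empty) :+ bmId

  // Some(reply) plays the role of `sender ! reply`; None means nothing is sent back.
  def receive(msg: Any): Option[Any] = msg match {
    case RegisterBlockManager(id, _) =>
      registered += id
      Some(true)
    case GetLocations(blockId) =>
      Some(locations.getOrElse(blockId, Seq.empty))
    case HeartBeat(id) =>
      Some(registered.contains(id)) // in this toy, an unknown manager gets false
    case ExpireDeadHosts =>
      None // no reply, as in the original case
    case other =>
      println("Got unknown message: " + other)
      None
  }
}
```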

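    register works as follows. If this BlockManagerId has not been registered yet, it checks blockManagerIdByExecutor: if the executor already has a different BlockManager registered, it logs an error and exits; otherwise it maps the executorId to this blockManagerId. It then adds a new BlockManagerInfo, which holds the slaveActor, to blockManagerInfo. In either case it posts a SparkListenerBlockManagerAdded event to the listenerBus.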

    private def register(id: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
      if (!blockManagerInfo.contains(id)) {
        blockManagerIdByExecutor.get(id.executorId) match {
          case Some(manager) =>
            // A block manager of the same executor already exists.
            // This should never happen. Let's just quit.
            logError("Got two different block manager registrations on " + id.executorId)
            System.exit(1)
          case None =>
            blockManagerIdByExecutor(id.executorId) = id
        }
        blockManagerInfo(id) =
          new BlockManagerInfo(id, System.currentTimeMillis(), maxMemSize, slaveActor)
      }
      listenerBus.post(SparkListenerBlockManagerAdded(id, maxMemSize))
    }
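The bookkeeping in register can be sketched with two hash maps. BlockManagerId and BlockManagerInfo below are simplified hypothetical versions (the slaveActor field is dropped), System.exit(1) is replaced by an exception so the sketch can be exercised, and a ListBuffer stands in for listenerBus.

```scala
import scala.collection.mutable

// Simplified stand-ins (assumption): only the fields register() inspects.
final case class BlockManagerId(executorId: String, host: String, port: Int)
final case class BlockManagerInfo(id: BlockManagerId, registeredAtMs: Long, maxMem: Long)

class ToyRegistry {
  val blockManagerInfo = mutable.HashMap.empty[BlockManagerId, BlockManagerInfo]
  val blockManagerIdByExecutor = mutable.HashMap.empty[String, BlockManagerId]
  val events = mutable.ListBuffer.empty[String] // stands in for listenerBus.post

  def register(id: BlockManagerId, maxMemSize: Long): Unit = {
    if (!blockManagerInfo.contains(id)) {
      blockManagerIdByExecutor.get(id.executorId) match {
        case Some(_) =>
          // The original calls System.exit(1); throwing keeps the sketch testable.
          throw new IllegalStateException(
            "Got two different block manager registrations on " + id.executorId)
        case None =>
          blockManagerIdByExecutor(id.executorId) = id
      }
      blockManagerInfo(id) = BlockManagerInfo(id, System.currentTimeMillis(), maxMemSize)
    }
    // Posted on every call, including re-registration of a known id.
    events += s"BlockManagerAdded(${id.executorId})"
  }
}
```

Registering the same id twice is therefore harmless (one entry, two events), while a second, different id for the same executor is treated as fatal.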

    Back in initialize(), the second statement, BlockManagerWorker.startBlockManagerWorker(this), instantiates a BlockManagerWorker:

    def startBlockManagerWorker(manager: BlockManager) {
      blockManagerWorker = new BlockManagerWorker(manager)
    }

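    In the BlockManagerWorker class body, the worker registers onBlockMessageReceive as the connectionManager's message callback, so it listens for remote block get/put requests and handles them: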

    blockManager.connectionManager.onReceiveMessage(onBlockMessageReceive)
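The pattern here is callback registration: BlockManagerWorker hands its handler to the ConnectionManager when it is constructed, and from then on every incoming block message is routed to it. The sketch below is a toy model under clear assumptions: messages are plain Strings such as "GET id" and "PUT id data", ToyConnectionManager.deliver simulates network delivery, and none of these names are Spark APIs.

```scala
// Toy connection manager (assumption: String messages and String peer ids).
class ToyConnectionManager {
  private var onReceive: (String, String) => Option[String] = (_, _) => None

  // Mirrors the idea of onReceiveMessage: remember a single handler.
  def onReceiveMessage(callback: (String, String) => Option[String]): Unit =
    onReceive = callback

  // Simulates a message arriving from a remote peer; returns the handler's reply.
  def deliver(msg: String, from: String): Option[String] = onReceive(msg, from)
}

// Hypothetical block manager holding an in-memory block store.
class ToyBlockManager(val connectionManager: ToyConnectionManager) {
  val store = scala.collection.mutable.Map.empty[String, String]
}

// Like BlockManagerWorker, installs its handler as a side effect of construction.
class ToyBlockManagerWorker(blockManager: ToyBlockManager) {
  blockManager.connectionManager.onReceiveMessage(onBlockMessageReceive)

  // "GET id" returns the block if present; "PUT id data" stores it and acks.
  private def onBlockMessageReceive(msg: String, from: String): Option[String] =
    msg.split(" ", 3).toList match {
      case List("GET", id)       => blockManager.store.get(id)
      case List("PUT", id, data) => blockManager.store(id) = data; Some("ok")
      case _                     => None
    }
}

// Companion holding the single worker, like BlockManagerWorker.startBlockManagerWorker.
object ToyBlockManagerWorker {
  private var worker: Option[ToyBlockManagerWorker] = None
  def start(manager: ToyBlockManager): Unit =
    worker = Some(new ToyBlockManagerWorker(manager))
}
```

Because the handler is installed in the constructor, simply creating the worker is enough to start serving remote block requests.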

    Finally, the relationships among the storage module's components are shown in the figure below.

    [Figure: relationships among the storage module components]

    END

  • Original post: https://www.cnblogs.com/kevingu/p/4761844.html