zoukankan      html  css  js  c++  java
  • Spark 源码分析 – BlockManagerMaster&Slave

     

    BlockManagerMaster

    只是维护一系列对BlockManagerMasterActor的接口, 所有的都是通过tell和askDriverWithReply从BlockManagerMasterActor获取数据
    比较鸡肋的类

    /**
     * Handle used to send block-management requests to the driver's actor.
     *
     * The methods shown in full here (`stop`, `tell`, `askDriverWithReply`) all go through
     * `driverActor`. The bodies of the remaining public methods are elided in this excerpt,
     * so only their signatures are listed.
     *
     * @param driverActor reference to the driver-side actor; `stop()` resets it to null
     */
    private[spark] class BlockManagerMaster(var driverActor: ActorRef) extends Logging {
      /** Remove a dead executor from the driver actor. This is only called on the driver side. */
      def removeExecutor(execId: String)
      /**
       * Send the driver actor a heart beat from the slave. Returns true if everything works out,
       * false if the driver does not know about the given block manager, which means the block
       * manager should re-register.
       */
      def sendHeartBeat(blockManagerId: BlockManagerId): Boolean
      /** Register the BlockManager's id with the driver. */
      def registerBlockManager(blockManagerId: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef)
      /** Report a block's storage level and sizes to the driver (body elided in this excerpt). */
      def updateBlockInfo(
          blockManagerId: BlockManagerId,
          blockId: String,
          storageLevel: StorageLevel,
          memSize: Long,
          diskSize: Long): Boolean
      /** Get locations of the blockId from the driver */
      def getLocations(blockId: String): Seq[BlockManagerId]
      /** Get locations of multiple blockIds from the driver */
      def getLocations(blockIds: Array[String]): Seq[Seq[BlockManagerId]]
      /** Get ids of other nodes in the cluster from the driver */
      def getPeers(blockManagerId: BlockManagerId, numPeers: Int): Seq[BlockManagerId]
      /**
       * Remove a block from the slaves that have it. This can only be used to remove
       * blocks that the driver knows about.
       */
      def removeBlock(blockId: String)
      /**
       * Remove all blocks belonging to the given RDD.
       */
      def removeRdd(rddId: Int, blocking: Boolean)
      /**
       * Return the memory status for each block manager, in the form of a map from
       * the block manager's id to two long values. The first value is the maximum
       * amount of memory allocated for the block manager, while the second is the
       * amount of remaining memory.
       */
      def getMemoryStatus: Map[BlockManagerId, (Long, Long)]
      /** Storage status per block manager (body elided in this excerpt). */
      def getStorageStatus: Array[StorageStatus]
      /**
       * Stop the driver actor, called only on the Spark driver node.
       * Does nothing when `driverActor` is already null, so repeated calls are harmless.
       */
      def stop() {
        if (driverActor != null) {
          tell(StopBlockManagerMaster)
          driverActor = null
          logInfo("BlockManagerMaster stopped")
        }
      }

      /**
       * Send a one-way message to the master actor, to which we expect it to reply with true.
       *
       * @throws SparkException if the reply is false, or if `askDriverWithReply` fails
       */
      private def tell(message: Any) {
        if (!askDriverWithReply[Boolean](message)) {
          throw new SparkException("BlockManagerMasterActor returned false, expected true.")
        }
      }

      /**
       * Send a message to the driver actor and get its result within a default timeout, or
       * throw a SparkException if this fails.
       *
       * The request is tried up to AKKA_RETRY_ATTEMPTS times, pausing AKKA_RETRY_INTERVAL_MS
       * between consecutive attempts. A null reply counts as a failed attempt.
       *
       * @param message the message to send to `driverActor`
       * @return the reply, cast to `T` (the cast is unchecked)
       * @throws SparkException if `driverActor` is null or every attempt failed; the last
       *                        failure is attached as the cause
       * @throws InterruptedException propagated immediately, without further retries
       */
      private def askDriverWithReply[T](message: Any): T = {
        // TODO: Consider removing multiple attempts
        if (driverActor == null) {
          throw new SparkException("Error sending message to BlockManager as driverActor is null " +
            "[message =" + message + "]")
        }
        var attempts = 0
        var lastException: Exception = null
        while (attempts < AKKA_RETRY_ATTEMPTS) {
          attempts += 1
          try {
            val future = driverActor.ask(message)(timeout)
            val result = Await.result(future, timeout)
            if (result == null) {
              throw new SparkException("BlockManagerMaster returned null")
            }
            return result.asInstanceOf[T]
          } catch {
            // Never swallow an interrupt: the caller asked this thread to stop.
            case ie: InterruptedException => throw ie
            case e: Exception =>
              lastException = e
              logWarning("Error sending message to BlockManagerMaster in " + attempts + " attempts", e)
          }
          // Only pause when another attempt will follow; sleeping after the final failure
          // would just delay the exception below.
          if (attempts < AKKA_RETRY_ATTEMPTS) {
            Thread.sleep(AKKA_RETRY_INTERVAL_MS)
          }
        }
        throw new SparkException(
          "Error sending message to BlockManagerMaster [message =" + message + "]", lastException)
      }
    }

     

    BlockManagerInfo

    在BlockManagerMasterActor object中主要就是定义BlockManagerInfo
    主要用于管理BlockManager下面的所有block的BlockStatus和hb, 更新和删除

    为何要定义在这个地方?

    /**
     * Bookkeeping types used by the master actor: a per-block status record and the
     * per-block-manager info object that tracks those records.
     */
    private[spark]
    object BlockManagerMasterActor {
      /** Status of one block: its storage level plus its in-memory and on-disk sizes. */
      case class BlockStatus(storageLevel: StorageLevel, memSize: Long, diskSize: Long)

      /**
       * State kept for a single block manager.
       *
       * @param blockManagerId id of the block manager being tracked
       * @param timeMs         initial value for the last-seen timestamp
       * @param maxMem         total memory of the block manager; also the initial remaining memory
       * @param slaveActor     actor reference of the slave owning this block manager
       */
      class BlockManagerInfo(
          val blockManagerId: BlockManagerId,
          timeMs: Long,
          val maxMem: Long,
          val slaveActor: ActorRef)
        extends Logging {
        // Memory still considered free on this block manager; starts at maxMem.
        private var _remainingMem: Long = maxMem
        // Last-seen timestamp, seeded from timeMs. Presumably refreshed on heartbeats; the
        // updater is not part of this excerpt.
        private var _lastSeenMs: Long = timeMs
        // Mapping from block id to its status.
        private val _blocks = new JHashMap[String, BlockStatus]

        /**
         * Record a new status for `blockId`, adjusting the remaining-memory counter.
         *
         * A valid storage level stores or overwrites the entry; an invalid one drops it.
         *
         * NOTE(review): when the block is already known and was held in memory, the counter
         * is credited with the incoming `memSize`, not the stored one. The accompanying notes
         * describe `memSize` as the dropped-memory size (0 by default); confirm with callers.
         */
        def updateBlockInfo(
            blockId: String,
            storageLevel: StorageLevel,
            memSize: Long,
            diskSize: Long) {
          // The block exists on the slave already when a status is present.
          val known = Option(_blocks.get(blockId))
          if (known.exists(_.storageLevel.useMemory)) {
            _remainingMem += memSize
          }

          if (!storageLevel.isValid) {
            // Not valid means neither in memory nor on disk: drop the entry if there is one
            // and hand back the memory it was charged for.
            Option(_blocks.remove(blockId)).foreach { dropped =>
              if (dropped.storageLevel.useMemory) {
                _remainingMem += dropped.memSize
              }
            }
          } else {
            _blocks.put(blockId, BlockStatus(storageLevel, memSize, diskSize))
            if (storageLevel.useMemory) {
              _remainingMem -= memSize
            }
          }
        }

        /** Forget `blockId` if it is tracked, crediting its recorded memory size back. */
        def removeBlock(blockId: String) {
          Option(_blocks.get(blockId)).foreach { status =>
            _remainingMem += status.memSize
            _blocks.remove(blockId)
          }
        }
      }
    }

     

    BlockManagerMasterActor

    维护各个slave的BlockManagerInfo信息, 以及各个block的locations信息(所属哪个BlockManager) 
    核心功能就是管理和更新这些元数据,
    RegisterBlockManager
    updateBlockInfo
    heartBeat
    RemoveRDD, Executor(BlockManager), Block

    /**
     * BlockManagerMasterActor is an actor on the master node to track statuses of
     * all slaves' block managers.
     */
    private[spark]
    class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
      // Mapping from block manager id to the block manager's information.
      // Holds the BlockManagerInfo of every registered block manager.
      private val blockManagerInfo =
        new mutable.HashMap[BlockManagerId, BlockManagerMasterActor.BlockManagerInfo]

      // Mapping from executor ID to block manager ID.
      private val blockManagerIdByExecutor = new mutable.HashMap[String, BlockManagerId]

      // Mapping from block id to the set of block managers that have the block.
      // A BlockManagerId serves as the location because it carries the executor id.
      private val blockLocations = new JHashMap[String, mutable.HashSet[BlockManagerId]]

      // Message dispatch for the actor. Only the registration case is shown in this excerpt.
      def receive = {
        case RegisterBlockManager(blockManagerId, maxMemSize, slaveActor) =>
          register(blockManagerId, maxMemSize, slaveActor)
          sender ! true // BlockManagerMaster.tell expects a `true` reply
        // ... the remaining cases mirror the BlockManagerMaster interface and are omitted here
      }


       // 处理RegisterBlockManager event, 用于slave向master注册自己的blockmanager
      // 主要就是将slave的BlockManagerInfo注册到master中

      /**
       * Handle a RegisterBlockManager request by recording the block manager's info.
       *
       * Ignored when the request comes from the driver outside local mode, or when the id
       * is already registered. A second, different block manager for the same executor is
       * treated as fatal: it is logged and the JVM is asked to exit.
       */
      private def register(id: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
        val executor = id.executorId
        // The driver's own block manager is only registered in local mode.
        val fromRemoteDriver = executor == "<driver>" && !isLocal
        if (!fromRemoteDriver && !blockManagerInfo.contains(id)) {
          if (blockManagerIdByExecutor.contains(executor)) {
            // A block manager of the same executor already exists.
            // This should never happen. Let's just quit.
            logError("Got two different block manager registrations on " + executor)
            System.exit(1)
          } else {
            blockManagerIdByExecutor(executor) = id
          }
          // Create the info object, stamped with the current time, and keep it.
          val info = new BlockManagerMasterActor.BlockManagerInfo(
            id, System.currentTimeMillis(), maxMemSize, slaveActor)
          blockManagerInfo(id) = info
        }
      }

     

      // 处理updateBlockInfo

      /**
       * Handle a block status report from a block manager and reply to the sender.
       *
       * Replies `true` after updating the per-manager info and the block's location set.
       * For an unregistered block manager nothing is updated; the reply is `true` only when
       * the report comes from the driver outside local mode, and `false` otherwise.
       */
      private def updateBlockInfo(
          blockManagerId: BlockManagerId,
          blockId: String,
          storageLevel: StorageLevel,
          memSize: Long,
          diskSize: Long) {
        blockManagerInfo.get(blockManagerId) match {
          case None =>
            // We intentionally do not register the master (except in local mode),
            // so that case must not be reported as a failure.
            sender ! (blockManagerId.executorId == "<driver>" && !isLocal)

          case Some(info) =>
            info.updateBlockInfo(blockId, storageLevel, memSize, diskSize)

            // Fetch the block's location set, creating and caching one on first sight.
            val holders = Option(blockLocations.get(blockId)).getOrElse {
              val fresh = new mutable.HashSet[BlockManagerId]
              blockLocations.put(blockId, fresh)
              fresh
            }

            if (storageLevel.isValid) {
              holders.add(blockManagerId)
            } else {
              holders.remove(blockManagerId)
            }

            // Remove the block from master tracking if it has been removed on all slaves.
            if (holders.isEmpty) {
              blockLocations.remove(blockId)
            }
            sender ! true
        }
      }


        // 处理removeRdd, 删除RDD

      /**
       * Remove an RDD: clear the master's metadata for its blocks first, then ask every
       * registered slave to remove the RDD.
       *
       * @param rddId id of the RDD; its blocks are the ones whose id starts with "rdd_<id>_"
       * @return a future holding the Int replies of all slaves, one per block manager
       */
      private def removeRdd(rddId: Int): Future[Seq[Int]] = {
        // Find all blocks of the RDD, then drop each of them from both the owning
        // BlockManagerInfo objects and blockLocations.
        val rddBlockPrefix = "rdd_" + rddId + "_"
        val rddBlockIds = blockLocations.keySet().filter(id => id.startsWith(rddBlockPrefix))
        for (blockId <- rddBlockIds) {
          for (holder <- blockLocations.get(blockId); info <- blockManagerInfo.get(holder)) {
            info.removeBlock(blockId)
          }
          blockLocations.remove(blockId)
        }

        // Ask the slaves to remove the RDD, and put the result in a sequence of Futures.
        // The dispatcher is used as an implicit argument into the Future sequence construction.
        import context.dispatcher
        val request = RemoveRdd(rddId)
        val pendingReplies = blockManagerInfo.values.map { info =>
          // The same RemoveRdd message goes to every slave actor.
          info.slaveActor.ask(request)(akkaTimeout).mapTo[Int]
        }.toSeq
        // Future.sequence turns a Traversable[Future[A]] into a Future[Traversable[A]].
        Future.sequence(pendingReplies)
      }
      
      //处理removeExecutor
    //删除Executor上的BlockManager, 名字起的不好
      /**
       * Remove the block manager registered for the given executor, if there is one.
       * An unknown executor id is only logged.
       */
      private def removeExecutor(execId: String) {
        logInfo("Trying to remove executor " + execId + " from BlockManagerMaster.")
        blockManagerIdByExecutor.get(execId) match {
          case Some(bmId) => removeBlockManager(bmId)
          case None => // no block manager registered for this executor
        }
      }
      /**
       * Remove a block manager from all of the master's bookkeeping: the executor index,
       * the info map, and the location set of every block it held.
       *
       * @param blockManagerId id of the block manager to drop; it must currently be registered,
       *                       otherwise the lookup in `blockManagerInfo` throws
       *                       NoSuchElementException
       */
      private def removeBlockManager(blockManagerId: BlockManagerId) {
        val info = blockManagerInfo(blockManagerId)

        // Remove the block manager from blockManagerIdByExecutor.
        blockManagerIdByExecutor -= blockManagerId.executorId

        // Remove it from blockManagerInfo and remove all the blocks.
        blockManagerInfo.remove(blockManagerId)
        val iterator = info.blocks.keySet.iterator
        while (iterator.hasNext) {
          val blockId = iterator.next
          val locations = blockLocations.get(blockId)
          locations -= blockManagerId
          if (locations.size == 0) {
            // blockLocations is keyed by block id. Passing the (now empty) location set as
            // the key never matches, which left stale empty entries behind.
            blockLocations.remove(blockId)
          }
        }
      }

      // 处理sendHeartBeat
      // blockManager的hb通过blockManagerInfo的LastSeenMs来表示

      /**
       * Handle a heartbeat from a block manager.
       *
       * @return true when the block manager is registered (its last-seen time is refreshed),
       *         or when it is the driver outside local mode; false otherwise
       */
      private def heartBeat(blockManagerId: BlockManagerId): Boolean = {
        blockManagerInfo.get(blockManagerId) match {
          case Some(info) =>
            info.updateLastSeenMs()
            true
          case None =>
            // The driver is never registered outside local mode, so that is not a failure.
            blockManagerId.executorId == "<driver>" && !isLocal
        }
      }


       // 处理removeBlock

      /**
       * Ask every slave known to hold `blockId` to remove it. Only blocks the master knows
       * about can be removed this way; an unknown block id results in no messages at all.
       */
      private def removeBlockFromWorkers(blockId: String) {
        for {
          holders <- Option(blockLocations.get(blockId))
          holderId <- holders
          info <- blockManagerInfo.get(holderId)
        } {
          // Remove the block from the slave's BlockManager.
          // Fire-and-forget: no confirmation is awaited, so the message might get lost.
          // If message loss becomes frequent, we should add retry logic here.
          info.slaveActor ! RemoveBlock(blockId)
        }
      }

     

    BlockManagerSlaveActor

    Master可以发给slave的message就2种, 所以很简单...过于简单
    因为他只处理master发送来的event, 而大部分对于数据的读写等, 在BlockManager中直接实现了

    /**
     * Slave-side actor that carries out commands sent by the master against the local
     * BlockManager. It handles two messages: RemoveBlock (no reply) and RemoveRdd (the
     * result of the removal is sent back to the sender).
     */
    class BlockManagerSlaveActor(blockManager: BlockManager) extends Actor {
      override def receive = {
        case RemoveRdd(rddId) =>
          // Reply with whatever the local removal reports.
          sender ! blockManager.removeRdd(rddId)

        case RemoveBlock(blockId) =>
          blockManager.removeBlock(blockId)
      }
    }
  • 相关阅读:
    Java面试基础 -- Git篇
    Java面试基础
    如何避免死锁?
    如何减少上下文切换?
    Java中的volatile变量有什么作用?
    Thread类中start()方法和run()方法有什么不同?
    (一)java异常处理的几个问题
    SUSE CaaS Platform 4
    SUSE CaaS Platform 4
    SUSE Ceph 增加节点、减少节点、 删除OSD磁盘等操作
  • 原文地址:https://www.cnblogs.com/fxjwind/p/3513326.html
Copyright © 2011-2022 走看看