  • Spark Source Code Analysis – BlockManager

    Reference: Spark源码分析之-Storage模块

    Why does Spark need a storage module at all? To cache RDDs.
    Spark's defining feature is that it can cache RDDs in memory or on disk. An RDD is made up of partitions, and each partition corresponds to a block.
    So the storage module's job is to implement RDD persistence in memory and on disk.
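
    Concretely, block ids in this version are plain strings, and a cached RDD partition maps to a block id of the form "rdd_<rddId>_<splitIndex>". A minimal sketch of the mapping (the helper name here is mine, not Spark's):

        // Sketch: partition 1 of RDD 0 is stored under the block id "rdd_0_1"
        def rddBlockId(rddId: Int, splitIndex: Int): String =
          "rdd_%d_%d".format(rddId, splitIndex)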

    First, every node has a BlockManager; one of them is the driver (master) and the rest are slaves.
    The master tracks a BlockManagerInfo for every slave BlockManager, and each BlockManagerInfo in turn tracks a BlockStatus for every block that BlockManager manages.
    Whenever a block on a slave changes, the slave must send an updateBlockInfo event to refresh the block information on the master.
    This is a typical centralized design. Master and slaves communicate through actors, but block data itself, being large, is transferred via the ConnectionManager (NIO or Netty).
    Hence the need for BlockManagerMasterActor and BlockManagerSlaveActor; see Spark 源码分析 – BlockManagerMaster&Slave.
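
    A minimal sketch of that master-side bookkeeping (the type names follow this Spark version, but the fields are simplified; this is not the actual code):

        import scala.collection.mutable.HashMap

        // Per-block status as seen by the master
        case class BlockStatus(storageLevel: StorageLevel, memSize: Long, diskSize: Long)

        // What the master tracks per slave BlockManager
        class BlockManagerInfo {
          private val blocks = new HashMap[String, BlockStatus]
          def updateBlockInfo(blockId: String, level: StorageLevel, memSize: Long, diskSize: Long) {
            blocks(blockId) = BlockStatus(level, memSize, diskSize) // refreshed on every updateBlockInfo event
          }
        }

        // On the master: one BlockManagerInfo per registered BlockManagerId
        val blockManagerInfo = new HashMap[BlockManagerId, BlockManagerInfo]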

    There is also a BlockManagerMaster, which wraps BlockManagerMasterActor. Confusingly, every node creates a BlockManagerMaster, but a slave does not actually create a BlockManagerMasterActor, only a ref to it; not a great design.
    Moreover, since BlockManager is shared by master and slaves, it exposes most of the interfaces of both: the master-side parts simply delegate to BlockManagerMaster, while the slave-side data reads and writes are implemented directly, so the design is not uniform.

    In short, the storage module is somewhat loosely designed and not entirely reasonable, which also shows up in some of the small naming choices; this makes it harder to analyze and understand.

     

    In SparkEnv's initialization, the BlockManagerMaster and BlockManager are created:

        val blockManagerMaster = new BlockManagerMaster(registerOrLookup(
          "BlockManagerMaster",
          new BlockManagerMasterActor(isLocal)))
        val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster, serializer)
        // Create the actor or the actor ref:
        // for BlockManagerMaster, the driver creates the BlockManagerMasterActor itself, while a slave only creates a ref to it
        def registerOrLookup(name: String, newActor: => Actor): ActorRef = {
          if (isDriver) {
            logInfo("Registering " + name)
            actorSystem.actorOf(Props(newActor), name = name)
          } else {
            val driverHost: String = System.getProperty("spark.driver.host", "localhost")
            val driverPort: Int = System.getProperty("spark.driver.port", "7077").toInt
            Utils.checkHost(driverHost, "Expected hostname")
            val url = "akka://spark@%s:%s/user/%s".format(driverHost, driverPort, name)
            logInfo("Connecting to " + name + ": " + url)
            actorSystem.actorFor(url)
          }
        }
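
    For example, with spark.driver.host set to 192.168.0.1 and the default port, a slave ends up looking the master actor up at a URL like this (hypothetical host):

        akka://spark@192.168.0.1:7077/user/BlockManagerMaster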

     

    1 BlockManagerId

    BlockManagerId is the unique identifier of a BlockManager, so we want each BlockManager to create only one BlockManagerId object.
    A typical Singleton scenario.
    Implementing a Singleton in Scala is somewhat obscure, and this is a classic example:
    make all constructors private, then create instances only through the companion object.

    /**
     * This class represents a unique identifier for a BlockManager.
     * The first 2 constructors of this class are made private to ensure that
     * BlockManagerId objects can be created only using the apply method in
     * the companion object. This allows de-duplication of ID objects.
     * Also, constructor parameters are private to ensure that parameters cannot
     * be modified from outside this class.
     */
    private[spark] class BlockManagerId private (
        private var executorId_ : String,
        private var host_ : String,
        private var port_ : Int,
        private var nettyPort_ : Int
      ) extends Externalizable {
      private def this() = this(null, null, 0, 0)  // For deserialization only
    }
     
    private[spark] object BlockManagerId {
    
      /**
       * Returns a [[org.apache.spark.storage.BlockManagerId]] for the given configuration.
       *
       * @param execId ID of the executor.
       * @param host Host name of the block manager.
       * @param port Port of the block manager.
       * @param nettyPort Optional port for the Netty-based shuffle sender.
       * @return A new [[org.apache.spark.storage.BlockManagerId]].
       */
      def apply(execId: String, host: String, port: Int, nettyPort: Int) =
        getCachedBlockManagerId(new BlockManagerId(execId, host, port, nettyPort))
    
      def apply(in: ObjectInput) = {
        val obj = new BlockManagerId()
        obj.readExternal(in)
        getCachedBlockManagerId(obj)
      }
    
      val blockManagerIdCache = new ConcurrentHashMap[BlockManagerId, BlockManagerId]()
    
      def getCachedBlockManagerId(id: BlockManagerId): BlockManagerId = {
        blockManagerIdCache.putIfAbsent(id, id)
        blockManagerIdCache.get(id)
      }
    }
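
    A quick illustration of the de-duplication (a sketch; it relies on BlockManagerId defining equals/hashCode over its fields, which it does in this version):

        // Two apply calls with the same fields hand back the same cached instance
        val a = BlockManagerId("exec-1", "host-a", 7070, 0)
        val b = BlockManagerId("exec-1", "host-a", 7070, 0)
        assert(a eq b) // reference-equal, courtesy of getCachedBlockManagerId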

     

    2 BlockManager

    BlockManager is shared by the master and the slaves, but all of the master-side logic is already wrapped in BlockManagerMaster.
    So here we mainly analyze the slave-side interface logic: reportBlockStatus, get, and put.
    put and get use memoryStore and diskStore; see Spark 源码分析 -- BlockStore.

    private[spark] class BlockManager(
        executorId: String,
        actorSystem: ActorSystem,
        val master: BlockManagerMaster,
        val defaultSerializer: Serializer,
        maxMemory: Long)
      extends Logging {
      private class BlockInfo(val level: StorageLevel, val tellMaster: Boolean) {} // definition of BlockInfo, see details below
    
      val shuffleBlockManager = new ShuffleBlockManager(this)
      private val blockInfo = new TimeStampedHashMap[String, BlockInfo] // tracks the BlockInfo of every block managed here: [blockId -> BlockInfo]
    
      private[storage] val memoryStore: BlockStore = new MemoryStore(this, maxMemory)
      private[storage] val diskStore: DiskStore = new DiskStore(this, System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir")))
      
      val blockManagerId = BlockManagerId(executorId, connectionManager.id.host, connectionManager.id.port, nettyPort) // BlockManagerId
      val slaveActor = actorSystem.actorOf(Props(new BlockManagerSlaveActor(this)),   // create the slave actor; it seems every BlockManager creates one
        name = "BlockManagerActor" + BlockManager.ID_GENERATOR.next)
     
      /**
       * Initialize the BlockManager. Register to the BlockManagerMaster, and start the
       * BlockManagerWorker actor.
       */
      private def initialize() {
        master.registerBlockManager(blockManagerId, maxMemory, slaveActor) // register this BlockManager with the master; a no-op if this node is the driver itself
        BlockManagerWorker.startBlockManagerWorker(this) // start a BlockManagerWorker for transferring blocks with remote nodes; blocks are too large to go through Akka
        if (!BlockManager.getDisableHeartBeatsForTesting) {
          heartBeatTask = actorSystem.scheduler.schedule(0.seconds, heartBeatFrequency.milliseconds) { // schedule periodic heartbeats
            heartBeat()
          }
        }
      }

    2.1 BlockInfo

    The key role of BlockInfo is mutual exclusion on block access: before accessing a block you must first waitForReady.
    So each block gets its own BlockInfo to manage this mutual exclusion.
    Why is this called BlockInfo?
    The updateBlockInfo event handled by BlockManagerMasterActor does not update this BlockInfo but BlockManagerInfo.BlockStatus; the naming is not very reasonable!

      private class BlockInfo(val level: StorageLevel, val tellMaster: Boolean) {
        @volatile var pending: Boolean = true
        @volatile var size: Long = -1L
        @volatile var initThread: Thread = null
        @volatile var failed = false
    
        setInitThread()
    
        private def setInitThread() {
          // Set current thread as init thread - waitForReady will not block this thread
          // (in case there is non trivial initialization which ends up calling waitForReady as part of
          // initialization itself)
          this.initThread = Thread.currentThread()
        }
    
        /**
         * Wait for this BlockInfo to be marked as ready (i.e. block is finished writing).
         * Return true if the block is available, false otherwise.
         */
        def waitForReady(): Boolean = {
          if (initThread != Thread.currentThread() && pending) {
            synchronized {
              while (pending) this.wait()
            }
          }
          !failed
        }
    
        /** Mark this BlockInfo as ready (i.e. block is finished writing) */
        def markReady(sizeInBytes: Long) {
          assert (pending)
          size = sizeInBytes
          failed = false
          initThread = null
          pending = false
          synchronized {
            this.notifyAll()
          }
        }
    
        /** Mark this BlockInfo as ready but failed */
        def markFailure() {
          assert (pending)
          size = 0
          failed = true
          initThread = null
          pending = false
          synchronized {
            this.notifyAll()
          }
        }
      }
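
    The intended usage pattern, as a minimal sketch (the writer and reader threads are hypothetical; inside BlockManager this is essentially what put and getLocal do):

        // Writer thread: create the BlockInfo, write the block, then mark it ready
        val info = new BlockInfo(StorageLevel.MEMORY_ONLY, tellMaster = true)
        // ... write the block's data into memoryStore / diskStore ...
        info.markReady(1024L) // wakes up all readers blocked in waitForReady

        // Reader thread: block until the writer finishes (or fails)
        if (info.waitForReady()) {
          // safe to read the block from the store now
        } else {
          // the write failed; treat the block as missing
        }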

    2.2 reportBlockStatus

      /**
       * Tell the master about the current storage status of a block. This will send a block update
       * message reflecting the current status, *not* the desired storage level in its block info.
       * For example, a block with MEMORY_AND_DISK set might have fallen out to be only on disk.
       *
       * droppedMemorySize exists to account for the case where a block is dropped from memory to disk
       * (so it is still valid). This ensures that the update on the master compensates for the change
       * in memory usage on the slave.
       */
      def reportBlockStatus(blockId: String, info: BlockInfo, droppedMemorySize: Long = 0L) {
        val needReregister = !tryToReportBlockStatus(blockId, info, droppedMemorySize) // a false return means the master does not know this blockId, so we need to re-register
        if (needReregister) {
          logInfo("Got told to reregister updating block " + blockId)
          // Reregistering will report our new block for free.
          asyncReregister()
        }
        logDebug("Told master about block " + blockId)
      }
      /**
       * Actually send an UpdateBlockInfo message. Returns the master's response,
       * which will be true if the block was successfully recorded and false if
       * the slave needs to re-register.
       */
      private def tryToReportBlockStatus(blockId: String, info: BlockInfo, droppedMemorySize: Long = 0L): Boolean = {
        val (curLevel, inMemSize, onDiskSize, tellMaster) = info.synchronized {
          info.level match {
            case null =>
              (StorageLevel.NONE, 0L, 0L, false)
            case level =>
              val inMem = level.useMemory && memoryStore.contains(blockId)
              val onDisk = level.useDisk && diskStore.contains(blockId)
              val storageLevel = StorageLevel(onDisk, inMem, level.deserialized, level.replication)
              val memSize = if (inMem) memoryStore.getSize(blockId) else droppedMemorySize
              val diskSize = if (onDisk) diskStore.getSize(blockId) else 0L
              (storageLevel, memSize, diskSize, info.tellMaster)
          }
        }
        if (tellMaster) {  // report the block's current memory and disk usage to the master
          master.updateBlockInfo(blockManagerId, blockId, curLevel, inMemSize, onDiskSize)
        } else {
          true
        }
      }
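
    For example, a block stored with MEMORY_AND_DISK that has since been evicted from memory is reported with the level it actually has now, not the level it was requested with (hypothetical values):

        // level.useMemory is true, but memoryStore.contains(blockId) is false, so:
        //   inMem = false, onDisk = true
        //   curLevel   = StorageLevel(useDisk = true, useMemory = false, level.deserialized, level.replication)
        //   inMemSize  = droppedMemorySize (the memory just given back)
        //   onDiskSize = diskStore.getSize(blockId)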

    2.3 Get

      /**
       * Get block from the local block manager (read a block locally).
       */
      def getLocal(blockId: String): Option[Iterator[Any]] = {
        val info = blockInfo.get(blockId).orNull
        if (info != null) {
          info.synchronized { // mutually exclusive access to the block
            // If another thread is writing the block, wait for it to become ready.
            if (!info.waitForReady()) { // wait until the block is ready; a block can only be written serially
              // If we get here, the block write failed.
              logWarning("Block " + blockId + " was marked as failure.")
              return None
            }
    
            val level = info.level
            // Look for the block in memory
            if (level.useMemory) { // if the storage level involves memory, first try to fetch the block from memoryStore
               memoryStore.getValues(blockId) match { 
                case Some(iterator) =>
                  return Some(iterator) // return the iterator directly
                case None =>
                  logDebug("Block " + blockId + " not found in memory")
              }
            }
    
            // Not found in memory above, so keep looking on disk.
            // Look for block on disk, potentially loading it back into memory if required
            if (level.useDisk) {
              if (level.useMemory && level.deserialized) { // MEMORY_AND_DISK, deserialized; part of the data is on disk
                diskStore.getValues(blockId) match {
                  case Some(iterator) =>  // read the block from disk and put it back into memory
                    // Put the block back in memory before returning it
                    // TODO: Consider creating a putValues that also takes in an iterator?
                    val elements = new ArrayBuffer[Any]
                    elements ++= iterator
                    memoryStore.putValues(blockId, elements, level, true).data match {
                      case Left(iterator2) => // expect putValues to hand back an iterator over the stored block
                        return Some(iterator2)
                      case _ =>
                        throw new Exception("Memory store did not return back an iterator")
                    }
                  case None =>
                    throw new Exception("Block " + blockId + " not found on disk, though it should be")
                }
              } else if (level.useMemory && !level.deserialized) { // MEMORY_AND_DISK_SER, serialized
                // Read it as a byte buffer into memory first, then return it
                diskStore.getBytes(blockId) match { // the data on disk is serialized, so use getBytes
                  case Some(bytes) =>
                    // Put a copy of the block back in memory before returning it. Note that we can't
                    // put the ByteBuffer returned by the disk store as that's a memory-mapped file.
                    // The use of rewind assumes this.
                    assert (0 == bytes.position())
                    val copyForMemory = ByteBuffer.allocate(bytes.limit)
                    copyForMemory.put(bytes)
                    memoryStore.putBytes(blockId, copyForMemory, level) // what is cached in memoryStore is still the serialized bytes
                    bytes.rewind() // deserialization will re-read the buffer, so rewind it first
                    return Some(dataDeserialize(blockId, bytes)) // but what we return is the deserialized data
                  case None =>
                    throw new Exception("Block " + blockId + " not found on disk, though it should be")
                }
              } else { // DISK_ONLY: nothing special, just read from disk
                diskStore.getValues(blockId) match {
                  case Some(iterator) =>
                    return Some(iterator)
                  case None =>
                    throw new Exception("Block " + blockId + " not found on disk, though it should be")
                }
              }
            }
          }
        } else {
          logDebug("Block " + blockId + " not registered locally")
        }
        return None
      }
      /**
       * Get block from the local block manager as serialized bytes.
       */
      def getLocalBytes(blockId: String): Option[ByteBuffer] = {
      // the logic here is even simpler ...
    }
     
      /**
       * Get block from remote block managers.
       */
      def getRemote(blockId: String): Option[Iterator[Any]] = {
        // Get locations of block
        val locations = master.getLocations(blockId)
        // Get block from remote locations
        for (loc <- locations) {
          val data = BlockManagerWorker.syncGetBlock( // fetch the block from the remote node via BlockManagerWorker
              GetBlock(blockId), ConnectionManagerId(loc.host, loc.port))
          if (data != null) {
            return Some(dataDeserialize(blockId, data))
          }
          logDebug("The value of block " + blockId + " is null")
        }
        logDebug("Block " + blockId + " not found")
        return None
      }
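
    A typical caller first tries the local store and only then goes remote; a sketch of that read path (in this version BlockManager.get does essentially this):

        def get(blockId: String): Option[Iterator[Any]] = {
          getLocal(blockId) match {
            case Some(iter) => Some(iter)   // found locally
            case None => getRemote(blockId) // ask the master for locations, then fetch
          }
        }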

    2.4 Put

      /**
       * Put a new block of values to the block manager. Returns its (estimated) size in bytes.
       */
      def put(blockId: String, values: ArrayBuffer[Any], level: StorageLevel,
        tellMaster: Boolean = true) : Long = {
        // Remember the block's storage level so that we can correctly drop it to disk if it needs
        // to be dropped right after it got put into memory. Note, however, that other threads will
        // not be able to get() this block until we call markReady on its BlockInfo.
        val myInfo = { 
          val tinfo = new BlockInfo(level, tellMaster) // create a new BlockInfo
          // Do atomically !
          val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo) // check whether a BlockInfo already exists for this blockId
    
          if (oldBlockOpt.isDefined) { // if it already exists, we must synchronize with the other writer
            if (oldBlockOpt.get.waitForReady()) {
              logWarning("Block " + blockId + " already exists on this machine; not re-adding it")
              return oldBlockOpt.get.size
            }
            // TODO: So the block info exists - but previous attempt to load it (?) failed. What do we do now ? Retry on it ?
            oldBlockOpt.get
          } else {
            tinfo
          }
        }
    
        // If we need to replicate the data, we'll want access to the values, but because our
        // put will read the whole iterator, there will be no values left. For the case where
        // the put serializes data, we'll remember the bytes, above; but for the case where it
        // doesn't, such as deserialized storage, let's rely on the put returning an Iterator.
        var valuesAfterPut: Iterator[Any] = null
    
        // Ditto for the bytes after the put
        var bytesAfterPut: ByteBuffer = null
    
        // Size of the block in bytes (to return to caller)
        var size = 0L
    
        myInfo.synchronized { // take the lock, then do the actual put
          var marked = false
          try {
            if (level.useMemory) { // if memory is allowed, prefer putting the block in memory
              // Save it just to memory first, even if it also has useDisk set to true; we will later
              // drop it to disk if the memory store can't hold it.
              val res = memoryStore.putValues(blockId, values, level, true)
              size = res.size
              res.data match {
                case Right(newBytes) => bytesAfterPut = newBytes
                case Left(newIterator) => valuesAfterPut = newIterator
              }
            } else { // otherwise store it on disk
              // Save directly to disk.
              // Don't get back the bytes unless we replicate them.
              val askForBytes = level.replication > 1
              val res = diskStore.putValues(blockId, values, level, askForBytes)
              size = res.size
              res.data match {
                case Right(newBytes) => bytesAfterPut = newBytes
                case _ =>
              }
            }
    
            // Now that the block is in either the memory or disk store, let other threads read it,
            // and tell the master about it.
            marked = true  // release the mutual exclusion on the BlockInfo so other threads can access this block
            myInfo.markReady(size)
            if (tellMaster) {
              reportBlockStatus(blockId, myInfo) // tell the master that the block's status changed
            }
          } finally {
            // If we failed at putting the block to memory/disk, notify other possible readers
            // that it has failed, and then remove it from the block info map.
            if (! marked) { // if the put failed, do some cleanup
              // Note that the remove must happen before markFailure otherwise another thread
              // could've inserted a new BlockInfo before we remove it.
              blockInfo.remove(blockId)
              myInfo.markFailure()
              logWarning("Putting block " + blockId + " failed")
            }
          }
        }
        // Replicate block if required
        if (level.replication > 1) {
          val remoteStartTime = System.currentTimeMillis
          // Serialize the block if not already done
          if (bytesAfterPut == null) {
            if (valuesAfterPut == null) {
              throw new SparkException(
                "Underlying put returned neither an Iterator nor bytes! This shouldn't happen.")
            }
            bytesAfterPut = dataSerialize(blockId, valuesAfterPut)
          }
          replicate(blockId, bytesAfterPut, level) // do the replication
          logDebug("Put block " + blockId + " remotely took " + Utils.getUsedTimeMs(remoteStartTime))
        }
        BlockManager.dispose(bytesAfterPut)
    
        return size
      }
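
    Typical usage when caching a computed partition (the block id and values here are hypothetical):

        import scala.collection.mutable.ArrayBuffer

        // Cache the elements of partition 1 of RDD 0 with MEMORY_AND_DISK;
        // tellMaster defaults to true, so the master learns about the new block
        val elements = new ArrayBuffer[Any]
        elements ++= Seq(1, 2, 3)
        val estimatedSize = blockManager.put("rdd_0_1", elements, StorageLevel.MEMORY_AND_DISK)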
     
      /**
       * Put a new block of serialized bytes to the block manager.
       */
      def putBytes(
        blockId: String, bytes: ByteBuffer, level: StorageLevel, tellMaster: Boolean = true) {
        // the logic is fairly simple ...
      }

     

      /**
       * Replicate block to another node.
       */
      var cachedPeers: Seq[BlockManagerId] = null
      private def replicate(blockId: String, data: ByteBuffer, level: StorageLevel) {
        val tLevel = StorageLevel(level.useDisk, level.useMemory, level.deserialized, 1)
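        // note: tLevel forces replication = 1 on the receiving peer, so a peer
        // stores its copy without replicating further (no cascading replication)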
        if (cachedPeers == null) {
          cachedPeers = master.getPeers(blockManagerId, level.replication - 1) // find peers that can hold the replicas
        }
        for (peer: BlockManagerId <- cachedPeers) {  // push the block that needs replication to these peers
          val start = System.nanoTime
          data.rewind()
          if (!BlockManagerWorker.syncPutBlock(PutBlock(blockId, data, tLevel), // transfer the block data via BlockManagerWorker
            new ConnectionManagerId(peer.host, peer.port))) {
            logError("Failed to call syncPutBlock to " + peer)
          }
          logDebug("Replicated BlockId " + blockId + " once used " +
            (System.nanoTime - start) / 1e6 + " ms; The size of the data is " +
            data.limit() + " bytes.")
        }
      }

    2.5 dropFromMemory

      /**
       * Drop a block from memory, possibly putting it on disk if applicable. Called when the memory
       * store reaches its limit and needs to free up space.
       */
      def dropFromMemory(blockId: String, data: Either[ArrayBuffer[Any], ByteBuffer]) {
        logInfo("Dropping block " + blockId + " from memory")
        val info = blockInfo.get(blockId).orNull
        if (info != null)  {
          info.synchronized {  // acquire mutual exclusion on the blockInfo
            // required ? As of now, this will be invoked only for blocks which are ready
            // But in case this changes in future, adding for consistency sake.
            if (! info.waitForReady() ) {
              // If we get here, the block write failed.
              logWarning("Block " + blockId + " was marked as failure. Nothing to drop")
              return
            }
    
            val level = info.level
            if (level.useDisk && !diskStore.contains(blockId)) { // if the level uses disk, write the block out to disk before dropping it from memory
              logInfo("Writing block " + blockId + " to disk")
              data match {
                case Left(elements) =>
                  diskStore.putValues(blockId, elements, level, false)
                case Right(bytes) =>
                  diskStore.putBytes(blockId, bytes, level)
              }
            }
            val droppedMemorySize = if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L // compute the size being dropped from memory
            val blockWasRemoved = memoryStore.remove(blockId)  // drop the block from memoryStore
            if (info.tellMaster) {
              reportBlockStatus(blockId, info, droppedMemorySize) // tell the master that the block info changed
            }
            if (!level.useDisk) {
              // The block is completely gone from this node; forget it so we can put() it again later.
              blockInfo.remove(blockId) // if disk is not used, removing it from memory means the block is completely gone
            }
          }
        } else {
          // The block has already been dropped
        }
      }
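
    dropFromMemory is driven by the memory store's eviction: when MemoryStore runs out of room it picks victim blocks and hands them back to the BlockManager, which decides whether they fall back to disk. A simplified sketch of that call site (the collection names here are made up; see ensureFreeSpace in MemoryStore for the real logic):

        // Inside MemoryStore (simplified): evict selected entries and let the
        // BlockManager either spill them to disk or forget them entirely
        for (blockId <- selectedVictimBlocks) {
          val entry = entries.get(blockId)
          val data = if (entry.deserialized) {
            Left(entry.value.asInstanceOf[ArrayBuffer[Any]])
          } else {
            Right(entry.value.asInstanceOf[ByteBuffer].duplicate())
          }
          blockManager.dropFromMemory(blockId, data) // may write to disk first, then removes from memory
        }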