zoukankan      html  css  js  c++  java
  • Spark源码分析 – BlockManager

    参考, Spark源码分析之-Storage模块

    对于storage, 为何Spark需要storage模块?为了cache RDD
    Spark的特点就是可以将RDD cache在memory或disk中,RDD是由partitions组成的,对应于block
    所以storage模块,就是要实现RDD在memory和disk上的persistent功能

    首先每个节点都有一个BlockManager, 其中有一个是Driver(master), 其余的都是slave
    master负责track所有的slave BlockManager的BlockManagerInfo, 而BlockManagerInfo中又track了该BlockManager管理的所有的block的BlockStatus
    当slave上的block有任何变化的时候, 需要发送updateBlockInfo事件来更新master上block信息
    典型的中心化设计, master和slave之间的通信通过actor来进行, 当然对于block数据的传输, 由于数据量比较大, 所以使用connectionManager(NIO或Netty)
    所以自然需要BlockManagerMasterActor和BlockManagerSlaveActor, 参考Spark 源码分析 – BlockManagerMaster&Slave

    其中还有个BlockManagerMaster,负责wrap BlockManagerMasterActor, 比较confusing的是每个节点都会创建这个BlockManagerMaster, 只是在slave中不会真正创建BlockManagerMasterActor, 而是Ref, 不好的设计
    而且由于BlockManager被master和slave公用, 所以提供了两者大部分接口, 而对于master部分都是直接wrap BlockManagerMaster, 而对于slave中的数据读写等就直接实现了, 设计不统一

    总之, storage这个模块, 设计比较随意, 不是很合理, 也体现在一些细小的命名上, 给分析和理解带来了一些困难.

     

    在SparkEnv的初始化中, 创建BlockManagerMaster和blockManager

        val blockManagerMaster = new BlockManagerMaster(registerOrLookup(
          "BlockManagerMaster",
          new BlockManagerMasterActor(isLocal)))
        val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster, serializer)
        // 创建actor和actor ref
    // 对于BlockManagerMaster, 在master上创建BlockManagerMasterActor, 而在slave上创建BlockManagerMasterActor的ref
        def registerOrLookup(name: String, newActor: => Actor): ActorRef = {
          if (isDriver) {
            logInfo("Registering " + name)
            actorSystem.actorOf(Props(newActor), name = name)
          } else {
            val driverHost: String = System.getProperty("spark.driver.host", "localhost")
            val driverPort: Int = System.getProperty("spark.driver.port", "7077").toInt
            Utils.checkHost(driverHost, "Expected hostname")
            val url = "akka://spark@%s:%s/user/%s".format(driverHost, driverPort, name)
            logInfo("Connecting to " + name + ": " + url)
            actorSystem.actorFor(url)
          }
        }

     

    1 BlockManagerId

    BlockManagerId作为BlockManager唯一标识, 所以希望一个BlockManager只创建一个BlockManagerId 对象
    典型Singleton的场景
    在Scala里面实现Singleton比较晦涩, 这里是个典型的例子
    将所有的构造函数设为private, 然后利用伴生对象的来创建对象实例

    /**
     * This class represent an unique identifier for a BlockManager.
     * The first 2 constructors of this class is made private to ensure that
     * BlockManagerId objects can be created only using the apply method in
     * the companion object. This allows de-duplication of ID objects.
     * Also, constructor parameters are private to ensure that parameters cannot
     * be modified from outside this class.
     */
    private[spark] class BlockManagerId private (
        private var executorId_ : String,
        private var host_ : String,
        private var port_ : Int,
        private var nettyPort_ : Int
      ) extends Externalizable {
      private def this() = this(null, null, 0, 0)  // For deserialization only
    }
     
    private[spark] object BlockManagerId {
    
      /**
       * Returns a [[org.apache.spark.storage.BlockManagerId]] for the given configuraiton.
       *
       * @param execId ID of the executor.
       * @param host Host name of the block manager.
       * @param port Port of the block manager.
       * @param nettyPort Optional port for the Netty-based shuffle sender.
       * @return A new [[org.apache.spark.storage.BlockManagerId]].
       */
      def apply(execId: String, host: String, port: Int, nettyPort: Int) =
        getCachedBlockManagerId(new BlockManagerId(execId, host, port, nettyPort))
    
      def apply(in: ObjectInput) = {
        val obj = new BlockManagerId()
        obj.readExternal(in)
        getCachedBlockManagerId(obj)
      }
    
      val blockManagerIdCache = new ConcurrentHashMap[BlockManagerId, BlockManagerId]()
    
      def getCachedBlockManagerId(id: BlockManagerId): BlockManagerId = {
        blockManagerIdCache.putIfAbsent(id, id)
        blockManagerIdCache.get(id)
      }
    }

     

    2 BlockManager

    BlockManager是被master和slave公用的, 但对于master的逻辑都已经wrap在BlockManagerMaster中了
    所以这里主要分析一些slave相关的接口逻辑, reportBlockStatus, get, put
    其中put, get使用到memoryStore和diskStore, 参考Spark 源码分析 -- BlockStore

    private[spark] class BlockManager(
        executorId: String,
        actorSystem: ActorSystem,
        val master: BlockManagerMaster,
        val defaultSerializer: Serializer,
        maxMemory: Long)
      extends Logging {
      private class BlockInfo(val level: StorageLevel, val tellMaster: Boolean) {} // BlockInfo的定义, 详细见下
    
      val shuffleBlockManager = new ShuffleBlockManager(this)
      private val blockInfo = new TimeStampedHashMap[String, BlockInfo] // 记录manage的所有block的BlockInfo [blockid,blockinfo]
    
      private[storage] val memoryStore: BlockStore = new MemoryStore(this, maxMemory)
      private[storage] val diskStore: DiskStore = new DiskStore(this, System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir")))
      
      val blockManagerId = BlockManagerId(executorId, connectionManager.id.host, connectionManager.id.port, nettyPort) // BlockManagerId
      val slaveActor = actorSystem.actorOf(Props(new BlockManagerSlaveActor(this)),   // 创建slaveActor, 貌似每个BlockManager都会创建slaveActor
        name = "BlockManagerActor" + BlockManager.ID_GENERATOR.next)
     
      /**
       * Initialize the BlockManager. Register to the BlockManagerMaster, and start the
       * BlockManagerWorker actor.
       */
      private def initialize() {
        master.registerBlockManager(blockManagerId, maxMemory, slaveActor) // 向master注册BlockManager, 如果本身就是driver, 啥都不做
        BlockManagerWorker.startBlockManagerWorker(this) // 创建BlockManagerWorker用于和remote传输block,block比较大所以无法用akka
        if (!BlockManager.getDisableHeartBeatsForTesting) {
          heartBeatTask = actorSystem.scheduler.schedule(0.seconds, heartBeatFrequency.milliseconds) { // 设定scheduler定期发送hb
            heartBeat()
          }
        }
      }

    2.1 BlockInfo

    BlockInfo关键是对block做了访问互斥, 访问block前需要, 先waitForReady
    所以每个block, 都需要生成一个BlockInfo来经行互斥管理
    这个为啥叫BlockInfo?
    BlockManagerMasterActor中updateBlockInfo事件更新的不是这个BlockInfo, 而是BlockManagerInfo.BlockStatus, 不太合理!

      private class BlockInfo(val level: StorageLevel, val tellMaster: Boolean) {
        @volatile var pending: Boolean = true
        @volatile var size: Long = -1L
        @volatile var initThread: Thread = null
        @volatile var failed = false
    
        setInitThread()
    
        private def setInitThread() {
          // Set current thread as init thread - waitForReady will not block this thread
          // (in case there is non trivial initialization which ends up calling waitForReady as part of
          // initialization itself)
          this.initThread = Thread.currentThread()
        }
    
        /**
         * Wait for this BlockInfo to be marked as ready (i.e. block is finished writing).
         * Return true if the block is available, false otherwise.
         */
        def waitForReady(): Boolean = {
          if (initThread != Thread.currentThread() && pending) {
            synchronized {
              while (pending) this.wait()
            }
          }
          !failed
        }
    
        /** Mark this BlockInfo as ready (i.e. block is finished writing) */
        def markReady(sizeInBytes: Long) {
          assert (pending)
          size = sizeInBytes
          initThread = null
          failed = false
          initThread = null
          pending = false
          synchronized {
            this.notifyAll()
          }
        }
    
        /** Mark this BlockInfo as ready but failed */
        def markFailure() {
          assert (pending)
          size = 0
          initThread = null
          failed = true
          initThread = null
          pending = false
          synchronized {
            this.notifyAll()
          }
        }
      }

    2.2 reportBlockStatus

      /**
       * Tell the master about the current storage status of a block. This will send a block update
       * message reflecting the current status, *not* the desired storage level in its block info.
       * For example, a block with MEMORY_AND_DISK set might have fallen out to be only on disk.
       *
       * droppedMemorySize exists to account for when block is dropped from memory to disk (so it is still valid).
       * This ensures that update in master will compensate for the increase in memory on slave.
       */
      def reportBlockStatus(blockId: String, info: BlockInfo, droppedMemorySize: Long = 0L) {
        val needReregister = !tryToReportBlockStatus(blockId, info, droppedMemorySize) // 如果返回false, 说明你发的blockid在master没有, 需要重新注册
        if (needReregister) {
          logInfo("Got told to reregister updating block " + blockId)
          // Reregistering will report our new block for free.
          asyncReregister()
        }
        logDebug("Told master about block " + blockId)
      }
      /**
       * Actually send a UpdateBlockInfo message. Returns the mater's response,
       * which will be true if the block was successfully recorded and false if
       * the slave needs to re-register.
       */
      private def tryToReportBlockStatus(blockId: String, info: BlockInfo, droppedMemorySize: Long = 0L): Boolean = {
        val (curLevel, inMemSize, onDiskSize, tellMaster) = info.synchronized {
          info.level match {
            case null =>
              (StorageLevel.NONE, 0L, 0L, false)
            case level =>
              val inMem = level.useMemory && memoryStore.contains(blockId)
              val onDisk = level.useDisk && diskStore.contains(blockId)
              val storageLevel = StorageLevel(onDisk, inMem, level.deserialized, level.replication)
              val memSize = if (inMem) memoryStore.getSize(blockId) else droppedMemorySize
              val diskSize = if (onDisk) diskStore.getSize(blockId) else 0L
              (storageLevel, memSize, diskSize, info.tellMaster)
          }
        }
        if (tellMaster) {  // 把当前block的情况, disk和memory的使用情况报告给master
          master.updateBlockInfo(blockManagerId, blockId, curLevel, inMemSize, onDiskSize)
        } else {
          true
        }
      }

    2.3 Get

      /**
       * Get block from local block manager, 在本地读取block
       */
      def getLocal(blockId: String): Option[Iterator[Any]] = {
        val info = blockInfo.get(blockId).orNull
        if (info != null) {
          info.synchronized { // 对block的互斥访问
            // In the another thread is writing the block, wait for it to become ready.
            if (!info.waitForReady()) { // 等待block ready, block只能被线性的写入
              // If we get here, the block write failed.
              logWarning("Block " + blockId + " was marked as failure.")
              return None
            }
    
            val level = info.level
            // Look for the block in memory
            if (level.useMemory) { // 如果storage level是用到memory的, 就先在memoryStore中试图取这个block 
               memoryStore.getValues(blockId) match { 
                case Some(iterator) =>
                  return Some(iterator) // 直接返回iterator
                case None =>
                  logDebug("Block " + blockId + " not found in memory")
              }
            }
    
            //前面在memory中没有找到, 所以继续在disk里面找 
            //Look for block on disk, potentially loading it back into memory if required
            if (level.useDisk) {
              if (level.useMemory && level.deserialized) { // MEMORY_AND_DISK, 没有序列化, 部分数据在disk
                diskStore.getValues(blockId) match {
                  case Some(iterator) =>  // 从disk中取出这个block, 并重新放到memory中
                    // Put the block back in memory before returning it
                    // TODO: Consider creating a putValues that also takes in a iterator ?
                    val elements = new ArrayBuffer[Any]
                    elements ++= iterator
                    memoryStore.putValues(blockId, elements, level, true).data match {
                      case Left(iterator2) => // 期望从putValues中得到存入block的iterator
                        return Some(iterator2)
                      case _ =>
                        throw new Exception("Memory store did not return back an iterator")
                    }
                  case None =>
                    throw new Exception("Block " + blockId + " not found on disk, though it should be")
                }
              } else if (level.useMemory && !level.deserialized) { // MEMORY_AND_DISK_SER, 序列化
                // Read it as a byte buffer into memory first, then return it
                diskStore.getBytes(blockId) match { // 由于读取的是序列化数据, 使用getBytes
                  case Some(bytes) =>
                    // Put a copy of the block back in memory before returning it. Note that we can't
                    // put the ByteBuffer returned by the disk store as that's a memory-mapped file.
                    // The use of rewind assumes this.
                    assert (0 == bytes.position())
                    val copyForMemory = ByteBuffer.allocate(bytes.limit)
                    copyForMemory.put(bytes)
                    memoryStore.putBytes(blockId, copyForMemory, level) // 在memoryStore中缓存的仍然是序列化数据
                    bytes.rewind() // 反序列化需要重新读数据, 所以rewind
                    return Some(dataDeserialize(blockId, bytes)) // 但返回的需要反序列化后的数据
                  case None =>
                    throw new Exception("Block " + blockId + " not found on disk, though it should be")
                }
              } else { // DISK_ONLY, 没啥说的, 直接取disk读
                diskStore.getValues(blockId) match {
                  case Some(iterator) =>
                    return Some(iterator)
                  case None =>
                    throw new Exception("Block " + blockId + " not found on disk, though it should be")
                }
              }
            }
          }
        } else {
          logDebug("Block " + blockId + " not registered locally")
        }
        return None
      }
      /**
       * Get block from the local block manager as serialized bytes.
       */
      def getLocalBytes(blockId: String): Option[ByteBuffer] = {
      //逻辑更简单......
    }
     
      /**
       * Get block from remote block managers.
       */
      def getRemote(blockId: String): Option[Iterator[Any]] = {
        // Get locations of block
        val locations = master.getLocations(blockId)
        // Get block from remote locations
        for (loc <- locations) {
          val data = BlockManagerWorker.syncGetBlock( //使用BlockManagerWorker从remote获取block
              GetBlock(blockId), ConnectionManagerId(loc.host, loc.port))
          if (data != null) {
            return Some(dataDeserialize(blockId, data))
          }
          logDebug("The value of block " + blockId + " is null")
        }
        logDebug("Block " + blockId + " not found")
        return None
      }

    2.3 Put

      /**
       * Put a new block of values to the block manager. Returns its (estimated) size in bytes.
       */
      def put(blockId: String, values: ArrayBuffer[Any], level: StorageLevel,
        tellMaster: Boolean = true) : Long = {
        // Remember the block's storage level so that we can correctly drop it to disk if it needs
        // to be dropped right after it got put into memory. Note, however, that other threads will
        // not be able to get() this block until we call markReady on its BlockInfo.
        val myInfo = { 
          val tinfo = new BlockInfo(level, tellMaster) // 创建新的BlockInfo
          // Do atomically !
          val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo) // check blockid的blockinfo是否已经存在
    
          if (oldBlockOpt.isDefined) { // 如果存在就需要互斥
            if (oldBlockOpt.get.waitForReady()) {
              logWarning("Block " + blockId + " already exists on this machine; not re-adding it")
              return oldBlockOpt.get.size
            }
            // TODO: So the block info exists - but previous attempt to load it (?) failed. What do we do now ? Retry on it ?
            oldBlockOpt.get
          } else {
            tinfo
          }
        }
    
        // If we need to replicate the data, we'll want access to the values, but because our
        // put will read the whole iterator, there will be no values left. For the case where
        // the put serializes data, we'll remember the bytes, above; but for the case where it
        // doesn't, such as deserialized storage, let's rely on the put returning an Iterator.
        var valuesAfterPut: Iterator[Any] = null
    
        // Ditto for the bytes after the put
        var bytesAfterPut: ByteBuffer = null
    
        // Size of the block in bytes (to return to caller)
        var size = 0L
    
        myInfo.synchronized { // 加锁, 开始真正的put
          var marked = false
          try {
            if (level.useMemory) { // 如果可以用memory, 优先放memory里面
              // Save it just to memory first, even if it also has useDisk set to true; we will later
              // drop it to disk if the memory store can't hold it.
              val res = memoryStore.putValues(blockId, values, level, true)
              size = res.size
              res.data match {
                case Right(newBytes) => bytesAfterPut = newBytes
                case Left(newIterator) => valuesAfterPut = newIterator
              }
            } else { // 否则存到disk上
              // Save directly to disk.
              // Don't get back the bytes unless we replicate them.
              val askForBytes = level.replication > 1
              val res = diskStore.putValues(blockId, values, level, askForBytes)
              size = res.size
              res.data match {
                case Right(newBytes) => bytesAfterPut = newBytes
                case _ =>
              }
            }
    
            // Now that the block is in either the memory or disk store, let other threads read it,
            // and tell the master about it.
            marked = true  // 释放blockinfo上的互斥条件, 让其他线程可以访问改block
            myInfo.markReady(size)
            if (tellMaster) {
              reportBlockStatus(blockId, myInfo) // 通知master, block状态变化
            }
          } finally {
            // If we failed at putting the block to memory/disk, notify other possible readers
            // that it has failed, and then remove it from the block info map.
            if (! marked) { // 如果put失败, 需要做些clear工作
              // Note that the remove must happen before markFailure otherwise another thread
              // could've inserted a new BlockInfo before we remove it.
              blockInfo.remove(blockId)
              myInfo.markFailure()
              logWarning("Putting block " + blockId + " failed")
            }
          }
        }
        // Replicate block if required
        if (level.replication > 1) {
          val remoteStartTime = System.currentTimeMillis
          // Serialize the block if not already done
          if (bytesAfterPut == null) {
            if (valuesAfterPut == null) {
              throw new SparkException(
                "Underlying put returned neither an Iterator nor bytes! This shouldn't happen.")
            }
            bytesAfterPut = dataSerialize(blockId, valuesAfterPut)
          }
          replicate(blockId, bytesAfterPut, level) // 做replicate
          logDebug("Put block " + blockId + " remotely took " + Utils.getUsedTimeMs(remoteStartTime))
        }
        BlockManager.dispose(bytesAfterPut)
    
        return size
      }
     
      /**
       * Put a new block of serialized bytes to the block manager.
       */
      def putBytes(
        blockId: String, bytes: ByteBuffer, level: StorageLevel, tellMaster: Boolean = true) {
        //逻辑比较简单......
      }

     

      /**
       * Replicate block to another node.
       */
      var cachedPeers: Seq[BlockManagerId] = null
      private def replicate(blockId: String, data: ByteBuffer, level: StorageLevel) {
        val tLevel = StorageLevel(level.useDisk, level.useMemory, level.deserialized, 1)
        if (cachedPeers == null) {
          cachedPeers = master.getPeers(blockManagerId, level.replication - 1) //找到可用于replica的peers
        }
        for (peer: BlockManagerId <- cachedPeers) {  //把需要replica的block放到这些peer上去
          val start = System.nanoTime
          data.rewind()
          if (!BlockManagerWorker.syncPutBlock(PutBlock(blockId, data, tLevel), //通过BlockManagerWorker传输block数据
            new ConnectionManagerId(peer.host, peer.port))) {
            logError("Failed to call syncPutBlock to " + peer)
          }
          logDebug("Replicated BlockId " + blockId + " once used " +
            (System.nanoTime - start) / 1e6 + " s; The size of the data is " +
            data.limit() + " bytes.")
        }
      }

    2.3 dropFromMemory

      /**
       * Drop a block from memory, possibly putting it on disk if applicable. Called when the memory
       * store reaches its limit and needs to free up space.
       */
      def dropFromMemory(blockId: String, data: Either[ArrayBuffer[Any], ByteBuffer]) {
        logInfo("Dropping block " + blockId + " from memory")
        val info = blockInfo.get(blockId).orNull
        if (info != null)  {
          info.synchronized {  //获取blockInfo的互斥
            // required ? As of now, this will be invoked only for blocks which are ready
            // But in case this changes in future, adding for consistency sake.
            if (! info.waitForReady() ) {
              // If we get here, the block write failed.
              logWarning("Block " + blockId + " was marked as failure. Nothing to drop")
              return
            }
    
            val level = info.level
            if (level.useDisk && !diskStore.contains(blockId)) { // 如果使用disk, 就把memory中要删除的写入disk
              logInfo("Writing block " + blockId + " to disk")
              data match {
                case Left(elements) =>
                  diskStore.putValues(blockId, elements, level, false)
                case Right(bytes) =>
                  diskStore.putBytes(blockId, bytes, level)
              }
            }
            val droppedMemorySize = if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L // 计算出从memory中drop掉的size
            val blockWasRemoved = memoryStore.remove(blockId)  // 从memoryStore drop掉block
            if (info.tellMaster) {
              reportBlockStatus(blockId, info, droppedMemorySize) // 通知master, block信息变化
            }
            if (!level.useDisk) {
              // The block is completely gone from this node; forget it so we can put() it again later.
              blockInfo.remove(blockId) // 如果没有使用disk, 那么从memory中删除, 意味着完全删除这个block
            }
          }
        } else {
          // The block has already been dropped
        }
      }
  • 相关阅读:
    Shell基础:什么是shell脚本、2种脚本解释器、#!约定解释器类型、运行shell脚本的2种方式、shell变量命令规范/赋值/如何使用/只读变量/删除变量/变量类型、shell字符串及其常用方法、shell数组及其常用方法、shell注释
    Linux su命令:su命令语法、su root与su
    docker容器内使用apt报错E: List directory /var/lib/apt/lists/partial is missing.
    浅析事务是什么、mysql是如何实现事务提交和回滚的、保证事务持久性redo log的实现原理、保证事务一致性undo log的实现原理、事务ACID特性及其实现原理
    浅析前后端分离架构下的API安全问题:JWT保证token不被盗用的方案(即如何防范Replay Attacks)
    浅析如何保证缓存与数据库的双写一致性:4种更新缓存的设计模式理解
    浅析SpringCloud中断路器是什么、断路器的作用以及在Feign中使用断路器
    浅析后端微服务涉及到定时任务时如何解决多集群定时任务重复执行并发的方案对比
    Linux连续执行多条命令的写法区别
    Dockerfile中RUN/CMD/ENTRYPOINT命令区别
  • 原文地址:https://www.cnblogs.com/fxjwind/p/3513369.html
Copyright © 2011-2022 走看看