zoukankan      html  css  js  c++  java
  • Spark 源码分析 -- BlockStore

    BlockStore

    抽象接口类, 关键get和put都有两个版本
    序列化, putBytes, getBytes
    非序列化, putValues, getValues

    其中putValues的返回值为PutResult, 其中的data可能是Iterator或ByteBuffer

    private[spark] case class PutResult(size: Long, data: Either[Iterator[_], ByteBuffer])
     
    /**
     * Abstract class to store blocks
     */
    private[spark]
    abstract class BlockStore(val blockManager: BlockManager) extends Logging {
      def putBytes(blockId: String, bytes: ByteBuffer, level: StorageLevel)
    
      /**
       * Put in a block and, possibly, also return its content as either bytes or another Iterator.
       * This is used to efficiently write the values to multiple locations (e.g. for replication).
       *
       * @return a PutResult that contains the size of the data, as well as the values put if
       *         returnValues is true (if not, the result's data field can be null)
       */
      def putValues(blockId: String, values: ArrayBuffer[Any], level: StorageLevel, 
        returnValues: Boolean) : PutResult
    
      /**
       * Return the size of a block in bytes.
       */
      def getSize(blockId: String): Long
    
      def getBytes(blockId: String): Option[ByteBuffer]
    
      def getValues(blockId: String): Option[Iterator[Any]]
    
      /**
       * Remove a block, if it exists.
       * @param blockId the block to remove.
       * @return True if the block was found and removed, False otherwise.
       */
      def remove(blockId: String): Boolean
    
      def contains(blockId: String): Boolean
    
      def clear() { }
    }

    DiskStore

    对应DiskStore其实很单纯, 就是打开相应的文件读或写.

    /**
     * Stores BlockManager blocks on disk.
     */
    private class DiskStore(blockManager: BlockManager, rootDirs: String)
      extends BlockStore(blockManager) with Logging {
    
      override def putBytes(blockId: String, _bytes: ByteBuffer, level: StorageLevel) {
        // So that we do not modify the input offsets !
        // duplicate does not copy buffer, so inexpensive
        val bytes = _bytes.duplicate()
        val file = createFile(blockId)
        val channel = new RandomAccessFile(file, "rw").getChannel()
        while (bytes.remaining > 0) {
          channel.write(bytes)
        }
        channel.close()
      }
      override def putValues(
          blockId: String,
          values: ArrayBuffer[Any],
          level: StorageLevel,
          returnValues: Boolean)
        : PutResult = {
        val file = createFile(blockId)
        val fileOut = blockManager.wrapForCompression(blockId,
          new FastBufferedOutputStream(new FileOutputStream(file)))
        val objOut = blockManager.defaultSerializer.newInstance().serializeStream(fileOut)
        objOut.writeAll(values.iterator)
        objOut.close()
        val length = file.length()
    
        if (returnValues) {
          // Return a byte buffer for the contents of the file
          val buffer = getFileBytes(file)
          PutResult(length, Right(buffer))
        } else {
          PutResult(length, null)
        }
      }
    
      override def getBytes(blockId: String): Option[ByteBuffer] = {
        val file = getFile(blockId)
        val bytes = getFileBytes(file)
        Some(bytes)
      }
    
      override def getValues(blockId: String): Option[Iterator[Any]] = {
        getBytes(blockId).map(bytes => blockManager.dataDeserialize(blockId, bytes))
      }
    }

    MemoryStore

    对于MemoryStore复杂一些

    首先使用LinkedHashMap, 可遍历的HashMap, 来组织MemoryStore, 其中的hashmap的结构(blockid, entry)
    使用Entry抽象来表示block内容
    并且在put的时候, 还涉及到memory空间的释放, ensureFreeSpace

    /**
     * Stores blocks in memory, either as ArrayBuffers of deserialized Java objects or as
     * serialized ByteBuffers.
     */
    private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
      extends BlockStore(blockManager) {
      // 使用Entry来表示block内容
      case class Entry(value: Any, size: Long, deserialized: Boolean, var dropPending: Boolean = false) 
    
      private val entries = new LinkedHashMap[String, Entry](32, 0.75f, true) // 使用LinkedHashMap来表示整个MemoryStore
      private var currentMemory = 0L
      // Object used to ensure that only one thread is putting blocks and if necessary, dropping
      // blocks from the memory store.
      private val putLock = new Object() // HashMap不是线程安全的, 需要锁同步
    
      override def putBytes(blockId: String, _bytes: ByteBuffer, level: StorageLevel) {
        // Work on a duplicate - since the original input might be used elsewhere.
        val bytes = _bytes.duplicate()
        bytes.rewind()  // 对于NIO的ByteBuffer, 使用前最好rewind
        if (level.deserialized) { // 如果storage level需要非序列化的
          val values = blockManager.dataDeserialize(blockId, bytes) // 需要先反序列化
          val elements = new ArrayBuffer[Any]
          elements ++= values
          val sizeEstimate = SizeEstimator.estimate(elements.asInstanceOf[AnyRef])
          tryToPut(blockId, elements, sizeEstimate, true)
        } else {
          tryToPut(blockId, bytes, bytes.limit, false)
        }
      }
    
      // putValues的返回值取决于storage level, 如果是deserialized, 返回iterator, 否则ByteBuffer
      override def putValues(
          blockId: String,
          values: ArrayBuffer[Any],
          level: StorageLevel,
          returnValues: Boolean)
        : PutResult = {
        if (level.deserialized) {
          val sizeEstimate = SizeEstimator.estimate(values.asInstanceOf[AnyRef])
          tryToPut(blockId, values, sizeEstimate, true)
          PutResult(sizeEstimate, Left(values.iterator))
        } else {
          val bytes = blockManager.dataSerialize(blockId, values.iterator)
          tryToPut(blockId, bytes, bytes.limit, false)
          PutResult(bytes.limit(), Right(bytes.duplicate()))
        }
      }
    
      override def getBytes(blockId: String): Option[ByteBuffer] = {
        val entry = entries.synchronized {
          entries.get(blockId)
        }
        if (entry == null) {
          None
        } else if (entry.deserialized) {
          Some(blockManager.dataSerialize(blockId, entry.value.asInstanceOf[ArrayBuffer[Any]].iterator))
        } else {
          Some(entry.value.asInstanceOf[ByteBuffer].duplicate())   // Doesn't actually copy the data
        }
      }
    
      override def getValues(blockId: String): Option[Iterator[Any]] = {
        val entry = entries.synchronized {
          entries.get(blockId)
        }
        if (entry == null) {
          None
        } else if (entry.deserialized) {
          Some(entry.value.asInstanceOf[ArrayBuffer[Any]].iterator)
        } else {
          val buffer = entry.value.asInstanceOf[ByteBuffer].duplicate() // Doesn't actually copy data
          Some(blockManager.dataDeserialize(blockId, buffer))
        }
      }
     
      /**
       * Try to put in a set of values, if we can free up enough space. The value should either be
       * an ArrayBuffer if deserialized is true or a ByteBuffer otherwise. Its (possibly estimated)
       * size must also be passed by the caller.
       *
       * Locks on the object putLock to ensure that all the put requests and its associated block
       * dropping is done by only on thread at a time. Otherwise while one thread is dropping
       * blocks to free memory for one block, another thread may use up the freed space for
       * another block.
       */
      private def tryToPut(blockId: String, value: Any, size: Long, deserialized: Boolean): Boolean = {
        // TODO: Its possible to optimize the locking by locking entries only when selecting blocks
        // to be dropped. Once the to-be-dropped blocks have been selected, and lock on entries has been
        // released, it must be ensured that those to-be-dropped blocks are not double counted for
        // freeing up more space for another block that needs to be put. Only then the actually dropping
        // of blocks (and writing to disk if necessary) can proceed in parallel.
        putLock.synchronized {
          if (ensureFreeSpace(blockId, size)) { // 如果可用分配足够的memory
            val entry = new Entry(value, size, deserialized)
            entries.synchronized { entries.put(blockId, entry) }
            currentMemory += size
            true
          } else { // 如果memory无法放下这个block, 那么只有从memory删除, 如果可以用disk, 那么在dropFromMemory中会put到disk中 
            // Tell the block manager that we couldn't put it in memory so that it can drop it to
            // disk if the block allows disk storage.
            val data = if (deserialized) {
              Left(value.asInstanceOf[ArrayBuffer[Any]])
            } else {
              Right(value.asInstanceOf[ByteBuffer].duplicate())
            }
            blockManager.dropFromMemory(blockId, data)
            false
          }
        }
      }
    
      /**
       * Tries to free up a given amount of space to store a particular block, but can fail and return
       * false if either the block is bigger than our memory or it would require replacing another
       * block from the same RDD (which leads to a wasteful cyclic replacement pattern for RDDs that
       * don't fit into memory that we want to avoid).
       *
       * Assumes that a lock is held by the caller to ensure only one thread is dropping blocks.
       * Otherwise, the freed space may fill up before the caller puts in their new value.
       */
      private def ensureFreeSpace(blockIdToAdd: String, space: Long): Boolean = {
        if (space > maxMemory) {
          logInfo("Will not store " + blockIdToAdd + " as it is larger than our memory limit")
          return false
        }
    
        if (maxMemory - currentMemory < space) {
          val rddToAdd = getRddId(blockIdToAdd)
          val selectedBlocks = new ArrayBuffer[String]()
          var selectedMemory = 0L
    
          // This is synchronized to ensure that the set of entries is not changed
          // (because of getValue or getBytes) while traversing the iterator, as that
          // can lead to exceptions.
          entries.synchronized {
            val iterator = entries.entrySet().iterator()  // 会依次删除现有的block, 直到可以放下新的block
            while (maxMemory - (currentMemory - selectedMemory) < space && iterator.hasNext) {
              val pair = iterator.next()
              val blockId = pair.getKey
              if (rddToAdd != null && rddToAdd == getRddId(blockId)) {
                logInfo("Will not store " + blockIdToAdd + " as it would require dropping another " +
                  "block from the same RDD")
                return false
              }
              selectedBlocks += blockId
              selectedMemory += pair.getValue.size
            }
          }
    
          if (maxMemory - (currentMemory - selectedMemory) >= space) {
            logInfo(selectedBlocks.size + " blocks selected for dropping")
            for (blockId <- selectedBlocks) {  // 删除selectedBlocks, 释放空间
              val entry = entries.synchronized { entries.get(blockId) }
              // This should never be null as only one thread should be dropping
              // blocks and removing entries. However the check is still here for
              // future safety.
              if (entry != null) {
                val data = if (entry.deserialized) {
                  Left(entry.value.asInstanceOf[ArrayBuffer[Any]])
                } else {
                  Right(entry.value.asInstanceOf[ByteBuffer].duplicate())
                }
                blockManager.dropFromMemory(blockId, data)
              }
            }
            return true
          } else {
            return false
          }
        }
        return true
      }
  • 相关阅读:
    STL_算法_05_集合算法
    STL_算法_04_算术和生成算法
    STL_算法_03_拷贝和替换算法
    STL_算法_02_排序算法
    STL_算法_01_查找算法
    STL_容器使用时机
    STL_容器共通能力
    Qt5_选择文件对话框
    Qt5_当前exe所在路径
    Java 静态代理和动态代理
  • 原文地址:https://www.cnblogs.com/fxjwind/p/3512590.html
Copyright © 2011-2022 走看看