zoukankan      html  css  js  c++  java
  • Spark 源码分析 -- BlockStore

    BlockStore

    抽象接口类, 关键的get和put方法都有两个版本:
    序列化版本: putBytes, getBytes
    非序列化版本: putValues, getValues

    putValues的返回值为PutResult, 其中的data可能是Iterator, 也可能是ByteBuffer

    /**
     * Result of a put: the size of the data and, optionally, the data that was put, as either an
     * Iterator of values (Left) or a ByteBuffer (Right). The data field may be null when the
     * caller did not ask for the values to be returned.
     */
    private[spark] case class PutResult(size: Long, data: Either[Iterator[_], ByteBuffer])
     
    /**
     * Abstract class to store blocks.
     *
     * Reads and writes come in two flavours: a serialized one working on a ByteBuffer
     * (putBytes / getBytes) and a deserialized one working on the values themselves
     * (putValues / getValues).
     */
    private[spark]
    abstract class BlockStore(val blockManager: BlockManager) extends Logging {
      /**
       * Put in a block given as a ByteBuffer.
       *
       * @param blockId the block to store
       * @param bytes   the contents of the block
       * @param level   the storage level requested for the block
       */
      def putBytes(blockId: String, bytes: ByteBuffer, level: StorageLevel): Unit

      /**
       * Put in a block and, possibly, also return its content as either bytes or another Iterator.
       * This is used to efficiently write the values to multiple locations (e.g. for replication).
       *
       * @return a PutResult that contains the size of the data, as well as the values put if
       *         returnValues is true (if not, the result's data field can be null)
       */
      def putValues(blockId: String, values: ArrayBuffer[Any], level: StorageLevel,
        returnValues: Boolean): PutResult

      /**
       * Return the size of a block in bytes.
       */
      def getSize(blockId: String): Long

      /**
       * Return the contents of a block as a ByteBuffer, wrapped in an Option.
       */
      def getBytes(blockId: String): Option[ByteBuffer]

      /**
       * Return the contents of a block as an iterator over its values, wrapped in an Option.
       */
      def getValues(blockId: String): Option[Iterator[Any]]

      /**
       * Remove a block, if it exists.
       * @param blockId the block to remove.
       * @return True if the block was found and removed, False otherwise.
       */
      def remove(blockId: String): Boolean

      /**
       * Tell whether this store holds the given block.
       */
      def contains(blockId: String): Boolean

      /**
       * Does nothing by default; concrete stores may override it.
       */
      def clear(): Unit = { }
    }

    DiskStore

    DiskStore其实很简单, 就是打开相应的文件进行读或写.

    /**
     * Stores BlockManager blocks on disk.
     *
     * NOTE(review): createFile, getFile and getFileBytes are called below but are not defined in
     * this excerpt, and the remaining BlockStore members (getSize, remove, contains) are not
     * shown either -- presumably they live in the full class; confirm against the original source.
     */
    private class DiskStore(blockManager: BlockManager, rootDirs: String)
      extends BlockStore(blockManager) with Logging {

      /**
       * Write the given bytes to the file created for blockId.
       *
       * @param blockId the block to store
       * @param _bytes  the contents of the block; the caller's buffer offsets are left untouched
       * @param level   not consulted by this method
       */
      override def putBytes(blockId: String, _bytes: ByteBuffer, level: StorageLevel): Unit = {
        // So that we do not modify the input offsets !
        // duplicate does not copy buffer, so inexpensive
        val bytes = _bytes.duplicate()
        val file = createFile(blockId)
        val channel = new RandomAccessFile(file, "rw").getChannel()
        try {
          // A single write is not guaranteed to drain the buffer, so loop until nothing remains.
          while (bytes.remaining > 0) {
            channel.write(bytes)
          }
        } finally {
          // Close even when a write throws, so the file handle is not leaked.
          channel.close()
        }
      }

      /**
       * Serialize the given values into the file created for blockId.
       *
       * @param blockId      the block to store
       * @param values       the values making up the block
       * @param level        not consulted by this method
       * @param returnValues whether the written bytes should be read back and returned
       * @return a PutResult holding the length of the written file and, if returnValues is true,
       *         the file contents as a ByteBuffer (otherwise the data field is null)
       */
      override def putValues(
          blockId: String,
          values: ArrayBuffer[Any],
          level: StorageLevel,
          returnValues: Boolean)
        : PutResult = {
        val file = createFile(blockId)
        val fileOut = blockManager.wrapForCompression(blockId,
          new FastBufferedOutputStream(new FileOutputStream(file)))
        val objOut = blockManager.defaultSerializer.newInstance().serializeStream(fileOut)
        try {
          objOut.writeAll(values.iterator)
        } finally {
          // Close even when serialization throws, so the stream is not leaked.
          objOut.close()
        }
        // Measured after closing the stream, as in the original code.
        val length = file.length()

        if (returnValues) {
          // Return a byte buffer for the contents of the file
          val buffer = getFileBytes(file)
          PutResult(length, Right(buffer))
        } else {
          PutResult(length, null)
        }
      }

      /**
       * Read the file belonging to blockId and return its bytes.
       *
       * This always returns Some; what happens for a missing file depends on getFile and
       * getFileBytes, which are not shown in this excerpt.
       */
      override def getBytes(blockId: String): Option[ByteBuffer] = {
        val file = getFile(blockId)
        val bytes = getFileBytes(file)
        Some(bytes)
      }

      /**
       * Read the block's bytes and hand them to the block manager for deserialization.
       */
      override def getValues(blockId: String): Option[Iterator[Any]] = {
        getBytes(blockId).map(bytes => blockManager.dataDeserialize(blockId, bytes))
      }
    }

    MemoryStore

    对于MemoryStore复杂一些

    首先使用LinkedHashMap (可以按顺序遍历的HashMap) 来组织MemoryStore, 其中每一项的结构为(blockId, Entry)
    使用Entry抽象来表示block的内容
    并且在put的时候, 还涉及到memory空间的释放, 见ensureFreeSpace

    /**
     * Stores blocks in memory, either as ArrayBuffers of deserialized Java objects or as
     * serialized ByteBuffers.
     */
    private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
      extends BlockStore(blockManager) {
      // An Entry represents the contents of one block held in memory: the stored value, its
      // (possibly estimated) size, and whether the value is kept deserialized (an ArrayBuffer)
      // or serialized (a ByteBuffer). dropPending is not read or written anywhere in this excerpt.
      case class Entry(value: Any, size: Long, deserialized: Boolean, var dropPending: Boolean = false) 
    
      // The whole MemoryStore is represented by a LinkedHashMap from block id to Entry.
      // NOTE(review): the third constructor argument (true) presumably selects access ordering,
      // as in java.util.LinkedHashMap -- confirm against the file's imports.
      private val entries = new LinkedHashMap[String, Entry](32, 0.75f, true)
      // Incremented by tryToPut for every block it stores; where it is decremented is not
      // visible in this excerpt.
      private var currentMemory = 0L
      // Object used to ensure that only one thread is putting blocks and if necessary, dropping
      // blocks from the memory store.
      // The map is not thread-safe, so access to it has to be synchronized as well.
      private val putLock = new Object()
    
      /**
       * Store a block that arrives as serialized bytes.
       *
       * If the storage level asks for deserialized storage, the bytes are deserialized into an
       * ArrayBuffer first and its size is estimated with SizeEstimator; otherwise the buffer
       * itself is stored and its limit is used as the size.
       *
       * The outcome of tryToPut is not reported back to the caller.
       *
       * @param blockId the block to store
       * @param _bytes  the serialized contents; only a duplicate of this buffer is touched
       * @param level   storage level deciding between deserialized and serialized storage
       */
      override def putBytes(blockId: String, _bytes: ByteBuffer, level: StorageLevel): Unit = {
        // Work on a duplicate - since the original input might be used elsewhere.
        val bytes = _bytes.duplicate()
        // Rewind before use so that reading starts at the beginning of the buffer.
        bytes.rewind()
        if (level.deserialized) {
          // The storage level wants deserialized values, so deserialize the bytes first.
          val elements = new ArrayBuffer[Any]
          elements ++= blockManager.dataDeserialize(blockId, bytes)
          val sizeEstimate = SizeEstimator.estimate(elements.asInstanceOf[AnyRef])
          tryToPut(blockId, elements, sizeEstimate, deserialized = true)
        } else {
          tryToPut(blockId, bytes, bytes.limit, deserialized = false)
        }
      }
    
      /**
       * Store a block given as deserialized values.
       *
       * What comes back depends on the storage level: for a deserialized level the values are
       * stored as they are and the result carries an iterator over them; otherwise the values are
       * serialized first and the result carries a duplicate of the resulting ByteBuffer.
       *
       * returnValues is not consulted here: the data is always included in the result.
       */
      override def putValues(
          blockId: String,
          values: ArrayBuffer[Any],
          level: StorageLevel,
          returnValues: Boolean)
        : PutResult = {
        if (!level.deserialized) {
          // Serialized storage: turn the values into bytes and keep the bytes.
          val serialized = blockManager.dataSerialize(blockId, values.iterator)
          tryToPut(blockId, serialized, serialized.limit, false)
          PutResult(serialized.limit(), Right(serialized.duplicate()))
        } else {
          // Deserialized storage: keep the values and account for them with an estimate.
          val estimatedSize = SizeEstimator.estimate(values.asInstanceOf[AnyRef])
          tryToPut(blockId, values, estimatedSize, true)
          PutResult(estimatedSize, Left(values.iterator))
        }
      }
    
      /**
       * Look up a block and return it as bytes, or None when the map has no entry for it.
       *
       * An entry stored deserialized is serialized through the block manager on every call; an
       * entry stored as bytes is returned as a duplicate of the stored buffer.
       */
      override def getBytes(blockId: String): Option[ByteBuffer] = {
        // Only the map lookup runs under the lock; the conversion below happens outside it.
        val found = entries.synchronized { entries.get(blockId) }
        Option(found).map { e =>
          if (e.deserialized) {
            blockManager.dataSerialize(blockId, e.value.asInstanceOf[ArrayBuffer[Any]].iterator)
          } else {
            // duplicate() doesn't actually copy the data
            e.value.asInstanceOf[ByteBuffer].duplicate()
          }
        }
      }
    
      /**
       * Look up a block and return an iterator over its values, or None when the map has no
       * entry for it.
       *
       * An entry stored deserialized yields an iterator over the stored ArrayBuffer; an entry
       * stored as bytes is deserialized through the block manager from a duplicate of the buffer.
       */
      override def getValues(blockId: String): Option[Iterator[Any]] = {
        // Only the map lookup runs under the lock; the conversion below happens outside it.
        val found = entries.synchronized { entries.get(blockId) }
        found match {
          case null =>
            None
          case e if e.deserialized =>
            Some(e.value.asInstanceOf[ArrayBuffer[Any]].iterator)
          case e =>
            // duplicate() doesn't actually copy the data
            val view = e.value.asInstanceOf[ByteBuffer].duplicate()
            Some(blockManager.dataDeserialize(blockId, view))
        }
      }
     
      /**
       * Attempt to store a value in memory, freeing up space first when that is possible. The
       * value must be an ArrayBuffer when deserialized is true and a ByteBuffer otherwise, and the
       * caller supplies its (possibly estimated) size.
       *
       * The whole operation runs while holding putLock, so that putting a block and the block
       * dropping it may trigger are done by only one thread at a time. Without that, space freed
       * by one thread for its block could be used up by another thread's block.
       *
       * @return true if the value was stored in memory, false if it was handed to the block
       *         manager's dropFromMemory instead
       */
      private def tryToPut(blockId: String, value: Any, size: Long, deserialized: Boolean): Boolean = {
        // TODO: The locking could be optimized by locking entries only while selecting the blocks
        // to be dropped. Once they have been selected and the lock on entries has been released,
        // it must be ensured that those to-be-dropped blocks are not double counted when freeing
        // space for another block that needs to be put. Only then can the actual dropping of
        // blocks (and writing to disk if necessary) proceed in parallel.
        putLock.synchronized {
          val hasRoom = ensureFreeSpace(blockId, size)
          if (hasRoom) {
            // Enough memory is available: record the entry and account for its size.
            entries.synchronized { entries.put(blockId, Entry(value, size, deserialized)) }
            currentMemory += size
          } else {
            // Tell the block manager that we couldn't put it in memory so that it can drop it to
            // disk if the block allows disk storage.
            val rejected =
              if (deserialized) Left(value.asInstanceOf[ArrayBuffer[Any]])
              else Right(value.asInstanceOf[ByteBuffer].duplicate())
            blockManager.dropFromMemory(blockId, rejected)
          }
          hasRoom
        }
      }
    
      /**
       * Tries to free up a given amount of space to store a particular block, but can fail and return
       * false if either the block is bigger than our memory or it would require replacing another
       * block from the same RDD (which leads to a wasteful cyclic replacement pattern for RDDs that
       * don't fit into memory that we want to avoid). It also returns false when selecting every
       * block in the map would still not free enough space.
       *
       * Assumes that a lock is held by the caller to ensure only one thread is dropping blocks.
       * Otherwise, the freed space may fill up before the caller puts in their new value.
       *
       * NOTE(review): this method does not touch currentMemory or remove entries itself;
       * presumably blockManager.dropFromMemory leads to that -- confirm.
       *
       * @param blockIdToAdd id of the block that is about to be stored
       * @param space        size required for that block
       * @return true if the required space is available (possibly after dropping blocks)
       */
      private def ensureFreeSpace(blockIdToAdd: String, space: Long): Boolean = {
        // A block larger than the whole memory limit can never fit, whatever gets dropped.
        if (space > maxMemory) {
          logInfo("Will not store " + blockIdToAdd + " as it is larger than our memory limit")
          return false
        }
    
        // Blocks only need to be dropped when the currently free memory is insufficient.
        if (maxMemory - currentMemory < space) {
          val rddToAdd = getRddId(blockIdToAdd)
          val selectedBlocks = new ArrayBuffer[String]()
          var selectedMemory = 0L
    
          // This is synchronized to ensure that the set of entries is not changed
          // (because of getValue or getBytes) while traversing the iterator, as that
          // can lead to exceptions.
          entries.synchronized {
            // Walk the existing blocks in the map's iteration order, selecting them for
            // dropping until the new block would fit.
            val iterator = entries.entrySet().iterator()
            while (maxMemory - (currentMemory - selectedMemory) < space && iterator.hasNext) {
              val pair = iterator.next()
              val blockId = pair.getKey
              // Give up rather than select a block that belongs to the same RDD as the new one.
              if (rddToAdd != null && rddToAdd == getRddId(blockId)) {
                logInfo("Will not store " + blockIdToAdd + " as it would require dropping another " +
                  "block from the same RDD")
                return false
              }
              selectedBlocks += blockId
              selectedMemory += pair.getValue.size
            }
          }
    
          // Only drop anything if the selection actually frees enough space.
          if (maxMemory - (currentMemory - selectedMemory) >= space) {
            logInfo(selectedBlocks.size + " blocks selected for dropping")
            // Drop the selected blocks to free up the space.
            for (blockId <- selectedBlocks) {
              val entry = entries.synchronized { entries.get(blockId) }
              // This should never be null as only one thread should be dropping
              // blocks and removing entries. However the check is still here for
              // future safety.
              if (entry != null) {
                val data = if (entry.deserialized) {
                  Left(entry.value.asInstanceOf[ArrayBuffer[Any]])
                } else {
                  Right(entry.value.asInstanceOf[ByteBuffer].duplicate())
                }
                blockManager.dropFromMemory(blockId, data)
              }
            }
            return true
          } else {
            return false
          }
        }
        // Enough free memory already; nothing has to be dropped.
        return true
      }
  • 相关阅读:
    UVA12125 March of the Penguins (最大流+拆点)
    UVA 1317 Concert Hall Scheduling(最小费用最大流)
    UVA10249 The Grand Dinner(最大流)
    UVA1349 Optimal Bus Route Design(KM最佳完美匹配)
    UVA1212 Duopoly(最大流最小割)
    UVA1395 Slim Span(kruskal)
    UVA1045 The Great Wall Game(二分图最佳匹配)
    UVA12168 Cat vs. Dog( 二分图最大独立集)
    hdu3488Tour(KM最佳完美匹配)
    UVA1345 Jamie's Contact Groups(最大流+二分)
  • 原文地址:https://www.cnblogs.com/fxjwind/p/3512590.html
Copyright © 2011-2022 走看看