  23、CacheManager原理剖析与源码分析




    /**
     * Returns an iterator over the elements of partition `split`.
     *
     * When this RDD's `storageLevel` is not `StorageLevel.NONE` the request is routed through
     * the CacheManager's `getOrCompute`; otherwise it goes straight to
     * `computeOrReadCheckpoint`.
     *
     * NOTE(review): this excerpt is abridged — the closing braces are not shown.
     *
     * @param split   the partition to read
     * @param context the context of the running task
     */
    final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
        if (storageLevel != StorageLevel.NONE) {
          // CacheManager path.
          // A storage level other than NONE means this RDD was marked for persistence, so do not
          // go straight to the parent RDD and run the operator to build the partition again.
          // Try the CacheManager first to fetch the persisted data.
          SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
        } else {
          // No storage level set: compute the RDD partition (or read it from a checkpoint).
          computeOrReadCheckpoint(split, context)
    /**
     * Returns the values of `partition` of `rdd`, taking them from the BlockManager when the
     * block is already stored and otherwise computing them and storing the result.
     *
     * NOTE(review): this excerpt is abridged — the body of `next()`, several closing braces
     * and the body of the `finally` clause are not shown.
     *
     * @param rdd          the RDD the partition belongs to
     * @param partition    the partition to fetch or compute
     * @param context      the context of the running task
     * @param storageLevel the storage level passed on to `putInBlockManager`
     */
    def getOrCompute[T](
          rdd: RDD[T],
          partition: Partition,
          context: TaskContext,
          storageLevel: StorageLevel): Iterator[T] = {
        // The block id is derived from the RDD id and the partition index.
        val key = RDDBlockId(rdd.id, partition.index)
        logDebug(s"Looking for partition $key")
        // Ask the BlockManager for the data directly; if it is found, simply return it.
        blockManager.get(key) match {
          case Some(blockResult) =>
            // Partition is already materialized, so just return its values
            val inputMetrics = blockResult.inputMetrics
            val existingMetrics = context.taskMetrics
            val iter = blockResult.data.asInstanceOf[Iterator[T]]
            new InterruptibleIterator[T](context, iter) {
              override def next(): T = {
          // The BlockManager returned nothing, so further handling is needed.
          // Per the original note: although the RDD was persisted, for some unknown reason the
          // data is neither in local memory/disk nor in a remote BlockManager's memory/disk.
          case None =>
            // Acquire a lock for loading this partition
            // If another thread already holds the lock, wait for it to finish and return its results
            // Per the original note, this looks the block up in the BlockManager once more
            // (acquireLockForPartition is not shown here — TODO confirm). If values come back
            // they are returned right away; otherwise execution continues below.
            val storedValues = acquireLockForPartition[T](key)
            if (storedValues.isDefined) {
              return new InterruptibleIterator[T](context, storedValues.get)
            // Otherwise, we have to load the partition ourselves
            try {
              logInfo(s"Partition $key not found, computing it")
              // Call computeOrReadCheckpoint().
              // If the RDD was checkpointed, this reads the checkpointed data; otherwise the
              // partition has to be recomputed from the parent RDD's data by running the operator.
              val computedValues = rdd.computeOrReadCheckpoint(partition, context)
              // If the task is running locally, do not persist the result
              if (context.isRunningLocally) {
                return computedValues
              // Otherwise, cache the values and keep track of any updates in block statuses
              val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
              // Reaching the CacheManager implies the RDD has a storage level set; we only get
              // here because the persisted data could not be found.
              // So after reading the checkpoint or recomputing, putInBlockManager() is used to
              // persist a copy of the data in the BlockManager again.
              val cachedValues = putInBlockManager(key, computedValues, storageLevel, updatedBlocks)
              // Append the block updates from this put to those already recorded on the task.
              val metrics = context.taskMetrics
              val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
              metrics.updatedBlocks = Some(lastUpdatedBlocks ++ updatedBlocks.toSeq)
              new InterruptibleIterator(context, cachedValues)
            } finally {
              loading.synchronized {
    // Entry point for fetching a block through the BlockManager: the local lookup (getLocal)
    // is tried first, and only if it yields nothing is the remote lookup (getRemote) tried.
    // NOTE(review): this excerpt is abridged — the closing braces and the fall-through result
    // when neither lookup succeeds are not shown.
      def get(blockId: BlockId): Option[BlockResult] = {
        // Local lookup first.
        val local = getLocal(blockId)
        if (local.isDefined) {
          logInfo(s"Found block $blockId locally")
          return local
        // Not found locally: fall back to the remote lookup.
        val remote = getRemote(blockId)
        if (remote.isDefined) {
          logInfo(s"Found block $blockId remotely")
          return remote
    /**
     * Produces the elements of partition `split`, either by computing them or by reading
     * them through the first parent RDD when this RDD has been checkpointed.
     *
     * @param split   the partition to produce
     * @param context the context of the running task
     * @return an iterator over the partition's elements
     */
    private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] = {
        // The checkpoint details are deliberately left aside in these notes.
        if (!isCheckpointed) {
          // Not checkpointed: run this RDD's own compute().
          compute(split, context)
        } else {
          // Checkpointed: delegate to the first parent's iterator().
          firstParent[T].iterator(split, context)
        }
    }
    /**
     * Stores the computed `values` for block `key` in the BlockManager and returns an iterator
     * over the stored values.
     *
     * The level actually used for the put is `effectiveStorageLevel` when given, else `level`.
     * Without memory in that level the values are handed to `putIterator` and read back; with
     * memory they are first unrolled through `memoryStore.unrollSafely`, and if that fails and
     * the level allows disk, the method calls itself again with a disk-only level.
     *
     * NOTE(review): this excerpt is abridged — the `/*` and `*/` delimiters around the two
     * English block comments, several closing braces, and the final `else` branch are not
     * shown. The lines starting with ` * ` are the text of those block comments.
     *
     * @param key                   id of the block to store
     * @param values                the computed values of the partition
     * @param level                 the storage level requested for the RDD
     * @param updatedBlocks         buffer that collects the block status updates caused by the put
     * @param effectiveStorageLevel optional override for the level used for this put
     */
    private def putInBlockManager[T](
          key: BlockId,
          values: Iterator[T],
          level: StorageLevel,
          updatedBlocks: ArrayBuffer[(BlockId, BlockStatus)],
          effectiveStorageLevel: Option[StorageLevel] = None): Iterator[T] = {
        val putLevel = effectiveStorageLevel.getOrElse(level)
        // The storage level does not include memory, i.e. it is a disk-only level.
        if (!putLevel.useMemory) {
           * This RDD is not to be cached in memory, so we can just pass the computed values as an
           * iterator directly to the BlockManager rather than first fully unrolling it in memory.
          updatedBlocks ++=
          // Hand the iterator straight to blockManager.putIterator(); per the branch condition
          // above no memory is involved, so the data just gets written to disk.
            blockManager.putIterator(key, values, level, tellMaster = true, effectiveStorageLevel)
          // Read the block back so that the caller gets an iterator over the stored values.
          blockManager.get(key) match {
            case Some(v) => v.data.asInstanceOf[Iterator[T]]
            case None =>
              logInfo(s"Failure to store $key")
              throw new BlockException(key, s"Block manager failed to return cached value for $key!")
        // The storage level includes memory: continue below.
        else {
           * This RDD is to be cached in memory. In this case we cannot pass the computed values
           * to the BlockManager as an iterator and expect to read it back later. This is because
           * we may end up dropping a partition from memory store before getting it back.
           * In addition, we must be careful to not unroll the entire partition in memory at once.
           * Otherwise, we may cause an OOM exception if the JVM does not have enough space for this
           * single partition. Instead, we unroll the values cautiously, potentially aborting and
           * dropping the partition to disk if applicable.
          // Call unrollSafely() on the BlockManager's memoryStore to try to unroll the data
          // into memory.
          // If unrollSafely() finds that the data fits, it comes back as an array (Left) and is
          // cached in memory.
          // If unrollSafely() finds that it does not fit, an iterator (Right) comes back and the
          // data can only go to disk, provided the level allows it.
          blockManager.memoryStore.unrollSafely(key, values, updatedBlocks) match {
            case Left(arr) =>
              // We have successfully unrolled the entire partition, so cache it in memory
              updatedBlocks ++=
                blockManager.putArray(key, arr, level, tellMaster = true, effectiveStorageLevel)
            case Right(it) =>
              // There is not enough space to cache this partition in memory
              val returnValues = it.asInstanceOf[Iterator[T]]
              // The data could not be held in memory, so check whether the level includes disk.
              // If it does, retry the put with a disk-only level so the data is written to disk.
              if (putLevel.useDisk) {
                logWarning(s"Persisting partition $key to disk instead.")
                val diskOnlyLevel = StorageLevel(useDisk = true, useMemory = false,
                  useOffHeap = false, deserialized = false, putLevel.replication)
                putInBlockManager[T](key, returnValues, level, updatedBlocks, Some(diskOnlyLevel))
              } else {
    def unrollSafely(
          blockId: BlockId,
          values: Iterator[Any],
          droppedBlocks: ArrayBuffer[(BlockId, BlockStatus)])
        : Either[Array[Any], Iterator[Any]] = {
        // Number of elements unrolled so far
        var elementsUnrolled = 0
        // Whether there is still enough memory for us to continue unrolling this block
        var keepUnrolling = true
        // Initial per-thread memory to request for unrolling blocks (bytes). Exposed for testing.
        val initialMemoryThreshold = unrollMemoryThreshold
        // How often to check whether we need to request more memory
        val memoryCheckPeriod = 16
        // Memory currently reserved by this thread for this particular unrolling operation
        var memoryThreshold = initialMemoryThreshold
        // Memory to request as a multiple of current vector size
        val memoryGrowthFactor = 1.5
        // Previous unroll memory held by this thread, for releasing later (only at the very end)
        val previousMemoryReserved = currentUnrollMemoryForThisThread
        // Underlying vector for unrolling the block
        var vector = new SizeTrackingVector[Any]
        // Request enough memory to begin unrolling
        keepUnrolling = reserveUnrollMemoryForThisThread(initialMemoryThreshold)
        if (!keepUnrolling) {
          logWarning(s"Failed to reserve initial memory threshold of " +
            s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.")
        // Unroll this block safely, checking whether we have exceeded our threshold periodically
        try {
          while (values.hasNext && keepUnrolling) {
            vector += values.next()
            if (elementsUnrolled % memoryCheckPeriod == 0) {
              // If our vector's size has exceeded the threshold, request more memory
              val currentSize = vector.estimateSize()
              if (currentSize >= memoryThreshold) {
                val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong
                // Hold the accounting lock, in case another thread concurrently puts a block that
                // takes up the unrolling space we just ensured here
                accountingLock.synchronized {
                  if (!reserveUnrollMemoryForThisThread(amountToRequest)) {
                    // If the first request is not granted, try again after ensuring free space
                    // If there is still not enough space, give up and drop the partition
                    val spaceToEnsure = maxUnrollMemory - currentUnrollMemory
                    // 反复循环判断:只要还有数据没有写入内存,而且可以继续尝试往内存中写,
                    // 那么就判断内存大小够不够存放数据;如果不够,就调用ensureFreeSpace()方法,尝试清空一些内存空间
                    if (spaceToEnsure > 0) {
                      val result = ensureFreeSpace(blockId, spaceToEnsure)
                      droppedBlocks ++= result.droppedBlocks
                    keepUnrolling = reserveUnrollMemoryForThisThread(amountToRequest)
                // New threshold is currentSize * memoryGrowthFactor
                memoryThreshold += amountToRequest
            elementsUnrolled += 1
          if (keepUnrolling) {
            // We successfully unrolled the entirety of this block
          } else {
            // We ran out of space while unrolling the values for this block
            logUnrollFailureMessage(blockId, vector.estimateSize())
            Right(vector.iterator ++ values)
        } finally {
          // If we return an array, the values returned do not depend on the underlying vector and
          // we can immediately free up space for other threads. Otherwise, if we return an iterator,
          // we release the memory claimed by this thread later on when the task finishes.
          if (keepUnrolling) {
            val amountToRelease = currentUnrollMemoryForThisThread - previousMemoryReserved
