  • Spark persistence mechanism

    Spark's persistence (caching) mechanism is fairly implicit: there is no single explicit entry point that performs the persistence itself.
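
    For orientation, a minimal usage sketch (the input path, storage level, and actions below are placeholders, assuming an existing SparkContext `sc`):

      import org.apache.spark.storage.StorageLevel

      // assumes an existing SparkContext `sc`
      val rdd = sc.textFile("hdfs:///some/path").map(_.length)

      // Only records the desired storage level on the RDD; nothing is materialized yet.
      rdd.persist(StorageLevel.MEMORY_AND_DISK)

      // The first action computes the partitions and, via RDD.iterator(),
      // stores them according to the level recorded above.
      rdd.count()

      // Later actions read the cached blocks instead of recomputing the lineage.
      rdd.first()

      // Drop the cached blocks when they are no longer needed.
      rdd.unpersist()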

    Calling rdd.persist(newLevel: StorageLevel) merely assigns the RDD's StorageLevel; as with checkpoint, nothing is persisted at that point. The actual persistence happens in the first subsequent action, driven by the iterator method:

      final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
        if (storageLevel != StorageLevel.NONE) {
          getOrCompute(split, context)
        } else {
          computeOrReadCheckpoint(split, context)
        }
      }
    For an RDD that has been persisted, the StorageLevel is no longer NONE, so the getOrCompute branch is taken:
    if (storageLevel != StorageLevel.NONE) {
          getOrCompute(split, context)
        }
      /**
        * Gets or computes an RDD partition. Used by RDD.iterator() when an RDD is cached.
        */
      private[spark] def getOrCompute(partition: Partition, context: TaskContext): Iterator[T] = {
        // TODO: relationship between block and partition
        val blockId = RDDBlockId(id, partition.index)
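        // Each cached partition maps to exactly one block: RDDBlockId(id, partition.index)
        // is named "rdd_<rddId>_<partitionIndex>".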
        var readCachedBlock = true
        // This method is called on executors, so we need call SparkEnv.get instead of sc.env.
        SparkEnv.get.blockManager.getOrElseUpdate(blockId, storageLevel, elementClassTag, () => {
          readCachedBlock = false
          computeOrReadCheckpoint(partition, context)
        }) match {
          case Left(blockResult) =>
            if (readCachedBlock) {
              val existingMetrics = context.taskMetrics().inputMetrics
              existingMetrics.incBytesRead(blockResult.bytes)
              new InterruptibleIterator[T](context, blockResult.data.asInstanceOf[Iterator[T]]) {
                override def next(): T = {
                  existingMetrics.incRecordsRead(1)
                  delegate.next()
                }
              }
            } else {
              new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])
            }
          case Right(iter) =>
            new InterruptibleIterator(context, iter.asInstanceOf[Iterator[T]])
        }
      }
    In getOrCompute, the call to blockManager.getOrElseUpdate implements both reading the block and persisting it:
    SparkEnv.get.blockManager.getOrElseUpdate(blockId, storageLevel, elementClassTag, () => {
          readCachedBlock = false
          computeOrReadCheckpoint(partition, context)
        })
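
    getOrElseUpdate returns an Either: Left(blockResult) means the block can now be served from the block manager (it was either already cached or has just been stored), while Right(iter) means the store could not hold the block (for example, too large for memory with no disk level), so the freshly computed iterator is handed straight back to the caller without being cached. A simplified, self-contained sketch of that contract (a toy cache, not Spark's BlockManager API):

      import scala.collection.mutable

      // Toy illustration of the get-or-else-update contract used above:
      // Left  = value is served from (or was just stored in) the cache,
      // Right = storing failed, so the caller must consume the computed value directly.
      class ToyBlockCache[K, V](capacity: Int) {
        private val store = mutable.Map.empty[K, V]

        def getOrElseUpdate(key: K, compute: () => V): Either[V, V] =
          store.get(key) match {
            case Some(v) => Left(v)            // hit: read the cached value
            case None =>
              val v = compute()                // miss: compute it
              if (store.size < capacity) {
                store(key) = v                 // store succeeded, future reads are hits
                Left(v)
              } else {
                Right(v)                       // store failed, hand the value back uncached
              }
          }
      }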

    getOrElseUpdate in turn calls doPutIterator, which contains the logic that inspects the storage level and delegates to the corresponding store implementation, MemoryStore or DiskStore.
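
    Before reading the method, it helps to recall how the common built-in storage levels decompose into the flags the code branches on (these are the standard constants in org.apache.spark.storage.StorageLevel):

      import org.apache.spark.storage.StorageLevel._

      // flags checked below: (useDisk, useMemory, deserialized, replication)
      // MEMORY_ONLY          -> (false, true,  true,  1)  keep deserialized objects on the heap
      // MEMORY_ONLY_SER      -> (false, true,  false, 1)  keep serialized bytes in memory
      // MEMORY_AND_DISK      -> (true,  true,  true,  1)  fall back to disk when memory is full
      // MEMORY_AND_DISK_SER  -> (true,  true,  false, 1)
      // DISK_ONLY            -> (true,  false, false, 1)  write straight to the DiskStore
      // MEMORY_AND_DISK_2    -> (true,  true,  true,  2)  additionally replicate to one other executor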

    private def  doPutIterator[T](
          blockId: BlockId,
          iterator: () => Iterator[T],
          level: StorageLevel,
          classTag: ClassTag[T],
          tellMaster: Boolean = true,
          keepReadLock: Boolean = false): Option[PartiallyUnrolledIterator[T]] = {
        doPut(blockId, level, classTag, tellMaster = tellMaster, keepReadLock = keepReadLock) { info =>
          val startTimeMs = System.currentTimeMillis
          var iteratorFromFailedMemoryStorePut: Option[PartiallyUnrolledIterator[T]] = None
          // Size of the block in bytes
          var size = 0L
          if (level.useMemory) {
            // Put it in memory first, even if it also has useDisk set to true;
            // We will drop it to disk later if the memory store can't hold it.
            if (level.deserialized) {
              memoryStore.putIteratorAsValues(blockId, iterator(), classTag) match {
                case Right(s) =>
                  size = s
                case Left(iter) =>
                  // Not enough space to unroll this block; drop to disk if applicable
                  if (level.useDisk) {
                    logWarning(s"Persisting block $blockId to disk instead.")
                    diskStore.put(blockId) { fileOutputStream =>
                      serializerManager.dataSerializeStream(blockId, fileOutputStream, iter)(classTag)
                    }
                    size = diskStore.getSize(blockId)
                  } else {
                    iteratorFromFailedMemoryStorePut = Some(iter)
                  }
              }
            } else { // !level.deserialized
              memoryStore.putIteratorAsBytes(blockId, iterator(), classTag, level.memoryMode) match {
                case Right(s) =>
                  size = s
                case Left(partiallySerializedValues) =>
                  // Not enough space to unroll this block; drop to disk if applicable
                  if (level.useDisk) {
                    logWarning(s"Persisting block $blockId to disk instead.")
                    diskStore.put(blockId) { fileOutputStream =>
                      partiallySerializedValues.finishWritingToStream(fileOutputStream)
                    }
                    size = diskStore.getSize(blockId)
                  } else {
                    iteratorFromFailedMemoryStorePut = Some(partiallySerializedValues.valuesIterator)
                  }
              }
            }
    
          } else if (level.useDisk) {
            diskStore.put(blockId) { fileOutputStream =>
              serializerManager.dataSerializeStream(blockId, fileOutputStream, iterator())(classTag)
            }
            size = diskStore.getSize(blockId)
          }
    
          val putBlockStatus = getCurrentBlockStatus(blockId, info)
          val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
          if (blockWasSuccessfullyStored) {
            // Now that the block is in either the memory or disk store, tell the master about it.
            info.size = size
            if (tellMaster && info.tellMaster) {
              reportBlockStatus(blockId, putBlockStatus)
            }
            addUpdatedBlockStatusToTaskMetrics(blockId, putBlockStatus)
            logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
            if (level.replication > 1) {
              val remoteStartTime = System.currentTimeMillis
              val bytesToReplicate = doGetLocalBytes(blockId, info)
              // [SPARK-16550] Erase the typed classTag when using default serialization, since
              // NettyBlockRpcServer crashes when deserializing repl-defined classes.
              // TODO(ekl) remove this once the classloader issue on the remote end is fixed.
              val remoteClassTag = if (!serializerManager.canUseKryo(classTag)) {
                scala.reflect.classTag[Any]
              } else {
                classTag
              }
              try {
                replicate(blockId, bytesToReplicate, level, remoteClassTag)
              } finally {
                bytesToReplicate.unmap()
              }
              logDebug("Put block %s remotely took %s"
                .format(blockId, Utils.getUsedTimeMs(remoteStartTime)))
            }
          }
          assert(blockWasSuccessfullyStored == iteratorFromFailedMemoryStorePut.isEmpty)
          iteratorFromFailedMemoryStorePut
        }
      }
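
    Putting the branches together: with useMemory set, the block first goes to the MemoryStore (as deserialized objects or as serialized bytes depending on the level); if the MemoryStore cannot unroll it, the data is spilled to the DiskStore when useDisk is also set, otherwise the partially-unrolled iterator is returned and the block simply is not cached. A pure disk level serializes straight to the DiskStore. Once the block is stored, its status is reported to the driver and, for levels with replication > 1, the bytes are replicated to other executors. A compact restatement of that decision (illustrative only, not Spark source):

      // Which store ends up holding the block, given the level flags and whether
      // the memory put succeeded. Purely illustrative; the real method also threads
      // block info, metrics and locks through BlockManager.doPut.
      def chosenStore(useMemory: Boolean, useDisk: Boolean,
                      memoryPutSucceeded: Boolean): String =
        if (useMemory && memoryPutSucceeded) "MemoryStore"
        else if (useMemory && useDisk)       "MemoryStore full -> DiskStore (serialized)"
        else if (useMemory)                  "not cached (partially-unrolled iterator returned)"
        else if (useDisk)                    "DiskStore (serialized)"
        else                                 "no store (StorageLevel.NONE)"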
  • Original post: https://www.cnblogs.com/fantiantian/p/9493270.html