zoukankan      html  css  js  c++  java
  • Checkpoint & cache & persist

    checkpoint

    checkpoint(检查点)是Spark为了避免长链路,大计算量的Rdd不可用时,需要长时间恢复而引入的。主要就是将通过大量计算而获得的这类Rdd的数据直接持久化到外部可靠的存储体系中(一般为hdfs文件)。在以后再需要从这个Rdd获取数据时,直接从检查点获取数据从而避免了从头重新计算Rdd的数据。

    生成checkpoint

    checkpoint是在job执行结束后再启动专门的checkpoint job生成的(完成job的action方法之后),也就是说需要checkpoint的rdd会被提交到集群上计算两次。所以在对rdd调用checkpoint时,建议也加上rdd.cache。如果对多个rdd进行了checkpoint(这些rdd不是父子关系),则需要将这些rdd都提交到集群执行一次。

    Spark当前实现的checkpoint策略,如果父子rdd都调用了checkpoint方法,则默认只会对子rdd进行checkpoint。可以通过系统参数spark.checkpoint.checkpointAllMarkedAncestors来保存所有调用了checkpoint方法的rdd。

    RDDCheckpointData与RDD是一一对应的,包含了一个RDD的checkpoint的所有信息,RDD可以通过其RDDCheckpointData.isCheckpointed方法来判断RDD是否有对应的checkpoint。RDD会调用RDDCheckpointData.checkpoint方法来实际checkpoint当前RDD。RDDCheckpointData有两个子类LocalRDDCheckpointData和ReliableRDDCheckpointData,分别对应本地模式的checkpoint方法和集群模式的checkpoint方法。RDDCheckpointData会调用子类的doCheckpoint方法来实际保存rdd的数据。

    rdd生成checkpoint文件的流程如下(这里以集群模式为例):

    SparkContext.runJob -> RDD.doCheckpoint -> RDDCheckpointData.checkpoint -> ReliableRDDCheckpointData.doCheckpoint -> ReliableRDDCheckpoint.writeRDDToCheckpointDirectory -> SparkContext.runJob

    1. 在SparkContext.runJob方法中,在DAGScheduler.runJob结束后,调用finalRdd的doCheckpoint方法
    2. 在RDD.doCheckpoint方法中,如果当前rdd没有调用checkpoint方法(checkpointData.isDefined == false),则遍历当前rdd依赖的所有rdd,递归调用这些rdd的checkpoint方法
    3. 如果当前rdd调用checkpoint方法(checkpointData.isDefined == true),则如果设置保存所有调用了checkpoint方法的rdd的数据(checkpointAllMarkedAncestors == true),则首先遍历当前rdd的所有依赖的rdd的doCheckpoint方法。
    4. 然后再对当前rdd调用RDDCheckpointData.checkpoint方法来实际将当前rdd的数据保存到checkpoint文件中
    5. 在RDDCheckpointData.checkpoint方法中,首先调用子类的doCheckpoint方法保存rdd数据到checkpoint文件并返回checkpoint文件对应的rdd,然后将当前rdd的所有依赖清空
    6. 在子类的doCheckpoint方法中(这里以ReliableRDDCheckpointData为例),会调用ReliableCheckpointRDD.writeRDDToCheckpointDirectory方法来实际保存rdd数据到checkpoint中,并返回checkpoint对应的rdd
    7. 在ReliableCheckpointRDD.writeRDDToCheckpointDirectory方法中,会调用SparkContext.runJob方法将当前rdd提交到集群执行,并调用rdd.iterator方法遍历rdd中数据并保存到SparkContext设置的checkpoint目录中,最后返回ReliableCheckpointRDD包装的对checkpoint文件引用的rdd

    SparkContext.runJob

      def runJob[T, U: ClassTag](
          rdd: RDD[T],
          func: (TaskContext, Iterator[T]) => U,
          partitions: Seq[Int],
          resultHandler: (Int, U) => Unit): Unit = {
        // 省略部分代码
        dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
        progressBar.foreach(_.finishAll())
        // 在job结束后再checkpoint
        rdd.doCheckpoint()
      }
    

    RDD.doCheckpoint

      private[spark] def doCheckpoint(): Unit = {
        RDDOperationScope.withScope(sc, "checkpoint", allowNesting = false, ignoreParent = true) {
          if (!doCheckpointCalled) {
            doCheckpointCalled = true
            if (checkpointData.isDefined) {
              if (checkpointAllMarkedAncestors) {
                // 对父rdd进行checkpoint(也会调用runJob方法),当前这个是个串行操作
                dependencies.foreach(_.rdd.doCheckpoint())
              }
              checkpointData.get.checkpoint()
            } else {
              // 如果checkpointData为null,说明当前rdd没有checkpoint,则遍历其父类rdd
              // 如果都没有调用过checkpoint方法,则返回
              dependencies.foreach(_.rdd.doCheckpoint())
            }
          }
        }
      }
    

    RDDCheckpointData.checkpoint

      final def checkpoint(): Unit = {
        // Guard against multiple threads checkpointing the same RDD by
        // atomically flipping the state of this RDDCheckpointData
        RDDCheckpointData.synchronized {
          if (cpState == Initialized) {
            cpState = CheckpointingInProgress
          } else {
            return
          }
        }
    
        val newRDD = doCheckpoint()
    
        // Update our state and truncate the RDD lineage
        RDDCheckpointData.synchronized {
          cpRDD = Some(newRDD)
          cpState = Checkpointed
          rdd.markCheckpointed()
        }
      }
    

    ReliableRDDCheckpointData.checkpoint

      protected override def doCheckpoint(): CheckpointRDD[T] = {
        val newRDD = ReliableCheckpointRDD.writeRDDToCheckpointDirectory(rdd, cpDir)
    
        // Optionally clean our checkpoint files if the reference is out of scope
        if (rdd.conf.getBoolean("spark.cleaner.referenceTracking.cleanCheckpoints", false)) {
          rdd.context.cleaner.foreach { cleaner =>
            cleaner.registerRDDCheckpointDataForCleanup(newRDD, rdd.id)
          }
        }
    
        logInfo(s"Done checkpointing RDD ${rdd.id} to $cpDir, new parent is RDD ${newRDD.id}")
        newRDD
      }
    

    ReliableCheckpointRDD.writeRDDToCheckpointDirectory

      def writeRDDToCheckpointDirectory[T: ClassTag](
          originalRDD: RDD[T],
          checkpointDir: String,
          blockSize: Int = -1): ReliableCheckpointRDD[T] = {
    
        val sc = originalRDD.sparkContext
    
        // Create the output path for the checkpoint
        val checkpointDirPath = new Path(checkpointDir)
        val fs = checkpointDirPath.getFileSystem(sc.hadoopConfiguration)
        if (!fs.mkdirs(checkpointDirPath)) {
          throw new SparkException(s"Failed to create checkpoint path $checkpointDirPath")
        }
    
        // Save to file, and reload it as an RDD
        val broadcastedConf = sc.broadcast(
          new SerializableConfiguration(sc.hadoopConfiguration))
        // TODO: This is expensive because it computes the RDD again unnecessarily (SPARK-8582)
        // 再次提交job,来生成checkpoint
        sc.runJob(originalRDD,
          writePartitionToCheckpointFile[T](checkpointDirPath.toString, broadcastedConf) _)
    
        if (originalRDD.partitioner.nonEmpty) {
          writePartitionerToCheckpointDir(sc, originalRDD.partitioner.get, checkpointDirPath)
        }
    
        val newRDD = new ReliableCheckpointRDD[T](
          sc, checkpointDirPath.toString, originalRDD.partitioner)
        if (newRDD.partitions.length != originalRDD.partitions.length) {
          throw new SparkException(
            s"Checkpoint RDD $newRDD(${newRDD.partitions.length}) has different " +
              s"number of partitions from original RDD $originalRDD(${originalRDD.partitions.length})")
        }
        newRDD
      }
    

    读取checkpoint

    Spark的rdd的读取都是通过rdd.iterator方法来读取数据的。rdd.iterator都会调用computeOrReadCheckpoint方法来读取数据。如果当前rdd是被checkpoint的(isCheckpointedAndMaterialized == true),则直接调用rdd父类(被checkpoint的rdd的父类就是CheckpointRDD类型的rdd,也就是在写入checkpoint时,最后返回的rdd)的iterator方法。在父类的rdd中由于isCheckpointedAndMaterialized == false,会调用CheckpointRDD.compute方法来获取数据。这里以CheckpointRDD的一个子类ReliableCheckpointRDD为例,在compute方法中直接读取对应的checkpoint文件来获取数据。

    RDD.computeOrReadCheckpoint

      private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
      {
        // 如果当前rdd被checkpoint了,则直接从rdd的parent取
        if (isCheckpointedAndMaterialized) {
          firstParent[T].iterator(split, context)
        } else {
          compute(split, context)
        }
      }
    

    ReliableCheckpointRDD.compute

    override def compute(split: Partition, context: TaskContext): Iterator[T] = {
      val file = new Path(checkpointPath, ReliableCheckpointRDD.checkpointFileName(split.index))
      ReliableCheckpointRDD.readCheckpointFile(file, broadcastedConf, context)
    }
    

    cache & persist

    cache和persist都是Spark用来缓存数据的方法,cache和persist的唯一区别就是cache的存储级别是MEMORY_ONLY,而persist可以指定存储级别,也就是说cache的数据在保存数据的节点内存不足的情况下会被丢弃,而persist在设置了存储级别可为DISK时,在节点内存不足时,缓存的数据可以被刷新到磁盘上保存。cache&persist和checkpoint的区别在于cache&persist只是为了加快数据的计算,RDD的依赖链路是完整保存的,在cache&persist的数据不可用时,Spark还可以根据RDD的依赖关系重新计算得到数据。而checkpoint检查点对应的RDD的依赖链路是不保存的,对应的RDD只有一个指向检查点文件的父RDD,所以在检查点数据丢失的情况下,Spark是无法根据RDD的依赖链路恢复数据的。而且由于cache&persist的数据由BlockManager管理,所以在driver程序执行结束时,被cache&persist的数据也会被清空。而checkpoint的数据是写入诸如HDFS文件系统中的,是独立存在的,所以可以被下一个driver程序执行使用。

    cache和persist方法只是简单的设置了一下RDD的存储级别(RDD默认的存储级别是NONE)。在调用cache或persist时是不会触发缓存数据,只有在调用了action操作,Spark在调用到设置了cache或persist的RDD的iterator方法时,才会检查RDD是否是缓存了的。执行流程如下:

    1. 首先在action操作前调用RDD.cache或RDD.persist操作,设置RDD的存储级别
    2. 在job真正运行后,最终任务会调用ResultTask.runTask或ShuffleMapTask.runTask方法执行。在runTask方法中会调用RDD.iterator方法
    3. 在iterator中会判断当前RDD是否设置了存储级别(RDD默认的存储级别是NONE,只有cache和persist才可以改变RDD的存储级别,所以只要RDD的存储级别不是NONE则一定是设置了缓存),如果不是NONE,则调用getOrCompute方法,否则直接调用computeOrReadCheckpoint方法
    4. 在getOrCompute方法中,以RDD的id和partitionId作为查询的key,首先尝试从BlockManager中直接获取缓存的RDD数据(先尝试从本地获取,如果本地获取不到再根据driver提供的节点从其他节点获取),如果不能从BlockManager中获取,则再调用computeOrReadCheckpoint方法调用RDD.compute方法直接计算或从检查点获取数据,然后将获取到的数据缓存到BlockManager中,以便下次可以直接从BlockManager中直接获取(BlockManager原理请查看Spark存储体系)

    RDD.cache

      /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
      def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
    
      /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
      def cache(): this.type = persist()
    

    RDD.persist

      private def persist(newLevel: StorageLevel, allowOverride: Boolean): this.type = {
        // 当前在设置了RDD的存储级别后不允许修改
        if (storageLevel != StorageLevel.NONE && newLevel != storageLevel && !allowOverride) {
          throw new UnsupportedOperationException(
            "Cannot change storage level of an RDD after it was already assigned a level")
        }
        // 如果当前的RDD是第一次被缓存,则将RDD注册到SparkContext中以便之后的清理和统计,这个操作只会做一次
        if (storageLevel == StorageLevel.NONE) {
          sc.cleaner.foreach(_.registerRDDForCleanup(this))
          sc.persistRDD(this)
        }
        storageLevel = newLevel
        this
      }
    

    RDD.iterator

      final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
        // 只有rdd设置了cache或persist时,storageLevel才不是NONE,可以调用getOrCompute方法,首先从BlockManager中
        // 直接获取,如果没有再通过compute方法计算获取并保存到BlockManager中
        if (storageLevel != StorageLevel.NONE) {
          getOrCompute(split, context)
        } else {
          computeOrReadCheckpoint(split, context)
        }
      }
    

    RDD.getOrCompute

      private[spark] def getOrCompute(partition: Partition, context: TaskContext): Iterator[T] = {
        // 这里的一个block就是一个partition
        val blockId = RDDBlockId(id, partition.index)
        var readCachedBlock = true
        // 如果是shuffle操作,则通过computeOrReadCheckpoint方法从map获取到iterator,然后调用getOrElseUpdate
        // 将iterator保存到BlockManager中(map阶段只是写到map文件,并没有保存到blockManager,所以这里调用
        // get是获取不到数据的,所以会update),如果是非shuffle操作,首先从BlockManager中获取,获取不到则
        // 调用computeOrReadCheckpoint方法,直接从checkpoint获取或通过compute方法获取。如果是数据源的RDD的
        // compute方法,则直接从数据源获取数据,如果是转换后的RDD的compute方法,则递归调用父类的iterator
        SparkEnv.get.blockManager.getOrElseUpdate(blockId, storageLevel, elementClassTag, () => {
          readCachedBlock = false
          computeOrReadCheckpoint(partition, context)
        }) match {
          case Left(blockResult) =>
            if (readCachedBlock) {
              val existingMetrics = context.taskMetrics().inputMetrics
              existingMetrics.incBytesRead(blockResult.bytes)
              new InterruptibleIterator[T](context, blockResult.data.asInstanceOf[Iterator[T]]) {
                override def next(): T = {
                  existingMetrics.incRecordsRead(1)
                  delegate.next()
                }
              }
            } else {
              new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])
            }
          case Right(iter) =>
            new InterruptibleIterator(context, iter.asInstanceOf[Iterator[T]])
        }
      }
  • 相关阅读:
    JVM运行时数据区--堆
    ES检索服务搜索结果高亮
    SpringBoot 设置编码UTF-8
    response.setContentType()的作用及参数
    将 vue.js 获取的 html 文本转化为纯文本
    SpringBoot读取properties文件配置项
    关于Java的编译执行与解释执行
    Java沙箱安全机制介绍【转载】
    JVM运行时数据区--本地方法栈
    JVM--先说本地方法接口
  • 原文地址:https://www.cnblogs.com/cenglinjinran/p/8476250.html
Copyright © 2011-2022 走看看