  • How Spark deletes invalid RDD checkpoints

    Spark can use checkpoints to persist an RDD's data to HDFS files; it can also use the local cache subsystem.
    When we checkpoint RDDs to HDFS, the files linger unless something removes them, and over time HDFS fills up with useless files. Spark anticipated this and solves the problem with a somewhat clever trick.

    Spark config:

    spark.cleaner.referenceTracking.cleanCheckpoints = false (the default)
    

    In other words, by default the saved files stay on the DFS indefinitely unless deleted manually.
    Everything below assumes the flag is set to true.
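
    As a minimal sketch of enabling it (the app name is a placeholder), the flag must be set before the context is created:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("checkpoint-cleanup-demo") // placeholder name
      // Opt in to deleting checkpoint files once the checkpointed RDD
      // is garbage collected (defaults to false).
      .config("spark.cleaner.referenceTracking.cleanCheckpoints", "true")
      .getOrCreate()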

    Setting the checkpoint directory

    spark.sparkContext().setCheckpointDir("hdfs://nameservice1/xx/xx");
    

    Storing checkpoints on the HDFS filesystem brings built-in fault tolerance and availability.
    So if every running job writes to this same path, can files be overwritten? The answer is no:

      /**
       * Set the directory under which RDDs are going to be checkpointed.
       * @param directory path to the directory where checkpoint files will be stored
       * (must be HDFS path if running in cluster)
       */
      def setCheckpointDir(directory: String) {
    
        // If we are running on a cluster, log a warning if the directory is local.
        // Otherwise, the driver may attempt to reconstruct the checkpointed RDD from
        // its own local file system, which is incorrect because the checkpoint files
        // are actually on the executor machines.
        if (!isLocal && Utils.nonLocalPaths(directory).isEmpty) {
          logWarning("Spark is not running in local mode, therefore the checkpoint directory " +
            s"must not be on the local filesystem. Directory '$directory' " +
            "appears to be on the local filesystem.")
        }
    
        checkpointDir = Option(directory).map { dir =>
          // Generate a UUID-named subdirectory; the checkpointed RDD files go under it
          val path = new Path(dir, UUID.randomUUID().toString)
          val fs = path.getFileSystem(hadoopConfiguration)
          fs.mkdirs(path)
          fs.getFileStatus(path).getPath.toString
        }
      }
    

    Thanks to the UUID's uniqueness, different applications' checkpoints do not interfere with each other. Each later checkpoint request creates files under this directory to hold the RDD's contents.
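
    Putting the pieces together, a minimal usage sketch (the HDFS path is a placeholder, continuing from the session built above):

    val sc = spark.sparkContext
    sc.setCheckpointDir("hdfs://nameservice1/tmp/checkpoints") // creates <dir>/<uuid>/

    val rdd = sc.parallelize(1 to 1000).map(_ * 2)
    rdd.checkpoint() // only marks the RDD; nothing is written yet
    rdd.count()      // the first action materializes the RDD and writes its
                     // partitions under <dir>/<uuid>/rdd-<id>/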

    The checkpoint itself is produced in ReliableRDDCheckpointData's doCheckpoint method.

    Writing the checkpoint

      /**
       * Materialize this RDD and write its content to a reliable DFS.
       * This is called immediately after the first action invoked on this RDD has completed.
       */
      protected override def doCheckpoint(): CheckpointRDD[T] = {
        // Write the RDD's content to reliable storage
        val newRDD = ReliableCheckpointRDD.writeRDDToCheckpointDirectory(rdd, cpDir)
    
        // Optionally clean our checkpoint files if the reference is out of scope
        // Only when the flag is true (it defaults to false) do we register the cleaner
        if (rdd.conf.get(CLEANER_REFERENCE_TRACKING_CLEAN_CHECKPOINTS)) {
          rdd.context.cleaner.foreach { cleaner =>
            // Register the cleanup event
            cleaner.registerRDDCheckpointDataForCleanup(newRDD, rdd.id)
          }
        }
    
        logInfo(s"Done checkpointing RDD ${rdd.id} to $cpDir, new parent is RDD ${newRDD.id}")
        newRDD
      }
    

    Registering the event

    The point of registering a cleanup event is that once nothing else references the RDD object, a cleaning thread asynchronously deletes the corresponding checkpoint files.

      /** Register a RDDCheckpointData for cleanup when it is garbage collected. */
      def registerRDDCheckpointDataForCleanup[T](rdd: RDD[_], parentId: Int): Unit = {
        registerForCleanup(rdd, CleanCheckpoint(parentId))
      }
    
      /** Register an object for cleanup. */
      private def registerForCleanup(objectForCleanup: AnyRef, task: CleanupTask): Unit = {
        referenceBuffer.add(new CleanupTaskWeakReference(task, objectForCleanup, referenceQueue))
      }
    

    referenceBuffer holds strong references to the CleanupTaskWeakReference objects themselves, so they cannot be garbage collected before the reference queue has handled them, which would lose the cleanup task.
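
    For reference, in the Spark source this buffer is declared (approximately; quoted from memory) as a concurrent set, so registrations from multiple threads are safe:

    import java.util.Collections
    import java.util.concurrent.ConcurrentHashMap

    private val referenceBuffer =
      Collections.newSetFromMap[CleanupTaskWeakReference](new ConcurrentHashMap)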

    The weak-reference object

    CleanupTaskWeakReference extends WeakReference and binds the referent (the RDD) to referenceQueue. When a GC finds that, apart from this weak reference, nothing else references the referent, it appends the corresponding CleanupTaskWeakReference to referenceQueue.

    // Reference queue: when the garbage collector detects the relevant change in
    // reachability, it pushes the reference object onto this queue.
    // It does so via Reference.enqueue: public boolean enqueue() { return this.queue.enqueue(this); }
    private val referenceQueue = new ReferenceQueue[AnyRef]
    
    /**
     * A WeakReference associated with a CleanupTask.
     *
     * When the referent object becomes only weakly reachable, the corresponding
     * CleanupTaskWeakReference is automatically added to the given reference queue.
     */
    private class CleanupTaskWeakReference(
        val task: CleanupTask,
        referent: AnyRef,
        referenceQueue: ReferenceQueue[AnyRef])
      extends WeakReference(referent, referenceQueue)
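
    The mechanism is easy to demonstrate outside Spark. A self-contained sketch using only the JDK:

    import java.lang.ref.{ReferenceQueue, WeakReference}

    val queue = new ReferenceQueue[AnyRef]
    var referent: AnyRef = new Object
    // Holding `ref` strongly plays the same role referenceBuffer plays in Spark:
    // if the WeakReference itself were collected, it would never be enqueued.
    val ref = new WeakReference[AnyRef](referent, queue)

    referent = null // drop the last strong reference to the referent
    System.gc()     // hint the JVM to collect; not guaranteed to run

    // After a GC notices the referent is only weakly reachable,
    // the WeakReference itself shows up on the queue.
    val enqueued = Option(queue.remove(1000)) // wait up to 1s
    println(s"reference enqueued: ${enqueued.isDefined}")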
    

    The cleaning thread

    Now let's look at the cleaning thread in more detail.
    When SparkContext is initialized it starts the cleaner. There is a fair amount of code, so we'll go through it in order.

    _cleaner =
      if (_conf.get(CLEANER_REFERENCE_TRACKING)) {
        Some(new ContextCleaner(this))
      } else {
        None
      }
    _cleaner.foreach(_.start())
    

      /** Start the cleaner. */
      def start(): Unit = {
        cleaningThread.setDaemon(true) // daemon thread
        cleaningThread.setName("Spark Context Cleaner")
        cleaningThread.start()
        // A bit of a silver bullet: trigger a GC on a fixed schedule, every half
        // hour by default, mainly to cope with long-running jobs
        periodicGCService.scheduleAtFixedRate(() => System.gc(),
          periodicGCInterval, periodicGCInterval, TimeUnit.SECONDS)
      }
    
    private val cleaningThread = new Thread() { override def run() { keepCleaning() }}
    

      /** Keep cleaning RDD, shuffle, and broadcast state. */
      private def keepCleaning(): Unit = Utils.tryOrStopSparkContext(sc) {
        while (!stopped) {
          try {
            // Take a collectable weak reference from referenceQueue; getting one
            // back means the registered RDD is now reclaimable
            val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
              .map(_.asInstanceOf[CleanupTaskWeakReference])
            // Synchronize here to avoid being interrupted on stop()
            synchronized {
              reference.foreach { ref =>
                logDebug("Got cleaning task " + ref.task)
                // Drop the strong reference held in referenceBuffer
                referenceBuffer.remove(ref)
                ref.task match {
                  case CleanRDD(rddId) =>
                    doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
                  case CleanShuffle(shuffleId) =>
                    doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
                  case CleanBroadcast(broadcastId) =>
                    doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
                  case CleanAccum(accId) =>
                    doCleanupAccum(accId, blocking = blockOnCleanupTasks)
                  case CleanCheckpoint(rddId) =>
                    doCleanCheckpoint(rddId) // handle a checkpoint-cleanup task
                }
              }
            }
          } catch {
            case ie: InterruptedException if stopped => // ignore
            case e: Exception => logError("Error in cleaning thread", e)
          }
        }
      }
    

      /**
       * Clean up checkpoint files written to a reliable storage.
       * Locally checkpointed files are cleaned up separately through RDD cleanups.
       */
      def doCleanCheckpoint(rddId: Int): Unit = {
        try {
          logDebug("Cleaning rdd checkpoint data " + rddId)
      // This is where the checkpoint deletion is actually triggered
          ReliableRDDCheckpointData.cleanCheckpoint(sc, rddId)
          listeners.asScala.foreach(_.checkpointCleaned(rddId))
          logInfo("Cleaned rdd checkpoint data " + rddId)
        }
        catch {
          case e: Exception => logError("Error cleaning rdd checkpoint data " + rddId, e)
        }
      }
    

    The point of the periodic GC

    Why schedule System.gc() to trigger a full GC?

    • Deleting RDD checkpoints is built on WeakReference, which depends heavily on GC: if no GC runs, the collector never notices that an object is reclaimable, and the cleanup logic never fires.
    • In the extreme case a job may see only young GCs for a long stretch, so objects in the old generation go uncollected even though nothing references them anymore; System.gc() attempts a full GC to reclaim the old generation.
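
    The interval itself is configurable. A sketch, assuming the standard key spark.cleaner.periodicGC.interval (30 minutes by default):

    val conf = new org.apache.spark.SparkConf()
      .set("spark.cleaner.periodicGC.interval", "10min") // shorten for long-running jobs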

    Summary

    • By default, checkpoint files stay on the DFS indefinitely unless deleted manually.
    • Even with spark.cleaner.referenceTracking.cleanCheckpoints enabled, cleanup is not guaranteed: garbage collection may never run at a suitable moment, so the weak-reference cleanup logic may never be triggered.
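
    If GC timing cannot be relied on, checkpoint directories can be removed manually once the job is done. A sketch with the Hadoop FileSystem API (the path is a placeholder; sc is the running SparkContext):

    import org.apache.hadoop.fs.Path

    val dir = new Path("hdfs://nameservice1/tmp/checkpoints") // placeholder
    val fs = dir.getFileSystem(sc.hadoopConfiguration)
    if (fs.exists(dir)) fs.delete(dir, true) // recursive delete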