How Spark deletes invalid RDD checkpoints

    Spark can use checkpoints as save points, writing an RDD's data to HDFS files; it can also use the local cache subsystem.
    When we checkpoint RDDs to HDFS, temporary files that are never deleted pile up, and over time HDFS fills with useless files. Spark anticipated this and solves the problem with a somewhat clever trick.

    Spark config:

    spark.cleaner.referenceTracking.cleanCheckpoints = false (the default)
    

    In other words, by default the saved files stay on the DFS forever unless someone deletes them manually.
    Everything below assumes this value has been set to true.
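    As a minimal sketch, the flag can be enabled when building the context (the app name and master below are placeholders, not from the original post):

      import org.apache.spark.{SparkConf, SparkContext}

      // Opt in to automatic checkpoint cleanup before the context starts.
      val conf = new SparkConf()
        .setAppName("checkpoint-cleanup-demo") // placeholder
        .setMaster("yarn")                     // placeholder
        .set("spark.cleaner.referenceTracking.cleanCheckpoints", "true")
      val sc = new SparkContext(conf)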

    Setting the checkpoint directory

    spark.sparkContext().setCheckpointDir("hdfs://nameservice1/xx/xx");
    

    Storing checkpoints on HDFS buys us its built-in fault tolerance and availability.
    But if every running job writes to this same path, can checkpoints overwrite each other? The answer is no:

      /**
       * Set the directory under which RDDs are going to be checkpointed.
       * @param directory path to the directory where checkpoint files will be stored
       * (must be HDFS path if running in cluster)
       */
      def setCheckpointDir(directory: String) {
    
        // If we are running on a cluster, log a warning if the directory is local.
        // Otherwise, the driver may attempt to reconstruct the checkpointed RDD from
        // its own local file system, which is incorrect because the checkpoint files
        // are actually on the executor machines.
        if (!isLocal && Utils.nonLocalPaths(directory).isEmpty) {
          logWarning("Spark is not running in local mode, therefore the checkpoint directory " +
            s"must not be on the local filesystem. Directory '$directory' " +
            "appears to be on the local filesystem.")
        }
    
        checkpointDir = Option(directory).map { dir =>
          // Create a UUID-named subdirectory; the checkpointed RDD files will be stored under it.
          val path = new Path(dir, UUID.randomUUID().toString)
          val fs = path.getFileSystem(hadoopConfiguration)
          fs.mkdirs(path)
          fs.getFileStatus(path).getPath.toString
        }
      }
    

    The UUID's uniqueness keeps the checkpoints of different driver processes from interfering with one another. Later checkpoint requests create files under this subdirectory to hold each RDD's contents.
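    Tying it together, a minimal usage sketch (the HDFS path and the sample computation are placeholders, not from the original post):

      // Enable spark.cleaner.referenceTracking.cleanCheckpoints as above, then:
      sc.setCheckpointDir("hdfs://nameservice1/xx/xx") // a UUID subdirectory is created under this path

      val rdd = sc.parallelize(1 to 1000).map(_ * 2)
      rdd.checkpoint() // only marks the RDD; nothing is written yet
      rdd.count()      // the first action materializes the checkpoint on HDFS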

    Saving the checkpoint

    The checkpoint itself is produced in ReliableRDDCheckpointData's doCheckpoint method:

      /**
       * Materialize this RDD and write its content to a reliable DFS.
       * This is called immediately after the first action invoked on this RDD has completed.
       */
      protected override def doCheckpoint(): CheckpointRDD[T] = {
        // Write the RDD's contents to the reliable checkpoint directory.
        val newRDD = ReliableCheckpointRDD.writeRDDToCheckpointDirectory(rdd, cpDir)
    
        // Optionally clean our checkpoint files if the reference is out of scope
        // Defaults to false; the cleanup hook is registered only when the flag is enabled.
        if (rdd.conf.get(CLEANER_REFERENCE_TRACKING_CLEAN_CHECKPOINTS)) {
          rdd.context.cleaner.foreach { cleaner =>
        // Register the checkpoint-cleanup event with the ContextCleaner.
            cleaner.registerRDDCheckpointDataForCleanup(newRDD, rdd.id)
          }
        }
    
        logInfo(s"Done checkpointing RDD ${rdd.id} to $cpDir, new parent is RDD ${newRDD.id}")
        newRDD
      }
    

    Registering the cleanup event

    The point of registering a cleanup event is that once nothing else references the RDD object, a cleaning thread asynchronously deletes the corresponding checkpoint files.

      /** Register a RDDCheckpointData for cleanup when it is garbage collected. */
      def registerRDDCheckpointDataForCleanup[T](rdd: RDD[_], parentId: Int): Unit = {
        registerForCleanup(rdd, CleanCheckpoint(parentId))
      }
    
      /** Register an object for cleanup. */
      private def registerForCleanup(objectForCleanup: AnyRef, task: CleanupTask): Unit = {
        referenceBuffer.add(new CleanupTaskWeakReference(task, objectForCleanup, referenceQueue))
      }
    

    referenceBuffer holds strong references to the CleanupTaskWeakReference objects themselves, so the wrappers are not garbage collected before their cleanup tasks have been processed (a collected wrapper is never enqueued, and its cleanup task would be lost).
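    For reference, in the Spark source the buffer is just a concurrent set of the wrappers (quoted from ContextCleaner; details may vary across versions):

      import java.util.Collections
      import java.util.concurrent.ConcurrentHashMap

      /**
       * A buffer to ensure that `CleanupTaskWeakReference`s are not garbage collected as long as
       * they have not been handled by the reference queue.
       */
      private val referenceBuffer =
        Collections.newSetFromMap[CleanupTaskWeakReference](new ConcurrentHashMap)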

    The weak-reference object

    CleanupTaskWeakReference extends WeakReference and binds the referent (here, the RDD) to referenceQueue. When the garbage collector finds that the referent has no references left other than this weak one, it appends the corresponding CleanupTaskWeakReference to referenceQueue.

    // The reference queue: when the garbage collector detects that a referent's reachability
    // has changed, it pushes the corresponding reference object onto this queue.
    // This happens via Reference.enqueue: public boolean enqueue() { return this.queue.enqueue(this); }
    private val referenceQueue = new ReferenceQueue[AnyRef]
    
    /**
     * A WeakReference associated with a CleanupTask.
     *
     * When the referent object becomes only weakly reachable, the corresponding
     * CleanupTaskWeakReference is automatically added to the given reference queue.
     */
    private class CleanupTaskWeakReference(
        val task: CleanupTask,
        referent: AnyRef,
        referenceQueue: ReferenceQueue[AnyRef])
      extends WeakReference(referent, referenceQueue)
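    To see the mechanism in isolation, here is a small self-contained sketch, independent of Spark (all names are made up for the demo):

      import java.lang.ref.{ReferenceQueue, WeakReference}

      object WeakRefDemo extends App {
        val queue = new ReferenceQueue[AnyRef]
        var payload: AnyRef = new Array[Byte](1024)
        // `ref` must itself stay strongly reachable (the role referenceBuffer plays),
        // otherwise it could be collected and never enqueued.
        val ref = new WeakReference[AnyRef](payload, queue)

        payload = null // drop the only strong reference to the referent
        System.gc()    // only a hint: without a GC cycle, nothing is ever enqueued

        // Poll with a timeout, just like ContextCleaner's referenceQueue.remove(...).
        val enqueued = Option(queue.remove(1000))
        println(s"enqueued after gc: ${enqueued.isDefined}") // usually true, but not guaranteed
      }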
    

    The cleanup thread

    Now let's look at the cleanup thread in more detail.
    When SparkContext is initialized, it starts the cleaner. There is a fair amount of code, so let's go through it step by step:

    _cleaner =
      if (_conf.get(CLEANER_REFERENCE_TRACKING)) {
        Some(new ContextCleaner(this))
      } else {
        None
      }
    _cleaner.foreach(_.start())
    

      /** Start the cleaner. */
      def start(): Unit = {
    cleaningThread.setDaemon(true) // daemon thread
        cleaningThread.setName("Spark Context Cleaner")
        cleaningThread.start()
    // Something of a silver bullet: schedule a GC periodically (every half hour by default),
    // mainly to cope with long-running jobs.
        periodicGCService.scheduleAtFixedRate(() => System.gc(),
          periodicGCInterval, periodicGCInterval, TimeUnit.SECONDS)
      }
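    The half-hour default comes from spark.cleaner.periodicGC.interval. A minimal tuning sketch (the 10min value is an arbitrary example, not a recommendation from the original post):

      import org.apache.spark.SparkConf

      // Shorten the periodic GC for drivers that create many short-lived checkpoints.
      val conf = new SparkConf()
        .set("spark.cleaner.referenceTracking.cleanCheckpoints", "true")
        .set("spark.cleaner.periodicGC.interval", "10min")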
    
    private val cleaningThread = new Thread() { override def run() { keepCleaning() }}
    

      /** Keep cleaning RDD, shuffle, and broadcast state. */
      private def keepCleaning(): Unit = Utils.tryOrStopSparkContext(sc) {
        while (!stopped) {
          try {
        // Take an enqueued weak reference from referenceQueue; receiving one means
        // the registered object has become collectible.
            val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
              .map(_.asInstanceOf[CleanupTaskWeakReference])
            // Synchronize here to avoid being interrupted on stop()
            synchronized {
              reference.foreach { ref =>
                logDebug("Got cleaning task " + ref.task)
            // Remove our strong reference to the wrapper.
                referenceBuffer.remove(ref)
                ref.task match {
                  case CleanRDD(rddId) =>
                    doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
                  case CleanShuffle(shuffleId) =>
                    doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
                  case CleanBroadcast(broadcastId) =>
                    doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
                  case CleanAccum(accId) =>
                    doCleanupAccum(accId, blocking = blockOnCleanupTasks)
                  case CleanCheckpoint(rddId) =>
                doCleanCheckpoint(rddId) // the task is a checkpoint-cleanup task
                }
              }
            }
          } catch {
            case ie: InterruptedException if stopped => // ignore
            case e: Exception => logError("Error in cleaning thread", e)
          }
        }
      }
    

      /**
       * Clean up checkpoint files written to a reliable storage.
       * Locally checkpointed files are cleaned up separately through RDD cleanups.
       */
      def doCleanCheckpoint(rddId: Int): Unit = {
        try {
          logDebug("Cleaning rdd checkpoint data " + rddId)
      // This is where the checkpoint deletion is actually triggered.
          ReliableRDDCheckpointData.cleanCheckpoint(sc, rddId)
          listeners.asScala.foreach(_.checkpointCleaned(rddId))
          logInfo("Cleaned rdd checkpoint data " + rddId)
        }
        catch {
          case e: Exception => logError("Error cleaning rdd checkpoint data " + rddId, e)
        }
      }
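    For completeness, cleanCheckpoint resolves the rdd-<id> subdirectory under the checkpoint directory and deletes it recursively. A sketch following the Spark source (exact shape may differ between versions; Path is org.apache.hadoop.fs.Path):

      /** Return the path of the directory to which this RDD's checkpoint data is written. */
      def checkpointPath(sc: SparkContext, rddId: Int): Option[Path] = {
        sc.checkpointDir.map { dir => new Path(dir, s"rdd-$rddId") }
      }

      /** Clean up the files associated with the checkpoint data for this RDD. */
      def cleanCheckpoint(sc: SparkContext, rddId: Int): Unit = {
        checkpointPath(sc, rddId).foreach { path =>
          path.getFileSystem(sc.hadoopConfiguration).delete(path, true)
        }
      }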
    

    Why the special trick?

    Why schedule System.gc() periodically to trigger full GCs?

    • Checkpoint deletion is built on WeakReference, a feature that depends heavily on GC: if no GC runs, collectible objects are never discovered and the cleanup logic never fires.
    • In the extreme case, a long stretch of young-generation-only GCs leaves old-generation objects uncollected even though nothing references them anymore; System.gc() attempts a full GC so the old generation can be reclaimed.
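    One caveat (a general JVM fact, not from the original post): if the driver JVM runs with -XX:+DisableExplicitGC, System.gc() becomes a no-op and this periodic safety net is silently disabled, so stale checkpoint files can linger until an old-generation GC happens on its own.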

    Summary

    • By default, checkpoint files stay on the DFS until someone deletes them manually.
    • Even with spark.cleaner.referenceTracking.cleanCheckpoints enabled, cleanup is not guaranteed: garbage collection does not necessarily run at an opportune moment, so the weak-reference cleanup task may never fire.
Original post: https://www.cnblogs.com/windliu/p/10983334.html