  • Spark Source Code Reading Notes: MetadataCleaner

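    MetadataCleaner runs a timer task that periodically cleans up metadata. There are six types of metadata. Each has its own time-to-live setting, which defaults to -1, meaning the metadata is never cleaned:

    • MAP_OUTPUT_TRACKER
      The data with which the executor tracks where each map task's output is stored. The cleanup time is set by spark.cleaner.ttl.MAP_OUTPUT_TRACKER.

    • SPARK_CONTEXT
      The data structure in SparkContext that records the RDDs cached in memory. The cleanup time is set by spark.cleaner.ttl.SPARK_CONTEXT.

    • HTTP_BROADCAST
      Metadata of broadcast variables distributed over HTTP. The cleanup time is set by spark.cleaner.ttl.HTTP_BROADCAST.

    • BLOCK_MANAGER
      Non-broadcast block data held in BlockManager. The cleanup time is set by spark.cleaner.ttl.BLOCK_MANAGER.

    • SHUFFLE_BLOCK_MANAGER
      Shuffle output data. The cleanup time is set by spark.cleaner.ttl.SHUFFLE_BLOCK_MANAGER.

    • BROADCAST_VARS
      Metadata of broadcast variables distributed via Torrent, which relies on BlockManager underneath. The cleanup time is set by spark.cleaner.ttl.BROADCAST_VARS.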

    Runs a timer task to periodically clean up metadata (e.g. old files or hashtable entries)

    MetadataCleaner uses the MetadataCleanerType enumeration to record the six kinds of metadata that need cleaning:

    object MetadataCleanerType extends Enumeration {
    
      val MAP_OUTPUT_TRACKER, SPARK_CONTEXT, HTTP_BROADCAST, BLOCK_MANAGER,
      SHUFFLE_BLOCK_MANAGER, BROADCAST_VARS = Value
    
      type MetadataCleanerType = Value
    
      def systemProperty(which: MetadataCleanerType.MetadataCleanerType) =
          "spark.cleaner.ttl." + which.toString
    }
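
    As a quick check of how the configuration keys are derived, the sketch below repeats the enumeration and lists the key for every type. The ListCleanerKeys object and its keys method are hypothetical helpers added for illustration; they are not part of Spark.

    ```scala
    // Copy of the enumeration shown above, so the example is self-contained.
    object MetadataCleanerType extends Enumeration {
      val MAP_OUTPUT_TRACKER, SPARK_CONTEXT, HTTP_BROADCAST, BLOCK_MANAGER,
      SHUFFLE_BLOCK_MANAGER, BROADCAST_VARS = Value

      type MetadataCleanerType = Value

      def systemProperty(which: MetadataCleanerType.MetadataCleanerType): String =
        "spark.cleaner.ttl." + which.toString
    }

    // Hypothetical helper (not in Spark): collects the key for each type.
    object ListCleanerKeys {
      def keys: Seq[String] =
        MetadataCleanerType.values.toSeq.map(MetadataCleanerType.systemProperty)

      def main(args: Array[String]): Unit =
        keys.foreach(println) // one "spark.cleaner.ttl.<TYPE>" key per line
    }
    ```

    Because the key is built from the enumeration value's name, adding a new value to the enumeration would automatically give it its own spark.cleaner.ttl.<TYPE> setting.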

    MetadataCleaner fields

    • cleanerType:MetadataCleanerType
      The type of metadata to clean.

    • name:String = cleanerType.toString

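    • delaySeconds:Int
      The number of seconds after which data expires. Its value is conf.get("spark.cleaner.ttl." + cleanerType.toString, conf.getInt("spark.cleaner.ttl", -1).toString).toInt, so the type-specific key is read first, then the global spark.cleaner.ttl, and finally -1 if neither is set.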

    • periodSeconds:Int = math.max(10, delaySeconds / 10)
      The cleanup period: the cleanup function is called every periodSeconds seconds to check whether data has expired.

    • cleanupFunc:(Long) => Unit
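      The cleanup function. MetadataCleaner calls it every periodSeconds seconds and passes it System.currentTimeMillis() - (delaySeconds * 1000). The function therefore has to check whether the timestamp stored with each piece of data is less than the argument: if it is, the data has expired and must be removed; otherwise it has not expired.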

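    • timer:Timer = new Timer(name + " cleanup timer", true)
      The timer that schedules the cleanup task. The second argument, true, makes the timer thread a daemon thread.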

    • task:TimerTask
      The cleanup task.

    task = new TimerTask {
      override def run() {
        try {
          cleanupFunc(System.currentTimeMillis() - (delaySeconds * 1000))
          logInfo("Ran metadata cleaner for " + name)
        } catch {
          case e: Exception => logError("Error running cleanup task for " + name, e)
        }
      }
    }
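
    The way delaySeconds, periodSeconds and the cutoff timestamp relate to each other can be sketched without Spark. In the example below a plain Map[String, String] stands in for SparkConf, and the CleanerTiming object with its three methods is hypothetical. The cutoff is computed in Long arithmetic, which is a choice of this sketch rather than what the original code does.

    ```scala
    object CleanerTiming {
      // Assumption: a plain map replaces SparkConf for illustration.
      // The type-specific key wins, then the global key, then -1.
      def delaySeconds(conf: Map[String, String], cleanerType: String): Int = {
        val global = conf.getOrElse("spark.cleaner.ttl", "-1")
        conf.getOrElse("spark.cleaner.ttl." + cleanerType, global).toInt
      }

      // Same formula as the periodSeconds field: never more often than every 10 s.
      def periodSeconds(delay: Int): Int = math.max(10, delay / 10)

      // Entries with a timestamp below this value count as expired.
      def cutoffMillis(nowMillis: Long, delay: Int): Long = nowMillis - delay * 1000L

      def main(args: Array[String]): Unit = {
        val conf = Map(
          "spark.cleaner.ttl" -> "3600",
          "spark.cleaner.ttl.BLOCK_MANAGER" -> "600")

        for (t <- Seq("BLOCK_MANAGER", "SPARK_CONTEXT")) {
          val d = delaySeconds(conf, t)
          println(t + ": delay=" + d + "s, period=" + periodSeconds(d) + "s")
        }
        // With no keys set the delay is -1, so the cleaner is never scheduled.
        println("unset: delay=" + delaySeconds(Map.empty, "BLOCK_MANAGER") + "s")
      }
    }
    ```

    With these settings BLOCK_MANAGER gets a 600-second delay and a 60-second period, while SPARK_CONTEXT falls back to the global 3600 seconds and a 360-second period.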

    MetadataCleaner source code:

    class MetadataCleaner(
        cleanerType: MetadataCleanerType.MetadataCleanerType,
        cleanupFunc: (Long) => Unit,
        conf: SparkConf)
      extends Logging
    {
      val name = cleanerType.toString
    
      private val delaySeconds = MetadataCleaner.getDelaySeconds(conf, cleanerType)
      private val periodSeconds = math.max(10, delaySeconds / 10)
      private val timer = new Timer(name + " cleanup timer", true)
    
    
      private val task = new TimerTask {
        override def run() {
          try {
            cleanupFunc(System.currentTimeMillis() - (delaySeconds * 1000))
            logInfo("Ran metadata cleaner for " + name)
          } catch {
            case e: Exception => logError("Error running cleanup task for " + name, e)
          }
        }
      }
    
      if (delaySeconds > 0) {
        logDebug(
          "Starting metadata cleaner for " + name + " with delay of " + delaySeconds + " seconds " +
          "and period of " + periodSeconds + " secs")
        timer.schedule(task, delaySeconds * 1000, periodSeconds * 1000)
      }
    
      def cancel() {
        timer.cancel()
      }
    }
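
    To make the contract of cleanupFunc concrete, here is a minimal sketch of a store whose cleanup method has the (Long) => Unit shape the constructor expects. TimestampedStore and StoreDemo are hypothetical names invented for this example, and timestamps are passed in explicitly so the behaviour is deterministic. Spark's own data structures may differ.

    ```scala
    import java.util.concurrent.ConcurrentHashMap

    // Hypothetical store: every value remembers when it was inserted.
    class TimestampedStore[K, V] {
      private val entries = new ConcurrentHashMap[K, (V, Long)]()

      def put(key: K, value: V, nowMillis: Long): Unit =
        entries.put(key, (value, nowMillis))

      def get(key: K): Option[V] = Option(entries.get(key)).map(_._1)

      // Has the shape (Long) => Unit: removes every entry whose
      // timestamp is strictly less than the cutoff passed in.
      def clearOlderThan(cutoffMillis: Long): Unit = {
        val it = entries.entrySet().iterator()
        while (it.hasNext) {
          if (it.next().getValue._2 < cutoffMillis) it.remove()
        }
      }
    }

    object StoreDemo {
      def main(args: Array[String]): Unit = {
        val store = new TimestampedStore[String, Int]
        store.put("old", 1, nowMillis = 1000L)
        store.put("fresh", 2, nowMillis = 9000L)

        // The method can be handed over as a cleanup function.
        val cleanupFunc: Long => Unit = store.clearOlderThan
        cleanupFunc(5000L) // pretend the cutoff is t = 5000 ms

        println(store.get("old"))   // the expired entry is gone
        println(store.get("fresh")) // the newer entry survives
      }
    }
    ```

    A function like store.clearOlderThan is the kind of value that would be passed as the cleanupFunc argument of the constructor shown above, together with the cleaner type and the configuration.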
  • Original post: https://www.cnblogs.com/blfbuaa/p/7341247.html