zoukankan      html  css  js  c++  java
  • 业余草教你解读Spark源码阅读之HistoryServer

    HistoryServer服务可以让用户通过Spark UI界面,查看历史应用(已经执行完的应用)的执行细节,比如job信息、stage信息、task信息等,该功能是基于spark eventlogs日志文件的,所以必须打开eventlogs日志开关,关于日志开关的打开和HistoryServer服务的启动方法这里不再讲述,下面进入正题

    下面使用的spark版本是2.0.2

    类结构图 
    Web相关 
    这里写图片描述 
    数据流相关 
    这里写图片描述

    相关类及特质 
    WebUI 
    Web Server服务中UI层次结构的最顶层。每一个WebUI包含了一个tabs的集合,而每一个tab又包含了一个pages的集合。tabs页是可选的,而且WebUI也可以直接添加page 
    继承该特质的有SparkUI、MasterWebUI、WorkerWebUI和HistoryServer,在这里我们主要介绍HistoryServer

    WebUITab 
    一个tab包含了一个pages的集合。prefix通过追加到parent的url组成一个完整的url path,而且不能包含斜杠 
    继承该特质有JobsTab、StagesTab、ExecutorsTab、StorageTab等(这里没有列全),对应于Spark UI界面上的Jobs、Stages、Executors、Storage等Tab页

    WebUIPage 
    一个page表示UI层次结构中的叶子节点。WebUIPage的直接父类即可以是WebUI,也可以是WebUITab。 
    如果父类是WebUI,prefix追加到parent的url形成完整的url path,如果父类是WebUITab,prefix追加到parent的prefix形成一个相对url path。Prefix中不能包含斜杠 
    继承该特质的有JobPage、StagePage、ExecutionPage、StoragePage等,对应于Tab页中具体的Page

    HistoryPage 
    继承至WebUIPage,通过render函数渲染生成history页面

    UIRoot 
    该特质被根容器(HistoryServer、SparkUI)继承,用来为它们提供获取application信息的统一接口

    HistoryServer

    def main(argStrings: Array[String]): Unit = {
      ……
      val providerName = conf.getOption("spark.history.provider")
        .getOrElse(classOf[FsHistoryProvider].getName())
      val provider = Utils.classForName(providerName)
        .getConstructor(classOf[SparkConf])
        .newInstance(conf)
        .asInstanceOf[ApplicationHistoryProvider]
      val port = conf.getInt("spark.history.ui.port", 18080)
      val server = new HistoryServer(conf, provider, securityManager, port)
      server.bind()
      ShutdownHookManager.addShutdownHook { () => server.stop() }
      // Wait until the end of the world... or if the HistoryServer process is manually stopped
      while(true) { Thread.sleep(Int.MaxValue) }
    }

    HistoryServer继承至WebUI,启动的时候,会将环境配置以及provider作为成员变量来初始化HistoryServer实例,其中provider用来提供application的信息供web展示使用,HistoryServer实例化后执行bind()函数,启动jetty,将HTTP服务与web接口绑定,这时候historyserver web服务已经启动了,之后添加了关闭server钩子函数后进入无限循环等待

    在HistoryServer实例化的过程中,会执行initialize()函数,

    def initialize() {
      attachPage(new HistoryPage(this))
      attachHandler(ApiRootResource.getServletHandler(this))
      attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))
      val contextHandler = new ServletContextHandler
      contextHandler.setContextPath(HistoryServer.UI_PATH_PREFIX)
      contextHandler.addServlet(new ServletHolder(loaderServlet), "/*")
      attachHandler(contextHandler)
    }

    在该函数中,首先通过attachPage函数在UI中添加了HistoryPage实例,该实例负责渲染生成history page,然后通过attachHandler添加了不同的handler,可以访问url路由获取对应的信息,其中ApiRootResource提供了api/vi/开头的路由,通过该路由,history page可以获取后台解析出的eventlog信息用以呈现,数据通过UIRoot提供的接口获取

    到这里,HistoryServer的Web端基本构建完成

    HsitoryServer数据缓存及获取 
    数据缓存主要通过使用google缓存机制LoadingCache实现,关于LoadingCache在Spark HistoryServer中的运用在另外一篇文章中分析

    FsHistoryProvider 
    前面完成了web结构的构建,接下来就需要提供接口获取历史application的信息来呈现,而FsHistoryProvider就是这个接口,作为成员变量传递给HistoryServer。这个类在实例化的时候,执行了initialize()函数,在该函数中,首先会检查hdfs是否处于安全模式,如果处于安全模式,则会等待至退出安全模式,如果不处于安全模式,则走进startPolling函数,在该函数中会读取配置的eventlog路径(默认为file:/tmp/spark-events,通过spark.history.fs.logDirectory配置),然后启动一个线程不断扫描该路径下的eventlog文件,将文件解析后加载到内存中供web查询使用,相关函数如下:

    private val pool = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
      .setNameFormat("spark-history-task-%d").setDaemon(true).build())
    pool.scheduleWithFixedDelay(getRunner(checkForLogs), 0, UPDATE_INTERVAL_S, TimeUnit.SECONDS)
    
    if (conf.getBoolean("spark.history.fs.cleaner.enabled", false)) {
      pool.scheduleWithFixedDelay(getRunner(cleanLogs), 0, CLEAN_INTERVAL_S, TimeUnit.SECONDS)
    }

    另外如果配置了清理开关(默认一天清理一次),则会清理内存中超时的application信息,并删除超时且已完成的文件,加载和清理这两个动作由同一个线程完成,以防止冲突。

    for (file <- logInfos) {
      tasks += replayExecutor.submit(new Runnable {
        override def run(): Unit = mergeApplicationListing(file)
      })
    }

    在checkForLogs函数中,会首先检查文件是否有更新,已经扫描过的文件保存在一个以文件名为key的映射中fileToAppInfo,如果文件不在这个映射中,或者存在这个映射中但是文件大小变大了,则将此文件加入到加载列表中,随后进行解析。解析的过程是采用一个固定线程数的线程池replayExecutor对需要加载的文件进行解析,每解析完一个文件,会将此文件的信息更新至fileToAppInfo,这个过程在mergeApplicationListing函数中完成,另外pendingReplayTasksCount中保存了等待解析的文件数目,所有文件解析完成后,更新一下解析完成时间

    private def replay(
        eventLog: FileStatus,
        appCompleted: Boolean,
        bus: ReplayListenerBus,
        eventsFilter: ReplayEventsFilter = SELECT_ALL_FILTER): ApplicationEventListener = {
      val logPath = eventLog.getPath()
      logInfo(s"Replaying log path: $logPath")
      val logInput = EventLoggingListener.openEventLog(logPath, fs)
      try {
        val appListener = new ApplicationEventListener
        bus.addListener(appListener)
        bus.replay(logInput, logPath.toString, !appCompleted, eventsFilter)
        appListener
      } finally {
        logInput.close()
      }
    }

    在mergeApplicationListing函数中,主要通过执行reply函数将eventlog日志文件解析出来,在该函数中,首先将ApplicationEventListener监听器加入到ReplayListenerBus实例中,ReplayListenerBus主要通过调用该实例的replay函数从eventlog记录中解析event事件,每解析一个event,都会发通知到各监听器处理event,在这里通过监听者模式将日志解析与结果处理两个过程解耦开。执行完reply函数后,也就完成了一个eventlog文件的解析,如果解析成功,则将该eventlog的信息加入到fileToAppInfo,表明已经扫描过该文件 

    在cleanLogs函数中,会在log directory中删除已经任务执行完成且超时的文件。欢迎关注业余草:www.xttblog.com;CODE大全:www.codedq.net;爱分享:www.ndislwf.com

  • 相关阅读:
    C语言实现快排
    C语言实现双向循环链表
    mysql插入数据后返回自增ID的方法
    golang flag包简单例子
    练习题 (六)
    练习题 (五)
    练习题 (四)
    练习题 (三)
    练习题 (二)
    练习题 (一)
  • 原文地址:https://www.cnblogs.com/panda2/p/7193431.html
Copyright © 2011-2022 走看看