zoukankan      html  css  js  c++  java
  • Flink History Job


    history job的写入
    1. org.apache.flink.runtime.jobmanager,Object JobManager
    runJobManager中指定使用MemoryArchivist进行作业保存
    startJobManagerActors中创建了进行作业保存的actor
    此archive的actor会被传入jobmanager的actor

    2. org.apache.flink.runtime.jobmanager,Class JobManager
    handleMessage中接收到JobStatusChanged的msg之后会根据逻辑判断调用removeJob
     接收到RemoveJob消息后,会调用removeJob
     接收到RemoveCachedJob的时候,会调用removeJob
     在SubmitJob的时候如果发现没有leader,会调用removeJob
    3.MemoryArchivist
    handleMessage中的  调用进行持久化的函数
    archiveJsonFiles中的  传入路径path和执行图graph调用FsJobArchivist进行持久化

    4.FsJobArchivist
    archiveJob(Path rootPath, AccessExecutionGraph graph)
    rootPath是配置的路径
    graph是作业的执行图
    archiveJob中首先调用WebMonitorUtils.getJsonArchivists()获取持久化的json类型,实际调用的是WebRuntimeMonitor.getJsonArchivists
    目前的类型包括
    new CurrentJobsOverviewHandler.CurrentJobsOverviewJsonArchivist(),//joboverview

    new JobPlanHandler.JobPlanJsonArchivist(),//jobs/:jobid/plan
    new JobConfigHandler.JobConfigJsonArchivist(),//jobs/:jobid/config
    new JobExceptionsHandler.JobExceptionsJsonArchivist(),//jobs/:jobid/exceptions
    new JobDetailsHandler.JobDetailsJsonArchivist(),//jobs/:jobid,//jobs/:jobid/vertices
    new JobAccumulatorsHandler.JobAccumulatorsJsonArchivist(),//jobs/:jobid/accumulators

    new CheckpointStatsHandler.CheckpointStatsJsonArchivist(),//jobs/:jobid/checkpoints
    new CheckpointConfigHandler.CheckpointConfigJsonArchivist(),//jobs/:jobid/checkpoints/config
    new CheckpointStatsDetailsHandler.CheckpointStatsDetailsJsonArchivist(),//jobs/:jobid/checkpoints/details/:checkpointid
    new CheckpointStatsDetailsSubtasksHandler.CheckpointStatsDetailsSubtasksJsonArchivist(),//jobs/:jobid/checkpoints/details/:checkpointid/subtasks/:vertexid

    new JobVertexDetailsHandler.JobVertexDetailsJsonArchivist(),//jobs/:jobid/vertices/:vertexid
    new SubtasksTimesHandler.SubtasksTimesJsonArchivist(),//jobs/:jobid/vertices/:vertexid/subtasktimes
    new JobVertexTaskManagersHandler.JobVertexTaskManagersJsonArchivist(),//jobs/:jobid/vertices/:vertexid/taskmanagers
    new JobVertexAccumulatorsHandler.JobVertexAccumulatorsJsonArchivist(),//jobs/:jobid/vertices/:vertexid/accumulators
    new SubtasksAllAccumulatorsHandler.SubtasksAllAccumulatorsJsonArchivist(),//jobs/:jobid/vertices/:vertexid/subtasks/accumulators

    new SubtaskExecutionAttemptDetailsHandler.SubtaskExecutionAttemptDetailsJsonArchivist(),//jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum,//jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt,
    new SubtaskExecutionAttemptAccumulatorsHandler.SubtaskExecutionAttemptAccumulatorsJsonArchivist(),//jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt/accumulators

    上面所有的archivist都继承于JsonArchivist
    其中只有一个接口 Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException
    其从graph中获取相应的信息 组装成ArchivedJson,ArchivedJson的定义如下
    public ArchivedJson(String path, String json) {
    this.path = Preconditions.checkNotNull(path);
    this.json = Preconditions.checkNotNull(json);
    }
    其中path指定存储的位置,json指定存储的内容

    如果要新定义restful接口,则可以在上面增加JsonArchivist类型
    如果只是要在已有的restful接口中增加字段,则可以修改上述的类型

    5.上述流程走完之后,每个job会在hdfs上生成一个json文件,包含各种路径、指明对应的维度


    History Job的读取
    org.apache.flink.runtime.webmonitor.history
    1.HistoryServer,负责历史作业的存储和展示,包含一个HistoryServerArchiveFetcher对象,此对象使用“刷新间隔,拉取路径,本地临时地址,”
    2.HistoryServerArchiveFetcher根据指定的时间间隔,在单独的线程中调用JobArchiveFetcherTask获取的任务
    3.JobArchiveFetcherTask是一个线程类,从指定的目录中不断的拉取数据,存入本地指定的路径;如果设置了每次拉取之后更新joboverview,则在拉取完毕之后进行joboverview的更新
    4.org.apache.flink.runtime.history
    调用FsJobArchivist中的Collection<ArchivedJson> getArchivedJsons(Path file)来获取数据,path指定存储的位置,返回该位置的所有Json数据

    5.上述流程完毕之后,会在本地临时目录每个job创建一个目录,目录中有很多子目录,分门别类的保存了各种的json文件

    文件保存

    从上述的过程中,在jobmanager写入文件的时候,是不考虑频繁读取的,所以写成了一个大文件,也符合hdfs的要求,但是在history server的保存中,如上的在hdfs中的一个文件被安装路径和维度被拆成了很多个json文件,也是为了在UI上便于展示。

  • 相关阅读:
    107. Binary Tree Level Order Traversal II
    108. Convert Sorted Array to Binary Search Tree
    111. Minimum Depth of Binary Tree
    49. Group Anagrams
    使用MALTAB标定实践记录
    442. Find All Duplicates in an Array
    522. Longest Uncommon Subsequence II
    354. Russian Doll Envelopes
    opencv 小任务3 灰度直方图
    opencv 小任务2 灰度
  • 原文地址:https://www.cnblogs.com/029zz010buct/p/9355398.html
Copyright © 2011-2022 走看看