zoukankan      html  css  js  c++  java
  • Flink History Job


    history job的写入
    1. org.apache.flink.runtime.jobmanager,Object JobManager
    runJobManager中指定使用MemoryArchivist进行作业保存
    startJobManagerActors中创建了进行作业保存的actor
    此archive的actor会被传入jobmanager的actor

    2. org.apache.flink.runtime.jobmanager,Class JobManager
    handleMessage中接收到JobStatusChanged的msg之后会根据逻辑判断调用removeJob
     接收到RemoveJob消息后,会调用removeJob
     接收到RemoveCachedJob的时候,会调用removeJob
     在SubmitJob的时候如果发现没有leader,会调用removeJob
    3.MemoryArchivist
    handleMessage中的  调用进行持久化的函数
    archiveJsonFiles中的  传入路径path和执行图graph调用FsJobArchivist进行持久化

    4.FsJobArchivist
    archiveJob(Path rootPath, AccessExecutionGraph graph)
    rootPath是配置的路径
    graph是作业的执行图
    archiveJob中首先调用WebMonitorUtils.getJsonArchivists()获取持久化的json类型,实际调用的是WebRuntimeMonitor.getJsonArchivists
    目前的类型包括
    new CurrentJobsOverviewHandler.CurrentJobsOverviewJsonArchivist(),//joboverview

    new JobPlanHandler.JobPlanJsonArchivist(),//jobs/:jobid/plan
    new JobConfigHandler.JobConfigJsonArchivist(),//jobs/:jobid/config
    new JobExceptionsHandler.JobExceptionsJsonArchivist(),//jobs/:jobid/exceptions
    new JobDetailsHandler.JobDetailsJsonArchivist(),//jobs/:jobid,//jobs/:jobid/vertices
    new JobAccumulatorsHandler.JobAccumulatorsJsonArchivist(),//jobs/:jobid/accumulators

    new CheckpointStatsHandler.CheckpointStatsJsonArchivist(),//jobs/:jobid/checkpoints
    new CheckpointConfigHandler.CheckpointConfigJsonArchivist(),//jobs/:jobid/checkpoints/config
    new CheckpointStatsDetailsHandler.CheckpointStatsDetailsJsonArchivist(),//jobs/:jobid/checkpoints/details/:checkpointid
    new CheckpointStatsDetailsSubtasksHandler.CheckpointStatsDetailsSubtasksJsonArchivist(),//jobs/:jobid/checkpoints/details/:checkpointid/subtasks/:vertexid

    new JobVertexDetailsHandler.JobVertexDetailsJsonArchivist(),//jobs/:jobid/vertices/:vertexid
    new SubtasksTimesHandler.SubtasksTimesJsonArchivist(),//jobs/:jobid/vertices/:vertexid/subtasktimes
    new JobVertexTaskManagersHandler.JobVertexTaskManagersJsonArchivist(),//jobs/:jobid/vertices/:vertexid/taskmanagers
    new JobVertexAccumulatorsHandler.JobVertexAccumulatorsJsonArchivist(),//jobs/:jobid/vertices/:vertexid/accumulators
    new SubtasksAllAccumulatorsHandler.SubtasksAllAccumulatorsJsonArchivist(),//jobs/:jobid/vertices/:vertexid/subtasks/accumulators

    new SubtaskExecutionAttemptDetailsHandler.SubtaskExecutionAttemptDetailsJsonArchivist(),//jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum,//jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt,
    new SubtaskExecutionAttemptAccumulatorsHandler.SubtaskExecutionAttemptAccumulatorsJsonArchivist(),//jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt/accumulators

    上面所有的archivist都继承于JsonArchivist
    其中只有一个接口 Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException
    其从graph中获取相应的信息 组装成ArchivedJson,ArchivedJson的定义如下
    public ArchivedJson(String path, String json) {
    this.path = Preconditions.checkNotNull(path);
    this.json = Preconditions.checkNotNull(json);
    }
    其中path指定存储的位置,json指定存储的内容

    如果要新定义restful接口,则可以在上面增加JsonArchivist类型
    如果只是要在已有的restful接口中增加字段,则可以修改上述的类型

    5.上述流程走完之后,每个job会在hdfs上生成一个json文件,包含各种路径、指明对应的维度


    History Job的读取
    org.apache.flink.runtime.webmonitor.history
    1.HistoryServer,负责历史作业的存储和展示,包含一个HistoryServerArchiveFetcher对象,此对象使用“刷新间隔,拉取路径,本地临时地址,”
    2.HistoryServerArchiveFetcher根据指定的时间间隔,在单独的线程中调用JobArchiveFetcherTask获取的任务
    3.JobArchiveFetcherTask是一个线程类,从指定的目录中不断的拉取数据,存入本地指定的路径;如果设置了每次拉取之后更新joboverview,则在拉取完毕之后进行joboverview的更新
    4.org.apache.flink.runtime.history
    调用FsJobArchivist中的Collection<ArchivedJson> getArchivedJsons(Path file)来获取数据,path指定存储的位置,返回该位置的所有Json数据

    5.上述流程完毕之后,会在本地临时目录每个job创建一个目录,目录中有很多子目录,分门别类的保存了各种的json文件

    文件保存

    从上述的过程中,在jobmanager写入文件的时候,是不考虑频繁读取的,所以写成了一个大文件,也符合hdfs的要求,但是在history server的保存中,如上的在hdfs中的一个文件被安装路径和维度被拆成了很多个json文件,也是为了在UI上便于展示。

  • 相关阅读:
    Log4Net的使用之winform
    开源一个跨平台运行的服务插件
    定时管理器框架-Task.MainForm
    nginx+iis+redis+Task.MainForm构建分布式架构 之 (redis存储分布式共享的session及共享session运作流程)
    关于SQL查询效率,100w数据,查询只要1秒
    写的一般,从起源到具体算法-深度学习综述
    如何避免SHRINKDATABASE & SHRINKFILE 产生索引碎片(转载)
    在windows service中启动类型“Automatic” 和 “Automatic (Delayed start)” 有何不同?
    C# 对WinForm应用程序的App.config的加密
    SQL Server中怎么查看每个数据库的日志大小,以及怎么确定数据库的日志文件,怎么用语句收缩日志文件
  • 原文地址:https://www.cnblogs.com/029zz010buct/p/9355398.html
Copyright © 2011-2022 走看看