zoukankan      html  css  js  c++  java
  • Flink 源码(二十一):Flink 任务调度机制(二)JobGraph

    3 JobGraph 在 Client 生成 

    StreamGraph 转变成 JobGraph 也是在 Client 完成,主要作了三件事:
    ⚫ StreamNode 转成 JobVertex。
    ⚫ StreamEdge 转成 JobEdge。
    ⚫ JobEdge 和 JobVertex 之间创建 IntermediateDataSet 来连接。
    从 1.2.6 接着进行源码分析,看 execute 里的逻辑(yarn-per-job 为例):
    AbstractJobClusterExecutor.java
    PipelineExecutorUtils.java
    FlinkPipelineTranslationUtil.java
    StreamGraphTranslator.java
    StreamGraph.java
    StreamingJobGraphGenerator.java
    看一下核心类 StreamingJobGraphGenerator 的相关属性:

     

    核心逻辑:根据 StreamGraph,生成 JobGraph: 

     

     

     

      StreamingJobGraphGenerator 的成员变量都是为了辅助生成最终的 JobGraph。
      为所有节点生成一个唯一的 hash id,如果节点在多次提交中没有改变(包括并发度、上下游等),那么这个 id 就不会改变,这主要用于故障恢复。
      这里不能用 StreamNode.id 来代替,因为这是一个从 1 开始的静态计数变量,同样的 Job可能会得到不一样的 id,如下代码示例的两个 job 是完全一样的,但是 source 的 id 却不一
    样了。 
    看一下最关键的 chaining 处理:

     

     

     

     

     

     

      每个 JobVertex 都会对应一个可序列化的 StreamConfig, 用来发送给 JobManager 和TaskManager。最后在 TaskManager 中起 Task 时,需要从这里面反序列化出所需要的配置信息, 其中就包括了含有用户代码的 StreamOperator。
      setChaining 会对 source 调用 createChain 方法,该方法会递归调用下游节点,从而构建出 node chains。createChain 会分析当前节点的出边,根据 Operator Chains 中的 chainable 条
    件,将出边分成 chainalbe 和 noChainable 两类,并分别递归调用自身方法。之后会将StreamNode 中的配置信息序列化到 StreamConfig 中。如果当前不是 chain 中的子节点,则会构建 JobVertex 和 JobEdge 相连。如果是 chain 中的子节点,则会将 StreamConfig 添加到该chain 的 config 集合中。一个 node chains,除了 headOfChain node 会生成对应的 JobVertex,其余的 nodes 都是以序列化的形式写入到 StreamConfig 中,并保存到 headOfChain 的CHAINED_TASK_CONFIG 配置项中。直到部署时,才会取出并生成对应的 ChainOperators。 

     

     
  • 相关阅读:
    HashMap 和 Hashtable 的区别
    提高利用运行(安装)内存
    MyEclipse、Hbuilder、Idea快捷键
    本地安装MySQL详细教程
    MyEclipse/Eclipse相关设置
    MyEclipse 10导入JDK1.7或1.8
    Oracle视图(和Mysq一样l)
    Oracle事务
    MySql综合知识汇总
    Mysql存储过程
  • 原文地址:https://www.cnblogs.com/qiu-hua/p/14508706.html
Copyright © 2011-2022 走看看