zoukankan      html  css  js  c++  java
  • Flink 源码(二十):Flink 任务调度机制(一)Graph 的概念 与StreamGraph 在 Client 生成

    0 简介

    1 Graph 的概念 

    1.1 概述
      Flink 中的执行图可以分成四层:StreamGraph -> JobGraph -> ExecutionGraph -> 物理执行图。
      StreamGraph:是根据用户通过 Stream API 编写的代码生成的最初的图。用来表示程序的拓扑结构。
      JobGraph:StreamGraph 经过优化后生成了 JobGraph,提交给 JobManager 的数据结构。主要的优化为,将多个符合条件的节点 chain 在一起作为一个节点,这样可以减少数据在节
    点之间流动所需要的序列化/反序列化/传输消耗。
      ExecutionGraph:JobManager 根据 JobGraph 生成 ExecutionGraph。ExecutionGraph 是JobGraph 的并行化版本,是调度层最核心的数据结构。
      物 理 执 行 图 : JobManager 根 据 ExecutionGraph 对 Job 进 行 调 度 后 , 在 各 个TaskManager 上部署 Task 后形成的“图”,并不是一个具体的数据结构。
      例如 example 里的 SocketTextStreamWordCount 并发度为 2(Source 为 1 个并发度)的四层执行图的演变过程如下图所示:

     

     

    1.2 名词解释: 

      1)StreamGraph:根据用户通过 Stream API 编写的代码生成的最初的图。
      (1)StreamNode:用来代表 operator 的类,并具有所有相关的属性,如并发度、入边和出边等。
      (2)StreamEdge:表示连接两个 StreamNode 的边。
      2)JobGraph:StreamGraph 经过优化后生成了 JobGraph,提交给 JobManager 的数据结构。
      (1)JobVertex:经过优化后符合条件的多个 StreamNode 可能会 chain 在一起生成一个JobVertex,即一个 JobVertex 包含一个或多个 operator,JobVertex 的输入是 JobEdge,输出是
    IntermediateDataSet。
      (2)IntermediateDataSet:表示 JobVertex 的输出,即经过 operator 处理产生的数据集。producer 是 JobVertex,consumer 是 JobEdge。
      (3)JobEdge:代表了 job graph 中的一条数据传输通道。source 是 IntermediateDataSet,target 是 JobVertex。即数据通过 JobEdge 由 IntermediateDataSet 传递给目标 JobVertex。
      3)ExecutionGraph:JobManager 根据 JobGraph 生成 ExecutionGraph。ExecutionGraph是 JobGraph 的并行化版本,是调度层最核心的数据结构。
      ( 1 ) ExecutionJobVertex : 和 JobGraph 中 的 JobVertex 一 一 对 应 。 每 一 个ExecutionJobVertex 都有和并发度一样多的 ExecutionVertex。
      (2)ExecutionVertex:表示 ExecutionJobVertex 的其中一个并发子任务,输入是ExecutionEdge,输出是 IntermediateResultPartition。
      (3)IntermediateResult:和 JobGraph 中的 IntermediateDataSet 一一对应。一个IntermediateResult 包含多个 IntermediateResultPartition,其个数等于该 operator 的并发度。
      (4)IntermediateResultPartition:表示 ExecutionVertex 的一个输出分区,producer 是ExecutionVertex,consumer 是若干个 ExecutionEdge。
      (5)ExecutionEdge:表示 ExecutionVertex 的输入,source 是 IntermediateResultPartition,target 是 ExecutionVertex。source 和 target 都只能是一个。
      (6)Execution:是执行一个 ExecutionVertex 的一次尝试。当发生故障或者数据需要重算的情况下 ExecutionVertex 可能会有多个 ExecutionAttemptID。一个 Execution 通过ExecutionAttemptID 来唯一标识。JM 和 TM 之间关于 task 的部署和 task status 的更新都是通过 ExecutionAttemptID 来确定消息接受者。
      从这些基本概念中,也可以看出以下л点:
      ⚫ 由于每个 JobVertex 可能有多个 IntermediateDataSet,所以每个 ExecutionJobVertex可能有多个 IntermediateResult,因此,每个 ExecutionVertex 也可能会包含多个
    IntermediateResultPartition;
      ⚫ ExecutionEdge 这里主要的作֌是把 ExecutionVertex 和 IntermediateResultPartition连接起来,表示它们之间的连接关系。
      4)物理执行图:JobManager 根据 ExecutionGraph 对 Job 进行调度后,在各个TaskManager 上部署 Task 后形成的“图”,并不是一个具体的数据结构。
      (1)Task:Execution 被调度后在分配的 TaskManager 中启动对应的 Task。Task 包裹了具有用户执行逻辑的 operator。
      (2)ResultPartition:代表由一个 Task 的生成的数据,和 ExecutionGraph 中的IntermediateResultPartition 一一对应。
      (3)ResultSubpartition:是 ResultPartition 的一个子分区。每个 ResultPartition 包含多个ResultSubpartition,其数目要由下游消费 Task 数和 DistributionPattern 来决定。
      (4)InputGate:代表 Task 的输入封装,和 JobGraph 中 JobEdge 一一对应。每个 InputGate消费了一个或多个的 ResultPartition。
      (5)InputChannel:每个 InputGate 会包含一个以上的 InputChannel,和 ExecutionGraph中的 ExecutionEdge 一一对应,也和 ResultSubpartition 一对一地相连,即一个 InputChannel
    接收一个 ResultSubpartition 的输出。 

    2 StreamGraph 在 Client 生成

      调用用户代码中的 StreamExecutionEnvironment.execute()
    StreamExecutionEnvironment.java
     

     

      一个关键的参数是 List<Transformation<?>> transformations。Transformation 代表了从一个或多个 DataStream 生成新 DataStream 的操作。DataStream 的底层其实就是一个Transformation,描述了这个 DataStream 是怎么来的。
      DataStream 上常见的 transformation 有 map、flatmap、filter 等。这些 transformation 会构造出一棵 StreamTransformation 树,通过这棵树转换成 StreamGraph。以 map 为例,分析
    List<Transformation<?>> transformations 的数据: 
    DataStream.java

     

     

     

      从上方代码可以了解到,map 转换将用户自定义的函数 MapFunction 包装到 StreamMap这个 Operator 中,再将 StreamMap 包装到 OneInputTransformation,最后该 transformation 存
    到 env 中,当调用 env.execute 时,遍历其中的 transformation 集合构造出 StreamGraph。其分层实现如下图所示: 
      另外,并不是每一个 StreamTransformation 都会转换成 runtime 层中物理操作。有一些只是逻辑概念,比如 union、split/select、partition 等。如下图所示的转换树,在运行时
    会优化成下方的操作图。
      union、split/select(1.12 已移除)、partition 中的信息会被写入到 Source –> Map 的边中。通过源码也可以发现 UnionTransformation , SplitTransformation(1.12 移除) ,
    SelectTransformation(1.12 移除),PartitionTransformation 由于不包含具体的操作所以都没有 StreamOperator 成员变量,而其他 StreamTransformation 的子类基本上都有。
      接着分析 StreamGraph 生成的源码:
      StreamExecutionEnvironment.java -> generator() -> transform()

     

     

    SimpleTransformationTranslator.java
    AbstractOneInputTransformationTranslator.java

     

     

      该函数首先会对该 transform 的上游 transform 进行递归转换,确保上游的都已经完成了转化。然后通过 transform 构造出 StreamNode,最后与上游的 transform 进行连接,构造
    出 StreamNode。
      最后再来看下对逻辑转换(partition、union 等)的处理,如下是 transformPartition 函数的源码:
    PartitionTransformationTranslator.java

     

     

     

      对 partition 的转换没有生成具体的 StreamNode 和 StreamEdge,而是添加一个虚节点。当 partition 的下游 transform(如 map)添加 edge 时(调用 StreamGraph.addEdge),会把
    partition 信息写入到 edge 中。接前面 map 的流程: 
    AbstractOneInputTransformationTranslator.java -> translateInternal() 

     

     

     

     

    实例分析:
      看一个实例:如下程序,是一个从 Source 中按行切分成单词并过滤输出的简单流程序,其中包含了逻辑转换:随机分区 shuffle。分析该程序是如何生成 StreamGraph 的。

    首先会在 env 中生成一棵 transformation 树,用 List<Transformation<?>>保存。其结构图如下: 

      其中符号*为 input 指针,指向上游的 transformation,从而形成了一棵 transformation树。然后,通过调用 StreamGraphGenerator.generate(env, transformations)来生成
    StreamGraph。自底向上递归调用每一个 transformation,也就是说处理顺序是
    Source->FlatMap->Shuffle->Filter->Sink。
    如上图所示:
    1)首先处理的 Source,生成了 Source 的 StreamNode。
    2)然后处理的 FlatMap,生成了 FlatMap 的 StreamNode,并生成 StreamEdge 连接上游
    Source 和 FlatMap。由于上下游的并发度不一样(1:4),所以此处是 Rebalance 分区。
    3)然后处理的 Shuffle,由于是逻辑转换,并不会生成实际的节点。将 partitioner 信息暂存在 virtuaPartitionNodes 中。
    4)在处理 Filter 时,生成了 Filter 的 StreamNode。发现上游是 shuffle,找到 shuffle 的上游FlatMap,创建 StreamEdge 与 Filter 相连。并把 ShufflePartitioner 的信息写到 StreamEdge
    中。
    5)最后处理 Sink,创建 Sink 的 StreamNode,并生成 StreamEdge 与上游 Filter 相连。由于
    上下游并发度一样(4:4),所以此处选择 Forward 分区。
    最后可以通过 UI 可视化 来观察得到的 StreamGraph。 
     
  • 相关阅读:
    Java8新特性(转载)
    Mysql学习笔记—时间计算、年份差、月份差、天数差(转载)
    Mysql学习笔记—concat以及group_concat的用法(转载)
    Controller中返回数据总结(ResponseEntity,@ResponseBody,@ResponseStatus)
    Java BigDecimal详解
    WebMagic简介和使用
    poi根据模板导出word文档
    谈谈ConcurrentHashMap1.7和1.8的不同实现
    JVM性能调优监控工具jps、jstack、jmap、jhat、jstat、hprof使用详解
    JAVA优化技巧分享 让游戏更加的流畅
  • 原文地址:https://www.cnblogs.com/qiu-hua/p/14508696.html
Copyright © 2011-2022 走看看