zoukankan      html  css  js  c++  java
  • Flink 源码(二十):Flink 任务调度机制(一)Graph 的概念 与StreamGraph 在 Client 生成

    0 简介

    1 Graph 的概念 

    1.1 概述
      Flink 中的执行图可以分成四层:StreamGraph -> JobGraph -> ExecutionGraph -> 物理执行图。
      StreamGraph:是根据用户通过 Stream API 编写的代码生成的最初的图。用来表示程序的拓扑结构。
      JobGraph:StreamGraph 经过优化后生成了 JobGraph,提交给 JobManager 的数据结构。主要的优化为,将多个符合条件的节点 chain 在一起作为一个节点,这样可以减少数据在节
    点之间流动所需要的序列化/反序列化/传输消耗。
      ExecutionGraph:JobManager 根据 JobGraph 生成 ExecutionGraph。ExecutionGraph 是JobGraph 的并行化版本,是调度层最核心的数据结构。
      物 理 执 行 图 : JobManager 根 据 ExecutionGraph 对 Job 进 行 调 度 后 , 在 各 个TaskManager 上部署 Task 后形成的“图”,并不是一个具体的数据结构。
      例如 example 里的 SocketTextStreamWordCount 并发度为 2(Source 为 1 个并发度)的四层执行图的演变过程如下图所示:

     

     

    1.2 名词解释: 

      1)StreamGraph:根据用户通过 Stream API 编写的代码生成的最初的图。
      (1)StreamNode:用来代表 operator 的类,并具有所有相关的属性,如并发度、入边和出边等。
      (2)StreamEdge:表示连接两个 StreamNode 的边。
      2)JobGraph:StreamGraph 经过优化后生成了 JobGraph,提交给 JobManager 的数据结构。
      (1)JobVertex:经过优化后符合条件的多个 StreamNode 可能会 chain 在一起生成一个JobVertex,即一个 JobVertex 包含一个或多个 operator,JobVertex 的输入是 JobEdge,输出是
    IntermediateDataSet。
      (2)IntermediateDataSet:表示 JobVertex 的输出,即经过 operator 处理产生的数据集。producer 是 JobVertex,consumer 是 JobEdge。
      (3)JobEdge:代表了 job graph 中的一条数据传输通道。source 是 IntermediateDataSet,target 是 JobVertex。即数据通过 JobEdge 由 IntermediateDataSet 传递给目标 JobVertex。
      3)ExecutionGraph:JobManager 根据 JobGraph 生成 ExecutionGraph。ExecutionGraph是 JobGraph 的并行化版本,是调度层最核心的数据结构。
      ( 1 ) ExecutionJobVertex : 和 JobGraph 中 的 JobVertex 一 一 对 应 。 每 一 个ExecutionJobVertex 都有和并发度一样多的 ExecutionVertex。
      (2)ExecutionVertex:表示 ExecutionJobVertex 的其中一个并发子任务,输入是ExecutionEdge,输出是 IntermediateResultPartition。
      (3)IntermediateResult:和 JobGraph 中的 IntermediateDataSet 一一对应。一个IntermediateResult 包含多个 IntermediateResultPartition,其个数等于该 operator 的并发度。
      (4)IntermediateResultPartition:表示 ExecutionVertex 的一个输出分区,producer 是ExecutionVertex,consumer 是若干个 ExecutionEdge。
      (5)ExecutionEdge:表示 ExecutionVertex 的输入,source 是 IntermediateResultPartition,target 是 ExecutionVertex。source 和 target 都只能是一个。
      (6)Execution:是执行一个 ExecutionVertex 的一次尝试。当发生故障或者数据需要重算的情况下 ExecutionVertex 可能会有多个 ExecutionAttemptID。一个 Execution 通过ExecutionAttemptID 来唯一标识。JM 和 TM 之间关于 task 的部署和 task status 的更新都是通过 ExecutionAttemptID 来确定消息接受者。
      从这些基本概念中,也可以看出以下л点:
      ⚫ 由于每个 JobVertex 可能有多个 IntermediateDataSet,所以每个 ExecutionJobVertex可能有多个 IntermediateResult,因此,每个 ExecutionVertex 也可能会包含多个
    IntermediateResultPartition;
      ⚫ ExecutionEdge 这里主要的作֌是把 ExecutionVertex 和 IntermediateResultPartition连接起来,表示它们之间的连接关系。
      4)物理执行图:JobManager 根据 ExecutionGraph 对 Job 进行调度后,在各个TaskManager 上部署 Task 后形成的“图”,并不是一个具体的数据结构。
      (1)Task:Execution 被调度后在分配的 TaskManager 中启动对应的 Task。Task 包裹了具有用户执行逻辑的 operator。
      (2)ResultPartition:代表由一个 Task 的生成的数据,和 ExecutionGraph 中的IntermediateResultPartition 一一对应。
      (3)ResultSubpartition:是 ResultPartition 的一个子分区。每个 ResultPartition 包含多个ResultSubpartition,其数目要由下游消费 Task 数和 DistributionPattern 来决定。
      (4)InputGate:代表 Task 的输入封装,和 JobGraph 中 JobEdge 一一对应。每个 InputGate消费了一个或多个的 ResultPartition。
      (5)InputChannel:每个 InputGate 会包含一个以上的 InputChannel,和 ExecutionGraph中的 ExecutionEdge 一一对应,也和 ResultSubpartition 一对一地相连,即一个 InputChannel
    接收一个 ResultSubpartition 的输出。 

    2 StreamGraph 在 Client 生成

      调用用户代码中的 StreamExecutionEnvironment.execute()
    StreamExecutionEnvironment.java
     

     

      一个关键的参数是 List<Transformation<?>> transformations。Transformation 代表了从一个或多个 DataStream 生成新 DataStream 的操作。DataStream 的底层其实就是一个Transformation,描述了这个 DataStream 是怎么来的。
      DataStream 上常见的 transformation 有 map、flatmap、filter 等。这些 transformation 会构造出一棵 StreamTransformation 树,通过这棵树转换成 StreamGraph。以 map 为例,分析
    List<Transformation<?>> transformations 的数据: 
    DataStream.java

     

     

     

      从上方代码可以了解到,map 转换将用户自定义的函数 MapFunction 包装到 StreamMap这个 Operator 中,再将 StreamMap 包装到 OneInputTransformation,最后该 transformation 存
    到 env 中,当调用 env.execute 时,遍历其中的 transformation 集合构造出 StreamGraph。其分层实现如下图所示: 
      另外,并不是每一个 StreamTransformation 都会转换成 runtime 层中物理操作。有一些只是逻辑概念,比如 union、split/select、partition 等。如下图所示的转换树,在运行时
    会优化成下方的操作图。
      union、split/select(1.12 已移除)、partition 中的信息会被写入到 Source –> Map 的边中。通过源码也可以发现 UnionTransformation , SplitTransformation(1.12 移除) ,
    SelectTransformation(1.12 移除),PartitionTransformation 由于不包含具体的操作所以都没有 StreamOperator 成员变量,而其他 StreamTransformation 的子类基本上都有。
      接着分析 StreamGraph 生成的源码:
      StreamExecutionEnvironment.java -> generator() -> transform()

     

     

    SimpleTransformationTranslator.java
    AbstractOneInputTransformationTranslator.java

     

     

      该函数首先会对该 transform 的上游 transform 进行递归转换,确保上游的都已经完成了转化。然后通过 transform 构造出 StreamNode,最后与上游的 transform 进行连接,构造
    出 StreamNode。
      最后再来看下对逻辑转换(partition、union 等)的处理,如下是 transformPartition 函数的源码:
    PartitionTransformationTranslator.java

     

     

     

      对 partition 的转换没有生成具体的 StreamNode 和 StreamEdge,而是添加一个虚节点。当 partition 的下游 transform(如 map)添加 edge 时(调用 StreamGraph.addEdge),会把
    partition 信息写入到 edge 中。接前面 map 的流程: 
    AbstractOneInputTransformationTranslator.java -> translateInternal() 

     

     

     

     

    实例分析:
      看一个实例:如下程序,是一个从 Source 中按行切分成单词并过滤输出的简单流程序,其中包含了逻辑转换:随机分区 shuffle。分析该程序是如何生成 StreamGraph 的。

    首先会在 env 中生成一棵 transformation 树,用 List<Transformation<?>>保存。其结构图如下: 

      其中符号*为 input 指针,指向上游的 transformation,从而形成了一棵 transformation树。然后,通过调用 StreamGraphGenerator.generate(env, transformations)来生成
    StreamGraph。自底向上递归调用每一个 transformation,也就是说处理顺序是
    Source->FlatMap->Shuffle->Filter->Sink。
    如上图所示:
    1)首先处理的 Source,生成了 Source 的 StreamNode。
    2)然后处理的 FlatMap,生成了 FlatMap 的 StreamNode,并生成 StreamEdge 连接上游
    Source 和 FlatMap。由于上下游的并发度不一样(1:4),所以此处是 Rebalance 分区。
    3)然后处理的 Shuffle,由于是逻辑转换,并不会生成实际的节点。将 partitioner 信息暂存在 virtuaPartitionNodes 中。
    4)在处理 Filter 时,生成了 Filter 的 StreamNode。发现上游是 shuffle,找到 shuffle 的上游FlatMap,创建 StreamEdge 与 Filter 相连。并把 ShufflePartitioner 的信息写到 StreamEdge
    中。
    5)最后处理 Sink,创建 Sink 的 StreamNode,并生成 StreamEdge 与上游 Filter 相连。由于
    上下游并发度一样(4:4),所以此处选择 Forward 分区。
    最后可以通过 UI 可视化 来观察得到的 StreamGraph。 
     
  • 相关阅读:
    28完全背包+扩展欧几里得(包子凑数)
    HDU 3527 SPY
    POJ 3615 Cow Hurdles
    POJ 3620 Avoid The Lakes
    POJ 3036 Honeycomb Walk
    HDU 2352 Verdis Quo
    HDU 2368 Alfredo's Pizza Restaurant
    HDU 2700 Parity
    HDU 3763 CDs
    POJ 3279 Fliptile
  • 原文地址:https://www.cnblogs.com/qiu-hua/p/14508696.html
Copyright © 2011-2022 走看看