zoukankan      html  css  js  c++  java
  • 【源码】Flink 算子 chain 在一起的条件

    Flink 任务的图结构中,部分算子是 chain 在一起的,因为 chain 在一起有很多好处(减少序列化和网络开销,提高效率),而算子 chain 在一起是需要条件的

    Flink 任务在生成 JobGraph 的时候,会加入 chain 的概念,会判断算子能否 chain 在一起

    首先在 env 中不能 禁用 算子chain

    env.disableOperatorChaining()

    其次chain 在一起的算子,还需要满足如下 7 个条件:

    1、下游节点只有一个输入边
    2、上下游算子在同一个 slotSharingGroup
    3、上下游算子可以 chain 在一起
    4、上下游算子数据分发策略是 ForwardPartitioner
    5、上下游算子数据 shuffleMode 不是 BATCH(是: PIPELINED,UNDEFINED)
    6、上下游算子并发度一样
    7、StreamGraph 允许 算子 chain 在一起 (env 允许)

    // 检测上下游是否能 chain 在一起
    public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
      StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
      StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);
    
      // 下游节点输入边为 1
      return downStreamVertex.getInEdges().size() == 1
        // 上下游是同一个 sharingGroup
        && upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
        // 上下游算子策略能 chain 在一起
        && areOperatorsChainable(upStreamVertex, downStreamVertex, streamGraph)
        // 上下游边的分区策略是 ForwardPartitioner
        && (edge.getPartitioner() instanceof ForwardPartitioner)
        // shuffleMode 不是 batch
        && edge.getShuffleMode() != ShuffleMode.BATCH
        // 上下游并发度一样
        && upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
        // streamGraph 是可以 chain
        && streamGraph.isChainingEnabled();
    }
    
    @VisibleForTesting
    static boolean areOperatorsChainable(
      StreamNode upStreamVertex,
      StreamNode downStreamVertex,
      StreamGraph streamGraph) {
      StreamOperatorFactory<?> upStreamOperator = upStreamVertex.getOperatorFactory();
      StreamOperatorFactory<?> downStreamOperator = downStreamVertex.getOperatorFactory();
      // 上下游算子 有工厂类
      if (downStreamOperator == null || upStreamOperator == null) {
        return false;
      }
      // 上游的策略是 NEVER , 下游的策略不是 ALWAYS
      if (upStreamOperator.getChainingStrategy() == ChainingStrategy.NEVER ||
        downStreamOperator.getChainingStrategy() != ChainingStrategy.ALWAYS) {
        return false;
      }
    
      // yielding operators cannot be chained to legacy sources
      if (downStreamOperator instanceof YieldingOperatorFactory) {
        // unfortunately the information that vertices have been chained is not preserved at this point
        return !getHeadOperator(upStreamVertex, streamGraph).isStreamSource();
      }
      // 其他 都是可以 chain 的
      return true;
    }

    欢迎关注Flink菜鸟公众号,会不定期更新Flink(开发技术)相关的推文

  • 相关阅读:
    [省选联考 2020 A 卷] 组合数问题
    [HAOI2018]苹果树
    [集训队作业2013]城市规划
    多项式牛顿迭代
    多项式开方
    分治 FFT 模板的三种过法
    Graham 求静态凸包
    exp 初探
    HAOI2018 染色
    如何关闭wps热点,如何关闭wpscenter,如何关闭我的wps
  • 原文地址:https://www.cnblogs.com/Springmoon-venn/p/14110351.html
Copyright © 2011-2022 走看看