zoukankan      html  css  js  c++  java
  • 【源码】Flink 算子 chain 在一起的条件

    Flink 任务的图结构中,部分算子是 chain 在一起的,因为 chain 在一起有很多好处(减少序列化和网络开销,提高效率),而算子 chain 在一起是需要条件的

    Flink 任务在生成 JobGraph 的时候,会加入 chain 的概念,会判断算子能否 chain 在一起

    首先在 env 中不能 禁用 算子chain

    env.disableOperatorChaining()

    其次chain 在一起的算子,还需要满足如下 7 个条件:

    1、下游节点只有一个输入边
    2、上下游算子在同一个 slotSharingGroup
    3、上下游算子可以 chain 在一起
    4、上下游算子数据分发策略是 ForwardPartitioner
    5、上下游算子数据 shuffleMode 不是 BATCH(是: PIPELINED,UNDEFINED)
    6、上下游算子并发度一样
    7、StreamGraph 允许 算子 chain 在一起 (env 允许)

    // 检测上下游是否能 chain 在一起
    public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
      StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
      StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);
    
      // 下游节点输入边为 1
      return downStreamVertex.getInEdges().size() == 1
        // 上下游是同一个 sharingGroup
        && upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
        // 上下游算子策略能 chain 在一起
        && areOperatorsChainable(upStreamVertex, downStreamVertex, streamGraph)
        // 上下游边的分区策略是 ForwardPartitioner
        && (edge.getPartitioner() instanceof ForwardPartitioner)
        // shuffleMode 不是 batch
        && edge.getShuffleMode() != ShuffleMode.BATCH
        // 上下游并发度一样
        && upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
        // streamGraph 是可以 chain
        && streamGraph.isChainingEnabled();
    }
    
    @VisibleForTesting
    static boolean areOperatorsChainable(
      StreamNode upStreamVertex,
      StreamNode downStreamVertex,
      StreamGraph streamGraph) {
      StreamOperatorFactory<?> upStreamOperator = upStreamVertex.getOperatorFactory();
      StreamOperatorFactory<?> downStreamOperator = downStreamVertex.getOperatorFactory();
      // 上下游算子 有工厂类
      if (downStreamOperator == null || upStreamOperator == null) {
        return false;
      }
      // 上游的策略是 NEVER , 下游的策略不是 ALWAYS
      if (upStreamOperator.getChainingStrategy() == ChainingStrategy.NEVER ||
        downStreamOperator.getChainingStrategy() != ChainingStrategy.ALWAYS) {
        return false;
      }
    
      // yielding operators cannot be chained to legacy sources
      if (downStreamOperator instanceof YieldingOperatorFactory) {
        // unfortunately the information that vertices have been chained is not preserved at this point
        return !getHeadOperator(upStreamVertex, streamGraph).isStreamSource();
      }
      // 其他 都是可以 chain 的
      return true;
    }

    欢迎关注Flink菜鸟公众号,会不定期更新Flink(开发技术)相关的推文

  • 相关阅读:
    AspDotNetStorefront客户化开始
    "超时时间已到。在操作完成之前超时时间已过或服务器未响应。"另一个原因
    转:只打开一个窗口和关闭窗口而不出现提示
    .net 数据格式设置
    SQLServer导出数据表中数据的存储过程
    游标、临时表、嵌套游标使用一列
    转:将图片转换成16进制的代码写入文本
    根据文件后缀返回Http的ContentType类型的函数
    正确配置p6spy后没有日志输出的一个可能的原因
    C99 声明 + 表达式 + 词法 部分Grammar
  • 原文地址:https://www.cnblogs.com/Springmoon-venn/p/14110351.html
Copyright © 2011-2022 走看看