  • Rules for Operator Chaining in Flink (latest code, source version later than 1.11.2)

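    In Flink, a program passes through the following stages between the code you write and actual execution:

      Program -> StreamGraph -> JobGraph -> ExecutionGraph. During the StreamGraph -> JobGraph stage, Flink chains Operators together according to a fixed set of rules.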

    First, the chaining strategy is defined as follows:

      https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ChainingStrategy.java

    package org.apache.flink.streaming.api.operators;
    import org.apache.flink.annotation.PublicEvolving;
    /**
     * Defines the chaining scheme for the operator. When an operator is chained to the
     * predecessor, it means that they run in the same thread. They become one operator
     * consisting of multiple steps.
     *
     * <p>The default value used by the StreamOperator is {@link #HEAD}, which means that
     * the operator is not chained to its predecessor. Most operators override this with
     * {@link #ALWAYS}, meaning they will be chained to predecessors whenever possible.
     */
    @PublicEvolving
    public enum ChainingStrategy {
        /**
         * Operators will be eagerly chained whenever possible.
         *
         * <p>To optimize performance, it is generally a good practice to allow maximal
         * chaining and increase operator parallelism.
         */
        ALWAYS,

        /**
         * The operator will not be chained to the preceding or succeeding operators.
         */
        NEVER,

        /**
         * The operator will not be chained to the predecessor, but successors may chain to this
         * operator.
         */
        HEAD,

        /**
         * This operator will run at the head of a chain (similar as in {@link #HEAD}, but it will
         * additionally try to chain source inputs if possible. This allows multi-input operators to
         * be chained with multiple sources into one task.
         */
        HEAD_WITH_SOURCES;

        public static final ChainingStrategy DEFAULT_CHAINING_STRATEGY = ALWAYS;
    }
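
    To make the four strategies concrete, here is a minimal, self-contained sketch of how an upstream/downstream pair of strategies could be combined into a yes/no decision. It is derived only from the Javadoc quoted above and is not Flink's own code: the class `ChainingStrategyDemo`, its nested `Strategy` enum, and all method names are hypothetical, and the `upstreamIsSource` flag is an assumed simplification of "chain source inputs if possible".

```java
/**
 * Hypothetical model of the chaining strategies described in the Javadoc above.
 * It does not depend on Flink and is not Flink's implementation.
 */
final class ChainingStrategyDemo {

    /** Stand-in for Flink's ChainingStrategy enum (same constant names). */
    enum Strategy { ALWAYS, NEVER, HEAD, HEAD_WITH_SOURCES }

    /** NEVER refuses successors; every other strategy lets successors chain to it. */
    static boolean allowsSuccessor(Strategy upstream) {
        return upstream != Strategy.NEVER;
    }

    /** Only ALWAYS chains to any predecessor; HEAD_WITH_SOURCES only to a source. */
    static boolean acceptsPredecessor(Strategy downstream, boolean upstreamIsSource) {
        switch (downstream) {
            case ALWAYS:
                return true;
            case HEAD_WITH_SOURCES:
                return upstreamIsSource; // assumption: "source inputs" reduced to a flag
            default:
                return false; // NEVER and HEAD always start a new chain
        }
    }

    /** Both sides must agree before the pair can share a chain. */
    static boolean strategiesAllowChaining(
            Strategy upstream, Strategy downstream, boolean upstreamIsSource) {
        return allowsSuccessor(upstream) && acceptsPredecessor(downstream, upstreamIsSource);
    }

    public static void main(String[] args) {
        // Print the decision table for a non-source upstream operator.
        for (Strategy up : Strategy.values()) {
            for (Strategy down : Strategy.values()) {
                System.out.println(up + " -> " + down + ": "
                        + strategiesAllowChaining(up, down, false));
            }
        }
    }
}
```

    The strategy pair is only one of several conditions; even when both strategies agree, the edge-level checks in StreamingJobGraphGenerator still have to pass.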

    Next, when the JobGraph is generated from the StreamGraph:

      https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java

        /**
         * Gets the assembled {@link JobGraph} with a random {@link JobID}.
         */
        public JobGraph getJobGraph() {
            return getJobGraph(null);
        }
    
        /**
         * Gets the assembled {@link JobGraph} with a specified {@link JobID}.
         */
        public JobGraph getJobGraph(@Nullable JobID jobID) {
            return StreamingJobGraphGenerator.createJobGraph(this, jobID);
        }

    Then, the chaining check itself looks like this:

      https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java

        public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
            StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);
    
            return downStreamVertex.getInEdges().size() == 1
                    && isChainableInput(edge, streamGraph);
        }
    
        private static boolean isChainableInput(StreamEdge edge, StreamGraph streamGraph) {
            StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
            StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);
    
            if (!(upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
                && areOperatorsChainable(upStreamVertex, downStreamVertex, streamGraph)
                && (edge.getPartitioner() instanceof ForwardPartitioner)
                && edge.getShuffleMode() != ShuffleMode.BATCH
                && upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
                && streamGraph.isChainingEnabled())) {
    
                return false;
            }
    
            // check that we do not have a union operation, because unions currently only work
            // through the network/byte-channel stack.
            // we check that by testing that each "type" (which means input position) is used only once
            for (StreamEdge inEdge : downStreamVertex.getInEdges()) {
                if (inEdge != edge && inEdge.getTypeNumber() == edge.getTypeNumber()) {
                    return false;
                }
            }
            return true;
        }
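
    The conditions in `isChainable` and `isChainableInput` can be read as one conjunction. The sketch below restates them over plain Java objects so the rule can be tried out without a Flink dependency. Everything in it is a hypothetical stand-in: `Node` and `Edge` only mimic the few properties that the quoted code reads from `StreamNode` and `StreamEdge`, slot sharing groups are assumed to be non-null strings, and the result of `areOperatorsChainable` is passed in as a boolean. The union check on type numbers is left out because it cannot fail once the downstream node has exactly one input edge.

```java
/**
 * Hypothetical, Flink-free restatement of the edge-level chaining conditions
 * quoted above. Not Flink's implementation.
 */
final class ChainableEdgeDemo {

    /** Stand-in for StreamNode: only the properties the check reads. */
    static final class Node {
        final String slotSharingGroup;
        final int parallelism;
        final int inEdgeCount;

        Node(String slotSharingGroup, int parallelism, int inEdgeCount) {
            this.slotSharingGroup = slotSharingGroup;
            this.parallelism = parallelism;
            this.inEdgeCount = inEdgeCount;
        }
    }

    /** Stand-in for StreamEdge: endpoints plus partitioner and shuffle flags. */
    static final class Edge {
        final Node source;
        final Node target;
        final boolean forwardPartitioner; // true when the partitioner is a forward one
        final boolean batchShuffle;       // true when the shuffle mode is BATCH

        Edge(Node source, Node target, boolean forwardPartitioner, boolean batchShuffle) {
            this.source = source;
            this.target = target;
            this.forwardPartitioner = forwardPartitioner;
            this.batchShuffle = batchShuffle;
        }
    }

    /** All conditions must hold for the two endpoints to share a chain. */
    static boolean isChainable(Edge edge, boolean operatorsChainable, boolean chainingEnabled) {
        Node up = edge.source;
        Node down = edge.target;
        return down.inEdgeCount == 1                                  // single input edge
                && up.slotSharingGroup.equals(down.slotSharingGroup)  // same slot sharing group
                && operatorsChainable                                 // strategies permit it
                && edge.forwardPartitioner                            // no redistribution
                && !edge.batchShuffle                                 // not a BATCH shuffle
                && up.parallelism == down.parallelism                 // equal parallelism
                && chainingEnabled;                                   // chaining not disabled
    }

    public static void main(String[] args) {
        Node source = new Node("default", 4, 0);
        Node map = new Node("default", 4, 1);
        Node sink = new Node("default", 2, 1);

        System.out.println("source -> map (forward, same parallelism): "
                + isChainable(new Edge(source, map, true, false), true, true));
        System.out.println("map -> sink (non-forward, parallelism 4 vs 2): "
                + isChainable(new Edge(map, sink, false, false), true, true));
    }
}
```

    Because the conditions are joined with a logical AND, a single failing condition (for example a different parallelism or a non-forward partitioner) is enough to end the chain at that edge.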

    Finally, the JobGraph is submitted for execution.

      

  • Original article: https://www.cnblogs.com/mengyao/p/14045389.html