
     

    Let's start with the simplest example:

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<Tuple2<Long, Long>> stream = env.addSource(...);
    stream
        .map(new MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {...})
        .addSink(new SinkFunction<Tuple2<Long, Long>>() {...});
    
    env.execute();
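
    For reference, here is a self-contained version of the same pipeline that actually compiles; the endless generator source, the +1 map and the printing sink are my own illustrative choices, not part of the original snippet:

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.sink.SinkFunction;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;

    public class SimplePipeline {
        public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // an endless source emitting (i, i) pairs
            DataStream<Tuple2<Long, Long>> stream = env.addSource(new SourceFunction<Tuple2<Long, Long>>() {
                private volatile boolean running = true;

                @Override
                public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception {
                    long i = 0;
                    while (running) {
                        ctx.collect(Tuple2.of(i, i));
                        i++;
                        Thread.sleep(100);
                    }
                }

                @Override
                public void cancel() {
                    running = false;
                }
            });

            stream
                .map(new MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
                    @Override
                    public Tuple2<Long, Long> map(Tuple2<Long, Long> value) {
                        return Tuple2.of(value.f0, value.f1 + 1);
                    }
                })
                .addSink(new SinkFunction<Tuple2<Long, Long>>() {
                    @Override
                    public void invoke(Tuple2<Long, Long> value) throws Exception {
                        System.out.println(value);
                    }
                });

            env.execute("simple-pipeline");
        }
    }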

     

    DataStream

    env.addSource

    The first step is to create the source:

        public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName, TypeInformation<OUT> typeInfo) {
    
            if(typeInfo == null) { // if no typeInfo was given, infer the type
                if (function instanceof ResultTypeQueryable) {
                    typeInfo = ((ResultTypeQueryable<OUT>) function).getProducedType();
                } else {
                    try {
                        typeInfo = TypeExtractor.createTypeInfo(
                                SourceFunction.class,
                                function.getClass(), 0, null, null);
                    } catch (final InvalidTypesException e) {
                        typeInfo = (TypeInformation<OUT>) new MissingTypeInfo(sourceName, e);
                    }
                }
            }
    
            boolean isParallel = function instanceof ParallelSourceFunction;
    
            clean(function);
            StreamSource<OUT, ?> sourceOperator;
            if (function instanceof StoppableFunction) {
                sourceOperator = new StoppableStreamSource<>(cast2StoppableSourceFunction(function));
            } else {
                sourceOperator = new StreamSource<>(function); // wrap the SourceFunction in a StreamSource
            }
    
            return new DataStreamSource<>(this, typeInfo, sourceOperator, isParallel, sourceName); // wrap the StreamSource in a DataStreamSource
        }
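
    As the snippet shows, a user-defined source can skip the TypeExtractor branch entirely by implementing ResultTypeQueryable. A minimal sketch, with the class name and the emitted type being my own illustrations:

    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;

    public class LongPairSource
            implements SourceFunction<Tuple2<Long, Long>>, ResultTypeQueryable<Tuple2<Long, Long>> {

        private volatile boolean running = true;

        @Override
        public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception {
            long i = 0;
            while (running) {
                ctx.collect(Tuple2.of(i, i));
                i++;
            }
        }

        @Override
        public void cancel() {
            running = false;
        }

        @Override
        public TypeInformation<Tuple2<Long, Long>> getProducedType() {
            // explicit type info, so addSource() never needs to run the TypeExtractor
            return TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {});
        }
    }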

     

    StreamSource is a kind of StreamOperator; its core logic is in run:

    public class StreamSource<OUT, SRC extends SourceFunction<OUT>> 
            extends AbstractUdfStreamOperator<OUT, SRC> implements StreamOperator<OUT> {
        
        private transient SourceFunction.SourceContext<OUT> ctx; // used to collect the output
    
        private transient volatile boolean canceledOrStopped = false;
        
        
        public StreamSource(SRC sourceFunction) {
            super(sourceFunction);
    
            this.chainingStrategy = ChainingStrategy.HEAD; // a source can only be the head of a chain
        }
        
        public void run(final Object lockingObject, final Output<StreamRecord<OUT>> collector) throws Exception {
            final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic();
    
            LatencyMarksEmitter latencyEmitter = null;  // latency-marker related logic
            if(getExecutionConfig().isLatencyTrackingEnabled()) {
                latencyEmitter = new LatencyMarksEmitter<>(
                    getProcessingTimeService(),
                    collector,
                    getExecutionConfig().getLatencyTrackingInterval(),
                    getOperatorConfig().getVertexID(),
                    getRuntimeContext().getIndexOfThisSubtask());
            }
            
            final long watermarkInterval = getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();
    
            this.ctx = StreamSourceContexts.getSourceContext(
                timeCharacteristic, getProcessingTimeService(), lockingObject, collector, watermarkInterval);
    
            try {
                userFunction.run(ctx); // invoke the SourceFunction to run the user logic; a source keeps emitting, so this call normally never returns
    
                // if we get here, then the user function either exited after being done (finite source)
                // or the function was canceled or stopped. For the finite source case, we should emit
                // a final watermark that indicates that we reached the end of event-time
                if (!isCanceledOrStopped()) {
                    ctx.emitWatermark(Watermark.MAX_WATERMARK); // emit the maximum watermark
                }
            } finally {
    
            }
        }
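
    The SourceContext handed to userFunction.run is the hook a user-defined source uses to emit records and watermarks. A hedged sketch of an event-time source; the class name, the timing and the one-second watermark lag are all illustrative:

    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.apache.flink.streaming.api.watermark.Watermark;

    public class TimedSource implements SourceFunction<Long> {

        private volatile boolean running = true;

        @Override
        public void run(SourceContext<Long> ctx) throws Exception {
            long value = 0;
            while (running) {
                long ts = System.currentTimeMillis();
                // hold the checkpoint lock while emitting, as the SourceContext contract requires
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collectWithTimestamp(value++, ts);       // record with an event-time timestamp
                    ctx.emitWatermark(new Watermark(ts - 1000)); // watermark lags one second behind
                }
                Thread.sleep(100);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }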

     

    But addSource is supposed to return a DataStream,

    so the StreamSource gets wrapped in a DataStreamSource.

      public class DataStreamSource<T> extends SingleOutputStreamOperator<T> {
    
        boolean isParallel;
    
        public DataStreamSource(StreamExecutionEnvironment environment,
                TypeInformation<T> outTypeInfo, StreamSource<T, ?> operator,
                boolean isParallel, String sourceName) {
            super(environment, new SourceTransformation<>(sourceName, operator, outTypeInfo, environment.getParallelism()));
    
            this.isParallel = isParallel;
            if (!isParallel) {
                setParallelism(1);
            }
        }

    You can think of SourceTransformation as a wrapper around the StreamOperator.

      public class SingleOutputStreamOperator<T> extends DataStream<T> {
    
        protected SingleOutputStreamOperator(StreamExecutionEnvironment environment, StreamTransformation<T> transformation) {
            super(environment, transformation);
        }

    And a DataStream is a wrapper around a StreamTransformation.

    SingleOutputStreamOperator is a rather baffling name: it extends DataStream, yet is called an Operator.

     

     

    The map operation

    In DataStream:

        public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) {
    
            TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(clean(mapper), getType(),
                    Utils.getCallLocationName(), true);
    
            return transform("Map", outType, new StreamMap<>(clean(mapper)));
        }

     

    Here, StreamMap is the StreamOperator:

    public class StreamMap<IN, OUT>
            extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>
            implements OneInputStreamOperator<IN, OUT> {
    
        public StreamMap(MapFunction<IN, OUT> mapper) {
            super(mapper);
            chainingStrategy = ChainingStrategy.ALWAYS; // a map can always be chained
        }
    
        @Override
        public void processElement(StreamRecord<IN> element) throws Exception {
            output.collect(element.replace(userFunction.map(element.getValue()))); // the map logic just applies the MapFunction and replaces the value of the original element
        }
    }

     

    It then calls transform:

    public <R> SingleOutputStreamOperator<R> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {
    
            OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
                    this.transformation,
                    operatorName,
                    operator,
                    outTypeInfo,
                    environment.getParallelism());
    
            @SuppressWarnings({ "unchecked", "rawtypes" })
            SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);
    
            getExecutionEnvironment().addOperator(resultTransform);
    
            return returnStream;
        }

    You can see two layers of wrapping here: operator -> transformation -> DataStream.

    Finally, getExecutionEnvironment().addOperator(resultTransform) is called:

        protected final List<StreamTransformation<?>> transformations = new ArrayList<>();
    
        public void addOperator(StreamTransformation<?> transformation) {
            Preconditions.checkNotNull(transformation, "transformation must not be null.");
            this.transformations.add(transformation);
        }

    This registers the StreamTransformation in the transformations list, which will be used later.

     

    sink

        public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {
    
            // configure the type if needed
            if (sinkFunction instanceof InputTypeConfigurable) {
                ((InputTypeConfigurable) sinkFunction).setInputType(getType(), getExecutionConfig() );
            }
    
            StreamSink<T> sinkOperator = new StreamSink<>(clean(sinkFunction));
    
            DataStreamSink<T> sink = new DataStreamSink<>(this, sinkOperator);
    
            getExecutionEnvironment().addOperator(sink.getTransformation());
            return sink;
        }

     

    StreamSink is the operator:

    public class StreamSink<IN> extends AbstractUdfStreamOperator<Object, SinkFunction<IN>>
            implements OneInputStreamOperator<IN, Object> {
    
        public StreamSink(SinkFunction<IN> sinkFunction) {
            super(sinkFunction);
            chainingStrategy = ChainingStrategy.ALWAYS; // a sink can also always be chained
        }
    
        @Override
        public void processElement(StreamRecord<IN> element) throws Exception {
            userFunction.invoke(element.getValue());
        }
    
        @Override
        protected void reportOrForwardLatencyMarker(LatencyMarker maker) {
            // all operators are tracking latencies
            this.latencyGauge.reportLatency(maker, true);
    
            // sinks don't forward latency markers
        }
    }

     

    DataStreamSink, however, is not a DataStream; it is a class parallel to DataStream, since its role is likewise to wrap a SinkTransformation.

      public class DataStreamSink<T> {
    
        SinkTransformation<T> transformation;
    
        @SuppressWarnings("unchecked")
        protected DataStreamSink(DataStream<T> inputStream, StreamSink<T> operator) {
            this.transformation = new SinkTransformation<T>(inputStream.getTransformation(), "Unnamed", operator, inputStream.getExecutionEnvironment().getParallelism());
        }

    It too is ultimately registered with the execution environment:

    getExecutionEnvironment().addOperator(sink.getTransformation());

     

    In the end, the DataStream program forms a tree of StreamTransformations.

     

    StreamGraph

    Now execution begins:

    env.execute

    public JobExecutionResult execute(String jobName) throws ProgramInvocationException {
        StreamGraph streamGraph = getStreamGraph();
        streamGraph.setJobName(jobName);
        transformations.clear();
        return executeRemotely(streamGraph);
    }

    You can see that this calls StreamGraphGenerator.generate,

    and the argument passed in is the transformations list from before, in which all operators and sinks have been registered.

    public StreamGraph getStreamGraph() {
        if (transformations.size() <= 0) {
            throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");
        }
        return StreamGraphGenerator.generate(this, transformations);
    }
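
    Incidentally, you can inspect the result of this step from user code: as far as I can tell, getExecutionPlan() builds the StreamGraph from the registered transformations and returns it as JSON, which is a convenient way to look at the nodes and edges before submitting anything. A small sketch, with the pipeline itself being illustrative:

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env.fromElements(1L, 2L, 3L)
       .map(new MapFunction<Long, Long>() {
           @Override
           public Long map(Long value) {
               return value + 1;
           }
       })
       .print();

    // builds the StreamGraph from the transformations list and dumps it as JSON
    System.out.println(env.getExecutionPlan());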

     

    StreamGraphGenerator

    public class StreamGraphGenerator {
    
        // The StreamGraph that is being built, this is initialized at the beginning.
        private StreamGraph streamGraph;
    
        private final StreamExecutionEnvironment env;
    
        // Keep track of which Transforms we have already transformed, this is necessary because
        // we have loops, i.e. feedback edges.
        private Map<StreamTransformation<?>, Collection<Integer>> alreadyTransformed; // to guard against cycles, remember what has already been transformed
    
    
        /**
         * Private constructor. The generator should only be invoked using {@link #generate}.
         */
        private StreamGraphGenerator(StreamExecutionEnvironment env) {
            this.streamGraph = new StreamGraph(env);
            this.streamGraph.setChaining(env.isChainingEnabled());
            this.streamGraph.setStateBackend(env.getStateBackend());
            this.env = env;
            this.alreadyTransformed = new HashMap<>();
        }
    
        /**
         * Generates a {@code StreamGraph} by traversing the graph of {@code StreamTransformations}
         * starting from the given transformations.
         *
         * @param env The {@code StreamExecutionEnvironment} that is used to set some parameters of the
         *            job
         * @param transformations The transformations starting from which to transform the graph
         *
         * @return The generated {@code StreamGraph}
         */
        public static StreamGraph generate(StreamExecutionEnvironment env, List<StreamTransformation<?>> transformations) {
            return new StreamGraphGenerator(env).generateInternal(transformations);
        }
    
        /**
         * This starts the actual transformation, beginning from the sinks.
         */
        private StreamGraph generateInternal(List<StreamTransformation<?>> transformations) {
            for (StreamTransformation<?> transformation: transformations) {
                transform(transformation);
            }
            return streamGraph;
        }

    The transform logic is invoked for every StreamTransformation:

        private Collection<Integer> transform(StreamTransformation<?> transform) {
    
            if (alreadyTransformed.containsKey(transform)) {
                return alreadyTransformed.get(transform); // if it has already been transformed, return directly
            }
    
            Collection<Integer> transformedIds;
            if (transform instanceof OneInputTransformation<?, ?>) {
                transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform);
            } else if (transform instanceof TwoInputTransformation<?, ?, ?>) {
                transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);
            } else if (transform instanceof SourceTransformation<?>) {
                transformedIds = transformSource((SourceTransformation<?>) transform);
            } else if (transform instanceof SinkTransformation<?>) {
                transformedIds = transformSink((SinkTransformation<?>) transform);
            } else if (transform instanceof UnionTransformation<?>) {
                transformedIds = transformUnion((UnionTransformation<?>) transform);
            } else if (transform instanceof SplitTransformation<?>) {
                transformedIds = transformSplit((SplitTransformation<?>) transform);
            } else if (transform instanceof SelectTransformation<?>) {
                transformedIds = transformSelect((SelectTransformation<?>) transform);
            } else if (transform instanceof FeedbackTransformation<?>) {
                transformedIds = transformFeedback((FeedbackTransformation<?>) transform);
            } else if (transform instanceof CoFeedbackTransformation<?>) {
                transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform);
            } else if (transform instanceof PartitionTransformation<?>) {
                transformedIds = transformPartition((PartitionTransformation<?>) transform);
            }
    
            return transformedIds;
        }

    The ones used above are OneInputTransformation, SourceTransformation and SinkTransformation.

    transformOneInputTransform

    /**
         * Transforms a {@code OneInputTransformation}.
         *
         * <p>
     * This recursively transforms the inputs, creates a new {@code StreamNode} in the graph and
     * wires the inputs to this new node.
         */
        private <IN, OUT> Collection<Integer> transformOneInputTransform(OneInputTransformation<IN, OUT> transform) {
    
            Collection<Integer> inputIds = transform(transform.getInput()); // recursively transform the input; the source never had to be added to transformations itself because the recursion reaches it here
    
            // the recursive call might have already transformed this
            if (alreadyTransformed.containsKey(transform)) {
                return alreadyTransformed.get(transform); // if it has already been transformed, return directly
            }
    
            String slotSharingGroup = determineSlotSharingGroup(transform.getSlotSharingGroup(), inputIds); // determine the slot sharing group
    
            streamGraph.addOperator(transform.getId(), //addOperator
                    slotSharingGroup,
                    transform.getOperator(),
                    transform.getInputType(),
                    transform.getOutputType(),
                    transform.getName());
    
            if (transform.getStateKeySelector() != null) {
                TypeSerializer<?> keySerializer = transform.getStateKeyType().createSerializer(env.getConfig());
                streamGraph.setOneInputStateKey(transform.getId(), transform.getStateKeySelector(), keySerializer);
            }
    
            streamGraph.setParallelism(transform.getId(), transform.getParallelism());
            streamGraph.setMaxParallelism(transform.getId(), transform.getMaxParallelism());
    
            for (Integer inputId: inputIds) {
                streamGraph.addEdge(inputId, transform.getId(), 0); //addEdge
            }
    
            return Collections.singleton(transform.getId());
        }

    What does the transform id represent?

    public abstract class StreamTransformation<T> {
    
        // This is used to assign a unique ID to every StreamTransformation
        protected static Integer idCounter = 0;
    
        public static int getNewNodeId() {
            idCounter++;
            return idCounter;
        }
    
    
        protected final int id;
        
        public StreamTransformation(String name, TypeInformation<T> outputType, int parallelism) {
            this.id = getNewNodeId();

    You can see the id is an auto-incrementing value starting from 0; it is incremented before being returned, so the first transform gets id 1.

    The counter is static on the class, so an id depends on the order in which the StreamTransformation objects are created.

     

    The slotSharingGroup here is just a name, so it is a String.

    public abstract class StreamTransformation<T> {
        private String slotSharingGroup;
        public StreamTransformation(String name, TypeInformation<T> outputType, int parallelism) {
            this.slotSharingGroup = null;

    By default slotSharingGroup is null, i.e. not set.

     

    It can be set on both DataStreamSink and SingleOutputStreamOperator:

       /**
         * Sets the slot sharing group of this operation. Parallel instances of
         * operations that are in the same slot sharing group will be co-located in the same
         * TaskManager slot, if possible.
         *
         * <p>Operations inherit the slot sharing group of input operations if all input operations
         * are in the same slot sharing group and no slot sharing group was explicitly specified.
         *
         * <p>Initially an operation is in the default slot sharing group. An operation can be put into
         * the default group explicitly by setting the slot sharing group to {@code "default"}.
         *
         * @param slotSharingGroup The slot sharing group name.
         */
        @PublicEvolving
        public DataStreamSink<T> slotSharingGroup(String slotSharingGroup) {
            transformation.setSlotSharingGroup(slotSharingGroup);
            return this;
        }

    So the user can set it directly through the API:

    someStream.filter(...).slotSharingGroup("group1")

     

    determineSlotSharingGroup

       /**
         * Determines the slot sharing group for an operation based on the slot sharing group set by
         * the user and the slot sharing groups of the inputs.
         *
         * <p>If the user specifies a group name, this is taken as is. If nothing is specified and
         * the input operations all have the same group name then this name is taken. Otherwise the
         * default group is chosen.
         *
         * @param specifiedGroup The group specified by the user.
         * @param inputIds The IDs of the input operations.
         */
        private String determineSlotSharingGroup(String specifiedGroup, Collection<Integer> inputIds) {
            if (specifiedGroup != null) { // if the user specified a group, it takes precedence
                return specifiedGroup;
            } else {
                String inputGroup = null;
                for (int id: inputIds) { // otherwise infer from the inputs' slot sharing groups
                    String inputGroupCandidate = streamGraph.getSlotSharingGroup(id);
                    if (inputGroup == null) {
                        inputGroup = inputGroupCandidate; // initialize
                    } else if (!inputGroup.equals(inputGroupCandidate)) { // if all inputs share the same group use it, otherwise fall back to "default"
                        return "default";
                    }
                }
                return inputGroup == null ? "default" : inputGroup; // default to "default"
            }
        }

    If the user does not specify one, all operators end up in the default slot sharing group.

    If the user does specify one, the user's choice wins.
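
    Applied to a concrete pipeline, an explicit assignment might look like the following; the group name and the operators are illustrative. Per determineSlotSharingGroup above, the sink inherits "transform" because its only input is in that group, while the source stays in "default":

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<Long> src = env.fromElements(1L, 2L, 3L); // no group specified, so "default"

    src.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) {
                return value * 2;
            }
        })
        .slotSharingGroup("transform") // user-specified group for the map
        .print();                      // the sink inherits "transform" from its single input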

     

    streamGraph.addOperator

        public <IN, OUT> void addOperator(
                Integer vertexID,
                String slotSharingGroup,
                StreamOperator<OUT> operatorObject,
                TypeInformation<IN> inTypeInfo,
                TypeInformation<OUT> outTypeInfo,
                String operatorName) {
    
            if (operatorObject instanceof StoppableStreamSource) {
                addNode(vertexID, slotSharingGroup, StoppableSourceStreamTask.class, operatorObject, operatorName);
            } else if (operatorObject instanceof StreamSource) {
                addNode(vertexID, slotSharingGroup, SourceStreamTask.class, operatorObject, operatorName);
            } else {
                addNode(vertexID, slotSharingGroup, OneInputStreamTask.class, operatorObject, operatorName);
            }

    Integer vertexID: you can see that the vertexID is simply transform.getId().

        protected StreamNode addNode(Integer vertexID,
            String slotSharingGroup,
            Class<? extends AbstractInvokable> vertexClass,
            StreamOperator<?> operatorObject,
            String operatorName) {
    
            if (streamNodes.containsKey(vertexID)) { // the vertexID already exists
                throw new RuntimeException("Duplicate vertexID " + vertexID);
            }
    
            StreamNode vertex = new StreamNode(environment,
                vertexID,
                slotSharingGroup,
                operatorObject,
                operatorName,
                new ArrayList<OutputSelector<?>>(),
                vertexClass);
    
            streamNodes.put(vertexID, vertex);
    
            return vertex;
        }

    A StreamNode is essentially a wrapper around a Transformation.

    The difference is that not every Transformation turns into a StreamNode.

     

    streamGraph.addEdge

    At the transformation level, the relationships are only expressed implicitly, by each transformation recursively recording its input transformation.

    Here an explicit edge abstraction is added:

    streamGraph.addEdge(inputId, transform.getId(), 0);

        public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID, int typeNumber) {
            addEdgeInternal(upStreamVertexID,
                    downStreamVertexID,
                    typeNumber,
                    null,
                    new ArrayList<String>());
    
        }

     

    private void addEdgeInternal(Integer upStreamVertexID,
                Integer downStreamVertexID,
                int typeNumber,
                StreamPartitioner<?> partitioner,
                List<String> outputNames) {
    
    
            if (virtualSelectNodes.containsKey(upStreamVertexID)) { // the upstream node is a virtual select node
                int virtualId = upStreamVertexID;
                upStreamVertexID = virtualSelectNodes.get(virtualId).f0; // not a real node, so use the virtual node's parent as the upstream node
                if (outputNames.isEmpty()) {
                    // selections that happen downstream override earlier selections
                    outputNames = virtualSelectNodes.get(virtualId).f1; // turn the virtual select node into outputNames
                }
                addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames); // recursively call addEdgeInternal
            } else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {
                int virtualId = upStreamVertexID;
                upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;
                if (partitioner == null) {
                    partitioner = virtualPartitionNodes.get(virtualId).f1; // a virtual partition node is turned into a partitioner
                }
                addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames); // recursively call addEdgeInternal
            } else {
                StreamNode upstreamNode = getStreamNode(upStreamVertexID);
                StreamNode downstreamNode = getStreamNode(downStreamVertexID);
    
                // If no partitioner was specified and the parallelism of upstream and downstream
                // operator matches use forward partitioning, use rebalance otherwise.
                if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) { // key logic: pick the default partitioner
                    partitioner = new ForwardPartitioner<Object>(); // same parallelism: forward
                } else if (partitioner == null) {
                    partitioner = new RebalancePartitioner<Object>(); // different parallelism: rebalance
                }
    
                if (partitioner instanceof ForwardPartitioner) { // if forward was explicitly requested but the parallelism differs, throw
                    if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {
                        throw new UnsupportedOperationException("Forward partitioning does not allow " +
                                "change of parallelism. Upstream operation: " + upstreamNode + " parallelism: " + upstreamNode.getParallelism() +
                                ", downstream operation: " + downstreamNode + " parallelism: " + downstreamNode.getParallelism() +
                                " You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");
                    }
                }
    
                StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, typeNumber, outputNames, partitioner); // create the StreamEdge
    
                getStreamNode(edge.getSourceId()).addOutEdge(edge); // link the upstream and downstream StreamNodes via the StreamEdge
                getStreamNode(edge.getTargetId()).addInEdge(edge);
            }
        }

    You can see that virtual nodes such as select and partition end up encoded in the StreamEdge instead of producing real StreamNodes.

    As the following diagram illustrates:

    /** 
     * The following graph of {@code StreamTransformations}:
     *
     * <pre>{@code
     *   Source              Source        
     *      +                        +           
     *      |                        |           
     *      v                        v           
     *  Rebalance          HashPartition    
     *      +                        +           
     *      |                        |           
     *      |                        |           
     *      +------>Union<------+           
     *                +                     
     *                |                     
     *                v                     
     *              Split                   
     *                +                     
     *                |                     
     *                v                     
     *              Select                  
     *                +                     
     *                v                     
     *               Map                    
     *                +                     
     *                |                     
     *                v                     
     *              Sink 
     * }</pre>
     *
     * Would result in this graph of operations at runtime:
     *
     * <pre>{@code
     *  Source              Source
     *    +                   +
     *    |                   |
     *    |                   |
     *    +------->Map<-------+
     *              +
     *              |
     *              v
     *             Sink
     */

     

    SourceTransformation and SinkTransformation are handled in much the same way, so I won't go through them.

    Let's look at how virtual nodes are handled.

    transformPartition

        private <T> Collection<Integer> transformPartition(PartitionTransformation<T> partition) {
            StreamTransformation<T> input = partition.getInput();
            List<Integer> resultIds = new ArrayList<>();
    
            Collection<Integer> transformedIds = transform(input); // recursively transform the input and collect its ids
            for (Integer transformedId: transformedIds) {
                int virtualId = StreamTransformation.getNewNodeId(); // generate an id for the virtual node
                streamGraph.addVirtualPartitionNode(transformedId, virtualId, partition.getPartitioner()); // only registered as a virtual partition node; no real StreamNode is created
                resultIds.add(virtualId);
            }
    
            return resultIds;
        }
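
    From the API side, operations such as rebalance(), shuffle() or broadcast() are what produce these PartitionTransformations. A hedged sketch, with the pipeline itself being illustrative:

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env.fromElements(1L, 2L, 3L)
       .rebalance() // wrapped in a PartitionTransformation with a RebalancePartitioner, i.e. a virtual node
       .map(new MapFunction<Long, Long>() {
           @Override
           public Long map(Long value) {
               return value - 1;
           }
       })
       .print();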

     

    transformUnion

      private <T> Collection<Integer> transformUnion(UnionTransformation<T> union) {
            List<StreamTransformation<T>> inputs = union.getInputs();
            List<Integer> resultIds = new ArrayList<>();
    
            for (StreamTransformation<T> input: inputs) {
                resultIds.addAll(transform(input)); // recurse
            }
    
            return resultIds;
        }

    It simply merges the ids of the inputs.
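
    In API terms, union() therefore never shows up as a node of its own in the StreamGraph; downstream edges connect straight to the StreamNodes of the union's inputs. An illustrative sketch:

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<Long> a = env.fromElements(1L, 2L);
    DataStream<Long> b = env.fromElements(3L, 4L);

    // the union itself creates no StreamNode; the map below gets an edge from each input stream
    a.union(b)
     .map(new MapFunction<Long, Long>() {
         @Override
         public Long map(Long value) {
             return value * 10;
         }
     })
     .print();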

     

    JobGraph

     

    env.execute

    public JobExecutionResult execute(String jobName) throws ProgramInvocationException {
        StreamGraph streamGraph = getStreamGraph();
        streamGraph.setJobName(jobName);
        transformations.clear();
        return executeRemotely(streamGraph);
    }

    Continuing,

    executeRemotely

        protected JobExecutionResult executeRemotely(StreamGraph streamGraph, List<URL> jarFiles) throws ProgramInvocationException {
            ClusterClient client;
            try {
                return client.run(streamGraph, jarFiles, globalClasspaths, usercodeClassLoader).getJobExecutionResult();
            }
        }

     

    ClusterClient.run

      public JobSubmissionResult run(FlinkPlan compiledPlan,
                List<URL> libraries, List<URL> classpaths, ClassLoader classLoader, SavepointRestoreSettings savepointSettings)
            throws ProgramInvocationException
        {
            JobGraph job = getJobGraph(compiledPlan, libraries, classpaths, savepointSettings);
            return submitJob(job, classLoader);
        }

     

        private JobGraph getJobGraph(FlinkPlan optPlan, List<URL> jarFiles, List<URL> classpaths, SavepointRestoreSettings savepointSettings) {
            JobGraph job;
            if (optPlan instanceof StreamingPlan) { // streaming job plan
                job = ((StreamingPlan) optPlan).getJobGraph();
                job.setSavepointRestoreSettings(savepointSettings);
            } else { // batch plan
                JobGraphGenerator gen = new JobGraphGenerator(this.flinkConfig);
                job = gen.compileJobGraph((OptimizedPlan) optPlan);
            }
    
            for (URL jar : jarFiles) {
                try {
                    job.addJar(new Path(jar.toURI())); // attach the jar
                } catch (URISyntaxException e) {
                    throw new RuntimeException("URL is invalid. This should not happen.", e);
                }
            }
     
            job.setClasspaths(classpaths); // attach the classpaths
    
            return job;
        }

     

    For the streaming case this ends up calling:

    ((StreamingPlan) optPlan).getJobGraph();

     

    StreamGraph.getJobGraph

        public JobGraph getJobGraph() {
            StreamingJobGraphGenerator jobgraphGenerator = new StreamingJobGraphGenerator(this);
    
            return jobgraphGenerator.createJobGraph();
        }

     

    StreamingJobGraphGenerator.createJobGraph

     

        public JobGraph createJobGraph() {
    
            jobGraph = new JobGraph(streamGraph.getJobName()); // create the JobGraph
    
            // make sure that all vertices start immediately
            jobGraph.setScheduleMode(ScheduleMode.EAGER); // for streaming, all vertices must start immediately; in the alternative mode, LAZY_FROM_SOURCES, a task is only started once its input is ready
    
            init(); // plain initialization of the internal data structures
    
            // Generate deterministic hashes for the nodes in order to identify them across
            // submission iff they didn't change.
            Map<Integer, byte[]> hashes = defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph); // generate a deterministic hash for every node so the same nodes can be identified across submissions; returns a map from node id to hash
    
    
             setChaining(hashes, legacyHashes); // core logic: create the JobVertices and JobEdges
    
            setPhysicalEdges(); // just writes each vertex's incoming edges into that vertex's StreamConfig
    
            setSlotSharing();
    
            configureCheckpointing();
    
            // set the ExecutionConfig last when it has been finalized
            jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
    
            return jobGraph;
        }

     

    setChaining

        private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes) {
            for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
                createChain(sourceNodeId, sourceNodeId, hashes, legacyHashes, 0);
            }
        }

    createChain is called for every source:

        private List<StreamEdge> createChain(
                Integer startNodeId,
                Integer currentNodeId,
                Map<Integer, byte[]> hashes,
                List<Map<Integer, byte[]>> legacyHashes,
                int chainIndex) {
    
            if (!builtVertices.contains(startNodeId)) {
    
                List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>(); // the StreamEdges that will eventually become JobEdges
    
                List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
                List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();
    
                for (StreamEdge outEdge : streamGraph.getStreamNode(currentNodeId).getOutEdges()) { // iterate over all outgoing edges of the current node
                    if (isChainable(outEdge, streamGraph)) { // core logic: can this edge be chained?
                        chainableOutputs.add(outEdge);
                    } else {
                        nonChainableOutputs.add(outEdge);
                    }
                }
    
                for (StreamEdge chainable : chainableOutputs) { // for chainable outputs, recurse
                    transitiveOutEdges.addAll(
                            createChain(startNodeId, chainable.getTargetId(), hashes, legacyHashes, chainIndex + 1)); // currentNodeId becomes the target node's id and chainIndex is incremented
                }
    
                for (StreamEdge nonChainable : nonChainableOutputs) { // for non-chainable outputs
                    transitiveOutEdges.add(nonChainable); // not chained, so a real JobEdge is needed; record it in transitiveOutEdges
                    createChain(nonChainable.getTargetId(), nonChainable.getTargetId(), hashes, legacyHashes, 0); // continue, but note that both startNodeId and currentNodeId are set to the target id: the edge is not chained, so the target starts a new chain
                }
    
                chainedNames.put(currentNodeId, createChainedName(currentNodeId, chainableOutputs)); // generate a name for each chain
    
                StreamConfig config = currentNodeId.equals(startNodeId)
                        ? createJobVertex(startNodeId, hashes, legacyHashes) // only the chain's start node gets a JobVertex; the others just get an empty StreamConfig
                        : new StreamConfig(new Configuration());
    
                setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs); // copy the StreamNode's settings into the StreamConfig
    
                if (currentNodeId.equals(startNodeId)) { // this is the start node of the chain
    
                    config.setChainStart();
                    config.setChainIndex(0);
                    config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
                    config.setOutEdgesInOrder(transitiveOutEdges);
                    config.setOutEdges(streamGraph.getStreamNode(currentNodeId).getOutEdges());
    
                    for (StreamEdge edge : transitiveOutEdges) {
                        connect(startNodeId, edge); // only the start node needs to connect edges
                    }
    
                    config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));
    
                } else {
    
                    Map<Integer, StreamConfig> chainedConfs = chainedConfigs.get(startNodeId);
    
                    if (chainedConfs == null) {
                        chainedConfigs.put(startNodeId, new HashMap<Integer, StreamConfig>());
                    }
                    config.setChainIndex(chainIndex);
                    config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
                    chainedConfigs.get(startNodeId).put(currentNodeId, config);
                }
                if (chainableOutputs.isEmpty()) {
                    config.setChainEnd();
                }
    
                return transitiveOutEdges;
    
            } else {
                return new ArrayList<>();
            }
        }

     

    isChainable

    public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
            StreamNode upStreamVertex = edge.getSourceVertex(); // source of the StreamEdge
            StreamNode downStreamVertex = edge.getTargetVertex(); // target of the StreamEdge
    
            StreamOperator<?> headOperator = upStreamVertex.getOperator();
            StreamOperator<?> outOperator = downStreamVertex.getOperator();
    
            return downStreamVertex.getInEdges().size() == 1 // the target has exactly one input edge; with multiple inputs it would have to wait for the others and could not run chained
                    && outOperator != null
                    && headOperator != null
                    && upStreamVertex.isSameSlotSharingGroup(downStreamVertex) // same slot sharing group
                    && outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS // the target's ChainingStrategy is ALWAYS
                    && (headOperator.getChainingStrategy() == ChainingStrategy.HEAD ||
                        headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS) // the source's ChainingStrategy is HEAD or ALWAYS
                    && (edge.getPartitioner() instanceof ForwardPartitioner) // the edge uses a ForwardPartitioner
                    && upStreamVertex.getParallelism() == downStreamVertex.getParallelism() // source and target have the same parallelism
                    && streamGraph.isChainingEnabled(); // chaining is enabled
        }
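
    These conditions also explain the knobs the DataStream API exposes for controlling chaining. A hedged usage sketch, with the pipeline itself being illustrative:

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // global switch: streamGraph.isChainingEnabled() becomes false, so nothing gets chained
    // env.disableOperatorChaining();

    env.fromElements(1L, 2L, 3L)
       .map(new MapFunction<Long, Long>() {
           @Override
           public Long map(Long value) {
               return value + 1;
           }
       })
       .startNewChain()      // ChainingStrategy.HEAD: this map starts a new chain
       // .disableChaining() // ChainingStrategy.NEVER: this map would never be chained
       .print();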

     

    createJobVertex

        private StreamConfig createJobVertex(
                Integer streamNodeId,
                Map<Integer, byte[]> hashes,
                List<Map<Integer, byte[]>> legacyHashes) {
    
            JobVertex jobVertex;
            StreamNode streamNode = streamGraph.getStreamNode(streamNodeId);
    
            byte[] hash = hashes.get(streamNodeId); // look up the node's deterministic hash
    
            JobVertexID jobVertexId = new JobVertexID(hash); // build the JobVertexID from it
    
            if (streamNode.getInputFormat() != null) {
                jobVertex = new InputFormatVertex(
                        chainedNames.get(streamNodeId),
                        jobVertexId,
                        legacyJobVertexIds);
                TaskConfig taskConfig = new TaskConfig(jobVertex.getConfiguration());
                taskConfig.setStubWrapper(new UserCodeObjectWrapper<Object>(streamNode.getInputFormat()));
            } else {
                jobVertex = new JobVertex(
                        chainedNames.get(streamNodeId),
                        jobVertexId,
                        legacyJobVertexIds);
            }
    
            jobVertex.setInvokableClass(streamNode.getJobVertexClass());
    
            int parallelism = streamNode.getParallelism();
    
            if (parallelism > 0) {
                jobVertex.setParallelism(parallelism); // set the parallelism
            } else {
                parallelism = jobVertex.getParallelism();
            }
    
            jobVertex.setMaxParallelism(streamNode.getMaxParallelism());
    
            jobVertices.put(streamNodeId, jobVertex); // record the jobVertex in the bookkeeping structures
            builtVertices.add(streamNodeId);
            jobGraph.addVertex(jobVertex);
    
            return new StreamConfig(jobVertex.getConfiguration());
        }

     

    connect(startNodeId, edge)

    Only the transitiveOutEdges need to be connected.

    Why "transitive"? For a group of chained nodes, only the head node's JobVertex is actually created, and when wiring things up, JobEdges are only built for the non-chainable edges.

    As seen above, the recursive createChain calls pass back all of the transitiveOutEdges: the downstream chained nodes have no JobVertex of their own, so the non-chainable edges they connect to have to be attached to the head node instead, which is why they are called transitive.

        private void connect(Integer headOfChain, StreamEdge edge) {
    
            physicalEdgesInOrder.add(edge); // every connected edge is a physical edge, i.e. it produces a JobEdge
    
            Integer downStreamvertexID = edge.getTargetId();
    
            JobVertex headVertex = jobVertices.get(headOfChain);
            JobVertex downStreamVertex = jobVertices.get(downStreamvertexID);
    
            StreamConfig downStreamConfig = new StreamConfig(downStreamVertex.getConfiguration());
    
            downStreamConfig.setNumberOfInputs(downStreamConfig.getNumberOfInputs() + 1); // one more incoming edge, so inputs + 1
    
            StreamPartitioner<?> partitioner = edge.getPartitioner();
            JobEdge jobEdge = null;
            if (partitioner instanceof ForwardPartitioner) {
                jobEdge = downStreamVertex.connectNewDataSetAsInput(
                    headVertex,
                    DistributionPattern.POINTWISE,
                    ResultPartitionType.PIPELINED); // streaming always uses pipelined results: as soon as data is produced, the consumer pulls it
            } else if (partitioner instanceof RescalePartitioner){
                jobEdge = downStreamVertex.connectNewDataSetAsInput(
                    headVertex,
                    DistributionPattern.POINTWISE, // POINTWISE: each producer subtask connects to one or a few consumer subtasks
                    ResultPartitionType.PIPELINED);
            } else {
                jobEdge = downStreamVertex.connectNewDataSetAsInput(
                        headVertex,
                        DistributionPattern.ALL_TO_ALL, // ALL_TO_ALL: every producer subtask connects to every consumer subtask
                        ResultPartitionType.PIPELINED);
            }
            // set strategy name so that web interface can show it.
            jobEdge.setShipStrategyName(partitioner.toString());
        }

     

    downStreamVertex.connectNewDataSetAsInput

    JobVertex.connectNewDataSetAsInput

    public JobEdge connectNewDataSetAsInput(
                JobVertex input,
                DistributionPattern distPattern,
                ResultPartitionType partitionType) {
    
            IntermediateDataSet dataSet = input.createAndAddResultDataSet(partitionType); // create an IntermediateDataSet and register it on the producing (input) vertex
    
            JobEdge edge = new JobEdge(dataSet, this, distPattern); // create the JobEdge
            this.inputs.add(edge); // the edge becomes an input of the current vertex
            dataSet.addConsumer(edge); // the edge consumes data from the IntermediateDataSet
            return edge;
        }

    setSlotSharing

        private void setSlotSharing() {
    
            Map<String, SlotSharingGroup> slotSharingGroups = new HashMap<>();
    
            for (Entry<Integer, JobVertex> entry : jobVertices.entrySet()) { // iterate over every JobVertex
    
                String slotSharingGroup = streamGraph.getStreamNode(entry.getKey()).getSlotSharingGroup();
    
                SlotSharingGroup group = slotSharingGroups.get(slotSharingGroup);
                if (group == null) {
                    group = new SlotSharingGroup(); // lazily create the SlotSharingGroup
                    slotSharingGroups.put(slotSharingGroup, group);
                }
                entry.getValue().setSlotSharingGroup(group); // add the vertex to its SlotSharingGroup
            }
    
            for (Tuple2<StreamNode, StreamNode> pair : streamGraph.getIterationSourceSinkPairs()) { // iterations additionally need a CoLocationGroup
    
                CoLocationGroup ccg = new CoLocationGroup();
    
                JobVertex source = jobVertices.get(pair.f0.getId());
                JobVertex sink = jobVertices.get(pair.f1.getId());
    
                ccg.addVertex(source);
                ccg.addVertex(sink);
                source.updateCoLocationGroup(ccg);
                sink.updateCoLocationGroup(ccg);
            }
    
        }

    configureCheckpointing

        private void configureCheckpointing() {
            CheckpointConfig cfg = streamGraph.getCheckpointConfig();
    
            long interval = cfg.getCheckpointInterval();
            if (interval > 0) {  // if a checkpoint interval was configured, default the restart strategy to fixed-delay restart
                // check if a restart strategy has been set, if not then set the FixedDelayRestartStrategy
                if (streamGraph.getExecutionConfig().getRestartStrategy() == null) {
                    // if the user enabled checkpointing, the default number of exec retries is infinite.
                    streamGraph.getExecutionConfig().setRestartStrategy(
                        RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, DEFAULT_RESTART_DELAY));
                }
            } else {
                // interval of max value means disable periodic checkpoint
                interval = Long.MAX_VALUE;
            }
    
            // collect the vertices that receive "trigger checkpoint" messages.
            // currently, these are all the sources
            List<JobVertexID> triggerVertices = new ArrayList<>();
    
            // collect the vertices that need to acknowledge the checkpoint
            // currently, these are all vertices
            List<JobVertexID> ackVertices = new ArrayList<>(jobVertices.size()); // all JobVertices need to acknowledge
    
            // collect the vertices that receive "commit checkpoint" messages
            // currently, these are all vertices
            List<JobVertexID> commitVertices = new ArrayList<>();
    
            for (JobVertex vertex : jobVertices.values()) {
                if (vertex.isInputVertex()) { // vertices without inputs, i.e. the sources
                    triggerVertices.add(vertex.getID()); // become trigger vertices
                }
                commitVertices.add(vertex.getID());
                ackVertices.add(vertex.getID());
            }
    
            CheckpointingMode mode = cfg.getCheckpointingMode();
    
            boolean isExactlyOnce;
            if (mode == CheckpointingMode.EXACTLY_ONCE) { // checkpointing mode
                isExactlyOnce = true;
            } else if (mode == CheckpointingMode.AT_LEAST_ONCE) {
                isExactlyOnce = false;
            } else {
                throw new IllegalStateException("Unexpected checkpointing mode. " +
                    "Did not expect there to be another checkpointing mode besides " +
                    "exactly-once or at-least-once.");
            }
    
            JobSnapshottingSettings settings = new JobSnapshottingSettings(
                    triggerVertices, ackVertices, commitVertices, interval,
                    cfg.getCheckpointTimeout(), cfg.getMinPauseBetweenCheckpoints(),
                    cfg.getMaxConcurrentCheckpoints(),
                    externalizedCheckpointSettings,
                    isExactlyOnce);
    
            jobGraph.setSnapshotSettings(settings);
        }
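
    From the user's side these values come from the checkpoint configuration on the environment. A hedged usage sketch; the interval, timeout and retry counts are arbitrary examples:

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // checkpoint every 10s; this is the interval configureCheckpointing() reads
    env.enableCheckpointing(10000, CheckpointingMode.EXACTLY_ONCE);

    CheckpointConfig cfg = env.getCheckpointConfig();
    cfg.setMinPauseBetweenCheckpoints(500);
    cfg.setCheckpointTimeout(60000);
    cfg.setMaxConcurrentCheckpoints(1);

    // without this, the default shown above is fixedDelayRestart(Integer.MAX_VALUE, DEFAULT_RESTART_DELAY)
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000));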

     

    At this point the JobGraph is complete.

    Finally, the JobGraph is submitted to the JobManager.

     

    References:

    http://wuchong.me/blog/2016/05/04/flink-internal-how-to-build-streamgraph/

    http://wuchong.me/blog/2016/05/10/flink-internals-how-to-build-jobgraph/
