  • [Source Code] How Flink Generates the StreamGraph

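    At the core of Flink's StreamGraph is streamNodes, which holds the StreamNode (also called a vertex) generated for every operator. Each StreamNode holds the edges (Edge) that connect it to other operators. Virtual nodes are not stored there; instead, the three maps virtualSelectNodes, virtualSideOutputNodes and virtualPartitionNodes record how the upstream and downstream physical nodes are connected.
    The sources and sinks sets record the StreamNode ids of the stream's sources and sinks.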

    private Map<Integer, StreamNode> streamNodes;
    private Set<Integer> sources;
    private Set<Integer> sinks;
    private Map<Integer, Tuple2<Integer, List<String>>> virtualSelectNodes;
    private Map<Integer, Tuple2<Integer, OutputTag>> virtualSideOutputNodes;
    private Map<Integer, Tuple3<Integer, StreamPartitioner<?>, ShuffleMode>> virtualPartitionNodes;
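    To make the split between physical and virtual nodes concrete, here is a minimal sketch in plain Java. It is not Flink code: MiniGraphModel, VirtualPartition and the strings standing in for operators and partitioners are hypothetical names used only for illustration.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of StreamGraph's bookkeeping (not Flink's classes).
final class MiniGraphModel {
    // Stand-in for Tuple3<Integer, StreamPartitioner<?>, ShuffleMode>.
    static final class VirtualPartition {
        final int upstreamId;
        final String partitioner;

        VirtualPartition(int upstreamId, String partitioner) {
            this.upstreamId = upstreamId;
            this.partitioner = partitioner;
        }
    }

    // Physical nodes keyed by vertex id; the value is just an operator name here.
    final Map<Integer, String> streamNodes = new HashMap<>();
    final Set<Integer> sources = new HashSet<>();
    final Set<Integer> sinks = new HashSet<>();
    // Virtual node id -> the physical node it sits behind and how data is partitioned.
    final Map<Integer, VirtualPartition> virtualPartitionNodes = new HashMap<>();

    // Models source(1) -> map(2) -> [rebalance, virtual id 3] -> sink(4).
    static MiniGraphModel example() {
        MiniGraphModel g = new MiniGraphModel();
        g.streamNodes.put(1, "Source: numbers");
        g.sources.add(1);
        g.streamNodes.put(2, "Map");
        // The repartitioning step is recorded, but no StreamNode is created for it.
        g.virtualPartitionNodes.put(3, new VirtualPartition(2, "REBALANCE"));
        g.streamNodes.put(4, "Sink: print");
        g.sinks.add(4);
        return g;
    }
}
```

    In this model id 3 appears only in virtualPartitionNodes, so a lookup in streamNodes finds just the three physical operators.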

    StreamGraph generation starts when user code calls env.execute(). The execute overload that receives the result of getStreamGraph(jobName) goes on to build the JobGraph; this article focuses on what happens inside getStreamGraph.

    StreamExecutionEnvironment.java

    public JobExecutionResult execute(String jobName) throws Exception {
        Preconditions.checkNotNull(jobName, "Streaming Job name should not be null.");
    
        // Call getStreamGraph with the job name to build the StreamGraph,
        // then pass the StreamGraph to execute(), which goes on to build the JobGraph
        return execute(getStreamGraph(jobName));
      }

    The StreamGraph is built by a StreamGraphGenerator. Creating the generator is simple: every execution setting is passed in, except the deployment options in env.configuration (DeploymentOptions).

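    transformations is the list of Transformation objects produced by all operator calls
    config is the ExecutionConfig
    The remaining settings are self-explanatory, so they are not covered one by one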

    StreamExecutionEnvironment.java

    @Internal
    public StreamGraph getStreamGraph(String jobName, boolean clearTransformations) {
      // Create the StreamGraphGenerator first, then call generate() to build the StreamGraph
      StreamGraph streamGraph = getStreamGraphGenerator().setJobName(jobName).generate();
      if (clearTransformations) {
        this.transformations.clear();
      }
      return streamGraph;
    }
    
    private StreamGraphGenerator getStreamGraphGenerator() {
      if (transformations.size() <= 0) {
        throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");
      }
      // Create the StreamGraphGenerator and hand it transformations, config, checkpointCfg, stateBackend and the other settings
      return new StreamGraphGenerator(transformations, config, checkpointCfg)
        .setStateBackend(defaultStateBackend)
        .setChaining(isChainingEnabled)
        .setUserArtifacts(cacheFile)
        // time characteristic
        .setTimeCharacteristic(timeCharacteristic)
        .setDefaultBufferTimeout(bufferTimeout);
    }

    generate itself is blunt: it iterates over the transformations list and transforms each entry once more, turning every physical Transformation into a StreamNode

    StreamGraphGenerator.java

    // generate
    public StreamGraph generate() {
      // Create the StreamGraph first and copy the settings from the environment into it
      streamGraph = new StreamGraph(executionConfig, checkpointConfig, savepointRestoreSettings);
      streamGraph.setStateBackend(stateBackend);
      streamGraph.setChaining(chaining);
      streamGraph.setScheduleMode(scheduleMode);
      streamGraph.setUserArtifacts(userArtifacts);
      streamGraph.setTimeCharacteristic(timeCharacteristic);
      streamGraph.setJobName(jobName);
      streamGraph.setGlobalDataExchangeMode(globalDataExchangeMode);
    
      alreadyTransformed = new HashMap<>();
      // Iterate over the transformation list and transform every operator
      for (Transformation<?> transformation: transformations) {
        // core logic of StreamGraph generation
        transform(transformation);
      }
      // Keep a local reference to the finished graph before the generator's fields are reset
      final StreamGraph builtStreamGraph = streamGraph;
    
      alreadyTransformed.clear();
      alreadyTransformed = null;
      streamGraph = null;
      // return the generated StreamGraph
      return builtStreamGraph;
    }
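    The shape of generate() (build into fields, hand the result over through a local reference, then reset the fields) can be sketched without Flink. OneShotGenerator and its string "transformations" are hypothetical; the sketch only mirrors the control flow shown above.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical toy generator; it only mirrors the build / hand-over / reset shape of generate().
final class OneShotGenerator {
    private final List<String> transformations;
    private List<String> graph;              // result under construction
    private Set<String> alreadyTransformed;  // per-run bookkeeping

    OneShotGenerator(List<String> transformations) {
        this.transformations = transformations;
    }

    List<String> generate() {
        graph = new ArrayList<>();
        alreadyTransformed = new HashSet<>();
        for (String t : transformations) {
            // Set.add returns false for entries that were already processed.
            if (alreadyTransformed.add(t)) {
                graph.add("node:" + t);
            }
        }
        // Keep a local reference, then drop the per-run state held in fields.
        final List<String> built = graph;
        alreadyTransformed.clear();
        alreadyTransformed = null;
        graph = null;
        return built;
    }

    boolean holdsPerRunState() {
        return graph != null || alreadyTransformed != null;
    }
}
```

    Note that final on the local variable only prevents reassigning that variable; the returned object itself is still mutable. What the reset achieves is that the generator no longer holds a reference to the graph it handed out.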

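    The transform method matters most, because every operator passes through it; each kind of transformation is dispatched to its own method

    Physical nodes and virtual nodes (partition, side output, select) are handled differently
    For a physical node, the code creates a StreamNode (vertex), sets the serializers, specifies the input and output types, and sets the keySelector, parallelism and max parallelism, then adds the input edges
    A virtual node has no StreamNode; it only records how the upstream and downstream physical nodes are connected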

    private Collection<Integer> transform(Transformation<?> transform) {
    
        // transform: type + id + name + outputType + parallelism
        // If this transformation has already been processed, return the node ids recorded for it
        if (alreadyTransformed.containsKey(transform)) {
          return alreadyTransformed.get(transform);
        }
    
        LOG.debug("Transforming " + transform);
        // Set the max parallelism
        if (transform.getMaxParallelism() <= 0) {
    
          // if the max parallelism hasn't been set, then first use the job wide max parallelism
          // from the ExecutionConfig.
          int globalMaxParallelismFromConfig = executionConfig.getMaxParallelism();
          // Only applied when the job-wide max parallelism is greater than 0 (otherwise the default is used)
          if (globalMaxParallelismFromConfig > 0) {
            transform.setMaxParallelism(globalMaxParallelismFromConfig);
          }
        }
        // Validate the output type; a missing type (MissingTypeInfo) raises an error here
        // call at least once to trigger exceptions about MissingTypeInfo
        transform.getOutputType();
    
        // Dispatch on the type of the transformation
        Collection<Integer> transformedIds;
        if (transform instanceof OneInputTransformation<?, ?>) {
          // one input
          transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform);
        } else if (transform instanceof TwoInputTransformation<?, ?, ?>) {
          // two inputs
          transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);
        } else if (transform instanceof AbstractMultipleInputTransformation<?>) {
          // multiple inputs
          transformedIds = transformMultipleInputTransform((AbstractMultipleInputTransformation<?>) transform);
        } else if (transform instanceof SourceTransformation) {
          // source
          transformedIds = transformSource((SourceTransformation<?>) transform);
        } else if (transform instanceof LegacySourceTransformation<?>) {
          // source
          transformedIds = transformLegacySource((LegacySourceTransformation<?>) transform);
        } else if (transform instanceof SinkTransformation<?>) {
          // sink
          transformedIds = transformSink((SinkTransformation<?>) transform);
        } else if (transform instanceof UnionTransformation<?>) {
          // union
          transformedIds = transformUnion((UnionTransformation<?>) transform);
        } else if (transform instanceof SplitTransformation<?>) {
          // split
          transformedIds = transformSplit((SplitTransformation<?>) transform);
        } else if (transform instanceof SelectTransformation<?>) {
          // select
          transformedIds = transformSelect((SelectTransformation<?>) transform);
        } else if (transform instanceof FeedbackTransformation<?>) {
          // feedback
          transformedIds = transformFeedback((FeedbackTransformation<?>) transform);
        } else if (transform instanceof CoFeedbackTransformation<?>) {
          // co feedback
          transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform);
        } else if (transform instanceof PartitionTransformation<?>) {
          // partition
          transformedIds = transformPartition((PartitionTransformation<?>) transform);
        } else if (transform instanceof SideOutputTransformation<?>) {
          // side output
          transformedIds = transformSideOutput((SideOutputTransformation<?>) transform);
        } else {
          // anything else
          throw new IllegalStateException("Unknown transformation: " + transform);
        }
        // Record the transform in the alreadyTransformed map
        // need this check because the iterate transformation adds itself before
        // transforming the feedback edges
        if (!alreadyTransformed.containsKey(transform)) {
          alreadyTransformed.put(transform, transformedIds);
        }
        // Set the buffer timeout
        if (transform.getBufferTimeout() >= 0) {
          streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout());
        } else {
          streamGraph.setBufferTimeout(transform.getId(), defaultBufferTimeout);
        }
        // Set the UID of the transform
        if (transform.getUid() != null) {
          streamGraph.setTransformationUID(transform.getId(), transform.getUid());
        }
        // Set the user-provided node hash
        if (transform.getUserProvidedNodeHash() != null) {
          streamGraph.setTransformationUserHash(transform.getId(), transform.getUserProvidedNodeHash());
        }
        // If auto-generated UIDs are disabled, every physical operator must have a UID or a user hash
        if (!streamGraph.getExecutionConfig().hasAutoGeneratedUIDsEnabled()) {
          if (transform instanceof PhysicalTransformation &&
              transform.getUserProvidedNodeHash() == null &&
              transform.getUid() == null) {
            throw new IllegalStateException("Auto generated UIDs have been disabled " +
              "but no UID or hash has been assigned to operator " + transform.getName());
          }
        }
        // Set the StreamNode resources: min and preferred resources, covering cpuCores/taskHeapMemory/taskOffHeapMemory/managedMemory/extendedResources
        if (transform.getMinResources() != null && transform.getPreferredResources() != null) {
          streamGraph.setResources(transform.getId(), transform.getMinResources(), transform.getPreferredResources());
        }
        // Set the managed memory weight
        streamGraph.setManagedMemoryWeight(transform.getId(), transform.getManagedMemoryWeight());
    
        return transformedIds;
      }
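    The alreadyTransformed check combined with the recursive calls into the inputs is a memoized depth-first traversal. A minimal sketch, assuming a toy Node type in place of Transformation (all names here are hypothetical, not Flink's API):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model of the memoized, recursive traversal; not Flink's classes.
final class MemoizedTransformDemo {
    static final class Node {
        final int id;
        final List<Node> inputs;

        Node(int id, Node... inputs) {
            this.id = id;
            this.inputs = Arrays.asList(inputs);
        }
    }

    private final Map<Node, Collection<Integer>> alreadyTransformed = new HashMap<>();
    // Order in which nodes are actually created, recorded for inspection.
    final List<Integer> creationOrder = new ArrayList<>();

    Collection<Integer> transform(Node node) {
        Collection<Integer> cached = alreadyTransformed.get(node);
        if (cached != null) {
            return cached; // already handled while walking another branch
        }
        for (Node input : node.inputs) {
            transform(input); // inputs first, so upstream nodes exist before this one
        }
        creationOrder.add(node.id);
        Collection<Integer> ids = Collections.singleton(node.id);
        alreadyTransformed.put(node, ids);
        return ids;
    }

    // One source feeding two maps: the source must be created exactly once.
    static List<Integer> demo() {
        Node source = new Node(1);
        Node mapA = new Node(2, source);
        Node mapB = new Node(3, source);
        MemoizedTransformDemo generator = new MemoizedTransformDemo();
        for (Node n : Arrays.asList(mapA, mapB)) {
            generator.transform(n);
        }
        return generator.creationOrder;
    }
}
```

    Here demo() yields the creation order 1, 2, 3: the shared source is reached from both maps but created only once, which is the purpose of the alreadyTransformed lookup at the top of transform.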

    ## How a source operator is transformed

    The source case is simple: it calls streamGraph.addSource directly, passing the source id, slotSharingGroup, output type and so on, to create the StreamNode for the source

    else if (transform instanceof SourceTransformation) {
      // source
      transformedIds = transformSource((SourceTransformation<?>) transform);
    } 
    private <T> Collection<Integer> transformSource(SourceTransformation<T> source) {
        // resolve the slotSharingGroup
        String slotSharingGroup = determineSlotSharingGroup(source.getSlotSharingGroup(), Collections.emptyList());
    
        // add the source
        streamGraph.addSource(source.getId(),
            slotSharingGroup,
            source.getCoLocationGroupKey(),
            source.getOperatorFactory(),
            null,
            source.getOutputType(),
            "Source: " + source.getName());
        int parallelism = source.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT ?
            source.getParallelism() : executionConfig.getParallelism();
        // set the parallelism
        streamGraph.setParallelism(source.getId(), parallelism);
        // set the max parallelism
        streamGraph.setMaxParallelism(source.getId(), source.getMaxParallelism());
        // return the source vertexID
        return Collections.singleton(source.getId());
      }
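    The two fallbacks seen so far (operator parallelism falling back to the job-wide value, and max parallelism falling back to the job-wide max only when that is positive) are small enough to isolate. A sketch, assuming the sentinel -1 that ExecutionConfig.PARALLELISM_DEFAULT uses in Flink; the class and method names are hypothetical:

```java
// Hypothetical helper that isolates the fallback rules; not part of Flink.
final class ParallelismFallback {
    // Mirrors ExecutionConfig.PARALLELISM_DEFAULT (-1 in Flink).
    static final int PARALLELISM_DEFAULT = -1;

    // Operator-level setting wins; otherwise use the job-wide parallelism.
    static int resolveParallelism(int operatorParallelism, int jobParallelism) {
        return operatorParallelism != PARALLELISM_DEFAULT ? operatorParallelism : jobParallelism;
    }

    // An unset operator max (<= 0) takes the job-wide max, but only if that is positive.
    static int resolveMaxParallelism(int operatorMax, int jobMax) {
        if (operatorMax <= 0 && jobMax > 0) {
            return jobMax;
        }
        return operatorMax;
    }
}
```

    For example, resolveParallelism(-1, 4) gives 4 and resolveParallelism(2, 4) gives 2, while resolveMaxParallelism(-1, -1) stays at -1, leaving the choice of a default to later stages.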

    addSource adds a StreamNode for the source and also puts its StreamNode id (vertexID) into the sources set

    public <IN, OUT> void addSource(
        Integer vertexID,
        @Nullable String slotSharingGroup,
        @Nullable String coLocationGroup,
        SourceOperatorFactory<OUT> operatorFactory,
        TypeInformation<IN> inTypeInfo,
        TypeInformation<OUT> outTypeInfo,
        String operatorName) {
        // add an operator
        addOperator(
          vertexID,
          slotSharingGroup,
          coLocationGroup,
          operatorFactory,
          inTypeInfo,
          outTypeInfo,
          operatorName,
          SourceOperatorStreamTask.class);
        // register the id in the sources set
        sources.add(vertexID);
      }

    Other operators go through addOperator in much the same way

    public <IN, OUT> void addOperator(
        Integer vertexID,
        @Nullable String slotSharingGroup,
        @Nullable String coLocationGroup,
        StreamOperatorFactory<OUT> operatorFactory,
        TypeInformation<IN> inTypeInfo,
        TypeInformation<OUT> outTypeInfo,
        String operatorName) {
        // Pick the task class that will run the operator: SourceStreamTask for stream sources, OneInputStreamTask otherwise
        Class<? extends AbstractInvokable> invokableClass =
          operatorFactory.isStreamSource() ? SourceStreamTask.class : OneInputStreamTask.class;
        // Delegate to the private overload, which creates the StreamNode
        addOperator(vertexID, slotSharingGroup, coLocationGroup, operatorFactory, inTypeInfo,
          outTypeInfo, operatorName, invokableClass);
      }
    
    
      private <IN, OUT> void addOperator(
        Integer vertexID,
        @Nullable String slotSharingGroup,
        @Nullable String coLocationGroup,
        StreamOperatorFactory<OUT> operatorFactory,
        TypeInformation<IN> inTypeInfo,
        TypeInformation<OUT> outTypeInfo,
        String operatorName,
        Class<? extends AbstractInvokable> invokableClass) {
        // Create a StreamNode for the operator and add it to streamNodes, the core of the StreamGraph
        addNode(vertexID, slotSharingGroup, coLocationGroup, invokableClass, operatorFactory, operatorName);
        // Set the serializers for the StreamNode's input and output types
        setSerializers(vertexID, createSerializer(inTypeInfo), null, createSerializer(outTypeInfo));
        // If the operator factory's output type is configurable, set it
        if (operatorFactory.isOutputTypeConfigurable() && outTypeInfo != null) {
          // sets the output type which must be know at StreamGraph creation time
          operatorFactory.setOutputType(outTypeInfo, executionConfig);
        }
    
        // If the operator factory's input type is configurable, set it
        if (operatorFactory.isInputTypeConfigurable()) {
          operatorFactory.setInputType(inTypeInfo, executionConfig);
        }
    
        if (LOG.isDebugEnabled()) {
          LOG.debug("Vertex: {}", vertexID);
        }
      }

    addNode creates the StreamNode and puts it into the streamNodes map

    // Create a StreamNode for the operator and add it to the StreamGraph's streamNodes
      protected StreamNode addNode(
        Integer vertexID,
        @Nullable String slotSharingGroup,
        @Nullable String coLocationGroup,
        Class<? extends AbstractInvokable> vertexClass,
        StreamOperatorFactory<?> operatorFactory,
        String operatorName) {
    
        // An existing id means this node was already added; that is a bug, so throw a RuntimeException
        if (streamNodes.containsKey(vertexID)) {
          throw new RuntimeException("Duplicate vertexID " + vertexID);
        }
    
        // new StreamNode
        StreamNode vertex = new StreamNode(
          vertexID,
          slotSharingGroup,
          coLocationGroup,
          operatorFactory,
          operatorName,
          new ArrayList<OutputSelector<?>>(),
          vertexClass);
        // add it to streamNodes
        streamNodes.put(vertexID, vertex);
    
        return vertex;
      }

    ## How the physical node OneInputTransformation is transformed

    Create the StreamNode (which puts it into streamNodes), add the input edges, and return the vertexID

    if (transform instanceof OneInputTransformation<?, ?>) {
          // one input
          transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform);
        }
    /**
       * Transforms a {@code OneInputTransformation}.
       * Handles operators with a single input.
       * <p>This recursively transforms the inputs, creates a new {@code StreamNode} in the graph and
       * wired the inputs to this new node.
       */
      private <IN, OUT> Collection<Integer> transformOneInputTransform(OneInputTransformation<IN, OUT> transform) {
    
        // Recursively transform the input transformation first,
        // in case that upstream branch has not been processed yet (an operator can have several upstream branches)
        Collection<Integer> inputIds = transform(transform.getInput());
    
        // check whether this transform was already added
        // the recursive call might have already transformed this
        if (alreadyTransformed.containsKey(transform)) {
          return alreadyTransformed.get(transform);
        }
    
        // resolve the slotSharingGroup: the one specified, the one from the inputs, or the default
        String slotSharingGroup = determineSlotSharingGroup(transform.getSlotSharingGroup(), inputIds);
    
        // add the transformation to streamNodes
        streamGraph.addOperator(transform.getId(),
            slotSharingGroup,
            transform.getCoLocationGroupKey(),
            transform.getOperatorFactory(),
            transform.getInputType(),
            transform.getOutputType(),
            transform.getName());
        // A state key selector is present, i.e. the input was keyed (keyBy)
        if (transform.getStateKeySelector() != null) {
          // key serializer
          TypeSerializer<?> keySerializer = transform.getStateKeyType().createSerializer(executionConfig);
          // set the state key serializer and the KeySelector for the single input
          streamGraph.setOneInputStateKey(transform.getId(), transform.getStateKeySelector(), keySerializer);
        }
        // resolve the parallelism
        int parallelism = transform.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT ?
          transform.getParallelism() : executionConfig.getParallelism();
        // set the parallelism
        streamGraph.setParallelism(transform.getId(), parallelism);
        // set the max parallelism
        streamGraph.setMaxParallelism(transform.getId(), transform.getMaxParallelism());
        // add one edge to the StreamNode for each input
        for (Integer inputId: inputIds) {
          streamGraph.addEdge(inputId, transform.getId(), 0);
        }
        // return the id of the transform, which is also the vertexID
        return Collections.singleton(transform.getId());
      }

    The logic here is clear enough that it needs little comment; addOperator works much as it does for the source

    ## How the virtual partition node PartitionTransformation is transformed

    else if (transform instanceof PartitionTransformation<?>) {
        // partition
        transformedIds = transformPartition((PartitionTransformation<?>) transform);
      }
    
    
    private <T> Collection<Integer> transformPartition(PartitionTransformation<T> partition) {
        // get the input
        Transformation<T> input = partition.getInput();
        List<Integer> resultIds = new ArrayList<>();
        // transform the input
        Collection<Integer> transformedIds = transform(input);
        // add one virtual partition node per input id
        for (Integer transformedId: transformedIds) {
          int virtualId = Transformation.getNewNodeId();
          // add the virtual partition node
          streamGraph.addVirtualPartitionNode(
              transformedId, virtualId, partition.getPartitioner(), partition.getShuffleMode());
          // collect the virtual id in the list that is returned
          resultIds.add(virtualId);
        }
    
        return resultIds;
      }
    
    
    public void addVirtualPartitionNode(
        Integer originalId,
        Integer virtualId,
        StreamPartitioner<?> partitioner,
        ShuffleMode shuffleMode) {
    
        // check whether it was already added
        if (virtualPartitionNodes.containsKey(virtualId)) {
          throw new IllegalStateException("Already has virtual partition node with id " + virtualId);
        }
        // add it
        virtualPartitionNodes.put(virtualId, new Tuple3<>(originalId, partitioner, shuffleMode));
      }
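    The article does not show how the virtual ids are consumed later, so the following is only a sketch under an assumption: when an edge is added from a virtual id, the graph walks back to the physical upstream node and carries the recorded partitioner along. All names, the string partitioners and the "FORWARD" default are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of resolving virtual partition nodes when edges are added; not Flink code.
final class VirtualPartitionDemo {
    static final class VirtualPartition {
        final int upstreamId;
        final String partitioner;

        VirtualPartition(int upstreamId, String partitioner) {
            this.upstreamId = upstreamId;
            this.partitioner = partitioner;
        }
    }

    private final Map<Integer, VirtualPartition> virtualPartitionNodes = new HashMap<>();
    // Physical edges only, rendered as "up -[partitioner]-> down".
    final List<String> edges = new ArrayList<>();

    void addVirtualPartitionNode(int originalId, int virtualId, String partitioner) {
        if (virtualPartitionNodes.containsKey(virtualId)) {
            throw new IllegalStateException("Already has virtual partition node with id " + virtualId);
        }
        virtualPartitionNodes.put(virtualId, new VirtualPartition(originalId, partitioner));
    }

    void addEdge(int upstreamId, int downstreamId) {
        addEdgeInternal(upstreamId, downstreamId, null);
    }

    private void addEdgeInternal(int upstreamId, int downstreamId, String partitioner) {
        VirtualPartition virtual = virtualPartitionNodes.get(upstreamId);
        if (virtual != null) {
            // Upstream is virtual: hop to the node behind it and remember its partitioner.
            String effective = partitioner != null ? partitioner : virtual.partitioner;
            addEdgeInternal(virtual.upstreamId, downstreamId, effective);
            return;
        }
        // Assumed default for this sketch when no partitioner was recorded.
        String effective = partitioner != null ? partitioner : "FORWARD";
        edges.add(upstreamId + " -[" + effective + "]-> " + downstreamId);
    }

    static List<String> demo() {
        VirtualPartitionDemo graph = new VirtualPartitionDemo();
        graph.addVirtualPartitionNode(1, 100, "REBALANCE");
        graph.addEdge(100, 2); // downstream connects to the virtual id
        graph.addEdge(2, 3);   // plain physical edge
        return graph.edges;
    }
}
```

    In this model the virtual id 100 never shows up in an edge: the first call ends as an edge from node 1 to node 2 tagged REBALANCE, which matches the idea that a virtual node only describes a connection between physical nodes.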

    ## How the union operator is transformed

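    This only transforms every input and does nothing else. A union does not create a node: each unioned stream is handled on its own and connects directly to the downstream node, rather than being merged first and then linked downstream (the edges drawn in the web UI show this as well)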

    else if (transform instanceof UnionTransformation<?>) {
          // union
          transformedIds = transformUnion((UnionTransformation<?>) transform);
        } 
    
    /**
     * Transforms a {@code UnionTransformation}.
     *
     * <p>This is easy, we only have to transform the inputs and return all the IDs in a list so
     * that downstream operations can connect to all upstream nodes.
     */
    private <T> Collection<Integer> transformUnion(UnionTransformation<T> union) {
      List<Transformation<T>> inputs = union.getInputs();
      List<Integer> resultIds = new ArrayList<>();
    
      for (Transformation<T> input: inputs) {
        resultIds.addAll(transform(input));
      }
    
      return resultIds;
    }   
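    Because transformUnion returns every upstream id, the downstream operator ends up with one input edge per unioned stream. A minimal sketch with plain lists; the class name, the connect helper and the edge strings are hypothetical:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of union fan-in; not Flink code.
final class UnionFanInDemo {
    // Same shape as transformUnion: concatenate the ids produced by each input.
    static List<Integer> transformUnion(List<List<Integer>> idsPerInput) {
        List<Integer> resultIds = new ArrayList<>();
        for (List<Integer> ids : idsPerInput) {
            resultIds.addAll(ids);
        }
        return resultIds;
    }

    // What a downstream operator does with its inputIds: one edge per id.
    static List<String> connect(List<Integer> inputIds, int downstreamId) {
        List<String> edges = new ArrayList<>();
        for (Integer inputId : inputIds) {
            edges.add(inputId + " -> " + downstreamId);
        }
        return edges;
    }

    // Three streams (ids 1, 2, 3) are unioned and fed to operator 4.
    static List<String> demo() {
        List<Integer> unionIds = transformUnion(Arrays.asList(
            Arrays.asList(1), Arrays.asList(2), Arrays.asList(3)));
        return connect(unionIds, 4);
    }
}
```

    demo() produces the three edges 1 -> 4, 2 -> 4 and 3 -> 4, with no node for the union itself.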

    ## How a sink operator is transformed

    else if (transform instanceof SinkTransformation<?>) {
          // sink
          transformedIds = transformSink((SinkTransformation<?>) transform);
        }
    private <T> Collection<Integer> transformSink(SinkTransformation<T> sink) {
        // transform the sink's input operator
        Collection<Integer> inputIds = transform(sink.getInput());
        // resolve the slotSharingGroup
        String slotSharingGroup = determineSlotSharingGroup(sink.getSlotSharingGroup(), inputIds);
        // add the sink
        streamGraph.addSink(sink.getId(),
            slotSharingGroup,
            sink.getCoLocationGroupKey(),
            sink.getOperatorFactory(),
            sink.getInput().getOutputType(),
            null,
            "Sink: " + sink.getName());
        // if the sink's StreamOperatorFactory wraps an OutputFormat, register that format
        StreamOperatorFactory operatorFactory = sink.getOperatorFactory();
        if (operatorFactory instanceof OutputFormatOperatorFactory) {
          streamGraph.setOutputFormat(sink.getId(), ((OutputFormatOperatorFactory) operatorFactory).getOutputFormat());
        }
        // set the parallelism and max parallelism
        int parallelism = sink.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT ?
          sink.getParallelism() : executionConfig.getParallelism();
        streamGraph.setParallelism(sink.getId(), parallelism);
        streamGraph.setMaxParallelism(sink.getId(), sink.getMaxParallelism());
        // add the sink's input edges
        for (Integer inputId: inputIds) {
          streamGraph.addEdge(inputId,
              sink.getId(),
              0
          );
        }
        // set the keySelector
        if (sink.getStateKeySelector() != null) {
          TypeSerializer<?> keySerializer = sink.getStateKeyType().createSerializer(executionConfig);
          streamGraph.setOneInputStateKey(sink.getId(), sink.getStateKeySelector(), keySerializer);
        }
        // return an empty list: this branch ends here
        return Collections.emptyList();
      }
    public <IN, OUT> void addSink(
        Integer vertexID,
        @Nullable String slotSharingGroup,
        @Nullable String coLocationGroup,
        StreamOperatorFactory<OUT> operatorFactory,
        TypeInformation<IN> inTypeInfo,
        TypeInformation<OUT> outTypeInfo,
        String operatorName) {
        // add the sink StreamNode
        addOperator(vertexID, slotSharingGroup, coLocationGroup, operatorFactory, inTypeInfo, outTypeInfo, operatorName);
        // add the sink's StreamNode id to the sinks set
        sinks.add(vertexID);
      }

    That completes the transform path from Source to Sink. A brief summary:

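    1. A source StreamNode has no inputs; it is added to streamNodes and sources
    2. A sink StreamNode returns no ids, i.e. it has no downstream; it is added to streamNodes and sinks
    3. A physical node creates a StreamNode, which is added to streamNodes
    4. A virtual node does not create a StreamNode
    5. A union operator has no node of its own; it only returns the ids of its input operators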


  • Original article: https://www.cnblogs.com/Springmoon-venn/p/13937203.html