  • How Flink Generates the StreamGraph

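    An application written with the DataStream API is first converted into Transformations, which are then mapped to a StreamGraph. The client converts the StreamGraph into a JobGraph and submits the JobGraph to the Flink cluster. The cluster converts the JobGraph into an ExecutionGraph, after which the job enters the scheduling and execution phase.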
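
The first step of this pipeline, API calls being recorded as Transformations rather than executed immediately, can be illustrated with a toy model. The sketch below is not Flink code: ToyEnvironment, ToyStream, and ToyTransformation are hypothetical stand-ins, and this model registers every transformation in one list, which may differ from Flink's actual bookkeeping.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (not Flink code): API calls record transformations instead of running them. */
final class TransformationRecordingSketch {

    /** Hypothetical stand-in for a Transformation: a name plus an optional upstream input. */
    static final class ToyTransformation {
        final String name;
        final ToyTransformation input; // null marks a source

        ToyTransformation(String name, ToyTransformation input) {
            this.name = name;
            this.input = input;
        }
    }

    /** Hypothetical stand-in for the execution environment that collects transformations. */
    static final class ToyEnvironment {
        final List<ToyTransformation> transformations = new ArrayList<>();

        ToyStream fromSource(String name) {
            return register(new ToyTransformation(name, null));
        }

        ToyStream register(ToyTransformation transformation) {
            transformations.add(transformation);
            return new ToyStream(this, transformation);
        }
    }

    /** Hypothetical stand-in for a DataStream: each operator wraps the previous transformation. */
    static final class ToyStream {
        final ToyEnvironment env;
        final ToyTransformation transformation;

        ToyStream(ToyEnvironment env, ToyTransformation transformation) {
            this.env = env;
            this.transformation = transformation;
        }

        ToyStream apply(String operatorName) {
            // Nothing is executed here; the call only records a new node that points upstream.
            return env.register(new ToyTransformation(operatorName, transformation));
        }
    }

    /** Renders each recorded transformation together with the name of its upstream input. */
    static List<String> describe(ToyEnvironment env) {
        List<String> lines = new ArrayList<>();
        for (ToyTransformation t : env.transformations) {
            String upstream = (t.input == null) ? "(none)" : t.input.name;
            lines.add(t.name + " <- " + upstream);
        }
        return lines;
    }

    public static void main(String[] args) {
        ToyEnvironment env = new ToyEnvironment();
        env.fromSource("source").apply("map").apply("sink");
        describe(env).forEach(System.out::println);
    }
}
```

The recorded list, with each entry pointing at its input, is the kind of structure a graph generator can later walk to build nodes and edges.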

    The StreamGraph conversion proceeds as follows:

    1. The entry point is StreamExecutionEnvironment.execute(), which calls getStreamGraph() to trigger construction of the StreamGraph.

    2. StreamExecutionEnvironment.getStreamGraph() is called; it delegates the actual work to the generate() method of StreamGraphGenerator.

    3. Inside generate(), transform() is called on each Transformation to convert it.

    4. transform() dispatches to the transformXxx() method that matches the concrete type of the Transformation:

    private Collection<Integer> transform(Transformation<?> transform) {
    
       // Skip the work if this transformation has already been converted
       if (alreadyTransformed.containsKey(transform)) {
          return alreadyTransformed.get(transform);
       }
    
       LOG.debug("Transforming " + transform);
    
       if (transform.getMaxParallelism() <= 0) {
    
          // if the max parallelism hasn't been set, then first use the job wide max parallelism
          // from the ExecutionConfig.
          int globalMaxParallelismFromConfig = executionConfig.getMaxParallelism();
          if (globalMaxParallelismFromConfig > 0) {
             transform.setMaxParallelism(globalMaxParallelismFromConfig);
          }
       }
    
       // call at least once to trigger exceptions about MissingTypeInfo
       transform.getOutputType();
    
       Collection<Integer> transformedIds;
       // Convert the concrete Transformation into StreamNodes and StreamEdges in the StreamGraph, and return the transformedIds
       if (transform instanceof OneInputTransformation<?, ?>) {
          transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform);
       } else if (transform instanceof TwoInputTransformation<?, ?, ?>) {
          transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);
       } else if (transform instanceof SourceTransformation<?>) {
          transformedIds = transformSource((SourceTransformation<?>) transform);
       } else if (transform instanceof SinkTransformation<?>) {
          transformedIds = transformSink((SinkTransformation<?>) transform);
       } else if (transform instanceof UnionTransformation<?>) {
          transformedIds = transformUnion((UnionTransformation<?>) transform);
       } else if (transform instanceof SplitTransformation<?>) {
          transformedIds = transformSplit((SplitTransformation<?>) transform);
       } else if (transform instanceof SelectTransformation<?>) {
          transformedIds = transformSelect((SelectTransformation<?>) transform);
       } else if (transform instanceof FeedbackTransformation<?>) {
          transformedIds = transformFeedback((FeedbackTransformation<?>) transform);
       } else if (transform instanceof CoFeedbackTransformation<?>) {
          transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform);
       } else if (transform instanceof PartitionTransformation<?>) {
          transformedIds = transformPartition((PartitionTransformation<?>) transform);
       } else if (transform instanceof SideOutputTransformation<?>) {
          transformedIds = transformSideOutput((SideOutputTransformation<?>) transform);
       } else {
          throw new IllegalStateException("Unknown transformation: " + transform);
       }
    
       // need this check because the iterate transformation adds itself before
       // transforming the feedback edges
       if (!alreadyTransformed.containsKey(transform)) {
          alreadyTransformed.put(transform, transformedIds);
       }
    
       if (transform.getBufferTimeout() >= 0) {
          streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout());
       } else {
          streamGraph.setBufferTimeout(transform.getId(), defaultBufferTimeout);
       }
    
       if (transform.getUid() != null) {
          streamGraph.setTransformationUID(transform.getId(), transform.getUid());
       }
       if (transform.getUserProvidedNodeHash() != null) {
          streamGraph.setTransformationUserHash(transform.getId(), transform.getUserProvidedNodeHash());
       }
    
       if (!streamGraph.getExecutionConfig().hasAutoGeneratedUIDsEnabled()) {
          if (transform instanceof PhysicalTransformation &&
                transform.getUserProvidedNodeHash() == null &&
                transform.getUid() == null) {
             throw new IllegalStateException("Auto generated UIDs have been disabled " +
                "but no UID or hash has been assigned to operator " + transform.getName());
          }
       }
    
       if (transform.getMinResources() != null && transform.getPreferredResources() != null) {
          streamGraph.setResources(transform.getId(), transform.getMinResources(), transform.getPreferredResources());
       }
    
       streamGraph.setManagedMemoryWeight(transform.getId(), transform.getManagedMemoryWeight());
    
       return transformedIds;
    }
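
Apart from the type dispatch, transform() applies a few fallback and validation rules: the job-wide max parallelism is used when a transformation has none, the default buffer timeout is used when the per-transformation value is negative, and a missing UID is rejected once auto-generated UIDs are disabled. The sketch below restates those three rules as standalone functions. The class and method names are hypothetical, chosen for illustration only, and are not part of Flink.

```java
/** Standalone restatement of the fallback rules in transform(); all names here are hypothetical. */
final class TransformSettingsSketch {

    /** Keeps an explicit max parallelism; otherwise uses the job-wide value if that one is set. */
    static int resolveMaxParallelism(int perTransformation, int jobWide) {
        if (perTransformation > 0) {
            return perTransformation;
        }
        // If the job-wide value is also unset, the transformation's value stays unchanged.
        return jobWide > 0 ? jobWide : perTransformation;
    }

    /** A non-negative per-transformation timeout wins; a negative one means "use the default". */
    static long resolveBufferTimeout(long perTransformation, long defaultTimeout) {
        return perTransformation >= 0 ? perTransformation : defaultTimeout;
    }

    /** Rejects a physical operator that has neither a UID nor a user hash when auto UIDs are off. */
    static void checkUid(boolean autoGeneratedUidsEnabled, boolean physical,
                         String uid, String userHash, String operatorName) {
        if (!autoGeneratedUidsEnabled && physical && uid == null && userHash == null) {
            throw new IllegalStateException("Auto generated UIDs have been disabled "
                    + "but no UID or hash has been assigned to operator " + operatorName);
        }
    }

    public static void main(String[] args) {
        System.out.println(resolveMaxParallelism(-1, 128));
        System.out.println(resolveBufferTimeout(-1, 100));
        checkUid(false, true, "my-map-uid", null, "map"); // passes because a UID is present
    }
}
```

Note that a buffer timeout of 0 counts as an explicit setting under this rule, so only negative values fall back to the default.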
    

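    5. Taking transformOneInputTransform() as an example: it first recursively transforms the upstream transformation, then constructs the StreamNode and the StreamEdges that connect it to its inputs.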
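
The recursion in step 5, combined with the alreadyTransformed cache from transform(), can be sketched with a toy graph. This is a simplified model under stated assumptions rather than Flink's implementation: ToyTransformation, ToyStreamGraph, and ToyGenerator are hypothetical, every non-source transformation has exactly one input, and nodes and edges are reduced to ids and strings.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model (not Flink code) of recursive upstream conversion with memoization. */
final class OneInputTransformSketch {

    /** Hypothetical transformation: an id, a name, and at most one upstream input. */
    static final class ToyTransformation {
        final int id;
        final String name;
        final ToyTransformation input; // null marks a source

        ToyTransformation(int id, String name, ToyTransformation input) {
            this.id = id;
            this.name = name;
            this.input = input;
        }
    }

    /** Hypothetical graph: nodes keyed by id, edges stored as "upstream->downstream". */
    static final class ToyStreamGraph {
        final Map<Integer, String> nodes = new LinkedHashMap<>();
        final List<String> edges = new ArrayList<>();

        void addNode(int id, String name) {
            nodes.put(id, name);
        }

        void addEdge(int upstreamId, int downstreamId) {
            edges.add(upstreamId + "->" + downstreamId);
        }
    }

    static final class ToyGenerator {
        final ToyStreamGraph graph = new ToyStreamGraph();
        final Map<ToyTransformation, Collection<Integer>> alreadyTransformed = new HashMap<>();

        ToyStreamGraph generate(List<ToyTransformation> transformations) {
            for (ToyTransformation t : transformations) {
                transform(t);
            }
            return graph;
        }

        Collection<Integer> transform(ToyTransformation t) {
            // A transformation reachable from several downstream operators is converted once.
            if (alreadyTransformed.containsKey(t)) {
                return alreadyTransformed.get(t);
            }
            Collection<Integer> ids = (t.input == null) ? transformSource(t) : transformOneInput(t);
            alreadyTransformed.put(t, ids);
            return ids;
        }

        Collection<Integer> transformSource(ToyTransformation t) {
            graph.addNode(t.id, t.name);
            return Collections.singleton(t.id);
        }

        Collection<Integer> transformOneInput(ToyTransformation t) {
            // Convert the upstream side first so its node ids exist before edges are added.
            Collection<Integer> inputIds = transform(t.input);
            graph.addNode(t.id, t.name);
            for (Integer inputId : inputIds) {
                graph.addEdge(inputId, t.id);
            }
            return Collections.singleton(t.id);
        }
    }

    public static void main(String[] args) {
        ToyTransformation source = new ToyTransformation(1, "source", null);
        ToyTransformation map = new ToyTransformation(2, "map", source);
        ToyTransformation sink = new ToyTransformation(3, "sink", map);

        ToyStreamGraph graph = new ToyGenerator().generate(Arrays.asList(sink));
        System.out.println(graph.nodes);
        System.out.println(graph.edges);
    }
}
```

Because upstream transformations are converted before the current node is added, the order in which generate() visits the list does not affect which nodes and edges end up in the graph. The cache ensures that a transformation shared by several downstream operators contributes only one node.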

  • Original article: https://www.cnblogs.com/jordan95225/p/13896917.html