  • [Source Code] Flink's Three-Layer Graph Structure: JobGraph Generation

    Note: everything here is based on Flink's local mode.

    JobGraph generation starts in the execute method of LocalExecutor.java.

    // Called for local execution. Pipeline is the interface that StreamGraph implements
    @Override
    public CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration configuration) throws Exception {
        checkNotNull(pipeline);
        checkNotNull(configuration);
    
        Configuration effectiveConfig = new Configuration();
        effectiveConfig.addAll(this.configuration);
        effectiveConfig.addAll(configuration);
    
        // we only support attached execution with the local executor.
        checkState(configuration.getBoolean(DeploymentOptions.ATTACHED));
        // Generate the JobGraph from the StreamGraph and the configuration
        final JobGraph jobGraph = getJobGraph(pipeline, effectiveConfig);
    
        return PerJobMiniClusterFactory.createWithFactory(effectiveConfig, miniClusterFactory).submitJob(jobGraph);
    }

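    pipeline is the StreamGraph that was just generated, and configuration is the startup configuration.

    pipeline mainly contains the StreamNodes plus other configuration (the original post illustrates this with a figure).

    effectiveConfig contains {execution.attached=true, execution.target=local}, which means local mode.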
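
    The two addAll calls in execute decide which settings win: values passed to execute are applied last, so they override the executor's own values. A minimal sketch of that merge order, assuming a plain Map as a hypothetical stand-in for Flink's Configuration (ConfigMergeSketch and merge are illustrative names, not Flink APIs):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for Flink's Configuration: a plain map, used only to show merge order.
final class ConfigMergeSketch {

    // Mirrors the two addAll calls in execute(): entries from 'overrides' win over 'base'.
    static Map<String, String> merge(Map<String, String> base, Map<String, String> overrides) {
        Map<String, String> effective = new LinkedHashMap<>();
        effective.putAll(base);
        effective.putAll(overrides);
        return effective;
    }

    public static void main(String[] args) {
        // Settings held by the executor itself (this.configuration in the original code)
        Map<String, String> executorConfig = new LinkedHashMap<>();
        executorConfig.put("execution.target", "local");
        executorConfig.put("execution.attached", "false");

        // Settings passed to execute(); applied last, so they take precedence
        Map<String, String> callConfig = new LinkedHashMap<>();
        callConfig.put("execution.attached", "true");

        System.out.println(merge(executorConfig, callConfig));
    }
}
```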

    The getJobGraph method:

    private JobGraph getJobGraph(Pipeline pipeline, Configuration configuration) throws MalformedURLException {
        // This is a quirk in how LocalEnvironment used to work. It sets the default parallelism
        // to <num taskmanagers> * <num task slots>. Might be questionable but we keep the behaviour
        // for now.
        // Batch case: the pipeline is a Plan
        if (pipeline instanceof Plan) {
            Plan plan = (Plan) pipeline;
            final int slotsPerTaskManager = configuration.getInteger(
                    TaskManagerOptions.NUM_TASK_SLOTS, plan.getMaximumParallelism());
            final int numTaskManagers = configuration.getInteger(
                    ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
    
            plan.setDefaultParallelism(slotsPerTaskManager * numTaskManagers);
        }
        // Generate the JobGraph
        return PipelineExecutorUtils.getJobGraph(pipeline, configuration);
    }
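
    To make the batch branch concrete, here is a small sketch of the same arithmetic under stated assumptions: a plain Map replaces Flink's Configuration, and the key names and the class LocalParallelismSketch are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: a plain Map stands in for Flink's Configuration.
final class LocalParallelismSketch {

    // Illustrative key names only; they are not Flink's real option keys.
    static final String SLOTS_PER_TASK_MANAGER = "sketch.slots-per-taskmanager";
    static final String NUM_TASK_MANAGERS = "sketch.num-taskmanagers";

    // Same arithmetic as the batch branch: slots default to the plan's maximum
    // parallelism, the TaskManager count defaults to 1.
    static int defaultParallelism(Map<String, Integer> config, int planMaxParallelism) {
        int slotsPerTaskManager = config.getOrDefault(SLOTS_PER_TASK_MANAGER, planMaxParallelism);
        int numTaskManagers = config.getOrDefault(NUM_TASK_MANAGERS, 1);
        return slotsPerTaskManager * numTaskManagers;
    }

    public static void main(String[] args) {
        Map<String, Integer> config = new HashMap<>();
        // Nothing configured: falls back to the plan's maximum parallelism times 1
        System.out.println(defaultParallelism(config, 4));

        // Both values configured: the product of the two
        config.put(SLOTS_PER_TASK_MANAGER, 2);
        config.put(NUM_TASK_MANAGERS, 3);
        System.out.println(defaultParallelism(config, 4));
    }
}
```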

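    PipelineExecutorUtils is a utility class holding methods related to job execution.

    Note: judging by how the classes that generate StreamGraph and JobGraph are named, they look like the work of two different groups. JobGraph is generic, though, while StreamGraph is tied to the Environment.

    getJobGraph in PipelineExecutorUtils.java: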

    public static JobGraph getJobGraph(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration) throws MalformedURLException {
        checkNotNull(pipeline);
        checkNotNull(configuration);
    
        // Accessor for the default configuration parameters
        final ExecutionConfigAccessor executionConfigAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
        // jobGraph
        final JobGraph jobGraph = FlinkPipelineTranslationUtil
            // Default parallelism: 1
                .getJobGraph(pipeline, configuration, executionConfigAccessor.getParallelism());
    
        configuration
                .getOptional(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID)
                .ifPresent(strJobID -> jobGraph.setJobID(JobID.fromHexString(strJobID)));
        // Apply the defaults, which are empty here
        jobGraph.addJars(executionConfigAccessor.getJars());
        jobGraph.setClasspaths(executionConfigAccessor.getClasspaths());
        jobGraph.setSavepointRestoreSettings(executionConfigAccessor.getSavepointRestoreSettings());
    
        return jobGraph;
    }

    Next comes FlinkPipelineTranslationUtil.getJobGraph, which takes one extra argument, the default parallelism:

    public static JobGraph getJobGraph(
                Pipeline pipeline,
                Configuration optimizerConfiguration,
                int defaultParallelism) {
    
        // Pick the Translator for this program: stream or batch
        FlinkPipelineTranslator pipelineTranslator = getPipelineTranslator(pipeline);
        // Use the pipelineTranslator to translate the pipeline into a JobGraph
        return pipelineTranslator.translateToJobGraph(pipeline,
                optimizerConfiguration,
                defaultParallelism);
    }

    getPipelineTranslator is rather blunt:

    // Create the two Translators; return whichever one can translate the pipeline, otherwise throw
    private static FlinkPipelineTranslator getPipelineTranslator(Pipeline pipeline) {
        // PlanTranslator for DataSet programs
        PlanTranslator planTranslator = new PlanTranslator();
        // pipeline instanceof Plan
        if (planTranslator.canTranslate(pipeline)) {
            return planTranslator;
        }
        // Translator for streaming programs
        StreamGraphTranslator streamGraphTranslator = new StreamGraphTranslator();
        // pipeline instanceof StreamGraph
        if (streamGraphTranslator.canTranslate(pipeline)) {
            return streamGraphTranslator;
        }
    
        throw new RuntimeException("Translator " + streamGraphTranslator + " cannot translate "
            + "the given pipeline " + pipeline + ".");
    }
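
    The selection logic above boils down to asking each candidate translator whether it accepts the pipeline and failing if none does. A self-contained sketch of that pattern follows; every type in it (Translator, BatchPlan, StreamPipeline and so on) is a hypothetical miniature invented for illustration, not a Flink class.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical miniature of the selection logic; none of these types are Flink classes.
final class TranslatorSelectionSketch {

    interface Translator {
        boolean canTranslate(Object pipeline);
        String translate(Object pipeline);
    }

    // Toy pipeline types standing in for Plan (batch) and StreamGraph (streaming)
    static final class BatchPlan {}
    static final class StreamPipeline {}

    static final class BatchTranslator implements Translator {
        public boolean canTranslate(Object pipeline) { return pipeline instanceof BatchPlan; }
        public String translate(Object pipeline) { return "job graph from batch plan"; }
    }

    static final class StreamTranslator implements Translator {
        public boolean canTranslate(Object pipeline) { return pipeline instanceof StreamPipeline; }
        public String translate(Object pipeline) { return "job graph from stream graph"; }
    }

    // Ask each candidate in turn; fail loudly if none accepts the pipeline
    static Translator select(Object pipeline) {
        List<Translator> candidates = Arrays.asList(new BatchTranslator(), new StreamTranslator());
        for (Translator candidate : candidates) {
            if (candidate.canTranslate(pipeline)) {
                return candidate;
            }
        }
        throw new IllegalArgumentException("No translator can translate the given pipeline " + pipeline);
    }

    public static void main(String[] args) {
        StreamPipeline pipeline = new StreamPipeline();
        System.out.println(select(pipeline).translate(pipeline));
    }
}
```

    Keeping the candidates in a list means a third pipeline type would only need one more entry, whereas the original code spells out each check by hand.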

    The batch path is skipped here, of course. Let's look at the translateToJobGraph method of StreamGraphTranslator:

    public JobGraph translateToJobGraph(
            Pipeline pipeline,
            Configuration optimizerConfiguration,
            int defaultParallelism) {
        checkArgument(pipeline instanceof StreamGraph,
                "Given pipeline is not a DataStream StreamGraph.");
    
        // Cast the pipeline back to its concrete type, StreamGraph
        StreamGraph streamGraph = (StreamGraph) pipeline;
        // Use the streamGraph to generate the JobGraph (back in the StreamGraph class again)
        return streamGraph.getJobGraph(null);
    }

    StreamGraph.java

    public JobGraph getJobGraph(@Nullable JobID jobID) {
        return StreamingJobGraphGenerator.createJobGraph(this, jobID);
    }

    StreamingJobGraphGenerator.java (the familiar naming scheme)

    public static JobGraph createJobGraph(StreamGraph streamGraph, @Nullable JobID jobID) {
        // Create a StreamingJobGraphGenerator, then generate the JobGraph
        return new StreamingJobGraphGenerator(streamGraph, jobID).createJobGraph();
    }

    In StreamingJobGraphGenerator.java the jobGraph object is finally created (jobID is null at this point):

    private StreamingJobGraphGenerator(StreamGraph streamGraph, @Nullable JobID jobID) {
        this.streamGraph = streamGraph;
        this.defaultStreamGraphHasher = new StreamGraphHasherV2();
        this.legacyStreamGraphHashers = Arrays.asList(new StreamGraphUserHashHasher());
    
        this.jobVertices = new HashMap<>();
        this.builtVertices = new HashSet<>();
        this.chainedConfigs = new HashMap<>();
        this.vertexConfigs = new HashMap<>();
        this.chainedNames = new HashMap<>();
        this.chainedMinResources = new HashMap<>();
        this.chainedPreferredResources = new HashMap<>();
        this.chainedInputOutputFormats = new HashMap<>();
        this.physicalEdgesInOrder = new ArrayList<>();
    
        jobGraph = new JobGraph(jobID, streamGraph.getJobName());
    }

    The JobGraph.java constructor:

    public JobGraph(JobID jobId, String jobName) {
        this.jobID = jobId == null ? new JobID() : jobId;
        this.jobName = jobName == null ? "(unnamed job)" : jobName;
    
        try {
            setExecutionConfig(new ExecutionConfig());
        } catch (IOException e) {
            // this should never happen, since an empty execution config is always serializable
            throw new RuntimeException("bug, empty execution config is not serializable");
        }
    }
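
    Since the jobID arrives as null on this path, the constructor's fallbacks are what actually give the job its identity. A minimal sketch of just that null handling, assuming java.util.UUID as a stand-in for Flink's JobID (JobDefaultsSketch is a hypothetical class, not part of Flink):

```java
import java.util.UUID;

// Hypothetical miniature of the constructor's null handling; UUID replaces Flink's JobID.
final class JobDefaultsSketch {

    final UUID jobId;
    final String jobName;

    JobDefaultsSketch(UUID jobId, String jobName) {
        // A missing id is replaced by a freshly generated one
        this.jobId = jobId == null ? UUID.randomUUID() : jobId;
        // A missing name falls back to a fixed placeholder
        this.jobName = jobName == null ? "(unnamed job)" : jobName;
    }

    public static void main(String[] args) {
        // Both arguments missing, as on the local-execution path for the id
        JobDefaultsSketch job = new JobDefaultsSketch(null, null);
        System.out.println(job.jobId + " " + job.jobName);
    }
}
```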

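    The createJobGraph method of StreamingJobGraphGenerator.java generates the jobGraph.

    This is the core method of JobGraph generation. It does the following:
    1. Validates the checkpoint configuration
    2. Sets the schedule mode
    3. Generates deterministic hashes for the nodes
    4. Core step: builds the operator chains, generates the chain names, sets resources, chains the operators, and creates the JobVertex objects
    5. Adds the physical edges
    6. Sets the slot sharing groups and co-location
    7. Sets the managed memory fractions
    8. Configures checkpointing
    9. Sets the savepoint restore settings
    10. Adds the user artifacts
    11. Copies the streamGraph's ExecutionConfig into the jobGraph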

    private JobGraph createJobGraph() {
        // Validate the checkpoint configuration
        preValidate();
    
        // make sure that all vertices start immediately
        // In streaming mode every task is started when the job starts
        jobGraph.setScheduleMode(streamGraph.getScheduleMode());
    
        // Generate deterministic hashes for the nodes in order to identify them across
        // submission iff they didn't change.
        // The result holds one hash (a byte array) per StreamNode id
        Map<Integer, byte[]> hashes = defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);
    
        // Generate legacy version hashes for backwards compatibility
        List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
        for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
            legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
        }
    
        // Core method: builds the JobVertex objects
        setChaining(hashes, legacyHashes);
    
        // Add the physical edges
        setPhysicalEdges();
        // Set the slot sharing groups and co-location
        setSlotSharingAndCoLocation();
        // Set the managed memory fractions
        setManagedMemoryFraction(
            Collections.unmodifiableMap(jobVertices),
            Collections.unmodifiableMap(vertexConfigs),
            Collections.unmodifiableMap(chainedConfigs),
            id -> streamGraph.getStreamNode(id).getMinResources(),
            id -> streamGraph.getStreamNode(id).getManagedMemoryWeight());
        // Configure checkpointing
        configureCheckpointing();
    
        // Set the savepoint restore settings
        jobGraph.setSavepointRestoreSettings(streamGraph.getSavepointRestoreSettings());
    
        // Add the user artifacts
        JobGraphUtils.addUserArtifactEntries(streamGraph.getUserArtifacts(), jobGraph);
    
        // set the ExecutionConfig last when it has been finalized
        try {
            // Copy the streamGraph's ExecutionConfig into the jobGraph
            jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
        } catch (IOException e) {
            throw new IllegalConfigurationException("Could not serialize the ExecutionConfig." +
                "This indicates that non-serializable types (like custom serializers) were registered");
        }
    
        return jobGraph;
    }
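
    The hashing step in createJobGraph exists so that an unchanged node gets the same identifier on every submission. The sketch below illustrates only that property. It does not reproduce the algorithm of StreamGraphHasherV2: SHA-256, the string node description, and the class DeterministicHashSketch are all assumptions chosen for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of deterministic node hashing; not Flink's actual hasher.
final class DeterministicHashSketch {

    // Hash a node from its own description plus the hashes of its inputs, so a
    // change anywhere upstream also changes every downstream hash.
    static byte[] hashNode(String nodeDescription, List<byte[]> inputHashes) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            digest.update(nodeDescription.getBytes(StandardCharsets.UTF_8));
            for (byte[] inputHash : inputHashes) {
                digest.update(inputHash);
            }
            return digest.digest();
        } catch (NoSuchAlgorithmException e) {
            // SHA-256 must be provided by every Java platform implementation
            throw new IllegalStateException("SHA-256 is unavailable", e);
        }
    }

    public static void main(String[] args) {
        byte[] source = hashNode("source", Collections.<byte[]>emptyList());
        byte[] map = hashNode("map", Collections.singletonList(source));

        // Hashing the same structure again gives the same bytes
        byte[] mapAgain = hashNode("map",
                Collections.singletonList(hashNode("source", Collections.<byte[]>emptyList())));
        System.out.println(Arrays.equals(map, mapAgain));
    }
}
```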

    Setting up the operator chains is the core of JobGraph generation: starting from the source nodes, it walks downstream and recursively builds the operator chains.

    private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes) {
        // Create one chain per source
        for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
            // Arguments: source node id, chain index, OperatorChainInfo(start node id, hashes, legacyHashes, streamGraph)
            createChain(
                sourceNodeId,
                // The source node sits at position 0 of the chain
                0,
                new OperatorChainInfo(sourceNodeId, hashes, legacyHashes, streamGraph));
        }
    }
    
    // Creates a chain. Recursive: wherever the chain breaks, a new chain is started
    private List<StreamEdge> createChain(Integer currentNodeId, int chainIndex, OperatorChainInfo chainInfo) {
        // Start node of the current chain
        Integer startNodeId = chainInfo.getStartNodeId();
        // If this chain has already been built, return an empty list (see the else branch)
        if (!builtVertices.contains(startNodeId)) {
            // Edges that leave the chain, collected during the traversal
            List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();
            // Out edges that can be chained
            List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
            // Out edges that cannot be chained
            List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();
            // The current node
            StreamNode currentNode = streamGraph.getStreamNode(currentNodeId);
            // Walk the current node's out edges and check whether each downstream node can be chained
            for (StreamEdge outEdge : currentNode.getOutEdges()) {
                if (isChainable(outEdge, streamGraph)) {
                    // Chainable edges go into chainableOutputs
                    chainableOutputs.add(outEdge);
                } else {
                    // Non-chainable edges go into nonChainableOutputs
                    nonChainableOutputs.add(outEdge);
                }
            }
            // Walk the chainable edges
            for (StreamEdge chainable : chainableOutputs) {
                // With the upstream chainIndex + 1, recursively pull in every chainable downstream node
                transitiveOutEdges.addAll(
                    createChain(chainable.getTargetId(), chainIndex + 1, chainInfo));
            }
            // Walk the non-chainable edges
            for (StreamEdge nonChainable : nonChainableOutputs) {
                // Record the edge as an out edge of the current chain
                transitiveOutEdges.add(nonChainable);
                // Recursively start a new chain at this edge's target node
                createChain(nonChainable.getTargetId(), 0, chainInfo.newChain(nonChainable.getTargetId()));
            }
    
            // Set the chain name: the concatenated names of the chained operators
            chainedNames.put(currentNodeId, createChainedName(currentNodeId, chainableOutputs));
            // Set the chain's minimum resources: ResourceSpec{UNKNOWN}
            chainedMinResources.put(currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs));
            // Set the chain's preferred resources: ResourceSpec{UNKNOWN}
            chainedPreferredResources.put(currentNodeId, createChainedPreferredResources(currentNodeId, chainableOutputs));
    
            // Add the current node to the chain
            OperatorID currentOperatorId = chainInfo.addNodeToChain(currentNodeId, chainedNames.get(currentNodeId));
            // Input format
            if (currentNode.getInputFormat() != null) {
                getOrCreateFormatContainer(startNodeId).addInputFormat(currentOperatorId, currentNode.getInputFormat());
            }
    
            // Output format
            if (currentNode.getOutputFormat() != null) {
                getOrCreateFormatContainer(startNodeId).addOutputFormat(currentOperatorId, currentNode.getOutputFormat());
            }
    
            // For the chain start, create the JobVertex and add it to the jobGraph's taskVertices;
            // any other node only gets an empty StreamConfig
            StreamConfig config = currentNodeId.equals(startNodeId)
                ? createJobVertex(startNodeId, chainInfo)
                : new StreamConfig(new Configuration());
            // Set the vertex config
            setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs);
    
            // At the chain start, connect the edges that leave the chain
            if (currentNodeId.equals(startNodeId)) {
    
                config.setChainStart();
                config.setChainIndex(0);
                config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
    
                for (StreamEdge edge : transitiveOutEdges) {
                    connect(startNodeId, edge);
                }
    
                config.setOutEdgesInOrder(transitiveOutEdges);
                config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));
    
            } else {
                // Not the chain start: make sure the chain has a config map
                chainedConfigs.computeIfAbsent(startNodeId, k -> new HashMap<Integer, StreamConfig>());
                // Set the node's index within the chain
                config.setChainIndex(chainIndex);
                // Get the current node
                StreamNode node = streamGraph.getStreamNode(currentNodeId);
                // Set the node's operatorName
                config.setOperatorName(node.getOperatorName());
                // Store the config under its chain
                chainedConfigs.get(startNodeId).put(currentNodeId, config);
            }
    
            // Set the operator id
            config.setOperatorID(currentOperatorId);
    
            // If the node has no chainable outputs, it ends the current chain
            if (chainableOutputs.isEmpty()) {
                config.setChainEnd();
            }
            return transitiveOutEdges;
    
        } else {
            return new ArrayList<>();
        }
    }

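    Starting from the source nodes, the method builds the operator chains recursively: it first pulls in every node that can be chained, then walks the nodes that cannot be chained, starts a new operator chain at each of them, and keeps recursing.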
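
    The recursion is easier to follow on a toy graph. The sketch below is a heavily simplified model of createChain under stated assumptions: nodes are plain integers, each edge carries a precomputed chainable flag (Flink derives this in isChainable), and a chainable edge is assumed to target a node with a single input. ChainSketch and all of its members are hypothetical names, not Flink APIs.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical toy model of the chaining recursion; not Flink's implementation.
final class ChainSketch {

    static final class Edge {
        final int source;
        final int target;
        final boolean chainable;

        Edge(int source, int target, boolean chainable) {
            this.source = source;
            this.target = target;
            this.chainable = chainable;
        }
    }

    private final Map<Integer, List<Edge>> outEdges = new LinkedHashMap<>();
    // Chain start nodes whose chain is complete (plays the role of builtVertices)
    private final Set<Integer> builtChainStarts = new HashSet<>();
    // Chain start node -> member nodes of that chain
    final Map<Integer, List<Integer>> chains = new LinkedHashMap<>();
    // Chain start node -> edges leaving the chain
    final Map<Integer, List<Edge>> chainOutEdges = new LinkedHashMap<>();

    void addEdge(int source, int target, boolean chainable) {
        outEdges.computeIfAbsent(source, k -> new ArrayList<>()).add(new Edge(source, target, chainable));
    }

    void buildFrom(int sourceNode) {
        createChain(sourceNode, sourceNode);
    }

    private List<Edge> createChain(int startNode, int currentNode) {
        if (builtChainStarts.contains(startNode)) {
            return new ArrayList<>();
        }
        List<Edge> transitiveOutEdges = new ArrayList<>();
        List<Edge> chainable = new ArrayList<>();
        List<Edge> nonChainable = new ArrayList<>();
        for (Edge edge : outEdges.getOrDefault(currentNode, new ArrayList<>())) {
            (edge.chainable ? chainable : nonChainable).add(edge);
        }
        // Chainable targets join the current chain
        for (Edge edge : chainable) {
            transitiveOutEdges.addAll(createChain(startNode, edge.target));
        }
        // Non-chainable targets leave the chain and each start a new one
        for (Edge edge : nonChainable) {
            transitiveOutEdges.add(edge);
            createChain(edge.target, edge.target);
        }
        // Post-order: prepend so that the chain start ends up first
        chains.computeIfAbsent(startNode, k -> new ArrayList<>()).add(0, currentNode);
        if (currentNode == startNode) {
            builtChainStarts.add(startNode);
            chainOutEdges.put(startNode, transitiveOutEdges);
        }
        return transitiveOutEdges;
    }

    public static void main(String[] args) {
        ChainSketch graph = new ChainSketch();
        graph.addEdge(1, 2, true);   // source -> map, chainable
        graph.addEdge(2, 3, false);  // map -> window, not chainable
        graph.addEdge(3, 4, true);   // window -> sink, chainable
        graph.buildFrom(1);
        System.out.println(graph.chains);
    }
}
```

    In this example nodes 1 and 2 form one chain and nodes 3 and 4 another, and the single edge 2 -> 3 is the only one that leaves a chain, which corresponds to an edge between two JobVertex objects in the real JobGraph.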

    Operator chains are the core content of the JobGraph (along with the slotSharingGroup).

  • Original post: https://www.cnblogs.com/Springmoon-venn/p/14110843.html