zoukankan      html  css  js  c++  java
  • Flink on YARN时,如何确定TaskManager数

    转自: https://www.jianshu.com/p/5b670d524fa5

    答案写在最前面:Job的最大并行度除以每个TaskManager分配的任务槽数。

    问题

    Flink 1.5 Release Notes中,有这样一段话,直接上截图。

     

    这说明从1.5版本开始,Flink on YARN时的容器数量——亦即TaskManager数量——将由程序的并行度自动推算,也就是说flink run脚本的-yn/--yarncontainer参数不起作用了。那么自动推算的规则是什么呢?要弄清楚它,先来复习Flink的并行度(Parallelism)和任务槽(Task Slot)。

    并行度(Parallelism)

    与Spark类似地,一个Flink Job在生成执行计划时也划分成多个Task。Task可以是Source、Sink、算子或算子链(算子链有点意思,之后会另写文章详细说的)。Task可以由多线程并发执行,每个线程处理Task输入数据的一个子集。而并发的数量就称为Parallelism,即并行度。

    Flink程序中设定并行度有4种级别,从低到高分别为:算子级别、执行环境(ExecutionEnvironment)级别、客户端(命令行)级别、配置文件(flink-conf.yaml)级别。实际执行时,优先级则是反过来的,算子级别最高。简单示例如下。

    • 算子级别
    dataStream.flatMap(new SomeFlatMapFunction()).setParallelism(4);
    
    • 执行环境级别
    streamExecutionEnvironment.setParallelism(4);
    
    • 命令行级别
    bin/flink -run --parallelism 4 example-0.1.jar
    
    • flink-conf.yaml级别
    parallelism.default: 4
    

    任务槽(Task Slot)

    Flink运行时由两个组件组成:JobManager与TaskManager,与Spark Standalone模式下的Master与Worker是同等概念。从官网抄来的图如下所示,很容易理解。

     

    JobManager和TaskManager本质上都是JVM进程。为了提高Flink程序的运行效率和资源利用率,Flink在TaskManager中实现了任务槽(Task Slot)。任务槽是Flink计算资源的基本单位,每个任务槽可以在同一时间执行一个Task,而TaskManager可以拥有一个或者多个任务槽。

    任务槽可以实现TaskManager中不同Task的资源隔离,不过是逻辑隔离,并且只隔离内存,亦即在调度层面认为每个任务槽“应该”得到taskmanager.heap.size的N分之一大小的内存。CPU资源不算在内。

    TaskManager的任务槽个数在使用flink run脚本提交on YARN作业时用-ys/--yarnslots参数来指定,另外在flink-conf.yaml文件中也有默认值taskManager.numberOfTaskSlots。一般来讲,我们设定该参数时可以将它理解成一个TaskManager可以利用的CPU核心数,因此也要根据实际情况(集群的CPU资源和作业的计算量)来确定。

    确定TaskManager数

    以Flink自带示例中简化的WordCount程序为例:

        // 执行环境并行度设为6
        env.setParallelism(6);
        // Source并行度为1
        DataStream<String> text = env
          .readTextFile(params.get("input"))
          .setParallelism(1);
        DataStream<Tuple2<String, Integer>> counts = text
          .flatMap(new Tokenizer())
          .keyBy(0)
          .sum(1);
        counts.print();
    

    --yarnslots 3参数来执行,即每个TaskManager分配3个任务槽。TaskManager、任务槽和任务的分布将如下图所示,方括号内的数字为并行线程的编号。

     
    图来自http://wuchong.me/blog/2016/05/09/flink-internals-understanding-execution-resources/,致敬

    由图中可以看出,由于算子链机制的存在,KeyAgg与Sink操作链接在了一起,作为一个Task来执行。

    Flink允许任务槽共享,即来自同一个Job的不同Task的Sub-Task(理解为Task的子集就行)进入同一个槽位,因此在图中也可以见到任务槽X中同时存在FlatMap[X]与KeyAgg[X]+Sink[X]。任务槽共享有两点好处:

    • 能够让每个Task的Sub-Task都均摊到不同的TaskManager,避免负载倾斜。
    • 不需要再计算App一共需要起多少个Task,因为作业需要的任务槽数量肯定等于Job中最大的并行度。

    所以,可以得出Flink on YARN时,TaskManager的数量就是:max(parallelism) / yarnslots(向上取整)。例如,一个最大并行度为10,每个TaskManager有两个任务槽的作业,就会启动5个TaskManager,如Web UI所示。

     
  • 相关阅读:
    hdu1238 Substrings
    CCF试题:高速公路(Targin)
    hdu 1269 迷宫城堡(Targin算法)
    hdu 1253 胜利大逃亡
    NYOJ 55 懒省事的小明
    HDU 1024 Max Sum Plus Plus
    HDU 1087 Super Jumping! Jumping! Jumping!
    HDU 1257 最少拦截系统
    HDU 1069 Monkey and Banana
    HDU 1104 Remainder
  • 原文地址:https://www.cnblogs.com/leon0/p/11607844.html
Copyright © 2011-2022 走看看