flink on yarn
为什么要用yarn?
如果不用yarn.假设有10个job运行在flink集群上,如果有一个出问题.发生了OOM,最后导致taskmanager挂掉.那么jobmanager会调度任务到其他的taskmanager上面.最后是连锁反应,会造成所有的taskmanager都挂掉.集群挂掉.
所以要用yarn.
flink on yarn 有两种形式
- yarn per job
- yarn session
yarn per job.
意思就是,一个任务一个集群.如果你这个任务出问题了,那就你自己挂掉,其他集群会活的好好的.
但是缺点是,每个任务都要一个jobmanager.都要单独的内存.
如果一个jobmanager占用1g内存.那么要是有50个任务就会占用50g内存.浪费.
适合执行时间长的作业
yarn session
因为上一个的缺点,jobmanager浪费内存.因此有了这个模式.
意思就是我在yarn中启动一个集群,然后不重要的任务,都放在这里面运行,或者在这里面测试.
相当于一个沙箱.
- 适合规模小,执行时间短的作业
- 离线处理
- 共享资源
- 可用增加taskmanager 和终止空闲的taskmanager
yarn-per-job启动参数
./flink run
-m yarn-cluster
-yn 1
-yjm 1024
-ytm 4096
-ynm FlinkOnYarnSession-MemberLogInfoProducer
-c com.igg.flink.tool.member.rabbitmq.producer.MqMemberProducer
/home/test_gjm/igg-flink-tool/igg-flink-tool-1.0.0-SNAPSHOT.jar
参数名 含义
-m 固定为yarn-cluster
-yjm 指定JobManager所在的Container内存。单位:MB
-ytm 每一个TaskManager Container的内存,单位MB。
-ys 每一个TaskManager中slots的数量。
-ynm YARN中application的名称。
-c 指定Job对应的jar包中主函数所在类名。
说明
这里只有一个taskmanager.
假设你的任务并行度是5.那么 -ys 就是5 .
假设你的任务需要8G内存.那么 -ytm 就是 8192
-yjm 固定为 1024
例子
/data/flink-1.10.1/bin/flink run
-m yarn-cluster
-yjm 1024
-ytm 18432
-ys 3
-p 6
-yD env.java.opts="-XX:+UseG1GC"
-ynm huadan_active_job
-c com.xxxxxx.FlinkHuadanActiveJob
/data/hadoop/data/xxxxx-analysis-1.0-SNAPSHOT.jar
使用 G1 收集器
启动 2 个 tm ,每个3slot,18G 内存.
Options for yarn-cluster mode:
-d,--detached If present, runs the job in detached
mode
-m,--jobmanager <arg> Address of the JobManager (master) to
which to connect. Use this flag to
connect to a different JobManager than
the one specified in the
configuration.
-yat,--yarnapplicationType <arg> Set a custom application type for the
application on YARN
-yD <property=value> use value for given property
-yd,--yarndetached If present, runs the job in detached
mode (deprecated; use non-YARN
specific option instead)
-yh,--yarnhelp Help for the Yarn session CLI.
-yid,--yarnapplicationId <arg> Attach to running YARN session
-yj,--yarnjar <arg> Path to Flink jar file
-yjm,--yarnjobManagerMemory <arg> Memory for JobManager Container with
optional unit (default: MB)
-ynl,--yarnnodeLabel <arg> Specify YARN node label for the YARN
application
-ynm,--yarnname <arg> Set a custom name for the application
on YARN
-yq,--yarnquery Display available YARN resources
(memory, cores)
-yqu,--yarnqueue <arg> Specify YARN queue.
-ys,--yarnslots <arg> Number of slots per TaskManager
-yt,--yarnship <arg> Ship files in the specified directory
(t for transfer)
-ytm,--yarntaskManagerMemory <arg> Memory per TaskManager Container with
optional unit (default: MB)
-yz,--yarnzookeeperNamespace <arg> Namespace to create the Zookeeper
sub-paths for high availability mode
-z,--zookeeperNamespace <arg> Namespace to create the Zookeeper
sub-paths for high availability mode
yarn session 启动参数
先启动 yarn-session
例子:
#!/bin/bash
/data/flink-1.10.1/bin/yarn-session.sh
-s 4
-jm 1g
-tm 8g
-d
-ynm yarn-flink
yarn-session.sh -n 5 -jm 1024 -tm 2048 -s 2 -d
参数解释:
// -jm 1024 表示jobmanager 1024M内存
// -tm 1024表示taskmanager 1024M内存
// -s 每一个TaskManager上的slots数量。
//-d 任务后台运行
//-nm,--name YARN上为一个自定义的应用设置一个名字
意思:
启动一个集群.有5个taskmanager,每个taskmanager 内存2G,2个slot.
但是在开始的时候,是没有的,在你提交任务的时候,就会创建taskmanager.
比如你提交一个 3并行度的任务,就会创建出来2个taskmanager,一共有4个slot.剩余1个.
你再启动一个2并行度的任务,那么就会再启动1个taskmanager,一共 4个slot,还剩1个slot.
因此在yarn-session中,taskmanager的slot不要设置的过多.
n可以大一点,后面可以扩充.
提交到yarn-session
/data/flink-1.10.1/bin/flink run
-yid {application id}
-p 3
-yD env.java.opts="-XX:+UseG1GC"
-c com.xxxxxxx.FlinkBatchTimeLengthJob
/data/hadoop/data/xxxxxxx.jar
Usage:
Optional
-D <arg> Dynamic properties
-d,--detached Start detached
-jm,--jobManagerMemory <arg> Memory for JobManager Container with optional unit (default: MB)
-nm,--name Set a custom name for the application on YARN
-at,--applicationType Set a custom application type on YARN
-q,--query Display available YARN resources (memory, cores)
-qu,--queue <arg> Specify YARN queue.
-s,--slots <arg> Number of slots per TaskManager
-tm,--taskManagerMemory <arg> Memory per TaskManager Container with optional unit (default: MB)
-z,--zookeeperNamespace <arg> Namespace to create the Zookeeper sub-paths for HA mode