zoukankan      html  css  js  c++  java
  • flink on yarn

    为什么要用yarn?

    如果不用yarn.假设有10个job运行在flink集群上,如果有一个出问题.发生了OOM,最后导致taskmanager挂掉.那么jobmanager会调度任务到其他的taskmanager上面.最后是连锁反应,会造成所有的taskmanager都挂掉.集群挂掉.

    所以要用yarn.

    flink on yarn 有两种形式

    • yarn per job
    • yarn session

    yarn per job.

    意思就是,一个任务一个集群.如果你这个任务出问题了,那就你自己挂掉,其他集群会活的好好的.

    但是缺点是,每个任务都要一个jobmanager.都要单独的内存.

    如果一个jobmanager占用1g内存.那么要是有50个任务就会占用50g内存.浪费.

    适合执行时间长的作业

    yarn session

    因为上一个的缺点,jobmanager浪费内存.因此有了这个模式.

    意思就是我在yarn中启动一个集群,然后不重要的任务,都放在这里面运行,或者在这里面测试.

    相当于一个沙箱.

    • 适合规模小,执行时间短的作业
    • 离线处理
    • 共享资源
    • 可用增加taskmanager 和终止空闲的taskmanager

    yarn-per-job启动参数

    ./flink run 
    -m yarn-cluster 
    -yn 1 
    -yjm 1024 
    -ytm 4096 
    -ynm FlinkOnYarnSession-MemberLogInfoProducer
    -c com.igg.flink.tool.member.rabbitmq.producer.MqMemberProducer 
    /home/test_gjm/igg-flink-tool/igg-flink-tool-1.0.0-SNAPSHOT.jar
    
    参数名	含义
    -m	固定为yarn-cluster
    -yjm	指定JobManager所在的Container内存。单位:MB
    -ytm	每一个TaskManager Container的内存,单位MB。
    -ys	每一个TaskManager中slots的数量。
    -ynm	YARN中application的名称。
    -c	指定Job对应的jar包中主函数所在类名。
    

    说明

    这里只有一个taskmanager.

    假设你的任务并行度是5.那么 -ys 就是5 .

    假设你的任务需要8G内存.那么 -ytm 就是 8192

    -yjm 固定为 1024

    例子

    /data/flink-1.10.1/bin/flink run 
    -m yarn-cluster 
    -yjm 1024 
    -ytm 18432 
    -ys 3 
    -p 6 
    -yD env.java.opts="-XX:+UseG1GC" 
    -ynm huadan_active_job 
    -c com.xxxxxx.FlinkHuadanActiveJob 
    /data/hadoop/data/xxxxx-analysis-1.0-SNAPSHOT.jar 
    
    使用 G1 收集器
    启动 2 个 tm ,每个3slot,18G 内存.
    
    
    Options for yarn-cluster mode:
         -d,--detached                        If present, runs the job in detached
                                              mode
         -m,--jobmanager <arg>                Address of the JobManager (master) to
                                              which to connect. Use this flag to
                                              connect to a different JobManager than
                                              the one specified in the
                                              configuration.
         -yat,--yarnapplicationType <arg>     Set a custom application type for the
                                              application on YARN
         -yD <property=value>                 use value for given property
         -yd,--yarndetached                   If present, runs the job in detached
                                              mode (deprecated; use non-YARN
                                              specific option instead)
         -yh,--yarnhelp                       Help for the Yarn session CLI.
         -yid,--yarnapplicationId <arg>       Attach to running YARN session
         -yj,--yarnjar <arg>                  Path to Flink jar file
         -yjm,--yarnjobManagerMemory <arg>    Memory for JobManager Container with
                                              optional unit (default: MB)
         -ynl,--yarnnodeLabel <arg>           Specify YARN node label for the YARN
                                              application
         -ynm,--yarnname <arg>                Set a custom name for the application
                                              on YARN
         -yq,--yarnquery                      Display available YARN resources
                                              (memory, cores)
         -yqu,--yarnqueue <arg>               Specify YARN queue.
         -ys,--yarnslots <arg>                Number of slots per TaskManager
         -yt,--yarnship <arg>                 Ship files in the specified directory
                                              (t for transfer)
         -ytm,--yarntaskManagerMemory <arg>   Memory per TaskManager Container with
                                              optional unit (default: MB)
         -yz,--yarnzookeeperNamespace <arg>   Namespace to create the Zookeeper
                                              sub-paths for high availability mode
         -z,--zookeeperNamespace <arg>        Namespace to create the Zookeeper
                                              sub-paths for high availability mode
    
    

    yarn session 启动参数

    先启动 yarn-session

    例子:

    #!/bin/bash
    
    /data/flink-1.10.1/bin/yarn-session.sh 
    -s 4 
    -jm 1g 
    -tm 8g 
    -d 
    -ynm yarn-flink
    
    
    yarn-session.sh -n 5 -jm 1024 -tm 2048 -s 2 -d
    参数解释:
    // -jm 1024 表示jobmanager 1024M内存
    // -tm 1024表示taskmanager 1024M内存
    // -s  每一个TaskManager上的slots数量。
    //-d 任务后台运行
    //-nm,--name YARN上为一个自定义的应用设置一个名字
    
    
    意思:
    启动一个集群.有5个taskmanager,每个taskmanager 内存2G,2个slot.
    但是在开始的时候,是没有的,在你提交任务的时候,就会创建taskmanager.
    比如你提交一个 3并行度的任务,就会创建出来2个taskmanager,一共有4个slot.剩余1个.
    你再启动一个2并行度的任务,那么就会再启动1个taskmanager,一共 4个slot,还剩1个slot.
     
    因此在yarn-session中,taskmanager的slot不要设置的过多.
    n可以大一点,后面可以扩充.
    

    提交到yarn-session

    /data/flink-1.10.1/bin/flink run 
    -yid {application id} 
    -p 3 
    -yD env.java.opts="-XX:+UseG1GC" 
    -c com.xxxxxxx.FlinkBatchTimeLengthJob 
    /data/hadoop/data/xxxxxxx.jar 
    
    
    Usage:
       Optional
         -D <arg>                        Dynamic properties
         -d,--detached                   Start detached
         -jm,--jobManagerMemory <arg>    Memory for JobManager Container with optional unit (default: MB)
         -nm,--name                      Set a custom name for the application on YARN
         -at,--applicationType           Set a custom application type on YARN
         -q,--query                      Display available YARN resources (memory, cores)
         -qu,--queue <arg>               Specify YARN queue.
         -s,--slots <arg>                Number of slots per TaskManager
         -tm,--taskManagerMemory <arg>   Memory per TaskManager Container with optional unit (default: MB)
         -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths for HA mode
    
  • 相关阅读:
    小小不爽一下
    银行家算法的讨论
    【转】C字符串处理函数的实现
    Oracle物理存储结构文件
    RAC和ASM环境下修改控制文件control file
    TNS01190: The user is not authorized to execute the requested listener comm (oracle”用户没有启动lisener的权限?)
    RAC环境ASM存储新增控制文件的方法
    Oracle RAC 修改 spfile 文件位置
    Rman通过duplicate创建standby
    rman恢复手册
  • 原文地址:https://www.cnblogs.com/weijiqian/p/14034567.html
Copyright © 2011-2022 走看看