zoukankan      html  css  js  c++  java
  • Flink on Yarn的两种模式

    Flink on Yarn模式部署始末:Flink的Standalone和on Yarn模式都属于集群运行模式,但是有很大的不同,在实际环境中,使用Flink on Yarn模式者居多。

             那么使用on yarn模式到底好在哪呢?

             首先,在集群运行时,可能会有很多的集群实例包括MapReduce、Spark、Flink等等,那么如果它们全基于on Yarn就可以完成资源分配,减少单个实例集群的维护,提高集群的利用率。

             Flink on Yarn模式安装部署要做的其实不多,正常的步骤:1、上传二进制包  ===》2、解压缩 ===》 3、更改文件名称 ===》 4、配置环境变量。

    首先看下面这张图

    Flink on yarn的job运行模式大致分为两类:

    内存集中管理模式:在Yarn中初始化一个Flink集群,开辟指定的资源,之后我们提交的Flink Jon都在这个Flink yarn-session中,也就是说不管提交多少个job,这些job都会共用开始时在yarn中申请的资源。这个Flink集群会常驻在Yarn集群中,除非手动停止。
    内存Job管理模式【推荐使用】:在Yarn中,每次提交job都会创建一个新的Flink集群,任务之间相互独立,互不影响并且方便管理。任务执行完成之后创建的集群也会消失。 


    一、内存集中管理模式


    第一种模式分为两步:yarn-session.sh(启动,开辟资源)+flink run(提交任务)

    1、开源资源,使用命令

    yarn-session.sh -n 2 -jm 1024 -tm 1024 -d

    参数解释:
    //-n 2 表示指定两个容器 
    // -jm 1024 表示jobmanager 1024M内存 
    // -tm 1024表示taskmanager 1024M内存 
    //-d 任务后台运行 
    //-nm,--name  YARN上为一个自定义的应用设置一个名字
    //-q,--query  显示yarn中可用的资源 (内存, cpu核数)
    //-z,--zookeeperNamespace <arg>   针对HA模式在zookeeper上创建NameSpace
    //-id,--applicationId <yarnAppId>   YARN集群上的任务id,附着到一个后台运行的yarn session中


             由于flink on yarn 模式 是基于hadoop的,如果hadoop 集群没启动,则会连接失败。

             当启动之后,又会出现NameNode处于安全模式,这里没有必要手动关闭。解决方法:等hadoop启动之后差不多20s再提交yarn-session的命令。正常运行后如下图所示,并访问JM的web 接口,这里有个麻烦的事情就是每次需要去看主机名和端口号。

             其实,由于这还是属于一个Yarn application,因此我们也可以通过yarn.resourcemanager.webapp.address端口来选择访问哪一个flink集群,例如我这里刚刚启动了两个Flink集群,这里可通过Tracking UI的值来跳转到对用的Flink集群监控页面。

             关闭某个Flink集群:上述图中大家可以看到有两个Flink集群,这是由于误操作直接按了ctrl+c键,导致前台程序退出,但是真正的Flink集群依然在后台健壮的运行着,为了演示方便,这里又通过上述的命令开启了新的flink yarn-session。现在需要关闭一个,其实也很简单,因为是yarn程序,我们可以直接使用 yarn application -kill application_1552292557465_0001 来结束进程。

    2、提交任务

    为了进行测试,我们对Flink目录下的LICENSE文件进行词频统计

    • 上传文件至HDFS。hadoop fs -put LICENSE /
    • 查看文件是否上传成功。hadoop fs -ls /
    • 执行命令。./flink run ../examples/batch/WordCount.jar -input hdfs://192.168.83.129:9000/LICENSE -output hdfs://192.168.83.129:9000/wordcount-result.txt
    • 查看输出结果。hadoop fs -cat /wordcount-result.txt


    二、内存Job管理模式


    第二种模式其实也分为两个部分,依然是开辟资源和提交任务,但是在Job模式下,这两步都合成一个命令了。

    这里,我们直接执行命令./flink run -m yarn-cluster -yn 2 -yjm 1024 -ytm 1024 ../examples/batch/WordCount.jar。上面的命令中没有指定-input 和 -output,这是由于有默认的数据集和输出方式,看看效果。

    上述方框中内容就是默认的数据集,以及将输出打印到控制台上。下面yarn application的图可以清晰的反映第二种方式,在job结束后就会关闭flink yarn-session的集群。

    第二种方式命令 参数解释:

    flink run [OPTIONS] <jar-file> <arguments>  

    •  "run" 操作参数:  
    // -c,--class <classname>  如果没有在jar包中指定入口类,则需要在这里通过这个参数指定  
    // -m,--jobmanager <host:port>  指定需要连接的jobmanager(主节点)地址,使用这个参数可以指定一个不同于配置文件中的jobmanager  
    // -p,--parallelism <parallelism>   指定程序的并行度。可以覆盖配置文件中的默认值。


    三、两种模式区分


    //第一种模式,会去找已有的Flink集群
    默认查找当前yarn集群中已有的yarn-session信息中的jobmanager【/tmp/.yarn-properties-root】:
    • ./bin/flink run ./examples/batch/WordCount.jar -input hdfs://hostname:port/hello.txt -output hdfs://hostname:port/result1

    //第一种模式,给flink指定一个已有的JM,不让他自己去找

    连接指定host和port的jobmanager:
    • ./bin/flink run -m master:1234 ./examples/batch/WordCount.jar -input hdfs://hostname:port/hello.txt -output hdfs://hostname:port/result1

     

    //第二种模式,指定为 yarn-cluster

    启动一个新的yarn-session:
    • ./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar -input hdfs://hostname:port/hello.txt -output hdfs://hostname:port/result1
    注意:yarn session命令行的选项也可以使用./bin/flink 工具获得。它们都有一个y或者yarn的前缀
    例如:./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar 

  • 相关阅读:
    POJ 1751 Highways (kruskal)
    POJ 2031 Building a Space Station
    UVA 624
    POJ 1502 MPI Maelstrom (Dijkstra)
    POJ 3259 Wormholes(SPFA判负环)
    HZAU 1199 Little Red Riding Hood(水DP)
    HZAU 1205 Sequence Number(最大值前后缀 +双指针 + 二分)
    HZAU 1209 Deadline (hash 贪心 水题不水)
    STL完整版整理
    set集合完整版整理
  • 原文地址:https://www.cnblogs.com/7920284109q/p/13625573.html
Copyright © 2011-2022 走看看