zoukankan      html  css  js  c++  java
  • Flink的高可用集群环境

    Flink的高可用集群环境

    Flink简介

           Flink核心是一个流式的数据流执行引擎,其针对数据流的分布式计算提供了数据分布,数据通信以及容错机制等功能。

             因现在主要Flink这一块做先关方面的学习,因此准备要开通Apache Flink专栏这一块定期发布一些文章。今天在自己的博客因为专栏无法申请通过,所以先在此记录第一篇关于Flink部署的文章。

             在这里顺便打个小广告,Flink社区第一季线下meetup,已在上海,北京举办。接下来分别会在成都和深圳举办接下来的几期,也希望小伙伴们踊跃的加入到Flink社区来,下载钉钉,扫描下方二维码即可加入大群。


         首先今天先介绍一下Flink的安装,安装部署最新1.6版本支持有8种安装方式,详细可以参考安装部署方式【Clusters & Deployment】 。下面主要介绍Standalone Cluster模式和on yarn模式 。

    软件包下载地址

    一.Flink独立集群模式安装(Cluster Standalone)

    1.1.解压安装

    [root@h001 soft]# tar -zxvf flink-1.2.0-bin-hadoop26-scala_2.11.tgz -C /usr/bigdata/
    1
    1.2.Flink配置(Configuring Flink)

    对其进行相关的配置。主要涉及到的配置文件是conf/flink-conf.yaml

    flink-conf.yaml配置

    jobmanager.rpc.address:值设置成你master节点的IP地址
    taskmanager.heap.mb:每个TaskManager可用的总内存
    taskmanager.numberOfTaskSlots:每台机器上可用CPU的总数
    parallelism.default:每个Job运行时默认的并行度(这个参数在文档中介绍好像有问题)
    taskmanager.tmp.dirs:临时目录
    jobmanager.heap.mb:每个节点的JVM能够分配的最大内存
    jobmanager.rpc.port: 6123
    jobmanager.web.port: 8081

    [root@h001 conf]# vim flink-conf.yaml
    jobmanager.rpc.address:h001
    taskmanager.heap.mb:2048
    taskmanager.numberOfTaskSlots:4
    parallelism.default:10
    taskmanager.tmp.dirs:/tmp
    jobmanager.heap.mb:2048
    jobmanager.web.port: 8081
    jobmanager.rpc.port: 6123
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    主节点与从节点配置

    [root@h002 conf]# vim slaves
    h002
    h003
    h004
    h005
    [root@h001 conf]# vim masters
    h001:8082
    1
    2
    3
    4
    5
    6
    7
    1.3.Flink安装包分发到所有的worker节点上

    [root@h001 bigdata]# clush -v -w h[002-005] --copy flink-1.5.1 --dest /usr/bigdata/
    1
    1.4.启动Flink(Starting Flink)

           在master节点上运行下面的脚本,那么这台机器上将会启动一个JobManager,并通过SSH连接列在slaves文件中的所有节点以便在每个节点上启动TaskManager

    [root@h001 flink-1.5.1]# bin/start-cluster.sh
    1
    如果停止集群,可以在master节点上运行下面的命令

    [root@h001 flink-1.5.1]# bin/stop-cluster.sh
    1
    1.5. 在已经运行的集群中添加JobManager/TaskManager

          通过bin/taskmanager.sh或者bin/jobmanager.sh脚本在已经运行的集群中添加JobManager或者TaskManager节点

    [root@h001 flink-1.2.0]# bin/jobmanager.sh (start (local|cluster) [host] [webui-port]|stop|stop-all)

    [root@h001 flink-1.2.0]# bin/taskmanager.sh (start (local|cluster) [host] [webui-port]|stop|stop-all)
    1
    2
    3
    二.JobManager高可用(HA)

           JobManager协调每一个Flink集群环境,它负责作业调度和资源管理。默认情况下,一个Flink集群中只有一个JobManager实例,这很容易造成单点故障(SPOF)。如果JobManager奔溃了,那么将没有新的程序被提交,同时运行的程序将失败。
           对于JobManager高可用来说,我们可以从失败的JobManager中恢复,因此可以消除单点故障的问题。我们可以配置Standalone模式和YARN集群模式下的高可用JobManager的HA,是通过Zookeeper实现的,因此需要先搭建好Zookeeper集群,同时HA的信息,还要存储在HDFS中,因此也需要Hadoop集群,最后修改Flink中的配置文件。

    根据部署方式不同,Flink Jobmanager HA配置分为2种:
            1、standalone cluster HA
            2、Yarn cluster HA

    2.1. Standalone集群模式高可用

            对于Standalone集群模式下的JobManager高可用通常的方案是:Flink集群的任一时刻只有一个leading JobManager,并且有多个standby JobManager。当leader失败后,standby通过选举出一个JobManager作为新的leader。这个方案可以保证没有单点故障的问题。对于standby和master JobManager实例来说,其实没有明确的区别,每一个JobManager能够当担master或standby角色。

    2.1.1.相关配置

          为了保证JobManager高可用,你需要设置Zookeeper为recovery mode(恢复模式),配置一个Zookeeper quorum并且对所有的JobManager节点和它们的Web UI端口号设置一个masters文件。

    Flink引入Zookeeper的目的主要是让JobManager实现高可用(leader选举)
    Flink使用Zookeeper在所有运行的JobManager实例中进行分布式调度的协调。Zookeeper在Flink中是一个独立的服务,它能够通过leader选举和轻量级的一致性状态存储来提供高度可靠的分布式协调器
    Master File(masters)
    为了启动一个HA-cluster,需要在conf/masters中配置masters。
    masters文件:masters文件包含所有的hosts,每个host启动都JobManager,并且指定绑定的Web UI端口号:
    jobManagerAddress1:webUIPort1
    [...]
    jobManagerAddressX:webUIPortX
    1
    2
    3
    配置文件flink-conf.yaml

    为了启动一个HA-Cluster,需要在conf/flink-conf.yaml添加如下配置参数:

    Recovery mode(必须的):recovery.mode: zookeeper
    zookeeper quorum(必须的):recovery.zookeeper.quorum: address1:2181,...
    Zookeeper root(推荐的):Flink在Zookeeper中的root节点,下面放置所有需要协调的数据recovery.zookeeper.path.root: /flink
    1
    2
    3
          如果你运行多个Flink HA集群,那么你必须手工配置每个Flink集群使用独立的root节点

    State backend and storage directory(必须的):JobManager元数据在statebackend保持并且仅仅在Zookeeper中存储,目前在HA模式中,仅支持filesystem。
    state.backend: filesystem
    state.backend.fs.checkpointdir:hdfs://namenode-host:port/flink-checkpoints
    recovery.zookeeper.storageDir: hdfs:///recovery
    recovery.zookeeper.storageDir指定的路径中存储了所有的元数据,用来恢复失败的JobManager
    1
    2
    3
    4
    5
    2.2. 两个JobManager的Standalone模式下的集群

    conf/flink-conf.yaml文件

    配置恢复模式和Zookeeper quorum

    [root@h001 conf]# vim flink-conf.yaml
    recovery.mode: zookeeper
    recovery.zookeeper.quorum: h002:2181,h003:2181,h004:2181
    recovery.zookeeper.path.root: /flink
    state.backend: filesystem
    state.backend.fs.checkpointdir: hdfs://h001:9000/flink/checkpoints
    recovery.zookeeper.storageDir: hdfs://h001:9000/flink/recovery
    1
    2
    3
    4
    5
    6
    7
    在Hadoop文件系统创建文件夹

    [root@h001 conf]# hadoop fs -mkdir -p /flink/checkpoints
    [root@h001 conf]# hadoop fs -mkdir -p /flink/recovery
    [root@h001 conf]# hadoop fs -chown -R hdfs:supergroup /flink/
    1
    2
    3
    配置conf/masters文件

    [root@h001 conf]# vim masters
    h001:8081
    h002:8081
    1
    2
    3
    配置conf/zoo.cfg文件,添加Zookeeper集群节点

    [root@h001 conf]# vim zoo.cfg
    server.1=h002:2888:3888
    server.2=h003:2888:3888
    server.3=h004:2888:3888
    1
    2
    3
    4
    启动Zookeeper集群

    [root@h001 flink-1.2.0]# bin/start-zookeeper-quorum.sh
    1
    启动Flink集群

    [root@h001 flink-1.2.0]# bin/start-cluster.sh
    1
    经过测试kill掉其中一个jobmanager可切换主备。

    2.2. YARN集群模式高可用

            当运行一个高可用YARN集群时,我们不需要运行多个JobManager(ApplicationMaster)实例,只需要运行一个实例,如果失败了通过YARN来进行重启
            Flink部署在Yarn上,仅作为yarn上“多租户”的一个service而存在。Flink在yarn中容器的概念分为2种:

    用于启动JobManager(AM)的容器
    用于启动TaskManager的容器
    1
    2
    通过yarn-session.sh –help来看下启动Flink On Yarn的参数信息

            其中-n代表taskmanager的容器数量,而不是taskmanager+jobmanager的容器数量
    在配置HA前,先通过-q看一下我的yarn集群的资源情况:

          从图中可以看出,我配置的每个NodeManager的内存是2048MB(yarn-site.xml),每个NodeManager的vcores数量是2。所以,当前yarn集群中可用内存总量为6144,总cores是6

    2.2.1. FLINK ON YARN HA 配置

    配置准备

       在配置Flink On Yarn之前,必须保证hdfs和yarn都已经开启,可以通过HADOOPHOME/sbin/start−all.sh启动hdfs和yarn配置(yarn−site.xml)此配置需要在HADOOPHOME/sbin/start−all.sh启动hdfs和yarn配置(yarn−site.xml)此配置需要在HADOOP_CONF_DIR 的yarn-site.xml添加

    [root@h001 ~]# cd /usr/bigdata/hadoop/etc/hadoop/
    [root@h001 hadoop]# vim yarn-site.xml
    <property>
    <name>yarn.resourcemanager.am.max-attempts</name>
    <value>4</value>
    </property>
    1
    2
    3
    4
    5
    6
    此配置代表application master在重启时,尝试的最大次数

    [root@h001 hadoop]# clush -v -w h[002-005] --copy yarn-site.xml --dest /usr/bigdata/hadoop/etc/hadoop/
    1
    配置(flink-conf.yaml),此参数需要在$FLINK_HOME/conf 的flink-conf.yaml中配置

    [root@h001 conf]# vim flink-conf.yaml
    yarn.application-attempts: 10
    1
    2
          此参数代表Flink Job(yarn中称为application)在Jobmanager(或者叫Application Master)恢复时,允许重启的最大次数。
          注意,Flink On Yarn环境中,当Jobmanager(ApplicationMaster)失败时,yarn会尝试重启JobManager(AM),重启后,会重新启动Flink的Job(application)。因此,yarn.application-attempts的设置不应该超过yarn.resourcemanager.am.max-attemps

    [root@h001 conf]# clush -v -w h[002-005] --copy flink-conf.yaml --dest /usr/bigdata/flink-1.5.1/conf/
    1
    配置zookeeper信息
          虽然flink-on-yarn cluster HA依赖于Yarn自己的集群机制,但是Flink Job在恢复时,需要依赖检查点产生的快照,而这些快照虽然配置在hdfs,但是其元数据信息保存在zookeeper中,所以我们还要配置zookeeper的HA信息。其中,recovery.zookeeper.path.namespace也可以在启动Flink on Yarn时通过-z参数覆盖。
          在yarn模式下,jobmanager.rpc.address不需要指定,因为哪一个容器作为jobManager由Yarn决定,而不由Flink配置决定;taskmanager.tmp.dirs也不需要指定,这个参数将被yarn的tmp参数指定,默认就是/tmp目录下,保存一些用于上传到ResourceManager的jar或lib文件。parrallelism.default也不需要指定,因为在启动yarn时,通过-s指定每个taskmanager的slots数量。

    完整的Flink配置信息如下:

    root@h001 conf]# vim flink-conf.yaml
    env.java.home: /usr/java/jdk1.8.0_111
    recovery.mode: zookeeper
    recovery.zookeeper.quorum: h002:2181,h003:2181,h004:2181
    recovery.zookeeper.path.root: /flink
    recovery.zookeeper.path.namespace: /cluster_yarn
    state.backend: filesystem
    state.backend.fs.checkpointdir: hdfs://h001:9000/flink/checkpoints
    recovery.zookeeper.storageDir: hdfs://h001:9000/flink/recovery
    taskmanager.network.numberOfBuffers: 64000
    fs.hdfs.hadoopconf: /usr/bigdata/hadoop/etc/Hadoop
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    以上的yarn HA配置可在Standalone集群模式下进一步添加几个参数即可完成。

    2.2.2.启动FLINK YARN SESSION

    在YARN上启动一个Flink主要有两种方式:

    (1)、启动一个YARN session(Start a long-running Flink cluster on YARN)
    (2)、直接在YARN上提交运行Flink作业(Run a Flink job on YARN)
    1
    2
    Flink YARN Session

    启动Flink Yarn Session有2种模式:

    (1)、分离模式
    (2)、客户端模式
    1
    2
          通过-d指定分离模式,即客户端在启动Flink Yarn Session后,就不再属于Yarn Cluster的一部分。如果想要停止Flink Yarn Application,需要通过yarn application -kill 命令来停止
          这种模式下会启动yarn session,并且会启动Flink的两个必要服务:JobManager和TaskManagers,然后你可以向集群提交作业。同一个Session中可以提交多个Flink作业。需要注意的是,这种模式下Hadoop的版本至少是2.2,而且必须安装了HDFS(因为启动YARN session的时候会向HDFS上提交相关的jar文件和配置文件)。我们可以通过./bin/yarn-session.sh脚本启动YARN Session。
          在启动的是可以指定TaskManager的个数以及内存(默认是1G),也可以指定JobManager的内存,但是JobManager的个数只能是一个。
    采用客户端模式来启动Flink Yarn Session:

    [root@h001 flink-1.5.1]# bin/yarn-session.sh -n 4 -jm 4096 -tm 8192 -s 2 -nm FlinkOnYarnSession -d -st
    或者
    ./bin/yarn-session.sh -n 4 -tm 8192 -s 8
    1
    2
    3
    参数说明:

    -n:--container 指YARN container分配的个数(即TaskManagers的个数)
    -jm:--jobManagerMemory 指JobManager Containe的内存大小,单位为MB
    -tm:--taskManagerMemory 指每个TaskManagerContainer的内存大小,单位为MB
    -s :指每个TaskManager的slot个数
    1
    2
    3
    4
    可以通过yarn的webUI查看一下当前启动的Application

    通过ApplicationMaster tracking一下Flink的WebUI
    http://192.168.xxx.xxx:8088/proxy/application_1500340359200_0002/#/overview
    提交作业
    使用bin/flink脚本提交作业,同样我们来看看这个脚本支持哪些参数:

    [root@h001 flink-1.5.1]# bin/flink run
    bin/flink run ./examples/batch/WordCount.jar
    --input hdfs:///user/test/LICENSE
    --output hdfs:///user/test/result.txt
    1
    2
    3
    4
    后面相应的跟上参数提交作业即可。

    Run a single Flink job on YARN

          上面的YARN session是在Hadoop YARN环境下启动一个Flink cluster集群,里面的资源是可以共享给其他的Flink作业。我们还可以在YARN上启动一个Flink作业。这里我们还是使用./bin/flink,但是不需要事先启动YARN session:

    [root@h001 flink-1.5.1]# bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar
    --input hdfs:///user/test/LICENSE
    --output hdfs:///user/test/result.txt
    1
    2
    3
          上面的命令同样会启动一个类似于YARN session启动的页面。其中的-yn是指TaskManager的个数,必须指定。
    ---------------------
    作者:独行夏
    来源:CSDN
    原文:https://blog.csdn.net/u013368491/article/details/81610672
    版权声明:本文为博主原创文章,转载请附上博文链接!

  • 相关阅读:
    eclipse中创建完整的maven项目
    Nginx+tomcat配置集群负载均衡
    Git的安装与使用
    Angularjs checkbox的ng属性
    chrome渲染hover状态tranform相邻元素抖动bug
    nodejs创建express+ejs项目
    ubuntu常用命令
    ubuntu查看命令
    sublime text2卸载和重新安装
    fiddler代理
  • 原文地址:https://www.cnblogs.com/bigben0123/p/10454836.html
Copyright © 2011-2022 走看看