Flink简介
Flink核心是一个流式的数据流执行引擎,其针对数据流的分布式计算提供了数据分布,数据通信以及容错机制等功能。
因现在主要Flink这一块做先关方面的学习,因此准备要开通Apache Flink专栏这一块定期发布一些文章。今天在自己的博客因为专栏无法申请通过,所以先在此记录第一篇关于Flink部署的文章。
在这里顺便打个小广告,Flink社区第一季线下meetup,已在上海,北京举办。接下来分别会在成都和深圳举办接下来的几期,也希望小伙伴们踊跃的加入到Flink社区来,下载钉钉,扫描下方二维码即可加入大群。
首先今天先介绍一下Flink的安装,安装部署最新1.6版本支持有8种安装方式,详细可以参考安装部署方式【Clusters & Deployment】 。下面主要介绍Standalone Cluster模式和on yarn模式 。
软件包下载地址
一.Flink独立集群模式安装(Cluster Standalone)
1.1.解压安装
[root@h001 soft]# tar -zxvf flink-1.2.0-bin-hadoop26-scala_2.11.tgz -C /usr/bigdata/
1
1.2.Flink配置(Configuring Flink)
对其进行相关的配置。主要涉及到的配置文件是conf/flink-conf.yaml
flink-conf.yaml配置
jobmanager.rpc.address:值设置成你master节点的IP地址
taskmanager.heap.mb:每个TaskManager可用的总内存
taskmanager.numberOfTaskSlots:每台机器上可用CPU的总数
parallelism.default:每个Job运行时默认的并行度(这个参数在文档中介绍好像有问题)
taskmanager.tmp.dirs:临时目录
jobmanager.heap.mb:每个节点的JVM能够分配的最大内存
jobmanager.rpc.port: 6123
jobmanager.web.port: 8081
[root@h001 conf]# vim flink-conf.yaml
jobmanager.rpc.address:h001
taskmanager.heap.mb:2048
taskmanager.numberOfTaskSlots:4
parallelism.default:10
taskmanager.tmp.dirs:/tmp
jobmanager.heap.mb:2048
jobmanager.web.port: 8081
jobmanager.rpc.port: 6123
1
2
3
4
5
6
7
8
9
10
主节点与从节点配置
[root@h002 conf]# vim slaves
h002
h003
h004
h005
[root@h001 conf]# vim masters
h001:8082
1
2
3
4
5
6
7
1.3.Flink安装包分发到所有的worker节点上
[root@h001 bigdata]# clush -v -w h[002-005] --copy flink-1.5.1 --dest /usr/bigdata/
1
1.4.启动Flink(Starting Flink)
在master节点上运行下面的脚本,那么这台机器上将会启动一个JobManager,并通过SSH连接列在slaves文件中的所有节点以便在每个节点上启动TaskManager
[root@h001 flink-1.5.1]# bin/start-cluster.sh
1
如果停止集群,可以在master节点上运行下面的命令
[root@h001 flink-1.5.1]# bin/stop-cluster.sh
1
1.5. 在已经运行的集群中添加JobManager/TaskManager
通过bin/taskmanager.sh或者bin/jobmanager.sh脚本在已经运行的集群中添加JobManager或者TaskManager节点
[root@h001 flink-1.2.0]# bin/jobmanager.sh (start (local|cluster) [host] [webui-port]|stop|stop-all)
[root@h001 flink-1.2.0]# bin/taskmanager.sh (start (local|cluster) [host] [webui-port]|stop|stop-all)
1
2
3
二.JobManager高可用(HA)
JobManager协调每一个Flink集群环境,它负责作业调度和资源管理。默认情况下,一个Flink集群中只有一个JobManager实例,这很容易造成单点故障(SPOF)。如果JobManager奔溃了,那么将没有新的程序被提交,同时运行的程序将失败。
对于JobManager高可用来说,我们可以从失败的JobManager中恢复,因此可以消除单点故障的问题。我们可以配置Standalone模式和YARN集群模式下的高可用JobManager的HA,是通过Zookeeper实现的,因此需要先搭建好Zookeeper集群,同时HA的信息,还要存储在HDFS中,因此也需要Hadoop集群,最后修改Flink中的配置文件。
根据部署方式不同,Flink Jobmanager HA配置分为2种:
1、standalone cluster HA
2、Yarn cluster HA
2.1. Standalone集群模式高可用
对于Standalone集群模式下的JobManager高可用通常的方案是:Flink集群的任一时刻只有一个leading JobManager,并且有多个standby JobManager。当leader失败后,standby通过选举出一个JobManager作为新的leader。这个方案可以保证没有单点故障的问题。对于standby和master JobManager实例来说,其实没有明确的区别,每一个JobManager能够当担master或standby角色。
2.1.1.相关配置
为了保证JobManager高可用,你需要设置Zookeeper为recovery mode(恢复模式),配置一个Zookeeper quorum并且对所有的JobManager节点和它们的Web UI端口号设置一个masters文件。
Flink引入Zookeeper的目的主要是让JobManager实现高可用(leader选举)
Flink使用Zookeeper在所有运行的JobManager实例中进行分布式调度的协调。Zookeeper在Flink中是一个独立的服务,它能够通过leader选举和轻量级的一致性状态存储来提供高度可靠的分布式协调器
Master File(masters)
为了启动一个HA-cluster,需要在conf/masters中配置masters。
masters文件:masters文件包含所有的hosts,每个host启动都JobManager,并且指定绑定的Web UI端口号:
jobManagerAddress1:webUIPort1
[...]
jobManagerAddressX:webUIPortX
1
2
3
配置文件flink-conf.yaml
为了启动一个HA-Cluster,需要在conf/flink-conf.yaml添加如下配置参数:
Recovery mode(必须的):recovery.mode: zookeeper
zookeeper quorum(必须的):recovery.zookeeper.quorum: address1:2181,...
Zookeeper root(推荐的):Flink在Zookeeper中的root节点,下面放置所有需要协调的数据recovery.zookeeper.path.root: /flink
1
2
3
如果你运行多个Flink HA集群,那么你必须手工配置每个Flink集群使用独立的root节点
State backend and storage directory(必须的):JobManager元数据在statebackend保持并且仅仅在Zookeeper中存储,目前在HA模式中,仅支持filesystem。
state.backend: filesystem
state.backend.fs.checkpointdir:hdfs://namenode-host:port/flink-checkpoints
recovery.zookeeper.storageDir: hdfs:///recovery
recovery.zookeeper.storageDir指定的路径中存储了所有的元数据,用来恢复失败的JobManager
1
2
3
4
5
2.2. 两个JobManager的Standalone模式下的集群
conf/flink-conf.yaml文件
配置恢复模式和Zookeeper quorum
[root@h001 conf]# vim flink-conf.yaml
recovery.mode: zookeeper
recovery.zookeeper.quorum: h002:2181,h003:2181,h004:2181
recovery.zookeeper.path.root: /flink
state.backend: filesystem
state.backend.fs.checkpointdir: hdfs://h001:9000/flink/checkpoints
recovery.zookeeper.storageDir: hdfs://h001:9000/flink/recovery
1
2
3
4
5
6
7
在Hadoop文件系统创建文件夹
[root@h001 conf]# hadoop fs -mkdir -p /flink/checkpoints
[root@h001 conf]# hadoop fs -mkdir -p /flink/recovery
[root@h001 conf]# hadoop fs -chown -R hdfs:supergroup /flink/
1
2
3
配置conf/masters文件
[root@h001 conf]# vim masters
h001:8081
h002:8081
1
2
3
配置conf/zoo.cfg文件,添加Zookeeper集群节点
[root@h001 conf]# vim zoo.cfg
server.1=h002:2888:3888
server.2=h003:2888:3888
server.3=h004:2888:3888
1
2
3
4
启动Zookeeper集群
[root@h001 flink-1.2.0]# bin/start-zookeeper-quorum.sh
1
启动Flink集群
[root@h001 flink-1.2.0]# bin/start-cluster.sh
1
经过测试kill掉其中一个jobmanager可切换主备。
2.2. YARN集群模式高可用
当运行一个高可用YARN集群时,我们不需要运行多个JobManager(ApplicationMaster)实例,只需要运行一个实例,如果失败了通过YARN来进行重启
Flink部署在Yarn上,仅作为yarn上“多租户”的一个service而存在。Flink在yarn中容器的概念分为2种:
用于启动JobManager(AM)的容器
用于启动TaskManager的容器
1
2
通过yarn-session.sh –help来看下启动Flink On Yarn的参数信息
其中-n代表taskmanager的容器数量,而不是taskmanager+jobmanager的容器数量
在配置HA前,先通过-q看一下我的yarn集群的资源情况:
从图中可以看出,我配置的每个NodeManager的内存是2048MB(yarn-site.xml),每个NodeManager的vcores数量是2。所以,当前yarn集群中可用内存总量为6144,总cores是6
2.2.1. FLINK ON YARN HA 配置
配置准备
在配置Flink On Yarn之前,必须保证hdfs和yarn都已经开启,可以通过HADOOPHOME/sbin/start−all.sh启动hdfs和yarn配置(yarn−site.xml)此配置需要在HADOOPHOME/sbin/start−all.sh启动hdfs和yarn配置(yarn−site.xml)此配置需要在HADOOP_CONF_DIR 的yarn-site.xml添加
[root@h001 ~]# cd /usr/bigdata/hadoop/etc/hadoop/
[root@h001 hadoop]# vim yarn-site.xml
<property>
<name>yarn.resourcemanager.am.max-attempts</name>
<value>4</value>
</property>
1
2
3
4
5
6
此配置代表application master在重启时,尝试的最大次数
[root@h001 hadoop]# clush -v -w h[002-005] --copy yarn-site.xml --dest /usr/bigdata/hadoop/etc/hadoop/
1
配置(flink-conf.yaml),此参数需要在$FLINK_HOME/conf 的flink-conf.yaml中配置
[root@h001 conf]# vim flink-conf.yaml
yarn.application-attempts: 10
1
2
此参数代表Flink Job(yarn中称为application)在Jobmanager(或者叫Application Master)恢复时,允许重启的最大次数。
注意,Flink On Yarn环境中,当Jobmanager(ApplicationMaster)失败时,yarn会尝试重启JobManager(AM),重启后,会重新启动Flink的Job(application)。因此,yarn.application-attempts的设置不应该超过yarn.resourcemanager.am.max-attemps
[root@h001 conf]# clush -v -w h[002-005] --copy flink-conf.yaml --dest /usr/bigdata/flink-1.5.1/conf/
1
配置zookeeper信息
虽然flink-on-yarn cluster HA依赖于Yarn自己的集群机制,但是Flink Job在恢复时,需要依赖检查点产生的快照,而这些快照虽然配置在hdfs,但是其元数据信息保存在zookeeper中,所以我们还要配置zookeeper的HA信息。其中,recovery.zookeeper.path.namespace也可以在启动Flink on Yarn时通过-z参数覆盖。
在yarn模式下,jobmanager.rpc.address不需要指定,因为哪一个容器作为jobManager由Yarn决定,而不由Flink配置决定;taskmanager.tmp.dirs也不需要指定,这个参数将被yarn的tmp参数指定,默认就是/tmp目录下,保存一些用于上传到ResourceManager的jar或lib文件。parrallelism.default也不需要指定,因为在启动yarn时,通过-s指定每个taskmanager的slots数量。
完整的Flink配置信息如下:
root@h001 conf]# vim flink-conf.yaml
env.java.home: /usr/java/jdk1.8.0_111
recovery.mode: zookeeper
recovery.zookeeper.quorum: h002:2181,h003:2181,h004:2181
recovery.zookeeper.path.root: /flink
recovery.zookeeper.path.namespace: /cluster_yarn
state.backend: filesystem
state.backend.fs.checkpointdir: hdfs://h001:9000/flink/checkpoints
recovery.zookeeper.storageDir: hdfs://h001:9000/flink/recovery
taskmanager.network.numberOfBuffers: 64000
fs.hdfs.hadoopconf: /usr/bigdata/hadoop/etc/Hadoop
1
2
3
4
5
6
7
8
9
10
11
以上的yarn HA配置可在Standalone集群模式下进一步添加几个参数即可完成。
2.2.2.启动FLINK YARN SESSION
在YARN上启动一个Flink主要有两种方式:
(1)、启动一个YARN session(Start a long-running Flink cluster on YARN)
(2)、直接在YARN上提交运行Flink作业(Run a Flink job on YARN)
1
2
Flink YARN Session
启动Flink Yarn Session有2种模式:
(1)、分离模式
(2)、客户端模式
1
2
通过-d指定分离模式,即客户端在启动Flink Yarn Session后,就不再属于Yarn Cluster的一部分。如果想要停止Flink Yarn Application,需要通过yarn application -kill 命令来停止
这种模式下会启动yarn session,并且会启动Flink的两个必要服务:JobManager和TaskManagers,然后你可以向集群提交作业。同一个Session中可以提交多个Flink作业。需要注意的是,这种模式下Hadoop的版本至少是2.2,而且必须安装了HDFS(因为启动YARN session的时候会向HDFS上提交相关的jar文件和配置文件)。我们可以通过./bin/yarn-session.sh脚本启动YARN Session。
在启动的是可以指定TaskManager的个数以及内存(默认是1G),也可以指定JobManager的内存,但是JobManager的个数只能是一个。
采用客户端模式来启动Flink Yarn Session:
[root@h001 flink-1.5.1]# bin/yarn-session.sh -n 4 -jm 4096 -tm 8192 -s 2 -nm FlinkOnYarnSession -d -st
或者
./bin/yarn-session.sh -n 4 -tm 8192 -s 8
1
2
3
参数说明:
-n:--container 指YARN container分配的个数(即TaskManagers的个数)
-jm:--jobManagerMemory 指JobManager Containe的内存大小,单位为MB
-tm:--taskManagerMemory 指每个TaskManagerContainer的内存大小,单位为MB
-s :指每个TaskManager的slot个数
1
2
3
4
可以通过yarn的webUI查看一下当前启动的Application
通过ApplicationMaster tracking一下Flink的WebUI
http://192.168.xxx.xxx:8088/proxy/application_1500340359200_0002/#/overview
提交作业
使用bin/flink脚本提交作业,同样我们来看看这个脚本支持哪些参数:
[root@h001 flink-1.5.1]# bin/flink run
bin/flink run ./examples/batch/WordCount.jar
--input hdfs:///user/test/LICENSE
--output hdfs:///user/test/result.txt
1
2
3
4
后面相应的跟上参数提交作业即可。
Run a single Flink job on YARN
上面的YARN session是在Hadoop YARN环境下启动一个Flink cluster集群,里面的资源是可以共享给其他的Flink作业。我们还可以在YARN上启动一个Flink作业。这里我们还是使用./bin/flink,但是不需要事先启动YARN session:
[root@h001 flink-1.5.1]# bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar
--input hdfs:///user/test/LICENSE
--output hdfs:///user/test/result.txt
1
2
3
上面的命令同样会启动一个类似于YARN session启动的页面。其中的-yn是指TaskManager的个数,必须指定。
---------------------
作者:独行夏
来源:CSDN
原文:https://blog.csdn.net/u013368491/article/details/81610672
版权声明:本文为博主原创文章,转载请附上博文链接!