zoukankan      html  css  js  c++  java
  • 1、Flink的安装部署

    Flink的安装部署

    local本地模式

    1 原理

    以多线程的方式模拟flink的各个角色

    2、步骤

    1.下载安装包
    https://archive.apache.org/dist/flink/
    
    2.上传flink-1.12.0-bin-scala_2.12.tgz到node1的指定目录
    
    3.解压
    tar -zxvf flink-1.12.0-bin-scala_2.12.tgz 
    
    4.如果出现权限问题,需要修改权限
    chown -R root:root /export/server/flink-1.12.0
    5.改名或创建软链接
    mv flink-1.12.0 flink
    ln -s /export/server/flink-1.12.0 /export/server/flink
    

    测试

    • 1.准备文件/root/words.txt
      vim /root/words.txt
    hello me you her
    hello me you
    hello me
    hello
    
    • 2.启动Flink本地“集群”
      /software/flink/bin/start-cluster.sh

    • 3.使用jps可以查看到下面两个进程

      • TaskManagerRunner
      • StandaloneSessionClusterEntrypoint
    • 4.访问Flink的Web UI
      http://node1:8081/#/overview

    • 5、执行官方示例
    /software/flink/bin/flink run  /software/flink/examples/batch/WordCount.jar --input /root/test/words.txt --output /root/test/out
    
    • 6、停止集群
    `/software/flink/bin/stop-cluster.sh`
    
    

    Sandalone独立集群模式

    原理

    操作

    1.集群规划:

    - 服务器: node1(Master + Slave): JobManager + TaskManager
    - 服务器: node2(Slave): TaskManager
    - 服务器: node3(Slave): TaskManager
    

    2.修改flink-conf.yaml
    vim /export/server/flink/conf/flink-conf.yaml

    jobmanager.rpc.address: node1
    taskmanager.numberOfTaskSlots: 2
    web.submit.enable: true
    
    #历史服务器
    jobmanager.archive.fs.dir: hdfs://node1:8020/flink/completed-jobs/
    historyserver.web.address: node1
    historyserver.web.port: 8082
    historyserver.archive.fs.dir: hdfs://node1:8020/flink/completed-jobs/
    

    2.修改masters
    vim /export/server/flink/conf/masters

    node1:8081
    

    3.修改slaves
    vim /export/server/flink/conf/workers

    node1
    node2
    node3
    

    4.添加HADOOP_CONF_DIR环境变量
    vim /etc/profile

    export HADOOP_CONF_DIR=/export/server/hadoop/etc/hadoop
    

    5.分发

    scp -r /export/server/flink node2:/export/server/flink
    scp -r /export/server/flink node3:/export/server/flink
    scp  /etc/profile node2:/etc/profile
    scp  /etc/profile node3:/etc/profile
    或
    for i in {2..3}; do scp -r flink node$i:$PWD; done
    

    6.source

    source /etc/profile
    

    测试

    1.启动集群,在node1上执行如下命令

    /export/server/flink/bin/start-cluster.sh
    或者单独启动
    /export/server/flink/bin/jobmanager.sh ((start|start-foreground) cluster)|stop|stop-all
    /export/server/flink/bin/taskmanager.sh start|start-foreground|stop|stop-all
    

    2.启动历史服务器

    /export/server/flink/bin/historyserver.sh start
    

    3.访问Flink UI界面或使用jps查看

    http://node1:8081/#/overview
    http://node1:8082/#/overview
    TaskManager界面:可以查看到当前Flink集群中有多少个TaskManager,每个TaskManager的slots、内存、CPU Core是多少
    

    4.执行官方测试案例

    # 如果后面没有传入计算文件和输出文件位置,则加载默认的文件,并将结果打印在控制台中
    /software/flink/bin/flink run  /software/flink/examples/batch/WordCount.jar
    

    5.查看历史日志

    http://node1:50070/explorer.html#/flink/completed-jobs
    http://node1:8082/#/overview
    

    6.停止Flink集群

    /software/flink/bin/stop-cluster.sh
    

    Sandalone-HA 独立集群高可用模式

    原理

    从之前的架构中我们可以很明显的发现 JobManager 有明显的单点问题(SPOF,single point of failure)。JobManager 肩负着任务调度以及资源分配,一旦 JobManager 出现意外,其后果可想而知。
    在 Zookeeper 的帮助下,一个 Standalone的Flink集群会同时有多个活着的 JobManager,其中只有一个处于工作状态,其他处于 Standby 状态。当工作中的 JobManager 失去连接后(如宕机或 Crash),Zookeeper 会从 Standby 中选一个新的 JobManager 来接管 Flink 集群。

    操作

    1.集群规划

    - 服务器: node1(Master + Slave): JobManager + TaskManager
    - 服务器: node2(Master + Slave): JobManager + TaskManager
    - 服务器: node3(Slave): TaskManager
    

    2.启动ZooKeeper

    zkServer.sh status
    zkServer.sh stop
    zkServer.sh start
    

    3.启动HDFS

    /software/hadoop/sbin/start-dfs.sh
    

    4.停止Flink集群

    /software/flink/bin/stop-cluster.sh
    

    5.修改flink-conf.yaml
    vim /software/flink/conf/flink-conf.yaml
    增加如下内容

    state.backend: filesystem
    state.backend.fs.checkpointdir: hdfs://node1:8020/flink-checkpoints
    high-availability: zookeeper
    high-availability.storageDir: hdfs://node1:8020/flink/ha/
    high-availability.zookeeper.quorum: node1:2181,node2:2181,node3:2181
    

    配置说明

    #开启HA,使用文件系统作为快照存储
    state.backend: filesystem
    
    #启用检查点,可以将快照保存到HDFS
    state.backend.fs.checkpointdir: hdfs://node1:8020/flink-checkpoints
    
    #使用zookeeper搭建高可用
    high-availability: zookeeper
    
    # 存储JobManager的元数据到HDFS
    high-availability.storageDir: hdfs://node1:8020/flink/ha/
    
    # 配置ZK集群地址
    high-availability.zookeeper.quorum: node1:2181,node2:2181,node3:2181
    
    

    6.修改masters
    vim /export/server/flink/conf/masters

    node1:8081
    node2:8081
    

    7.同步

    scp -r /software/flink/conf/flink-conf.yaml node2:/software/flink/conf/
    scp -r /software/flink/conf/flink-conf.yaml node3:/software/flink/conf/
    scp -r /software/flink/conf/masters node2:/software/flink/conf/
    scp -r /software/flink/conf/masters node3:/software/flink/conf/
    
    

    8.修改node2上的flink-conf.yaml
    vim /export/server/flink/conf/flink-conf.yaml

    jobmanager.rpc.address: node2
    

    9.重新启动Flink集群,node1上执行

    /software/flink/bin/stop-cluster.sh
    /software/flink/bin/start-cluster.sh
    

    10.使用jps命令查看
    发现没有Flink相关进程被启动

    11.查看日志
    cat /software/flink/log/flink-root-standalonesession-0-node1.log
    发现如下错误

    因为在Flink1.8版本后,Flink官方提供的安装包里没有整合HDFS的jar

    12.下载jar包并在Flink的lib目录下放入该jar包并分发使Flink能够支持对Hadoop的操作
    下载地址
    https://flink.apache.org/downloads.html

    放入lib目录
    cd /software/flink/lib

    分发
    for i in {2..3}; do scp -r flink-shaded-hadoop-2-uber-2.7.5-10.0.jar node$i:$PWD; done

    13.重新启动Flink集群,node1上执行
    /export/server/flink/bin/start-cluster.sh

    14.使用jps命令查看,发现三台机器已经ok

    测试

    1.访问WebUI

    http://node1:8081/#/job-manager/config
    http://node2:8081/#/job-manager/config
    

    2.执行wc

    /software/flink/bin/flink run  /software/flink/examples/batch/WordCount.jar
    

    3.kill掉其中一个master

    4.重新执行wc,还是可以正常执行

    /software/flink/bin/flink run  /software/flink/examples/batch/WordCount.jar
    

    3.停止集群

    /software/flink/bin/stop-cluster.sh
    

    Flin on Yarn模式

    在实际开发中,使用Flink时,更多的使用方式是Flink On Yarn模式,原因如下:
    -1.Yarn的资源可以按需使用,提高集群的资源利用率
    -2.Yarn的任务有优先级,根据优先级运行作业
    -3.基于Yarn调度系统,能够自动化地处理各个角色的 Failover(容错)
    ○ JobManager 进程和 TaskManager 进程都由 Yarn NodeManager 监控
    ○ 如果 JobManager 进程异常退出,则 Yarn ResourceManager 会重新调度 JobManager 到其他机器
    ○ 如果 TaskManager 进程异常退出,JobManager 会收到消息并重新向 Yarn ResourceManager 申请资源,重新启动 TaskManager

    Flink与Yarn的交互方式


    1.Client上传jar包和配置文件到HDFS集群上
    2.Client向Yarn ResourceManager提交任务并申请资源
    3.ResourceManager分配Container资源并启动ApplicationMaster,然后AppMaster加载Flink的Jar包和配置构建环境,启动JobManager
    JobManager和ApplicationMaster运行在同一个container上。
    一旦他们被成功启动,AppMaster就知道JobManager的地址(AM它自己所在的机器)。
    它就会为TaskManager生成一个新的Flink配置文件(他们就可以连接到JobManager)。
    这个配置文件也被上传到HDFS上。
    此外,AppMaster容器也提供了Flink的web服务接口。
    YARN所分配的所有端口都是临时端口,这允许用户并行执行多个Flink
    4.ApplicationMaster向ResourceManager申请工作资源,NodeManager加载Flink的Jar包和配置构建环境并启动TaskManager
    5.TaskManager启动后向JobManager发送心跳包,并等待JobManager向其分配任务
    

    Session模式


    Per-Job模式


    特点:每次递交作业都需要申请一次资源
    优点:作业运行完成,资源会立刻被释放,不会一直占用系统资源
    缺点:每次递交作业都需要申请资源,会影响执行效率,因为申请资源需要消耗时间
    应用场景:适合作业比较少的场景、大作业的场景
    
    

    操作

    1.关闭yarn的内存检查

    vim /software/hadoop/etc/hadoop/yarn-site.xml
    

    添加:

    <!-- 关闭yarn内存检查 -->
    <property>
    <name>yarn.nodemanager.pmem-check-enabled</name>
        <value>false</value>
    </property>
    <property>
         <name>yarn.nodemanager.vmem-check-enabled</name>
         <value>false</value>
    </property>
    
    
    • 说明:
    是否启动一个线程检查每个任务正使用的虚拟内存量,如果任务超出分配值,则直接将其杀掉,默认是true。
    在这里面我们需要关闭,因为对于flink使用yarn模式下,很容易内存超标,这个时候yarn会自动杀掉job
    

    2.同步

    scp -r /software/hadoop/etc/hadoop/yarn-site.xml node2:/software/hadoop/etc/hadoop/yarn-site.xml
    scp -r /software/hadoop/etc/hadoop/yarn-site.xml node3:/software/hadoop/etc/hadoop/yarn-site.xml
    
    

    3.重启yarn

    /software/hadoop/sbin/stop-yarn.sh
    /software/hadoop/sbin/start-yarn.sh
    
    

    测试

    Session模式

    yarn-session.sh(开辟资源) + flink run(提交任务)

    1.在yarn上启动一个Flink会话,node1上执行以下命令

    /export/server/flink/bin/yarn-session.sh -n 2 -tm 800 -s 1 -d
    
    说明:
    申请2个CPU、1600M内存
    # -n 表示申请2个容器,这里指的就是多少个taskmanager
    # -tm 表示每个TaskManager的内存大小
    # -s 表示每个TaskManager的slots数量
    # -d 表示以后台程序方式运行
    
    注意:
    该警告不用管
    WARN  org.apache.hadoop.hdfs.DFSClient  - Caught exception 
    java.lang.InterruptedException
    
    

    2.查看UI界面
    http://node1:8088/cluster

    3.使用flink run提交任务:

     /export/server/flink/bin/flink run  /export/server/flink/examples/batch/WordCount.jar
     运行完之后可以继续运行其他的小任务
     /export/server/flink/bin/flink run  /export/server/flink/examples/batch/WordCount.jar
    
    

    4.通过上方的ApplicationMaster可以进入Flink的管理界面

    5.关闭yarn-session:

    yarn application -kill application_1599402747874_0001
    

    rm -rf /tmp/.yarn-properties-root

    Per-Job分离模式

    1.直接提交job
    /export/server/flink/bin/flink run -m yarn-cluster -yjm 1024 -ytm 1024 /export/server/flink/examples/batch/WordCount.jar
    # -m  jobmanager的地址
    # -yjm 1024 指定jobmanager的内存信息
    # -ytm 1024 指定taskmanager的内存信息
    
    

    2.查看UI界面
    http://node1:8088/cluster


    3.注意:
    在之前版本中如果使用的是flink on yarn方式,想切换回standalone模式的话,如果报错需要删除:【/tmp/.yarn-properties-root】
    rm -rf /tmp/.yarn-properties-root
    因为默认查找当前yarn集群中已有的yarn-session信息中的jobmanager

  • 相关阅读:
    Python面向对象的魔术方法
    Python面向对象基础
    Python异常处理
    Python装饰器实现函数动态类型检查
    Python装饰器
    Python IO
    HTTP协议
    应用层常用协议
    读写分离
    MySQL优化三之MySQL配置
  • 原文地址:https://www.cnblogs.com/maguangyi/p/14466380.html
Copyright © 2011-2022 走看看