zoukankan      html  css  js  c++  java
  • flink集群(docker版)


    1 环境说明

    注意:以下所有操作都在root用户下完成 sudo su - root

    ip

    操作系统版本

    用途

    192.168.30.18

    Ubuntu 18.04.4 LTS

    jobmanager容器、nfs服务(存储flinkcheckpointsavepoint)

    192.168.30.17

    Ubuntu 18.04.4 LTS

    taskmanager02容器

    192.168.30.16

    Ubuntu 18.04.4 LTS

    taskmanager01容器

    2 部署nfs

    192.168.30.18节点上操作

    2.1安装nfs软件包

    # apt-get install rpcbind -y
    # apt-get install nfs-kernel-server -y

    2.2 配置参数,映射checkpoints存储目录

    # vim /etc/exports
    /home/liuchan/config/flink/ 192.168.30.0/24(rw,sync,no_root_squash,no_all_squash)

    2.3 重启nfs服务并加入开机自启动

    # systemctl restart rpcbind
    # systemctl enable rpcbind

    # systemctl restart nfs-kernel-server
    # systemctl enable nfs-kernel-server

    2.4 查看nfs状态

    # rpcinfo -p localhost
       program vers proto   port  service
       100000    4   tcp    111  portmapper
        100000    3   tcp    111  portmapper
        100000    2   tcp    111  portmapper
        100000    4   udp    111  portmapper
        100000    3   udp    111  portmapper
        100000    2   udp    111  portmapper
        100005    1   udp  34715  mountd
        100005    1   tcp  46897  mountd
        100005    2   udp  48806  mountd
        100005    2   tcp  39469  mountd
        100005    3   udp  41227  mountd
        100005    3   tcp  34733  mountd
        100003    3   tcp   2049  nfs
        100003    4   tcp   2049  nfs
        100227    3   tcp   2049
        100003    3   udp   2049  nfs
        100227    3   udp   2049
        100021    1   udp  35642  nlockmgr
        100021    3   udp  35642  nlockmgr
        100021    4   udp  35642  nlockmgr
        100021    1   tcp  42347  nlockmgr
        100021    3   tcp  42347  nlockmgr
        100021    4   tcp  42347  nlockmgr

    2.5 查看nfs映射信息

    # exportfs -v
    /home/liuchan/config/flink
    192.168.30.0/24(rw,wdelay,no_root_squash,no_subtree_check,sec=sys,rw,secure,no_root_squash,no_all_squash)

    3 客户端挂载nfs

    192.168.30.1617节点上操作

    3.1 安装nfs挂载工具

    # apt install nfs-common

    # showmount -e 192.168.30.18
    Export list for 192.168.30.18:
    /home/liuchan/config/flink 192.168.30.0/24

    3.2 创建挂载目录

    # mkdir -p /home/liuchan/config/flink

    3.3 挂载nfs并写入到/etc/fstab

    # echo '192.168.30.18:/home/liuchan/config/flink /home/liuchan/config/flink nfs defaults 0 0' >> /etc/fstab
    # mount
    -a

    4 集群通用配置说明

    4.1 docker-compose说明

    1 compose目录结构

    # tree -LFa 1 flink/
    flink
    ├── conf/                 # flink容器配置文件目录
    ├── docker-compose.yml    # docker-compose配置文件
    ├── Dockerfile            # 构建flink镜像用到的Dockerfile文件
    ├── .env                  # docker-compose环境变量文件
    ├── flink-1.10.0.tar.gz   # 构建flink镜像用到的flink-1.10.0压缩包
    ├── jdk1.8.0_251.tar.gz   # 构建flink镜像用到的jdk1.8.0_251压缩包
    ├── log/                  # flink容器日志目录
    ├── run.sh                # 构建flink镜像用到的flink容器启动脚本
    └── tmp/                  # flink容器临时文件目录

    3 directories, 6 files

     

    2 .env文件

    # cat flink/.env
    IMAGE=flink:1.10.0-v01
    FLINK_HOME=/usr/local/flink-1.10.0

     

    3 Dockerfile文件

    # cat flink/Dockerfile
    FROM centos:7
    LABEL maintainer lc

    ARG FLINK_VERSION=flink-1.10.0
    ARG JDK_VERSION=jdk1.8.0_251

    RUN yum install wget curl unzip iproute net-tools -y &&
        yum clean all &&
        rm -rf /var/cache/yum/* &&
        ln -sf /usr/share/zoneinfo/Asia/Shanghai /etc/localtime

    ADD ${FLINK_VERSION}.tar.gz /usr/local
    ADD ${JDK_VERSION}.tar.gz   /usr/local

    ENV JAVA_HOME /usr/local/${JDK_VERSION}
    ENV CLASSPATH $JAVA_HOME/lib/tools.jar:$JAVA_HOME/jre/lib/rt.jar
    ENV PATH $JAVA_HOME/bin:$PATH
    ENV FLINK_HOME=/usr/local/${FLINK_VERSION}

    COPY run.sh /root

    WORKDIR ${FLINK_HOME}
    EXPOSE 8081 6123
    ENTRYPOINT ["/root/run.sh"]

     

    4 run.sh文件

    #!/bin/bash

    FLINK_JOB_MANAGER_SH=$FLINK_HOME/bin/jobmanager.sh
    FLINK_TASK_MANAGER_SH=$FLINK_HOME/bin/taskmanager.sh

    case "$1" in
    "jobmanager")
    $FLINK_JOB_MANAGER_SH start-foreground
    ;;

    "taskmanager")
    $FLINK_TASK_MANAGER_SH start-foreground
    ;;

    *)
    echo "COMMAND ERROR"
    ;;
    esac

    4.2 flink集群配置文件

    1 修改flink-conf.yaml

    # vim flink/conf/flink-conf.yaml
    # jobmanager节点地址
    jobmanager.rpc.address: 192.168.30.18
    # checkpoints保留的个数,全局配置
    state.checkpoints.num-retained: 16
    # Blog Server
    taskmanager.rpc.port: 40010-40020
    # Task Manager
    metrics.internal.query-service.port: 40030-40040
    # Metrics
    blob.server.port: 40050-40060

     

    2 修改masters

    # vim flink/conf/masters
    192.168.30.18:8081

     

    3 修改slaves

    # cat flink/conf/slaves 
    192.168.30.16
    192.168.30.17

    5 启动容器

    5.1 jobmanager

    192.168.30.18节点

    1 docker-compose.yml配置文件

    # cat flink/docker-compose.yml
    version: '3'
    networks:
      rulr-network:
        external: true
    services:

      flink-1.10.0-v01:
        image: flink:1.10.0-v01
        build:
          context: ./
          dockerfile: Dockerfile

      jobmanager:
        container_name: jobmanager
        hostname: jobmanager
        image: ${IMAGE}
        volumes:
          - ./conf:${FLINK_HOME}/conf
          - ./tmp/jobmanager-tmp:/tmp
          - ./log/jobmanager-log:${FLINK_HOME}/log
          - /home/liuchan/config/flink:/home/liuchan/config/flink
          - /home/liuchan/servers/compute-streaming:/home/liuchan/servers/compute-streaming
          - /home/liuchan/config/servers:/home/liuchan/config/servers
        ports:
          - "8081:8081"
          - "6123:6123"
          - "40010-40020:40010-40020"
          - "40030-40040:40030-40040"
          - "40050-40060:40050-40060"
        command: jobmanager
        restart: always
        networks:
          - rulr-network
        deploy:
          resources:
            limits:
              cpus: '2'
              memory: 2G

     

    2 构建容器

    # cd flink-jobmanager/
    # docker-compose build
    # docker-compose up -d

    5.2 taskmanager02

    192.168.30.17节点

    1 docker-compose.yml配置文件

    # cat flink/docker-compose.yml
    version: '3'
    networks:
      rulr-network:
        external: true
    services:

      flink-1.10.0-v01:
        image: flink:1.10.0-v01
        build:
          context: ./
          dockerfile: Dockerfile

      taskmanager02:
        container_name: taskmanager02
        hostname: taskmanager02
        image: ${IMAGE}
        ports:
          - "40010-40020:40010-40020"
          - "40030-40040:40030-40040"
          - "40050-40060:40050-40060"
        volumes:
          - ./conf:${FLINK_HOME}/conf
          - ./tmp/taskmanager02-tmp:/tmp
          - ./log/taskmanager02-log:${FLINK_HOME}/log
          - /home/liuchan/config/flink:/home/liuchan/config/flink
        command: taskmanager
        restart: always
        networks:
          - rulr-network
        deploy:
          resources:
            limits:
              cpus: '2'
              memory: 2G

     

    2 构建容器

    # cd flink-taskmanager02/
    # docker-compose build
    # docker-compose up -d

    5.3 taskmanager01

    192.168.30.16节点

    1 docker-compose.yml配置文件

    # cat flink/docker-compose.yml
    version: '3'
    networks:
      rulr-network:
        external: true
    services:

      flink-1.10.0-v01:
        image: flink:1.10.0-v01
        build:
          context: ./
          dockerfile: Dockerfile

      taskmanager01:
        container_name: taskmanager01
        hostname: taskmanager01
        image: ${IMAGE}
        ports:
          - "40010-40020:40010-40020"
          - "40030-40040:40030-40040"
          - "40050-40060:40050-40060"
        volumes:
          - ./conf:${FLINK_HOME}/conf
          - ./tmp/taskmanager01-tmp:/tmp
          - ./log/taskmanager01-log:${FLINK_HOME}/log
          - /home/liuchan/config/flink:/home/liuchan/config/flink
        command: taskmanager
        restart: always
        networks:
          - rulr-network
        deploy:
          resources:
            limits:
              cpus: '2'
              memory: 2G

     

    2 构建容器

    # cd flink-taskmanager01/
    # docker-compose build
    # docker-compose up -d

    6 运行flink任务

    6.1 运行jar

    # docker exec -it flink-jobmanager bash
    [root@jobmanager flink-1.10.0]# bin/flink run -d
    /home/liuchan/servers/compute-streaming/compute-streaming.jar --appMode=2

    6.2 flinkUI页面查看

    URL: http://192.168.30.18:8081/

    # 总体概览
    wpsACC3.tmp

    # Task Managers信息
    wpsACC4.tmp

    # 查看JOB
    详细信息
    wpsACD5.tmp

    6.3 测试

    1 停掉正在运行jobTaskManager容器

    从上面的截图可以判断出ip为172.18.0.10容器正在运行job,该容器在192.168.30.17服务器上。停掉taskmanager02容器。

    # cd flink-taskmanager02/
    # docker-compose down

     

    2 flinkUI中查看

    从下图可以看到,job经历超时报错后,运行的TaskManager节点由taskmanager02容器转移到了taskmanager01容器

    wpsACD6.tmp 

    wpsACD7.tmp 

     

    wpsACD8.tmp 

     

    3 恢复taskmanager02容器

    # cd flink-taskmanager02/
    # docker-compose up -d
    # flink的UI可以看到taskmanager02
    容器又上线了

    7 CheckpointSavepoint

    7.1 CheckpointSavepoint说明

    1 Checkpoint

    (1) Flink Checkpoint是一种容错恢复机制。这种机制保证了实时程序运行时,即使突然遇到异常也能够进行自我恢复,不需要用户指定。Checkpoint是增量做的,每次的时间较短,数据量较小。

    (2) Checkpoint对于用户层面,是透明的,用户会感觉程序一直在运行。

    (3) Flink Checkpoint是Flink自身的系统行为,用户无法对其进行交互,用户可以在程序启动之前,设置好实时程序Checkpoint相关参数,当程序启动之后,剩下的就全交给Flink自行管理。

    (4) Checkpoint默认关闭,Checkpoint的启用、存储方式、存储位置,在应用代码中配置,其中存储方式、存储位置,也可以在flink-conf.yaml文件中通过state.backendstate.checkpoints.dir参数配置全局参数,但应用代码中配置优先级更高。

    (5) Checkpoint 默认保留数为1,通过修改flink-conf.yaml文件中state.checkpoints.num-retained参数设置Checkpoint保留数量。

    (6) Checkpoint默认job程序取消后删除,在应用代码中通过设置以下代码实现保留:
    "getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);"
    (5) Checkpoint的组成:Checkpoint根目录/
    CheckpointID

     

    2 Savepoint

    (1) Flink Savepoint你可以把它当做在某个时间点程序状态全局镜像,以后程序在进行升级,或者修改bug、并行度等情况,还能从保存的状态位继续启动恢复,需要用户指定。是全量做的,每次的时间较长,数据量较大。

    (2) Flink Savepoint一般存储在文件系统上面,它需要用户主动进行触发。

    (3) Savepoint会一直保存,除非用户删除。

    (4) flink-conf.yaml文件state.savepoints.dir参数用以配置savepoints的存储目录,默认none。

    (5) Savepoint的组成:Savepoint根目录/savepoint-jobid前六位-12
    随机数字及字母组合

    7.2 flink常用命令

    1 通用命令

    (1) 命令执行
    bin/flink run [OPTIONS] <jar-file> <arguments>
    options:
    -d: 任务提交后,断开session,会话继续保持,即表示将job放到后台运行。

    示例:后台运行job
    # bin/flink run -d /home/liuchan/servers/compute-streaming/compute-streaming.jar --appMode=2

    (2) 查看任务列表
    bin/flink list [OPTIONS]
    options:
    -a: 全部显示所有程序及其作业ID
    -r: 仅显示正在运行的程序及其作业ID
    -s: 仅显示计划的程序及其作业ID

    示例:查看正在运行的job
    # bin/flink list
    -r

     

    2 操作Savepoint

    (1) 在取消任务时保存Savepoint
    bin/flink cancel [OPTIONS] <Job ID>
    options: 
    -s: 触发保存点并取消作业。目标目录是可选的。如果未指定目录,则
    配置的默认目录使用(state.savepoints.dir)。
    官方文档已经不推荐使用,建议使用stop。

    示例:
    # bin/flink cancel -s /home/liuchan/config/flink/savepoints 1af3e95778850085614d175657948369
    wpsACE8.tmp
    (2) 在停止任务时保存Savepoint
    bin/flink stop [OPTIONS] <Job ID>
    options:
    -p: <savepointPath>保存点的路径,如果没有指定目录时,将使
    用默认值配置(state.savepoints.dir)。

    示例:
    # bin/flink stop -p /home/liuchan/config/flink/savepoints 1504a27aaecba591877a68a233ee9240
    wpsACE9.tmp

    (3) 触发Savepoint
    bin/flink savepoint [OPTIONS] <Job ID> [<target directory>]
    options:

    示例:
    # bin/flink savepoint 3b74f51cc4186aa4bf5bf84e7e716d0f /home/liuchan/config/flink/savepoints
    wpsACEA.tmp

    (4) 从指定的savepoint运行job
    bin/flink run [OPTIONS] <jar-file> <arguments>
    options:
    -s: 从savepoint<savepointPath>路径到用于还原作业的保存点

    示例:
    #
    bin/flink run -d -s /home/liuchan/config/flink/savepoints/savepoint-1af3e9-7a3891c86538 /home/liuchan/servers/compute-streaming/compute-streaming.jar --appMode=2

     

    3 取消和停止job的区别如下

    (1) cancel()调用,立即调用作业算子的cancel()方法,以尽快取消他们。如果算子在接
    cancel()调用后没有停止,flink将开始定期中断算子线程的执行,直到所有算子停止
    为止。

    (2) stop()是更加优雅的方式,仅适用于source实现了stoppableFunction接口的作业。
    当用户请求停止作业时,作业的所有source都将被stop()方法调用,指导所有source
    正常关闭时,作业才会正常结束,这种方式,使作业正常处理完所有作业。

  • 相关阅读:
    sql except 用法,找两个表中非共同拥有的
    ‘堆’出你的洪荒之力
    原来你是个这样的JVM
    变形词
    54题
    最大对称子数组
    java 线程之间通信以及notify与notifyAll区别。
    大型网站架构系列:消息队列
    剑指offer第10题
    & 和 && 区别
  • 原文地址:https://www.cnblogs.com/LiuChang-blog/p/15414541.html
Copyright © 2011-2022 走看看