  • Flink : Docker Playground

    Flink provides a Docker-based playground for users to learn about and experiment with Flink.

    https://ci.apache.org/projects/flink/flink-docs-release-1.10/getting-started/docker-playgrounds/flink-operations-playground.html

    https://github.com/apache/flink-playgrounds

    Setting up the environment

    Download

    git clone --branch release-1.10 https://github.com/apache/flink-playgrounds.git
    

    Take a look at the contents of flink-playgrounds/operations-playground/docker-compose.yaml:

    version: "2.1"
    services:
      client:
        build: ../docker/ops-playground-image
        image: apache/flink-ops-playground:1-FLINK-1.10-scala_2.11
        command: "flink run -d -p 2 /opt/ClickCountJob.jar --bootstrap.servers kafka:9092 --checkpointing --event-time"
        depends_on:
          - jobmanager
          - kafka
        volumes:
          - ./conf:/opt/flink/conf
        environment:
          - JOB_MANAGER_RPC_ADDRESS=jobmanager
      clickevent-generator:
        image: apache/flink-ops-playground:1-FLINK-1.10-scala_2.11
        command: "java -classpath /opt/ClickCountJob.jar:/opt/flink/lib/* org.apache.flink.playgrounds.ops.clickcount.ClickEventGenerator --bootstrap.servers kafka:9092 --topic input"
        depends_on:
          - kafka
      jobmanager:
        image: flink:1.10.0-scala_2.11
        command: "jobmanager.sh start-foreground"
        ports:
          - 8081:8081
        volumes:
          - ./conf:/opt/flink/conf
          - flink-checkpoints-directory:/tmp/flink-checkpoints-directory
          - /tmp/flink-savepoints-directory:/tmp/flink-savepoints-directory
        environment:
          - JOB_MANAGER_RPC_ADDRESS=jobmanager
      taskmanager:
        image: flink:1.10.0-scala_2.11
        depends_on:
          - jobmanager
        command: "taskmanager.sh start-foreground"
        volumes:
          - ./conf:/opt/flink/conf
          - flink-checkpoints-directory:/tmp/flink-checkpoints-directory
          - /tmp/flink-savepoints-directory:/tmp/flink-savepoints-directory
        environment:
          - JOB_MANAGER_RPC_ADDRESS=jobmanager
      zookeeper:
        image: wurstmeister/zookeeper:3.4.6
      kafka:
        image: wurstmeister/kafka:2.12-2.2.1
        environment:
          KAFKA_ADVERTISED_LISTENERS: INSIDE://:9092,OUTSIDE://:9094
          KAFKA_LISTENERS: INSIDE://:9092,OUTSIDE://:9094
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
          KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
          KAFKA_CREATE_TOPICS: "input:2:1, output:2:1"
          KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
        ports:
          - 9094:9094
    volumes:
      flink-checkpoints-directory:
    

    As you can see, this file defines several containers: kafka, zookeeper, jobmanager, taskmanager, clickevent-generator, and client. The client image is built from a Dockerfile, clickevent-generator reuses the image built for client, and the remaining services use images pulled directly.
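
    To double-check which services the file defines, docker-compose itself can list them (a small host-side sketch):

    cd flink-playgrounds/operations-playground
    docker-compose config --services    # prints the names of the services defined in docker-compose.yaml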

    Building the client image uses Maven, and it is best to switch to a domestic (China) mirror if downloads are slow.
    Go to flink-playgrounds/docker/ops-playground-image and add a settings.xml file with the following content:

    <?xml version="1.0" encoding="UTF-8"?>
    
    <settings xmlns="http://maven.apache.org/SETTINGS/1.0.0"
              xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
              xsi:schemaLocation="http://maven.apache.org/SETTINGS/1.0.0 http://maven.apache.org/xsd/settings-1.0.0.xsd">
        <mirrors>
           <mirror>
              <id>aliyunmaven</id>
              <mirrorOf>central</mirrorOf>
              <name>aliyun maven</name>
              <url>https://maven.aliyun.com/repository/public</url>
           </mirror>
        </mirrors>
    </settings>
    

    Then edit flink-playgrounds/docker/ops-playground-image/Dockerfile and add the following two lines:

    COPY ./settings.xml /usr/share/maven/conf/settings.xml
    COPY ./settings.xml /root/.m2/settings.xml
    

    Build and start the services

    cd flink-playgrounds/operations-playground
    docker-compose build
    docker-compose up -d
    

    List the newly built local images:

    docker images
    

    Check the containers:

    docker-compose ps
    

    You can see that the cluster made up of kafka, zookeeper, jobmanager, and taskmanager is running, clickevent-generator is producing data, and client has submitted the job successfully and exited.

                        Name                                  Command               State                   Ports
    -----------------------------------------------------------------------------------------------------------------------------
    operations-playground_clickevent-generator_1   /docker-entrypoint.sh java ...   Up       6123/tcp, 8081/tcp
    operations-playground_client_1                 /docker-entrypoint.sh flin ...   Exit 0
    operations-playground_jobmanager_1             /docker-entrypoint.sh jobm ...   Up       6123/tcp, 0.0.0.0:8081->8081/tcp
    operations-playground_kafka_1                  start-kafka.sh                   Up       0.0.0.0:9094->9094/tcp
    operations-playground_taskmanager_1            /docker-entrypoint.sh task ...   Up       6123/tcp, 8081/tcp
    operations-playground_zookeeper_1              /bin/sh -c /usr/sbin/sshd  ...   Up       2181/tcp, 22/tcp, 2888/tcp, 3888/tcp
    

    All of the containers can be shut down with the following command:

    docker-compose down
    

    You can also manage individual containers directly with docker commands.
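
    For example (a host-side sketch; the container names come from the docker-compose ps output above):

    docker ps                                             # list the running containers
    docker logs -f operations-playground_taskmanager_1    # follow one container's logs
    docker restart operations-playground_taskmanager_1    # restart a single container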

    Web UI

    Open http://localhost:8081 to see the web UI.

    You can see a job named Click Event Count running.

    Checking the cluster state

    The logs can be viewed with the following commands:

    docker-compose logs -f jobmanager
    docker-compose logs -f taskmanager
    

    Log into one of the containers and take a look at the flink command:

    root@68b39518cf16:/# flink -v
    Version: 1.10.0, Commit ID: aa4eb8f
    root@68b39518cf16:/# flink help
    "help" is not a valid action.

    Valid actions are "run", "list", "info", "savepoint", "stop", or "cancel".

    Specify the version option (-v or --version) to print Flink version.

    Specify the help option (-h or --help) to get help on the command.
    root@68b39518cf16:/# flink list
    Waiting for response...
    ------------------ Running/Restarting Jobs -------------------
    10.05.2020 11:19:18 : c763a6a422d9392da4e9a9678fb66287 : Click Event Count (RUNNING)
    --------------------------------------------------------------
    No scheduled jobs.
    root@68b39518cf16:/# flink --help
    

    Jobs can also be inspected through the REST API:

    curl localhost:8081/jobs
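
    The response is JSON; a possible follow-up (a sketch that assumes jq is installed on the host) extracts the job id and queries that job's details:

    JOB_ID=$(curl -s localhost:8081/jobs | jq -r '.jobs[0].id')
    curl -s "localhost:8081/jobs/${JOB_ID}" | jq '{name, state}'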
    

    The clickevent-generator container runs a long-lived Java program that
    keeps sending data to the Kafka input topic. You can watch what it sends:

    docker-compose exec kafka kafka-console-consumer.sh \
      --bootstrap-server localhost:9092 --topic input
    

    The client container used the flink run command to submit the long-running Click Event Count Flink job,
    which keeps reading from the input topic, aggregating, and writing the results to the output topic:

    docker-compose exec kafka kafka-console-consumer.sh \
      --bootstrap-server localhost:9092 --topic output
    

    None of these containers have the ps command installed, so install it yourself:

    apt-get update
    apt-get install procps
    

    If a command line is very long, ps may not display all of it; redirect the output to a file first and then view it:

    ps -ef > temp.txt
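
    Alternatively, a container's processes can be listed from the host without installing anything inside it, using docker top (the container names come from docker-compose ps):

    docker top operations-playground_jobmanager_1
    docker top operations-playground_taskmanager_1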
    

    Inside the jobmanager container you can see that the Java program it runs is

    org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
    

    Inside the taskmanager container the Java program is

    org.apache.flink.runtime.taskexecutor.TaskManagerRunner
    

    The generator container likewise runs a Java program that generates data and sends it to Kafka.

    Failure recovery

    First attach to the Kafka output topic:

    docker-compose exec kafka kafka-console-consumer.sh \
      --bootstrap-server localhost:9092 --topic output
    

    Then kill the taskmanager:

    docker-compose kill taskmanager
    

    You can see that the taskmanager disappears from the web UI (oddly, the UI still shows a job running).
    The flink list command shows the job state change to RESTARTING,
    and at the same time the output topic stops producing data.
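
    To run flink list without logging into a container, the same client-image pattern used later in this walkthrough works here too:

    docker-compose run --no-deps client flink list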

    What actually happens is that once the JobManager notices the TaskManager is unavailable, it cancels the affected running job and immediately resubmits it; the resubmitted job then waits until a TaskManager becomes available again.

    Restart the TaskManager:

    docker-compose up -d taskmanager
    

    You can see that both the TaskManager and the job recover, and the output topic produces output again.

    Under the hood, the job restores its state from the latest checkpoint and quickly works through all of the accumulated data.
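
    The job's checkpoint history can also be inspected through the REST API (a host-side sketch; substitute the job id reported by flink list):

    curl "localhost:8081/jobs/<job-id>/checkpoints"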

    Upgrading and rescaling

    The flink stop command stops the job and takes a savepoint:

    lin@Ubuntu-VM-1:$ docker-compose run --no-deps client flink stop c2cbdb0bca115d2375706b2c95d8a323
    Suspending job "c2cbdb0bca115d2375706b2c95d8a323" with a savepoint.
    Savepoint completed. Path: file:/tmp/flink-savepoints-directory/savepoint-c2cbdb-8d350f39371f
    

    Note that the command is run with docker-compose run; the savepoint is written to /tmp/flink-savepoints-directory, which is bind-mounted from the host, so it ends up on the host machine.
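
    You can confirm this on the host (the exact savepoint directory name will differ):

    ls /tmp/flink-savepoints-directory/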

    If you just want to cancel a job (without taking a savepoint), use:

    flink cancel <job-id>
    

    Now the output topic has stopped producing output.

    Suppose we have made changes to the program or its configuration; we then restart the job from the savepoint:

    docker-compose run --no-deps client flink run \
        -s /tmp/flink-savepoints-directory/savepoint-c2cbdb-8d350f39371f/ \
        -d /opt/ClickCountJob.jar \
        --bootstrap.servers kafka:9092 \
        --checkpointing \
        --event-time
    
    Job has been submitted with JobID 29094005784e0e164f79ae7ce28fc250
    

    You can see that the output topic produces output again; the job resumes from the savepoint and works through the data that accumulated while it was stopped.

    Now stop the job again:

    lin@Ubuntu-VM-1:$ docker-compose run --no-deps client flink stop 29094005784e0e164f79ae7ce28fc250
    Suspending job "29094005784e0e164f79ae7ce28fc250" with a savepoint.
    Savepoint completed. Path: file:/tmp/flink-savepoints-directory/savepoint-290940-e4915465904b
    

    Restart it with a parallelism of 3:

    docker-compose run --no-deps client flink run \
        -p 3 \
        -s /tmp/flink-savepoints-directory/savepoint-290940-e4915465904b \
        -d /opt/ClickCountJob.jar \
        --bootstrap.servers kafka:9092 \
        --checkpointing \
        --event-time
    
    Job has been submitted with JobID f6011eec5bf4b66f1770e9f03441a8a3
    

    But the output topic produces no data.

    Checking the job status shows it as RUNNING:

    root@13e5e296d72f:/opt/flink# flink list
    Waiting for response...
    ------------------ Running/Restarting Jobs -------------------
    11.05.2020 17:04:34 : f6011eec5bf4b66f1770e9f03441a8a3 : Click Event Count (RUNNING)
    --------------------------------------------------------------
    No scheduled jobs.
    

    This is because there is only one TaskManager at the moment, and each TaskManager is configured with only 2 slots.
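
    The slot shortage is also visible in the cluster overview exposed by the REST API (a host-side sketch):

    curl "localhost:8081/overview"
    # the response includes "taskmanagers", "slots-total" and "slots-available"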

    Look at flink-playgrounds/operations-playground/conf/flink-conf.yaml:

    jobmanager.rpc.address: jobmanager
    blob.server.port: 6124
    query.server.port: 6125
    
    taskmanager.memory.process.size: 1568m
    taskmanager.numberOfTaskSlots: 2
    
    state.backend: filesystem
    state.checkpoints.dir: file:///tmp/flink-checkpoints-directory
    state.savepoints.dir: file:///tmp/flink-savepoints-directory
    
    heartbeat.interval: 1000
    heartbeat.timeout: 5000
    

    So a parallelism of 3 needs at least two TaskManagers; let's start one more:

    lin@Ubuntu-VM-1:$ docker-compose scale taskmanager=2
    WARNING: The scale command is deprecated. Use the up command with the --scale flag instead.
    Starting operations-playground_taskmanager_1 ... done
    Creating operations-playground_taskmanager_2 ... done
    

    Checking with docker-compose ps, there are now two TaskManagers:

    operations-playground_taskmanager_1      /docker-entrypoint.sh task ...   Up       6123/tcp, 8081/tcp
    operations-playground_taskmanager_2      /docker-entrypoint.sh task ...   Up       6123/tcp, 8081/tcp
    

    Soon the output topic has data coming out again.

    Program arguments: --checkpointing and --event-time

    • --checkpointing enables checkpointing, which is Flink’s fault-tolerance mechanism. If you run without it and go through failure and recovery, you will see that data is actually lost.

    • --event-time enables event time semantics for your Job. When disabled, the Job will assign events to windows based on the wall-clock time instead of the timestamp of the ClickEvent. Consequently, the number of events per window will not be exactly one thousand anymore.
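
    To observe the difference yourself, the job can be resubmitted without these arguments, following the same flink run pattern as above (a sketch; stop or cancel the currently running job first):

    docker-compose run --no-deps client flink run -d /opt/ClickCountJob.jar \
        --bootstrap.servers kafka:9092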

    Metrics

    The following commands are run on the host machine:

    curl "localhost:8081/jobs/f6011eec5bf4b66f1770e9f03441a8a3/metrics"
    
    [{
    	"id": "numberOfFailedCheckpoints"
    }, {
    	"id": "lastCheckpointSize"
    }, {
    	"id": "lastCheckpointExternalPath"
    }, {
    	"id": "totalNumberOfCheckpoints"
    }, {
    	"id": "lastCheckpointRestoreTimestamp"
    }, {
    	"id": "lastCheckpointAlignmentBuffered"
    }, {
    	"id": "restartingTime"
    }, {
    	"id": "uptime"
    }, {
    	"id": "numberOfInProgressCheckpoints"
    }, {
    	"id": "downtime"
    }, {
    	"id": "numberOfCompletedCheckpoints"
    }, {
    	"id": "numRestarts"
    }, {
    	"id": "fullRestarts"
    }, {
    	"id": "lastCheckpointDuration"
    }]
    
    curl "localhost:8081/jobs/f6011eec5bf4b66f1770e9f03441a8a3/metrics?get=lastCheckpointSize"
    
    [{"id":"lastCheckpointSize","value":"12466"}]
    
    curl "localhost:8081/jobs/f6011eec5bf4b66f1770e9f03441a8a3"
    
    {
    	"jid": "f6011eec5bf4b66f1770e9f03441a8a3",
    	"name": "Click Event Count",
    	"isStoppable": false,
    	"state": "RUNNING",
    	"start-time": 1589216674777,
    	"end-time": -1,
    	"duration": 407088,
    	"now": 1589217081865,
    	"timestamps": {
    		"RECONCILING": 0,
    		"RUNNING": 1589216674820,
    		"CANCELED": 0,
    		"RESTARTING": 0,
    		"FAILING": 0,
    		"FAILED": 0,
    		"FINISHED": 0,
    		"CANCELLING": 0,
    		"SUSPENDED": 0,
    		"CREATED": 1589216674777
    	},
    	"vertices": [{
    		"id": "bc764cd8ddf7a0cff126f51c16239658",
    		"name": "Source: ClickEvent Source",
    		"parallelism": 3,
    		"status": "RUNNING",
    		"start-time": 1589216946118,
    		"end-time": -1,
    		"duration": 135747,
    		"tasks": {
    			"FAILED": 0,
    			"CANCELING": 0,
    			"SCHEDULED": 0,
    			"RUNNING": 3,
    			"RECONCILING": 0,
    			"CREATED": 0,
    			"FINISHED": 0,
    			"CANCELED": 0,
    			"DEPLOYING": 0
    		},
    		"metrics": {
    			"read-bytes": 0,
    			"read-bytes-complete": true,
    			"write-bytes": 5979476,
    			"write-bytes-complete": true,
    			"read-records": 0,
    			"read-records-complete": true,
    			"write-records": 197511,
    			"write-records-complete": true
    		}
    	}, {
    		"id": "0a448493b4782967b150582570326227",
    		"name": "Timestamps/Watermarks",
    		"parallelism": 3,
    		"status": "RUNNING",
    		"start-time": 1589216946147,
    		"end-time": -1,
    		"duration": 135718,
    		"tasks": {
    			"FAILED": 0,
    			"CANCELING": 0,
    			"SCHEDULED": 0,
    			"RUNNING": 3,
    			"RECONCILING": 0,
    			"CREATED": 0,
    			"FINISHED": 0,
    			"CANCELED": 0,
    			"DEPLOYING": 0
    		},
    		"metrics": {
    			"read-bytes": 5998885,
    			"read-bytes-complete": true,
    			"write-bytes": 5998076,
    			"write-bytes-complete": true,
    			"read-records": 197499,
    			"read-records-complete": true,
    			"write-records": 197499,
    			"write-records-complete": true
    		}
    	}, {
    		"id": "ea632d67b7d595e5b851708ae9ad79d6",
    		"name": "ClickEvent Counter",
    		"parallelism": 3,
    		"status": "RUNNING",
    		"start-time": 1589216946172,
    		"end-time": -1,
    		"duration": 135693,
    		"tasks": {
    			"FAILED": 0,
    			"CANCELING": 0,
    			"SCHEDULED": 0,
    			"RUNNING": 3,
    			"RECONCILING": 0,
    			"CREATED": 0,
    			"FINISHED": 0,
    			"CANCELED": 0,
    			"DEPLOYING": 0
    		},
    		"metrics": {
    			"read-bytes": 6032525,
    			"read-bytes-complete": true,
    			"write-bytes": 27913,
    			"write-bytes-complete": true,
    			"read-records": 197458,
    			"read-records-complete": true,
    			"write-records": 198,
    			"write-records-complete": true
    		}
    	}, {
    		"id": "6d2677a0ecc3fd8df0b72ec675edf8f4",
    		"name": "Sink: ClickEventStatistics Sink",
    		"parallelism": 3,
    		"status": "RUNNING",
    		"start-time": 1589216946208,
    		"end-time": -1,
    		"duration": 135657,
    		"tasks": {
    			"FAILED": 0,
    			"CANCELING": 0,
    			"SCHEDULED": 0,
    			"RUNNING": 3,
    			"RECONCILING": 0,
    			"CREATED": 0,
    			"FINISHED": 0,
    			"CANCELED": 0,
    			"DEPLOYING": 0
    		},
    		"metrics": {
    			"read-bytes": 36133,
    			"read-bytes-complete": true,
    			"write-bytes": 0,
    			"write-bytes-complete": true,
    			"read-records": 198,
    			"read-records-complete": true,
    			"write-records": 0,
    			"write-records-complete": true
    		}
    	}],
    	"status-counts": {
    		"FAILED": 0,
    		"CANCELING": 0,
    		"SCHEDULED": 0,
    		"RUNNING": 4,
    		"RECONCILING": 0,
    		"CREATED": 0,
    		"FINISHED": 0,
    		"CANCELED": 0,
    		"DEPLOYING": 0
    	},
    	"plan": {
    		"jid": "f6011eec5bf4b66f1770e9f03441a8a3",
    		"name": "Click Event Count",
    		"nodes": [{
    			"id": "6d2677a0ecc3fd8df0b72ec675edf8f4",
    			"parallelism": 3,
    			"operator": "",
    			"operator_strategy": "",
    			"description": "Sink: ClickEventStatistics Sink",
    			"inputs": [{
    				"num": 0,
    				"id": "ea632d67b7d595e5b851708ae9ad79d6",
    				"ship_strategy": "FORWARD",
    				"exchange": "pipelined_bounded"
    			}],
    			"optimizer_properties": {}
    		}, {
    			"id": "ea632d67b7d595e5b851708ae9ad79d6",
    			"parallelism": 3,
    			"operator": "",
    			"operator_strategy": "",
    			"description": "ClickEvent Counter",
    			"inputs": [{
    				"num": 0,
    				"id": "0a448493b4782967b150582570326227",
    				"ship_strategy": "HASH",
    				"exchange": "pipelined_bounded"
    			}],
    			"optimizer_properties": {}
    		}, {
    			"id": "0a448493b4782967b150582570326227",
    			"parallelism": 3,
    			"operator": "",
    			"operator_strategy": "",
    			"description": "Timestamps/Watermarks",
    			"inputs": [{
    				"num": 0,
    				"id": "bc764cd8ddf7a0cff126f51c16239658",
    				"ship_strategy": "FORWARD",
    				"exchange": "pipelined_bounded"
    			}],
    			"optimizer_properties": {}
    		}, {
    			"id": "bc764cd8ddf7a0cff126f51c16239658",
    			"parallelism": 3,
    			"operator": "",
    			"operator_strategy": "",
    			"description": "Source: ClickEvent Source",
    			"optimizer_properties": {}
    		}]
    	}
    }
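
    Beyond the job-level endpoints shown above, a few other REST endpoints served by the JobManager on port 8081 may be worth exploring (a host-side sketch):

    curl "localhost:8081/jobs/overview"    # summary of all jobs and their states
    curl "localhost:8081/taskmanagers"     # registered TaskManagers and their slots
    curl "localhost:8081/config"           # configuration used by the web UI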
    


  • Original article: https://www.cnblogs.com/moonlight-lin/p/12873594.html