  • Flink : Standalone Cluster

    A Standalone Cluster is a self-contained Flink cluster, as opposed to, for example, a Flink cluster running on YARN

    Requirements

    1. Java 1.8, with the JAVA_HOME environment variable set
    2. Passwordless SSH login between the machines
    3. The same Flink directory layout on every machine
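The first two requirements can be checked up front. Below is a minimal POSIX shell sketch with hypothetical helper names. It assumes `java -version` prints a quoted version string such as "1.8.0_242" (Java 8 style) or "11.0.2" (Java 9+ style), and it treats Java 8 as a minimum rather than an exact version.

```shell
#!/bin/sh
# Preflight checks for the requirements above (hypothetical helper names).

# Map a Java version string to its major version:
#   "1.8.0_242" -> 8   (old "1.<major>..." scheme)
#   "11.0.2"    -> 11  (Java 9+ "<major>..." scheme)
java_major_version() {
    case "$1" in
        1.*) echo "$1" | cut -d. -f2 ;;
        *)   echo "$1" | cut -d. -f1 | cut -d- -f1 ;;
    esac
}

# Print the version string reported by $JAVA_HOME/bin/java, e.g. 1.8.0_242.
java_home_version() {
    "$JAVA_HOME/bin/java" -version 2>&1 |
        sed -n 's/.*version "\([^"]*\)".*/\1/p' | head -n 1
}

# Succeed only if JAVA_HOME points at a Java 8 or newer installation.
check_java() {
    if [ -z "${JAVA_HOME:-}" ] || [ ! -x "$JAVA_HOME/bin/java" ]; then
        echo "JAVA_HOME is not set or has no bin/java" >&2
        return 1
    fi
    major=$(java_major_version "$(java_home_version)")
    case "$major" in
        ''|*[!0-9]*) echo "cannot parse Java version" >&2; return 1 ;;
    esac
    if [ "$major" -lt 8 ]; then
        echo "Java 8 or newer required, found major version $major" >&2
        return 1
    fi
}

# Succeed only if the host accepts SSH without a password prompt.
check_ssh() {
    ssh -o BatchMode=yes -o ConnectTimeout=5 "$1" true
}
```

For example, run `check_java && check_ssh 192.168.1.2 && echo ok` on the master. With `BatchMode=yes`, ssh fails instead of prompting when key-based login is not set up.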

    Download

    Flink packages (https://flink.apache.org/downloads.html)

    wget https://archive.apache.org/dist/flink/flink-1.10.0/flink-1.10.0-bin-scala_2.12.tgz
    
    tar xzf flink-1.10.0-bin-scala_2.12.tgz
    
    cd flink-1.10.0
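The download URL follows a fixed pattern, so other versions can be fetched the same way. This is a minimal sketch. It assumes the archive keeps the flink-<version>/flink-<version>-bin-scala_<scala>.tgz layout seen in the URL above. `flink_dist_url` and `fetch_flink` are hypothetical helpers.

```shell
#!/bin/sh
# Build the archive.apache.org URL for a Flink binary release, following
# the layout of the wget URL above (hypothetical helper).
flink_dist_url() {
    version=$1          # e.g. 1.10.0
    scala=${2:-2.12}    # Scala build suffix; defaults to 2.12 here
    echo "https://archive.apache.org/dist/flink/flink-${version}/flink-${version}-bin-scala_${scala}.tgz"
}

# Download a release and unpack it into the current directory.
fetch_flink() {
    url=$(flink_dist_url "$1" "$2")
    wget "$url" && tar xzf "${url##*/}"
}
```

`fetch_flink 1.10.0 2.12 && cd flink-1.10.0` reproduces the commands above.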
    

    If you need Hadoop integration:
    before Flink 1.8, you had to download a Flink package bundled with Hadoop;
    since Flink 1.8, Hadoop support ships separately as a jar that goes into lib/

    cd lib    # i.e. flink-1.10.0/lib
    
    wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.7.5-9.0/flink-shaded-hadoop-2-uber-2.7.5-9.0.jar
    

    (The official download page does not explain this very clearly.)
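The Hadoop jar URL follows the standard Maven repository layout. The group ID's dots become slashes, followed by the artifact, the version, and `<artifact>-<version>.jar`. This sketch assumes the jar version is `<Hadoop version>-<flink-shaded version>`, as 2.7.5-9.0 suggests. Both helper names are hypothetical.

```shell
#!/bin/sh
# Turn Maven coordinates into a Maven Central URL using the standard layout:
#   <group as path>/<artifact>/<version>/<artifact>-<version>.jar
maven_jar_url() {
    group_path=$(echo "$1" | tr . /)
    echo "https://repo.maven.apache.org/maven2/${group_path}/$2/$3/$2-$3.jar"
}

# Download the shaded Hadoop jar into a Flink lib/ directory (hypothetical helper).
# $1: Flink home, $2: jar version such as 2.7.5-9.0
fetch_shaded_hadoop() {
    url=$(maven_jar_url org.apache.flink flink-shaded-hadoop-2-uber "$2")
    (cd "$1/lib" && wget "$url")
}
```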

    Configuration

    conf/flink-conf.yaml

    lin@Ubuntu-VM-1:$ cat conf/flink-conf.yaml | grep -v "^#" | grep -v "^$"
    
    jobmanager.rpc.address: 192.168.1.1                # JobManager IP
    jobmanager.rpc.port: 6123                          # JobManager RPC port (JobManager/TaskManager communication)
    jobmanager.heap.size: 1024m                        # JobManager heap size
    
    taskmanager.memory.process.size: 1568m             # total memory of each TaskManager process
    taskmanager.numberOfTaskSlots: 1                   # number of slots per TaskManager
    
    parallelism.default: 1                             # parallelism used when flink run does not specify one
    
    jobmanager.execution.failover-strategy: region     # failover strategy
                                                       # region: restart only the tasks in the affected region of the ExecutionGraph
                                                       # full: restart all tasks of the job, i.e. reset the whole ExecutionGraph
    

    These are the entries found in the default file (apart from the JobManager address); many more options are available
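The settings above can also be written and read back from a script, which helps when preparing the same file for several nodes. This is a minimal sketch with hypothetical helper names. It writes only the keys shown above. `flink_conf_get` mimics the `grep -v` pipeline by ignoring comments.

```shell
#!/bin/sh
# Write a minimal conf/flink-conf.yaml containing the settings shown above
# (hypothetical helper). $1: Flink home, $2: JobManager IP,
# $3: slots per TaskManager (defaults to 1).
write_flink_conf() {
    mkdir -p "$1/conf"
    cat > "$1/conf/flink-conf.yaml" <<EOF
jobmanager.rpc.address: $2
jobmanager.rpc.port: 6123
jobmanager.heap.size: 1024m
taskmanager.memory.process.size: 1568m
taskmanager.numberOfTaskSlots: ${3:-1}
parallelism.default: 1
jobmanager.execution.failover-strategy: region
EOF
}

# Print the value of one key, ignoring comment lines and trailing "# ..."
# comments; if the key appears more than once, this helper returns the last.
# $1: path to flink-conf.yaml, $2: key
flink_conf_get() {
    key=$(printf '%s' "$2" | sed 's/\./\\./g')   # escape dots for the regex
    sed -n "s/^$key:[[:space:]]*//p" "$1" | sed 's/[[:space:]]*#.*$//' | tail -n 1
}
```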

    conf/masters sets the Job Manager IP and the Web UI port

    192.168.1.1:8081
    

    conf/slaves lists the Task Manager IPs

    192.168.1.2
    192.168.1.3
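The two host files can be generated in one step. This sketch uses a hypothetical helper. The IPs and Web UI port from this article serve only as example values.

```shell
#!/bin/sh
# Generate conf/masters and conf/slaves (hypothetical helper).
# $1: Flink home, $2: JobManager IP, $3: Web UI port,
# remaining arguments: TaskManager IPs, one line each in conf/slaves.
write_cluster_files() {
    flink_home=$1 master=$2 webui_port=$3
    shift 3
    mkdir -p "$flink_home/conf"
    echo "$master:$webui_port" > "$flink_home/conf/masters"
    : > "$flink_home/conf/slaves"          # truncate any existing file
    for host in "$@"; do
        echo "$host" >> "$flink_home/conf/slaves"
    done
}
```

For example: `write_cluster_files . 192.168.1.1 8081 192.168.1.2 192.168.1.3`.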
    

    Copy the configuration to all nodes
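Because every node needs the same configuration, the copy step can be scripted. This is a minimal sketch with hypothetical names `cluster_hosts` and `sync_conf`. It assumes the following:

- passwordless SSH and rsync are available on all nodes
- Flink lives at the same path on every node
- the host files contain IPv4 addresses or hostnames (the `:port` stripping would break IPv6)

```shell
#!/bin/sh
# Copy conf/ to every host named in conf/masters and conf/slaves
# (hypothetical helpers). Set DRY_RUN=1 to only print the commands.

# List unique hosts: strip ":port" from masters, drop comments and blanks.
cluster_hosts() {
    cat "$1/conf/masters" "$1/conf/slaves" |
        sed -e 's/#.*//' -e 's/:.*//' -e 's/[[:space:]]//g' |
        grep -v '^$' | sort -u
}

# $1: Flink home (same absolute path on every node)
sync_conf() {
    flink_home=$1
    for host in $(cluster_hosts "$flink_home"); do
        if [ -n "${DRY_RUN:-}" ]; then
            echo "rsync -a $flink_home/conf/ $host:$flink_home/conf/"
        else
            rsync -a "$flink_home/conf/" "$host:$flink_home/conf/" || return 1
        fi
    done
}
```

`DRY_RUN=1 sync_conf /opt/flink` prints the rsync commands before anything is copied. `/opt/flink` is a placeholder path.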

    Starting the cluster

    bin/start-cluster.sh
    

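    This script starts the Job Manager with the command below. In HA (ZooKeeper) mode it does so over SSH on every node listed in conf/masters; otherwise it starts a single Job Manager on the machine where the script is run

    bin/jobmanager.sh start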
    

    It then runs the following command over SSH on every node listed in conf/slaves to start a Task Manager

    bin/taskmanager.sh start
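In non-HA mode, the start sequence roughly reduces to the loop below. This is a simplified sketch, not the real bin/start-cluster.sh, which also handles HA masters, SSH options and nohup. `start_cluster_sketch` is a hypothetical name.

The `ssh -n` matters here. Without it, ssh would read from the same stdin as the `while read` loop and swallow the remaining hosts.

```shell
#!/bin/sh
# Simplified, hypothetical sketch of a non-HA cluster start: one JobManager
# on this machine, then one TaskManager per host in conf/slaves via SSH.
# Set DRY_RUN=1 to print the commands instead of running them.
run() {
    if [ -n "${DRY_RUN:-}" ]; then echo "$*"; else "$@"; fi
}

# $1: Flink home (same absolute path on every node)
start_cluster_sketch() {
    flink_home=$1
    run "$flink_home/bin/jobmanager.sh" start
    sed -e 's/#.*//' -e 's/[[:space:]]//g' "$flink_home/conf/slaves" |
        grep -v '^$' |
        while read -r host; do
            # -n: give ssh /dev/null as stdin so it does not consume the host list
            run ssh -n "$host" "$flink_home/bin/taskmanager.sh start"
        done
}
```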
    

    On the master node, the following Java process is running

    org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
    

    On each slave node, the following Java process is running

    org.apache.flink.runtime.taskexecutor.TaskManagerRunner
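To check for these processes from a script, match the output of the JDK's `jps -l` against the two class names above. `jps -l` prints one `<pid> <main class>` pair per line. This sketch uses a hypothetical helper that reads `jps -l` output on stdin.

```shell
#!/bin/sh
# Succeed if `jps -l` output on stdin contains the Flink process for a role
# (hypothetical helper). $1: jobmanager or taskmanager
has_flink_role() {
    case "$1" in
        jobmanager)  cls=org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint ;;
        taskmanager) cls=org.apache.flink.runtime.taskexecutor.TaskManagerRunner ;;
        *) echo "unknown role: $1" >&2; return 2 ;;
    esac
    # Field 2 of each "<pid> <main class>" line is the main class name.
    awk -v c="$cls" '$2 == c { found = 1 } END { exit !found }'
}
```

On a slave node, for example: `jps -l | has_flink_role taskmanager && echo "TaskManager is up"`.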
    

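    The Web UI is served on port 8081 of the master (localhost:8081 when browsing on the master itself)

    It lists the registered Task Managers (3 in the author's setup); since each Task Manager is configured with only one slot, that makes 3 Task Slots in total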
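The slot total is the number of Task Managers multiplied by taskmanager.numberOfTaskSlots. This sketch assumes one Task Manager per non-empty line of conf/slaves. It falls back to 1 slot when the key is absent, which is Flink's default. `total_slots` is a hypothetical helper.

```shell
#!/bin/sh
# Total task slots = TaskManagers x taskmanager.numberOfTaskSlots
# (hypothetical helper). $1: Flink home
total_slots() {
    # One TaskManager per non-empty, non-comment line of conf/slaves.
    tms=$(sed -e 's/#.*//' -e 's/[[:space:]]//g' "$1/conf/slaves" | grep -c -v '^$')
    slots=$(sed -n 's/^taskmanager\.numberOfTaskSlots:[[:space:]]*\([0-9]*\).*/\1/p' \
        "$1/conf/flink-conf.yaml" | tail -n 1)
    echo $(( tms * ${slots:-1} ))
}
```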

    Submitting a Job

    Submit one of the example programs from the examples directory

    bin/flink run examples/batch/WordCount.jar --input ./README.txt --output ./wordcount.txt
    

    The client blocks until the program finishes; the log output is:

    Job has been submitted with JobID f038a66d9b9d6c9e9b80b866dde2dacf
    Program execution finished
    Job with JobID f038a66d9b9d6c9e9b80b866dde2dacf has finished.
    Job Runtime: 4508 ms
    

    With -d (detached), the client returns right after submission instead of waiting for the program to finish

    bin/flink run -d examples/batch/WordCount.jar --input ./README.txt --output ./wordcount.txt
    

    The log output is:

    Job has been submitted with JobID 161410edb6b9a28ca69e84e5fe0885c3
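In detached mode this JobID line is the only handle on the job, so it is worth capturing for later `list` or `cancel` calls. This sketch uses a hypothetical helper. It assumes the "Job has been submitted with JobID ..." line goes to standard output; append 2>&1 to the flink command if it does not in your setup.

```shell
#!/bin/sh
# Extract the 32-hex-digit JobID from the client's submission log line
# read on stdin (hypothetical helper); prints nothing if no line matches.
extract_job_id() {
    sed -n 's/.*submitted with JobID \([0-9a-f]\{32\}\).*/\1/p' | head -n 1
}
```

Usage: `job_id=$(bin/flink run -d examples/batch/WordCount.jar --input ./README.txt --output ./wordcount.txt | extract_job_id)`.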
    

    The job then shows up under Completed Jobs in the Web UI

    Besides run, bin/flink offers several other actions, summarized below

    Action "run" compiles and runs a program.
      Syntax: run [OPTIONS] <jar-file> <arguments>
    
    Action "info" shows the optimized execution plan of the program (JSON).
      Syntax: info [OPTIONS] <jar-file> <arguments>
    
    Action "list" lists running and scheduled programs.
      Syntax: list [OPTIONS]
    
    Action "stop" stops a running program with a savepoint (streaming jobs only).
      Syntax: stop [OPTIONS] <Job ID>
    
    Action "cancel" cancels a running program.
      Syntax: cancel [OPTIONS] <Job ID>
    
    Action "savepoint" triggers savepoints for a running job or disposes existing ones.
      Syntax: savepoint [OPTIONS] <Job ID> [<target directory>]
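The JobID-based actions are easy to misuse with a mistyped ID. Below is a thin hypothetical wrapper for `stop` and `cancel`. It checks the action name and the 32-hex-digit JobID format seen in the logs above before calling bin/flink. Run it from the Flink directory. DRY_RUN=1 only prints the command.

```shell
#!/bin/sh
# Hypothetical wrapper: flink_ctl <stop|cancel> <job-id> [options...]
# Options are placed before the JobID, matching "<action> [OPTIONS] <Job ID>".
flink_ctl() {
    if [ "$#" -lt 2 ]; then
        echo "usage: flink_ctl <stop|cancel> <job-id> [options...]" >&2
        return 2
    fi
    action=$1
    job_id=$2
    shift 2
    case "$action" in
        stop|cancel) ;;
        *) echo "unsupported action: $action" >&2; return 2 ;;
    esac
    case "$job_id" in
        *[!0-9a-f]*) echo "JobID must be lowercase hex: $job_id" >&2; return 2 ;;
    esac
    if [ "${#job_id}" -ne 32 ]; then
        echo "JobID must have 32 hex digits: $job_id" >&2
        return 2
    fi
    if [ -n "${DRY_RUN:-}" ]; then
        echo bin/flink "$action" "$@" "$job_id"
    else
        bin/flink "$action" "$@" "$job_id"
    fi
}
```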
    
    

    Some of the bin/flink run options:

      "run" action options:
         -c,--class <classname>               Class with the program entry point
                                              ("main()" method). Only needed if the
                                              JAR file does not specify the class in
                                              its manifest.
    
         -C,--classpath <url>                 Adds a URL to each user code
                                              classloader  on all nodes in the
                                              cluster. The paths must specify a
                                              protocol (e.g. file://) and be
                                              accessible on all nodes (e.g. by means
                                              of a NFS share). You can use this
                                              option multiple times for specifying
                                              more than one URL. The protocol must
                                              be supported by the {@link
                                              java.net.URLClassLoader}.
    
         -d,--detached                        If present, runs the job in detached mode
    
         -n,--allowNonRestoredState           Allow to skip savepoint state that
                                              cannot be restored. You need to allow
                                              this if you removed an operator from
                                              your program that was part of the
                                              program when the savepoint was
                                              triggered.
    
         -p,--parallelism <parallelism>       The parallelism with which to run the
                                              program. Optional flag to override the
                                              default value specified in the configuration.
    
         -py,--python <pythonFile>            Python script with the program entry
                                              point. The dependent resources can be
                                              configured with the `--pyFiles` option.
    
         -s,--fromSavepoint <savepointPath>   Path to a savepoint to restore the job from
    
         -sae,--shutdownOnAttachedExit        If the job is submitted in attached
                                              mode, perform a best-effort cluster
                                              shutdown when the CLI is terminated
                                              abruptly, e.g., in response to a user
                                              interrupt, such as typing Ctrl + C.
    
      Options for executor mode:
         -D <property=value>   Generic configuration options for
                               execution/deployment and for the configured executor.
                               The available options can be found at
                               https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
    
         -e,--executor <arg>   The name of the executor to be used for executing the
                               given job, which is equivalent to the
                               "execution.target" config option. The currently
                               available executors are: "remote", "local",
                               "kubernetes-session", "yarn-per-job", "yarn-session".
    
      Options for default mode:
         -m,--jobmanager <arg>           Address of the JobManager (master) to which
                                         to connect. Use this flag to connect to a
                                         different JobManager than the one specified
                                         in the configuration.
    
         -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths
                                         for high availability mode
    
    

    run can also submit Python programs; the Python-specific options beyond -py are not listed here
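The options above combine into a single command line. This sketch is a hypothetical wrapper that adds -d, -p and -m only when the corresponding made-up environment variables DETACHED, PARALLELISM and JOBMANAGER are set. The jar and its arguments pass through unchanged. Run it from the Flink directory.

```shell
#!/bin/sh
# Hypothetical wrapper around "bin/flink run"; $@ is the jar followed by
# the program arguments. DETACHED, PARALLELISM, JOBMANAGER and DRY_RUN are
# illustrative variable names, not Flink settings.
flink_run() {
    # Prepend options in reverse order so the final order is -d -p -m <jar>.
    if [ -n "${JOBMANAGER:-}" ]; then set -- -m "$JOBMANAGER" "$@"; fi
    if [ -n "${PARALLELISM:-}" ]; then set -- -p "$PARALLELISM" "$@"; fi
    if [ -n "${DETACHED:-}" ]; then set -- -d "$@"; fi
    if [ -n "${DRY_RUN:-}" ]; then
        echo bin/flink run "$@"
    else
        bin/flink run "$@"
    fi
}
```

For example, `DETACHED=1 PARALLELISM=2 flink_run examples/batch/WordCount.jar --input ./README.txt --output ./wordcount.txt` matches the detached example earlier, with -p 2 overriding parallelism.default.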

    Adding a JobManager/TaskManager to the cluster

    On the master node being added, run

    bin/jobmanager.sh ((start|start-foreground) [host] [webui-port])|stop|stop-all
    

    On the slave node being added, run

    bin/taskmanager.sh start|start-foreground|stop|stop-all
    

    The running cluster does not need to be stopped
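Adding a Task Manager can also be driven from the master. This sketch assumes the new host already has the same Flink directory and configuration and accepts passwordless SSH. `add_taskmanager` is a hypothetical helper.

It records the host in conf/slaves. That way a later bin/stop-cluster.sh, which stops the Task Managers on the hosts listed there, includes the new host too.

```shell
#!/bin/sh
# Register a TaskManager host in conf/slaves (only once) and start a
# TaskManager there over SSH (hypothetical helper). DRY_RUN=1 only prints.
# $1: Flink home (same absolute path on every node), $2: new host
add_taskmanager() {
    slaves="$1/conf/slaves"
    # -x: whole-line match, -F: treat the IP's dots literally
    if ! grep -qxF "$2" "$slaves" 2>/dev/null; then
        echo "$2" >> "$slaves"
    fi
    if [ -n "${DRY_RUN:-}" ]; then
        echo "ssh -n $2 $1/bin/taskmanager.sh start"
    else
        ssh -n "$2" "$1/bin/taskmanager.sh start"
    fi
}
```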

    Stopping the cluster

    bin/stop-cluster.sh
    


  • Original post: https://www.cnblogs.com/moonlight-lin/p/12897992.html