  • Flink on yarn

    1. Preparation

    Prepare the clusters

    • ZooKeeper cluster
    • Hadoop cluster

    Prepare the Flink jar

    Official site: https://flink.apache.org/downloads.html

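    Starting with Flink 1.8, the distribution no longer bundles Hadoop, so the matching Hadoop jar has to be downloaded separately.

    Before 1.8: Flink binaries with Hadoop bundled in were available.

    From 1.8 on: download the Hadoop component that matches your Hadoop version (e.g. a flink-shaded-hadoop-2-uber jar) and put it into Flink's lib directory.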

    Configure the Hadoop environment variables
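The two steps above can be sketched in shell as follows. This is a minimal sketch. It assumes the Flink distribution is already unpacked, the jar is already downloaded, and Hadoop lives under a path you substitute for the example /opt/hadoop. install_shaded_hadoop is a hypothetical helper name and is not part of Flink.

```shell
#!/usr/bin/env bash
# Hypothetical helper: copy a downloaded Hadoop jar (e.g. a
# flink-shaded-hadoop-2-uber jar) into Flink's lib directory.
install_shaded_hadoop() {
  local flink_home="$1" jar="$2"
  if [ ! -f "$jar" ]; then
    echo "jar not found: $jar" >&2
    return 1
  fi
  mkdir -p "$flink_home/lib"
  cp "$jar" "$flink_home/lib/"
}

# Hadoop environment variables, e.g. in ~/.bashrc (example paths, adjust them):
#   export HADOOP_HOME=/opt/hadoop
#   export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
#   export PATH=$PATH:$HADOOP_HOME/bin
```

Usage: `install_shaded_hadoop "$FLINK_HOME" ~/Downloads/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar`. HADOOP_CONF_DIR is how Flink finds the YARN/Hadoop configuration when it talks to the cluster.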

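    Configure the Flink configuration file (conf/flink-conf.yaml)

    Set the JobManager address:

    jobmanager.rpc.address: bigdata-03

    Configure the other options as your setup requires.

    Configure conf/masters and conf/slaves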
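The configuration steps above can be scripted. The sketch below uses configure_flink, a hypothetical helper. It replaces any existing jobmanager.rpc.address line and leaves the other settings alone. It writes masters as host:port, using Flink's default web UI port 8081. It writes slaves with one TaskManager host per line. The worker names bigdata-01 and bigdata-02 are assumed examples.

```shell
#!/usr/bin/env bash
# Hypothetical helper: write the JobManager address into flink-conf.yaml and
# fill conf/masters and conf/slaves. Host names are examples.
configure_flink() {
  local conf_dir="$1" jm_host="$2"
  shift 2                      # remaining arguments: TaskManager hosts
  local conf="$conf_dir/flink-conf.yaml" tmp_conf
  touch "$conf"
  tmp_conf=$(mktemp)
  # Keep every other setting, drop any old jobmanager.rpc.address line ...
  grep -v '^jobmanager\.rpc\.address:' "$conf" > "$tmp_conf" || true
  # ... and append the new one.
  echo "jobmanager.rpc.address: $jm_host" >> "$tmp_conf"
  cat "$tmp_conf" > "$conf"
  rm -f "$tmp_conf"
  # masters: JobManager host and web UI port (8081 is Flink's default).
  echo "$jm_host:8081" > "$conf_dir/masters"
  # slaves: one TaskManager host per line.
  printf '%s\n' "$@" > "$conf_dir/slaves"
}
```

Usage: `configure_flink "$FLINK_HOME/conf" bigdata-03 bigdata-01 bigdata-02`.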

    2. Flink on YARN

    Session Cluster mode

    (1) Start the Hadoop cluster (omitted)

    (2) To customize the configuration, list the available parameters with:

    bin/yarn-session.sh --help
    Usage:
       Required
         -n,--container <arg>   Number of YARN container to allocate (=Number of Task Managers)
       Optional
         -D <property=value>             use value for given property
         -d,--detached                   If present, runs the job in detached mode
         -h,--help                       Help for the Yarn session CLI.
         -id,--applicationId <arg>       Attach to running YARN session
         -j,--jar <arg>                  Path to Flink jar file
         -jm,--jobManagerMemory <arg>    Memory for JobManager Container with optional unit (default: MB)
         -m,--jobmanager <arg>           Address of the JobManager (master) to which to connect. Use this flag to connect to a different JobManager than the one specified in the configuration.
         -n,--container <arg>            Number of YARN container to allocate (=Number of Task Managers)
         -nl,--nodeLabel <arg>           Specify YARN node label for the YARN application
         -nm,--name <arg>                Set a custom name for the application on YARN
         -q,--query                      Display available YARN resources (memory, cores)
         -qu,--queue <arg>               Specify YARN queue.
         -s,--slots <arg>                Number of slots per TaskManager
         -sae,--shutdownOnAttachedExit   If the job is submitted in attached mode, perform a best-effort cluster shutdown when the CLI is terminated abruptly, e.g., in response to a user interrupt, such
                                         as typing Ctrl + C.
         -st,--streaming                 Start Flink in streaming mode
         -t,--ship <arg>                 Ship files in the specified directory (t for transfer)
         -tm,--taskManagerMemory <arg>   Memory per TaskManager Container with optional unit (default: MB)
         -yd,--yarndetached              If present, runs the job in detached mode (deprecated; use non-YARN specific option instead)
         -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths for high availability mode
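    yarn-session parameters:
      -n: number of TaskManagers
      -d: run in detached mode
      -id: ID of the YARN application to attach to
      -j: path to the Flink jar file
      -jm: memory of the JobManager container (default unit: MB)
      -nl: YARN node label for the YARN application
      -nm: custom name for the application on YARN
      -q: display available YARN resources (memory, cores)
      -qu: YARN queue
      -s: number of slots per TaskManager
      -st: start Flink in streaming mode
      -tm: memory per TaskManager container (default unit: MB)
      -z: namespace used to create the ZooKeeper sub-paths for high-availability mode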

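    (3) Start the yarn-session

    ./yarn-session.sh -n 2 -s 2 -jm 1024 -tm 1024 -nm test -d
    where:
    -n (--container): number of TaskManagers.
    -s (--slots): number of slots per TaskManager. By default each slot maps to one core, and the default number of slots per TaskManager is 1; sometimes it helps to run a few extra TaskManagers for redundancy.
    -jm: JobManager memory (MB).
    -tm: memory per TaskManager (MB).
    -nm: YARN application name (shown in the YARN UI).
    -d: run in the background (detached).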
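This is a rough sanity check of what the command above asks YARN for. -n TaskManagers with -s slots each give n * s slots in total. The memory is one JobManager container plus n TaskManager containers. session_footprint is a hypothetical helper. It ignores the fact that YARN rounds container sizes up to its minimum allocation (yarn.scheduler.minimum-allocation-mb).

```shell
#!/usr/bin/env bash
# Hypothetical helper: total slots and memory requested by
#   yarn-session.sh -n N -s S -jm JM -tm TM
session_footprint() {
  local n="$1" s="$2" jm="$3" tm="$4"
  # slots = TaskManagers * slots per TaskManager
  # memory = JobManager container + N TaskManager containers (MB)
  echo "slots=$(( n * s )) memory_mb=$(( jm + n * tm ))"
}
```

For the command above, `session_footprint 2 2 1024 1024` prints `slots=4 memory_mb=3072`.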

    (4) Run the program:

    ./flink run -c com.duoduo.WordCount wordcount-1.0-SNAPSHOT-jar-with-dependencies.jar --host localhost --port 7777
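The submit command can be wrapped so that host and port are arguments rather than retyped. This sketch assumes WordCount reads lines from a socket given by --host/--port; the article does not show its source. Something must already be listening there, e.g. `nc -lk 7777`. If the job uses socketTextStream, the socket is opened by a TaskManager running in a YARN container. In that case localhost means the node running that TaskManager, not necessarily the machine where you type `flink run`. submit_wordcount and the FLINK_BIN override are hypothetical.

```shell
#!/usr/bin/env bash
# Hypothetical wrapper around the submit command above. FLINK_BIN (default
# ./flink) is an assumed override, e.g. FLINK_BIN=echo for a dry run.
submit_wordcount() {
  local host="${1:-localhost}" port="${2:-7777}"
  "${FLINK_BIN:-./flink}" run -c com.duoduo.WordCount \
    wordcount-1.0-SNAPSHOT-jar-with-dependencies.jar \
    --host "$host" --port "$port"
}
```

Usage: `submit_wordcount localhost 7777`. `FLINK_BIN=echo submit_wordcount localhost 7777` prints the command without submitting it.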

    (5) Check the application on YARN

    (6) Shut down the yarn-session

    yarn application --kill application_1577588252906_0001
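The application ID does not have to be copied from the UI. It can be looked up by the name passed with -nm ("test" above). app_ids_by_name is a hypothetical helper that reads `yarn application -list` output on stdin. It prints the IDs whose name column matches. It assumes the usual whitespace-separated layout, with the ID in the first column and the name in the second, and a name without spaces.

```shell
#!/usr/bin/env bash
# Hypothetical helper: print IDs of applications with the given name,
# reading `yarn application -list` output from stdin.
app_ids_by_name() {
  # Data rows start with the application ID; the 2nd field is the app name.
  awk -v name="$1" '$1 ~ /^application_/ && $2 == name { print $1 }'
}
```

Usage: `yarn application -list | app_ids_by_name test | xargs -r -n1 yarn application -kill`. `xargs -r`, which skips the kill when nothing matches, is a GNU extension.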

    Per-Job Cluster mode

    (1) Start the Hadoop cluster (omitted)

    (2) Submit the job directly, without starting a yarn-session

    ./flink run -m yarn-cluster -c com.atguigu.WordCount WordCount-1.0-SNAPSHOT-jar-with-dependencies.jar --host localhost --port 7777
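In per-job mode there is no yarn-session.sh, so per-job resources are passed to `flink run` itself. They use y-prefixed options such as -yjm, -ytm, -ys and -ynm, as listed by `./flink run -h` in Flink 1.x; check against your version. submit_per_job and the FLINK_BIN override are hypothetical. The values in the usage line are only examples.

```shell
#!/usr/bin/env bash
# Hypothetical wrapper for a per-job submission with explicit resources.
# FLINK_BIN (default ./flink) is an assumed override, e.g. echo for a dry run.
submit_per_job() {
  local jm_mb="$1" tm_mb="$2" slots="$3" name="$4"
  "${FLINK_BIN:-./flink}" run -m yarn-cluster \
    -yjm "$jm_mb" -ytm "$tm_mb" -ys "$slots" -ynm "$name" \
    -c com.atguigu.WordCount WordCount-1.0-SNAPSHOT-jar-with-dependencies.jar \
    --host localhost --port 7777
}
```

Usage: `submit_per_job 1024 1024 2 wordcount`.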

    3. Problems encountered

    3.1 java.lang.NoClassDefFoundError: com/sun/jersey/core/util/FeaturesAndProperties

    2020-05-21 16:09:11,255 ERROR org.apache.flink.yarn.cli.FlinkYarnSessionCli   - Error while running the Flink session.
    java.lang.NoClassDefFoundError: com/sun/jersey/core/util/FeaturesAndProperties
        at java.lang.ClassLoader.defineClass1(Native Method)
        at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
        at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
        at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
        at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at org.apache.hadoop.yarn.client.api.TimelineClient.createTimelineClient(TimelineClient.java:55)
        at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.createTimelineClient(YarnClientImpl.java:181)
        at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.serviceInit(YarnClientImpl.java:168)
        at org.apache.hadoop.service.AbstractService.init(AbstractService.java:163)
        at org.apache.flink.yarn.YarnClusterClientFactory.getClusterDescriptor(YarnClusterClientFactory.java:71)
        at org.apache.flink.yarn.YarnClusterClientFactory.createClusterDescriptor(YarnClusterClientFactory.java:56)
        at org.apache.flink.yarn.YarnClusterClientFactory.createClusterDescriptor(YarnClusterClientFactory.java:42)
        at org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:529)
        at org.apache.flink.yarn.cli.FlinkYarnSessionCli.lambda$main$5(FlinkYarnSessionCli.java:785)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
        at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
        at org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:785)
    Caused by: java.lang.ClassNotFoundException: com.sun.jersey.core.util.FeaturesAndProperties
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        ... 26 more

    Solution:

    My Hadoop version is 2.7.3. With flink-shaded-hadoop-2-uber-2.7.7-10.0.jar in the lib directory the error above persisted; switching to the next version up, flink-shaded-hadoop-2-uber-2.8.3-10.0.jar, fixed it.
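When swapping jar versions, it is easy to leave the old jar in lib next to the new one. check_shaded_hadoop is a hypothetical helper that lists the flink-shaded-hadoop jars in Flink's lib directory. It fails unless there is exactly one. This is an assumed safeguard, not a step from the original fix.

```shell
#!/usr/bin/env bash
# Hypothetical check: exactly one flink-shaded-hadoop jar in $FLINK_HOME/lib.
check_shaded_hadoop() {
  local lib="$1/lib" count=0 jar
  for jar in "$lib"/flink-shaded-hadoop-*.jar; do
    [ -e "$jar" ] || continue   # the glob matched nothing
    echo "found: ${jar##*/}"
    count=$((count + 1))
  done
  if [ "$count" -ne 1 ]; then
    echo "expected exactly 1 flink-shaded-hadoop jar in $lib, found $count" >&2
    return 1
  fi
}
```

Usage: `check_shaded_hadoop "$FLINK_HOME"`.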

    3.2 The following error appeared in the startup log

    org.apache.flink.yarn.cli.FlinkYarnSessionCli.
    java.lang.NoClassDefFoundError: org/apache/hadoop/yarn/exceptions/YarnException
            at java.lang.Class.forName0(Native Method)
            at java.lang.Class.forName(Class.java:264)
            at org.apache.flink.client.cli.CliFrontend.loadCustomCommandLine(CliFrontend.java:1076)
            at org.apache.flink.client.cli.CliFrontend.loadCustomCommandLines(CliFrontend.java:1030)
            at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:957)
    Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.yarn.exceptions.YarnException
            at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
            at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
            at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
            at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
            ... 5 more

    See the hint in the Flink documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/hadoop.html

    The fix is to add the following to the environment variables:

    export HADOOP_CLASSPATH=`hadoop classpath`
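An export typed into a shell lasts only for that session. The sketch below appends the line to a profile file exactly once, so repeated runs do not duplicate it. persist_hadoop_classpath is a hypothetical helper that defaults to ~/.bashrc. It assumes the hadoop command is on PATH whenever the profile is sourced.

```shell
#!/usr/bin/env bash
# Hypothetical helper: add the HADOOP_CLASSPATH export to a profile file once.
persist_hadoop_classpath() {
  local profile="${1:-$HOME/.bashrc}"
  local line='export HADOOP_CLASSPATH=`hadoop classpath`'
  touch "$profile"
  # -x: whole-line match, -F: fixed string (backticks are not special here)
  grep -qxF "$line" "$profile" || printf '%s\n' "$line" >> "$profile"
}
```

Usage: `persist_hadoop_classpath && source ~/.bashrc`, then start yarn-session.sh from that shell.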

    3.3 Couldn't deploy Yarn session cluster

    org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't deploy Yarn session cluster
        at org.apache.flink.yarn.YarnClusterDescriptor.deploySessionCluster(YarnClusterDescriptor.java:380)
        at org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:548)
        at org.apache.flink.yarn.cli.FlinkYarnSessionCli.lambda$main$5(FlinkYarnSessionCli.java:785)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
        at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
        at org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:785)
    Caused by: org.apache.flink.configuration.IllegalConfigurationException: The number of requested virtual cores per node 10 exceeds the maximum number of virtual cores 7 available in the YarnCluster. Please note that the number of virtual cores is set to the number of task slots by default unless configured in the Flink config with 'yarn.containers.vcores.'
        at org.apache.flink.yarn.YarnClusterDescriptor.isReadyForDeployment(YarnClusterDescriptor.java:292)
        at org.apache.flink.yarn.YarnClusterDescriptor.deployInternal(YarnClusterDescriptor.java:444)
        at org.apache.flink.yarn.YarnClusterDescriptor.deploySessionCluster(YarnClusterDescriptor.java:373)
        ... 7 more

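    Solution:

    As the message says, Flink requests one vcore per TaskManager slot unless yarn.containers.vcores is set in the Flink config. I had set 10 slots per TaskManager, but a YARN node offered at most 7 vcores, which is configured on the YARN side, typically via yarn.nodemanager.resource.cpu-vcores. The resources were insufficient.
    Option 1: raise the vcores available per YARN node, or set yarn.containers.vcores in the Flink config to a value no larger than that maximum.

    Option 2: reduce the number of slots, e.g. taskmanager.numberOfTaskSlots: 5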
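The check that failed can be reproduced before deploying. The vcores requested per TaskManager equal yarn.containers.vcores if it is set, otherwise the slot count. That value must not exceed the maximum vcores of a YARN node. check_vcores is a hypothetical helper. You supply the three numbers yourself from flink-conf.yaml and the YARN node configuration.

```shell
#!/usr/bin/env bash
# Hypothetical pre-flight check mirroring the error message above.
# Args: slots per TaskManager, max vcores per YARN node, [yarn.containers.vcores]
check_vcores() {
  local slots="$1" max_node_vcores="$2" container_vcores="${3:-}"
  # Without yarn.containers.vcores, Flink requests one vcore per slot.
  local requested="${container_vcores:-$slots}"
  if [ "$requested" -gt "$max_node_vcores" ]; then
    echo "requested $requested vcores > $max_node_vcores available per node"
    return 1
  fi
  echo "ok: requested $requested of $max_node_vcores vcores"
}
```

`check_vcores 10 7` fails as in the log above. Either fix passes: `check_vcores 5 7` (option 2), or `check_vcores 10 7 4` with yarn.containers.vcores: 4 (option 1).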

    Reference: https://www.jianshu.com/p/1b05202c4fb6
    Reference: https://blog.csdn.net/joseph25/article/details/88878350

  • Original article: https://www.cnblogs.com/hyunbar/p/12932377.html