Flink Deployment: Standalone Mode

Copyright notice: this is an original post by the author and may not be reposted without permission. https://blog.csdn.net/paicMis/article/details/84642263

Installation environment

flink-1.6.2-bin-hadoop27-scala_2.11.tgz
hadoop-2.7.5
java 1.8
zookeeper 3.4.6
OS: CentOS 6.4
    

1. Download
Grab the release from the Flink download page: http://flink.apache.org/downloads.html
2. Extract the archive

     tar -zxvf flink-1.6.2-bin-hadoop27-scala_2.11.tgz 
    

3. Add the environment variables to ~/.bash_profile

    export FLINK_HOME=/opt/flink-1.6.2
    export PATH=$FLINK_HOME/bin:$PATH
    
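To make the new variables take effect in the current shell, reload the profile. A minimal sketch (the path matches the export above):

    # reload the profile so FLINK_HOME and PATH are picked up
    source ~/.bash_profile

    # sanity check: the variable is set and the Flink scripts are on the PATH
    echo $FLINK_HOME
    which flink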

4. Edit the flink-conf.yaml configuration file; start with a simple standalone setup

# Hadoop nameservice
    jobmanager.rpc.address: cdh1
    jobmanager.rpc.port: 6123
    jobmanager.heap.size: 1024m
    taskmanager.heap.size: 1024m
    taskmanager.numberOfTaskSlots: 4
    parallelism.default: 12
    
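These keys control how much work the cluster can run in parallel: with the four workers listed in the slaves file below and taskmanager.numberOfTaskSlots: 4, the cluster exposes 4 x 4 = 16 task slots, so parallelism.default: 12 fits comfortably. A quick way to confirm what the config actually contains (a minimal sketch; the path assumes the layout above):

    # show the slot and parallelism settings currently in the config
    grep -E 'numberOfTaskSlots|parallelism.default' $FLINK_HOME/conf/flink-conf.yaml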

5. Edit the slaves and masters files, which list the TaskManager and JobManager hosts

    [hadoop@cdh1 conf]$ cat slaves 
    cdh2
    cdh3
    cdh4
    cdh5
    [hadoop@cdh1 conf]$ cat masters 
    cdh1:8081
    

6. Sync the Flink installation and the environment settings to the other nodes; with the per-host commands below you repeat this once for every machine (a loop version is sketched after the block)

    scp .bash_profile hadoop@cdh3:~/.bash_profile
    scp -r ./flink-1.6.2 hadoop@cdh3:/opt/
    
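Rather than running the two scp commands by hand for each node, a small loop can push everything in one pass. A minimal sketch, assuming passwordless SSH as the hadoop user and the same /opt layout on every host:

    # copy the profile and the Flink directory to every worker
    for host in cdh2 cdh3 cdh4 cdh5; do
      scp ~/.bash_profile hadoop@$host:~/.bash_profile
      scp -r /opt/flink-1.6.2 hadoop@$host:/opt/
    done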

7. Start Flink
[hadoop@cdh1 bin]$ ./start-cluster.sh
8. Once startup completes, check with jps: you should see the JobManager process (StandaloneSessionClusterEntrypoint) on the master and a TaskManager process (TaskManagerRunner) on each worker

    [hadoop@cdh1 bin]$ jps
    3866 StandaloneSessionClusterEntrypoint
    [hadoop@cdh2 ~]$ jps
    3534 TaskManagerRunner
    
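To check all nodes without logging in to each one, jps can be run over SSH in a loop. A minimal sketch, assuming passwordless SSH and that jps is on the PATH of the remote shells:

    # list the running JVM processes on every node in the cluster
    for host in cdh1 cdh2 cdh3 cdh4 cdh5; do
      echo "== $host =="
      ssh hadoop@$host jps
    done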

9. Open the JobManager web UI at http://192.168.18.160:8081
(screenshot: Flink web UI)
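If the UI loads, the same information is also available from the JobManager's REST API, which is convenient for scripted checks. A minimal sketch (the /overview endpoint reports registered TaskManagers and available slots):

    # query the cluster overview from the JobManager REST endpoint
    curl http://192.168.18.160:8081/overview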

That means the setup is complete; now let's verify the cluster.

Verifying with start-scala-shell.sh
${FLINK_HOME}/bin/start-scala-shell.sh is the interactive client that ships with Flink. It is useful for testing code snippets during development and can be started in two ways: against a local environment or against a cluster. In this example the cluster is easy to reach, so we test against the cluster directly; during development, if the cluster is not conveniently reachable, you can work locally first and connect to the cluster only when it is time to deploy. If your program depends on extra jars, add them with -a <path/to/jar.jar> or --addclasspath <path/to/jar.jar>.

1. Local mode

    ${FLINK_HOME}/bin/start-scala-shell.sh local
    

2. Remote (cluster) mode

    ${FLINK_HOME}/bin/start-scala-shell.sh remote <hostname> <portnumber>
    

3. With dependency jars

${FLINK_HOME}/bin/start-scala-shell.sh [local | remote <host> <port>] --addclasspath <path/to/jar.jar>
    

4. Show the help text

    ${FLINK_HOME}/bin/start-scala-shell.sh --help
    
    [hadoop@cdh2 bin]$ ./start-scala-shell.sh --help
    Flink Scala Shell
    Usage: start-scala-shell.sh [local|remote|yarn] [options] <args>...
    
    Command: local [options]
    Starts Flink scala shell with a local Flink cluster
      -a, --addclasspath <path/to/jar>
                               Specifies additional jars to be used in Flink
    Command: remote [options] <host> <port>
    Starts Flink scala shell connecting to a remote cluster
      <host>                   Remote host name as string
      <port>                   Remote port as integer
    
      -a, --addclasspath <path/to/jar>
                               Specifies additional jars to be used in Flink
    Command: yarn [options]
    Starts Flink scala shell connecting to a yarn cluster
      -n, --container arg      Number of YARN container to allocate (= Number of TaskManagers)
      -jm, --jobManagerMemory arg
                               Memory for JobManager container
      -nm, --name <value>      Set a custom name for the application on YARN
      -qu, --queue <arg>       Specifies YARN queue
      -s, --slots <arg>        Number of slots per TaskManager
      -tm, --taskManagerMemory <arg>
                               Memory per TaskManager container
      -a, --addclasspath <path/to/jar>
                               Specifies additional jars to be used in Flink
      --configDir <value>      The configuration directory.
      -h, --help               Prints this usage text
    

We verify using remote (cluster) mode:

    [hadoop@cdh1 bin]$ ./start-scala-shell.sh remote 192.168.18.160 8081
    

Run the following sample code:

    Scala> val text = benv.fromElements(
      "To be, or not to be,--that is the question:--",
      "Whether 'tis nobler in the mind to suffer",
      "The slings and arrows of outrageous fortune",
      "Or to take arms against a sea of troubles,")
    Scala> val counts = text
    .flatMap { _.toLowerCase.split("\\W+") }
        .map { (_, 1) }.groupBy(0).sum(1)
    Scala> counts.print()
    

Result:
(screenshot: word count output)

The web UI also shows the detailed job information:
(screenshot: job details in the Flink web UI)

An issue we ran into:
In our case the communication failure was caused by a separately installed Scala; removing the Scala environment settings fixed it. The exact root cause is not yet clear and remains to be investigated.

    java.net.ConnectException: Connection refused (Connection refused)
            at java.net.PlainSocketImpl.socketConnect(Native Method)
            at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
            at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
            at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
            at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
            at java.net.Socket.connect(Socket.java:589)
            at org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction.run(SocketTextStreamFunction.java:96)
            at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
            at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
            at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
            at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
            at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
            at java.lang.Thread.run(Thread.java:745)
    2018-11-19 01:49:52,298 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph   
     - Job Socket Window WordCount (8b38f995aa8e61fd520b61e0888ecd46) switched from state RUNNING to FAILING.
    java.net.ConnectException: Connection refused (Connection refused)
            at java.net.PlainSocketImpl.socketConnect(Native Method)
            at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
            at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
            at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
            at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
            at java.net.Socket.connect(Socket.java:589)
            at org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction.run(SocketTextStreamFunction.java:96)
            at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
            at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
            at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
            at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
            at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
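For reference, this stack trace originates in SocketTextStreamFunction, i.e. a streaming job's socket source failed to connect to its input socket, which usually means nothing was listening on the target host and port when the job started. If you want to reproduce the scenario with the bundled Socket Window WordCount example, open the listening socket first. A minimal sketch, assuming the example jar shipped with the distribution and an arbitrary port 9000:

    # 1. on the source host, open a listening socket the job can read from
    nc -lk 9000

    # 2. in another terminal, submit the bundled streaming example
    cd $FLINK_HOME
    ./bin/flink run examples/streaming/SocketWindowWordCount.jar --hostname cdh1 --port 9000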
    