zoukankan      html  css  js  c++  java
  • windows环境下flink入门demo实例

    Apache Flink是什么?

    Apache Flink 是一个分布式大数据处理引擎,可对有限数据流和无限数据流进行有状态计算。可部署在各种集群环境,对各种大小的数据规模进行快速计算。上面是非常官方的描述,说白了我们为什么选择Flink,是因为他在社区口碑非常不错。在国内的话有阿里这种大数据大流量的公司一直在输出,当然像腾讯、华为、饿了么、滴滴等也都有使用Apache Flink。

    进入正题

    本篇博文涉及到的软件工具以及下载地址:

    Apache Flink :https://flink.apache.org/downloads.html

    Netcat:https://eternallybored.org/misc/netcat/

    Netcat是一个有“瑞士军刀”美誉的网络工具,这里用来绑定端口等待Apache Flink的连接

    第一步:启动Flink

    从上面的地址下载Flink后是一个压缩包,解压后的目录结构如下:

    /conf/flink-conf.yaml里有一些Flink的基本配置信息,如,jobmanager、taskmanager的端口和jvm内存(默认1024M)大小,web控制台的端口(默认8081)等。我们可以不该任何配置,然后进入到bin下,执行start-cluster.bat。这里要注意不是并不是flink.bat。flink.bat是用来提交job的。还有要确保相关的端口没有被占用

    运行成功后会有两个java黑窗口(一个TaskManager、一个JobManager),如果只有一个java黑窗口,很可能是你的TaskManager因为端口占用没有启动起来,成功后访问:http://localhost:8081.就会看到如下的web管理控制台了:

    如果启动失败的话,上面箭头所指向的地方应该是0.

    第二步:job任务编写

    1.首先需要新建一个maven工程,然后导入Flink的接口依赖

    1. <groupId>org.apache.flink</groupId>
    2. <artifactId>flink-java</artifactId>
    3. <version>1.7.1</version>
    1. <groupId>org.apache.flink</groupId>
    2. <artifactId>flink-streaming-java_2.11</artifactId>
    3. <version>1.7.1</version>
    1. <groupId>org.apache.flink</groupId>
    2. <artifactId>flink-clients_2.11</artifactId>
    3. <version>1.7.1</version>

    2.编写具体的job,官方提供了一个单词统计的demo

    package com.kl;
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.util.Collector;
    public class SocketWindowWordCount {
    public static void main(String[] args) throws Exception {

    1. // the host and the port to connect to
    2. final String hostname;
    3. final int port;
    4. try {
    5. final ParameterTool params = ParameterTool.fromArgs(args);
    6. hostname = params.has("hostname") ? params.get("hostname") : "localhost";
    7. port = params.has("port") ? params.getInt("port"):9000;
    8. } catch (Exception e) {
    9. System.err.println("No port specified. Please run 'SocketWindowWordCount " +
    10. "--hostname <hostname> --port <port>', where hostname (localhost by default) " +
    11. "and port is the address of the text server");
    12. System.err.println("To start a simple text server, run 'netcat -l <port>' and " +
    13. "type the input text into the command line");
    14. return;
    15. }
    16. // get the execution environment
    17. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    18. // get input data by connecting to the socket
    19. DataStream<String> text = env.socketTextStream(hostname, port, "\n");
    20. // parse the data, group it, window it, and aggregate the counts
    21. DataStream<WordWithCount> windowCounts = text
    22. .flatMap(new FlatMapFunction<String, WordWithCount>() {
    23. public void flatMap(String value, Collector<WordWithCount> out) {
    24. for (String word : value.split("\s")) {
    25. out.collect(new WordWithCount(word, 1L));
    26. } }})
    27. .keyBy("word")
    28. .timeWindow(Time.seconds(5))
    29. .reduce(new ReduceFunction<WordWithCount>() {
    30. public WordWithCount reduce(WordWithCount a, WordWithCount b) {
    31. return new WordWithCount(a.word, a.count + b.count);
    32. }});
    33. // print the results with a single thread, rather than in parallel
    34. windowCounts.print().setParallelism(1);
    35. env.execute("Socket Window WordCount");

    }
    /**

    1. * Data type for words with count.
    2. */

    public static class WordWithCount {

    1. public String word;
    2. public long count;
    3. public WordWithCount() {}
    4. public WordWithCount(String word, long count) {
    5. this.word = word;
    6. this.count = count;
    7. }
    8. @Override
    9. public String toString() {
    10. return word + " : " + count;
    11. }

    }
    }

    上面demo实现了从启动参数中获取ip和端口,然后连接从输入流接收文本信息,然后统计文本里单词出现的次数。因为要打成可运行的jar,所以,还需要引入maven的jar打包插件,如下:

    1. <plugins>
    2. <plugin>
    3. <groupId>org.apache.maven.plugins</groupId>
    4. <artifactId>maven-shade-plugin</artifactId>
    5. <version>1.2.1</version>
    6. <executions>
    7. <execution>
    8. <phase>package</phase>
    9. <goals>
    10. <goal>shade</goal>
    11. </goals>
    12. <configuration>
    13. <transformers>
    14. <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
    15. <mainClass>com.kl.SocketWindowWordCount</mainClass>
    16. </transformer>
    17. </transformers>
    18. </configuration>
    19. </execution>
    20. </executions>
    21. </plugin>
    22. </plugins>

    mainClass标签中就是你的main方法所在类全类名。然后mvn install就可以打出一个可运行的jar包了。

    第三步:Netcat监听端口,等待连接

    从上面贴的地址下载Netcat后,是一个压缩包,有些安全软件可能会报病毒,请忽略就好了。然后解压文件目录如下:

    进入到这个目录,然后执行: nc64.exe -l -p 9000。相当于打开了9000端口,并监听了入站信息。最后实现的效果就是从这个窗口中输入的数据,回车后会发送Apache Flink中我们提交的job中处理输出,所以这里的9000端口,要和我们等下启动job的启动参数端口一致。

    第四步:提交job运行

    运行job有两种方式:可以通过Flink.bat运行,也可以通过web控制台运行。

    命令行运行:

    flink run E:flinkWorkingspceflinkdemo argetfinlk-demo-1.0-SNAPSHOT.jar --port 9000

    web控制台运行:

    如上图,点击Add New后选择你的jar包然后上传,上传成功就会在列表里列出来。然后选中你上传的jar。就会出现如下图的输入框,可以输入你的启动参数,然后点击submit提交就可以了

    第五步:验证效果

    提交后如果没有问题,job的详情页面如下:

    这个时候我们从Netcat的监听的黑窗口中敲入一些长文本,就会在Flink的job里统计输出出来如:

  • 相关阅读:
    Qt之表单布局(QFormLayout)
    Qt之格栅布局(QGridLayout)
    Qt之水平/垂直布局(QBoxLayout、QHBoxLayout、QVBoxLayout)
    Qt之手动布局
    MAC OS下JDK版本切换指南
    Qt之自定义布局管理器(QBorderLayout)
    Qt之自定义布局管理器(QFlowLayout)
    Qt之自定义布局管理器(QCardLayout)
    springMVC获取file,几种转换
    java将白色背景图片转换成无色
  • 原文地址:https://www.cnblogs.com/ldsweely/p/12033590.html
Copyright © 2011-2022 走看看