zoukankan      html  css  js  c++  java
  • Flink基础之实现WordCount程序(Java版本多种写法)

    一、概述

    WordCount(单词计数)一直是大数据入门的经典案例,下面用 Java 实现 Flink 的 WordCount 代码

    二、创建 Maven 工程

    下面是 pom.xml 文件

    <?xml version="1.0" encoding="UTF-8"?>
    <!--
      Maven build for the Flink WordCount examples.
      Every ${...} placeholder used in the dependency section is defined in <properties> below,
      so the POM resolves on its own without external profiles or settings.
    -->
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>cn.duniqb</groupId>
        <artifactId>learn-flink</artifactId>
        <version>1.0</version>
        <properties>
            <java.version>11</java.version>
            <!-- Wire java.version into the compiler; otherwise the lambda-based examples
                 may be compiled with the compiler plugin's (older) default source level. -->
            <maven.compiler.source>${java.version}</maven.compiler.source>
            <maven.compiler.target>${java.version}</maven.compiler.target>
            <flink.version>1.10.1</flink.version>
            <!-- Scala binary version used as the suffix of Scala-dependent Flink artifacts
                 (e.g. flink-streaming-java_2.12). -->
            <scala.version>2.12</scala.version>
            <!-- Scope applied to the Flink dependencies. "compile" lets the job run directly from
                 the IDE; switch to "provided" when packaging for a cluster that already ships Flink. -->
            <project.build.scope>compile</project.build.scope>
            <log4j.version>1.2.17</log4j.version>
            <slf4j.version>1.7.7</slf4j.version>
        </properties>
    
        <dependencies>
            <!-- Logging: SLF4J bound to log4j 1.x -->
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
                <version>${slf4j.version}</version>
            </dependency>
            <dependency>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
                <version>${log4j.version}</version>
            </dependency>
            <!-- Flink Java API -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-java</artifactId>
                <version>${flink.version}</version>
                <scope>${project.build.scope}</scope>
            </dependency>
            <!-- Flink Streaming Java API -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_${scala.version}</artifactId>
                <version>${flink.version}</version>
                <scope>${project.build.scope}</scope>
            </dependency>
            <!-- Flink Web UI (needed by createLocalEnvironmentWithWebUI) -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-runtime-web_${scala.version}</artifactId>
                <version>${flink.version}</version>
                <scope>${project.build.scope}</scope>
            </dependency>
        </dependencies>
    </project>
    

    三、编写SocketWordCount.java

    public class SocketWordCount {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
            // 此处数据源是的 Linux 主机,通过 socket 的方式传输数据
            DataStreamSource<String> socket = env.socketTextStream("ubuntu", 6666, "
    ");
    //         lambda(socket);
    //        function(socket);
    //        lambdaAndFunction(socket);
            richFunction(socket);
            env.execute();
        }
    	// 富函数方式
        private static void richFunction(DataStreamSource<String> socket) {
            SingleOutputStreamOperator<Tuple2<String, Integer>> flatMap = socket.flatMap(new RichFlatMapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public void open(Configuration parameters) throws Exception {
                    super.open(parameters);
                }
    
                @Override
                public void close() throws Exception {
                    super.close();
                }
    
                @Override
                public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                    String[] strings = value.split(" ");
                    for (String s : strings) {
                        out.collect(Tuple2.of(s, 1));
                    }
                }
            });
            KeyedStream<Tuple2<String, Integer>, String> tuple2StringKeyedStream = flatMap.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                @Override
                public String getKey(Tuple2<String, Integer> value) throws Exception {
                    return value.f0;
                }
            });
    
            tuple2StringKeyedStream.sum(1).print();
        }
    
    	// Lambda 和函数混合完成
        private static void lambdaAndFunction(DataStreamSource<String> socket) {
            SingleOutputStreamOperator<Tuple2<String, Integer>> flatMap = socket.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                    String[] strings = value.split(" ");
                    for (String s : strings) {
                        out.collect(Tuple2.of(s, 1));
                    }
    
                }
            });
            SingleOutputStreamOperator<Tuple2<String, Integer>> sum = flatMap.keyBy(f -> f.f0).sum(1);
            sum.print();
        }
    	
        // 纯函数完成
        private static void function(DataStreamSource<String> socket) {
            SingleOutputStreamOperator<String> flatMap = socket.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public void flatMap(String value, Collector<String> out) throws Exception {
                    String[] strings = value.split(" ");
                    for (String s : strings) {
                        out.collect(s);
                    }
                }
            });
            SingleOutputStreamOperator<Tuple2<String, Integer>> map = flatMap.map(new MapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> map(String value) throws Exception {
                    return Tuple2.of(value, 1);
                }
            });
    
            SingleOutputStreamOperator<Tuple2<String, Integer>> sum = map.keyBy("f0").sum(1);
            sum.print();
        }
    	
        // Lambda 方式完成
        private static void lambda(DataStreamSource<String> socket) {
            socket
                    .flatMap((String value, Collector<String> out) -> {
                        Arrays.stream(value.split(" ")).forEach(out::collect);
                    }).returns(Types.STRING)
                    .map(f -> Tuple2.of(f, 1)).returns(Types.TUPLE(Types.STRING, Types.INT))
                    .keyBy(0)
                    .sum(1)
                    .print();
        }
    
    }
    

    四、运行效果

    在程序所连接的 Linux 主机(本例代码中主机名为 ubuntu,端口为 6666)上执行 nc -lk 6666,即可临时启动一个发送 socket 文本的服务端;启动后输入一些以空格分隔的字符串:

    1606828183643

    然后启动 Java 程序,即可在终端看到如下消息:

    1606828257365

    没有修不好的电脑
  • 相关阅读:
    最大公约数与最小公倍数
    素数筛
    基础数学问题
    考试前打模板
    斐波那契公约数
    期望及期望dp
    状压dp总结
    树链剖分学习
    B君的教育
    [noip2016]愤怒的小鸟
  • 原文地址:https://www.cnblogs.com/duniqb/p/14070809.html
Copyright © 2011-2022 走看看