zoukankan      html  css  js  c++  java
  • Flink学习笔记——SocketWindowWordCount

    参考Flink官方的example代码：

    https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java
    

    在pom.xml中引入依赖（Flink 1.8.0，Scala 2.11）

            <!--flink-->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-java</artifactId>
                <version>1.8.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_2.11</artifactId>
                <version>1.8.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-scala_2.11</artifactId>
                <version>1.8.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-scala_2.11</artifactId>
                <version>1.8.0</version>
            </dependency>
    

    代码

    package com.xxx.xx.flink;
    
    
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.util.Collector;
    
    public class SocketWindowWordCount {
    
        /**
         * Data type for words with count.
         */
        public static class WordWithCount {
    
            public String word;
            public long count;
    
            public WordWithCount() {
            }
    
            public WordWithCount(String word, long count) {
                this.word = word;
                this.count = count;
            }
    
            @Override
            public String toString() {
                return word + " : " + count;
            }
        }
    
        public static void main(String[] args) throws Exception {
            // the host and the port to connect to
            final String hostname;
            final int port;
            try {
                final ParameterTool params = ParameterTool.fromArgs(args);
                hostname = params.has("hostname") ? params.get("hostname") : "localhost";
                port = 9999;
            } catch (Exception e) {
                System.err.println("No port specified. Please run 'SocketWindowWordCount " +
                        "--hostname <hostname> --port <port>', where hostname (localhost by default) " +
                        "and port is the address of the text server");
                System.err.println("To start a simple text server, run 'netcat -l <port>' and " +
                        "type the input text into the command line");
                return;
            }
    
            // get the execution environment
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            // get input data by connecting to the socket
            DataStream<String> text = env.socketTextStream(hostname, port, "
    ");
    
            // parse the data, group it, window it, and aggregate the counts
            DataStream<WordWithCount> windowCounts = text
    
                    .flatMap(new FlatMapFunction<String, WordWithCount>() {
                        @Override
                        public void flatMap(String value, Collector<WordWithCount> out) {
                            for (String word : value.split("\s")) {
                                out.collect(new WordWithCount(word, 1L));
                            }
                        }
                    })
    
                    .keyBy("word")
                    .timeWindow(Time.seconds(5))
    
                    .reduce(new ReduceFunction<WordWithCount>() {
                        @Override
                        public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                            return new WordWithCount(a.word, a.count + b.count);
                        }
                    });
    
            // print the results with a single thread, rather than in parallel
            windowCounts.print().setParallelism(1);
    
            env.execute("Socket Window WordCount");
    
        }
    
    }
    

    运行

    程序作为客户端去连接9999端口，因此需要先用nc在9999端口启动一个文本服务，再启动程序，然后在nc中输入以空格分隔的单词：

    nc -lk 9999
    1 2 3 4
    

    结果（每个5秒窗口结束时在控制台打印，行的顺序可能不同）：

    1 : 1
    2 : 1
    3 : 1
    4 : 1

  • 相关阅读:
    【LeetCode】204
    【LeetCode】231
    【LeetCode】58
    解决error104 socket error问题
    爬虫问题
    80端口被system占用的问题
    Linux命令行下批量重命名文件名为数字索引编号(0~N.xxx)的方法
    [转]利用excel进行线性规划求解
    python——时间与时间戳之间的转换
    最全中文停用词表整理(1893个)
  • 原文地址:https://www.cnblogs.com/tonglin0325/p/12486665.html
Copyright © 2011-2022 走看看