  • Flink installation and WordCount

    1. Download

    http://mirror.bit.edu.cn/apache/flink/

    2. Install

    Make sure Java 8 or later is installed.
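The Java requirement above can also be checked from code. This is a minimal sketch, not part of the original post: the class name `JavaVersionCheck` and the helper `majorVersion` are hypothetical, and it relies only on the standard `java.specification.version` system property, which reads `1.8` on Java 8 and `11`, `17`, and so on for later releases.

```java
/** Hypothetical helper: checks that the running JVM is Java 8 or later. */
class JavaVersionCheck {

    /** Maps a specification version such as "1.8", "11" or "17" to its major number. */
    static int majorVersion(String specVersion) {
        String[] parts = specVersion.split("\\.");
        int first = Integer.parseInt(parts[0]);
        // Releases up to Java 8 use the legacy "1.x" scheme.
        return (first == 1 && parts.length > 1) ? Integer.parseInt(parts[1]) : first;
    }

    public static void main(String[] args) {
        String spec = System.getProperty("java.specification.version");
        System.out.println("Java specification version: " + spec);
        if (majorVersion(spec) < 8) {
            System.err.println("Java 8 or later is required.");
            System.exit(1);
        }
    }
}
```

Running `java -version` on the command line gives the same information; the sketch is only useful if the check has to happen inside a program.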
    
    Extract Flink
    tar zxvf flink-1.8.0-bin-scala_2.11.tgz
    
    Start Flink in local mode
    $ ./bin/start-cluster.sh  # Start Flink
    [hadoop@bigdata-senior01 flink-1.8.0]$ ./bin/start-cluster.sh 
    Starting cluster.
    Starting standalonesession daemon on host bigdata-senior01.home.com.
    Starting taskexecutor daemon on host bigdata-senior01.home.com.
    [hadoop@bigdata-senior01 flink-1.8.0]$ jps
    1995 StandaloneSessionClusterEntrypoint
    2443 TaskManagerRunner
    2526 Jps

    3. Open the Flink web UI

    http://localhost:8081
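Besides opening the page in a browser, the cluster can be probed from code. The sketch below is an illustration only and not from the original post: it assumes the cluster is reachable at `localhost:8081` as above and that Flink's monitoring REST API serves its `/overview` endpoint on the same port. The class name `FlinkOverviewCheck` and its helpers are hypothetical.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper: fetches the cluster overview JSON from a running Flink cluster. */
class FlinkOverviewCheck {

    /** Builds the overview URL; host and port are whatever the web UI listens on. */
    static String overviewUrl(String host, int port) {
        return "http://" + host + ":" + port + "/overview";
    }

    /** Performs a plain HTTP GET and returns the response body as one string. */
    static String fetch(String url) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        conn.setConnectTimeout(3000);
        conn.setReadTimeout(3000);
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
            StringBuilder body = new StringBuilder();
            String line;
            while ((line = reader.readLine()) != null) {
                body.append(line);
            }
            return body.toString();
        } finally {
            conn.disconnect();
        }
    }

    public static void main(String[] args) {
        String url = overviewUrl("localhost", 8081);
        try {
            System.out.println(fetch(url));
        } catch (IOException e) {
            // Most likely the cluster is not running or listens on another port.
            System.err.println("Could not reach " + url + ": " + e.getMessage());
        }
    }
}
```

If the cluster started correctly, the response should be a small JSON document describing task managers and slots; a connection error usually means `start-cluster.sh` did not come up.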

    4. First program: WordCount. It reads strings from a socket stream and counts word frequencies in 10-second windows.

    4.1 Add the dependencies

        <dependencies>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-clients_2.12</artifactId>
                <version>1.8.0</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_2.12</artifactId>
                <version>1.8.0</version>
                <scope>provided</scope>
            </dependency>
    
        </dependencies>

    4.2 Code

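    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.util.Collector;

    public class SocketWindowWordCount {
    
        public static void main(String[] args) throws Exception {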
    
            // the host and the port to connect to
            final String hostname;
            final int port;
            try {
                final ParameterTool params = ParameterTool.fromArgs(args);
                hostname = params.has("hostname") ? params.get("hostname") : "localhost";
                port = params.getInt("port");
            } catch (Exception e) {
                e.printStackTrace();
                System.err.println(e.getMessage());
                System.err.println("No port specified. Please run 'SocketWindowWordCount " +
                        "--hostname <hostname> --port <port>', where hostname (localhost by default) " +
                        "and port is the address of the text server");
                System.err.println("To start a simple text server, run 'netcat -l <port>' and " +
                        "type the input text into the command line");
                return;
            }
    
            // get the execution environment
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            // get input data by connecting to the socket
            DataStream<String> text = env.socketTextStream(hostname, port, "\n");
    
            // parse the data, group it, window it, and aggregate the counts
            DataStream<WordWithCount> windowCounts = text
                    .flatMap(new FlatMapFunction<String, WordWithCount>() {
                        @Override
                        public void flatMap(String value, Collector<WordWithCount> out) throws Exception {
                            for (String word : value.split("\\s")) {
                                out.collect(new WordWithCount(word,1L));
                            }
                        }
                    })
                    .keyBy("word")
                    .timeWindow(Time.seconds(10))
                    .reduce(new ReduceFunction<WordWithCount>() {
                        @Override
                        public WordWithCount reduce(WordWithCount value1, WordWithCount value2) throws Exception {
                            return new WordWithCount(value1.word,value1.count+value2.count);
                        }
                    });
    
            // print the results with a single thread, rather than in parallel
            windowCounts.print().setParallelism(1);
    
            env.execute("Socket Window WordCount");
        }
    
        /**
         * Data type for words with count.
         */
        public static class WordWithCount {
            public String word;
            public long count;
    
            public WordWithCount() {
            }
    
            public WordWithCount(String word, long count) {
                this.word = word;
                this.count = count;
            }
    
            @Override
            public String toString() {
                return word + " : " + count;
            }
        }
    }
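To make the windowing step easier to picture, here is a plain-Java sketch of the same idea with no Flink dependency: each line is assigned to a fixed, non-overlapping 10-second window by its timestamp, split on whitespace, and counted per word. It is an illustration under stated assumptions, not how Flink implements windows internally. The class `TumblingWindowSketch`, the `TimedLine` type, and the explicit timestamps are all hypothetical; the real job uses the time at which each record is processed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java illustration of tumbling-window word counting (no Flink required). */
class TumblingWindowSketch {

    /** A line of text tagged with an arrival time in milliseconds (hypothetical input type). */
    static final class TimedLine {
        final long timestampMs;
        final String text;

        TimedLine(long timestampMs, String text) {
            this.timestampMs = timestampMs;
            this.text = text;
        }
    }

    /**
     * Assigns each line to the window [start, start + windowMs) that contains its timestamp
     * and counts words per window. Returns window start -> (word -> count), both sorted.
     */
    static Map<Long, Map<String, Long>> countPerWindow(List<TimedLine> lines, long windowMs) {
        Map<Long, Map<String, Long>> result = new TreeMap<>();
        for (TimedLine line : lines) {
            long windowStart = line.timestampMs - (line.timestampMs % windowMs);
            Map<String, Long> counts = result.computeIfAbsent(windowStart, k -> new TreeMap<>());
            // Same split as the job: one whitespace character per separator.
            for (String word : line.text.split("\\s")) {
                counts.merge(word, 1L, Long::sum);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<TimedLine> lines = new ArrayList<>();
        lines.add(new TimedLine(1_000, "hello flink"));
        lines.add(new TimedLine(4_000, "hello world"));
        lines.add(new TimedLine(12_000, "hello again"));
        // prints {0={flink=1, hello=2, world=1}, 10000={again=1, hello=1}}
        System.out.println(countPerWindow(lines, 10_000));
    }
}
```

The first two lines fall into the window starting at 0 ms, so `hello` is counted twice there; the third line arrives after the 10-second boundary and starts a fresh count. In the Flink job, `keyBy("word")` plays the role of the inner map key and `reduce` plays the role of `merge`.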

    4.3 Build the jar and upload it

    First, start nc so that it listens for and accepts connections
    
    nc -lk 9000
    
    Run SocketWindowWordCount
    [hadoop@bigdata-senior01 bin]$ ./flink run /home/hadoop/SocketWindowWordCount.jar --port 9000
    
    
    Check the output
    [root@bigdata-senior01 log]# tail -f flink-hadoop-taskexecutor-0-bigdata-senior01.home.com.out
    Type strings into the nc terminal; the tailed log then shows the aggregated counts once per 10-second window.


  • Original article: https://www.cnblogs.com/asker009/p/10926615.html