zoukankan      html  css  js  c++  java
  • flink简单上手

    创建一个maven项目

    maven依赖为

       <dependencies>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-java</artifactId>
                <version>1.10.1</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_2.12</artifactId>
                <version>1.10.1</version>
            </dependency>
        </dependencies>

    批处理WordCount

    Java代码实现:

    package com.test;
    
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.operators.AggregateOperator;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;
    
    public class wc {
        public static void main(String[] args) throws Exception {
            // 创建执行环境
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    
            // 从文件中读取数据
            String inputPath = "D:\IDE\flinkTest\src\main\resources\wc.txt";
            DataSet<String> inputDataSet = env.readTextFile(inputPath);
    
            // 对数据集进行处理
            AggregateOperator<Tuple2<String, Integer>> sum = inputDataSet.flatMap(new myFlatMapper())
                    .groupBy(0)  // 将元祖的第一个位置的字段分组
                    .sum(1);// 将元祖的第二个位置求和
            sum.print();
        }
        // 自定义类实现接口
        public static class myFlatMapper implements FlatMapFunction<String, Tuple2<String ,Integer>>{
            public void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception {
                // 按空格分词
                String[] words = value.split(" ");
                // 遍历所有的word,包成二元组输出
                for (String word : words) {
                    collector.collect(new Tuple2<String, Integer>(word, 1));
                }
            }
        }
    
    }

    注:运行前需要事先准备好待统计的文本文件(示例中为 src/main/resources/wc.txt,单词之间以空格分隔)。

    流处理WordCount

    Java代码实现

    package com.test.wc;
    
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    public class StreamWC {
        public static void main(String[] args) throws Exception {
            // 创建流处理执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            // 从文件中读取数据
            String inputPath = "D:\IDE\flinkTest\src\main\resources\wc.txt";
            DataStreamSource<String> inputDataStream = env.readTextFile(inputPath);
    
            // 基于数据流进行转换计算
            DataStream<Tuple2<String, Integer>> resultStream = inputDataStream.flatMap(new wc.myFlatMapper())
                    .keyBy(0)
                    .sum(1);
            resultStream.print();
    
            // 事件触发,启动任务
            env.execute();
        }
    }

    结果打印(keyBy 之后的 sum 是有状态的滚动聚合,每来一条数据就会输出一次该单词更新后的累计值,所以同一个单词会输出多次;每行前面的数字 1-8 表示产生该结果的并行子任务编号,本地运行时默认并行度等于电脑的 CPU 核数,这里为 8)

    3> (hello,1)
    5> (fine,1)
    6> (how,1)
    5> (you,1)
    5> (you,2)
    3> (thank,1)
    4> (me,1)
    3> (hello,2)
    4> (are,1)
    3> (hello,3)
    5> (you,3)
    3> (hello,4)
    3> (hello,5)
    3> (python,1)
    1> (scala,1)
    8> (and,1)
    2> (java,1)
    7> (flink,1)
  • 相关阅读:
    Elasticsearch Query DSL 整理总结(三)—— Match Phrase Query 和 Match Phrase Prefix Query
    Elasticsearch Query DSL 整理总结(二)—— 要搞懂 Match Query,看这篇就够了
    Elasticsearch Query DSL 整理总结(一)—— Query DSL 概要,MatchAllQuery,全文查询简述
    Elasticsearch Java Rest Client API 整理总结 (三)——Building Queries
    Elasticsearch date 类型详解
    python 历险记(五)— python 中的模块
    python 历险记(四)— python 中常用的 json 操作
    python 历险记(三)— python 的常用文件操作
    Elasticsearch Java Rest Client API 整理总结 (二) —— SearchAPI
    Elasticsearch Java Rest Client API 整理总结 (一)——Document API
  • 原文地址:https://www.cnblogs.com/lmr7/p/15438840.html
Copyright © 2011-2022 走看看