zoukankan      html  css  js  c++  java
  • Flink(1):Flink的基础案例

    相关文章链接

    1、批处理的WordCount案例

    // 创建执行环境
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    
    // 获取数据
    DataSource<String> dataSource = env.fromElements("flink spark hadoop", "hadoop spark", "flink flink");
    
    // 转换数据
    AggregateOperator<Tuple2<String, Integer>> result = dataSource
        .flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String s, Collector<String> collector) throws Exception {
                for (String field : s.split(" ")) {
                    collector.collect(field);
                }
            }
        })
        .map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String s) throws Exception {
                return Tuple2.of(s, 1);
            }
        })
        .groupBy(0)
        .sum(1);
    
    // 输出数据
    result.print();
    

    2、流处理的WordCount案例

    // 执行环境
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
    //env.setRuntimeMode(RuntimeExecutionMode.BATCH);
    //env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
    
    // source数据源
    DataStreamSource<String> lines = env.socketTextStream("localhost", 9999);
    
    // 数据转换
    SingleOutputStreamOperator<Tuple2<String, Integer>> result = lines
        .flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String s, Collector<String> collector) throws Exception {
                for (String word : s.split(" ")) {
                    collector.collect(word);
                }
            }
        })
        .map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String s) throws Exception {
                return Tuple2.of(s, 1);
            }
        })
        .keyBy(t -> t.f0)
        .sum(1);
    
    // sink
    result.print();
    
    env.execute();
    

    3、流处理的基于Lambda表达式的WordCount案例

    // 执行环境
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    //env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
    
    // 获取数据
    DataStreamSource<String> dataStreamSource = env.fromElements("abc abc abc");
    
    // 数据转换
    SingleOutputStreamOperator<Tuple2<String, Integer>> result = dataStreamSource
        .flatMap((String value, Collector<String> out) -> {
            Arrays.stream(value.split(" ")).forEach(out::collect);
        }).returns(Types.STRING)
        .map((String value) ->
                Tuple2.of(value, 1), TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}
        ))
        .keyBy(t -> t.f0)
        .sum(1);
    
    // 数据输出
    result.print();
    
    // 执行程序
    env.execute();
    

      

    你现在所遭遇的每一个不幸,都来自一个不肯努力的曾经
  • 相关阅读:
    Typora Writings
    Xcode7.3 beta 新功能
    最美应用API接口分析
    'Project Name' was compiled with optimization
    web前端开发与iOS终端开发的异同[转]
    2015-12-19_16_30_15
    Xcode搭建Python编译环境
    jsPach.qq.com
    Q&AApple’s Craig Federighi talks open source Swift, Objective-C and the next 20 years of development
    .NET Core项目与传统vs项目的细微不同
  • 原文地址:https://www.cnblogs.com/yangshibiao/p/14907795.html
Copyright © 2011-2022 走看看