zoukankan      html  css  js  c++  java
  • 3、Flink批处理案例实现-Java

    在创建好的Flink项目的基础上,新建一个类

    package com.gong.batch;
    
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.util.Collector;
    
    import java.lang.reflect.Parameter;
    
    public class WordCount {
        public static void main(String[] args) throws Exception{
            //解析命令行传过来的参数args
          ParameterTool params=ParameterTool.fromArgs(args);
    
            //获取一个flink的执行环境
          final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    
          //读取输入数据
          DataSet<String> dataSet =null;
          if(params.has("input")){//判断参数是否带有input
             dataSet =env.readTextFile(params.get("input"));
          }else {
            System.out.println("数据不存在");
          }
          //单词词频统计
          DataSet<Tuple2<String,Integer>> counts=dataSet.flatMap(new Tokenizer())
                  .groupBy(0)
                  .sum(1);
    
          if(params.has("output")){
              //数据输出为csv格式
              counts.writeAsCsv(params.get("output"),"
    "," ");
              //提交执行flink应用
              env.execute("wordcount exmple ");
          }else {
              counts.print();
          }
        }
        public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String,Integer>>{
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] tokens =value.toLowerCase().split("\W+");
                for (String token:tokens){
                    out.collect(new Tuple2<>(token,1));
                }
            }
        }
    }
  • 相关阅读:
    《Java算法》Java贪心算法
    《Java知识应用》Java下Linux系统下word转PDF
    《MySQL数据库》MySQL常用语法(二)
    《MySQL数据库》MySQL常用语法(一)
    《Java算法》Java判重算法-整数判重
    《Java算法》Java排序算法-快速排序
    POJ 1113:Wall
    POJ 1584:A Round Peg in a Ground Hole
    51nod 1035:最长的循环节
    51nod 1022 石子归并 环形+四边形优化
  • 原文地址:https://www.cnblogs.com/braveym/p/13628384.html
Copyright © 2011-2022 走看看