zoukankan      html  css  js  c++  java
  • Spark WordCount 示例（Java 版本）

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.*;
    import scala.Tuple2;
    
    import java.net.URISyntaxException;
    import java.net.URL;
    import java.nio.file.Paths;
    import java.util.Arrays;
    import java.util.Comparator;
    import java.util.Iterator;
    import java.util.List;
    import java.util.regex.Pattern;
    
    /**
     * Spark word-count example.
     *
     * <p>Reads a text file, counts how often each whitespace-separated word occurs,
     * and prints one {@code "word : count"} line per distinct word to stdout. Lines
     * are ordered by word in descending (reverse lexicographic) order.
     */
    public class wordcount {

        /** Word separator: any run of whitespace characters (spaces, tabs, ...). */
        private static final Pattern WHITESPACE = Pattern.compile("\\s+");

        /**
         * Runs the word count on a local Spark master using all available cores.
         *
         * @param args optional; when present, {@code args[0]} is the input file path.
         *             Otherwise the classpath resource {@code /wc.txt} is used.
         * @throws IllegalStateException if no path is given and {@code /wc.txt}
         *             cannot be located as a local file
         */
        public static void main(String[] args) {
            SparkConf conf = new SparkConf();
            conf.setMaster("local[*]").setAppName("wc");

            // JavaSparkContext.close() stops the context, so try-with-resources
            // shuts Spark down even when the job throws.
            try (JavaSparkContext sc = new JavaSparkContext(conf)) {
                JavaRDD<String> lineRDD = sc.textFile(resolveInputPath(args));

                JavaRDD<String> flatRDD = lineRDD.flatMap(wordcount::tokenize);

                JavaPairRDD<String, Integer> mapRDD =
                        flatRDD.mapToPair(word -> new Tuple2<>(word, 1));

                JavaPairRDD<String, Integer> resultRDD = mapRDD.reduceByKey(Integer::sum);

                // collect() brings every (word, count) pair to the driver. That is fine
                // for small demo inputs; large vocabularies should be saved to storage.
                for (Tuple2<String, Integer> wc : resultRDD.sortByKey(false).collect()) {
                    System.out.println(wc._1() + " : " + wc._2());
                }
            }
        }

        /**
         * Splits one line into words on runs of whitespace.
         *
         * <p>Empty tokens, produced by blank lines or leading whitespace, are dropped
         * so they are not counted as words.
         *
         * @param line one input line
         * @return iterator over the non-empty words of {@code line}
         */
        private static Iterator<String> tokenize(String line) {
            return Arrays.stream(WHITESPACE.split(line))
                    .filter(word -> !word.isEmpty())
                    .iterator();
        }

        /**
         * Chooses the input file: {@code args[0]} if given, else the classpath
         * resource {@code /wc.txt}.
         *
         * <p>The resource URL is converted through {@link java.net.URI}, so
         * percent-escapes (e.g. {@code %20} for a space) are decoded into a real
         * filesystem path. {@code URL.getPath()} would leave them encoded.
         *
         * <p>A resource packed inside a JAR has no filesystem path. Pass an explicit
         * path argument in that case.
         *
         * @param args the command-line arguments passed to {@code main}
         * @return a local filesystem path for Spark's {@code textFile}
         * @throws IllegalStateException if the resource is missing or its URL is malformed
         */
        private static String resolveInputPath(String[] args) {
            if (args != null && args.length > 0) {
                return args[0];
            }
            URL url = wordcount.class.getResource("/wc.txt");
            if (url == null) {
                throw new IllegalStateException(
                        "Classpath resource /wc.txt not found; pass an input path as the first argument");
            }
            try {
                return Paths.get(url.toURI()).toString();
            } catch (URISyntaxException e) {
                throw new IllegalStateException("Cannot convert resource URL to a path: " + url, e);
            }
        }
    }

    太烦了——用 Java 匿名内部类写 Spark 算子非常冗长（在 Java 8+ 中可以改用 lambda 表达式简化）。

  • 相关阅读:
    关于npm无法安装依赖包以及安装包缓慢的解决方法
    centos 上安装nodejs v8.0.0
    nginx 负载均衡
    关于前端
    递归函数
    多重循环
    闭包
    spring boot集成mybatis(2)
    spring boot集成mybatis(3)
    spring boot集成mybatis(1)
  • 原文地址:https://www.cnblogs.com/kpwong/p/14036123.html
Copyright © 2011-2022 走看看