zoukankan      html  css  js  c++  java
  • spark 数据分析 分组取TopN

    package com.swust.seltop;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import scala.Tuple2;
    
    import java.util.*;
    
    /**
     * Finds the highest-scoring records of a text file with Spark.
     *
     * <p>Every input line must contain at least three space-separated fields, the first of which
     * is an integer score (for example {@code 14 cat1 cat1}). Each partition builds its own
     * bounded set of best records; the partial results are collected, merged on the driver and
     * printed to standard error in ascending score order.
     *
     * <p>Original author's motto: "Even the clock keeps moving forward, so how could one stop here!"
     *
     * @author Xuetong (雪瞳)
     */
    public class SortTopN {

        /** Maximum number of records kept per partition and in the final result. */
        private static final int TOP_N = 10;

        /**
         * Runs the job against {@code ./data/top.txt} on a local Spark master.
         *
         * @param args command-line arguments (unused)
         */
        public static void main(String[] args) {
            SparkConf conf = new SparkConf();
            conf.setMaster("local").setAppName("top");
            // try-with-resources stops the Spark context even when the job fails.
            try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
                jsc.setLogLevel("Error");

                String inputPath = "./data/top.txt";
                JavaRDD<String> input = jsc.textFile(inputPath, 1);

                // Pair every record (re-joined with tabs) with its integer score.
                JavaPairRDD<String, Integer> pairRDD = input.mapToPair(new PairFunction<String, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(String line) throws Exception {
                        // Example line: 14 cat1 cat1
                        String[] splits = line.split(" ");
                        if (splits.length < 3) {
                            throw new IllegalArgumentException(
                                    "Expected at least 3 space-separated fields but got: " + line);
                        }
                        return new Tuple2<>(splits[0]+"	"+splits[1]+"	"+splits[2], Integer.parseInt(splits[0]));
                    }
                });

                // Build one local top-N map per partition, emitted as that partition's single element.
                JavaRDD<SortedMap<Integer, List<String>>> partitionTops = pairRDD.mapPartitions(
                        new FlatMapFunction<Iterator<Tuple2<String, Integer>>, SortedMap<Integer, List<String>>>() {
                    @Override
                    public Iterator<SortedMap<Integer, List<String>>> call(Iterator<Tuple2<String, Integer>> iterator) throws Exception {
                        SortedMap<Integer, List<String>> top = new TreeMap<>();
                        while (iterator.hasNext()) {
                            Tuple2<String, Integer> next = iterator.next();
                            offer(top, next._2, next._1);
                        }
                        return Collections.singletonList(top).iterator();
                    }
                });

                // Merge the per-partition maps on the driver. Every record goes through offer(),
                // so the merged result is trimmed after each insertion and never exceeds TOP_N.
                // (The maps could alternatively be combined pairwise with RDD.reduce.)
                SortedMap<Integer, List<String>> finalResult = new TreeMap<>();
                for (SortedMap<Integer, List<String>> partial : partitionTops.collect()) {
                    for (Map.Entry<Integer, List<String>> entry : partial.entrySet()) {
                        for (String record : entry.getValue()) {
                            offer(finalResult, entry.getKey(), record);
                        }
                    }
                }

                // Print the result, lowest score first.
                for (Map.Entry<Integer, List<String>> entry : finalResult.entrySet()) {
                    for (String record : entry.getValue()) {
                        System.err.println(entry.getKey()+"------"+record);
                    }
                }
            }
        }

        /**
         * Adds a record to a bounded top-N map and evicts one lowest-scoring record when the map
         * would otherwise hold more than {@link #TOP_N} records.
         *
         * <p>Records are grouped in a list per score so that equal scores do not overwrite each
         * other. Among records tied on the lowest score, the most recently added one is evicted.
         *
         * @param top    map from score to the records having that score; modified in place
         * @param score  score of the record being added
         * @param record the record text
         */
        private static void offer(SortedMap<Integer, List<String>> top, Integer score, String record) {
            List<String> sameScore = top.get(score);
            if (sameScore == null) {
                sameScore = new ArrayList<>();
                top.put(score, sameScore);
            }
            sameScore.add(record);

            // The map never holds more than TOP_N + 1 records, so counting them is cheap.
            int held = 0;
            for (List<String> records : top.values()) {
                held += records.size();
            }
            if (held > TOP_N) {
                Integer lowest = top.firstKey();
                List<String> weakest = top.get(lowest);
                weakest.remove(weakest.size() - 1);
                if (weakest.isEmpty()) {
                    top.remove(lowest);
                }
            }
        }
    }
    

      

  • 相关阅读:
    【字符集及字符编码】UTF-8、UTF-16和UTF-32
    【Android】SQLite基本用法(转)
    【eclipse】导入/导出开发环境(包括编辑器字体颜色大小等)
    一个Android Socket的例子(转)
    Linux中与Windows对应的InterlockedIncrement()函数
    Linux互斥锁pthread_mutex_t
    C++读写文本文件
    C++回调函数调用Java接口抽象函数
    Overlapped I/O模型--事件通知【摘录自《Windows网络编程》】
    Linux C++中需要的头文件
  • 原文地址:https://www.cnblogs.com/walxt/p/12794732.html
Copyright © 2011-2022 走看看