zoukankan      html  css  js  c++  java
  • spark 数据分析 分组取TopN

    package com.swust.seltop;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import scala.Tuple2;
    
    import java.util.*;
    
    /**
     * Spark example: selects the Top-N records of a text file, ranked by the integer in the
     * first column.
     *
     * <p>Each input line is expected to look like {@code 14 cat1 cat1}: at least three
     * space-separated fields, the first of which parses as an {@code int}. Every partition first
     * keeps its own local Top-N. The local maps are then collected to the driver and merged into
     * the final Top-N, which is printed to {@code System.err} in ascending order of the number.
     *
     * <p>Note: no grouping key is used, so the result is a single Top-N over the whole input
     * rather than one Top-N per group.
     *
     * @author 雪瞳
     * @Slogan Even the clock keeps moving forward; how could a person stop here!
     * @Function Top-N selection
     */
    public class SortTopN {

        /** Number of records kept, both per partition and in the final result. */
        private static final int TOP_N = 10;

        public static void main(String[] args) {
            SparkConf conf = new SparkConf();
            conf.setMaster("local").setAppName("top");
            // JavaSparkContext is Closeable: try-with-resources stops the context even if the job throws.
            try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
                jsc.setLogLevel("Error");

                String inputPath = "./data/top.txt";
                JavaRDD<String> input = jsc.textFile(inputPath, 1);

                // Key: the first three fields joined by tabs; value: the number in the first field.
                JavaPairRDD<String, Integer> pairRDD = input.mapToPair(new PairFunction<String, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(String line) throws Exception {
                        // e.g. "14 cat1 cat1"
                        String[] splits = line.split(" ");
                        return new Tuple2<>(splits[0] + "\t" + splits[1] + "\t" + splits[2],
                                Integer.parseInt(splits[0]));
                    }
                });

                // Build one local Top-N map per partition (an empty partition yields an empty map).
                JavaRDD<SortedMap<Integer, String>> singleTop10 = pairRDD.mapPartitions(
                        new FlatMapFunction<Iterator<Tuple2<String, Integer>>, SortedMap<Integer, String>>() {
                            @Override
                            public Iterator<SortedMap<Integer, String>> call(
                                    Iterator<Tuple2<String, Integer>> iterator) throws Exception {
                                SortedMap<Integer, String> top = new TreeMap<>();
                                while (iterator.hasNext()) {
                                    Tuple2<String, Integer> next = iterator.next();
                                    putBounded(top, next._2, next._1, TOP_N);
                                }
                                return Collections.singletonList(top).iterator();
                            }
                        });

                // Collect every local Top-N map and merge them on the driver.
                List<SortedMap<Integer, String>> singleResult = singleTop10.collect();
                SortedMap<Integer, String> finalResult = new TreeMap<>();
                for (SortedMap<Integer, String> partial : singleResult) {
                    // Trim after every inserted entry. Inserting a whole partial map (up to TOP_N
                    // entries) and trimming only once would let the result grow past TOP_N
                    // whenever there is more than one partition.
                    mergeBounded(finalResult, partial, TOP_N);
                }

                // Print the result (ascending by number).
                for (Map.Entry<Integer, String> entry : finalResult.entrySet()) {
                    System.err.println(entry.getKey() + "------" + entry.getValue());
                }
                // Alternative to the collect-and-merge loop above: singleTop10.reduce(...) with a
                // Function2 that merges both maps into a fresh TreeMap via mergeBounded(...).
            }
        }

        /**
         * Inserts {@code key -> value} into {@code top}, then evicts the smallest keys until at
         * most {@code limit} entries remain.
         *
         * <p>NOTE(review): entries are keyed by the number alone, so a record whose number is
         * already present replaces the earlier record (TreeMap.put semantics). Ties are not
         * preserved.
         *
         * @param top   the sorted map being maintained as a bounded Top-N
         * @param key   the ranking number
         * @param value the record text
         * @param limit maximum number of entries to keep
         */
        private static void putBounded(SortedMap<Integer, String> top, Integer key, String value, int limit) {
            top.put(key, value);
            while (top.size() > limit) {
                top.remove(top.firstKey());
            }
        }

        /**
         * Adds every entry of {@code source} to {@code target}, keeping {@code target} bounded to
         * {@code limit} entries after each insertion.
         *
         * @param target the bounded Top-N map that receives the entries
         * @param source a partial Top-N map to merge in; it is not modified
         * @param limit  maximum number of entries to keep in {@code target}
         */
        private static void mergeBounded(SortedMap<Integer, String> target,
                                         SortedMap<Integer, String> source, int limit) {
            for (Map.Entry<Integer, String> entry : source.entrySet()) {
                putBounded(target, entry.getKey(), entry.getValue(), limit);
            }
        }
    }
    

      

  • 相关阅读:
    file_put_contents 和php://input 实现存储数据进图片中
    Oracle PL/SQL游标的学习
    三层的再理解
    取出字符串中任意的顺序匹配
    oracle的正则表达式(regular expression)简单介绍
    oracle动态游标的简单实现方法
    存储过程之游标笔记小结
    char,varchar,nvarchar的区别
    大连惊魂记
    让asp.net默认的上传组件支持进度条反映(转)
  • 原文地址:https://www.cnblogs.com/walxt/p/12794732.html
Copyright © 2011-2022 走看看