zoukankan      html  css  js  c++  java
  • 01Spark的TopN问题

    和hadoop的目的一样,给你数据,然后取TopN。数据如下:

    取出数据在排名前十的数据。

    代码如下:

    package com.test.book;
    
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;
    import java.util.SortedMap;
    import java.util.TreeMap;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.PairFunction;
    
    import scala.Tuple2;
    
    public class SparkTon {
    
        public static void main(String[] args) {
    
            SparkConf conf = new SparkConf().setAppName("SparkTon").setMaster("local");
            JavaSparkContext sc = new JavaSparkContext(conf);
            JavaRDD<String> lines = sc.textFile("/Users/mac/Desktop/TopN2.txt");
    
            // 将数据读进来,拆分为Tuple(String,Integer)这种形式
            JavaPairRDD<String, Integer> pairRDD = lines.mapToPair(new PairFunction<String, String, Integer>() {
    
                @Override
                public Tuple2<String, Integer> call(String t) throws Exception {
                    // TODO Auto-generated method stub
                    return new Tuple2<String, Integer>(t.split(",")[0], Integer.valueOf(t.split(",")[1]));
                }
            });
    
            // 按照整个分区来处理。
            JavaRDD<SortedMap<Integer, String>> pairspart = pairRDD
                    .mapPartitions(new FlatMapFunction<Iterator<Tuple2<String, Integer>>, SortedMap<Integer, String>>() {
    
                        private static final long serialVersionUID = 1L;
                        SortedMap<Integer, String> top10 = new TreeMap<Integer, String>();
    
                        @Override
                        public Iterable<SortedMap<Integer, String>> call(Iterator<Tuple2<String, Integer>> t)
                                throws Exception {
    
                            while (t.hasNext()) {
                                Tuple2<String, Integer> tuple2 = t.next();
    
                                top10.put(tuple2._2, tuple2._1);
                                if (top10.size() > 10) {
                                    top10.remove(top10.firstKey());
                                }
                            }
                            return Collections.singleton(top10);
                        }
                    });
    
            // 把各个分区处理好的数据拿过来。
            List<SortedMap<Integer, String>> allTop10 = pairspart.collect();
            // 在Reduce端用TreeMap对之前的分区数据排序。
            SortedMap<Integer, String> finalmap = new TreeMap<Integer, String>();
    
            // 遍历每个分区的SortedMap结构
            for (SortedMap<Integer, String> localTop10 : allTop10) {
    
                for (Map.Entry<Integer, String> entry : localTop10.entrySet()) {
    
                    finalmap.put(entry.getKey(), entry.getValue());
                    if (finalmap.size() > 10) {
                        finalmap.remove(finalmap.firstKey());
                    }
    
                }
            }
    
            // 打印出来。
            Set values = finalmap.keySet();
    
            Iterator<Integer> iterator = values.iterator();
    
            while (iterator.hasNext()) {
    
                System.out.println(finalmap.get(iterator.next()));
    
            }
    
        }
    
    }

    结果:

  • 相关阅读:
    集合
    Java异常处理机制
    LEACH分簇算法实现和能量控制算法实现
    利用MATLAB仿真最小发射功率下WSN的连通性和覆盖率
    利用MATLAB仿真节点个数和节点通信半径与网络连通率的关系
    STM32液晶显示HT1621驱动原理及程序代码
    LMC7660即-5V产生电路
    LM431精密+3.3V产生电路
    锂电池充电电路、锂电池充电保护电路
    AMS1117降压电路
  • 原文地址:https://www.cnblogs.com/shenxiaoquan/p/8697796.html
Copyright © 2011-2022 走看看