与 Hadoop 版本的目的相同：给定一批数据，从中取出 TopN。输入文件每行的格式为 `名称,数值`，数据如下：
目标是取出数值排名前十的记录。
代码如下:
package com.test.book; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.PairFunction; import scala.Tuple2; public class SparkTon { public static void main(String[] args) { SparkConf conf = new SparkConf().setAppName("SparkTon").setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); JavaRDD<String> lines = sc.textFile("/Users/mac/Desktop/TopN2.txt"); // 将数据读进来,拆分为Tuple(String,Integer)这种形式 JavaPairRDD<String, Integer> pairRDD = lines.mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String t) throws Exception { // TODO Auto-generated method stub return new Tuple2<String, Integer>(t.split(",")[0], Integer.valueOf(t.split(",")[1])); } }); // 按照整个分区来处理。 JavaRDD<SortedMap<Integer, String>> pairspart = pairRDD .mapPartitions(new FlatMapFunction<Iterator<Tuple2<String, Integer>>, SortedMap<Integer, String>>() { private static final long serialVersionUID = 1L; SortedMap<Integer, String> top10 = new TreeMap<Integer, String>(); @Override public Iterable<SortedMap<Integer, String>> call(Iterator<Tuple2<String, Integer>> t) throws Exception { while (t.hasNext()) { Tuple2<String, Integer> tuple2 = t.next(); top10.put(tuple2._2, tuple2._1); if (top10.size() > 10) { top10.remove(top10.firstKey()); } } return Collections.singleton(top10); } }); // 把各个分区处理好的数据拿过来。 List<SortedMap<Integer, String>> allTop10 = pairspart.collect(); // 在Reduce端用TreeMap对之前的分区数据排序。 SortedMap<Integer, String> finalmap = new TreeMap<Integer, String>(); // 遍历每个分区的SortedMap结构 for 
(SortedMap<Integer, String> localTop10 : allTop10) { for (Map.Entry<Integer, String> entry : localTop10.entrySet()) { finalmap.put(entry.getKey(), entry.getValue()); if (finalmap.size() > 10) { finalmap.remove(finalmap.firstKey()); } } } // 打印出来。 Set values = finalmap.keySet(); Iterator<Integer> iterator = values.iterator(); while (iterator.hasNext()) { System.out.println(finalmap.get(iterator.next())); } } }
结果: