package com.swust.seltop;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.*;
/**
 * Computes the global Top-N records of a text file with the "local top-N per partition, then merge"
 * pattern.
 *
 * <p>Each input line is expected to look like {@code "14 cat1 cat1"}: an integer score followed by at
 * least two more whitespace-separated fields. The score is the ranking key. The record kept for a
 * score is the first three fields joined by single spaces.
 *
 * <p>Intermediate and final results are {@link TreeMap}s keyed by score. Records that share a score
 * therefore replace each other (the one merged later wins). The result holds at most N distinct
 * scores.
 *
 * <p>Usage: {@code SortTopN [inputPath] [n]}. The defaults are {@code ./data/top.txt} and {@code 10}.
 *
 * @author 雪瞳
 * @Slogan The clock keeps moving forward; how could a person stop here!
 * @Function Top-N selection (computed globally across all partitions)
 */
public class SortTopN {

    /** Input file read when no path is given on the command line. */
    private static final String DEFAULT_INPUT_PATH = "./data/top.txt";

    /** Number of records kept when no N is given on the command line. */
    private static final int DEFAULT_TOP_N = 10;

    public static void main(String[] args) {
        String inputPath = args.length > 0 ? args[0] : DEFAULT_INPUT_PATH;
        final int topN = args.length > 1 ? Integer.parseInt(args[1]) : DEFAULT_TOP_N;
        if (topN <= 0) {
            throw new IllegalArgumentException("n must be positive, got " + topN);
        }

        SparkConf conf = new SparkConf();
        conf.setMaster("local").setAppName("top");
        // JavaSparkContext is Closeable: try-with-resources stops the context even if the job fails.
        try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
            jsc.setLogLevel("Error");
            JavaRDD<String> input = jsc.textFile(inputPath, 1);

            // (record, score) pairs, e.g. "14 cat1 cat1" -> ("14 cat1 cat1", 14).
            JavaPairRDD<String, Integer> pairRDD = input.mapToPair(new PairFunction<String, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(String line) {
                    return parseLine(line);
                }
            });

            // Build one local top-N map per partition (at most topN entries each), so only small
            // maps travel to the driver.
            JavaRDD<SortedMap<Integer, String>> localTopN = pairRDD.mapPartitions(
                    new FlatMapFunction<Iterator<Tuple2<String, Integer>>, SortedMap<Integer, String>>() {
                        @Override
                        public Iterator<SortedMap<Integer, String>> call(Iterator<Tuple2<String, Integer>> records) {
                            SortedMap<Integer, String> top = new TreeMap<>();
                            while (records.hasNext()) {
                                Tuple2<String, Integer> record = records.next();
                                putBounded(top, record._2, record._1, topN);
                            }
                            return Collections.singletonList(top).iterator();
                        }
                    });

            // Merge the local lists on the driver. Eviction happens per inserted entry, so the result
            // never exceeds topN entries no matter how many partitions there are.
            SortedMap<Integer, String> finalResult = new TreeMap<>();
            for (SortedMap<Integer, String> partial : localTopN.collect()) {
                mergeTopN(finalResult, partial, topN);
            }

            // Print in ascending score order (the lowest of the top N first).
            for (Map.Entry<Integer, String> entry : finalResult.entrySet()) {
                System.err.println(entry.getKey() + "------" + entry.getValue());
            }

            // Alternative: merge on the executors instead of the driver. Call localTopN.reduce(...)
            // with a Function2 that copies its first map into a new TreeMap, calls mergeTopN with the
            // second, and returns the copy.
        }
    }

    /**
     * Parses one input line into a (record, score) pair.
     *
     * @param line a line of the form {@code "<score> <field> <field> ..."}
     * @return the first three fields joined by single spaces, paired with the integer score
     * @throws IllegalArgumentException if the line has fewer than three whitespace-separated fields
     * @throws NumberFormatException if the first field is not an integer
     */
    private static Tuple2<String, Integer> parseLine(String line) {
        String[] fields = line.trim().split("\\s+");
        if (fields.length < 3) {
            throw new IllegalArgumentException(
                    "Expected at least 3 whitespace-separated fields, got: \"" + line + "\"");
        }
        return new Tuple2<>(fields[0] + " " + fields[1] + " " + fields[2], Integer.parseInt(fields[0]));
    }

    /**
     * Inserts {@code key -> value} into {@code top}. If the map then holds more than {@code n}
     * entries, the smallest key is evicted. An existing entry with the same key is replaced.
     */
    private static void putBounded(SortedMap<Integer, String> top, Integer key, String value, int n) {
        top.put(key, value);
        if (top.size() > n) {
            top.remove(top.firstKey());
        }
    }

    /** Folds every entry of {@code source} into {@code target}, keeping only the {@code n} largest keys. */
    private static void mergeTopN(SortedMap<Integer, String> target, Map<Integer, String> source, int n) {
        for (Map.Entry<Integer, String> entry : source.entrySet()) {
            putBounded(target, entry.getKey(), entry.getValue(), n);
        }
    }
}