假设有这样一个文件,文件内容如下
class1 90
class2 89
class1 87
class1 99
class2 100
class2 95
class1 78
class2 85
class1 92
class2 91
要求按照班级分组取出每个班前三名,源码如下:
package swy.spark.spark_study_java.core;

import java.util.Arrays;
import java.util.Iterator;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

/**
 * Grouped top-3: reads a text file of "className score" lines, groups the
 * scores by class name and prints the three highest scores of every class.
 *
 * @author swy
 */
public class GroupTop3 {

    /** Number of highest scores kept for each class. */
    private static final int TOP_N = 3;

    /**
     * Builds the Spark pipeline and prints each class with its top scores.
     *
     * @param args command-line arguments (not used)
     */
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("ActionOperation")
                .setMaster("local");
        // To run against a standalone cluster instead of local mode, use e.g.
        // .setMaster("spark://192.168.43.124:7077")

        JavaSparkContext sc = new JavaSparkContext(conf);
        try {
            JavaRDD<String> lines = sc.textFile(
                    "E://swy//resource//workspace-neno//spark-study-java//txt//score.txt");

            // Parse every line into a (className, score) pair. The line is
            // split on a single space: field 0 is the class, field 1 the score.
            JavaPairRDD<String, Integer> pairs = lines.mapToPair(
                    new PairFunction<String, String, Integer>() {
                        private static final long serialVersionUID = 1L;

                        @Override
                        public Tuple2<String, Integer> call(String s) throws Exception {
                            String[] lineSplit = s.split(" ");
                            return new Tuple2<String, Integer>(
                                    lineSplit[0], Integer.valueOf(lineSplit[1]));
                        }
                    });

            // Group the scores by class.
            JavaPairRDD<String, Iterable<Integer>> groupPairs = pairs.groupByKey();

            // Reduce every group to its highest TOP_N scores, in descending order.
            JavaPairRDD<String, Iterable<Integer>> top3Scores = groupPairs.mapToPair(
                    new PairFunction<Tuple2<String, Iterable<Integer>>, String, Iterable<Integer>>() {
                        private static final long serialVersionUID = 1L;

                        @Override
                        public Tuple2<String, Iterable<Integer>> call(
                                Tuple2<String, Iterable<Integer>> classScores) throws Exception {
                            String className = classScores._1;
                            Integer[] top = selectTop(classScores._2, TOP_N);
                            return new Tuple2<String, Iterable<Integer>>(
                                    className, Arrays.asList(top));
                        }
                    });

            top3Scores.foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>() {
                private static final long serialVersionUID = 1L;

                @Override
                public void call(Tuple2<String, Iterable<Integer>> v) throws Exception {
                    System.out.println("班级:" + v._1);
                    System.out.println("前三名:" + v._2);
                }
            });
        } finally {
            // Release the context even when the job throws.
            sc.close();
        }
    }

    /**
     * Selects the {@code limit} largest values of {@code scores}.
     *
     * <p>The working array is kept sorted in descending order with empty
     * ({@code null}) slots at the end. Each incoming score walks the slots
     * from the top; whenever it beats the current occupant the two are
     * swapped and the displaced occupant continues down the remaining slots.
     * This way no previously stored value is overwritten and lost; only the
     * value that falls off the last slot is discarded.
     *
     * @param scores the values to scan; {@code null} elements are ignored
     * @param limit  maximum number of values to keep
     * @return the largest values in descending order; shorter than
     *         {@code limit} when fewer values were supplied
     */
    private static Integer[] selectTop(Iterable<Integer> scores, int limit) {
        Integer[] top = new Integer[limit];
        Iterator<Integer> it = scores.iterator();
        while (it.hasNext()) {
            // "carry" is whoever is currently looking for a slot: first the
            // new score, later any occupant it has pushed out.
            Integer carry = it.next();
            for (int i = 0; i < top.length && carry != null; i++) {
                if (top[i] == null || carry > top[i]) {
                    Integer tmp = top[i];
                    top[i] = carry;
                    carry = tmp; // null when the slot was empty, which ends the loop
                }
            }
        }

        // Drop the unused trailing slots so callers never see null entries.
        int filled = 0;
        while (filled < top.length && top[filled] != null) {
            filled++;
        }
        return Arrays.copyOf(top, filled);
    }
}
topN的排序算法可以这样理解:
假如有三个山洞,一群土匪排着队来抢占山洞,按如下规则占领山洞,下面算法保证第一个山洞主人永远是最厉害的,以此类推
土匪 :待排序序列
三个山洞:top3
妻子:一个临时tmp变量
打架:比较大小
for (每个土匪) {
    挑战者 = 该土匪;
    for (每个山洞,从第一个到最后一个) {
        if (山洞空) {
            挑战者入住;
            break;
        } else if (挑战者打赢原主人) {   // 山洞有人:和原主人打架
            挑战者入住;
            原主人带着妻子成为新的挑战者,继续去抢下一个山洞(假如还有的话);
        }
        // 打输了:挑战者自己继续去抢下一个山洞
    }
}
实现:
// top3 is kept sorted in descending order, with empty (null) slots at the end.
while (scores.hasNext()) {
    // "challenger" is whoever is currently looking for a slot: first the new
    // score, afterwards any occupant that has just been pushed out.
    Integer challenger = scores.next();
    for (int i = 0; i < top3.length && challenger != null; i++) {
        if (top3[i] == null || challenger > top3[i]) {
            // The challenger takes the slot; the previous occupant (null if
            // the slot was empty) moves on to compete for the next slot, so
            // lower-ranked values are shifted down instead of overwritten.
            Integer tmp = top3[i];
            top3[i] = challenger;
            challenger = tmp;
        }
    }
}