zoukankan      html  css  js  c++  java
  • Spark查找某人最高的N次成绩

    `package com.shsxt.java.core.demo;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.api.java.function.VoidFunction;
    import scala.Tuple2;

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.Iterator;
    import java.util.List;

    public class TopN {
    /** Number of highest scores reported for each name. */
    private static final int TOP_N = 3;

    /**
     * Reads whitespace-separated {@code <name> <score>} lines from {@code data/scores.txt},
     * groups the scores by name and prints the {@code TOP_N} highest scores of every name.
     *
     * <p>The result is printed twice on purpose, once per selection strategy, so both
     * approaches can be compared: a bounded insertion pass ({@link #topScores}) and a full
     * sort ({@link #topScoresBySorting}). Names with fewer than {@code TOP_N} scores simply
     * print fewer lines.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("a");
        JavaSparkContext sc = new JavaSparkContext(conf);
        try {
            // Sample input lines:
            //a 80
            //a 78
            // A forward slash is used because "\s" is not a path separator in a Java string
            // literal (it is an escape sequence, or a compile error before Java 15).
            JavaRDD<String> linesRDD = sc.textFile("data/scores.txt");

            // Convert every line into a (name, score) pair.
            JavaPairRDD<String, Integer> pairRDD = linesRDD.mapToPair(new PairFunction<String, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(String s) throws Exception {
                    String[] fields = s.trim().split("\\s+");
                    if (fields.length < 2) {
                        throw new IllegalArgumentException("Expected \"<name> <score>\" but got: " + s);
                    }
                    return new Tuple2<>(fields[0], Integer.valueOf(fields[1]));
                }
            });

            // Group the scores by name.
            JavaPairRDD<String, Iterable<Integer>> groupRdd = pairRDD.groupByKey();

            // Strategy 1: keep only the best TOP_N scores while iterating once over the group.
            groupRdd.foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>() {
                @Override
                public void call(Tuple2<String, Iterable<Integer>> s) throws Exception {
                    String name = s._1;
                    for (Integer score : topScores(s._2, TOP_N)) {
                        System.out.println(name + " " + score);
                    }
                }
            });

            // Strategy 2: sort all scores of the group and take the first TOP_N.
            groupRdd.foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>() {
                @Override
                public void call(Tuple2<String, Iterable<Integer>> tuple2) throws Exception {
                    String key = tuple2._1;
                    for (Integer score : topScoresBySorting(tuple2._2, TOP_N)) {
                        System.out.println(key + " " + score);
                    }
                }
            });
        } finally {
            // Always release the context, even when one of the jobs above fails.
            sc.stop();
        }
    }

    /**
     * Selects the {@code n} largest values in a single pass, keeping at most {@code n}
     * candidates in memory.
     *
     * @param scores values to scan
     * @param n      maximum number of values to return
     * @return the largest values in descending order; shorter than {@code n} when fewer
     *         values are available
     */
    private static List<Integer> topScores(Iterable<Integer> scores, int n) {
        List<Integer> top = new ArrayList<>(n + 1);
        for (Integer score : scores) {
            // Find the first slot holding a strictly smaller value (ties stay behind equals).
            int pos = 0;
            while (pos < top.size() && top.get(pos) >= score) {
                pos++;
            }
            if (pos < n) {
                // Inserting shifts every smaller candidate down by exactly one slot.
                top.add(pos, score);
                if (top.size() > n) {
                    top.remove(top.size() - 1);
                }
            }
        }
        return top;
    }

    /**
     * Selects the {@code n} largest values by sorting a copy of all values.
     *
     * @param scores values to sort
     * @param n      maximum number of values to return
     * @return the largest values in descending order; shorter than {@code n} when fewer
     *         values are available
     */
    private static List<Integer> topScoresBySorting(Iterable<Integer> scores, int n) {
        List<Integer> sorted = new ArrayList<>();
        for (Integer score : scores) {
            sorted.add(score);
        }
        Collections.sort(sorted, Collections.<Integer>reverseOrder());
        return new ArrayList<>(sorted.subList(0, Math.min(n, sorted.size())));
    }
    

    }
    **------------恢复内容开始------------**
    package com.shsxt.java.core.demo;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.api.java.function.VoidFunction;
    import scala.Tuple2;

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.Iterator;
    import java.util.List;

    public class TopN {
    /** Number of highest scores reported for each name. */
    private static final int TOP_N = 3;

    /**
     * Reads whitespace-separated {@code <name> <score>} lines from {@code data/scores.txt},
     * groups the scores by name and prints the {@code TOP_N} highest scores of every name.
     *
     * <p>The result is printed twice on purpose, once per selection strategy, so both
     * approaches can be compared: a bounded insertion pass ({@link #topScores}) and a full
     * sort ({@link #topScoresBySorting}). Names with fewer than {@code TOP_N} scores simply
     * print fewer lines.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("a");
        JavaSparkContext sc = new JavaSparkContext(conf);
        try {
            // Sample input lines:
            //a 80
            //a 78
            // A forward slash is used because "\s" is not a path separator in a Java string
            // literal (it is an escape sequence, or a compile error before Java 15).
            JavaRDD<String> linesRDD = sc.textFile("data/scores.txt");

            // Convert every line into a (name, score) pair.
            JavaPairRDD<String, Integer> pairRDD = linesRDD.mapToPair(new PairFunction<String, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(String s) throws Exception {
                    String[] fields = s.trim().split("\\s+");
                    if (fields.length < 2) {
                        throw new IllegalArgumentException("Expected \"<name> <score>\" but got: " + s);
                    }
                    return new Tuple2<>(fields[0], Integer.valueOf(fields[1]));
                }
            });

            // Group the scores by name.
            JavaPairRDD<String, Iterable<Integer>> groupRdd = pairRDD.groupByKey();

            // Strategy 1: keep only the best TOP_N scores while iterating once over the group.
            groupRdd.foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>() {
                @Override
                public void call(Tuple2<String, Iterable<Integer>> s) throws Exception {
                    String name = s._1;
                    for (Integer score : topScores(s._2, TOP_N)) {
                        System.out.println(name + " " + score);
                    }
                }
            });

            // Strategy 2: sort all scores of the group and take the first TOP_N.
            groupRdd.foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>() {
                @Override
                public void call(Tuple2<String, Iterable<Integer>> tuple2) throws Exception {
                    String key = tuple2._1;
                    for (Integer score : topScoresBySorting(tuple2._2, TOP_N)) {
                        System.out.println(key + " " + score);
                    }
                }
            });
        } finally {
            // Always release the context, even when one of the jobs above fails.
            sc.stop();
        }
    }

    /**
     * Selects the {@code n} largest values in a single pass, keeping at most {@code n}
     * candidates in memory.
     *
     * @param scores values to scan
     * @param n      maximum number of values to return
     * @return the largest values in descending order; shorter than {@code n} when fewer
     *         values are available
     */
    private static List<Integer> topScores(Iterable<Integer> scores, int n) {
        List<Integer> top = new ArrayList<>(n + 1);
        for (Integer score : scores) {
            // Find the first slot holding a strictly smaller value (ties stay behind equals).
            int pos = 0;
            while (pos < top.size() && top.get(pos) >= score) {
                pos++;
            }
            if (pos < n) {
                // Inserting shifts every smaller candidate down by exactly one slot.
                top.add(pos, score);
                if (top.size() > n) {
                    top.remove(top.size() - 1);
                }
            }
        }
        return top;
    }

    /**
     * Selects the {@code n} largest values by sorting a copy of all values.
     *
     * @param scores values to sort
     * @param n      maximum number of values to return
     * @return the largest values in descending order; shorter than {@code n} when fewer
     *         values are available
     */
    private static List<Integer> topScoresBySorting(Iterable<Integer> scores, int n) {
        List<Integer> sorted = new ArrayList<>();
        for (Integer score : scores) {
            sorted.add(score);
        }
        Collections.sort(sorted, Collections.<Integer>reverseOrder());
        return new ArrayList<>(sorted.subList(0, Math.min(n, sorted.size())));
    }
    

    }`
    ------------恢复内容结束------------

  • 相关阅读:
    错误libvirtError: invalid argument: could not find capabilities for domaintype=kvm
    容器部署ES 和 ES head插件
    squid配置yum源代理服务器
    coredns 1.2.2 反复重启问题
    ansible debugger 模块
    入门篇-contrail-command(对接openstack)All-In-One
    目标文件是什么鬼?
    汇编指令集
    切换GCC编译器版本
    kubernetes-dashboard登录出现forbidden 403
  • 原文地址:https://www.cnblogs.com/DemonQin/p/13049662.html
Copyright © 2011-2022 走看看