zoukankan      html  css  js  c++  java
  • spark学习(六)Java版RDD的基本操作

    1.map算子

    /**
     * Demonstrates the {@code map} transformation: every element of a small
     * in-memory list is multiplied by 2 and the resulting RDD is printed.
     *
     * <p>The Spark context is closed in a {@code finally} block so it is
     * released even if the job throws.
     */
    private static void map() {
        // Create the SparkConf (local master).
        SparkConf conf = new SparkConf()
                .setAppName("map")
                .setMaster("local");

        JavaSparkContext sc = new JavaSparkContext(conf);
        try {
            // Build the source collection.
            List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);

            // Parallelize the collection to create the initial RDD.
            JavaRDD<Integer> numberRDD = sc.parallelize(numbers);

            // map: produce exactly one output element (value * 2) per input element.
            JavaRDD<Integer> multipleNumberRDD = numberRDD.map(new Function<Integer, Integer>() {
                @Override
                public Integer call(Integer v1) throws Exception {
                    return v1 * 2;
                }
            });

            // Print each element of the new RDD.
            multipleNumberRDD.foreach(new VoidFunction<Integer>() {
                @Override
                public void call(Integer t) throws Exception {
                    System.out.println(t);
                }
            });
        } finally {
            // Always release the context, even when the job fails.
            sc.close();
        }
    }

    2.filter算子

    /**
     * Demonstrates the {@code filter} transformation: keeps only the even
     * numbers from 1..10 and prints them.
     *
     * <p>The Spark context is closed in a {@code finally} block so it is
     * released even if the job throws.
     */
    private static void filter() {
        // Create the SparkConf (local master).
        SparkConf conf = new SparkConf()
                .setAppName("filter")
                .setMaster("local");

        JavaSparkContext sc = new JavaSparkContext(conf);
        try {
            // Sample input collection.
            List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

            // Parallelize the collection to create the initial RDD.
            JavaRDD<Integer> numberRDD = sc.parallelize(numbers);

            // filter: keep an element only when the predicate returns true (even numbers).
            JavaRDD<Integer> evenNumberRDD = numberRDD.filter(new Function<Integer, Boolean>() {
                @Override
                public Boolean call(Integer v1) throws Exception {
                    return v1 % 2 == 0;
                }
            });

            // Print each remaining element.
            evenNumberRDD.foreach(new VoidFunction<Integer>() {
                @Override
                public void call(Integer t) throws Exception {
                    System.out.println(t);
                }
            });
        } finally {
            // Always release the context, even when the job fails.
            sc.close();
        }
    }

    3.flatMap算子

    Spark 中 map函数会对每一条输入进行指定的操作,然后为每一条输入返回一个对象;

    而flatMap函数则是两个操作的集合——正是“先映射后扁平化”:

    操作1:与map函数类似,对每一条输入进行指定的操作;不同的是,每一条输入返回的是一个集合(例如Iterable),其中可以包含零个或多个元素

    操作2:最后将所有返回的集合“压平”(扁平化),把其中的元素全部合并到同一个新的RDD中

    /**
     * Demonstrates the {@code flatMap} transformation: each input line is split
     * on single spaces into words, and all words are flattened into one RDD,
     * which is then printed.
     *
     * <p>The Spark context is closed in a {@code finally} block so it is
     * released even if the job throws.
     */
    private static void flatMap() {
        SparkConf conf = new SparkConf()
                .setAppName("flatMap")
                .setMaster("local");

        JavaSparkContext sc = new JavaSparkContext(conf);
        try {
            List<String> lineList = Arrays.asList("hello you", "hello me", "hello world");

            JavaRDD<String> lines = sc.parallelize(lineList);

            // flatMap: split every line into words; the per-line word lists are
            // flattened into a single RDD of words.
            JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
                // e.g. input "hello you" -> returns the collection (hello, you).
                // NOTE(review): returning Iterable matches the Spark 1.x FlatMapFunction
                // signature; Spark 2.0+ expects Iterator<String> (return ....iterator()).
                @Override
                public Iterable<String> call(String t) throws Exception {
                    return Arrays.asList(t.split(" "));
                }
            });

            // Print each word.
            words.foreach(new VoidFunction<String>() {
                @Override
                public void call(String t) throws Exception {
                    System.out.println(t);
                }
            });
        } finally {
            // Always release the context, even when the job fails.
            sc.close();
        }
    }

    4.groupByKey算子

    /**
     * Demonstrates the {@code groupByKey} transformation: (class, score) pairs
     * are grouped by class, and each class is printed followed by its scores.
     *
     * <p>The original version never closed the Spark context. It is now closed
     * in a {@code finally} block, even if the job throws.
     */
    private static void groupByKey() {
        SparkConf conf = new SparkConf()
                .setAppName("groupByKey")
                .setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        try {
            List<Tuple2<String, Integer>> scoreList = Arrays.asList(
                    new Tuple2<String, Integer>("class1", 80),
                    new Tuple2<String, Integer>("class2", 90),
                    new Tuple2<String, Integer>("class1", 97),
                    new Tuple2<String, Integer>("class2", 89));

            JavaPairRDD<String, Integer> scores = sc.parallelizePairs(scoreList);

            // groupByKey: all values that share the same key (class) are
            // collected into a single Iterable for that key.
            JavaPairRDD<String, Iterable<Integer>> groupedScores = scores.groupByKey();
            groupedScores.foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>() {
                @Override
                public void call(Tuple2<String, Iterable<Integer>> t) throws Exception {
                    System.out.println("class:" + t._1);
                    for (Integer score : t._2) {
                        System.out.println(score);
                    }
                }
            });
        } finally {
            // Previously missing: release the context, even when the job fails.
            sc.close();
        }
    }

    5.reduceByKey算子

    /**
     * Demonstrates the {@code reduceByKey} transformation: sums the scores of
     * each class and prints {@code class:total} lines.
     *
     * <p>The Spark context is closed in a {@code finally} block so it is
     * released even if the job throws.
     */
    private static void reduceByKey() {
        SparkConf conf = new SparkConf()
                .setAppName("reduceByKey")
                .setMaster("local");

        JavaSparkContext sc = new JavaSparkContext(conf);
        try {
            List<Tuple2<String, Integer>> scoreList = Arrays.asList(
                    new Tuple2<String, Integer>("class1", 80),
                    new Tuple2<String, Integer>("class2", 90),
                    new Tuple2<String, Integer>("class1", 97),
                    new Tuple2<String, Integer>("class2", 89));

            JavaPairRDD<String, Integer> scores = sc.parallelizePairs(scoreList);

            // reduceByKey combines the values of each key with the given function
            // (here: addition); the result is still a JavaPairRDD<key, value>.
            JavaPairRDD<String, Integer> totalScores = scores.reduceByKey(new Function2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer v1, Integer v2) throws Exception {
                    return v1 + v2;
                }
            });

            // Print "key:total" for each class.
            totalScores.foreach(new VoidFunction<Tuple2<String, Integer>>() {
                @Override
                public void call(Tuple2<String, Integer> t) throws Exception {
                    System.out.println(t._1 + ":" + t._2);
                }
            });
        } finally {
            // Always release the context, even when the job fails.
            sc.close();
        }
    }

    6.sortByKey算子

    /**
     * Demonstrates the {@code sortByKey} transformation: (score, name) pairs are
     * sorted by score in ascending order and printed as {@code score:name}.
     *
     * <p>The sorted RDD is collected to the driver before printing. Calling
     * {@code foreach} on the RDD only preserves the global order when the RDD has
     * a single partition. The Spark context is closed in a {@code finally} block.
     */
    private static void sortByKey() {
        SparkConf conf = new SparkConf()
                .setAppName("sortByKey")
                .setMaster("local");

        JavaSparkContext sc = new JavaSparkContext(conf);
        try {
            List<Tuple2<Integer, String>> scoreList = Arrays.asList(
                    new Tuple2<Integer, String>(78, "marry"),
                    new Tuple2<Integer, String>(89, "tom"),
                    new Tuple2<Integer, String>(72, "jack"),
                    new Tuple2<Integer, String>(86, "leo"));

            JavaPairRDD<Integer, String> scores = sc.parallelizePairs(scoreList);

            // Explicit ascending sort (same as the no-argument sortByKey()).
            JavaPairRDD<Integer, String> sortedScores = scores.sortByKey(true);

            // collect() returns the elements in RDD order, so printing on the
            // driver keeps the sort order regardless of partition count.
            for (Tuple2<Integer, String> t : sortedScores.collect()) {
                System.out.println(t._1 + ":" + t._2);
            }
        } finally {
            // Always release the context, even when the job fails.
            sc.close();
        }
    }

    7.join算子
    join算子用于关联两个RDD:join会根据key进行关联,并返回JavaPairRDD。该JavaPairRDD的第一个泛型类型是原来两个JavaPairRDD的key类型,因为是通过key进行join的;第二个泛型类型是Tuple2<v1, v2>,Tuple2的两个泛型分别为两个原始RDD的value类型。

    /**
     * Demonstrates the {@code join} transformation: a (id, name) RDD is joined
     * with an (id, score) RDD on the id, and each match is printed.
     *
     * <p>The Spark context is closed in a {@code finally} block so it is
     * released even if the job throws.
     */
    private static void join() {
        SparkConf conf = new SparkConf()
                .setAppName("join")
                .setMaster("local");

        JavaSparkContext sc = new JavaSparkContext(conf);
        try {
            List<Tuple2<Integer, String>> studentList = Arrays.asList(
                    new Tuple2<Integer, String>(1, "tom"),
                    new Tuple2<Integer, String>(2, "jack"),
                    new Tuple2<Integer, String>(3, "marry"),
                    new Tuple2<Integer, String>(4, "leo"));

            List<Tuple2<Integer, Integer>> scoreList = Arrays.asList(
                    new Tuple2<Integer, Integer>(1, 78),
                    new Tuple2<Integer, Integer>(2, 87),
                    new Tuple2<Integer, Integer>(3, 89),
                    new Tuple2<Integer, Integer>(4, 98));

            // Parallelize both collections into pair RDDs.
            JavaPairRDD<Integer, String> students = sc.parallelizePairs(studentList);
            JavaPairRDD<Integer, Integer> scores = sc.parallelizePairs(scoreList);

            // join matches the two RDDs on their key and returns a JavaPairRDD:
            //  - the first type parameter is the shared key type (the join key);
            //  - the second is Tuple2<v1, v2>, holding the value types of the
            //    left (students) and right (scores) RDDs respectively.
            JavaPairRDD<Integer, Tuple2<String, Integer>> studentScores = students.join(scores);

            // Print each joined record.
            studentScores.foreach(new VoidFunction<Tuple2<Integer, Tuple2<String, Integer>>>() {
                @Override
                public void call(Tuple2<Integer, Tuple2<String, Integer>> t) throws Exception {
                    System.out.println("student id:" + t._1);
                    System.out.println("student name:" + t._2._1);
                    System.out.println("student score:" + t._2._2);
                    System.out.println("==========================");
                }
            });
        } finally {
            // Always release the context, even when the job fails.
            sc.close();
        }
    }

    更深入的用法参见:
    http://blog.csdn.net/liulingyuan6/article/details/53397780
    http://blog.csdn.net/liulingyuan6/article/details/53410832
    https://www.2cto.com/net/201608/543044.html

  • 相关阅读:
    Docker删除某个容器时失败解决方案
    Docker搭建redis
    Django优雅集成MongoDB
    MongoDB学习笔记:文档Crud Shell
    MongoDB学习笔记:MongoDB 数据库的命名、设计规范
    MongoDB学习笔记:快速入门
    MongoDB学习笔记:Python 操作MongoDB
    在Docker中安装MongoDB
    Linux 挂载盘
    java中Array/List/Map/Object与Json互相转换详解(转载)
  • 原文地址:https://www.cnblogs.com/aibabel/p/10835488.html
Copyright © 2011-2022 走看看