  Getting Started with Spark (7) -- Spark's intersection, subtract, union and distinct

    Spark's intersection

    intersection, as the name suggests, is the cross-over of two sets: when two RDDs are intersected, only the elements they have in common are kept. RDD1.intersection(RDD2) and RDD2.intersection(RDD1) should therefore produce the same result.

    For example, given List1 = {1,2,3,4,5} and List2 = {3,4,5,6,7}, intersecting the RDDs built from these two lists should give result = {3,4,5}.
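
    A minimal sketch of this behavior, using two small in-memory RDDs (runnable in spark-shell, which predefines the SparkContext sc):

    val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5))
    val rdd2 = sc.parallelize(List(3, 4, 5, 6, 7))

    // Both directions keep only the common elements
    rdd1.intersection(rdd2).collect().sorted   // Array(3, 4, 5)
    rdd2.intersection(rdd1).collect().sorted   // Array(3, 4, 5)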

    Spark's subtract

    subtract, unlike intersection, picks out the content the two RDDs do not share: it returns the elements of the first RDD that do not appear in the second.

    For example, given List1 = {1,2,3,4,5} and List2 = {3,4,5,6,7}, the result of a subtract depends on the order of the operands.

    list1.subtract(list2) 
    

    The result should be

    1 2
    

    whereas for

    list2.subtract(list1) 
    

    the result should be

    6 7
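
    A minimal sketch of both directions, again with small in-memory RDDs in spark-shell:

    val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5))
    val rdd2 = sc.parallelize(List(3, 4, 5, 6, 7))

    // Elements of rdd1 that are absent from rdd2
    rdd1.subtract(rdd2).collect().sorted   // Array(1, 2)

    // Swapping the operands gives a different result
    rdd2.subtract(rdd1).collect().sorted   // Array(6, 7)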
    

    Spark's union

    union is the easiest to understand: it merges two RDDs without removing duplicates. For example, given List1 = {1,2,3,4,5} and List2 = {3,4,5,6,7}, which elements a union yields does not depend on the operand order (only their arrangement can differ). The result should be

    result = {1,2,3,4,5,3,4,5,6,7}
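
    A minimal sketch in spark-shell. With small single-partition RDDs the collected order below is what you see in practice, although element order in an RDD is generally not guaranteed:

    val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5))
    val rdd2 = sc.parallelize(List(3, 4, 5, 6, 7))

    // Every element of both RDDs, duplicates included
    rdd1.union(rdd2).collect()   // Array(1, 2, 3, 4, 5, 3, 4, 5, 6, 7)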
    

    Spark's distinct

    distinct removes duplicated content from an RDD. Note that this does not delete every occurrence of a repeated element; rather, each repeated element is kept exactly once. This is easy to see: given result = {1,2,3,4,5,3,4,5,6,7}, one pass of distinct yields {1,2,3,4,5,6,7}.
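
    A minimal sketch chaining union and distinct on the same two lists in spark-shell:

    val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5))
    val rdd2 = sc.parallelize(List(3, 4, 5, 6, 7))

    // distinct keeps exactly one copy of each repeated element
    rdd1.union(rdd2).distinct().collect().sorted   // Array(1, 2, 3, 4, 5, 6, 7)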

    A comprehensive example

    Since intersection, subtract, union and distinct are commonly used, and a single case study shows off their characteristics well, the data this time consists of two courses, lesson1 and lesson2. lesson1 has ten students, and each student has several ability estimates, each an Int value; lesson2 is organized the same way. The two data sets are stored in the files lesson1 and lesson2 respectively. Both the data sets and the code below can be found and downloaded on GitHub.

    Analyzing the data sets

    lesson1 contains many students, and each student has several ability estimates. Computing each student's average score was already covered in Getting Started with Spark (6) -- Spark's combineByKey and sortByKey, so it is not repeated here.

    We will answer the following questions over these two data sets:

    • 0. Compute each student's total ability estimate in lesson1 and in lesson2
    • 1. Find all the students in lesson1 (no duplicates)
    • 2. Find all the students in lesson2 (no duplicates)
    • 3. Find the students who took both courses
    • 4. Find the students who are in lesson1 but not in lesson2
    • 5. Find the students who are in lesson2 but not in lesson1

    A sample of the data
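
    Each line of lesson1 and lesson2 holds one student name and one ability estimate, separated by a single space. A hypothetical excerpt (the names come from the results further below; these particular scores are made up for illustration):

    Bob 80
    Vicky 71
    Bob 65
    Amy 92
    Bill 77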

    Question 0 is numbered 0 because it does not use the operations covered in this article. To get each student's total ability estimate per course, the data must first be processed by splitting each line on the space character; the parsed data is a collection of (name, score) tuples, and the scores are then summed per name.

    • For question 1, finding all the students in lesson1: once each student's total estimate is known, dropping the score component leaves exactly the students in lesson1.

    • Question 2 is solved the same way.

    • For question 3, the students who took both courses are exactly the intersection of the two student sets. (Merging all the students of the two courses and then removing duplicates, i.e. union followed by distinct, gives something different: every student across the two courses. The code below computes both.)

    • For question 4, to find the students who are in lesson1 but not in lesson2, simply subtract the lesson2 students from the lesson1 students.

    • Question 5 is symmetric.

    Scala implementation

    
    import org.apache.spark.{SparkConf, SparkContext}
    
    object SparkIntersectionAndSubtract {
    
      def main(args: Array[String]): Unit = {
    
        val conf = new SparkConf().setMaster("local").setAppName("SparkIntersectionAndSubtract")
    
        val sc = new SparkContext(conf)
    
        // Data for lesson 1
        val lesson1Data = sc.textFile("./lesson1").map(line => (line.split(" ")(0),line.split(" ")(1).toInt))
    
        // Sum up each student's scores in lesson 1
        val lesson1Grade = lesson1Data.reduceByKey(_+_)
    
        // Keep only the student names
        val lesson1Student = lesson1Grade.map(x=>x._1)
    
        // Data for lesson 2
        val lesson2Data = sc.textFile("./lesson2").map(line => (line.split(" ")(0),line.split(" ")(1).toInt))
    
        // Sum up each student's scores in lesson 2
        val lesson2Grade = lesson2Data.reduceByKey((x,y)=>x+y)
    
        // Keep only the student names
        val lesson2Student = lesson2Grade.map(x=>x._1)
    
        // Students who are in lesson 1 and also in lesson 2
        println("Students On Lesson1 And On Lesson2")
        lesson1Student.intersection(lesson2Student).foreach(println)
    
        // Students who are in lesson 2 and also in lesson 1 -- same result as above
        println("Students On Lesson1 And On Lesson2")
        lesson2Student.intersection(lesson1Student).foreach(println)
    
        // Students in lesson 1 but not in lesson 2
        println("Students Only In Lesson1")
        val onlyInLesson1 = lesson1Student.subtract(lesson2Student)
        onlyInLesson1.foreach(println)
    
        // Students in lesson 2 but not in lesson 1
        println("Students Only In Lesson2")
        val onlyInLesson2 = lesson2Student.subtract(lesson1Student)
        onlyInLesson2.foreach(println)
    
    
        // Students who chose only one of the two lessons
        println("Students Only Choose One Lesson")
        onlyInLesson1.union(onlyInLesson2).foreach(println)

        // All students across the two lessons (no duplicates)
        println("All the students")
        lesson1Student.union(lesson2Student).distinct().foreach(println)
    
    
      }
    
    }
    

    Java implementation

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.api.java.function.VoidFunction;
    import scala.Tuple2;
    
    public class SparkIntersectionAndSubtractJava {
    
        public static void main(String[] args){
    
            SparkConf conf = new SparkConf().setMaster("local").setAppName("SparkIntersectionAndSubtractJava");
    
            JavaSparkContext sc = new JavaSparkContext(conf);
    
            // Java 7 style (anonymous inner classes)
            intersectionAndSubtractJava(sc);
    
            // Java 8 style (lambdas)
            intersectionAndSubtractJava8(sc);
        }
    
    
        public static void intersectionAndSubtractJava(JavaSparkContext sc){
    
            JavaRDD<String> lesson1Data = sc.textFile("./lesson1");
    
            JavaRDD<String> lesson2Data = sc.textFile("./lesson2");
    
            JavaPairRDD<String,Integer> lesson1InfoData = lesson1Data.mapToPair(new PairFunction<String, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(String s) throws Exception {
                    return new Tuple2<>(s.split(" ")[0],Integer.parseInt(s.split(" ")[1]));
                }
            });
    
            JavaPairRDD<String,Integer> lesson2InfoData = lesson2Data.mapToPair(new PairFunction<String, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(String s) throws Exception {
                    return new Tuple2<>(s.split(" ")[0],Integer.parseInt(s.split(" ")[1]));
                }
            });
    
            JavaPairRDD<String,Integer> lesson1Grades = lesson1InfoData.reduceByKey(new Function2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer integer, Integer integer2) throws Exception {
                    return integer+integer2;
                }
            });
    
            JavaPairRDD<String,Integer> lesson2Grades = lesson2InfoData.reduceByKey(new Function2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer integer, Integer integer2) throws Exception {
                    return integer+integer2;
                }
            });
    
            JavaRDD<String> lesson1Students = lesson1Grades.map(new Function<Tuple2<String, Integer>, String>() {
                @Override
                public String call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                    return stringIntegerTuple2._1;
                }
            });
    
            JavaRDD<String> lesson2Students = lesson2Grades.map(new Function<Tuple2<String, Integer>, String>() {
                @Override
                public String call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                    return stringIntegerTuple2._1;
                }
            });
    
            // Students in both lesson1 and lesson2
            System.out.println("Students On Lesson1 And On Lesson2");
            lesson1Students.intersection(lesson2Students).foreach(new VoidFunction<String>() {
                @Override
                public void call(String s) throws Exception {
                    System.out.println(s);
                }
            });
    
            // Students in both lesson2 and lesson1 -- same result as above
            System.out.println("Students On Lesson1 And On Lesson2");
            lesson2Students.intersection(lesson1Students).foreach(new VoidFunction<String>() {
                @Override
                public void call(String s) throws Exception {
                    System.out.println(s);
                }
            });
    
            // Students in lesson1 but not in lesson2
            JavaRDD<String> studentsOnlyInLesson1 = lesson1Students.subtract(lesson2Students);
            System.out.println("Students Only In Lesson1");
            studentsOnlyInLesson1.foreach(new VoidFunction<String>() {
                @Override
                public void call(String s) throws Exception {
                    System.out.println(s);
                }
            });
    
            // Students in lesson2 but not in lesson1
            JavaRDD<String> studentsOnlyInLesson2 = lesson2Students.subtract(lesson1Students);
            System.out.println("Students Only In Lesson2");
            studentsOnlyInLesson2.foreach(new VoidFunction<String>() {
                @Override
                public void call(String s) throws Exception {
                    System.out.println(s);
                }
            });
    
            // Students who chose only one lesson
            JavaRDD<String> onlyOneLesson = studentsOnlyInLesson1.union(studentsOnlyInLesson2);
            System.out.println("Students Only Choose One Lesson");
            onlyOneLesson.foreach(new VoidFunction<String>() {
                @Override
                public void call(String s) throws Exception {
                    System.out.println(s);
                }
            });
    
            System.out.println("All the students");
            lesson1Students.union(lesson2Students).distinct().foreach(new VoidFunction<String>() {
                @Override
                public void call(String s) throws Exception {
                    System.out.println(s);
                }
            });
    
        }
    
        public static void intersectionAndSubtractJava8(JavaSparkContext sc){
    
            JavaRDD<String> lesson1Data = sc.textFile("./lesson1");
    
            JavaRDD<String> lesson2Data = sc.textFile("./lesson2");
    
    
            JavaPairRDD<String,Integer> lesson1InfoData =
            lesson1Data.mapToPair(line -> new Tuple2<>(line.split(" ")[0],Integer.parseInt(line.split(" ")[1])));
    
    
            JavaPairRDD<String,Integer> lesson2InfoData =
            lesson2Data.mapToPair(line -> new Tuple2<>(line.split(" ")[0],Integer.parseInt(line.split(" ")[1])));
    
    
            JavaPairRDD<String,Integer> lesson1Grades = lesson1InfoData.reduceByKey((x,y) -> x+y);
    
            JavaPairRDD<String,Integer> lesson2Grades = lesson2InfoData.reduceByKey((x,y) -> x+y);
    
    
            JavaRDD<String> studentsInLesson1 = lesson1Grades.map(x->x._1);
    
            JavaRDD<String> studentsInLesson2 = lesson2Grades.map(x->x._1);
    
            // Students in both lesson1 and lesson2
            studentsInLesson1.intersection(studentsInLesson2).foreach(name -> System.out.println(name));

            // Students in both lesson2 and lesson1 -- same result as above
            studentsInLesson2.intersection(studentsInLesson1).foreach(name -> System.out.println(name));
    
            // Students only in lesson1
            JavaRDD<String> studentsOnlyInLesson1 = studentsInLesson1.subtract(studentsInLesson2);
            studentsOnlyInLesson1.foreach(name -> System.out.println(name));
    
            // Students only in lesson2
            JavaRDD<String> studentsOnlyInLesson2 = studentsInLesson2.subtract(studentsInLesson1);
            studentsOnlyInLesson2.foreach(name -> System.out.println(name));
    
    
            // Students who chose only one lesson
            JavaRDD<String> studentsOnlyOneLesson = studentsOnlyInLesson1.union(studentsOnlyInLesson2);
            studentsOnlyOneLesson.foreach(name -> System.out.println(name));
    
    
            // All students across the two lessons (no duplicates)
            studentsInLesson1.union(studentsInLesson2).distinct().foreach(name -> System.out.println(name));
    
    
        }
    
    }
    

    Python implementation

    from pyspark import SparkConf,SparkContext
    
    conf = SparkConf().setMaster("local").setAppName("SparkIntersectionAndSubtract")
    
    sc = SparkContext(conf=conf)
    
    # lesson1 data
    lesson1Data = sc.textFile("./lesson1").map(lambda x:(x.split(" ")[0],int(x.split(" ")[1])))
    
    # lesson2 data
    lesson2Data = sc.textFile("./lesson2").map(lambda x:(x.split(" ")[0],int(x.split(" ")[1])))
    
    # Total score per student in lesson1
    lesson1InfoData = lesson1Data.reduceByKey(lambda x,y:x+y)
    
    # Total score per student in lesson2
    lesson2InfoData = lesson2Data.reduceByKey(lambda x,y:x+y)
    
    # Students in lesson1
    studentsInLesson1 = lesson1InfoData.map(lambda x:x[0])
    
    # Students in lesson2
    studentsInLesson2 = lesson2InfoData.map(lambda x:x[0])
    
    # Students in both lesson1 and lesson2
    print("Students On Lesson1 And On Lesson2")
    studentsInLesson1.intersection(studentsInLesson2).foreach(print)
    
    # Students in both lesson2 and lesson1 -- same result as above
    print("Students On Lesson1 And On Lesson2")
    studentsInLesson2.intersection(studentsInLesson1).foreach(print)
    
    # Students only in lesson1
    print("Students Only In Lesson1")
    studentsOnlyInLesson1 = studentsInLesson1.subtract(studentsInLesson2)
    studentsOnlyInLesson1.foreach(print)
    
    # Students only in lesson2
    print("Students Only In Lesson2")
    studentsOnlyInLesson2 = studentsInLesson2.subtract(studentsInLesson1)
    studentsOnlyInLesson2.foreach(print)
    
    # Students who chose only one lesson
    print("Students Only Choose One Lesson")
    studentsOnlyInLesson1.union(studentsOnlyInLesson2).foreach(print)
    
    # All students across the two lessons (no duplicates)
    print("All the students")
    studentsInLesson1.union(studentsInLesson2).distinct().foreach(print)
    
    
    

    Results of a run

    Students On Lesson1 And On Lesson2
    Vicky
    Amy
    Lili
    Bob
    Coco
    
    Students On Lesson1 And On Lesson2
    Vicky
    Amy
    Lili
    Coco
    Bob
    
    Students Only In Lesson1
    Bill
    David
    Mike
    Nancy
    Lucy
    
    Students Only In Lesson2
    White
    Jimmy
    Jason
    John
    Frank
    
    Students Only Choose One Lesson
    Bill
    David
    Mike
    Nancy
    Lucy
    White
    Jimmy
    Jason
    John
    Frank
    
    All the students
    Vicky
    Bill
    Amy
    White
    Jimmy
    Jason
    Lili
    David
    Bob
    Mike
    Coco
    Nancy
    Lucy
    John
    Frank
    

    The example above applies intersection, subtract, union and distinct very concretely to specific questions, and using these methods well makes relational operations between data sets quick to express. In fact, calling them directly is much better than implementing the same logic by hand, because Spark has optimized these methods internally.

    The data sets and the code can be found and downloaded on GitHub.



    Source: https://juejin.im/post/5c7b92276fb9a049bb7d0d10
