zoukankan      html  css  js  c++  java
  • Spark练习之Transformation操作开发

    一、map:将集合中的每个元素乘以2

    1.1 Java

    /**
     * map operator demo: multiplies every element of a collection by 2.
     *
     * <p>Creates a local-mode Spark context, parallelizes the integers 1..5,
     * applies {@code map} to double each element, and prints every element of
     * the resulting RDD with {@code System.out.println}.
     */
    private static void map() {
        SparkConf conf = new SparkConf()
                .setAppName("map")
                .setMaster("local");
        // try-with-resources closes the context even if the job throws.
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            // Parallelize a local collection to create the initial RDD.
            List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
            JavaRDD<Integer> numberRDD = sc.parallelize(numbers);

            // map accepts a Function; its second type argument is the element
            // type of the new RDD and must match the return type of call().
            JavaRDD<Integer> multipleNumberRDD = numberRDD.map(new Function<Integer, Integer>() {
                private static final long serialVersionUID = 1L;

                // Receives 1, 2, 3, 4, 5 and returns 2, 4, 6, 8, 10.
                @Override
                public Integer call(Integer integer) throws Exception {
                    return integer * 2;
                }
            });

            // Print the new RDD.
            multipleNumberRDD.foreach(new VoidFunction<Integer>() {
                private static final long serialVersionUID = 1L;

                @Override
                public void call(Integer integer) throws Exception {
                    System.out.println(integer);
                }
            });
        }
    }
    

    1.2 Scala

    /** map operator demo: multiplies each of the numbers 1..5 by 2 and prints the results. */
    def map(): Unit = {
      val conf = new SparkConf().setAppName("map").setMaster("local")
      val sc = new SparkContext(conf)
      // Stop the context even when the job fails so it is not left running.
      try {
        val numberRDD = sc.parallelize(Array(1, 2, 3, 4, 5), 1)
        val doubledRDD = numberRDD.map(_ * 2)
        doubledRDD.foreach(num => println(num))
      } finally {
        sc.stop()
      }
    }
    

    二、filter:过滤出集合中的偶数

    2.1 Java

    /**
     * filter operator demo: keeps only the even numbers of a collection.
     *
     * <p>Creates a local-mode Spark context, parallelizes the integers 1..10,
     * applies {@code filter} with an "is even" predicate, and prints every
     * element of the resulting RDD with {@code System.out.println}.
     */
    private static void filter() {
        SparkConf conf = new SparkConf()
                .setAppName("filter") // was "map", a copy-paste leftover
                .setMaster("local");
        // try-with-resources closes the context even if the job throws.
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            // Parallelize a local collection to create the initial RDD.
            List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
            JavaRDD<Integer> numberRDD = sc.parallelize(numbers);

            // filter also accepts a Function, but call() returns Boolean:
            // true keeps the element in the new RDD, false drops it.
            JavaRDD<Integer> evenNumberRDD = numberRDD.filter(new Function<Integer, Boolean>() {
                private static final long serialVersionUID = 1L;

                // Receives 1..10; only 2, 4, 6, 8, 10 yield true.
                @Override
                public Boolean call(Integer integer) throws Exception {
                    return integer % 2 == 0;
                }
            });

            // Print the new RDD.
            evenNumberRDD.foreach(new VoidFunction<Integer>() {
                private static final long serialVersionUID = 1L;

                @Override
                public void call(Integer integer) throws Exception {
                    System.out.println(integer);
                }
            });
        }
    }
    

    2.2 Scala

    /** filter operator demo: keeps the even numbers among 1..10 and prints them. */
    def filter(): Unit = {
      val conf = new SparkConf().setAppName("filter").setMaster("local")
      val sc = new SparkContext(conf)
      // Stop the context even when the job fails so it is not left running.
      try {
        val numberRDD = sc.parallelize(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 1)
        val evenNumberRDD = numberRDD.filter(_ % 2 == 0)
        evenNumberRDD.foreach(num => println(num))
      } finally {
        sc.stop()
      }
    }
    

    三、flatMap:将行拆分为单词

    3.1 Java

    /**
     * flatMap operator demo: splits lines of text into individual words.
     *
     * <p>Creates a local-mode Spark context, parallelizes three lines of text,
     * applies {@code flatMap} to split each line on a single space, and prints
     * every resulting word with {@code System.out.println}.
     */
    private static void flatMap() {
        SparkConf conf = new SparkConf()
                .setAppName("flatMap")
                .setMaster("local");
        // try-with-resources closes the context even if the job throws.
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            // Parallelize a local collection to create the initial RDD.
            List<String> lineList = Arrays.asList("hello you", "hello me", "hello world");
            JavaRDD<String> lines = sc.parallelize(lineList);

            // flatMap accepts a FlatMapFunction whose second type argument is the
            // element type of the new RDD. call() may produce several elements
            // per input element and returns them through an Iterator.
            JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
                private static final long serialVersionUID = 1L;

                @Override
                public Iterator<String> call(String s) throws Exception {
                    // A List is not an Iterator: casting it throws
                    // ClassCastException at runtime, so ask the list for one.
                    return Arrays.asList(s.split(" ")).iterator();
                }
            });

            // Print the new RDD.
            words.foreach(new VoidFunction<String>() {
                private static final long serialVersionUID = 1L;

                @Override
                public void call(String t) throws Exception {
                    System.out.println(t);
                }
            });
        }
    }
    

    3.2 Scala

     /** flatMap operator demo: splits three lines of text on spaces and prints every word. */
     def flatMap(): Unit = {
       val conf = new SparkConf().setAppName("flatMap").setMaster("local")
       val sc = new SparkContext(conf)
       // Stop the context even when the job fails so it is not left running.
       try {
         val lines = sc.parallelize(Array("hello you", "hello me", "hello world"), 1)
         val words = lines.flatMap(_.split(" "))
         words.foreach(word => println(word))
       } finally {
         sc.stop()
       }
     }
    

    四、groupByKey:将每个班级的成绩进行分组

    4.1 Java

    /**
     * groupByKey operator demo: groups scores by class.
     *
     * <p>Creates a local-mode Spark context, parallelizes (class, score)
     * pairs, groups the scores of each class with {@code groupByKey}, and
     * prints each class followed by its scores.
     */
    private static void groupByKey() {
        SparkConf conf = new SparkConf()
                .setAppName("groupByKey")
                .setMaster("local");
        // try-with-resources closes the context even if the job throws.
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            List<Tuple2<String, Integer>> scoresList = Arrays.asList(
                    new Tuple2<>("class1", 80),
                    new Tuple2<>("class2", 88),
                    new Tuple2<>("class1", 80),
                    new Tuple2<>("class2", 90));
            // Parallelize the pairs to create a JavaPairRDD.
            JavaPairRDD<String, Integer> scores = sc.<String, Integer>parallelizePairs(scoresList);

            // groupByKey still returns a JavaPairRDD: the key type is unchanged,
            // while the value type becomes an Iterable holding every value that
            // shared the key.
            JavaPairRDD<String, Iterable<Integer>> groupedScores = scores.groupByKey();

            // Print the grouped RDD.
            groupedScores.foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>() {
                private static final long serialVersionUID = 1L;

                // Each element is one (class, scores) pair: print the class,
                // then every score in its group, then a separator line.
                @Override
                public void call(Tuple2<String, Iterable<Integer>> classScores) throws Exception {
                    System.out.println("class:" + classScores._1);
                    for (Integer score : classScores._2) {
                        System.out.println(score);
                    }
                    System.out.println("====================================");
                }
            });
        }
    }
    

    4.2 Scala

    /** groupByKey operator demo: groups (class, score) pairs by class and prints each group. */
    def groupByKey(): Unit = {
      val conf = new SparkConf().setAppName("groupByKey").setMaster("local")
      val sc = new SparkContext(conf)
      // Stop the context even when the job fails so it is not left running.
      try {
        // Plain Scala tuples replace the boxed new Tuple2[String, Integer](...) form.
        val scoreList = Array(("class1", 80), ("class2", 88), ("class1", 80), ("class2", 90))
        val scores = sc.parallelize(scoreList, 1)
        val groupedScores = scores.groupByKey()
        // Print the class, then every score in its group, then a separator line.
        groupedScores.foreach { case (className, classScores) =>
          println(className)
          classScores.foreach(singleScore => println(singleScore))
          println("===========")
        }
      } finally {
        sc.stop()
      }
    }
    

    五、reduceByKey:统计每个班级的总分

    5.1 Java

    /**
     * reduceByKey operator demo: computes the total score of each class.
     *
     * <p>Creates a local-mode Spark context, parallelizes (class, score)
     * pairs, sums the scores of each class with {@code reduceByKey}, and
     * prints one {@code class:total} line per class.
     *
     * <p>The method name keeps its original spelling so existing callers
     * continue to compile.
     */
    private static void reduceNyKey() {
        SparkConf conf = new SparkConf()
                .setAppName("reduceByKey") // was misspelled "reduceNyKey"
                .setMaster("local");
        // try-with-resources closes the context even if the job throws.
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            List<Tuple2<String, Integer>> scoresList = Arrays.asList(
                    new Tuple2<>("class1", 80),
                    new Tuple2<>("class2", 88),
                    new Tuple2<>("class1", 80),
                    new Tuple2<>("class2", 90));
            // Parallelize the pairs to create a JavaPairRDD.
            JavaPairRDD<String, Integer> scores = sc.<String, Integer>parallelizePairs(scoresList);

            // reduceByKey accepts a Function2 with three type arguments: the
            // first two are the types of the two values passed to call(), the
            // third is the type of the reduced result. For each key the values
            // are folded pairwise: the result of combining the first two values
            // is combined with the third, and so on. The returned RDD is still
            // a JavaPairRDD<key, value>.
            JavaPairRDD<String, Integer> totalScores = scores.reduceByKey(new Function2<Integer, Integer, Integer>() {
                private static final long serialVersionUID = 1L;

                @Override
                public Integer call(Integer v1, Integer v2) throws Exception {
                    return v1 + v2;
                }
            });

            // Print the totals RDD.
            totalScores.foreach(new VoidFunction<Tuple2<String, Integer>>() {
                private static final long serialVersionUID = 1L;

                @Override
                public void call(Tuple2<String, Integer> t) throws Exception {
                    System.out.println(t._1 + ":" + t._2);
                }
            });
        }
    }
    

    5.2 Scala

    /** reduceByKey operator demo: sums the scores of each class and prints `class:total` lines. */
    def reduceByKey(): Unit = {
      val conf = new SparkConf().setAppName("reduceByKey").setMaster("local")
      val sc = new SparkContext(conf)
      // Stop the context even when the job fails so it is not left running.
      try {
        // Plain Scala tuples replace the boxed new Tuple2[String, Integer](...) form.
        val scoreList = Array(("class1", 80), ("class2", 88), ("class1", 80), ("class2", 90))
        val scores = sc.parallelize(scoreList, 1)
        val totalScores = scores.reduceByKey(_ + _)
        totalScores.foreach { case (className, total) => println(s"$className:$total") }
      } finally {
        sc.stop()
      }
    }
    

    六、sortByKey:将学生分数进行排序

    6.1 Java

    /**
     * sortByKey operator demo: sorts students by score.
     *
     * <p>Creates a local-mode Spark context, parallelizes (score, name)
     * pairs, sorts them by key with {@code sortByKey}, and prints one
     * {@code score:name} line per student.
     */
    private static void sortByKey() {
        SparkConf conf = new SparkConf()
                .setAppName("sortByKey")
                .setMaster("local");
        // try-with-resources closes the context even if the job throws.
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            List<Tuple2<Integer, String>> scoresList = Arrays.asList(
                    new Tuple2<>(65, "leo"),
                    new Tuple2<>(60, "tom"),
                    new Tuple2<>(90, "marry"),
                    new Tuple2<>(88, "jack"));
            // Parallelize the pairs to create a JavaPairRDD.
            JavaPairRDD<Integer, String> scores = sc.<Integer, String>parallelizePairs(scoresList);

            // sortByKey orders the pairs by key; called without arguments it
            // uses the default (ascending) order. The result is still a
            // JavaPairRDD with the same elements, only in a different order.
            JavaPairRDD<Integer, String> sortedScores = scores.sortByKey();

            // Print the sorted RDD.
            sortedScores.foreach(new VoidFunction<Tuple2<Integer, String>>() {
                private static final long serialVersionUID = 1L;

                @Override
                public void call(Tuple2<Integer, String> t) throws Exception {
                    System.out.println(t._1 + ":" + t._2);
                }
            });
        }
    }
    

    6.2 Scala

    /** sortByKey operator demo: sorts (score, name) pairs by score and prints `score:name` lines. */
    def sortByKey(): Unit = {
      val conf = new SparkConf().setAppName("sortByKey").setMaster("local")
      val sc = new SparkContext(conf)
      // Stop the context even when the job fails so it is not left running.
      try {
        // Plain Scala tuples replace the boxed new Tuple2[Integer, String](...) form.
        val scoreList = Array((90, "cat"), (80, "leo"), (80, "opp"), (55, "lll"))
        val scores = sc.parallelize(scoreList, 1)
        val sortedScores = scores.sortByKey()
        sortedScores.foreach { case (score, name) => println(s"$score:$name") }
      } finally {
        sc.stop()
      }
    }
    

    七、join:打印每个学生的成绩

    7.1 Java

    /**
     * join operator demo: prints the score of each student.
     *
     * <p>Creates a local-mode Spark context, parallelizes (id, name) and
     * (id, score) pairs, joins the two RDDs on the student id, and prints the
     * id, name and score of every joined record.
     */
    private static void join() {
        SparkConf conf = new SparkConf()
                .setAppName("join")
                .setMaster("local");
        // try-with-resources closes the context even if the job throws.
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            // Students: (id, name).
            List<Tuple2<Integer, String>> studentList = Arrays.asList(
                    new Tuple2<>(1, "leo"),
                    new Tuple2<>(2, "tom"),
                    new Tuple2<>(3, "marry"),
                    new Tuple2<>(4, "jack"));
            // Scores: (id, score).
            List<Tuple2<Integer, Integer>> scoreList = Arrays.asList(
                    new Tuple2<>(1, 100),
                    new Tuple2<>(2, 80),
                    new Tuple2<>(3, 50),
                    new Tuple2<>(4, 20));
            // Parallelize both collections.
            JavaPairRDD<Integer, String> student = sc.<Integer, String>parallelizePairs(studentList);
            JavaPairRDD<Integer, Integer> scores = sc.<Integer, Integer>parallelizePairs(scoreList);

            // join matches the two RDDs by key and returns a JavaPairRDD whose
            // key type is the shared key type and whose value is a
            // Tuple2<v1, v2> of the two original value types. Each element is
            // one matched pair: joining (1,1)(1,2)(1,3) with (1,4)(2,1)(2,2)
            // yields (1,(1,4)) (1,(2,4)) (1,(3,4)).
            JavaPairRDD<Integer, Tuple2<String, Integer>> studentScores = student.<Integer>join(scores);

            // Print the joined RDD.
            studentScores.foreach(new VoidFunction<Tuple2<Integer, Tuple2<String, Integer>>>() {
                private static final long serialVersionUID = 1L;

                @Override
                public void call(Tuple2<Integer, Tuple2<String, Integer>> t) throws Exception {
                    System.out.println("student id:" + t._1);
                    System.out.println("student name:" + t._2._1);
                    System.out.println("student score:" + t._2._2);
                    System.out.println("==============");
                }
            });
        }
    }
    

    7.2 Scala

    /**
     * join operator demo: joins (id, name) with (id, score) pairs on the
     * student id and prints the id, name and score of every joined record.
     */
    def join(): Unit = {
      val conf = new SparkConf().setAppName("join").setMaster("local")
      val sc = new SparkContext(conf)
      // Stop the context even when the job fails so it is not left running.
      try {
        // Students: (id, name).
        val studentList = Array((1, "leo"), (2, "tom"), (3, "marry"))
        // Scores: (id, score); every student id appears twice.
        val scoreList = Array((1, 100), (2, 80), (3, 50), (1, 70), (2, 10), (3, 40))

        val student = sc.parallelize(studentList)
        val scores = sc.parallelize(scoreList)

        val studentScores = student.join(scores)
        // Print the four lines directly. Wrapping the block in println(...)
        // would additionally print the block's Unit value "()" per record.
        studentScores.foreach { case (id, (name, score)) =>
          println("student id:" + id)
          println("student name:" + name)
          println("student score:" + score)
          println("==============")
        }
      } finally {
        sc.stop()
      }
    }
    

    八、cogroup:打印每个学生的成绩

    8.1 Java

    /**
     * cogroup operator demo: prints the names and scores grouped per student id.
     *
     * <p>Creates a local-mode Spark context, parallelizes (id, name) and
     * (id, score) pairs, cogroups the two RDDs on the student id, and prints
     * each id together with its grouped names and grouped scores.
     */
    private static void cogroup() {
        SparkConf conf = new SparkConf()
                .setAppName("cogroup")
                .setMaster("local");
        // try-with-resources closes the context even if the job throws.
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            // Students: (id, name).
            List<Tuple2<Integer, String>> studentList = Arrays.asList(
                    new Tuple2<>(1, "leo"),
                    new Tuple2<>(2, "tom"),
                    new Tuple2<>(3, "marry"));
            // Scores: (id, score); every student id appears twice.
            List<Tuple2<Integer, Integer>> scoreList = Arrays.asList(
                    new Tuple2<>(1, 100),
                    new Tuple2<>(2, 80),
                    new Tuple2<>(3, 50),
                    new Tuple2<>(1, 70),
                    new Tuple2<>(2, 10),
                    new Tuple2<>(3, 40));
            // Parallelize both collections.
            JavaPairRDD<Integer, String> student = sc.<Integer, String>parallelizePairs(studentList);
            JavaPairRDD<Integer, Integer> scores = sc.<Integer, Integer>parallelizePairs(scoreList);

            // Unlike join, cogroup collects all values that share a key into
            // one Iterable per source RDD.
            JavaPairRDD<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> studentScores =
                    student.<Integer>cogroup(scores);

            // Print the cogrouped RDD; the Iterables are printed via their toString().
            studentScores.foreach(new VoidFunction<Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>>>() {
                private static final long serialVersionUID = 1L;

                @Override
                public void call(Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> t) throws Exception {
                    System.out.println("student id:" + t._1);
                    System.out.println("student name:" + t._2._1);
                    System.out.println("student score:" + t._2._2);
                    System.out.println("==============");
                }
            });
        }
    }
    

    九、main函数

    9.1 Java

    /**
     * Entry point: runs a single transformation demo per launch.
     *
     * <p>Only {@code cogroup()} is enabled. To run another demo, uncomment
     * its call below.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // Disabled demos, one per transformation operator:
        // map();
        // filter();
        // flatMap();
        // groupByKey();
        // reduceNyKey();
        // sortByKey();
        // join();

        // Currently enabled demo.
        cogroup();
    }
    

    9.2 Scala

    /**
     * Entry point: runs a single transformation demo per launch.
     *
     * Only `join()` is enabled. To run another demo, uncomment its call below.
     *
     * @param args command-line arguments (unused)
     */
    def main(args: Array[String]): Unit = {
      // Explicit ": Unit =" replaces the deprecated procedure syntax.
      // Disabled demos, one per transformation operator:
      // map()
      // filter()
      // flatMap()
      // groupByKey()
      // reduceByKey()
      // sortByKey()
      join()
    }
    
  • 相关阅读:
    Django【进阶篇-缓存类型】
    深度剖析Kubernetes API Server三部曲
    深度剖析Kubernetes API Server三部曲
    深度剖析Kubernetes API Server三部曲
    Istio技术与实践03:最佳实践之sidecar自动注入
    原来你是这样的PaaS!
    5分钟APIG实战: 使用Rust语言快速构建API能力开放
    Log4J日志配置详解
    cookie是如何保存到客户端,又是如何发送到服务端
    session cookie
  • 原文地址:https://www.cnblogs.com/aixing/p/13327440.html
Copyright © 2011-2022 走看看