zoukankan      html  css  js  c++  java
  • spark记录(3)spark算子之Transformation

    一、map、flatMap、mapParations、mapPartitionsWithIndex

    1.1 map

    map十分容易理解,他是将源JavaRDD的一个一个元素的传入call方法,并经过算法后一个一个的返回从而生成一个新的JavaRDD。

    (1) 使用Java进行编写

    public static void map() {
            List<String> list = Arrays.asList("李光洙","刘在石","哈哈","宋智孝");
            JavaRDD<String> rdd = jsc.parallelize(list);
            
            JavaRDD<String> map = rdd.map(new Function<String, String>() {
    
                private static final long serialVersionUID = 1L;
    
                @Override
                public String call(String name) throws Exception {
                    return "hello,"+name;
                }
            });
            map.foreach(new VoidFunction<String>() {
                
                /**
                 * 
                 */
                private static final long serialVersionUID = 1L;
    
                @Override
                public void call(String msg) throws Exception {
                    System.out.println(msg);
                }
            });
            
        }

    (2) 使用scala进行编写

      def map(): Unit = {
        val list = List("李光洙","刘在石","哈哈","宋智孝");
        val rdd = sc.parallelize(list)
        val map = rdd.map(s => "hello," + s).foreach(println) 
      }

    (3)运行结果

    (4) 总结

    可以看出,对于map算子,源JavaRDD的每个元素都会进行计算,由于是依次进行传参,所以他是有序的,新RDD的元素顺序与源RDD是相同的。而由有序又引出接下来的flatMap。

    1.2 flatMap

    flatMap与map一样,是将RDD中的元素依次的传入call方法,他比map多的功能是能在任何一个传入call方法的元素后面添加任意多元素,而能达到这一点,正是因为其进行传参是依次进行的。

    (1) 使用Java进行编写

        public static void flatmap() {
            List<String> list = Arrays.asList("李光洙 刘在石","哈哈 宋智孝");
            JavaRDD<String> rdd = jsc.parallelize(list);
            
            JavaRDD<String> map = rdd.flatMap(new FlatMapFunction<String, String>() {
    
                private static final long serialVersionUID = 1L;
    
                @Override
                public Iterable<String> call(String line) throws Exception {
                    return Arrays.asList(line.split(" "));
                }
            }).map(new Function<String, String>() {
    
                private static final long serialVersionUID = 1L;
    
                @Override
                public String call(String s) throws Exception {
                    return "你好," + s;
                }
            });
            
            map.foreach(new VoidFunction<String>() {
                
                @Override
                public void call(String s) throws Exception {
                    System.out.println(s);
                }
            });
            
        }

    (2) 使用scala进行编写

      def flatmap(): Unit = {
        val list = List("李光洙 刘在石","哈哈 宋智孝");
        val rdd = sc.parallelize(list)
        rdd.flatMap(_.split(" ")).map("你好,"+_).foreach(println)
      }

    (3) 运行结果

    (4) 总结

    flatMap的特性决定了这个算子在对需要随时增加元素的时候十分好用,比如在对源RDD查漏补缺时。

    map和flatMap都是依次进行参数传递的,但有时候需要RDD中的两个元素进行相应操作时(例如:算存款所得时,下一个月所得的利息是要原本金加上上一个月所得的本金的),这两个算子便无法达到目的了,这是便需要mapPartitions算子,他传参的方式是将整个RDD传入,然后将一个迭代器传出生成一个新的RDD,由于整个RDD都传入了,所以便能完成前面说的业务。

    map是对RDD中元素逐一进行函数操作映射为另外一个RDD,而flatMap操作是将函数应用于RDD之中的每一个元素,将返回的迭代器的所有内容构成新的RDD。而flatMap操作是将函数应用于RDD中每一个元素,将返回的迭代器的所有内容构成RDD。

    flatMap与map区别在于map为“映射”,而flatMap“先映射,后扁平化”,map对每一次(func)都产生一个元素,返回一个对象,而flatMap多一步就是将所有对象合并为一个对象。

    1.3 mapPartitions

    与map方法类似,map是对rdd中的每一个元素进行操作,而mapPartitions(foreachPartition)则是对rdd中的每个分区的迭代器进行操作。如果在map过程中需要频繁创建额外的对象(例如将rdd中的数据通过jdbc写入数据库,map需要为每个元素创建一个链接而mapPartition为每个partition创建一个链接),则mapPartitions效率比map高的多。

    (1) 使用Java进行编写

        public static void mapPartitions() {
            JavaRDD<String> textFile = jsc.textFile("words",3);
            textFile.mapPartitions(new FlatMapFunction<Iterator<String>, String>() {
                
                private static final long serialVersionUID = 1L;
    
                @Override
                public Iterable<String> call(Iterator<String> is) throws Exception {
                    System.out.println("创建数据库连接。。。。");
                    List<String> list = new ArrayList<String>();
                    while(is.hasNext()) {
                        list.add(is.next());
                        System.out.println("模拟向数据库插入批量数据。。。");
                    }
                    System.out.println("关闭数据库连接。。。");
                    return list;
                }
            }).collect();
            
        }

    (2) 使用scala进行编写

      def mapPartitions: Unit = {
        val rdd1 = sc.textFile("words")
        val mapResult = rdd1.mapPartitions(iter =>{
            println("打开数据库。。。")
            val list = List()
            while(iter.hasNext){
              list.addString(new StringBuilder(iter.next()))
              println("插入数据库。。。")
            }
            println("关闭数据库。。。")
            list.iterator
          }, false)
        mapResult.foreach(println)
      }

    (3) 运行结果

    (4)总结

    mapPartitions比较适合需要分批处理数据的情况,比如将数据插入某个表,每批数据只需要开启一次数据库连接,大大减少了连接开支。

    1.4 mapPartitionsWithIndex

    每次获取和处理的就是一个分区的数据,并且知道处理的分区的分区号是什么

    (1)使用Java编写

        public static void mapPartitionsWithIndex() {
            List<String> list = Arrays.asList("李光洙","刘在石","哈哈","宋智孝");
            JavaRDD<String> rdd = jsc.parallelize(list,3);
            
            JavaRDD<String> rdd2 = rdd.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() {
    
                private static final long serialVersionUID = 1L;
    
                @Override
                public Iterator<String> call(Integer index, Iterator<String> iter) throws Exception {
                    List<String> list =  new ArrayList<String>();
                    while(iter.hasNext()) {
                        list.add(index+"_"+iter.next());
                    }
                    return list.iterator();
                }
            }, true);
            
            rdd2.foreach(new VoidFunction<String>() {
                
                @Override
                public void call(String s) throws Exception {
                    System.out.println(s);
                }
            });
            
        }

    (2)使用scala编写

    def mapPartitionsWithIndex: Unit = {
        val list = List("李光洙","刘在石","哈哈","宋智孝")
        val rdd1 = sc.parallelize(list, 3) 
        val rdd2 = rdd1.mapPartitionsWithIndex((index,iter)=>{
          val l = ListBuffer[String]()
          while(iter.hasNext){
            val v = iter.next()
            l.append(index+"_"+v)
          }
          l.iterator
        }, true).foreach(println)
      }

    (3)结果

     二.

    2.1 fillter

     过滤操作,满足filter内function函数为true的RDD内所有元素组成一个新的数据集。如:filter(a == 1)。

    (1)使用Java编写

        public static void fillter() {
            List<Integer> list = Arrays.asList(1,2,3,4,5,6);
            JavaRDD<Integer> rdd = jsc.parallelize(list);
            rdd.filter(new Function<Integer, Boolean>() {
                
                private static final long serialVersionUID = 1L;
    
                @Override
                public Boolean call(Integer i) throws Exception {
                    return i%2==0;
                }
            }).foreach(new VoidFunction<Integer>() {
                
                @Override
                public void call(Integer a) throws Exception {
                    System.out.println(a);
                }
            });
        }

    (2)使用scala编写

      def fillter: Unit = {
        val list = List(1,2,3,4,5,6)
        val rdd1 = sc.parallelize(list)
        rdd1.filter(_%2==0).foreach(println)
      }

    (3)结果

    2.2 sample(withReplacement, fraction, seed)

    采样操作,用于从样本中取出部分数据。withReplacement是否放回,fraction采样比例,seed用于指定的随机数生成器的种子。(是否放回抽样分true和false,fraction取样比例为(0, 1]。seed种子为整型实数。)

    (1)使用Java编写

        public static void sample() {
            List<Integer> list = Arrays.asList(1,2,3,4,5,6);
            JavaRDD<Integer> rdd = jsc.parallelize(list);
            rdd.sample(false, 0.5, 1).foreach(new VoidFunction<Integer>() {
                
                @Override
                public void call(Integer s) throws Exception {
                    System.out.println(s);
                }
            });
            
        }

    (2)使用scala编写

      def sample: Unit = {
        val list = List(1,2,3,4,5,6)
        val rdd1 = sc.parallelize(list)
        rdd1.sample(false,0.5, 1).foreach(println) 
      }

    (3)结果

    2.3 cartesian

    cartesian是用于求笛卡尔积的,该操作不会执行shuffle操作。

    (1)使用Java编写

        public static void cartesian() {
            List<String> list1 = Arrays.asList("A","B","C");
            List<Integer> list2 = Arrays.asList(1,2,3);
            
            JavaRDD<String> rdd1 = jsc.parallelize(list1);
            JavaRDD<Integer> rdd2 = jsc.parallelize(list2);
            
            rdd1.cartesian(rdd2).foreach(new VoidFunction<Tuple2<String,Integer>>() {
                
                @Override
                public void call(Tuple2<String, Integer> tuple) throws Exception {
                    System.out.println(tuple._1+"---->"+tuple._2);
                }
            });
        }

    (2)使用scala编写

      def cartesian: Unit = {
        val list1 = List("A","B","C")
        val list2 = List(1,2,3)
        
        val rdd1 = sc.parallelize(list1)
        val rdd2 = sc.parallelize(list2)
        rdd1.cartesian(rdd2).foreach(tuple =>println(tuple._1+"--->"+tuple._2))
      }

    (3)结果

    2.4 reduceByKey(function,[numTasks])

    reduceByKey仅将RDD中所有K,V对中K值相同的V进行合并。

    (1)使用Java编写

        public static void reduceByKey() {
            JavaRDD<String> rdd = jsc.textFile("words");
            rdd.flatMap(new FlatMapFunction<String, String>() {
    
                private static final long serialVersionUID = 1L;
    
                @Override
                public Iterable<String> call(String s) throws Exception {
                    return Arrays.asList(s.split(" "));
                }
            }).mapToPair(new PairFunction<String, String, Integer>() {
    
                private static final long serialVersionUID = 1L;
    
                @Override
                public Tuple2<String, Integer> call(String s) throws Exception {
                    return new Tuple2<String, Integer>(s, 1);
                }
            }).reduceByKey(new Function2<Integer, Integer, Integer>() {
                
                @Override
                public Integer call(Integer v1, Integer v2) throws Exception {
                    // TODO Auto-generated method stub
                    return v1+v2;
                }
            }).foreach(new VoidFunction<Tuple2<String,Integer>>() {
                
                @Override
                public void call(Tuple2<String, Integer> s) throws Exception {
                    System.out.println(s);
                }
            });
        }

    (2)使用scala编写

      def reduceByKey: Unit = {
        sc.textFile("words").flatMap(_.split(" ")).map(new Tuple2(_,1)).reduceByKey((_+_)).foreach(println)
      }

    (3)结果

    三、union,join和groupByKey 

    3.1 union

    当要将两个RDD合并时,便要用到union和join,其中union只是简单的将两个RDD累加起来,可以看做List的addAll方法。就想List中一样,当使用union及join时,必须保证两个RDD的泛型是一致的。不去重。

    (1)使用Java编写

        public static void union() {
            List<Integer> list1 = Arrays.asList(1,2,3,4);
            List<Integer> list2 = Arrays.asList(3,4,5,6);
            JavaRDD<Integer> rdd1 = jsc.parallelize(list1);
            JavaRDD<Integer> rdd2 = jsc.parallelize(list2);
            rdd1.union(rdd2).foreach(new VoidFunction<Integer>() {
                
                @Override
                public void call(Integer i) throws Exception {
                    System.out.println(i);
                }
            });
        }

    (2)使用scala编写

      def union: Unit = {
        val list1 = List(1,2,3,4)
        val list2 = List(3,4,5,6)
        val rdd1 = sc.parallelize(list1)
        val rdd2 = sc.parallelize(list2)
        rdd1.union(rdd2).foreach(println)
      }

    (3)结果

    3.2 groupByKey

    groupBy是将RDD中的元素进行分组,组名是call方法中的返回值,而顾名思义groupByKey是将PairRDD中拥有相同key值得元素归为一组。

    (1)使用Java编写

        public static void groupByKey() {
            List<Tuple2<String,String>> list = Arrays.asList(
                        new Tuple2("Doctor","A"),
                        new Tuple2("Actor","B"),
                        new Tuple2("Doctor","C"),
                        new Tuple2("Actor","D")
                    );
            JavaPairRDD<String, String> rdd = jsc.parallelizePairs(list);
            JavaPairRDD<String, Iterable<String>> rdd2 = rdd.groupByKey();
            rdd2.foreach(new VoidFunction<Tuple2<String,Iterable<String>>>() {
                
                @Override
                public void call(Tuple2<String, Iterable<String>> t) throws Exception {
                    String s = t._1;
                    Iterator<String> iter = t._2.iterator();
                    String person = "";
                    while(iter.hasNext()) {
                        person = person + iter.next()+" ";
                    }
                    System.out.println("职业:"+s+",人员:"+person);
                }
            });
            
        }

    (2)使用scala编写

      def groupByKey: Unit = {
        val list = List(
                new Tuple2("Doctor","A"),
                        new Tuple2("Actor","B"),
                        new Tuple2("Doctor","C"),
                        new Tuple2("Actor","D")
                                )
                                
        val rdd = sc.parallelize(list)
        rdd.groupByKey().foreach(t =>{
            val s = t._1
            val iter = t._2.iterator
            var person = ""
            while(iter.hasNext) 
              person = person + iter.next + " "
            println("职业:"+s+",人员:"+person)
            
          }
        )
      }

    (3)结果

    3.3 join

    join是将两个PairRDD合并,并将有相同key的元素分为一组,可以理解为groupByKey和Union的结合

    (1)使用Java编写

        public static void join() {
            List<Tuple2<Integer,String>> list1 = Arrays.asList(
                        new Tuple2<Integer,String>(1,"小明"),
                        new Tuple2<Integer,String>(2,"小米"),
                        new Tuple2<Integer,String>(3,"晓红"),
                        new Tuple2<Integer,String>(4,"小绿")
                    );
            List<Tuple2<Integer,Integer>> list2 = Arrays.asList(
                    new Tuple2<Integer,Integer>(1,88),
                    new Tuple2<Integer,Integer>(2,99),
                    new Tuple2<Integer,Integer>(3,100),
                    new Tuple2<Integer,Integer>(4,98)
                );
            JavaPairRDD<Integer, String> rdd1 = jsc.parallelizePairs(list1);
            JavaPairRDD<Integer, Integer> rdd2 = jsc.parallelizePairs(list2);
            JavaPairRDD<Integer, Tuple2<String, Integer>> rdd = rdd1.join(rdd2);
            rdd.foreach(new VoidFunction<Tuple2<Integer,Tuple2<String,Integer>>>() {
                
                @Override
                public void call(Tuple2<Integer, Tuple2<String, Integer>> tuple) throws Exception {
                    System.out.println("学号:"+tuple._1+",姓名:"+tuple._2._1+",分数:"+tuple._2._2);
                }
            });
        }

    (2)使用scala编写

      def join: Unit = {
        val list1 = List(
                new Tuple2(1,"小明"),
                        new Tuple2(2,"小米"),
                        new Tuple2(3,"晓红"),
                        new Tuple2(4,"小绿")  
          )
        val list2 = List(
              new Tuple2(1,88),
                      new Tuple2(2,99),
                      new Tuple2(3,100),
                      new Tuple2(4,98)  
          )
          val rdd1 = sc.parallelize(list1)
          val rdd2 = sc.parallelize(list2)
          rdd1.join(rdd2).foreach(tuple => {
            println("学号:"+tuple._1+",姓名:"+tuple._2._1+",分数:"+tuple._2._2)
          })
      }

    (3)结果

    四、distinct、intersection、subtract

    4.1 distinct([numTasks])

    返回一个在源数据集去重之后的新数据集,即去重,并局部无序而整体有序返回。

    (1)使用Java编写

        public static void distinct() {
            List<Integer> list = Arrays.asList(1,1,2,3,4,55,55,6,4,32,2,1,3);
            jsc.parallelize(list).distinct().foreach(new VoidFunction<Integer>() {
                
                @Override
                public void call(Integer s) throws Exception {
                    System.out.println(s);
                }
            });
        }

    (2)使用scala编写

      def distinct: Unit = {
        val list = List(1,1,2,3,4,55,55,6,4,32,2,1,3)
        sc.parallelize(list).distinct().foreach(println)
      }

    (3)结果

    4.2 intersection

    对于源数据集和其他数据集求交集,并去重,且无序返回。

    (1)使用Java编写

        public static void intersection() {
            List<Integer> list1 = Arrays.asList(1,2,3,4);
            List<Integer> list2 = Arrays.asList(3,4,5,6,1,3);
            
            JavaRDD<Integer> rdd1 = jsc.parallelize(list1);
            JavaRDD<Integer> rdd2 = jsc.parallelize(list2);
            rdd1.intersection(rdd2).foreach(new VoidFunction<Integer>() {
                
                @Override
                public void call(Integer s) throws Exception {
                    System.out.println(s);
                }
            });
        }

    (2)使用scala编写

      def intersection: Unit = {
        val list1 = List(1,2,3,4)
            val list2 = List(3,4,5,6,1,3)
            val rdd1 = sc.parallelize(list1)
            val rdd2 = sc.parallelize(list2)
            rdd1.intersection(rdd2).foreach(println)
      }

    (3)结果

    4.3 subtract

    subtract相当于进行集合的差操作,RDD 1去除RDD 1和RDD 2交集中的所有元素。

    (1)使用Java编写

        public static void subtract() {
            List<Integer> list1 = Arrays.asList(1,2,3,4,11,12);
            List<Integer> list2 = Arrays.asList(3,4,5,6,1,3);
            
            JavaRDD<Integer> rdd1 = jsc.parallelize(list1);
            JavaRDD<Integer> rdd2 = jsc.parallelize(list2);
            rdd1.subtract(rdd2).foreach(new VoidFunction<Integer>() {
                
                @Override
                public void call(Integer s) throws Exception {
                    System.out.println(s);
                }
            });
        }

    (2)结果

    五、coalesce、repartition、repartitionAndSortWithinPartitions

    5.1 coalesce

     该函数用于将RDD进行重分区,使用HashPartitioner。 
    重新分区,减少RDD中分区的数量到numPartitions。

    (1)使用Java编写

        public static void coalesce() {
            JavaRDD<Integer> rdd = jsc.parallelize(Arrays.asList(1,3,45,2,4,542,2),3);
            rdd.coalesce(1).foreach(new VoidFunction<Integer>() {
                
                @Override
                public void call(Integer s) throws Exception {
                    System.out.println(s);
                }
            });
        }

    (2)结果

    5.2   repartition

    进行重分区,解决的问题:本来分区数少  -》 增加分区数

    (1)使用Java编写

        public static void repartition() {
            jsc.parallelize(Arrays.asList(1,3,4,24,14,421,1),2).repartition(4).foreach(new VoidFunction<Integer>() {
                
                @Override
                public void call(Integer s) throws Exception {
                    System.out.println(s);
                }
            });
        }

    (2)结果

    5.3 repartitionAndSortWithinPartitions

    repartitionAndSortWithinPartitions函数是repartition函数的变种,与repartition函数不同的是,repartitionAndSortWithinPartitions在给定的partitioner内部进行排序,性能比repartition要高。 

    (1)使用Java编写

        public static void repartitionAndSortWithinPartitions() {
            JavaPairRDD<Integer, Integer> rdd = jsc.parallelize(Arrays.asList(1,3,4,24,14,421,1),1)
                    .mapToPair(new PairFunction<Integer, Integer, Integer>() {
    
                        private static final long serialVersionUID = 1L;
    
                        @Override
                        public Tuple2<Integer, Integer> call(Integer s) throws Exception {
                            return new Tuple2<Integer, Integer>(s, s);
                        }
                    });
            JavaPairRDD<Integer, Integer> rdd2 = rdd.repartitionAndSortWithinPartitions(new Partitioner() {
                
                private static final long serialVersionUID = 1L;
    
                @Override
                public int numPartitions() {
                    return 2;
                }
                
                @Override
                public int getPartition(Object key) {
                    Integer p = Integer.valueOf(key.toString());
                    if(p%2==0) {
                        return 0;
                    }else {
                        return 1;
                    }
                }
            });
            JavaRDD<String> rdd3 = rdd2.mapPartitionsWithIndex(new Function2<Integer, Iterator<Tuple2<Integer,Integer>>, Iterator<String>>() {
    
                private static final long serialVersionUID = 1L;
    
                @Override
                public Iterator<String> call(Integer index, Iterator<Tuple2<Integer, Integer>> iter) throws Exception {
                    List<String> list = new ArrayList<>();
                    while(iter.hasNext()) {
                        list.add(index+" "+iter.next());
                    }
                    return list.iterator();
                }
            }, true);
            rdd3.foreach(new VoidFunction<String>() {
                
                @Override
                public void call(String s) throws Exception {
                    System.out.println(s);
                }
            });
        }

    (2)结果

    六、cogroup、sortBykey、aggregateByKey

    6.1 cogroup

    对两个RDD中的KV元素,每个RDD中相同key中的元素分别聚合成一个集合。与reduceByKey不同的是针对两个RDD中相同的key的元素进行合并。

    (1)使用Java编写

        public static void cogroup() {
            List<Tuple2<Integer, String>> list1 = Arrays.asList(
                    new Tuple2<Integer, String>(1, "张三"),
                    new Tuple2<Integer, String>(2, "李四")
            );
    
            List<Tuple2<Integer, String>> list2 = Arrays.asList(
                    new Tuple2<Integer, String>(1, "王五"),
                    new Tuple2<Integer, String>(2, "赵柳"),
                    new Tuple2<Integer, String>(3, "田七")
            );
    
            List<Tuple2<Integer, String>> list3 = Arrays.asList(
                    new Tuple2<Integer, String>(1, "张三"),
                    new Tuple2<Integer, String>(2, "李四"),
                    new Tuple2<Integer, String>(3, "尾巴")
            );
    
            JavaPairRDD<Integer, String> list1RDD = jsc.parallelizePairs(list1);
            JavaPairRDD<Integer, String> list2RDD = jsc.parallelizePairs(list2);
            JavaPairRDD<Integer, String> list3RDD = jsc.parallelizePairs(list3);
            
            list1RDD.cogroup(list2RDD,list3RDD).foreach(new VoidFunction<Tuple2<Integer,Tuple3<Iterable<String>,Iterable<String>,Iterable<String>>>>() {
                
                @Override
                public void call(Tuple2<Integer, Tuple3<Iterable<String>, Iterable<String>, Iterable<String>>> s)
                        throws Exception {
                    System.out.println(s);
                }
            });
        }

    (2)结果

    6.2 sortBykey

    sortByKey函数作用于Key-Value形式的RDD,并对Key进行排序。它是在org.apache.spark.rdd.OrderedRDDFunctions中实现的,实现如下

    def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size)
        : RDD[(K, V)] =
    {
      val part = new RangePartitioner(numPartitions, self, ascending)
      new ShuffledRDD[K, V, V](self, part)
        .setKeyOrdering(if (ascending) ordering else ordering.reverse)
    }

    从函数的实现可以看出,它主要接受两个函数,含义和sortBy一样,这里就不进行解释了。该函数返回的RDD一定是ShuffledRDD类型的,因为对源RDD进行排序,必须进行Shuffle操作,而Shuffle操作的结果RDD就是ShuffledRDD。其实这个函数的实现很优雅,里面用到了RangePartitioner,它可以使得相应的范围Key数据分到同一个partition中,然后内部用到了mapPartitions对每个partition中的数据进行排序,而每个partition中数据的排序用到了标准的sort机制,避免了大量数据的shuffle。参数主要有两个,一是指定升降序排序,默认是true,二是指定分区数,默认是父rdd的分区数。下面对sortByKey的使用进行说明:

    (1)使用Java编写

        public static void sortByKey() {
            JavaRDD<String> rdd = jsc.textFile("words");
            JavaPairRDD<String, Integer> rdd2 = rdd.flatMap(new FlatMapFunction<String, String>() {
    
                private static final long serialVersionUID = 1L;
    
                @Override
                public Iterable<String> call(String s) throws Exception {
                    return Arrays.asList(s.split(" "));
                }
            }).mapToPair(new PairFunction<String, String, Integer>() {
    
                private static final long serialVersionUID = 1L;
    
                @Override
                public Tuple2<String, Integer> call(String s) throws Exception {
                    return new Tuple2<String, Integer>(s, 1);
                }
            });
            JavaPairRDD<String, Integer> rdd3 = rdd2.reduceByKey(new Function2<Integer, Integer, Integer>() {
                
                private static final long serialVersionUID = 1L;
    
                @Override
                public Integer call(Integer v1, Integer v2) throws Exception {
                    return v1+v2;
                }
            });
            rdd3.mapToPair(new PairFunction<Tuple2<String,Integer>, Integer, String>() {
    
                private static final long serialVersionUID = 1L;
    
                @Override
                public Tuple2<Integer, String> call(Tuple2<String, Integer> t) throws Exception {
                    return new Tuple2<Integer, String>(t._2, t._1);
                }
            }).sortByKey(false).mapToPair(new PairFunction<Tuple2<Integer,String>, String, Integer>() {
    
                private static final long serialVersionUID = 1L;
    
                @Override
                public Tuple2<String, Integer> call(Tuple2<Integer, String> t) throws Exception {
                    return new Tuple2<String, Integer>(t._2, t._1);
                }
            }).foreach(new VoidFunction<Tuple2<String,Integer>>() {
                
                @Override
                public void call(Tuple2<String, Integer> s) throws Exception {
                    System.out.println(s);
                }
            });
        }

    (2)结果

    6.3 aggregateByKey

    aggregateByKey函数对PairRDD中相同Key的值进行聚合操作,在聚合过程中同样使用了一个中立的初始值。和aggregate函数类似,aggregateByKey返回值的类型不需要和RDD中value的类型一致。因为aggregateByKey是对相同Key中的值进行聚合操作,所以aggregateByKey函数最终返回的类型还是Pair RDD,对应的结果是Key和聚合好的值;而aggregate函数直接是返回非RDD的结果,这点需要注意。在实现过程中,定义了三个aggregateByKey函数原型,但最终调用的aggregateByKey函数都一致。

    (1)使用Java编写

        public static void aggregateByKey() {
            List<Tuple2<String, Integer>> list = Arrays.asList(
                    new Tuple2<>("cat", 10),
                    new Tuple2<>("mouse",22),
                    new Tuple2<>("mouse", 5),
                    new Tuple2<>("cat", 10),
                    new Tuple2<>("mouse",22),
                    new Tuple2<>("dog", 5)
                    );
            JavaPairRDD<String, Integer> rdd = jsc.parallelizePairs(list,2);
            rdd.mapPartitionsWithIndex(new Function2<Integer, Iterator<Tuple2<String,Integer>>, Iterator<Tuple2<String,Integer>>>() {
    
                private static final long serialVersionUID = 1L;
    
                @Override
                public Iterator<Tuple2<String, Integer>> call(Integer index
                        , Iterator<Tuple2<String, Integer>> iter)
                        throws Exception {
                    while(iter.hasNext()) {
                        System.out.println("partitionIndex:"+index+"---->"+iter.next());
                    }
                    return iter;
                }
            }, true).collect();
            /*
                partitionIndex:0---->(cat,10)
                partitionIndex:0---->(mouse,22)
                partitionIndex:0---->(mouse,5)
                
                partitionIndex:1---->(cat,10)
                partitionIndex:1---->(mouse,22)
                partitionIndex:1---->(dog,5)
                
                该数据源被分成两个partition
            */
            /**
             * 该函数第一个参数为起始值,该值在一个函数执行时,会在每个key的values中加入该值
             */
            JavaPairRDD<String, Integer> rdd2 = rdd.aggregateByKey(100, new Function2<Integer, Integer, Integer>() {
    
                private static final long serialVersionUID = 1L;
                /*
                 * 该函数统计同个partition内的数据
                 * */
                @Override
                public Integer call(Integer v1, Integer v2) throws Exception {
                    return Math.max(v1, v2);
                }
            }, new Function2<Integer, Integer, Integer>() {
    
                private static final long serialVersionUID = 1L;
                /**
                 * 该函数统计不同partition内的数据
                 */
                @Override
                public Integer call(Integer v1, Integer v2) throws Exception {
                    return v1+v2;
                }
            });
            rdd2.foreach(new VoidFunction<Tuple2<String,Integer>>() {
                
                @Override
                public void call(Tuple2<String, Integer> s) throws Exception {
                    System.out.println(s);
                }
            });
            
        }

    (2)结果

    6.4 combineByKey

     combineByKey()是最为常用的基于键进行聚合的函数。大多数基于键聚合的函数都是用它实现的。和aggregate()一样,combineByKey()可以让用户返回与输入数据的类型不同的返回值。

    要理解combineByKey(),要先理解它在处理数据时是如何处理每个元素的。由于combineByKey()会遍历分区中的所有元素,因此每个元素的键要么还没有遇到过,要么就和之前的某个元素的键相同。

    如果这是一个新的元素,combineByKey()会使用一个叫做createCombiner()的函数来创建那个键对应的累加器的初始值。需要注意的是,这个过程会在每个分区中第一次出现各个键时发生,而不是在整个RDD中第一次出现一个键时发生。

    如果这是一个在处理当前分区之前已经遇到的键,它会使用mergeValue()方法将该键的累加器对应的当前值与这个新的值进行合并。

    由于每个分区都是独立处理的,因此对于同一个键可以有多个累加器。如果有两个或者更多的分区都有对应同一个键的累加器,就需要使用用户提供的mergeCombiners()方法将各个分区的结果合并。

    (1)使用Java编写

        public static void combineByKey() {
            List<Tuple2<String, Integer>> list = Arrays.asList(
                    new Tuple2<>("晓红",11 ),
                    new Tuple2<>("小绿",12 ),
                    new Tuple2<>("小黑",13 ),
                    new Tuple2<>("晓红",14 ),
                    new Tuple2<>("小绿",15 ),
                    new Tuple2<>("晓红",16 )
                    );
            JavaPairRDD<String, Integer> rdd = jsc.parallelizePairs(list,2);
            rdd.mapPartitionsWithIndex(new Function2<Integer, Iterator<Tuple2<String,Integer>>, Iterator<Tuple2<String,Integer>>>() {
    
                private static final long serialVersionUID = 1L;
    
                @Override
                public Iterator<Tuple2<String, Integer>> call(Integer index,
                        Iterator<Tuple2<String, Integer>> iter) throws Exception {
                    List<Tuple2<String, Integer>> list = new ArrayList<Tuple2<String, Integer>>();
                    while(iter.hasNext()){
                        Tuple2<String, Integer> next = iter.next();
                        System.out.println("partitionindex ="+index+",value="+next);
                        list.add(next);
                    }
                    return list.iterator();
                }
    
            }, true).collect();
            /**数据分区结果:
                partitionindex =0,value=(晓红,11)
                partitionindex =0,value=(小绿,12)
                partitionindex =0,value=(小黑,13)
                
                partitionindex =1,value=(晓红,14)
                partitionindex =1,value=(小绿,15)
                partitionindex =1,value=(晓红,16)
             */
            
            rdd.combineByKey(new Function<Integer, Integer>() {
                
                /**
                 * 该方法与aggregateByKey相似(底层貌似是combineByKey)
                 * 该方法的第一个参数的方法实现数据转化的功能,可以将收到的数据转化成任意类型的数据
                 */
                private static final long serialVersionUID = 1L;
    
                @Override
                public Integer call(Integer a) throws Exception {
                    return a;
                }
            }, new Function2<Integer,Integer,Integer>() {
                /**
                 * 第二个参数的方法处理来自同一个partition内的数据
                 */
                private static final long serialVersionUID = 1L;
    
                @Override
                public Integer call(Integer a, Integer b) throws Exception {
                    return Math.max(a, b);
                }
            }, new Function2<Integer,Integer,Integer>() {
                /**
                 * 第三个参数的方法处理来自不同partition的数据
                 */
                private static final long serialVersionUID = 1L;
    
                @Override
                public Integer call(Integer a, Integer b) throws Exception {
                    return a+b;
                }
            }).foreach(new VoidFunction<Tuple2<String,Integer>>() {
                
                @Override
                public void call(Tuple2<String, Integer> a) throws Exception {
                    System.out.println(a);
                }
            });
        }

    (2)结果

    七、zip相关算子

  • 相关阅读:
    Dbcp2抛出org.apache.commons.dbcp2.LifetimeExceededException
    DbUtils使用时抛出Cannot get a connection
    Spring注解【非单例】
    Exception in thread java.lang.IllegalThreadStateException
    eclipse选中变量,相同变量高亮。
    git push 403
    java虚拟机能并发的启动多少个线程
    产品的随想
    【转载】学习新东西的唯一方法
    Mysql错误问题记录
  • 原文地址:https://www.cnblogs.com/kpsmile/p/10428695.html
Copyright © 2011-2022 走看看