- map算子
- flatMap算子
- mapParitions算子
- filter算子
- mapParttionsWithIndex算子
- sample算子
- distinct算子
- groupByKey算子
- reduceByKey算子
1、map算子
(1)任何类型的RDD都可以调用map算子;在java中,map算子接收的参数是Function对象,在Function中,需要设置第二个泛型类型为返回的新元素的类型;同时,call()方法的返回类型也需要与第二个泛型的返回类型一致。在call()方法中,对原始RDD中的每一个元素进行各种处理和计算,并返回一个新的元素,所有新的元素组成一个新的RDD。
(2)map十分容易理解,他是将源JavaRDD的一个一个元素的传入call方法,并经过算法后一个一个的返回从而生成一个新的JavaRDD。
举例:
1 package map.xls; 2 3 import org.apache.spark.SparkConf; 4 import org.apache.spark.api.java.JavaRDD; 5 import org.apache.spark.api.java.JavaSparkContext; 6 import org.apache.spark.api.java.function.Function; 7 import org.apache.spark.api.java.function.Function2; 8 import org.apache.spark.api.java.function.VoidFunction; 9 10 import java.util.Arrays; 11 import java.util.List; 12 13 public class TransFormation_Map { 14 15 public static void main(String[] args) { 16 // 例1:map算子案例,将集合中每一个元素都乘以2 17 map1(); 18 19 // 例2:map算子案例,将集合中每一个元素进行累加 20 map2(); 21 } 22 /* 23 * 我们可以通过2种方式创建RDD,一种方式是直接读取外部数据(比较常用),另一种是在驱动程序中分发驱动器中的对象集合(list或set),一般在调试中会使用 24 * */ 25 26 27 public static void map1(){ 28 29 //创建SparkConf 30 SparkConf conf = new SparkConf().setAppName("map").setMaster("local"); 31 32 //创建javaSparkContext 33 JavaSparkContext sc = new JavaSparkContext(conf); 34 35 //构建集合 36 List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5); 37 38 //并行化集合,parallelize对集合进行并行化处理(如果是在集群中,则会将list分配到集群的各个节点上),创建初始RDD 39 JavaRDD<Integer> numberRDD = sc.parallelize(numbers); 40 41 //使用map算子,将集合中每个元素都乘以2 42 43 //map算子,是对于任何类型的RDD,都可以调用的 44 //在java中,map算子接收的参数是Function对象 45 //创建的function对象,一定会让你设置第二个泛型,这个泛型类型,并返回一个新的元素 46 //所有新的元素就会组成一个新的RDD 47 48 JavaRDD<Integer> rdd = numberRDD.map(new Function<Integer, Integer>() { 49 public Integer call(Integer v1) throws Exception { 50 return v1 * 2; 51 } 52 }); 53 54 rdd.foreach(new VoidFunction<Integer>() { 55 56 public void call(Integer t) throws Exception { 57 System.out.println(t); 58 } 59 }); 60 61 //关闭资源 62 sc.close(); 63 64 65 } 66 public static void map2(){ 67 // 创建sparkConf 68 SparkConf conf = new SparkConf().setAppName("map2").setMaster("local"); 69 70 // 创建JavaSparkContext 71 JavaSparkContext sc = new JavaSparkContext(conf); 72 73 // 要通过并行化集合的方式创建RDD,就要调用sparkContext及其子类的parallelize()方法 74 List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); 75 JavaRDD<Integer> numberRDD = sc.parallelize(numbers); 76 77 // 执行reduce算子 78 // 相等于先进行1+2=3,然后再进行3+3=6... 79 int sum = numberRDD.reduce(new Function2<Integer, Integer, Integer>() { 80 public Integer call(Integer num1, Integer num2) throws Exception { 81 return num1 + num2; 82 } 83 }); 84 85 // 输出累加和 86 System.out.println("1到10的累加和为:" + sum); 87 } 88 89 }
2、flatMap算子
1、Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).
flatMap类似于map,但是每一个输出元素可以被映射成0或多个输出元素(所以func应该返回一个序列,而不是单一元素)
2、map是对RDD中元素逐一进行函数操作映射为另外一个RDD,而flatMap操作是将函数应用于RDD之中的每一个元素,将返回的迭代器的所有内容构成新的RDD。
flatMap与map区别在于map为“映射”,而flatMap“先映射,后扁平化”,map对每一次(func)都产生一个元素,返回一个对象,而flatMap多一步就是将所有对象合并为一个对象。
举例:
1 public static void main(String agrs[]){ 2 3 SparkConf conf = new SparkConf().setAppName("map").setMaster("local"); 4 5 JavaSparkContext sc = new JavaSparkContext(conf); 6 7 int[] array=new int[]{1,2,3,4,5,6,7,8,9}; 8 9 List<Integer> list=new ArrayList<Integer>(); 10 11 for (Integer i : array) { 12 list.add(i); 13 } 14 15 JavaRDD<Integer> rdd=sc.parallelize(list,2); 16 17 //flatMap和map一样是一个一个的传,但是他可以在每一个传入的值新增多个参数 18 //list add方法:在指定位置插入元素,后面的元素都往后移一个元素。 19 JavaRDD<Integer> result=rdd.flatMap(new FlatMapFunction<Integer, Integer>() { 20 public java.util.Iterator<Integer> call(Integer t) throws Exception { 21 22 List<Integer> list = new ArrayList<Integer>(); 23 for(int i = 0; i < t; i++){ 24 list.add(t + i); 25 } 26 return list.iterator(); //返回的这个list就是传入的元素及新增的内容 27 } 28 }); 29 System.out.println(result.collect()); 30 31 }
输出结果:
[1, 2, 3, 3, 4, 5, 4, 5, 6, 7, 5, 6, 7, 8, 9, 6, 7, 8, 9, 10, 11, 7, 8, 9, 10, 11, 12, 13, 8, 9, 10, 11, 12, 13, 14, 15, 9, 10, 11, 12, 13, 14, 15, 16, 17]
3、mapParitions算子
(1)mapPartitions类似于map,但是独立的在RDD的每一个分片上运行,假设有N个元素,有M个分区,那么使用map将被调用N次,而mapPartitions被调用M次,即一次处理一个分区。
(2)
举例:
4、filter算子
(1)Return a new RDD containing only the elements that satisfy a predicate.返回一个新的过滤后的RDD,过滤规则:只返回条件为true的数据。
(2)函数原型:public JavaPairRDD<K,V> filter(Function<scala.Tuple2<K,V>,Boolean> f)
1 private static void filter01() { 2 // 创建SparkConf 3 SparkConf conf = new SparkConf().setAppName("filter").setMaster("local"); 4 5 // 创建JavaSparkContext 6 JavaSparkContext sc = new JavaSparkContext(conf); 7 8 // 模拟集合 9 List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); 10 11 // 并行化集合,创建初始RDD 12 JavaRDD<Integer> numberRDD = sc.parallelize(numbers); 13 14 // 对初始RDD执行filter算子,过滤出其中的偶数 15 JavaRDD<Integer> evenNumberRDD = numberRDD.filter(new Function<Integer, Boolean>() { 16 private static final long serialVersionUID = 1L; 17 18 //偶数会保留下来,放在新的RDD中 19 public Boolean call(Integer v1) throws Exception { 20 return v1 % 2 == 0; 21 } 22 23 }); 24 25 // 打印新的RDD 26 evenNumberRDD.foreach(new VoidFunction<Integer>() { 27 private static final long serialVersionUID = 1L; 28 29 public void call(Integer t) throws Exception { 30 System.out.println(t); 31 } 32 33 }); 34 35 // 关闭JavaSparkContext 36 sc.close(); 37 }
5、
6、
7、sample算子
1、JavaPairRDD<K,V> sample(boolean withReplacement,double fraction)
JavaPairRDD<K,V> sample(boolean withReplacement,double fraction,long seed)
2、sample算子可以对RDD进行抽样,其中参数withReplacement为true时表示抽样之后还放回,可以被多次抽样,false表示不放回;
fraction表示抽样比例;seed为随机数种子,比如当前时间戳。
3、sample应用的场景:在数据倾斜的时候,我们那么多数据如果想知道那个key倾斜了,就需要我们采样获取这些key,如果这些key数据不是很重要的话,可以过滤掉,这样就解决了数据倾斜。
例子:
1 package mapPartitions.xls; 2 3 import org.apache.spark.SparkConf; 4 import org.apache.spark.api.java.JavaRDD; 5 import org.apache.spark.api.java.JavaSparkContext; 6 7 import java.util.ArrayList; 8 import java.util.List; 9 10 public class TransFormation07_sample { 11 public static void main(String args[]) { 12 sample01(); 13 } 14 15 public static void sample01(){ 16 17 SparkConf conf = new SparkConf().setAppName("TransFormation04_flatMap").setMaster("local"); 18 19 JavaSparkContext sc = new JavaSparkContext(conf); 20 21 List<Integer> list=new ArrayList<Integer>(); 22 23 for(int i = 1;i <= 100;i++){ 24 list.add(i); 25 } 26 JavaRDD<Integer> any = sc.parallelize(list); 27 28 //sample用来从RDD中抽取样本。他有三个参数 29 //withReplacement:表示样本是否放回 true放回 30 //fraction:抽取样本的比例 31 //seed:随机数生成种子 32 //由于样本的抽取其实是以一个固定的算法实现的,所以要达到随机抽样需用随机数生成seed 33 34 JavaRDD<Integer> sample = any.sample(true, 0.1, 0); 35 System.out.println("seed=0:" + sample.collect()); 36 sample = any.sample(true, 0.1, 0); 37 System.out.println("seed=0:"+ sample.collect()); //由于seed相同,所以抽出样本是相同的 38 39 //这里使用系统时间作为seed,发现抽出的样本是随机的 40 JavaRDD<Integer> sample1 = any.sample(true, 0.1, System.currentTimeMillis()); 41 System.out.println("seed随机生成1" + sample1.collect()); 42 sample1=any.sample(true, 0.1, System.currentTimeMillis()); 43 System.out.println("seed随机生成2" + sample1.collect()); 44 } 45 }
输出结果:
seed=0:[10, 23, 25, 35, 50, 68, 69, 79, 79, 85, 91, 91]
seed=0:[10, 23, 25, 35, 50, 68, 69, 79, 79, 85, 91, 91]
seed随机生成1[13, 28, 45, 46, 57, 63, 68, 92]
seed随机生成2[3, 9, 48, 57, 64, 65, 71, 86, 88, 92]
8、distinct算子
1、Return a new RDD containing the distinct elements in this RDD. 返回去重的一个新的RDD
2、public JavaPairRDD<K,V> distinct(); public JavaPairRDD<K,V> distinct(int numPartitions)
3、Distinct的操作其实是把原RDD进行MAP操作,根据原来的KEY-VALUE生成为KEY,value使用null来替换,并对新生成的RDD执行reduceByKey的操作,这个reduceByKey的操作中,传入的x,y都是null,这个地方执行reduceByKey的函数执行完成reducebykey的操作后,这个时候新的RDD就只相同的key就只包含一个结果值(其实就是一个null),最后执行下map操作,这个操作返回的是RDD的第一个值,第一个值就是原始rdd的key-value.执行reduceByKey操作的默认的分区算子是Hash.这个功能在执行时也需要做shuffle的操作.也就是说,Distinct的操作是根据key与value一起计算不重复的结果.只有两个记录中key与value都不重复才算是不重复的数据。
4、distinct不改变分区数,但是分区的数据会去重后改变,不是单独去重。而且参数numPartitions指定多少分区,就会生成多少分区。有可能会返回空数据的分区。
例子:
1 public static void distinct02(){ 2 3 SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("TransFormation08_distinct"); 4 5 JavaSparkContext sc = new JavaSparkContext(sparkConf); 6 7 JavaPairRDD<String, String> javaPairRDD1 = sc.parallelizePairs(Lists.newArrayList( 8 new Tuple2<String, String>("cat", "11"), 9 new Tuple2<String, String>("dog", "22"), 10 new Tuple2<String, String>("cat", "11"), 11 new Tuple2<String, String>("pig", "44"), 12 new Tuple2<String, String>("duck", "55"), 13 new Tuple2<String, String>("cat", "11"), 14 new Tuple2<String, String>("cat", "12"), 15 new Tuple2<String, String>("dog", "23"), 16 new Tuple2<String, String>("cat", "11"), 17 new Tuple2<String, String>("pig", "22"), 18 new Tuple2<String, String>("duck", "55"), 19 new Tuple2<String, String>("cat", "15")), 2); 20 // 先输出一次创建的Tuple2 21 javaPairRDD1.foreach(new VoidFunction<Tuple2<String, String>>() { 22 public void call(Tuple2<String, String> stringStringTuple2) throws Exception { 23 System.out.println(stringStringTuple2); 24 } 25 }); 26 27 // 去重操作 28 JavaPairRDD<String,String> javaPairRDD = javaPairRDD1.distinct(); 29 30 // 输出去重后的结果 31 javaPairRDD.foreach(new VoidFunction<Tuple2<String, String>>() { 32 public void call(Tuple2<String, String> stringStringTuple2) throws Exception { 33 System.out.println(stringStringTuple2); 34 } 35 }); 36 37 // 输出分区数---验证去重是否影响分区 38 System.out.println("分区的个数:"+javaPairRDD.partitions().size()); 39 40 // 验证带有numPartitions参数的distinct 41 JavaPairRDD<String,String> javaPairRDD2 = javaPairRDD1.distinct(3); 42 43 javaPairRDD2.foreach(new VoidFunction<Tuple2<String, String>>() { 44 public void call(Tuple2<String, String> stringStringTuple2) throws Exception { 45 System.out.println("-->"+stringStringTuple2); 46 } 47 }); 48 // 输出分区数 49 System.out.println("分区的个数:"+javaPairRDD2.partitions().size()); 50 }
输出结果:
(cat,11)
(dog,22)
(cat,11)
(pig,44)
(duck,55)
(cat,11)
(cat,12)
(dog,23)
(cat,11)
(pig,22)
(duck,55)
(cat,15)
(cat,15)
(cat,12)
(cat,11)
(pig,22)
(pig,44)
(dog,23)
(dog,22)
(duck,55)
分区的个数:2
-->(cat,15)
-->(dog,22)
-->(duck,55)
-->(pig,22)
-->(pig,44)
-->(dog,23)
-->(cat,12)
-->(cat,11)
分区的个数:3
9、groupByKey
1 package mapPartitions.xls; 2 3 import org.apache.spark.SparkConf; 4 import org.apache.spark.api.java.JavaPairRDD; 5 import org.apache.spark.api.java.JavaSparkContext; 6 import org.apache.spark.api.java.function.VoidFunction; 7 import scala.Tuple2; 8 9 import java.util.Arrays; 10 import java.util.Iterator; 11 import java.util.List; 12 13 public class TransFormation22_groupByKey { 14 public static void main(String args[]) { 15 groupByKey01(); 16 } 17 18 public static void groupByKey01(){ 19 // 创建SparkConf 20 SparkConf conf = new SparkConf().setAppName("groupByKey").setMaster("local"); 21 // 创建JavaSparkContext 22 JavaSparkContext sc = new JavaSparkContext(conf); 23 24 // 模拟集合 25 List<Tuple2<String, Integer>> scoreList = Arrays.asList( 26 new Tuple2<String, Integer>("class1", 80), 27 new Tuple2<String, Integer>("class2", 75), 28 new Tuple2<String, Integer>("class1", 90), 29 new Tuple2<String, Integer>("class2", 65), 30 new Tuple2<String, Integer>("class3", 55), 31 new Tuple2<String, Integer>("class3", 65), 32 new Tuple2<String, Integer>("class4", 75), 33 new Tuple2<String, Integer>("class5", 95)); 34 35 // 并行化集合,创建JavaPairRDD 36 JavaPairRDD<String, Integer> scores = sc.parallelizePairs(scoreList); 37 38 // 针对scores RDD,执行groupByKey算子,对每个班级的成绩进行分组 39 JavaPairRDD<String, Iterable<Integer>> groupedScores = scores.groupByKey(); 40 41 // 打印groupedScores RDD 42 groupedScores.foreach(new VoidFunction<Tuple2<String,Iterable<Integer>>>() { 43 44 private static final long serialVersionUID = 1L; 45 46 public void call(Tuple2<String, Iterable<Integer>> t) throws Exception { 47 48 System.out.println("class: " + t._1); 49 Iterator<Integer> ite = t._2.iterator(); 50 while(ite.hasNext()) { 51 System.out.println(ite.next()); 52 } 53 System.out.println("=============================="); 54 } 55 }); 56 57 // 关闭JavaSparkContext 58 sc.close(); 59 } 60 }
输出结果:
class: class5
95
==============================
class: class3
55
65
==============================
class: class1
80
90
==============================
class: class4
75
==============================
class: class2
75
65
==============================
10、groupByKey算子
1 /** 2 * reduceByKey案例:统计每个班级的总分 3 */ 4 private static void reduceByKey() { 5 // 创建SparkConf 6 SparkConf conf = new SparkConf() 7 .setAppName("reduceByKey") 8 .setMaster("local"); 9 // 创建JavaSparkContext 10 JavaSparkContext sc = new JavaSparkContext(conf); 11 12 // 模拟集合 13 List<Tuple2<String, Integer>> scoreList = Arrays.asList( 14 new Tuple2<String, Integer>("class1", 80), 15 new Tuple2<String, Integer>("class2", 75), 16 new Tuple2<String, Integer>("class1", 90), 17 new Tuple2<String, Integer>("class2", 65)); 18 19 // 并行化集合,创建JavaPairRDD 20 JavaPairRDD<String, Integer> scores = sc.parallelizePairs(scoreList); 21 22 // 针对scores RDD,执行reduceByKey算子 23 JavaPairRDD<String, Integer> totalScores = scores.reduceByKey( 24 25 new Function2<Integer, Integer, Integer>() { 26 27 private static final long serialVersionUID = 1L; 28 29 public Integer call(Integer v1, Integer v2) throws Exception { 30 return v1 + v2; 31 } 32 33 }); 34 35 // 打印totalScores RDD 36 totalScores.foreach(new VoidFunction<Tuple2<String,Integer>>() { 37 38 private static final long serialVersionUID = 1L; 39 40 public void call(Tuple2<String, Integer> t) throws Exception { 41 System.out.println(t._1 + ": " + t._2); 42 } 43 44 }); 45 46 // 关闭JavaSparkContext 47 sc.close(); 48 }
总结groupByKey和reduceByKey的区别:
当采用reduceByKeyt时,Spark可以在每个分区移动数据之前将待输出数据与一个共用的key结合。借助下图可以理解在reduceByKey里究竟发生了什么。 注意在数据对被搬移前同一机器上同样的key是怎样被组合的(reduceByKey中的lamdba函数)。然后lamdba函数在每个区上被再次调用来将所有值reduce成一个最终结果。整个过程如下:
(2)当采用groupByKey时,由于它不接收函数,spark只能先将所有的键值对(key-value pair)都移动,这样的后果是集群节点之间的开销很大,导致传输延时。整个过程如下:
因此,在对大数据进行复杂计算时,reduceByKey优于groupByKey。
另外,如果仅仅是group处理,那么以下函数应该优先于 groupByKey :
(1)combineByKey 组合数据,但是组合之后的数据类型与输入时值的类型不一样。
(2)foldByKey合并每一个 key 的所有值,在级联函数和“零值”中使用。