1 partitionBy:对pairRDD进行分区操作,如果原有的partionRDD和现有的partionRDD是一致的话就不进行分区, 否则会生成ShuffleRDD,即会产生shuffle过程。
val conf = new SparkConf().setMaster("local[*]").setAppName("word count") val sc = new SparkContext(conf) //----------------------- partitionBy ------------------------- val kvRDD: RDD[(Int, Char)] = sc.makeRDD(Array((1, 'a'), (2, 'b'), (3, 'c'), (4, 'd')), 4) val partitionByRDD: RDD[(Int, Char)] = kvRDD.partitionBy(new HashPartitioner(2)) val partitionByArray: Array[Array[(Int, Char)]] = partitionByRDD.glom().collect() partitionByArray.foreach(data => println(data.mkString(","))) //(2,b),(4,d) //(1,a),(3,c)
2 groupByKey:groupByKey也是对每个key进行操作,但只生成一个sequence。
val rdd = sc.makeRDD(List("java", "scala", "java", "spark")) val mapRDD = rdd.map((_, 1)) val groupByKeyRDD: RDD[(String, Iterable[Int])] = mapRDD.groupByKey() groupByKeyRDD.foreach(println) //(spark,CompactBuffer(1)) //(scala,CompactBuffer(1)) //(java,CompactBuffer(1, 1))
3 reduceByKey:在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,reduce任务的个数可以通过第二个可选的参数来设置。
//分组、所有区预聚合 val rdd2 = sc.makeRDD(List(("female", 1), ("male", 5), ("female", 5), ("male", 2))) val reduceByKeyRDD: RDD[(String, Int)] = rdd2.reduceByKey((result, v) => result + v) reduceByKeyRDD.collect().foreach(println) //(female,6) //(male,7)
reduceByKey和groupByKey的区别
1. reduceByKey:按照key进行聚合,在shuffle之前有combine(预聚合)操作,返回结果是RDD[k,v]。既分组又聚合。
2. groupByKey:按照key进行分组,直接进行shuffle。只分组。
3. 开发指导:reduceByKey比groupByKey,建议使用。但是需要注意是否会影响业务逻辑。
4 aggregateByKey(zeroValue:U,[partitioner: Partitioner]) (seqOp: (U, V) => U,combOp: (U, U) => U)
在kv对的RDD中,按key将value进行分组合并,合并时,将每个value和初始值作为seq函数的参数,进行计算,返回的结果作为一个新的kv对,然后再将结果按照key进行合并,最后将每个分组的value传递给combine函数进行计算(先将前两个value进行计算,将返回结果和下一个value传给combine函数,以此类推),将key与计算结果作为一个新的kv对输出。
参数描述:
(1)zeroValue:给每一个分区中的每一个key一个初始值;
(2)seqOp:函数用于在每一个分区中用初始值逐步迭代value;
(3)combOp:函数用于合并每个分区中的结果。
案例:分区内相同key找出最大值,分区间相加
val rdd3: RDD[(String, Int)] = sc.parallelize(List(("a", 3), ("a", 2), ("c", 4), ("b", 3), ("c", 6), ("c", 8)), 2) rdd3.glom().collect().foreach(data => println(data.mkString(","))) //(a,3),(a,2),(c,4) //(b,3),(c,6),(c,8) //区内聚合,区间聚合 val aggregateByKeyRDD: RDD[(String, Int)] = rdd3.aggregateByKey(0)(_.max(_), _ + _) aggregateByKeyRDD.foreach(println) //(a,3) //(c,12) //(b,3)
5 foldByKey:aggregateByKey的简化操作,seqop和combop相同
案例:计算相同key的value累加值。
val rdd4: RDD[(Char, Int)] = sc.parallelize(List(('a', 3), ('a', 2), ('a', 4), ('b', 3), ('c', 6), ('c', 8)), 3) //相当于aggregateByKey的简化,区内和区间的操作一样 val foldByKeyRDD: RDD[(Char, Int)] = rdd4.foldByKey(0)(_ + _) foldByKeyRDD.collect().foreach(println) //(c,14) //(a,9) //(b,3)
6 combineByKey(createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C) :对相同K,把V合并成一个集合。
参数描述:
(1)createCombiner: combineByKey() 会遍历分区中的所有元素,因此每个元素的键要么还没有遇到过,要么就和之前的某个元素的键相同。如果这是一个新的元素,combineByKey()会使用一个叫作createCombiner()的函数来创建那个键对应的累加器的初始值
(2)mergeValue: 如果这是一个在处理当前分区之前已经遇到的键,它会使用mergeValue()方法将该键的累加器对应的当前值与这个新的值进行合并
(3)mergeCombiners: 由于每个分区都是独立处理的, 因此对于同一个键可以有多个累加器。如果有两个或者更多的分区都有对应同一个键的累加器, 就需要使用用户提供的 mergeCombiners() 方法将各个分区的结果进行合并。
案例:求每个key所对应值的平均值
val rdd5: RDD[(String, Int)] = sc.parallelize(Array(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98)), 2) //acc的类型是第一个函数的返回值类型,无法自动推断,因此不能用下划线简写 val combineByKeyRDD = rdd5.combineByKey( (_, 1), //对每一个v进行map (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1), //区内聚合 (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)) //区间聚合 combineByKeyRDD.collect().foreach(println) //(b,(286,3)) //(a,(274,3)) //求平均值.map入参只有一个,模式匹配更方便取出元组的值 val mapRDD1: RDD[(String, Double)] = combineByKeyRDD.map { case (k, t) => (k, t._1 / t._2.toDouble) } mapRDD1.collect().foreach(println) //(b,95.33333333333333) //(a,91.33333333333333)
7 sortByKey([ascending], [numTasks]):在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD。
val rdd6 = sc.parallelize(Array((3, "aa"), (6, "cc"), (2, "bb"), (1, "dd"))) val sortByKeyRDD: RDD[(Int, String)] = rdd6.sortByKey(true) sortByKeyRDD.collect().foreach(println) //(1,dd) //(2,bb) //(3,aa) //(6,cc)
8 mapValues:针对于(K,V)形式的类型只对V进行操作。
val rdd7 = sc.parallelize(Array((1, "a"), (1, "d"), (2, "b"), (3, "c"))) val mapValuesRDD: RDD[(Int, String)] = rdd7.mapValues(_ + "|||") mapValuesRDD.collect().foreach(println) //(1,a|||) //(1,d|||) //(2,b|||) //(3,c|||)
9 join(otherDataset, [numTasks]):在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD。
val rdd8 = sc.parallelize(Array((1, "a"), (2, "b"), (3, "c"), (4, "c"))) val rdd9 = sc.parallelize(Array((1, 4), (2, 5), (3, 6))) val rdd10 = sc.parallelize(Array((1, "x"), (2, "y"), (3, "z"))) val joinRDD1: RDD[(Int, (String, Int))] = rdd8.join(rdd9) //相当于内连接,双方都存在此key的保留 joinRDD1.collect().foreach(println) //(1,(a,4)) //(2,(b,5)) //(3,(c,6)) val joinRDD2: RDD[(Int, ((String, Int), String))] = rdd8.join(rdd9).join(rdd10) joinRDD2.collect().foreach(println) //(1,((a,4),x)) //(2,((b,5),y)) //(3,((c,6),z))
10 cogroup(otherDataset, [numTasks]):在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable<V>,Iterable<W>))类型的RDD。
val rdd8 = sc.parallelize(Array((1, "a"), (2, "b"), (3, "c"), (4, "c"))) val rdd9 = sc.parallelize(Array((1, 4), (2, 5), (3, 6))) //相当于全外连接,只要某个数据集中出现了此key,就组合成元祖 val cogroupRDD1: RDD[(Int, (Iterable[String], Iterable[Int]))] = rdd8.cogroup(rdd9) cogroupRDD1.collect().foreach(println) //(4,(CompactBuffer(c),CompactBuffer())) //(1,(CompactBuffer(a),CompactBuffer(4))) //(2,(CompactBuffer(b),CompactBuffer(5))) //(3,(CompactBuffer(c),CompactBuffer(6))) val cogroupRDD2: RDD[(Int, (Iterable[Int], Iterable[String]))] = rdd9.cogroup(rdd8) cogroupRDD2.collect().foreach(println) //(4,(CompactBuffer(),CompactBuffer(c))) //(1,(CompactBuffer(4),CompactBuffer(a))) //(2,(CompactBuffer(5),CompactBuffer(b))) //(3,(CompactBuffer(6),CompactBuffer(c)))