Spark aggregation operator: aggregateByKey
Aggregate the values of each key, using given combine functions and a neutral "zero value". This function can return a different result type, U, than the type of the values in this RDD, V. Thus, we need one operation for merging a V into a U and one operation for merging two U's, as in scala.TraversableOnce. The former operation is used for merging values within a partition, and the latter is used for merging values between partitions. To avoid memory allocation, both of these functions are allowed to modify and return their first argument instead of creating a new U.
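In other words: aggregateByKey aggregates the values of each key using the given functions and a neutral "zero value", and its result type U may differ from the RDD's value type V. seqOp: (U, V) => U merges a value into the accumulator within a partition, and combOp: (U, U) => U merges two accumulators coming from different partitions. To avoid memory allocation, both functions may modify and return their first argument instead of creating a new U.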
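The division of labour between seqOp and combOp can be modelled with ordinary Scala collections. The sketch below is a simplified single-machine model, not Spark's implementation. Each inner Seq stands for one partition. seqOp folds a key's values within a partition, starting from zeroValue, and combOp merges the per-partition results. The names AggregateByKeyModel and simulateAggregateByKey, and the per-key average example, are illustrative assumptions.

```scala
object AggregateByKeyModel {
  // Simplified model of aggregateByKey; each inner Seq stands for one RDD partition.
  def simulateAggregateByKey[K, V, U](partitions: Seq[Seq[(K, V)]], zeroValue: U)(
      seqOp: (U, V) => U,
      combOp: (U, U) => U): Map[K, U] = {
    // Phase 1 (within a partition): fold each key's values into a U, starting from zeroValue.
    val perPartition: Seq[Map[K, U]] = partitions.map { part =>
      part.foldLeft(Map.empty[K, U]) { case (acc, (k, v)) =>
        acc.updated(k, seqOp(acc.getOrElse(k, zeroValue), v))
      }
    }
    // Phase 2 (between partitions): merge partial results of the same key with combOp.
    perPartition.foldLeft(Map.empty[K, U]) { (acc, partial) =>
      partial.foldLeft(acc) { case (m, (k, u)) =>
        m.updated(k, m.get(k).map(combOp(_, u)).getOrElse(u))
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val partitions = Seq(
      Seq("a" -> 1, "b" -> 4, "a" -> 3),
      Seq("a" -> 5, "b" -> 6))
    // U = (sum, count) differs from V = Int, which is what aggregateByKey allows.
    val sumCount = simulateAggregateByKey(partitions, (0, 0))(
      (acc, v) => (acc._1 + v, acc._2 + 1),
      (x, y) => (x._1 + y._1, x._2 + y._2))
    val averages = sumCount.map { case (k, (s, c)) => k -> s.toDouble / c }
    println(averages)
  }
}
```

For the sample data, key "a" ends up as (9, 3) and key "b" as (10, 2). The zero value is used once per key in each partition, not once per key overall.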
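// Excerpt from org.apache.spark.rdd.PairRDDFunctions
// Overload without a partitioner: falls back to defaultPartitioner(self)
def aggregateByKey[U: ClassTag](zeroValue: U)(
    seqOp: (U, V) => U,
    combOp: (U, U) => U
  ): RDD[(K, U)] = self.withScope {
  aggregateByKey(zeroValue, defaultPartitioner(self))(seqOp, combOp)
}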
def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(
    seqOp: (U, V) => U,
    combOp: (U, U) => U
  ): RDD[(K, U)] = self.withScope {
  // Serialize the zero value to a byte array so that we can get a new clone of it on each key
  val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
  val zeroArray = new Array[Byte](zeroBuffer.limit)
  zeroBuffer.get(zeroArray)

  lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
  // Every call deserializes a fresh copy of zeroValue
  val createZero = () => cachedSerializer.deserialize[U](ByteBuffer.wrap(zeroArray))

  // We will clean the combiner closure later in `combineByKey`
  val cleanedSeqOp = self.context.clean(seqOp)
  // createCombiner = seqOp(fresh zero, first value); mergeValue = seqOp; mergeCombiners = combOp
  combineByKeyWithClassTag[U]((v: V) => cleanedSeqOp(createZero(), v),
    cleanedSeqOp, combOp, partitioner)
}
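Spark serializes zeroValue once and deserializes it in createZero because seqOp and combOp may mutate their first argument. The plain-Scala sketch below shows why a shared zero instance would be a problem. It assumes Java serialization in place of Spark's configured serializer and uses a mutable java.lang.StringBuilder as the zero value. It compares sharing one zero instance across keys with deserializing a fresh copy for each key. The helpers toBytes, fromBytes and foldByKey are hypothetical and exist only for this illustration.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object ZeroCloneDemo {
  // Serialize once, in the spirit of zeroArray (Java serialization assumed here).
  def toBytes(obj: AnyRef): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    try oos.writeObject(obj) finally oos.close()
    bos.toByteArray
  }

  // Deserialize a brand-new copy on every call, in the spirit of createZero.
  def fromBytes[T](bytes: Array[Byte]): T = {
    val ois = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try ois.readObject().asInstanceOf[T] finally ois.close()
  }

  // seqOp mutates and returns its first argument, which aggregateByKey permits.
  val seqOp: (java.lang.StringBuilder, String) => java.lang.StringBuilder =
    (acc, v) => acc.append(v).append(',')

  // Fold each key's values (keys in sorted order), asking newZero for a starting value per key.
  def foldByKey(pairs: Seq[(String, String)], newZero: () => java.lang.StringBuilder): Map[String, String] =
    pairs.groupBy(_._1).toList.sortBy(_._1).map { case (k, kvs) =>
      k -> kvs.map(_._2).foldLeft(newZero())(seqOp).toString
    }.toMap

  def main(args: Array[String]): Unit = {
    val pairs = Seq("a" -> "1", "b" -> "2", "a" -> "3")

    // Wrong: one mutable zero shared by all keys, so "b" starts from what "a" left behind.
    val shared = new java.lang.StringBuilder("[]")
    println(foldByKey(pairs, () => shared))

    // Right: serialize the zero once and deserialize a fresh copy for each key.
    val zeroBytes = toBytes(new java.lang.StringBuilder("[]"))
    println(foldByKey(pairs, () => fromBytes[java.lang.StringBuilder](zeroBytes)))
  }
}
```

With the shared instance, key "b" starts from the builder that key "a" has already modified and yields "[]1,3,2,". With a fresh copy per key, it yields "[]2,".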
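// reduceByKey: each key's first value is its own combiner; func is both mergeValue and mergeCombiners
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
  combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
}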
// mapSideCombine = true (the default) pre-aggregates values within each partition before the shuffle
def combineByKeyWithClassTag[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true,
    serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
  ...
}
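In the excerpts above, aggregateByKey and reduceByKey are both expressed through combineByKeyWithClassTag. They differ only in how the first value of a key becomes a combiner. The sketch below models that relationship with plain Scala collections, where each inner Seq is one partition and values are always combined within a partition first. It ignores partitioners, serializers and the mapSideCombine flag. CombineByKeyModel and its combineByKey, reduceByKey and aggregateByKey are local model functions that loosely mirror the Spark signatures; they are not Spark's API.

```scala
object CombineByKeyModel {
  // Simplified model of combineByKey; each inner Seq stands for one partition.
  def combineByKey[K, V, C](partitions: Seq[Seq[(K, V)]])(
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C): Map[K, C] = {
    // Within a partition: the first value of a key creates its combiner,
    // later values of that key are merged into it with mergeValue.
    val partials = partitions.map { part =>
      part.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
        acc.updated(k, acc.get(k).map(mergeValue(_, v)).getOrElse(createCombiner(v)))
      }
    }
    // Across partitions: combiners of the same key are merged with mergeCombiners.
    partials.flatten.groupBy(_._1).map { case (k, kcs) =>
      k -> kcs.map(_._2).reduce(mergeCombiners)
    }
  }

  // reduceByKey: the combiner is the value itself, so C = V and no zero value is needed.
  def reduceByKey[K, V](partitions: Seq[Seq[(K, V)]])(func: (V, V) => V): Map[K, V] =
    combineByKey[K, V, V](partitions)((v: V) => v, func, func)

  // aggregateByKey: the combiner starts from the zero value, as createCombiner does in Spark.
  def aggregateByKey[K, V, U](partitions: Seq[Seq[(K, V)]], zeroValue: U)(
      seqOp: (U, V) => U,
      combOp: (U, U) => U): Map[K, U] =
    combineByKey[K, V, U](partitions)((v: V) => seqOp(zeroValue, v), seqOp, combOp)

  def main(args: Array[String]): Unit = {
    val partitions = Seq(Seq("x" -> 1, "y" -> 2, "x" -> 3), Seq("x" -> 4))
    println(reduceByKey(partitions)(_ + _))
    println(aggregateByKey(partitions, List.empty[Int])(_ :+ _, _ ++ _))
  }
}
```

Here reduceByKey keeps V = Int. aggregateByKey changes the type to List[Int] while still combining inside each partition first.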
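import org.apache.spark.{SparkConf, SparkContext}

/**
 * aggregateByKey demo
 */
object AggregateByKeyDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("wcDemo")
    conf.setMaster("local[4]")
    val sc = new SparkContext(conf)
    // minPartitions = 3 (a suggested minimum number of partitions)
    val rdd1 = sc.textFile("file:///e:/wc/1.txt", 3)
    // Tag every word with the index of its partition: (word, word_idx).
    // Prepending to the list reverses the word order within each partition.
    val rdd2 = rdd1.flatMap(_.split(" ")).mapPartitionsWithIndex((idx, it) => {
      var list: List[(String, String)] = Nil
      for (e <- it) {
        list = (e, e + "_" + idx) :: list
      }
      list.iterator
    })
    rdd2.collect().foreach(println)
    println("=======================")
    // Zero value: used once per key in every partition that contains the key
    val zeroU: String = "[]"
    // Within a partition: append the value followed by " ,"
    def seqOp(a: String, b: String) = {
      a + b + " ,"
    }
    // Across partitions: join the partial results with "$"
    def comOp(a: String, b: String) = {
      a + "$" + b
    }
    val rdd3 = rdd2.aggregateByKey(zeroU)(seqOp, comOp)
    rdd3.collect().foreach(println)
    sc.stop()
  }
}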
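Recorded output of rdd2 (one group of pairs per line; the number after "_" is the partition index):
(hello,hello_0) (hello,hello_0) (hello,hello_0) (hello,hello_0)
(hello,hello_1) (hello,hello_1) (hello,hello_1)
(hello,hello_2) (hello,hello_2) (hello,hello_2)
(tom2,tom2_0) (world,world_0) (tom1,tom1_0) (world,world_0)
(tom7,tom7_1) (world,world_1) (tom6,tom6_1) (world,world_1) (tom5,tom5_1) (world,world_1)
(tom10,tom10_2) (world,world_2) (tom9,tom9_2) (world,world_2) (tom8,tom8_2) (world,world_2)
Aggregated result for the key hello (from rdd3):
(hello,[]hello_0 ,hello_0 ,hello_0 ,hello_0 ,$[]hello_1 ,hello_1 ,hello_1 ,$[]hello_2 ,hello_2 ,hello_2 ,)
How this string is built: in partition 0, seqOp starts from the zero value "[]" and appends each of the four hello_0 values followed by " ,". Partitions 1 and 2 do the same with three values each. comOp then joins the three partial strings with "$". The zero value therefore appears once per partition that contains the key, not once per key overall. The partial results arrive through the shuffle, so their merge order is not guaranteed to follow the partition index in general.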
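The aggregated string for hello can be reproduced without Spark by applying the demo's seqOp and comOp by hand. This sketch assumes the partition contents implied by the recorded output: four hello_0 values, then three hello_1 and three hello_2. It merges the partial results in partition order. DemoOutputModel and aggregate are hypothetical names.

```scala
object DemoOutputModel {
  // Same bodies as seqOp and comOp in AggregateByKeyDemo.
  def seqOp(a: String, b: String): String = a + b + " ,"
  def comOp(a: String, b: String): String = a + "$" + b

  // Fold each partition from the zero value, then merge the partials in partition order.
  def aggregate(partitions: Seq[Seq[String]], zeroU: String = "[]"): String =
    partitions.map(_.foldLeft(zeroU)(seqOp)).reduce((a, b) => comOp(a, b))

  def main(args: Array[String]): Unit = {
    // Values of key "hello" per partition, as implied by the recorded output.
    val hello = Seq(Seq.fill(4)("hello_0"), Seq.fill(3)("hello_1"), Seq.fill(3)("hello_2"))
    println(aggregate(hello))
  }
}
```

The printed string matches the hello line of rdd3 shown above.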
Aggregation functions in Spark's PairRDDFunctions
------------------------------
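1. reduceByKey
The value type V stays the same; values are combined on the map side.
2. groupByKey
Groups by key; the resulting value is a collection (Iterable[V]), and there is no map-side combining.
3. aggregateByKey
Can change the value type (from V to U) and still combines on the map side.
4. combineByKeyWithClassTag
Combines by key. The caller chooses whether to combine on the map side (mapSideCombine) and supplies arbitrary createCombiner, mergeValue and mergeCombiners functions.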
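Map-side combining in the list above matters mainly for the number of records that cross the shuffle. The count below is a rough model under one assumption, not a Spark measurement. Without map-side combining, every record is shuffled (as in groupByKey). With it, each partition emits one partial result per distinct key (as in reduceByKey and aggregateByKey). The key lists reuse the partition layout visible in the demo output, and the object and method names are hypothetical.

```scala
object MapSideCombineModel {
  // Without map-side combining, every (key, value) record is shuffled.
  def shuffledWithoutCombine[K](partitions: Seq[Seq[K]]): Int =
    partitions.map(_.size).sum

  // With map-side combining, each partition sends one partial result per distinct key.
  def shuffledWithCombine[K](partitions: Seq[Seq[K]]): Int =
    partitions.map(_.distinct.size).sum

  def main(args: Array[String]): Unit = {
    // Keys per partition, following the demo output (world_0 twice, world_1 three times, ...).
    val partitions = Seq(
      Seq("hello", "hello", "hello", "hello", "tom2", "world", "tom1", "world"),
      Seq("hello", "hello", "hello", "tom7", "world", "tom6", "world", "tom5", "world"),
      Seq("hello", "hello", "hello", "tom10", "world", "tom9", "world", "tom8", "world"))
    println(s"without map-side combine: ${shuffledWithoutCombine(partitions)}")
    println(s"with map-side combine:    ${shuffledWithCombine(partitions)}")
  }
}
```

For this data, the model gives 26 shuffled records without combining and 14 with it.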