- mapPartitionsWithIndex
def func(index: Int, iter: Iterator[Int]): Iterator[String] = {
  iter.toList.map(x => "[prtID:" + index + ",val:" + x + "]").iterator
}
val rdd = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7), 2)
rdd.mapPartitionsWithIndex(func).collect
The argument is a function that takes two parameters: the first is the partition index, and the second is an iterator over the data in that partition.
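To see which element ends up in which partition without a cluster, the call can be modeled on plain Scala collections. This is only a sketch: `slice` and the local `mapPartitionsWithIndex` below are hypothetical helpers, not Spark APIs, and the slice boundaries are an assumption modeled on how `sc.parallelize` splits a local collection.

```scala
object MapPartitionsWithIndexSketch {
  // Split data into numSlices contiguous chunks (assumed boundary formula).
  def slice[T](data: Seq[T], numSlices: Int): Seq[Seq[T]] =
    (0 until numSlices).map { i =>
      val start = (i.toLong * data.length / numSlices).toInt
      val end = ((i + 1).toLong * data.length / numSlices).toInt
      data.slice(start, end)
    }

  // Local stand-in for rdd.mapPartitionsWithIndex(f).collect:
  // f receives the partition index and an iterator over that partition.
  def mapPartitionsWithIndex[T, R](parts: Seq[Seq[T]])(f: (Int, Iterator[T]) => Iterator[R]): Seq[R] =
    parts.zipWithIndex.flatMap { case (part, idx) => f(idx, part.iterator) }

  def func(index: Int, iter: Iterator[Int]): Iterator[String] =
    iter.map(x => "[prtID:" + index + ",val:" + x + "]")

  def main(args: Array[String]): Unit = {
    val parts = slice(List(1, 2, 3, 4, 5, 6, 7), 2) // (1,2,3) and (4,5,6,7)
    mapPartitionsWithIndex(parts)(func).foreach(println)
  }
}
```

Under that slicing assumption, 1 to 3 are tagged with partition 0 and 4 to 7 with partition 1.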
- aggregate
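// seqOp works like a fold: for (1,2,3,4) it first combines zeroValue with 1, then that result with 2, then that result with 3, and so on
aggregate[U](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
seqOp is applied to the elements within each partition.
combOp is applied to the aggregated results of the partitions.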
val rdd = sc.parallelize(List(1,2,3,4,5,6,7),2)
rdd.aggregate(0)(_ + _, _ + _) // sum with aggregate
res : Int = 28
rdd.aggregate(0)(math.max(_, _), _ + _) // sum of the per-partition maxima
Result: 10
rdd.aggregate(10)(math.max(_, _), _ + _) // the zero value 10 is applied once more in combOp
Result: 30
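val rdd1 = sc.parallelize(List("a", "b", "c", "d", "e", "f"), 2)
rdd1.aggregate("|")(_ + _, _ + _)
Result: ||abc|def (or ||def|abc, since the order in which partition results are merged is not fixed)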
val rdd3 = sc.parallelize(List("12","23","345","4567"),2)
rdd3.aggregate("")((x,y) => math.max(x.length, y.length).toString, (x,y) => x + y)
Because the partitions are computed in parallel, the result may be "42" or "24".
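The numbers above can be reproduced with a small local model of aggregate. This is a sketch on plain Scala collections, not Spark's implementation: the `aggregate` helper and `lenOp` are hypothetical names, and the partition contents are assumed to be (1,2,3) / (4,5,6,7) and ("12","23") / ("345","4567").

```scala
object AggregateSketch {
  // Local stand-in for rdd.aggregate(zero)(seqOp, combOp) over explicit partitions.
  def aggregate[T, U](parts: Seq[Seq[T]], zero: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = {
    // Each partition folds its own elements, starting from the zero value.
    val perPartition = parts.map(part => part.foldLeft(zero)(seqOp))
    // The partial results are merged starting from the zero value once more.
    perPartition.foldLeft(zero)(combOp)
  }

  // Assumed partition layouts for the examples in these notes.
  val nums: Seq[Seq[Int]] = Seq(Seq(1, 2, 3), Seq(4, 5, 6, 7))
  val strs: Seq[Seq[String]] = Seq(Seq("12", "23"), Seq("345", "4567"))

  def lenOp(x: String, y: String): String = math.max(x.length, y.length).toString

  def main(args: Array[String]): Unit = {
    println(aggregate(nums, 0)(_ + _, _ + _))           // 28
    println(aggregate(nums, 0)(math.max(_, _), _ + _))  // 3 + 7 = 10
    println(aggregate(nums, 10)(math.max(_, _), _ + _)) // 10 + 10 + 10 = 30
    println(aggregate(strs, "")(lenOp, _ + _))          // 24
    println(aggregate(strs.reverse, "")(lenOp, _ + _))  // 42, partitions merged in the other order
  }
}
```

Reversing the partition list stands in for the second partition finishing first, which is why both "24" and "42" are possible on a real cluster.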
- aggregateByKey
Applies to key-value RDDs (pair RDDs).
Within each partition, the pairs that share a key are grouped first, and the functions passed in are then applied to the values of that key.
val pairRDD = sc.parallelize(List(("cat", 2), ("cat", 5), ("mouse", 4), ("cat", 12), ("dog", 12), ("mouse", 2)), 2)
def func2(index: Int, iter: Iterator[(String, Int)]): Iterator[String] = {
  iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator
}
pairRDD.mapPartitionsWithIndex(func2).collect
pairRDD.aggregateByKey(0)(_ + _, _ + _).collect
Result: (dog,12), (cat,19), (mouse,6)
pairRDD.aggregateByKey(0)(math.max(_, _), _ + _).collect
Result: (dog,12), (cat,17), (mouse,6)
pairRDD.aggregateByKey(100)(math.max(_, _), _ + _).collect
Result: Array((dog,100), (cat,200), (mouse,200))
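The three results can be checked with a local model of aggregateByKey. It is a sketch under assumptions, not Spark's code: the `aggregateByKey` helper is hypothetical, and the two partitions are assumed to hold the first three and last three pairs. Note that, unlike aggregate, the zero value is used once per key per partition and is not applied again when partitions are merged.

```scala
object AggregateByKeySketch {
  // Local stand-in for pairRDD.aggregateByKey(zero)(seqOp, combOp) over explicit partitions.
  def aggregateByKey[K, V, U](parts: Seq[Seq[(K, V)]], zero: U)(seqOp: (U, V) => U, combOp: (U, U) => U): Map[K, U] = {
    // Within a partition, every key starts from its own copy of the zero value.
    val perPartition: Seq[Map[K, U]] = parts.map { part =>
      part.foldLeft(Map.empty[K, U]) { case (acc, (k, v)) =>
        acc.updated(k, seqOp(acc.getOrElse(k, zero), v))
      }
    }
    // Across partitions, partial results of the same key are merged with combOp only.
    perPartition.flatten.groupBy(_._1).map { case (k, kus) => k -> kus.map(_._2).reduce(combOp) }
  }

  // Assumed partition layout: three pairs per partition.
  val pairs: Seq[Seq[(String, Int)]] = Seq(
    Seq(("cat", 2), ("cat", 5), ("mouse", 4)),
    Seq(("cat", 12), ("dog", 12), ("mouse", 2))
  )

  def main(args: Array[String]): Unit = {
    val byMax = aggregateByKey(pairs, 100)(math.max(_, _), _ + _)
    println(byMax("cat"))   // 100 + 100 = 200
    println(byMax("dog"))   // 100, dog occurs in one partition only
    println(byMax("mouse")) // 100 + 100 = 200
  }
}
```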
- combineByKey
File contents:
hello tom
hello tom
hello tom
hello tom
hello tom
hello tom
hello jerry
hello jerry
hello jerry
hello kitty
hello kitty
hello kitty
hello world
hello world
hello world
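// combineByKey() takes three arguments:
// 1. createCombiner: builds the initial value from the first value seen for a key in each partition
// 2. mergeValue: local aggregation within a partition
// 3. mergeCombiners: merges the partial results across partitions
sc.textFile("<file shown above>").flatMap(_.split(" ")).map((_, 1)).combineByKey(x => x, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n).collect
Result: (tom,6) (hello,15) (kitty,3) (jerry,3) (world,3)
sc.textFile("<file shown above>").flatMap(_.split(" ")).map((_, 1)).combineByKey(x => x + 10, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n).collect
// There are three partitions and createCombiner runs once per key in each partition, so every initial value gets +10 and each total grows by 30
Result: (tom,36) (hello,45) (kitty,33) (jerry,33) (world,33)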
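The +30 effect can be reproduced locally. The following is a sketch, not Spark's implementation: the `combineByKey` helper is hypothetical, and the data is assumed to be spread over three partitions that each contain the same five lines, so that every word occurs in every partition.

```scala
object CombineByKeySketch {
  // Local stand-in for pairRDD.combineByKey(createCombiner, mergeValue, mergeCombiners).
  def combineByKey[K, V, C](parts: Seq[Seq[(K, V)]])(
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C): Map[K, C] = {
    val perPartition: Seq[Map[K, C]] = parts.map { part =>
      part.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
        acc.get(k) match {
          case None    => acc.updated(k, createCombiner(v)) // first value of k in this partition
          case Some(c) => acc.updated(k, mergeValue(c, v))  // later values of k in this partition
        }
      }
    }
    // Partial results of the same key from different partitions are merged.
    perPartition.flatten.groupBy(_._1).map { case (k, kcs) => k -> kcs.map(_._2).reduce(mergeCombiners) }
  }

  // Assumed layout: three identical partitions of five lines each.
  val lines: Seq[String] = Seq("hello tom", "hello tom", "hello jerry", "hello kitty", "hello world")
  val parts: Seq[Seq[(String, Int)]] = Seq.fill(3)(lines.flatMap(_.split(" ")).map((_, 1)))

  def main(args: Array[String]): Unit = {
    val plain = combineByKey(parts)((x: Int) => x, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n)
    val plus10 = combineByKey(parts)((x: Int) => x + 10, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n)
    println(plain("hello"))  // 15
    println(plus10("hello")) // 15 + 3 * 10 = 45
  }
}
```

A key that occurs in only one partition would grow by 10 rather than 30, so the offset depends on how the data is partitioned.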