Transformation算子
intersection
交集
/*
交集
*/
@Test
def intersection(): Unit ={
val rdd1=sc.parallelize(Seq(1,2,3,4,5))
val rdd2=sc.parallelize(Seq(3,4,5,6,7))
rdd1.intersection(rdd2)
.collect()
.foreach(println(_))
}

union
并集
/*
并集
*/
@Test
def union(): Unit ={
val rdd1=sc.parallelize(Seq(1,2,3,4,5))
val rdd2=sc.parallelize(Seq(3,4,5,6,7))
rdd1.union(rdd2)
.collect()
.foreach(println(_))
}

subtract
差集
@Test
def subtract(): Unit ={
val rdd1=sc.parallelize(Seq(1,2,3,4,5))
val rdd2=sc.parallelize(Seq(3,4,5,6,7))
rdd1.subtract(rdd2)
.collect()
.foreach(println(_))
}
输出:

groupByKey
作用
-
GroupByKey 算子的主要作用是按照 Key 分组, 和 ReduceByKey 有点类似, 但是 GroupByKey 并不求聚合, 只是列举 Key 对应的所有 Value
/*
groupByKey 运算结果的格式:(K,(value1,value2))
reduceByKey 能否在Map端做Combiner
*/
@Test
def groupByKey(): Unit ={
sc.parallelize(Seq(("a",1),("a",1),("b",1)))
.groupByKey()
.collect()
.foreach(println(_))
}

distinct
作用:用于去重
@Test
def distinct(): Unit ={
sc.parallelize(Seq(1,1,2,2,3))
.distinct()
.collect()
.foreach(println(_))
}
输出:1,2,3
combineByKey
作用
-
对数据集按照 Key 进行聚合
调用
-
combineByKey(createCombiner, mergeValue, mergeCombiners, [partitioner], [mapSideCombiner], [serializer])
参数
-
createCombiner将 Value 进行初步转换 -
mergeValue在每个分区把上一步转换的结果聚合 -
mergeCombiners在所有分区上把每个分区的聚合结果聚合 -
partitioner可选, 分区函数 -
mapSideCombiner可选, 是否在 Map 端 Combine -
serializer序列化器
例子:算个人得分的平均值
@Test
def combineByKey(): Unit ={
var rdd=sc.parallelize(Seq(
("zhangsan", 99.0),
("zhangsan", 96.0),
("lisi", 97.0),
("lisi", 98.0),
("zhangsan", 97.0)
))
//2.算子运算
// 2.1 createCombiner 转换数据
// 2.2 mergeValue 分区上的聚合
// 2.3 mergeCombiners 把所有分区上的结果再次聚合,生成最终结果
val combineResult = rdd.combineByKey(
createCombiner = (curr: Double) => (curr, 1),
mergeValue = (curr: (Double, Int), nextValue: Double) => (curr._1 + nextValue, curr._2 + 1),
mergeCombiners = (curr: (Double, Int), agg: (Double, Int)) => (curr._1 + agg._1, curr._2 + agg._2)
)
val resultRDD = combineResult.map(item => (item._1, item._2._1 / item._2._2))
resultRDD.collect().foreach(print(_))
}

aggregateByKey
作用
-
聚合所有 Key 相同的 Value, 换句话说, 按照 Key 聚合 Value
调用
-
rdd.aggregateByKey(zeroValue)(seqOp, combOp)
参数
-
zeroValue初始值 -
seqOp转换每一个值的函数 -
comboOp将转换过的值聚合的函数
/*
rdd.aggregateByKey(zeroValue)(seqOp, combOp)
zeroValue 初始值
seqOp 转换每一个值的函数
comboOp 将转换过的值聚合的函数
*/
@Test
def aggregateByKey(): Unit ={
val rdd=sc.parallelize(Seq(("手机",10.0),("手机",15.0),("电脑",20.0)))
rdd.aggregateByKey(0.8)(( zeroValue,item) =>item * zeroValue,(curr,agg) => curr+agg)
.collect()
.foreach(println(_))
// (手机,20.0)
// (电脑,16.0)
}

foldByKey
作用
-
和 ReduceByKey 是一样的, 都是按照 Key 做分组去求聚合, 但是 FoldByKey 的不同点在于可以指定初始值
/*
foldByKey可以指定初始值
*/
@Test
def foldByKey(): Unit ={
sc.parallelize(Seq(("a",1),("a",1),("b",1)))
.foldByKey(zeroValue = 10)( (curr,agg) => curr + agg )
.collect()
.foreach(println(_))
}

join
作用
-
将两个 RDD 按照相同的 Key 进行连接
@Test
def join(): Unit ={
val rdd1 = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 1)))
val rdd2 = sc.parallelize(Seq(("a", 10), ("a", 11), ("a", 12)))
rdd1.join(rdd2).collect().foreach(println(_))
// (a,(1,10))
// (a,(1,11))
// (a,(1,12))
// (a,(2,10))
// (a,(2,11))
// (a,(2,12))
}

sortBy
sortBy`可以指定按照哪个字段来排序, `sortByKey`直接按照 Key 来排序
@Test
def sortBy(): Unit ={
val rdd=sc.parallelize(Seq(8,4,5,6,2,1,1,9))
val rdd2=sc.parallelize(Seq(("a",1),("b",3),("c",2)))
//rdd.sortBy(item =>item).collect().foreach(println(_))
rdd2.sortBy(item => item._2).collect().foreach(println(_))
rdd2.sortByKey().collect().foreach(println(_))
}
repartition
重新进行分区
@Test
def partitioning(): Unit ={
val rdd=sc.parallelize(Seq(1,2,3,4,5),2)
//println((rdd.repartition(5)).partitions.size)
println(rdd.coalesce(5,true).partitions.size)
}