进阶算子:
import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
/**
* 进阶算子
*/
object FunctionDemo2 {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("CreateRDD").setMaster("local")
val sc = new SparkContext(conf)
//1.遍历RDD获取RDD中每一个分区中元素的值并进行计算,然后返回一个新的RDD
val rdd1 = sc.makeRDD(List(1,2,3,4,5,6),3)
/*
ps: RDD = 1,2,3,4,5,6 --> 分了3个区 --> (1,2)分区1 (3,4)分区2 (5,6)分区3
f: Iterator[T] => Iterator[U], 第一个参数是一个迭代器对象,这个迭代器对象获取的是对应分区中值
preservesPartitioning: Boolean = false 第二参数是否保留父RDD的Partition分区信息,会使用默认值(这个作用会关系宽窄依赖)
如果RDD的数据量不大,而且存在分区,建议使用mapPartition算子代替map算子,可以加快对数据的处速度
如果RDD中数量过于庞大例如10亿条,不建议使用mapPartition,因为出现oom
ps: 一条日志数据大小 1KB~2KB左右 --> 可以估算数据量
*/
//第一个下划线是迭代器对象 第二个下划线是迭代器对象中存储的值
val rdd2: RDD[Int] = rdd1.mapPartitions(_.map(_*10))
println(rdd2.collect.toList)
//对RDD中每个分区中数据进行遍历操作(可以打印 或 可以计算)
//写一个迭代方法或函数 执行迭代操作
/*
f: (Int, Iterator[T]) => Iterator[U], 第一采参数是操作当前分区数据的函数,元组汇总第一个参数是分区序号 第二个参数是分区数据封装到的迭代器对象
preservesPartitioning: Boolean = false 第二参数是否保留父RDD的Partition分区信息,会使用默认值(这个作用会关系宽窄依赖)
*/
val Iter =(index:Int,iter:Iterator[(Int)])=>{
iter.map(x => "[partID:"+index+" value:"+x+"]")
}
val rdd3:RDD[String] = rdd1.mapPartitionsWithIndex(Iter)
println(rdd3.collect.toList)
//1.排序
//1.1 sortByKey:
//根据key值进行排序,但是key必须实现Ordered接口,必须是一个对偶元组
//这个算子有第二参数,第二个参数是boolean值决定了 是升序true 还是降序 false
//默认是true
val rdd4 = sc.parallelize(Array((3,"aa"),(6,"cc"),(2,"bb"),(1,"dd")))
val sorted: RDD[(Int, String)] = rdd4.sortByKey()
println(sorted.collect.toList)
//1.2 sortBy:
//与sortByKey类似,不同点在于sortBy可以根据传入的参数决定谁来排序
//第一个参数就是 根据谁排序
//第二个参数就是 决定升序还是降序 是升序true 还是降序false 默认是true
val rdd5 = sc.parallelize(List(23,432,123,5234,5423,5413,23,623,623,6423,623,25))
val sorted_1: RDD[Int] = rdd5.sortBy(x => x,false)
println(sorted_1.collect.toList)
//2.重新分区
/*
一般在计算阶段即转换阶段轻易不会分区,在触发job,才会对分区进行一些修改(控制输出)
如果在计算修改分区,绝对不会在发生shuffle的位置上修改的,而且不要频繁修改分区(只要修改分区就会触发shuffle)
*/
val rdd6 = sc.parallelize(List(1,2,3,4,5,6),3)
//2.1 partitions:
//通过partitions这个算子触发RDD中分区存储到一个数组中,然后获取数据长度即当前RDD分区的个数
println("初始分区值"+rdd6.partitions.length)
//2.2 repartition:更改分区repartition可以更改分区的数量 --> 会发生shuffle
val rdd6_1 = rdd6.repartition(1)//缩小分区
println("修改分区个数为:"+rdd6_1.partitions.length)
val rdd6_2 = rdd6.repartition(6)//扩大分区
println("修改分区个数为:"+rdd6_2.partitions.length)
//2.3 coalesce:允许发生将分区个数修改为小值,但是不允许将分区个数为大值
//ps:coalesce 算子默认shuffle是false 不会发生shuffle,所以不能修改为大的分区值
//一定要使用这个算子修改分区,建议可以开启shuffle但是没有repartition好用
//这个算子是repartition底层实现 默认开启shuffle
val rdd7_1 = rdd6.coalesce(1) //缩小分区是可以
println("修改分区个数为:"+rdd7_1.partitions.length)
val rdd7_2 = rdd6.coalesce(6) //扩大分区是不可以
println("修改分区个数为:"+rdd7_2.partitions.length)
//2.4 HashPartitioner
val rdd8 = sc.parallelize(List(("e",5),("c",3),("d",4),("a",2),("b",1)),2)
//更改分区,这个算子是需要传入自定分区器的,因为还没有学习自顶分区,所以这里可以使用一个万能分区 HashPartitioner
val rdd8_1 = rdd8.partitionBy(new HashPartitioner(4))
println(rdd8_1.partitions.length)
//更改分区 repartitionAndSortWithinPartitions 是repartition的一个变种,这个算子相当于集合了分区和排序
//这个算子只能对 对偶元组使用 --> 对偶元组(二元组),会根据key的值进行排序,
//如果需要在repartition之后再进行排序,此时就可以使用这个算子代替
//要去传入的是一个自定义分区器即可
rdd8.repartitionAndSortWithinPartitions(new HashPartitioner(1)).foreach(println)
//3.求和
//Spark中是很少使用Scala中求和方法sum reduce fold
//Scala中的reduce方法在Spark中是action算子
//3.1.根据相同key为一组进行求和(必须要有key)
val rdd9 = sc.parallelize(Array(("tom",1),("jerry",3),("kitty",2),("jerry",2),("tom",2),("dog",10)))
//返回值是一个全新的RDD,并且这个RDD中存储的是元组, key是原来的key value值时key所有value值的和
val rdd9_1: RDD[(String, Int)] = rdd9.reduceByKey(_+_)
println(rdd9_1.collect.toList)
//3.2
/*
aggregateByKey(zeroValue: U)(seqOp: (U, V) => U,combOp: (U, U) => U) 根据相同key 为一组计算value的值
在kv对的RDD中,按照key将value进行分区合并,合并完成后在进行全局合并
计算流程:
1.先做每个分区内数据的计算,zeroValue会参与到每个分区的计算
2.zeroValue会和seqOP的逻辑进行计算,将聚合的结果会传递到combOp这个计算逻辑中也是根据相同key一组进行全局聚合
zeroValue 是初始值(默认值) seqOP 局部聚合(分区内) combOp 全局聚合(聚合分区计算的值)
*/
val rdd10 = sc.parallelize(List(("cat",2),("cat",5),("pig",10),("dog",3),("dog",4),("cat",4)),2)
/**
* combineByKey 根据key进行求和计算看相同key为一组计算 分区值 ,然后在计算全局的值(分区计算值+分区计算值)
* createCombiner: V => C,遍历分区中所有元素,因此每个元素的key要么是还没有遇到,要么就是已经遇到
* 如果这个key是第一次出现,此时会调用createCombiner的函数来创建这各key的对应累加初始值
* mergeValue: (C, V) => C, 如果这各key在当前分区中遇到多个(多个相同key),它会使用mergeValue这各函数通过之前的累加初始值
* 和其他相同key的value进行求和计算 [分区求和]
* mergeCombiners: (C, C) => C 由于每个分区独立的,因此对于同一个key可以有多个累加器(不用分区内),如果有两个获取多个都有key累加器
* 就使用mergeCombiners函数进行分区结果聚合 [全局聚合]
*/
val rdd11= sc.parallelize(List(("cat",2),("cat",5),("pig",10), ("dog",3),("dog",4),("cat",4)),2)
//这个算子不能使用下划线
val rdd11_1 = rdd11.combineByKey(x=>x,(a:Int,b:Int)=>a+b,(m:Int,n:Int)=>m+n)
} }