一、Tranformation算子
Transformations类算子叫做转换算子,该类算子是延迟加载,也叫懒加载,必须有action类算子才会触发。
1.1 filter
保留符合条件的数据,类似于SQL中的where子句。true保留,false过滤掉
val rdd = sc.makeRDD(List(1, 1, 3, 4, 5, 6, 7, 8))
rdd.filter(_ % 2 == 0).foreach(println)
// 输出:
4
6
8
1.2 filterByRange
作用KV格式的RDD上,保留Key值符合范围的数据
val rdd1 = sc.parallelize(List(("a", 2), ("b", 1), ("d", 4), ("c", 1), ("e", 4)))
// key值符合b-d范围的
rdd1.filterByRange("b", "d").foreach(println)
// 输出:
(b,1)
(d,4)
(c,1)
1.3 map
指定一个函数,将给定数据集中的记录一条一条的输入该函数处理以得到新的记录。一对一
val words = sc.parallelize(List("spark", "flink", "java", "python"))
words.map((_, 1)).foreach(println)
// 输出:
(spark,1)
(flink,1)
(java,1)
(python,1)
1.4 flapMap
是对map算子的一个简单扩展,将RDD中的每一条数据映射多个输出项,flap映射要求输出的是一个可迭代类型。一对多
val rdd = sc.parallelize(List(("Hello Java"), ("Flink"), ("Hello Spark")))
rdd.flatMap(_.split(" ")).foreach(println)
// 输出:
Hello
Java
Flink
Hello
Spark
1.5 flapMapValues
作用在KV格式的RDD上,对value进行处理。一对多
val rdd = sc.parallelize(List(("a", "2 2"), ("b", "1 5"), ("d", "4 6")))
val rdd2 = rdd1.flatMapValues(_.split(" "))
// 输出:
(a,2)
(a,2)
(b,1)
(b,5)
(d,4)
(d,6)
1.6 mapValues
作用在KV格式的RDD上,对value进行处理,一对一
val rdd = sc.parallelize(List(("a", "2 2"), ("b", "1 5"), ("d", "4 6")))
rdd.mapValues(_ + "-hello").foreach(println)
// 输出:
(a,2 2-hello)
(b,1 5-hello)
(d,4 6-hello)
1.7 mapPartitions
与map类似,遍历的单位是每个partition上的数据。在将数据写入外部数据库(如mysql、redis)的场景时,建议使用mapPartitions代替map,避免频繁的创建关闭连接,性能更高。func函数输入输出必须都是可迭代类型。
// 指定两个分区
val rdd: RDD[Int] = sc.parallelize(List[Int](1, 2, 3, 4, 5), 2)
rdd.mapPartitions(iter => {
val list = new ListBuffer[Int]
println("创建连接")
while(iter.hasNext) {
list.append(iter.next())
}
println("数据插入 " + list.toString())
println("关闭连接")
list.iterator
}).count()
// 输出结果:
创建连接
数据插入 ListBuffer(1, 2)
关闭连接
创建连接
数据插入 ListBuffer(3, 4, 5)
关闭连接
1.8 mapPartitionsWithIndex
与mapPartitions类似,遍历的单位是一个partition上的数据。除此之外,还会携带分区的编号。
val rdd = sc.parallelize(List(1, 2, 3, 4, 5, 6), 2)
val result: RDD[String] = rdd.mapPartitionsWithIndex((index, iter) => {
val list = new ListBuffer[String]
iter.foreach(value => list.append("rdd index【" + index + "】,value【" + value + "】"))
list.iterator
})
result.foreach(println)
// 输出结果:
rdd index【0】,value【1】
rdd index【0】,value【2】
rdd index【0】,value【3】
rdd index【1】,value【4】
rdd index【1】,value【5】
rdd index【1】,value【6】
1.9 sample
随机抽样。三个参数:是否放回(withReplacement)、采样的百分比(fraction)、随机数生成器的种子(seed)。seed为可选参数,当传入seed值时,每次抽样的结果相同。
val rdd = sc.makeRDD(List(1, 1, 3, 4, 5, 6, 7, 8))
rdd.sample(true,0.5)foreach(println)
// 输出结果(每次结果不同):
1
1
4
5
7
1.10 reduceByKey
作用在KV格式RDD上,将相同的key的数据根据相应的逻辑进行处理。
val rdd = sc.parallelize(List(("Spark", 1), ("Java", 1), ("Flink", 1),("Spark", 1)))
rdd.reduceByKey(_ + _).foreach(println)
// 输出:
(Flink,1)
(Spark,2)
(Java,1)
1.11 aggregateByKey
作用在KV格式RDD上,先在分区内对key相同的数据进行聚合处理,然后对这些不同分区的小聚合结果进行大聚合。aggregateByKey(zeroValue)(seqOp, combOp):zeroValue表示每个组的初始值,seqOp是分区内小聚合逻辑,combOp是不同分区对小聚合结果进行大聚合的逻辑。
val rdd1 = sc.parallelize(List(("dog", 2), ("cat", 1), ("mouse", 4), ("cat", 1), ("mouse", 4), ("dog", 2)),2)
val result = rdd1.aggregateByKey(0)(_ + _, _ + _)
// 输出结果:
(dog,4)
(cat,2)
(mouse,8)
1.12 groupBy
根据指定元素分组,返回(K,Iterable
val rdd: RDD[(String, String, Int)] = sc.parallelize(List(("Spark", "汪老师", 2), ("Java", "张老师", 1), ("Spark", "李老师", 4), ("Flink", "赵老师", 1)))
rdd.groupBy(_._1).foreach(println)
// 输出结果:
(Flink,CompactBuffer((Flink,赵老师,1)))
(Spark,CompactBuffer((Spark,汪老师,2), (Spark,李老师,4)))
(Java,CompactBuffer((Java,张老师,1)))
1.13 groupByKey
作用在KV格式RDD上,根据Key进行分组。返回(K,Iterable
val rdd: RDD[(String, Int)] = sc.parallelize(List(("Spark", 2), ("Java", 1), ("Spark",4), ("Flink", 1)))
rdd.groupByKey().foreach(println)
// 输出结果:
(Flink,CompactBuffer(1))
(Spark,CompactBuffer(2, 4))
(Java,CompactBuffer(1))