今天开始学习spark中RDD算子
首先是Value类型
(1)map
练习代码:
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sparkContext = new SparkContext(sparkConf)
//算子(转换)
val rdd=sparkContext.makeRDD(List(1,2,3,4))
//转换函数
// val dataRDD1: RDD[Int] = rdd.map(
// (num:Int) => {
// num * 2
// } )(完整形式)
val dataRDD1: RDD[Int] = rdd.map(
num => {
num * 2
} )
// val dataRDD2: RDD[String] = dataRDD1.map(
// num => {
// "" + num
// } )
dataRDD1.collect().foreach(println)
sparkContext.stop()
}
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sparkContext = new SparkContext(sparkConf)
val rdd=sparkContext.textFile("data/apache.log")
//将apache.log文件中每一行的路径属性取出来,用空格为分隔,将每一行字符串分成成多个组,文件属性在第6个
val mapRDD: RDD[String] = rdd.map(
line => {
val datas = line.split(" ")
datas(6)
}
)
mapRDD.collect().foreach(println)
sparkContext.stop()
}
(2)mapPartitions
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sparkContext = new SparkContext(sparkConf)
val rdd=sparkContext.makeRDD(List(1,2,3,4),2)
//性能比.map要好,一次性拿一个分区的数据,进行转换操作,但处理完的数据不会释放掉,因为有对象的引用,容易出现内存溢出。
val maprdd: RDD[Int] = rdd.mapPartitions(
iter => {
iter.map(_ * 2)
}
)
maprdd.collect().foreach(println)
sparkContext.stop()
}
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sparkContext = new SparkContext(sparkConf)
val rdd=sparkContext.makeRDD(List(1,2,3,4),2)
//计算每个分区中的最大值
val maprdd: RDD[Int] = rdd.mapPartitions(
iter => {
List(iter.max).iterator
}
)
maprdd.collect().foreach(println)
sparkContext.stop()
}
mapPartitions在处理效率上出map要高,但容易出现内存溢出。mapPartitions可以增加或减少数据,map不行。总的来说在内存有限的情况下,使用map较好。
(3)mapPartitionsWithIndex
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator1")
val sparkContext = new SparkContext(sparkConf)
val rdd: RDD[Int] = sparkContext.makeRDD(List(1,2,3,4), 2)
//将分区后进行编号,然后索引,当为编号为1的分区是返回这个迭代器,输入它
val maprdd = rdd.mapPartitionsWithIndex(
(index, iter) => {
if (index == 1) {
iter
} else {
Nil.iterator
}
}
)
maprdd.collect().foreach(println)
sparkContext.stop()
}