RDD:RDD分区数,若从HDFS创建RDD,RDD的分区就是和文件块一一对应,若是集合并行化形式创建,RDD分区数可以指定,一般默认值是CPU的核数。
task:task数量就是和分区数量对应。
这个全:https://www.cnblogs.com/frankdeng/p/9301672.html
一、transformation算子:
(1)map(func):将函数应用于RDD中的每一个元素,将返回值构成新的RDD。输入分区与输出分区一对一,即:有多少个输入分区,就有多少个输出分区。
rdd.map(x=>x+1)
如:{1,2,3,3} 结果为 {2,3,4,4}
hadoop fs -cat /tmp/lxw1234/1.txt hello world hello spark hello hive
//读取HDFS文件到RDD
scala> var data = sc.textFile("/tmp/lxw1234/1.txt")
data: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at :21
//使用map算子 scala> var mapresult = data.map(line => line.split("\s+")) mapresult: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[2] at map at :23
//结果 scala> mapresult.collect res0: Array[Array[String]] = Array(Array(hello, world), Array(hello, spark), Array(hello, hive))
(2)flatMap(func):比map多一步合并操作,首先将数组元素进行映射,然后合并压平所有的数组。
//使用flatMap算子 scala> var flatmapresult = data.flatMap(line => line.split("\s+")) flatmapresult: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at flatMap at :23
//结果 scala> flatmapresult.collect res1: Array[String] = Array(hello, world, hello, spark, hello, hive)
参考博客:https://www.cnblogs.com/devin-ou/p/8028305.html
(3)mapPartitions(func):函数中传入的参数是迭代器,迭代器里面保存的是一个分区里面的数据。
/** * makeRDD方法的第一个参数代表的是RDD中的 元素 * 第二个参数:RDD的分区数 * rdd[Int] */ val rdd = sc.makeRDD(1 to 10,3) /** * mapPartitions这个算子遍历的单位是partition * 会将一个partition的数据量全部加载到一个集合里面 */ val mapPartitonsRDD = rdd.mapPartitions(iterator=>{ val list = new ListBuffer[Int]() //创建一个数据库连接 while(iterator.hasNext){ val num = iterator.next() list.+=(num+100) } //批量插入数据库 list.iterator }, false) /** * 想要执行,必须有action类的算子 * collect算子会将集群中计算的结果回收到Driver端,慎用 */ val resultArr = mapPartitonsRDD.collect() resultArr.foreach { println }
map和mapPartition的异同:
mapPartition function一次处理一个分区的数据,性能比较高;
map的function一次只处理一条数据。
如果在map过程中需要频繁创建额外的对象(例如将rdd中的数据通过jdbc写入数据库,map需要为每个元素创建一个链接而mapPartition为每个partition创建一个链接),则mapPartitions效率比map高的多。
SparkSql或DataFrame默认会对程序进行mapPartition的优化。
参考博客:https://blog.csdn.net/wuxintdrh/article/details/80278479
(4)distinct:对RDD中的元素进行去重操作。
scala> data.flatMap(line => line.split("\s+")).collect res61: Array[String] = Array(hello, world, hello, spark, hello, hive, hi, spark) scala> data.flatMap(line => line.split("\s+")).distinct.collect res62: Array[String] = Array(hive, hello, world, spark, hi)
(5)reduceByKey(func,[numTask]):找到相同的key,对其进行聚合,聚合的规则由func指定。
reduce任务的数量可以由numTask指定
goodsSaleRDD.reduceByKey((x,y) => x+y)
参考博客:https://www.jianshu.com/p/af175e66ce99
(6)groupByKey():对相同的key进行分组。
(7)aggregateByKey(zeroValue: U, numPartitions: Int)(seqOp: (U, V) => U, combOp: (U, U) => U)
第一个参数代表着 初始值
第二个参数是中间聚合,在每个分区内部按照key执行聚合操作。这个分两步,第一步先将每个value和初始值作为函数参数进行计算,返回的结果作为新的kv对。然后在对结果再带入到函数中计算。
第三个参数是最终聚合,对中间聚合结果进行最终聚合。
例如:一个RDD有两个分区,
patition1:(1,1) (1,2) (2,1)
patition2:(2,3)(2,4)(1,7)
首先,在每个patition中将value和初始值三带入到seqFunc函数中,得到中间结果kv:
patition1:(1,3) (1,3) (2,3)
patition2:(2,3)(2,4)(1,7)
再将中间结果kv带入到seqFunc函数中,按照key进行聚合
patition1:(1,3)(2,3)
patition2:(2,4)(1,7)
最后,进行整体聚合,将上一步结果带入combFunc
(1,10)(2,7)
def seqFunc(a,b): print "seqFunc:%s,%s" %(a,b) return max(a,b) #取最大值 def combFunc(a,b): print "combFunc:%s,%s" %(a ,b) return a + b #累加起来 ''' aggregateByKey这个算子内部肯定有分组 ''' aggregateRDD = rdd.aggregateByKey(3, seqFunc, combFunc)
参考博客:https://blog.csdn.net/qq_35440040/article/details/82691794 这个写的挺乱,但有地方可以参考
(8)combineByKey ( createCombiner: V=>C, mergeValue: (C, V) =>C, mergeCombiners: (C,C) =>C ) :
主要分为三步,第一步,对value进行初始化处理;第二步,在分区内部对(key,value)进行处理,第三步,所有分区间对(key,value)进行处理。
https://www.jianshu.com/p/b77a6294f31c
参考博客:https://www.jianshu.com/p/b77a6294f31c
(9)sortBy():排序操作
二、action算子
基本RDD的action操作
1、reduce():接收一个函数作为参数,这个函数操作两个相同元素类型的RDD并返回一个同样类型的新元素。
val sum=rdd.reduce( (x,y) => x+y )
2、aggregate(zeroValue)(seqOp, combOp):期待返回的类型的初始值。然后通过一个函数把RDD中的元素合并起来并放入累加器。考虑到每个节点是在本地进行累加的,最终,还需要提供第二个函数来将累加器两两合并。
val result = input.aggregate((0,0))( (acc, value) => (acc._1 + value, acc._2+1), (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)) val avg = result._1 / result._2.toDouble
2、collect():以普通集合或者值的形式,把数据返回驱动器程序,它会将整个RDD的内容返回。通常在单元测试中使用,由于需要将数据复制到驱动器进程中,collect()要求所有的数据都必须能一同放入单台机器的内存中。
rdd.collect()
结果:{1,2,3,3}
Spark的collect方法,是Action类型的一个算子,会从远程集群拉取数据到driver端。最后,将大量数据汇集到一个driver节点上,将数据用数组存放,占用了jvm堆内存,非常用意造成内存溢出,只用作小型数据的观察。
如何避免使用collect:
若需要遍历RDD中元素,可以使用foreach语句;
若需要打印RDD中元素,可用take语句,返回数据集前n个元素,data.take(1000).foreach(println)
,这点官方文档里有说明;
若需要查看其中内容,可用saveAsTextFile方法;
总之,单机环境下使用collect问题并不大,但分布式环境下尽量规避,如有其他需要,手动编写代码实现相应功能就好。
参考博客:https://blog.csdn.net/chaoshengmingyue/article/details/82021746
3、first:返回RDD中的第一个元素,不排序。
scala> var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2) rdd1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[33] at makeRDD at :21 scala> rdd1.first res14: (String, String) = (A,1)
4、take():返回RDD中的n个元素,并尝试只访问尽量少的分区,因此该操作会得到一个不均衡的集合。
rdd.take(2)
结果:{1,2}
5、foreach(fuc):对RDD中的每个元素使用给定的函数。
rdd.foreach(func)
6、saveAsTextFile:saveAsTextFile用于将RDD以文本文件的格式存储到文件系统中。
var rdd1 = sc.makeRDD(1 to 10,2) scala> rdd1.saveAsTextFile("hdfs://cdh5/tmp/lxw1234.com/") //保存到HDFS hadoop fs -ls /tmp/lxw1234.com Found 2 items -rw-r--r-- 2 lxw1234 supergroup 0 2015-07-10 09:15 /tmp/lxw1234.com/_SUCCESS -rw-r--r-- 2 lxw1234 supergroup 21 2015-07-10 09:15 /tmp/lxw1234.com/part-00000 hadoop fs -cat /tmp/lxw1234.com/part-00000 1 2 3 4 5
Pair RDD的action操作:
1、countByKey ():对每个键对应的元素分别计数
rdd.countBykey()
结果:{(1,1),(3,2)}
2、collectAsMap():将结果以映射的形式返回,以便查询。
rdd.collectAsMap()
结果:Map{(1,2),(3,6)}
3、lookup(key)返回给定键对应的所有值
rdd.lookup(3)
结果:[4, 6]