目录
RDD的创建
三种方式
从一个集合中创建
val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8))
从文件中创建
val rdd2 = sc.textFile("/words.txt")
从其他的RDD转化而来
val rdd3=rdd2.flatMap(_.split(" "))
RDD编程常用API
算子分类
Transformation
概述
Transformation —— 根据数据集创建一个新的数据集,计算后返回一个新的RDD,但不会直接返回计算结果,二是记住这些应用到数据集(例如一个文件)上的转换动作,只有当发生一个要求返回结果给Driver的动作是,这些转换才会真正运行。
帮助文档
http://spark.apache.org/docs/2.2.0/rdd-programming-guide.html#transformations
常用Transformation表
Transformation使用实例
map(func)
将分区里面每一条数据取出来,进行处理
filter(func)
flatMap(func)
mapPartitions(func)
一次性将一个分区里面的数据全部取出来。效率更高
mapPartitionsWithIndex(func)
sample(withReplacement, fraction, seed)
union(otherDataset)
intersection(otherDataset)
distinct([numTasks]))
partitionBy
重新分区,分区数可以手动指定的。分区可能变多也可能变少,而且partitionBy还会产生shuffle过程
reduceByKey(func, [numTasks])
效率更高,会对数据提前进行部分的聚合,减少数据的key的shuffle
groupByKey
效率低下,尽量不要用
combineByKey
aggregateByKey(zeroValue:U,[partitioner: Partitioner]) (seqOp: (U, V) => U,combOp: (U, U) => U)
foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
sortByKey([ascending], [numTasks])
sortBy(func,[ascending], [numTasks])
join(otherDataset, [numTasks])
cogroup(otherDataset, [numTasks])
cartesian(otherDataset)
coalesce(numPartitions)
缩减分区数到指定的值,分区的个数只能减少,不能变多。不会产生shuffle过程
适用于一些大的数据集filter过滤之后,进行缩减分区,提高效率
1280M数据 ==> 10个block块 ==> 10个分区,每个分区128M数据 ==> filter ==> 10个分区,每个分区里面剩下了1KB数据 ==> coalesce => 1个分区
repartition(numPartitions)
数据随机洗牌冲洗分区,没有任何规则,可以将分区数变大,或者变小,会产生shuffle的过程
glom
mapValues
subtract
Action
帮助文档
http://spark.apache.org/docs/2.2.0/rdd-programming-guide.html#actions
常用Action表
Action使用实例
reduce(func)
collect()
count()
first()
take(n)
takeSample(withReplacement,num, [seed])
takeOrdered(n)
aggregate (zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U)
fold(num)(func)
countByKey
foreach(func)
这里没有列出所有的算子,更多算子可以到RDD的源码中查看