常用的一些简单算子:
map(func) |
返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成 |
flatMap(func) |
类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素) |
partitionBy |
对RDD进行分区操作,如果原有的partionRDD和现有的partionRDD是一致的话就不进行分区, 否则会生成ShuffleRDD. |
mapPartitions(func) |
类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]。 假设有N个元素,有M个分区,那么map的函数的将被调用N次,而mapPartitions被调用M次,一个函数一次处理所有分区 |
mapPartitionsWithIndex(func) |
类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是(Int, Interator[T]) => Iterator[U] |
举个栗子:

1 scala> val rdd = sc.parallelize(List(("kpop","female"),("zorro","male"),("mobin","male"),("lucy","female"))) 2 rdd: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[16] at parallelize at <console>:24 3 4 scala> :paste 5 // Entering paste mode (ctrl-D to finish) 6 7 def partitionsFun(iter : Iterator[(String,String)]) : Iterator[String] = { 8 var woman = List[String]() 9 while (iter.hasNext){ 10 val next = iter.next() 11 next match { 12 case (_,"female") => woman = next._1 :: woman 13 case _ => 14 } 15 } 16 woman.iterator 17 } 18 19 // Exiting paste mode, now interpreting. 20 21 partitionsFun: (iter: Iterator[(String, String)])Iterator[String] 22 23 scala> val result = rdd.mapPartitions(partitionsFun) 24 result: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[17] at mapPartitions at <console>:28 25 26 scala> result.collect() 27 res13: Array[String] = Array(kpop, lucy)
sample(withReplacement, fraction, seed) |
以指定的随机种子随机抽样出数量为fraction的数据,withReplacement表示是抽出的数据是否放回,true为有放回的抽样,false为无放回的抽样,seed用于指定随机数生成器种子。例子从RDD中随机且有放回的抽出50%的数据,随机种子值为3(即可能以1 2 3的其中一个起始值) |
takeSample |
和Sample的区别是:takeSample返回的是最终的结果集合。 |
union(otherDataset) |
对源RDD和参数RDD求并集后返回一个新的RDD |
intersection(otherDataset) |
对源RDD和参数RDD求交集后返回一个新的RDD |
distinct([numTasks])) |
对源RDD进行去重后返回一个新的RDD. 默认情况下,只有8个并行任务来操作,但是可以传入一个可选的numTasks参数改变它。 |
reduceByKey(func, [numTasks]) |
在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,reduce任务的个数可以通过第二个可选的参数来设置 |
groupByKey |
groupByKey也是对每个key进行操作,但只生成一个sequence。 group.collect() res1: Array[(String, Iterable[Int])] = Array((two,CompactBuffer(1, 1)), (one,CompactBuffer(1)), (three,CompactBuffer(1, 1, 1))) |
combineByKey[C]( createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C) |
对相同K,把V合并成一个集合. createCombiner: combineByKey() 会遍历分区中的所有元素,因此每个元素的键要么还没有遇到过,要么就 和之前的某个元素的键相同。如果这是一个新的元素,combineByKey() 会使用一个叫作 createCombiner() 的函数来创建 mergeValue: 如果这是一个在处理当前分区之前已经遇到的键, 它会使用 mergeValue() 方法将该键的累加器对应的当前值与这个新的值进行合并 mergeCombiners: 由于每个分区都是独立处理的, 因此对于同一个键可以有多个累加器。如果有两个或者更多的分区都有对应同一个键的累加器, 就需要使用用户提供的 mergeCombiners() 方法将各个分区的结果进行合并。 |