Part 1: Transformation operators. They are lazily evaluated and each returns a new RDD.
The map operator
/**
 * Applies the given function to every element of the RDD and returns a new RDD.
 * Return a new RDD by applying a function to all elements of this RDD.
 */
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}
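A minimal usage sketch, assuming `sc` is an already-constructed SparkContext (as in spark-shell); the variable names are illustrative:

val nums = sc.parallelize(Seq(1, 2, 3, 4))
val squares = nums.map(x => x * x) // lazy: no job runs yet
squares.collect()                  // Array(1, 4, 9, 16); the action triggers execution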
The flatMap operator
/**
 * A map followed by a flatten.
 * Return a new RDD by first applying a function to all elements of this
 * RDD, and then flattening the results.
 */
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
}
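A typical word-splitting sketch, again assuming a live SparkContext `sc`:

val lines = sc.parallelize(Seq("a b", "c d e"))
val words = lines.flatMap(_.split(" ")) // map each line to an array, then flatten
words.collect()                         // Array(a, b, c, d, e)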
The filter operator
/**
 * Keeps only the elements for which the predicate returns true.
 * Return a new RDD containing only the elements that satisfy a predicate.
 */
def filter(f: T => Boolean): RDD[T] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[T, T](
    this,
    (context, pid, iter) => iter.filter(cleanF),
    preservesPartitioning = true)
}
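A short sketch under the same `sc` assumption:

val nums = sc.parallelize(1 to 10)
val evens = nums.filter(_ % 2 == 0) // keep only elements where the predicate is true
evens.collect()                     // Array(2, 4, 6, 8, 10)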
The distinct operator (with numPartitions)
/**
 * The variant with a partition count. Deduplicates by first mapping each value
 * to a (value, null) pair, then collapsing duplicates with reduceByKey, and
 * finally taking the first component of each pair, i.e. the original value.
 * Return a new RDD containing the distinct elements in this RDD.
 */
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
}
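To see the map/reduceByKey trick concretely, here is the same pipeline written out by hand, a sketch assuming a live SparkContext `sc` with illustrative names:

val data = sc.parallelize(Seq(1, 2, 2, 3, 3, 3))
val manual = data
  .map(x => (x, null))         // pair each value with a dummy null
  .reduceByKey((x, y) => x, 2) // duplicate keys collapse into a single pair; 2 partitions
  .map(_._1)                   // drop the dummy, keep the original value
manual.collect()               // Array(1, 2, 3), order not guaranteed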
The distinct operator (no arguments)
/**
 * Simply delegates to the distinct variant that takes a partition count,
 * passing the current number of partitions.
 * Return a new RDD containing the distinct elements in this RDD.
 */
def distinct(): RDD[T] = withScope {
  distinct(partitions.length)
}
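A usage sketch showing that the no-argument form reuses the parent's partition count (same `sc` assumption):

val data = sc.parallelize(Seq(1, 2, 2, 3), numSlices = 4)
val unique = data.distinct() // delegates to distinct(4) here
unique.getNumPartitions      // 4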
The coalesce operator
/**
 * Return a new RDD that is reduced into `numPartitions` partitions.
 * Used to reduce the number of partitions; shuffle defaults to false.
 *
 * This results in a narrow dependency, e.g. if you go from 1000 partitions
 * to 100 partitions, there will not be a shuffle, instead each of the 100
 * new partitions will claim 10 of the current partitions.
 * Reducing the partition count is a narrow dependency, so there is no shuffle.
 *
 * However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
 * this may result in your computation taking place on fewer nodes than
 * you like (e.g. one node in the case of numPartitions = 1). To avoid this,
 * you can pass shuffle = true. This will add a shuffle step, but means the
 * current upstream partitions will be executed in parallel (per whatever
 * the current partitioning is).
 * If the partition count is cut too sharply, the computation ends up on very
 * few nodes and wastes resources; with shuffle = true the map side of the
 * shuffle still runs in parallel on all of the original nodes.
 *
 * Note: With shuffle = true, you can actually coalesce to a larger number
 * of partitions. This is useful if you have a small number of partitions,
 * say 100, potentially with a few partitions being abnormally large. Calling
 * coalesce(1000, shuffle = true) will result in 1000 partitions with the
 * data distributed using a hash partitioner.
 * Increasing the partition count requires redistributing the data by
 * re-hashing it across partitions.
 */
def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null)
    : RDD[T] = withScope {
  if (shuffle) {
    /** Distributes elements evenly across output partitions, starting from a random partition. */
    val distributePartition = (index: Int, items: Iterator[T]) => {
      var position = (new Random(index)).nextInt(numPartitions)
      items.map { t =>
        // Note that the hash code of the key will just be the key itself. The HashPartitioner
        // will mod it with the number of total partitions.
        position = position + 1
        (position, t)
      }
    } : Iterator[(Int, T)]

    // include a shuffle step so that our upstream tasks are still distributed
    new CoalescedRDD(
      new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
        new HashPartitioner(numPartitions)),
      numPartitions).values
  } else {
    new CoalescedRDD(this, numPartitions)
  }
}
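A sketch of both branches, assuming a live SparkContext `sc`:

val wide = sc.parallelize(1 to 1000, numSlices = 100)
val narrow = wide.coalesce(10) // narrow dependency: no shuffle
narrow.getNumPartitions        // 10
// Growing the partition count only takes effect with shuffle = true:
val regrown = narrow.coalesce(50, shuffle = true)
regrown.getNumPartitions       // 50, data redistributed via HashPartitioner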
The repartition operator
/**
 * Internally calls the coalesce operator with shuffle = true, typically to
 * increase the number of partitions.
 * Return a new RDD that has exactly numPartitions partitions.
 *
 * Can increase or decrease the level of parallelism in this RDD. Internally, this uses
 * a shuffle to redistribute data.
 *
 * If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
 * which can avoid performing a shuffle.
 */
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  coalesce(numPartitions, shuffle = true)
}
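A quick comparison sketch under the same `sc` assumption:

val rdd = sc.parallelize(1 to 100, numSlices = 4)
rdd.repartition(8).getNumPartitions // 8: repartition always shuffles
rdd.coalesce(2).getNumPartitions    // 2: shrinking without a shuffle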
The sample operator
/**
 * Return a sampled subset of this RDD (the sampling operator).
 *
 * @param withReplacement can elements be sampled multiple times (replaced when sampled out),
 *                        i.e. whether to sample with replacement
 * @param fraction expected size of the sample as a fraction of this RDD's size (the sampling ratio)
 *                 without replacement: probability that each element is chosen; fraction must be [0, 1]
 *                 with replacement: expected number of times each element is chosen; fraction must be >= 0
 * @param seed seed for the random number generator
 */
def sample(
    withReplacement: Boolean,
    fraction: Double,
    seed: Long = Utils.random.nextLong): RDD[T] = withScope {
  require(fraction >= 0.0, "Negative fraction value: " + fraction)
  if (withReplacement) {
    new PartitionwiseSampledRDD[T, T](this, new PoissonSampler[T](fraction), true, seed)
  } else {
    new PartitionwiseSampledRDD[T, T](this, new BernoulliSampler[T](fraction), true, seed)
  }
}
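A sampling sketch (same `sc` assumption); note that the result size is only approximately fraction * count, since sampling is probabilistic:

val rdd = sc.parallelize(1 to 1000)
// Without replacement: each element is kept with probability ~0.1 (BernoulliSampler).
val s1 = rdd.sample(withReplacement = false, fraction = 0.1, seed = 42L)
// With replacement: each element is drawn ~0.1 times on average (PoissonSampler),
// so repeats are possible.
val s2 = rdd.sample(withReplacement = true, fraction = 0.1, seed = 42L)
s1.count() // roughly 100, not exactly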
The randomSplit operator
/**
 * Randomly splits this RDD with the provided weights. If the weights do not
 * sum to 1, they are normalized to 1 before the random split.
 *
 * @param weights weights for splits, will be normalized if they don't sum to 1
 * @param seed random seed
 *
 * @return split RDDs in an array
 */
def randomSplit(
    weights: Array[Double],
    seed: Long = Utils.random.nextLong): Array[RDD[T]] = withScope {
  val sum = weights.sum
  val normalizedCumWeights = weights.map(_ / sum).scanLeft(0.0d)(_ + _)
  normalizedCumWeights.sliding(2).map { x =>
    randomSampleWithRange(x(0), x(1), seed)
  }.toArray
}
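A common train/validation/test sketch (same `sc` assumption); the weights deliberately sum to 10 to show the normalization:

val rdd = sc.parallelize(1 to 1000)
val Array(train, validation, test) = rdd.randomSplit(Array(6.0, 3.0, 1.0), seed = 42L)
train.count() // roughly 600; the three splits are disjoint and cover the whole RDD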
The randomSampleWithRange method (internal)
/**
 * Internal method exposed for Random Splits in DataFrames. Samples an RDD given a probability
 * range, i.e. random sampling within a range.
 *
 * @param lb lower bound to use for the Bernoulli sampler
 * @param ub upper bound to use for the Bernoulli sampler
 * @param seed the seed for the Random number generator
 * @return A random sub-sample of the RDD without replacement.
 */
private[spark] def randomSampleWithRange(lb: Double, ub: Double, seed: Long): RDD[T] = {
  this.mapPartitionsWithIndex( { (index, partition) =>
    val sampler = new BernoulliCellSampler[T](lb, ub)
    sampler.setSeed(seed + index)
    sampler.sample(partition)
  }, preservesPartitioning = true)
}
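Since randomSampleWithRange is private[spark], it cannot be called from user code; the plain-Scala sketch below only reproduces how randomSplit turns its weights into the (lb, ub) pairs it passes in:

val weights = Array(6.0, 3.0, 1.0)
val cum = weights.map(_ / weights.sum).scanLeft(0.0)(_ + _) // ~ Array(0.0, 0.6, 0.9, 1.0)
val ranges = cum.sliding(2).map(x => (x(0), x(1))).toArray
// ranges ~ Array((0.0, 0.6), (0.6, 0.9), (0.9, 1.0))
// Because every split uses the same seed, each element draws the same uniform
// number and falls into exactly one range, which is why the splits are disjoint.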