zoukankan      html  css  js  c++  java
  • Spark源码解读1-算子操作

    第一部分:Transformation算子操作,延迟操作,返回新的RDD

    Map算子

    /**
     * Return a new RDD by applying a function to all elements of this RDD.
     *
     * The function is first passed through `sc.clean` (closure cleaning /
     * serializability check) before being applied lazily to each partition's
     * iterator.
     */
    def map[U: ClassTag](f: T => U): RDD[U] = withScope {
      val cleanedF = sc.clean(f)
      new MapPartitionsRDD[U, T](this, (ctx, partitionId, partIter) => partIter.map(cleanedF))
    }

    FlatMap算子

    /**
     * Return a new RDD by first applying a function to all elements of this
     * RDD, and then flattening the results.
     *
     * The function is cleaned via `sc.clean` and applied lazily per partition.
     */
    def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
      val cleanedF = sc.clean(f)
      new MapPartitionsRDD[U, T](this, (ctx, partitionId, partIter) => partIter.flatMap(cleanedF))
    }

    Filter算子

     /**
      * Return a new RDD containing only the elements that satisfy a predicate.
      *
      * Filtering never moves elements between partitions, so the parent's
      * partitioning is preserved (`preservesPartitioning = true`).
      */
     def filter(f: T => Boolean): RDD[T] = withScope {
       val cleanedPredicate = sc.clean(f)
       new MapPartitionsRDD[T, T](
         this,
         (ctx, partitionId, partIter) => partIter.filter(cleanedPredicate),
         preservesPartitioning = true)
     }

    Distinct算子(有参数)

    /**
     * Return a new RDD containing the distinct elements in this RDD.
     *
     * Implemented by pairing every element with a null value, collapsing
     * duplicates with `reduceByKey` (a shuffle into `numPartitions`
     * partitions that keeps one value per key), and then dropping the
     * null placeholder to recover the original elements.
     */
    def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
      map(element => (element, null))
        .reduceByKey((first, second) => first, numPartitions)
        .map(pair => pair._1)
    }

    Distinct算子(无参数)

    /**
     * Return a new RDD containing the distinct elements in this RDD.
     *
     * Delegates to the parameterized `distinct`, reusing the current number
     * of partitions as the target partition count.
     */
    def distinct(): RDD[T] = withScope {
      val currentPartitionCount = partitions.length
      distinct(currentPartitionCount)
    }

    Coalesce算子

     /**
      * Return a new RDD that is reduced into `numPartitions` partitions.
      *
      * This results in a narrow dependency, e.g. if you go from 1000 partitions
      * to 100 partitions, there will not be a shuffle, instead each of the 100
      * new partitions will claim 10 of the current partitions.
      *
      * However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
      * this may result in your computation taking place on fewer nodes than
      * you like (e.g. one node in the case of numPartitions = 1). To avoid this,
      * you can pass shuffle = true. This will add a shuffle step, but means the
      * current upstream partitions will be executed in parallel (per whatever
      * the current partitioning is).
      *
      * Note: With shuffle = true, you can actually coalesce to a larger number
      * of partitions. This is useful if you have a small number of partitions,
      * say 100, potentially with a few partitions being abnormally large. Calling
      * coalesce(1000, shuffle = true) will result in 1000 partitions with the
      * data distributed using a hash partitioner.
      */
     def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null)
         : RDD[T] = withScope {
       if (shuffle) {
         /** Distributes elements evenly across output partitions, starting from a random partition. */
         val distributePartition: (Int, Iterator[T]) => Iterator[(Int, T)] =
           (index, items) => {
             var position = new Random(index).nextInt(numPartitions)
             items.map { element =>
               // The key is the key's own hash code; HashPartitioner mods the
               // sequentially increasing position by the total partition count,
               // spreading the partition's elements round-robin over the output.
               position += 1
               (position, element)
             }
           }

         // Include a shuffle step so that the upstream tasks still run in
         // parallel across all current partitions.
         new CoalescedRDD(
           new ShuffledRDD[Int, T, T](
             mapPartitionsWithIndex(distributePartition),
             new HashPartitioner(numPartitions)),
           numPartitions).values
       } else {
         // No shuffle: narrow dependency, each output partition claims a group
         // of parent partitions.
         new CoalescedRDD(this, numPartitions)
       }
     }

    Repartition算子

     /**
      * Return a new RDD that has exactly numPartitions partitions.
      *
      * Can increase or decrease the level of parallelism in this RDD. Internally, this uses
      * a shuffle to redistribute data.
      *
      * If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
      * which can avoid performing a shuffle.
      */
     def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
       // Always shuffle so the partition count can grow as well as shrink.
       coalesce(numPartitions, shuffle = true)
     }

    Sample算子

     /**
      * Return a sampled subset of this RDD.
      *
      * @param withReplacement can elements be sampled multiple times (replaced when sampled out)
      * @param fraction expected size of the sample as a fraction of this RDD's size
      *  without replacement: probability that each element is chosen; fraction must be [0, 1]
      *  with replacement: expected number of times each element is chosen; fraction must be >= 0
      * @param seed seed for the random number generator
      */
     def sample(
         withReplacement: Boolean,
         fraction: Double,
         seed: Long = Utils.random.nextLong): RDD[T] = withScope {
       require(fraction >= 0.0, "Negative fraction value: " + fraction)
       // Poisson sampling models with-replacement draws; Bernoulli sampling
       // gives each element an independent keep/drop decision.
       if (withReplacement) {
         new PartitionwiseSampledRDD[T, T](this, new PoissonSampler[T](fraction), true, seed)
       } else {
         new PartitionwiseSampledRDD[T, T](this, new BernoulliSampler[T](fraction), true, seed)
       }
     }

    RandomSplit算子

     /**
      * Randomly splits this RDD with the provided weights.
      *
      * @param weights weights for splits, will be normalized if they don't sum to 1
      * @param seed random seed
      * @return split RDDs in an array
      */
     def randomSplit(
         weights: Array[Double],
         seed: Long = Utils.random.nextLong): Array[RDD[T]] = withScope {
       val totalWeight = weights.sum
       // Cumulative normalized weights: 0.0, w0, w0+w1, ..., 1.0. Each adjacent
       // pair [lower, upper) defines one Bernoulli-cell sampling range, so the
       // splits are disjoint and together cover the whole RDD.
       val cumulativeBounds = weights.map(_ / totalWeight).scanLeft(0.0d)(_ + _)
       cumulativeBounds.sliding(2).map { bounds =>
         randomSampleWithRange(bounds(0), bounds(1), seed)
       }.toArray
     }

    RandomSampleWithRange算子

     /**
      * Internal method exposed for Random Splits in DataFrames. Samples an RDD
      * given a probability range.
      *
      * @param lb lower bound to use for the Bernoulli cell sampler
      * @param ub upper bound to use for the Bernoulli cell sampler
      * @param seed the seed for the random number generator
      * @return a random sub-sample of the RDD without replacement
      */
     private[spark] def randomSampleWithRange(lb: Double, ub: Double, seed: Long): RDD[T] = {
       this.mapPartitionsWithIndex( { (partitionIndex, partitionIter) =>
         // Offset the seed by the partition index so every partition samples
         // independently while the overall result stays deterministic per seed.
         val sampler = new BernoulliCellSampler[T](lb, ub)
         sampler.setSeed(seed + partitionIndex)
         sampler.sample(partitionIter)
       }, preservesPartitioning = true)
     }


    我要把所有的坑都趟平!
  • 相关阅读:
    逐浪CMSv8.2发布-集成Node与Vue脚手架和PowerShell支持的新一代网站管理系统
    文化赢未来-智能做字体-逐浪字体大师1.0发布
    逐浪CMS对用户注册字段正则的自由定义(注册字段必填)
    两大高招逐浪CMS中定义省地市县三级字段显示方式
    开放融合易用@门户移动开发新体验-逐浪CMS v8.0.1全面发布[基于dotNET Core]
    点触科技安全验证新模式与逐浪CMS3.9.3新功能预览
    摩拜ofo挥师三四线市场 第二梯队面临"团灭"危机
    谷歌提出新的字体调用方案帮助提高中文字体的加载速度
    [图]Windows 10 Build 16273版本更新发布:新增可变式字体Bahnschrift
    rabbitMQ面试题 整理给自己
  • 原文地址:https://www.cnblogs.com/loveling-0239/p/5882607.html
Copyright © 2011-2022 走看看