zoukankan      html  css  js  c++  java
  • 【原】1.1RDD源码解读(二)

    (6)transformation 操作,通过外在的不同RDD表现形式来达到内部数据的处理过程。这类操作并不会触发作业的执行,也常被称为lazy操作。

            大部分操作会生成并返回一个新的RDD,例sortByKey就不会产生一个新的RDD。

    1) map函数,一行数据经过map函数处理后还是一行数据

    //将map函数作用在RDD的所有元素上,并返回一个新的RDD

    def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    //将函数作用在父RDD的每一个分区上

    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
    }

    2) flatMap函数,和map函数功能类似,但一行数据经过flatMap函数处理后是多行数据

    def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
    }

    3) filter函数,将不满足条件的数据过滤掉,并返回一个新的RDD

    def filter(f: T => Boolean): RDD[T] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[T, T](
    this,
        (context, pid, iter) => iter.filter(cleanF),
        preservesPartitioning = true)
    }

    4) distinct函数,将重复的元素去掉,返回不同的元素,并返回一个新的RDD

    def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
      map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
    }

    具体过程如下所示:

    图片1

    5) repartition函数,对RDD重新分区,并返回一个新的RDD

    该方法用于增加或减少RDD的并行度,实际上是通过shuffle来分发数据的

    如果想要减少RDD的分区,考虑使用‘coalesce’函数,避免shuffle

    def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
      coalesce(numPartitions, shuffle = true)
    }

    6) coalesce函数,将RDD重新分区并返回一个新的RDD

       这个操作是窄依赖,比如,如果你从1000个分区合并为100个分区,这个合并过程并没有shuffle,而是100个新的分区将每个分区将是原来的10个分区。

    def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null)
        : RDD[T] = withScope {
    if (shuffle) {
    //从一个随机的分区开始,将数据均匀地分布到新分区上

    val distributePartition = (index: Int, items: Iterator[T]) => {
    var position = (new Random(index)).nextInt(numPartitions)
          items.map { t =>
    position = position + 1
    (position, t)
          }
        } : Iterator[(Int, T)]
    new CoalescedRDD(
    new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
    new HashPartitioner(numPartitions)),
          numPartitions).values
      } else {
    new CoalescedRDD(this, numPartitions)
      }
    }

    7) sample函数,随机返回RDD的部分样例数据

    def sample(
        withReplacement: Boolean,
        fraction: Double,
        seed: Long = Utils.random.nextLong): RDD[T] = withScope {
    require(fraction >= 0.0, "Negative fraction value: " + fraction)
    if (withReplacement) {
    new PartitionwiseSampledRDD[T, T](this, new PoissonSampler[T](fraction), true, seed)
      } else {
    new PartitionwiseSampledRDD[T, T](this, new BernoulliSampler[T](fraction), true, seed)
      }
    }

    8) sortBy将RDD根据所给的key函数排序,并返回本身,注意不是创建一个新的RDD,同时也说明并不是所有的transformation都是创建一个新的RDD

    def sortBy[K](
        f: (T) => K,
        ascending: Boolean = true,
        numPartitions: Int = this.partitions.length)
        (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = withScope {
    this.keyBy[K](f)
          .sortByKey(ascending, numPartitions)
          .values
    }

    9) glom函数,将每个分区的元素合并成一个数组并返回一个新的RDD

    def glom(): RDD[Array[T]] = withScope {
    new MapPartitionsRDD[Array[T], T](this, (context, pid, iter) => Iterator(iter.toArray))
    }

    10) groupByKey函数,返回key和相同key的value结合组成的RDD。

    这个操作可能开销比较大,如果想要求总数sum或均值,用PairRDDFunctions.aggregateByKey或PairRDDFunctions.reduceByKey会有更好的效果。

    def groupBy[K](f: T => K, p: Partitioner)(implicit kt: ClassTag[K], ord: Ordering[K] = null)
        : RDD[(K, Iterable[T])] = withScope {
    val cleanF = sc.clean(f)
    this.map(t => (cleanF(t), t)).groupByKey(p)
    }

    (7)Action操作,触发作业的执行并将返回值反馈给用户程序

    1) foreach函数,将此函数应用于RDD的所有元素上

    def foreach(f: T => Unit): Unit = withScope {
    val cleanF = sc.clean(f)
      sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
    }

    2) foreachPartition函数,将此函数作用于RDD的每一个分区上,比如连接数据库的连接可以一个分区共用一个连接

    def foreachPartition(f: Iterator[T] => Unit): Unit = withScope {
    val cleanF = sc.clean(f)
      sc.runJob(this, (iter: Iterator[T]) => cleanF(iter))
    }

    3) collect函数,将包含在RDD中所有的元素以数组形式返回

    def collect(): Array[T] = withScope {
    val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
      Array.concat(results: _*)
    }

    4) count函数,返回RDD中元素的个数

    def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

    5) take函数,取RDD的前num元素。先取一个分区的元素,如果不够再取其他分区的元素。

    def take(num: Int): Array[T] = withScope {
    if (num == 0) {
    new Array[T](0)
      } else {
    val buf = new ArrayBuffer[T]
    val totalParts = this.partitions.length
    var partsScanned = 0
    while (buf.size < num && partsScanned < totalParts) {
    var numPartsToTry = 1
    if (partsScanned > 0) {
    if (buf.size == 0) {
              numPartsToTry = partsScanned * 4
    } else {
    numPartsToTry = Math.max((1.5 * num * partsScanned / buf.size).toInt - partsScanned, 1)
              numPartsToTry = Math.min(numPartsToTry, partsScanned * 4)
            }
          }
    val left = num - buf.size
    val p = partsScanned until math.min(partsScanned + numPartsToTry, totalParts)
    val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p)
          res.foreach(buf ++= _.take(num - buf.size))
          partsScanned += numPartsToTry
        }
        buf.toArray
      }
    }

    6) first函数,取RDD中的第一个元素,实际上是take(1)操作

    def first(): T = withScope {
      take(1) match {
    case Array(t) => t
    case _ => throw new UnsupportedOperationException("empty collection")
      }
    }

    7) top函数,返回RDD中的top k,隐式排序按照Ordering[T]排序,即降序,刚好和[takeOrdered]相反

    def top(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
      takeOrdered(num)(ord.reverse)
    }

    8) saveAsTextFile函数,将RDD保存为文本文件

    def saveAsTextFile(path: String): Unit = withScope {
    val nullWritableClassTag = implicitly[ClassTag[NullWritable]]
    val textClassTag = implicitly[ClassTag[Text]]
    val r = this.mapPartitions { iter =>
    val text = new Text()
        iter.map { x =>
          text.set(x.toString)
          (NullWritable.get(), text)
        }
      }
      RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
        .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
    }

    9) saveAsObjectFile函数,将RDD中的元素序列化并保存为文件

    def saveAsObjectFile(path: String): Unit = withScope {
    this.mapPartitions(iter => iter.grouped(10).map(_.toArray))
        .map(x => (NullWritable.get(), new BytesWritable(Utils.serialize(x))))
        .saveAsSequenceFile(path)
    }

    (8)隐式转换

    在RDD object中定义了好多隐式转换函数,这些函数额外提供了许多本身不具有的功能

    比如将RDD隐式转化为PairRDDFunctions,那么该RDD就具有了reduceByKey等功能。

    implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
      (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null): PairRDDFunctions[K, V] = {
    new PairRDDFunctions(rdd)
    }

  • 相关阅读:
    poj1047
    poj1129
    poj1050
    C#中break、continue的用法
    关于一个不大常用的SQL数据类型-UNIQUEIDENTIFIER
    关于net2.0里面新出现的类backgroundworker的应用
    深入讲解SQL Union和Union All的使用方法
    让你一次性搞定堆、栈、值类型、引用类型 (转载)
    EXEC与sp_executesql的区别及应用(转)
    几种常用排序算法总结(转载)
  • 原文地址:https://www.cnblogs.com/yourarebest/p/5263929.html
Copyright © 2011-2022 走看看