  • [Spark] RDD Operations in Detail, Part 3: Key-Value Transformation Operators

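    Transformation operators that process data in Key-Value form fall roughly into three groups: operators with a one-to-one mapping between input and output partitions, aggregation operators, and join operators.

    One-to-one mapping between input and output partitions

    mapValues

    mapValues: applies a map operation to the Value of each (Key, Value) pair and leaves the Key untouched.

    In the figure, each box represents an RDD partition. The function a => a + 2 adds 2 only to the value 1 in the pair (V1, 1), giving 3, so the pair becomes (V1, 3).

    Source code: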

      /**
       * Pass each value in the key-value pair RDD through a map function without changing the keys;
       * this also retains the original RDD's partitioning.
       */
      def mapValues[U](f: V => U): RDD[(K, U)] = {
        val cleanF = self.context.clean(f)
        new MapPartitionsRDD[(K, U), (K, V)](self,
          (context, pid, iter) => iter.map { case (k, v) => (k, cleanF(v)) },
          preservesPartitioning = true)
      }
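
    As a rough local illustration of the behaviour described above, the sketch below models one partition as a plain Scala Seq of pairs. The object MapValuesSketch and its mapValues helper are hypothetical names made up for this example and are not part of Spark; the point is only that the function is applied to each value while every key is carried through unchanged.

```scala
object MapValuesSketch {
  // Hypothetical stand-in for one RDD partition: an ordered Seq of (key, value) pairs.
  def mapValues[K, V, U](partition: Seq[(K, V)])(f: V => U): Seq[(K, U)] =
    partition.map { case (k, v) => (k, f(v)) } // the key is passed through unchanged

  def main(args: Array[String]): Unit = {
    val partition = Seq(("V1", 1), ("V2", 5))
    // a => a + 2 touches only the values: (V1, 1) becomes (V1, 3)
    println(mapValues(partition)(a => a + 2))
  }
}
```

    Because the keys never change, the records do not need to move between partitions, which is consistent with preservesPartitioning = true in the source above.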

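    Aggregation over one or two RDDs

    (1) combineByKey

    combineByKey aggregates a single RDD by key. For example, it can turn an RDD whose elements are of type (Int, Int) into an RDD whose elements are of type (Int, Seq[Int]).
    The parameters of combineByKey are as follows:

    • createCombiner: V => C. Called when no C exists yet for a key; for example, creates a Seq C from V.
    • mergeValue: (C, V) => C. Called when a C already exists and a new value must be merged into it; for example, appends item V to the Seq C, or adds it to a running total.
    • mergeCombiners: (C, C) => C. Merges two Cs.
    • partitioner: Partitioner. The shuffle uses the Partitioner's strategy to assign keys to partitions.
    • mapSideCombine: Boolean = true. To reduce the amount of data transferred, many combine operations can run on the map side first. For example, a sum can first add up the Values of identical Keys within one partition and only then shuffle.
    • serializer: Serializer = null. Data transferred during the shuffle must be serialized, and the user can supply a custom serializer.

    In the figure, each box represents an RDD partition. combineByKey merges the pairs (V1, 2) and (V1, 1) into (V1, Seq(2, 1)).

    Source code: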

      /**
       * Generic function to combine the elements for each key using a custom set of aggregation
       * functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C
       * Note that V and C can be different -- for example, one might group an RDD of type
       * (Int, Int) into an RDD of type (Int, Seq[Int]). Users provide three functions:
       *
       * - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
       * - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
       * - `mergeCombiners`, to combine two C's into a single one.
       *
       * In addition, users can control the partitioning of the output RDD, and whether to perform
       * map-side aggregation (if a mapper can produce multiple items with the same key).
       */
      def combineByKey[C](createCombiner: V => C,
          mergeValue: (C, V) => C,
          mergeCombiners: (C, C) => C,
          partitioner: Partitioner,
          mapSideCombine: Boolean = true,
          serializer: Serializer = null): RDD[(K, C)] = {
        require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
        if (keyClass.isArray) {
          if (mapSideCombine) {
            throw new SparkException("Cannot use map-side combining with array keys.")
          }
          if (partitioner.isInstanceOf[HashPartitioner]) {
            throw new SparkException("Default partitioner cannot partition array keys.")
          }
        }
        val aggregator = new Aggregator[K, V, C](
          self.context.clean(createCombiner),
          self.context.clean(mergeValue),
          self.context.clean(mergeCombiners))
        if (self.partitioner == Some(partitioner)) {
          self.mapPartitions(iter => {
            val context = TaskContext.get()
            new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
          }, preservesPartitioning = true)
        } else {
          new ShuffledRDD[K, V, C](self, partitioner)
            .setSerializer(serializer)
            .setAggregator(aggregator)
            .setMapSideCombine(mapSideCombine)
        }
      }
    
      /**
       * Simplified version of combineByKey that hash-partitions the output RDD.
       */
      def combineByKey[C](createCombiner: V => C,
          mergeValue: (C, V) => C,
          mergeCombiners: (C, C) => C,
          numPartitions: Int): RDD[(K, C)] = {
        combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner(numPartitions))
      }
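
    To make the roles of createCombiner, mergeValue and mergeCombiners concrete, here is a minimal local sketch that runs the same three-function scheme over plain Scala collections. It is not Spark's implementation: CombineByKeySketch, combinePartition and the Seq-of-Seqs model of partitions are assumptions made for illustration, and the partitioner and serializer are left out entirely.

```scala
object CombineByKeySketch {
  // Map side: fold one partition into a single combiner per key.
  def combinePartition[K, V, C](partition: Seq[(K, V)])(
      createCombiner: V => C,
      mergeValue: (C, V) => C): Map[K, C] =
    partition.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
      acc.get(k) match {
        case Some(c) => acc.updated(k, mergeValue(c, v))  // a C already exists for k
        case None    => acc.updated(k, createCombiner(v)) // first value seen for k
      }
    }

  // Reduce side: merge the combiners that different partitions built for the same key.
  def combineByKey[K, V, C](partitions: Seq[Seq[(K, V)]])(
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C): Map[K, C] =
    partitions
      .map(p => combinePartition(p)(createCombiner, mergeValue))
      .foldLeft(Map.empty[K, C]) { (acc, partial) =>
        partial.foldLeft(acc) { case (m, (k, c)) =>
          m.updated(k, m.get(k).map(mergeCombiners(_, c)).getOrElse(c))
        }
      }

  def main(args: Array[String]): Unit = {
    // (V1, 2) and (V1, 1) sit in different partitions, as in the example above.
    val partitions = Seq(Seq(("V1", 2)), Seq(("V1", 1)))
    val grouped = combineByKey[String, Int, Seq[Int]](partitions)(
      (v: Int) => Seq(v),
      (c: Seq[Int], v: Int) => c :+ v,
      (a: Seq[Int], b: Seq[Int]) => a ++ b)
    println(grouped) // contains V1 -> Seq(2, 1)
  }
}
```

    This sketch merges partitions in a fixed left-to-right order. In a real cluster the order in which combiners arrive is not something to rely on, which is one reason the merge functions are normally expected to be associative.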

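    (2) reduceByKey

    reduceByKey is a simpler case: it only merges two values into one. createCombiner is therefore trivial and just returns v, while mergeValue and mergeCombiners share the same logic (both are the user-supplied func).

    In the figure, each box represents an RDD partition. The user-defined function (A, B) => (A + B) adds the values of the pairs with the same Key, (V1, 2) and (V1, 1), producing (V1, 3).

    Source code: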

      /**
       * Merge the values for each key using an associative reduce function. This will also perform
       * the merging locally on each mapper before sending results to a reducer, similarly to a
       * "combiner" in MapReduce.
       */
      def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = {
        combineByKey[V]((v: V) => v, func, func, partitioner)
      }
    
      /**
       * Merge the values for each key using an associative reduce function. This will also perform
       * the merging locally on each mapper before sending results to a reducer, similarly to a
       * "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions.
       */
      def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = {
        reduceByKey(new HashPartitioner(numPartitions), func)
      }
    
      /**
       * Merge the values for each key using an associative reduce function. This will also perform
       * the merging locally on each mapper before sending results to a reducer, similarly to a
       * "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
       * parallelism level.
       */
      def reduceByKey(func: (V, V) => V): RDD[(K, V)] = {
        reduceByKey(defaultPartitioner(self), func)
      }
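
    The following sketch shows, on plain Scala collections, how a single func can play both the mergeValue and the mergeCombiners role. ReduceByKeySketch and reducePairs are hypothetical names for this illustration, and partitions are again modelled as a Seq of Seqs rather than a real RDD.

```scala
object ReduceByKeySketch {
  def reduceByKey[K, V](partitions: Seq[Seq[(K, V)]])(func: (V, V) => V): Map[K, V] = {
    // Reduce a sequence of pairs to one value per key with func.
    def reducePairs(pairs: Seq[(K, V)]): Map[K, V] =
      pairs.foldLeft(Map.empty[K, V]) { case (acc, (k, v)) =>
        // createCombiner is the identity, so the first value for a key is stored as-is.
        acc.updated(k, acc.get(k).map(prev => func(prev, v)).getOrElse(v))
      }

    val mapSide = partitions.map(reducePairs) // func acting as mergeValue, per partition
    reducePairs(mapSide.flatMap(_.toSeq))     // func acting as mergeCombiners, across partitions
  }

  def main(args: Array[String]): Unit = {
    val partitions = Seq(Seq(("V1", 2), ("V2", 4)), Seq(("V1", 1)))
    println(reduceByKey(partitions)(_ + _)) // V1 -> 3, V2 -> 4
  }
}
```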

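    (3) partitionBy

    partitionBy repartitions an RDD.
    If the RDD's existing partitioner equals the given partitioner, the RDD is returned unchanged; otherwise a new ShuffledRDD is created with the given partitioner.

    In the figure, each box represents an RDD partition.

    The new partitioning strategy brings V1 and V2, which were originally in different partitions, into the same partition.

    Source code: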

      /**
       * Return a copy of the RDD partitioned using the specified partitioner.
       */
      def partitionBy(partitioner: Partitioner): RDD[(K, V)] = {
        if (keyClass.isArray && partitioner.isInstanceOf[HashPartitioner]) {
          throw new SparkException("Default partitioner cannot partition array keys.")
        }
        if (self.partitioner == Some(partitioner)) {
          self
        } else {
          new ShuffledRDD[K, V, V](self, partitioner)
        }
      }
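
    A small local model can show what the shuffle branch achieves. The sketch below redistributes pairs with a hash-style rule (a non-negative hashCode modulo the number of partitions), which is meant to resemble what a hash partitioner does. PartitionBySketch, partitionIndex and the collection-based partitions are assumptions for illustration, and the "same partitioner, return self" shortcut from the source above is not modelled.

```scala
object PartitionBySketch {
  // Hash-style placement: a non-negative (hashCode mod numPartitions).
  def partitionIndex(key: Any, numPartitions: Int): Int = {
    val raw = key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }

  // Move every pair to the partition chosen by its key, keeping encounter order.
  def partitionBy[K, V](partitions: Seq[Seq[(K, V)]], numPartitions: Int): Vector[Seq[(K, V)]] = {
    val grouped = partitions.flatten.groupBy { case (k, _) => partitionIndex(k, numPartitions) }
    Vector.tabulate(numPartitions)(i => grouped.getOrElse(i, Seq.empty))
  }

  def main(args: Array[String]): Unit = {
    // Key 1 starts out in two different partitions and ends up in a single one.
    val before = Seq(Seq((1, "a"), (2, "b")), Seq((3, "c"), (1, "d")))
    partitionBy(before, 2).zipWithIndex.foreach { case (p, i) => println(s"partition $i: $p") }
  }
}
```

    After repartitioning, all pairs with the same key are in the same partition, which is the property later key-based operators rely on.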

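    (4) cogroup

    cogroup groups two RDDs together by key.

    For the Key-Value elements of the two RDDs, the elements with the same Key in each RDD are collected into one collection per RDD, and the result has elements of the form (K, (Iterable[V], Iterable[W])). The Value of each result element is a tuple of the two collections that hold the values for that Key in the first and second RDD.

    In the figure, each large box represents an RDD and the small boxes inside it represent its partitions. The pairs (U1, 1) and (U1, 2) in RDD1 and the pair (U1, 2) in RDD2 are merged into (U1, ((1, 2), (2))).

    Source code: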

      /**
       * For each key k in `this` or `other1` or `other2` or `other3`,
       * return a resulting RDD that contains a tuple with the list of values
       * for that key in `this`, `other1`, `other2` and `other3`.
       */
      def cogroup[W1, W2, W3](other1: RDD[(K, W1)],
          other2: RDD[(K, W2)],
          other3: RDD[(K, W3)],
          partitioner: Partitioner)
          : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = {
        if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
          throw new SparkException("Default partitioner cannot partition array keys.")
        }
        val cg = new CoGroupedRDD[K](Seq(self, other1, other2, other3), partitioner)
        cg.mapValues { case Array(vs, w1s, w2s, w3s) =>
           (vs.asInstanceOf[Iterable[V]],
             w1s.asInstanceOf[Iterable[W1]],
             w2s.asInstanceOf[Iterable[W2]],
             w3s.asInstanceOf[Iterable[W3]])
        }
      }
    
      /**
       * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
       * list of values for that key in `this` as well as `other`.
       */
      def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner)
          : RDD[(K, (Iterable[V], Iterable[W]))]  = {
        if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
          throw new SparkException("Default partitioner cannot partition array keys.")
        }
        val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
        cg.mapValues { case Array(vs, w1s) =>
          (vs.asInstanceOf[Iterable[V]], w1s.asInstanceOf[Iterable[W]])
        }
      }
    
      /**
       * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
       * tuple with the list of values for that key in `this`, `other1` and `other2`.
       */
      def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner)
          : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = {
        if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
          throw new SparkException("Default partitioner cannot partition array keys.")
        }
        val cg = new CoGroupedRDD[K](Seq(self, other1, other2), partitioner)
        cg.mapValues { case Array(vs, w1s, w2s) =>
          (vs.asInstanceOf[Iterable[V]],
            w1s.asInstanceOf[Iterable[W1]],
            w2s.asInstanceOf[Iterable[W2]])
        }
      }
    
      /**
       * For each key k in `this` or `other1` or `other2` or `other3`,
       * return a resulting RDD that contains a tuple with the list of values
       * for that key in `this`, `other1`, `other2` and `other3`.
       */
      def cogroup[W1, W2, W3](other1: RDD[(K, W1)], other2: RDD[(K, W2)], other3: RDD[(K, W3)])
          : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = {
        cogroup(other1, other2, other3, defaultPartitioner(self, other1, other2, other3))
      }
    
      /**
       * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
       * list of values for that key in `this` as well as `other`.
       */
      def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] = {
        cogroup(other, defaultPartitioner(self, other))
      }
    
      /**
       * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
       * tuple with the list of values for that key in `this`, `other1` and `other2`.
       */
      def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
          : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = {
        cogroup(other1, other2, defaultPartitioner(self, other1, other2))
      }
    
      /**
       * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
       * list of values for that key in `this` as well as `other`.
       */
      def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))] = {
        cogroup(other, new HashPartitioner(numPartitions))
      }
    
      /**
       * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
       * tuple with the list of values for that key in `this`, `other1` and `other2`.
       */
      def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int)
          : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = {
        cogroup(other1, other2, new HashPartitioner(numPartitions))
      }
    
      /**
       * For each key k in `this` or `other1` or `other2` or `other3`,
       * return a resulting RDD that contains a tuple with the list of values
       * for that key in `this`, `other1`, `other2` and `other3`.
       */
      def cogroup[W1, W2, W3](other1: RDD[(K, W1)],
          other2: RDD[(K, W2)],
          other3: RDD[(K, W3)],
          numPartitions: Int)
          : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = {
        cogroup(other1, other2, other3, new HashPartitioner(numPartitions))
      }
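
    The shape of a two-way cogroup result can be reproduced locally with ordinary collections. In this sketch, CogroupSketch and its cogroup helper are hypothetical, each "RDD" is just a Seq of pairs, and partitioning is ignored; it only illustrates that every key from either side appears once, paired with the values it has on each side.

```scala
object CogroupSketch {
  def cogroup[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Map[K, (Seq[V], Seq[W])] = {
    // Every key that appears on either side gets exactly one entry.
    val keys = (left.map(_._1) ++ right.map(_._1)).distinct
    keys.map { k =>
      val vs = left.filter(_._1 == k).map(_._2)  // values for k in the first input
      val ws = right.filter(_._1 == k).map(_._2) // values for k in the second input
      (k, (vs, ws))
    }.toMap
  }

  def main(args: Array[String]): Unit = {
    val rdd1 = Seq(("U1", 1), ("U1", 2))
    val rdd2 = Seq(("U1", 2))
    println(cogroup(rdd1, rdd2)) // U1 is paired with (1, 2) from rdd1 and (2) from rdd2
  }
}
```

    A key that is present on only one side still appears in the result, with an empty collection for the other side.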

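    Join

    (1) join

    join first applies cogroup to the two RDDs being joined. In the RDD that cogroup produces, it takes the Cartesian product of the two value collections under each key and flattens the result, so every combination under a Key becomes its own element. The final result is an RDD[(K, (V, W))].
    In essence, join uses cogroup to group the two RDDs by key and then uses flatMapValues to break the grouped data back apart.

    The figure illustrates a join of two RDDs. Each large box represents an RDD and the small boxes represent its partitions.

    Elements that share a Key (for example V1) are joined, producing (V1, (1, 1)) and (V1, (1, 2)).

    Source code: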

      /**
       * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
       * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
       * (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD.
       */
      def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
        this.cogroup(other, partitioner).flatMapValues( pair =>
          for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
        )
      }
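
    The flattening step can be tried out on an already cogrouped structure. The sketch below assumes the cogroup result is available as a plain Scala Map from key to a pair of value sequences; JoinSketch and joinCogrouped are hypothetical names, and the for comprehension mirrors the one in the source above.

```scala
object JoinSketch {
  // Per key, emit the Cartesian product of the two value collections, then flatten.
  def joinCogrouped[K, V, W](cogrouped: Map[K, (Seq[V], Seq[W])]): Seq[(K, (V, W))] =
    cogrouped.toSeq.flatMap { case (k, (vs, ws)) =>
      for (v <- vs; w <- ws) yield (k, (v, w))
    }

  def main(args: Array[String]): Unit = {
    val cogrouped = Map(
      "V1" -> ((Seq(1), Seq(1, 2))),         // one value on the left, two on the right
      "V2" -> ((Seq(3), Seq.empty[Int])))    // no partner on the right
    println(joinCogrouped(cogrouped))        // (V1,(1,1)) and (V1,(1,2)); V2 is dropped
  }
}
```

    A key with an empty collection on either side contributes nothing, because a Cartesian product with an empty set is empty. That is what makes this an inner join.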

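    (2) leftOuterJoin and rightOuterJoin

    leftOuterJoin and rightOuterJoin build on join by first checking whether the other side has any elements for the key. If it has none, the missing side is filled with None; otherwise the elements are joined as usual (with the optional side wrapped in Some), and the result is returned.

    Source code: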

      /**
       * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the
       * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the
       * pair (k, (v, None)) if no elements in `other` have key k. Uses the given Partitioner to
       * partition the output RDD.
       */
      def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] = {
        this.cogroup(other, partitioner).flatMapValues { pair =>
          if (pair._2.isEmpty) {
            pair._1.iterator.map(v => (v, None))
          } else {
            for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, Some(w))
          }
        }
      }
    
      /**
       * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the
       * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the
       * pair (k, (None, w)) if no elements in `this` have key k. Uses the given Partitioner to
       * partition the output RDD.
       */
      def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
          : RDD[(K, (Option[V], W))] = {
        this.cogroup(other, partitioner).flatMapValues { pair =>
          if (pair._1.isEmpty) {
            pair._2.iterator.map(w => (None, w))
          } else {
            for (v <- pair._1.iterator; w <- pair._2.iterator) yield (Some(v), w)
          }
        }
      }
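
    The only difference from an inner join is how an empty side is handled, and that can be sketched on the same kind of cogrouped Map. OuterJoinSketch, leftOuter and rightOuter are hypothetical helpers written for this illustration; they express the empty-side check by substituting a single None for the missing side before taking the product.

```scala
object OuterJoinSketch {
  def leftOuter[K, V, W](cogrouped: Map[K, (Seq[V], Seq[W])]): Seq[(K, (V, Option[W]))] =
    cogrouped.toSeq.flatMap { case (k, (vs, ws)) =>
      // No match on the right: pair every left value with None.
      val matches: Seq[Option[W]] = if (ws.isEmpty) Seq(None) else ws.map(Some(_))
      for (v <- vs; w <- matches) yield (k, (v, w))
    }

  def rightOuter[K, V, W](cogrouped: Map[K, (Seq[V], Seq[W])]): Seq[(K, (Option[V], W))] =
    cogrouped.toSeq.flatMap { case (k, (vs, ws)) =>
      // No match on the left: pair every right value with None.
      val matches: Seq[Option[V]] = if (vs.isEmpty) Seq(None) else vs.map(Some(_))
      for (v <- matches; w <- ws) yield (k, (v, w))
    }

  def main(args: Array[String]): Unit = {
    val cogrouped: Map[String, (Seq[Int], Seq[String])] = Map(
      "a" -> ((Seq(1), Seq("x"))),
      "b" -> ((Seq(2), Seq.empty[String])),
      "c" -> ((Seq.empty[Int], Seq("z"))))
    println(leftOuter(cogrouped))  // keeps "b" as (2, None); drops "c"
    println(rightOuter(cogrouped)) // keeps "c" as (None, "z"); drops "b"
  }
}
```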

    Please credit the author, Jason Ding, and the source when reposting.
    GitCafe blog (http://jasonding1354.gitcafe.io/)
    GitHub blog (http://jasonding1354.github.io/)
    CSDN blog (http://blog.csdn.net/jasonding1354)
    Jianshu page (http://www.jianshu.com/users/2bd9b48f6ea8/latest_articles)
    Search Google for jasonding1354 to find my blog.

  • Original article: https://www.cnblogs.com/mthoutai/p/7346967.html