zoukankan      html  css  js  c++  java
  • RDD之五:Key-Value型Transformation算子

    Transformation处理的数据为Key-Value形式的算子大致可以分为:输入分区与输出分区一对一、聚集、连接操作。

    输入分区与输出分区一对一

    mapValues

    mapValues:针对(Key,Value)型数据中的Value进行Map操作,而不对Key进行处理。 

    方框代表RDD分区。a=>a+2代表只对( V1, 1)数据中的1进行加2操作,返回结果为3。

    源码:

    [plain] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. /**  
    2.  * Pass each value in the key-value pair RDD through a map function without changing the keys;  
    3.  * this also retains the original RDD's partitioning.  
    4.  */  
    5. def mapValues[U](f: V => U): RDD[(K, U)] = {  
    6.   val cleanF = self.context.clean(f)  
    7.   new MapPartitionsRDD[(K, U), (K, V)](self,  
    8.     (context, pid, iter) => iter.map { case (k, v) => (k, cleanF(v)) },  
    9.     preservesPartitioning = true)  
    10. }  

    单个RDD或两个RDD聚集

    (1)combineByKey

    combineByKey是对单个Rdd的聚合。相当于将元素为(Int,Int)的RDD转变为了(Int,Seq[Int])类型元素的RDD。 
    定义combineByKey算子的说明如下:

    • createCombiner: V => C, 在C不存在的情况下,如通过V创建seq C。
    • mergeValue:(C, V) => C, 当C已经存在的情况下,需要merge,如把item V加到seq 
      C中,或者叠加。
    • mergeCombiners:(C,C) => C,合并两个C。
    • partitioner: Partitioner(分区器),Shuffle时需要通过Partitioner的分区策略进行分区。
    • mapSideCombine: Boolean=true, 为了减小传输量,很多combine可以在map端先做。例如, 叠加可以先在一个partition中把所有相同的Key的Value叠加, 再shuffle。
    • serializerClass:String=null,传输需要序列化,用户可以自定义序列化类。


    方框代表RDD分区。 通过combineByKey,将(V1,2)、 (V1,1)数据合并为(V1,Seq(2,1))。

    源码:

    [plain] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. /**  
    2.  * Generic function to combine the elements for each key using a custom set of aggregation  
    3.  * functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C  
    4.  * Note that V and C can be different -- for example, one might group an RDD of type  
    5.  * (Int, Int) into an RDD of type (Int, Seq[Int]). Users provide three functions:  
    6.  *  
    7.  * - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)  
    8.  * - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)  
    9.  * - `mergeCombiners`, to combine two C's into a single one.  
    10.  *  
    11.  * In addition, users can control the partitioning of the output RDD, and whether to perform  
    12.  * map-side aggregation (if a mapper can produce multiple items with the same key).  
    13.  */  
    14. def combineByKey[C](createCombiner: V => C,  
    15.     mergeValue: (C, V) => C,  
    16.     mergeCombiners: (C, C) => C,  
    17.     partitioner: Partitioner,  
    18.     mapSideCombine: Boolean = true,  
    19.     serializer: Serializer = null): RDD[(K, C)] = {  
    20.   require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0  
    21.   if (keyClass.isArray) {  
    22.     if (mapSideCombine) {  
    23.       throw new SparkException("Cannot use map-side combining with array keys.")  
    24.     }  
    25.     if (partitioner.isInstanceOf[HashPartitioner]) {  
    26.       throw new SparkException("Default partitioner cannot partition array keys.")  
    27.     }  
    28.   }  
    29.   val aggregator = new Aggregator[K, V, C](  
    30.     self.context.clean(createCombiner),  
    31.     self.context.clean(mergeValue),  
    32.     self.context.clean(mergeCombiners))  
    33.   if (self.partitioner == Some(partitioner)) {  
    34.     self.mapPartitions(iter => {  
    35.       val context = TaskContext.get()  
    36.       new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))  
    37.     }, preservesPartitioning = true)  
    38.   } else {  
    39.     new ShuffledRDD[K, V, C](self, partitioner)  
    40.       .setSerializer(serializer)  
    41.       .setAggregator(aggregator)  
    42.       .setMapSideCombine(mapSideCombine)  
    43.   }  
    44. }  
    45.   
    46. /**  
    47.  * Simplified version of combineByKey that hash-partitions the output RDD.  
    48.  */  
    49. def combineByKey[C](createCombiner: V => C,  
    50.     mergeValue: (C, V) => C,  
    51.     mergeCombiners: (C, C) => C,  
    52.     numPartitions: Int): RDD[(K, C)] = {  
    53.   combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner(numPartitions))  
    54. }  

    (2)reduceByKey

    reduceByKey是更简单的一种情况,只是两个值合并成一个值,所以createCombiner很简单,就是直接返回v,而mergeValue和mergeCombiners的逻辑相同,没有区别。

    方框代表RDD分区。 通过用户自定义函数(A,B)=>(A+B),将相同Key的数据(V1,2)、(V1,1)的value相加,结果为(V1,3)。

    源码:

    [plain] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. /**  
    2.  * Merge the values for each key using an associative reduce function. This will also perform  
    3.  * the merging locally on each mapper before sending results to a reducer, similarly to a  
    4.  * "combiner" in MapReduce.  
    5.  */  
    6. def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = {  
    7.   combineByKey[V]((v: V) => v, func, func, partitioner)  
    8. }  
    9.   
    10. /**  
    11.  * Merge the values for each key using an associative reduce function. This will also perform  
    12.  * the merging locally on each mapper before sending results to a reducer, similarly to a  
    13.  * "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions.  
    14.  */  
    15. def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = {  
    16.   reduceByKey(new HashPartitioner(numPartitions), func)  
    17. }  
    18.   
    19. /**  
    20.  * Merge the values for each key using an associative reduce function. This will also perform  
    21.  * the merging locally on each mapper before sending results to a reducer, similarly to a  
    22.  * "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/  
    23.  * parallelism level.  
    24.  */  
    25. def reduceByKey(func: (V, V) => V): RDD[(K, V)] = {  
    26.   reduceByKey(defaultPartitioner(self), func)  
    27. }  

    (3)partitionBy

    partitionBy函数对RDD进行分区操作。 
    如果原有RDD的分区器和现有分区器(partitioner)一致,则不重分区,如果不一致,则相当于根据分区器生成一个新的ShuffledRDD。 

    方框代表RDD分区。 通过新的分区策略将原来在不同分区的V1、 V2数据都合并到了一个分区。

    源码:

    [plain] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. /**  
    2.  * Return a copy of the RDD partitioned using the specified partitioner.  
    3.  */  
    4. def partitionBy(partitioner: Partitioner): RDD[(K, V)] = {  
    5.   if (keyClass.isArray && partitioner.isInstanceOf[HashPartitioner]) {  
    6.     throw new SparkException("Default partitioner cannot partition array keys.")  
    7.   }  
    8.   if (self.partitioner == Some(partitioner)) {  
    9.     self  
    10.   } else {  
    11.     new ShuffledRDD[K, V, V](self, partitioner)  
    12.   }  
    13. }  

    (4)cogroup

    cogroup函数将两个RDD进行协同划分。对在两个RDD中的Key-Value类型的元素,每个RDD相同Key的元素分别聚合为一个集合,并且返回两个RDD中对应Key的元素集合的迭代器(K, (Iterable[V], Iterable[w]))。其中,Key和Value,Value是两个RDD下相同Key的两个数据集合的迭代器所构成的元组。

    大方框代表RDD,大方框内的小方框代表RDD中的分区。 将RDD1中的数据(U1,1)、(U1,2)和RDD2中的数据(U1,2)合并为(U1,((1,2),(2)))。

    源码:

    [plain] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. /**  
    2.  * For each key k in `this` or `other1` or `other2` or `other3`,  
    3.  * return a resulting RDD that contains a tuple with the list of values  
    4.  * for that key in `this`, `other1`, `other2` and `other3`.  
    5.  */  
    6. def cogroup[W1, W2, W3](other1: RDD[(K, W1)],  
    7.     other2: RDD[(K, W2)],  
    8.     other3: RDD[(K, W3)],  
    9.     partitioner: Partitioner)  
    10.     : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = {  
    11.   if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {  
    12.     throw new SparkException("Default partitioner cannot partition array keys.")  
    13.   }  
    14.   val cg = new CoGroupedRDD[K](Seq(self, other1, other2, other3), partitioner)  
    15.   cg.mapValues { case Array(vs, w1s, w2s, w3s) =>  
    16.      (vs.asInstanceOf[Iterable[V]],  
    17.        w1s.asInstanceOf[Iterable[W1]],  
    18.        w2s.asInstanceOf[Iterable[W2]],  
    19.        w3s.asInstanceOf[Iterable[W3]])  
    20.   }  
    21. }  
    22.   
    23. /**  
    24.  * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the  
    25.  * list of values for that key in `this` as well as `other`.  
    26.  */  
    27. def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner)  
    28.     : RDD[(K, (Iterable[V], Iterable[W]))]  = {  
    29.   if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {  
    30.     throw new SparkException("Default partitioner cannot partition array keys.")  
    31.   }  
    32.   val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)  
    33.   cg.mapValues { case Array(vs, w1s) =>  
    34.     (vs.asInstanceOf[Iterable[V]], w1s.asInstanceOf[Iterable[W]])  
    35.   }  
    36. }  
    37.   
    38. /**  
    39.  * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a  
    40.  * tuple with the list of values for that key in `this`, `other1` and `other2`.  
    41.  */  
    42. def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner)  
    43.     : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = {  
    44.   if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {  
    45.     throw new SparkException("Default partitioner cannot partition array keys.")  
    46.   }  
    47.   val cg = new CoGroupedRDD[K](Seq(self, other1, other2), partitioner)  
    48.   cg.mapValues { case Array(vs, w1s, w2s) =>  
    49.     (vs.asInstanceOf[Iterable[V]],  
    50.       w1s.asInstanceOf[Iterable[W1]],  
    51.       w2s.asInstanceOf[Iterable[W2]])  
    52.   }  
    53. }  
    54.   
    55. /**  
    56.  * For each key k in `this` or `other1` or `other2` or `other3`,  
    57.  * return a resulting RDD that contains a tuple with the list of values  
    58.  * for that key in `this`, `other1`, `other2` and `other3`.  
    59.  */  
    60. def cogroup[W1, W2, W3](other1: RDD[(K, W1)], other2: RDD[(K, W2)], other3: RDD[(K, W3)])  
    61.     : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = {  
    62.   cogroup(other1, other2, other3, defaultPartitioner(self, other1, other2, other3))  
    63. }  
    64.   
    65. /**  
    66.  * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the  
    67.  * list of values for that key in `this` as well as `other`.  
    68.  */  
    69. def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] = {  
    70.   cogroup(other, defaultPartitioner(self, other))  
    71. }  
    72.   
    73. /**  
    74.  * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a  
    75.  * tuple with the list of values for that key in `this`, `other1` and `other2`.  
    76.  */  
    77. def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])  
    78.     : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = {  
    79.   cogroup(other1, other2, defaultPartitioner(self, other1, other2))  
    80. }  
    81.   
    82. /**  
    83.  * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the  
    84.  * list of values for that key in `this` as well as `other`.  
    85.  */  
    86. def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))] = {  
    87.   cogroup(other, new HashPartitioner(numPartitions))  
    88. }  
    89.   
    90. /**  
    91.  * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a  
    92.  * tuple with the list of values for that key in `this`, `other1` and `other2`.  
    93.  */  
    94. def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int)  
    95.     : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = {  
    96.   cogroup(other1, other2, new HashPartitioner(numPartitions))  
    97. }  
    98.   
    99. /**  
    100.  * For each key k in `this` or `other1` or `other2` or `other3`,  
    101.  * return a resulting RDD that contains a tuple with the list of values  
    102.  * for that key in `this`, `other1`, `other2` and `other3`.  
    103.  */  
    104. def cogroup[W1, W2, W3](other1: RDD[(K, W1)],  
    105.     other2: RDD[(K, W2)],  
    106.     other3: RDD[(K, W3)],  
    107.     numPartitions: Int)  
    108.     : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = {  
    109.   cogroup(other1, other2, other3, new HashPartitioner(numPartitions))  
    110. }  

    连接

    (1)join

    join对两个需要连接的RDD进行cogroup函数操作。cogroup操作之后形成的新RDD,对每个key下的元素进行笛卡尔积操作,返回的结果再展平,对应Key下的所有元组形成一个集合,最后返回RDD[(K,(V,W))]。
    join的本质是通过cogroup算子先进行协同划分,再通过flatMapValues将合并的数据打散。 

    对两个RDD的join操作示意图。 大方框代表RDD,小方框代表RDD中的分区。函数对拥有相同Key的元素(例如V1)为Key,以做连接后的数据结果为(V1,(1,1))和(V1,(1,2))。

    源码:

    [plain] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. /**  
    2.  * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each  
    3.  * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and  
    4.  * (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD.  
    5.  */  
    6. def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {  
    7.   this.cogroup(other, partitioner).flatMapValues( pair =>  
    8.     for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)  
    9.   )  
    10. }  

    (2)leftOuterJoin和rightOuterJoin

    LeftOuterJoin(左外连接)和RightOuterJoin(右外连接)相当于在join的基础上先判断一侧的RDD元素是否为空,如果为空,则填充为空。 如果不为空,则将数据进行连接运算,并返回结果。

    源码:

    [plain] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. /**  
    2.  * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the  
    3.  * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the  
    4.  * pair (k, (v, None)) if no elements in `other` have key k. Uses the given Partitioner to  
    5.  * partition the output RDD.  
    6.  */  
    7. def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] = {  
    8.   this.cogroup(other, partitioner).flatMapValues { pair =>  
    9.     if (pair._2.isEmpty) {  
    10.       pair._1.iterator.map(v => (v, None))  
    11.     } else {  
    12.       for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, Some(w))  
    13.     }  
    14.   }  
    15. }  
    16.   
    17. /**  
    18.  * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the  
    19.  * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the  
    20.  * pair (k, (None, w)) if no elements in `this` have key k. Uses the given Partitioner to  
    21.  * partition the output RDD.  
    22.  */  
    23. def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)  
    24.     : RDD[(K, (Option[V], W))] = {  
    25.   this.cogroup(other, partitioner).flatMapValues { pair =>  
    26.     if (pair._1.isEmpty) {  
    27.       pair._2.iterator.map(w => (None, w))  
    28.     } else {  
    29.       for (v <- pair._1.iterator; w <- pair._2.iterator) yield (Some(v), w)  
    30.     }  
    31.   }  
    32. }  

    原文链接:http://blog.csdn.net/jasonding1354

  • 相关阅读:
    写在开篇——过往总结
    线程池原理实现
    MD5加密工具类
    跨域及jsonp
    四种xml的解析方式
    浅析正则表达式—(原理篇)
    用JavaScript添加选择按钮的背景颜色和juqery添加选择按钮的背景色
    怎么用JavaScript实现tab切换
    vue.js2.0:如何搭建开发环境及构建项目
    vscode如何用浏览器预览运行html文件
  • 原文地址:https://www.cnblogs.com/duanxz/p/6327375.html
Copyright © 2011-2022 走看看