zoukankan      html  css  js  c++  java
  • 键值对的算子讲解 PairRDDFunctions

    1:groupByKey

    def groupByKey(): RDD[(K, Iterable[V])]

    根据key进行聚集,value组成一个列表,没有进行聚集,所以在有shuffle操作时候避免使用概算子,会增大通信数据量。需要考虑进行一个本地的Combiner,所以可以直接使用reduceByKey

    cala> p.collect
    res15: Array[(Int, Int)] = Array((1,1), (2,1), (1,1), (2,1), (1,1), (2,1), (3,1), (4,1), (5,1))
    
    scala> p.groupByKey.collect
    res16: Array[(Int, Iterable[Int])] = Array((4,CompactBuffer(1)), (2,CompactBuffer(1, 1, 1)), (1,CompactBuffer(1, 1, 1)), (3,CompactBuffer(1)), (5,CompactBuffer(1)))

    2:cogroup

    def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W]))]

    对每一个rdd先进行groupByKey,然后在对相同key的value在进行一个groupByKey,例如

    scala> p.collect
    res18: Array[(Int, Int)] = Array((1,1), (2,1), (1,1), (2,1), (1,1), (2,1), (3,1), (4,1), (5,1))
    
    scala> p.cogroup(p).collect
    res19: Array[(Int, (Iterable[Int], Iterable[Int]))] = Array((4,(CompactBuffer(1),CompactBuffer(1))), (2,(CompactBuffer(1, 1, 1),CompactBuffer(1, 1, 1))), (1,(CompactBuffer(1, 1, 1),CompactBuffer(1, 1, 1))), (3,(CompactBuffer(1),CompactBuffer(1))), (5,(CompactBuffer(1),CompactBuffer(1))))

    例如:(1,1)在pair中出现三次,所以cogroup之后(1,1)的结果是:

    (1,(CompactBuffer(1, 1, 1),CompactBuffer(1, 1, 1)))

     3:aggregateByKey

    def aggregateByKey[U](zeroValue: U)(seqOp: (U, V) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): RDD[(K, U)]

    说明:这个函数相对有点难懂,其他重载版本与此类似。该函数接受三个参数,一个初始值,两个函数:seqOp和comOp:

    seqOp是对分区进行具体的函数,zeroValue值只有在seqOp中有使用,在第二个函数comOp中就不在使用了。

    comOp是分区之间的combine函数(combine函数有点类似combiner的作用,进行聚集的函数)。

    例如:

        val data = sc.parallelize(List((1, 3), (1, 2), (1, 4), (2, 3), (2, 5)))
        def seq(a: Int, b: Int): Int = {
          println("seq: " + a + "	 " + b)
          math.max(a, b)
        }
        def comb(a: Int, b: Int): Int = {
          println("comb: " + a + "	 " + b)
          a + b
        }

    执行过程输出:

    seq: 1     3
    seq: 1     2
    seq: 1     4
    seq: 1     3
    seq: 1     5
    comb: 3     2
    comb: 5     4
    comb: 3     5

    说明:在调用seqOp函数时候,每一次都向(key,value)中的value和zeroValue进行求最大值,将该最大值作为key的value。执行完毕seqOp函数的状态为:

    ((1,(3,2,4)),(2(3,5)))

    然后调用comOp函数将每一个key的value进行累加,得到最后输出。

    输出:
    
    scala> data.aggregateByKey(1)(seq, comb).collect.toBuffer
    res36: scala.collection.mutable.Buffer[(Int, Int)] = ArrayBuffer((2,5), (1,7))

    4:combineByKey

    def combineByKey[C](createCombiner: (V) ⇒ C, mergeValue: (C, V) ⇒ C, mergeCombiners: (C, C) ⇒ C): RDD[(K, C)]

    参数说明:

    • createCombiner, which turns a V into a C (e.g., creates a one-element list)
    • mergeValue, to merge a V into a C (e.g., adds it to the end of a list)
    • mergeCombiners, to combine two C's into a single one.

    例如:

    val data1 = sc.parallelize(List("a", "b", "c", "c", "b", "a", "b", "a"))
    val data2 = sc.parallelize(List(1, 2, 3, 3, 2, 1, 2, 1))
    val zip = data2.zip(data1)
    val combineByKey = zip.combineByKey(List(_), (x: List[String], y: String) => y :: x, (x: List[String], y: List[String]) => x ::: y)
    println(combineByKey.collect().toBuffer)

    输出:

    ArrayBuffer((1,List(a, a, a)), (2,List(b, b, b)), (3,List(c, c)))

    5:flatMapValues

    def flatMapValues[U](f: (V) ⇒ TraversableOnce[U]): RDD[(K, U)]

    传入一个键值对的值给 (V) ⇒ TraversableOnce[U]函数,返回的是一个集合的函数。将当前的key和当前集合中每一个元素组成元组返回

    val a = sc.parallelize(List((1,2),(3,4),(5,6)))
    val b = a.flatMapValues(x=>1 to x)
    b.collect.foreach(println(_))

    输出:

    (1,1)
    (1,2)
    (3,1)
    (3,2)
    (3,3)
    (3,4)
    (5,1)
    (5,2)
    (5,3)
    (5,4)
    (5,5)
    (5,6)

    分析:当传入是2的时候,生成一个1 to 2 的序列,然后当前key=1,所以生成(1,1),(1,2)两个元组、

     6:foldByKey

    def foldByKey(zeroValue: V)(func: (V, V) ⇒ V): RDD[(K, V)]

    fold:折叠的意思,根据key进行折叠,例如:

    scala> rdd.collect
    res2: Array[String] = Array(hello, world, xiaomi, meizu, meizu)
    
    scala> rdd.map((_,1)).foldByKey(1)(_+_).collect
    res3: Array[(String, Int)] = Array((meizu,3), (hello,2), (world,2), (xiaomi,2))

    此处zeroValue为1,由于meizu这个字符出现两次并加上zeroValue的话恰好是3。hello world xiaomi这些单词都是一次,并加上zeroValue恰好是2。

    当zeroValue值为0的时候,我们可以实现一个wordcount。

    7:mapValues

    def mapValues[U](f: (V) ⇒ U): RDD[(K, U)]

    对键值对的每一个value进行处理,key保持不变。例如:我对现有的键值对的值进行乘以10的操作

    scala> rdd.map((_,1)).collect
    res14: Array[(String, Int)] = Array((hello,1), (world,1), (xiaomi,1), (meizu,1), (meizu,1))
    
    scala> rdd.map((_,1)).mapValues(x=>x*10).collect
    res15: Array[(String, Int)] = Array((hello,10), (world,10), (xiaomi,10), (meizu,10), (meizu,10))

    8:rightOuterJoin

    def rightOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], W))]

    右外连接,other中的所有key都会出现在结果中,关联到的是一个Some值,关联不到的是一个None。例如:

    scala> rdd1.collect
    res26: Array[(Int, Int)] = Array((1,1), (2,1), (3,1), (4,1), (5,1))
    
    scala> rdd2.collect
    res27: Array[(Int, Int)] = Array((3,2), (4,2), (5,2), (6,2), (7,2), (8,2))
    
    scala> rdd1.rightOuterJoin(rdd2).collect
    res28: Array[(Int, (Option[Int], Int))] = Array((4,(Some(1),2)), (6,(None,2)), (8,(None,2)), (3,(Some(1),2)), (7,(None,2)), (5,(Some(1),2)))

    9:leftOuterJoin

    def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]

    同上面右外连接。示例:

    scala> rdd1.collect
    res31: Array[(Int, Int)] = Array((1,1), (2,1), (3,1), (4,1), (5,1))
    
    scala> rdd2.collect
    res32: Array[(Int, Int)] = Array((3,2), (4,2), (5,2), (6,2), (7,2), (8,2))
    
    scala> rdd1.leftOuterJoin(rdd2).collect
    res33: Array[(Int, (Int, Option[Int]))] = Array((4,(1,Some(2))), (2,(1,None)), (1,(1,None)), (3,(1,Some(2))), (5,(1,Some(2))))

    rdd中的所有的key都会出现,关联的不到的为None,此时关联的值在value的第二个位置。我们可以通过交换右外连接的两个rdd的位置,实现左外连接,但是区别在于value中元素的位置是逆序的:

    scala> rdd2.rightOuterJoin(rdd1).collect //通过右外连接实现左外连接
    res34: Array[(Int, (Option[Int], Int))] = Array((4,(Some(2),1)), (2,(None,1)), (1,(None,1)), (3,(Some(2),1)), (5,(Some(2),1)))
  • 相关阅读:
    windows的一组常用运行命令
    nfs:server is not responding,still trying 原因与解决方案
    MYSQL外键(Foreign Key)的使用
    byte[]转字符串编码问题
    /usr/bin/ld: cannot find lGL
    Linux查看用户及分组
    NAND和NOR flash的区别
    Win7+Ubuntu11.10(EasyBCD硬盘安装)
    Win7+Ubuntu12.04.1硬盘安装错误及解决方案
    Linux内核编译时错误
  • 原文地址:https://www.cnblogs.com/leodaxin/p/7499557.html
Copyright © 2011-2022 走看看