zoukankan      html  css  js  c++  java
  • Spark学习摘记 —— Pair RDD转化操作API归纳

    本文参考

    参考《Spark快速大数据分析》动物书中的第四章"键值对操作",由于pair RDD的一些特殊操作,没有和前面两篇的API归纳放在一起做示例

    前面的几个api —— reduceByKey()函数、foldByKey()函数、groupByKey()函数、combineByKey()函数、mapValues()函数、flatMapValues()函数、keys()函数、values()函数和sortByKey函数是针对一个Pair RDD的操作

    而后的几个api —— subtractByKey()函数、join()函数、rightOuterJoin()函数、leftOuterJoin()函数和cogroup()函数是针对两个Pair RDD的函数

    Spark转化操作API归纳:https://www.cnblogs.com/kuluo/p/12545374.html

    Spark行动操作API归纳:https://www.cnblogs.com/kuluo/p/12550938.html

    Pair RDD行动操作API归纳:https://www.cnblogs.com/kuluo/p/12567221.html

    环境

    idea + spark 2.4.5 + scala 2.11.12

    RDD均通过SparkContext的parallelize()函数创建

    reduceByKey()函数

    目的:

    合并具有相同键的值,在经典的WordCount示例程序里,通过该函数合并相同字母(键)的出现次数(值)

    代码:

    val testList = List("a a a b b b", "b b c c c", "c d d")
    val testRdd = sc.parallelize(testList)
    testRdd.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).foreach(pair => print(s"$pair "))

    输出:

    (d,2) (a,3) (b,5) (c,4)

     

    foldByKey()函数

    目的:

    合并具有相同键的值,同reduceByKey()类似,不过要指定初始值

    代码:

    val testList = List("a a a b b b", "b b c c c", "c d d")
    val testRdd = sc.parallelize(testList)
    testRdd.flatMap(_.split(" ")).map((_, 1)).foldByKey(0)(_ + _).foreach(pair => print(s"$pair "))

    输出:

    (d,2) (a,3) (b,5) (c,4)

     

    mapValues()函数

    目的:

    对pair RDD中的每个值应用一个函数而不改变键

    代码:

    val testList = List("a a a b b b", "b b c c c", "c d d")
    val testRdd = sc.parallelize(testList)
    testRdd.flatMap(_.split(" ")).map((_, 1))
    .reduceByKey(_ + _)
    .mapValues(_ * 2)
    .foreach(pair => print(s"$pair "))

    输出:

    (d,4) (a,6) (b,10) (c,8)

     

    flatMapValues()函数

    目的:

    对pair RDD中的每个值应用一个返回迭代器的函数,然后对返回的每个元素都生成一个对应原键的键值对记录

    代码:

    val testList = List("a a a b b b", "b b c c c", "c d d")
    val testRdd = sc.parallelize(testList)
    testRdd.flatMap(_.split(" ")).map((_, 1))
    .reduceByKey(_ + _)
    .flatMapValues(_ to 5)
    .foreach(pair => print(s"$pair "))

    输出:

    (d,2) (d,3) (d,4) (d,5) (a,3) (a,4) (a,5) (b,5) (c,4) (c,5)

     

    groupByKey()函数

    目的:

    对具有相同键的值进行分组

    代码:

    val testList = List("a a a b b b", "b b c c c", "c d d")
    val testRdd = sc.parallelize(testList)
    testRdd.flatMap(_.split(" ")).map((_, 1))
    .groupByKey()
    /**
    * (d,CompactBuffer(1, 1)) (a,CompactBuffer(1, 1, 1)) (b,CompactBuffer(1, 1, 1, 1, 1)) (c,CompactBuffer(1, 1, 1, 1))
    */
    .mapValues(_.sum)
    .foreach(pair => print(s"$pair "))

    输出:

    使用groupByKey()函数后,直接输出的结果为

    (d,CompactBuffer(1, 1)) (a,CompactBuffer(1, 1, 1)) (b,CompactBuffer(1, 1, 1, 1, 1)) (c,CompactBuffer(1, 1, 1, 1))

    在groupByKey()函数后使用mapValues()函数,可以实现和WordCount的reduceByKey()函数相同的效果,输出的结果为

    (d,2) (a,3) (b,5) (c,4)

    更高效的操作:

    可以一步到位的reduceByKey()函数效率更高,避免使用groupByKey()函数分步操作,以下为源码注释

    This operation may be very expensive. If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using `PairRDDFunctions.aggregateByKey` or `PairRDDFunctions.reduceByKey` will provide much better performance.

     

    combineByKey()函数

    目的:

    使用不同的返回类型合并具有相同键的值,大多数基于键聚合的函数都是用它实现的(如reduceByKey()函数和foldByKey()函数)

    遇到一个新元素时,会使用createCombiner()的函数来创建那个键对应的累加器的初始值,这一过程会在每个分区中第一次出现各个键时发生,而不是在整个RDD中第一次出现时发生

    若是一个之前遇到过的键,会使用mergeValue()的函数将该键的累加器对应的当前值与这个新的值进行合并

    每个分区独立处理,对于同一个键可以有多个累加器,若有多个分区对应同一个键的累加器,会使用mergeCombiners()函数将各个分区的结果进行合并

    代码(WordCount):

    val testList = List("a a a b b b", "b b c c c", "c d d")
    val testRdd = sc.parallelize(testList)
    testRdd.flatMap(_.split(" ")).map((_, 1))
    .combineByKey(x => x,
    (x:Int, y:Int) => x + y,
    (x:Int, y:Int) => x + y)
    .foreach(pair => print(s"$pair "))

    输出:

    (d,2) (a,3) (b,5) (c,4)

    代码(求平均):

    /*
    * (a,3) (b,5) (c,4) (d,2)
    */
    val
    testList1 = List("a a a b b b", "b b c c c", "c d d")
    /*
    * (a,5) (b,4) (c,6)
    */
    val
    testList2 = List("a a a a a b b", "b b c c c c c", "c")

    val testRdd1 = sc.parallelize(testList1)
    val testRdd2 = sc.parallelize(testList2)

    /*
    * union
    取并集,构造(a,3) (a,5) (b,5) (b,4) (c,4) (c,6) (d,2)
    */
    val
    testRdd3 = testRdd1.flatMap(_.split(" ")).map((_, 1))
    .combineByKey(x => x,
    (x: Int, y: Int) => x + y,
    (x: Int, y: Int) => x + y)
    .union(
    testRdd2.flatMap(_.split(" ")).map((_, 1))
    .combineByKey(x => x,
    (x: Int, y: Int) => x + y,
    (x: Int, y: Int) => x + y)
    )

    testRdd3.combineByKey(
    x => (x, 1),
    (x: (Int, Int), y) => (x._1 + y, x._2 + 1),
    (x: (Int, Int), y: (Int, Int)) => (x._1 + y._1, x._2 + y._2)
    ).mapValues(value => 1.0 * value._1 / value._2).foreach(pair => print(s"$pair "))

    输出:

    (d,2.0) (a,4.0) (b,4.5) (c,5.0)

     

    keys()函数

    目的:

    返回一个仅包含键的RDD

    代码:

    val testList = List("a a a b b b", "b b c c c", "c d d")
    val testRdd = sc.parallelize(testList)
    testRdd.flatMap(_.split(" ")).map((_, 1))
    .reduceByKey(_ + _)
    .keys
    .foreach(key => print(s"$key "))

    输出:

    d a b c

     

    values()函数

    目的:

    返回一个仅包含值得RDD

    代码:

    val testList = List("a a a b b b", "b b c c c", "c d d")
    val testRdd = sc.parallelize(testList)
    testRdd.flatMap(_.split(" ")).map((_, 1))
    .reduceByKey(_ + _)
    .values
    .foreach(value => print(s"$value "))

    输出:

    2 3 5 4

     

    sortByKey()函数

    目的:

    返回一个根据键排序的RDD,默认升序

    代码:

    val testList = List("a a a b b b", "b b c c c", "c d d")
    val testRdd = sc.parallelize(testList)
    testRdd.flatMap(_.split(" ")).map((_, 1))
    .reduceByKey(_ + _)
    .sortByKey(false)
    .foreach(pair => print(s"$pair "))

    输出:

    (d,2) (c,4) (b,5) (a,3)

     

    subtractByKey()函数

    目的:

    类似于集合中的差集,删掉当前RDD中键与另一个RDD中的键相同的元素

    代码:

    /*
    * (a,3) (b,5) (c,4) (d,2)
    */
    val
    testList1 = List("a a a b b b", "b b c c c", "c d d")
    /*
    * (a,5) (b,4) (c,6)
    */
    val
    testList2 = List("a a a a a b b", "b b c c c c c", "c")

    val testRdd1 = sc.parallelize(testList1)
    val testRdd2 = sc.parallelize(testList2)

    testRdd1.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    .subtractByKey(testRdd2.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _))
    .foreach(pair => print(s"$pair "))

    输出:

    (d,2)

     

    join()函数

    目的:

    类似于集合中的并集,对两个RDD进行内连接

    代码:

    /*
    * (a,3) (b,5) (c,4) (d,2)
    */
    val
    testList1 = List("a a a b b b", "b b c c c", "c d d")
    /*
    * (a,5) (b,4) (c,6)
    */
    val
    testList2 = List("a a a a a b b", "b b c c c c c", "c")

    val testRdd1 = sc.parallelize(testList1)
    val testRdd2 = sc.parallelize(testList2)

    testRdd1.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    .join(testRdd2.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _))
    .foreach(pair => print(s"$pair "))

    输出:

    (a,(3,5)) (b,(5,4)) (c,(4,6))

     

    rightOuterJoin()函数

    目的:

    类似于集合的右连接,确保函数内的RDD的键存在

    代码:

    /*
    * (a,3) (b,5) (c,4) (d,2)
    */
    val
    testList1 = List("a a a b b b", "b b c c c", "c d d")
    /*
    * (a,5) (b,4) (c,6)
    */
    val
    testList2 = List("a a a a a b b", "b b c c c c c", "c")

    val testRdd1 = sc.parallelize(testList1)
    val testRdd2 = sc.parallelize(testList2)

    testRdd1.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    .rightOuterJoin(testRdd2.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _))
    .foreach(pair => print(s"$pair "))

    输出:

    (a,(Some(3),5)) (b,(Some(5),4)) (c,(Some(4),6))

     

    leftOuterJoin()函数

    目的:

    类似于集合的左连接,确保调用该函数的RDD的键存在

    代码:

    /*
    * (a,3) (b,5) (c,4) (d,2)
    */
    val
    testList1 = List("a a a b b b", "b b c c c", "c d d")
    /*
    * (a,5) (b,4) (c,6)
    */
    val
    testList2 = List("a a a a a b b", "b b c c c c c", "c")

    val testRdd1 = sc.parallelize(testList1)
    val testRdd2 = sc.parallelize(testList2)

    testRdd1.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    .leftOuterJoin(testRdd2.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _))
    .foreach(pair => print(s"$pair "))

    输出:

    (d,(2,None)) (a,(3,Some(5))) (b,(5,Some(4))) (c,(4,Some(6)))

     

    cogroup()函数

    目的:

    将两个RDD中拥有相同键的数据分组到一起

    代码:

    /*
    * (a,3) (b,5) (c,4) (d,2)
    */
    val
    testList1 = List("a a a b b b", "b b c c c", "c d d")
    /*
    * (a,5) (b,4) (c,6)
    */
    val
    testList2 = List("a a a a a b b", "b b c c c c c", "c")

    val testRdd1 = sc.parallelize(testList1)
    val testRdd2 = sc.parallelize(testList2)

    testRdd1.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    .cogroup(testRdd2.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _))
    .foreach(pair => print(s"$pair "))

    输出:

    (d,(CompactBuffer(2),CompactBuffer())) (a,(CompactBuffer(3),CompactBuffer(5)))

    (b,(CompactBuffer(5),CompactBuffer(4))) (c,(CompactBuffer(4),CompactBuffer(6)))

  • 相关阅读:
    stress工具使用指南和结果分析
    copy.c实现
    sysbench测试阿里云CPU
    sysbench测试阿里云ECS云磁盘的IOPS,吞吐量
    iostat详解
    sysbench_fileio.sh
    rm -f /var/lib/rpm/__db*;rpm --rebuilddb
    HeadFirst 13 (包装器, 过滤器) not Finish
    基于Linux的oracle数据库管理 part5( linux启动关闭 自动启动关闭 oracle )
    基于Linux的oracle数据库管理 part4( shell管理 上 )
  • 原文地址:https://www.cnblogs.com/kuluo/p/12558563.html
Copyright © 2011-2022 走看看