zoukankan      html  css  js  c++  java
  • Spark学习摘记 —— Pair RDD行动操作API归纳

    本文参考

    参考《Spark快速大数据分析》动物书中的第四章"键值对操作",本篇是对RDD转化操作和行动操作API归纳的最后一篇

    RDD转化操作API归纳:https://www.cnblogs.com/kuluo/p/12545374.html

    RDD行动操作API归纳:https://www.cnblogs.com/kuluo/p/12550938.html

    pair RDD转化操作API归纳:https://www.cnblogs.com/kuluo/p/12558563.html

    环境

    idea + spark 2.4.5 + scala 2.11.12

    RDD均通过SparkContext的parallelize()函数创建

    countByKey()函数

    目的:

    对每个键对应的元素分别计数

    代码:

    /*
    * (a,3) (b,5) (c,4) (d,2)
    */
    val
    testList1 = List("a a a b b b", "b b c c c", "c d d")
    /*
    * (a,5) (b,4)
    */
    val
    testList2 = List("a a a a a b b", "b b")

    val testRdd1 = sc.parallelize(testList1)
    val testRdd2 = sc.parallelize(testList2)

    val map = testRdd1.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
      .union(testRdd2.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _))
      .countByKey()

    for ((x, y) <- map) {
      println(s"($x, $y)")
    }

    输出:

    (d, 1)

    (a, 2)

    (b, 2)

    (c, 1)

    注意:

    This method should only be used if the resulting map is expected to be small, as the whole thing is loaded into the driver's memory. To handle very large results, consider using rdd.mapValues(_ => 1L).reduceByKey(_ + _), which returns an RDD[T, Long] instead of a map.

    countByKey()函数会将结果全部加载到驱动器进程中,不适合结果集较大时使用

    我们在源码中可以看到它调用了collect()函数

    def countByKey(): Map[K, Long] = self.withScope { self.mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap }

    因此在处理大数据量时,应当使用.mapValues(_ => 1L).reduceByKey(_ + _)两个函数返回一个RDD

     

    collectAsMap()函数

    目的:

    collect()函数针对pair RDD的实现,将结果以映射表的形式返回

    代码:

    /*
    * (a,3) (b,5) (c,4) (d,2)
    */
    val
    testList1 = List("a a a b b b", "b b c c c", "c d d")
    /*
    * (a,5) (b,4)
    */
    val
    testList2 = List("a a a a a b b", "b b")

    val testRdd1 = sc.parallelize(testList1)
    val testRdd2 = sc.parallelize(testList2)

    val map = testRdd1.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
      .union(testRdd2.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _))
      .collectAsMap()

    for ((x, y) <- map) {
      println(s"($x, $y)")
    }

    输出:

    (b, 4)

    (d, 2)

    (a, 5)

    (c, 4)

    注意:

    this doesn't return a multimap (so if you have multiple values to the same key, only one value per key is preserved in the map returned)

    也正如本例所示,pair RDD中有重复的键时,collectByKey函数只会保留一个

    因为内部调用了collect()函数,不适合结果集较大时使用

     

    lookup()函数

    目的:

    返回给定键对应的所有值

    代码:

    /*
    * (a,3) (b,5) (c,4) (d,2)
    */
    val
    testList1 = List("a a a b b b", "b b c c c", "c d d")
    /*
    * (a,5) (b,4)
    */
    val
    testList2 = List("a a a a a b b", "b b")

    val testRdd1 = sc.parallelize(testList1)
    val testRdd2 = sc.parallelize(testList2)

    println(testRdd1.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
      .union(testRdd2.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _))
      .lookup("a"))

    输出:

    ArrayBuffer(3, 5)

  • 相关阅读:
    阿里云 linux centos7安装tomcat
    定义全局的输入框获取焦点指令vfocus
    vue按enter键刷新页面 使用@submit.native.prevent阻止表单默认提交,添加在form标签上
    vue按键修饰符@keyup.enter.native
    阿里云 linux centos7中安装redis
    CRON表达式
    python定时执行nutch爬取任务
    Spring中设置bean作用域
    Cassandra 安装
    cassandra入门
  • 原文地址:https://www.cnblogs.com/kuluo/p/12567221.html
Copyright © 2011-2022 走看看