zoukankan      html  css  js  c++  java
  • Spark RDD编程(3) Key-Value类型

    1 partitionBypairRDD进行分区操作,如果原有的partionRDD和现有的partionRDD是一致的话就不进行分区, 否则会生成ShuffleRDD,即会产生shuffle过程。

    val conf = new SparkConf().setMaster("local[*]").setAppName("word count")
    val sc = new SparkContext(conf)
    
    //----------------------- partitionBy -------------------------
    val kvRDD: RDD[(Int, Char)] = sc.makeRDD(Array((1, 'a'), (2, 'b'), (3, 'c'), (4, 'd')), 4)
    val partitionByRDD: RDD[(Int, Char)] = kvRDD.partitionBy(new HashPartitioner(2))
    val partitionByArray: Array[Array[(Int, Char)]] = partitionByRDD.glom().collect()
    partitionByArray.foreach(data => println(data.mkString(",")))
    //(2,b),(4,d)
    //(1,a),(3,c)
    

    2 groupByKey:groupByKey也是对每个key进行操作,但只生成一个sequence

    val rdd = sc.makeRDD(List("java", "scala", "java", "spark"))
    val mapRDD = rdd.map((_, 1))
    val groupByKeyRDD: RDD[(String, Iterable[Int])] = mapRDD.groupByKey()
    groupByKeyRDD.foreach(println)
    //(spark,CompactBuffer(1))
    //(scala,CompactBuffer(1))
    //(java,CompactBuffer(1, 1))
    

    3 reduceByKey在一个(K,V)RDD上调用,返回一个(K,V)RDD,使用指定的reduce函数,将相同key的值聚合到一起,reduce任务的个数可以通过第二个可选的参数来设置。

    //分组、所有区预聚合
    val rdd2 = sc.makeRDD(List(("female", 1), ("male", 5), ("female", 5), ("male", 2)))
    val reduceByKeyRDD: RDD[(String, Int)] = rdd2.reduceByKey((result, v) => result + v)
    reduceByKeyRDD.collect().foreach(println)
    //(female,6)
    //(male,7)

    reduceByKeygroupByKey的区别

    1. reduceByKey:按照key进行聚合,在shuffle之前有combine(预聚合)操作,返回结果是RDD[k,v]。既分组又聚合。

    2. groupByKey:按照key进行分组,直接进行shuffle。只分组。

    3. 开发指导:reduceByKey比groupByKey,建议使用。但是需要注意是否会影响业务逻辑。

    4 aggregateByKey(zeroValue:U,[partitioner: Partitioner]) (seqOp: (U, V) => U,combOp: (U, U) => U)

      kv对的RDD中,按keyvalue进行分组合并,合并时,将每个value和初始值作为seq函数的参数,进行计算,返回的结果作为一个新的kv对,然后再将结果按照key进行合并,最后将每个分组的value传递给combine函数进行计算(先将前两个value进行计算,将返回结果和下一个value传给combine函数,以此类推),将key与计算结果作为一个新的kv对输出。

    参数描述:

    1zeroValue给每一个分区中的每一个key一个初始值;

    2seqOp函数用于在每一个分区中用初始值逐步迭代value

    3combOp函数用于合并每个分区中的结果。

    案例:分区内相同key找出最大值,分区间相加

    val rdd3: RDD[(String, Int)] = sc.parallelize(List(("a", 3), ("a", 2), ("c", 4), ("b", 3), ("c", 6), ("c", 8)), 2)
    rdd3.glom().collect().foreach(data => println(data.mkString(",")))
    //(a,3),(a,2),(c,4)
    //(b,3),(c,6),(c,8)
    
    //区内聚合,区间聚合
    val aggregateByKeyRDD: RDD[(String, Int)] = rdd3.aggregateByKey(0)(_.max(_), _ + _)
    aggregateByKeyRDD.foreach(println)
    //(a,3)
    //(c,12)
    //(b,3)
    

    5 foldByKey:aggregateByKey的简化操作,seqopcombop相同

    案例:计算相同key的value累加值。

    val rdd4: RDD[(Char, Int)] = sc.parallelize(List(('a', 3), ('a', 2), ('a', 4), ('b', 3), ('c', 6), ('c', 8)), 3)
    //相当于aggregateByKey的简化,区内和区间的操作一样
    val foldByKeyRDD: RDD[(Char, Int)] = rdd4.foldByKey(0)(_ + _)
    foldByKeyRDD.collect().foreach(println)
    //(c,14)
    //(a,9)
    //(b,3)
    

    6 combineByKey(createCombiner: V => C,  mergeValue: (C, V) => C,  mergeCombiners: (C, C) => C) 对相同K,把V合并成一个集合。

    参数描述:

    1createCombiner: combineByKey() 会遍历分区中的所有元素,因此每个元素的键要么还没有遇到过,要么就和之前的某个元素的键相同。如果这是一个新的元素,combineByKey()会使用一个叫作createCombiner()的函数来创建那个键对应的累加器的初始值

    2mergeValue: 如果这是一个在处理当前分区之前已经遇到的键,它会使用mergeValue()方法将该键的累加器对应的当前值与这个新的值进行合并

    3mergeCombiners: 由于每个分区都是独立处理的, 因此对于同一个键可以有多个累加器。如果有两个或者更多的分区都有对应同一个键的累加器, 就需要使用用户提供的 mergeCombiners() 方法将各个分区的结果进行合并。

    案例:求每个key所对应值的平均值

    val rdd5: RDD[(String, Int)] = sc.parallelize(Array(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98)), 2)
    //acc的类型是第一个函数的返回值类型,无法自动推断,因此不能用下划线简写
    val combineByKeyRDD = rdd5.combineByKey(
      (_, 1), //对每一个v进行map
      (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1), //区内聚合
      (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)) //区间聚合
    combineByKeyRDD.collect().foreach(println)
    //(b,(286,3))
    //(a,(274,3))
    
    //求平均值.map入参只有一个,模式匹配更方便取出元组的值
    val mapRDD1: RDD[(String, Double)] = combineByKeyRDD.map {
      case (k, t) => (k, t._1 / t._2.toDouble)
    }
    mapRDD1.collect().foreach(println)
    //(b,95.33333333333333)
    //(a,91.33333333333333)
    

    7 sortByKey([ascending], [numTasks])在一个(K,V)RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)RDD。

    val rdd6 = sc.parallelize(Array((3, "aa"), (6, "cc"), (2, "bb"), (1, "dd")))
    val sortByKeyRDD: RDD[(Int, String)] = rdd6.sortByKey(true)
    sortByKeyRDD.collect().foreach(println)
    //(1,dd)
    //(2,bb)
    //(3,aa)
    //(6,cc)
    

    8 mapValues针对于(K,V)形式的类型只对V进行操作。

    val rdd7 = sc.parallelize(Array((1, "a"), (1, "d"), (2, "b"), (3, "c")))
    val mapValuesRDD: RDD[(Int, String)] = rdd7.mapValues(_ + "|||")
    mapValuesRDD.collect().foreach(println)
    //(1,a|||)
    //(1,d|||)
    //(2,b|||)
    //(3,c|||)
    

    9 join(otherDataset, [numTasks]):在类型为(K,V)(K,W)RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))RDD。

    val rdd8 = sc.parallelize(Array((1, "a"), (2, "b"), (3, "c"), (4, "c")))
    val rdd9 = sc.parallelize(Array((1, 4), (2, 5), (3, 6)))
    val rdd10 = sc.parallelize(Array((1, "x"), (2, "y"), (3, "z")))
    val joinRDD1: RDD[(Int, (String, Int))] = rdd8.join(rdd9)
    //相当于内连接,双方都存在此key的保留
    joinRDD1.collect().foreach(println)
    //(1,(a,4))
    //(2,(b,5))
    //(3,(c,6))
    
    val joinRDD2: RDD[(Int, ((String, Int), String))] = rdd8.join(rdd9).join(rdd10)
    joinRDD2.collect().foreach(println)
    //(1,((a,4),x))
    //(2,((b,5),y))
    //(3,((c,6),z))
    

    10 cogroup(otherDataset, [numTasks]):在类型为(K,V)(K,W)RDD上调用,返回一个(K,(Iterable<V>,Iterable<W>))类型的RDD。

    val rdd8 = sc.parallelize(Array((1, "a"), (2, "b"), (3, "c"), (4, "c")))
    val rdd9 = sc.parallelize(Array((1, 4), (2, 5), (3, 6)))
    //相当于全外连接,只要某个数据集中出现了此key,就组合成元祖
    val cogroupRDD1: RDD[(Int, (Iterable[String], Iterable[Int]))] = rdd8.cogroup(rdd9)
    cogroupRDD1.collect().foreach(println)
    //(4,(CompactBuffer(c),CompactBuffer()))
    //(1,(CompactBuffer(a),CompactBuffer(4)))
    //(2,(CompactBuffer(b),CompactBuffer(5)))
    //(3,(CompactBuffer(c),CompactBuffer(6)))
    
    val cogroupRDD2: RDD[(Int, (Iterable[Int], Iterable[String]))] = rdd9.cogroup(rdd8)
    cogroupRDD2.collect().foreach(println)
    //(4,(CompactBuffer(),CompactBuffer(c)))
    //(1,(CompactBuffer(4),CompactBuffer(a)))
    //(2,(CompactBuffer(5),CompactBuffer(b)))
    //(3,(CompactBuffer(6),CompactBuffer(c)))
    
  • 相关阅读:
    c/c++ # and ## in macros以及宏debug
    postgresql unnamed statement
    postgresql/lightdb a [left/right] join b on true的含义
    openjdk、javafx各发行版
    lightdb for postgresql PL/pgSQL perform、execute、call区别
    postgresql有mysql兼容插件吗?
    各种互联网公司,不要再那么没有分寸的刷屏QPS/TPS/日活千万这些毫无意义的数据了
    PostgreSQL分布式数据库实践
    LightDB发布日常运维管理手册
    恒生电子发布金融分布式数据库LightDB
  • 原文地址:https://www.cnblogs.com/noyouth/p/13023570.html
Copyright © 2011-2022 走看看