zoukankan      html  css  js  c++  java
  • 【spark】常用转换操作:reduceByKey和groupByKey

    1.reduceByKey(func)

    功能:

      使用 func 函数合并具有相同键的值。

    示例:

    val list = List("hadoop","spark","hive","spark")
    val rdd = sc.parallelize(list)
    val pairRdd = rdd.map((_,1))
    pairRdd.reduceByKey(_+_).collect.foreach(println)
    

    上例中,我们先是建立了一个list,然后建立通过这个list集合建立一个rdd

    然后我们通过map函数将list的rdd转化成键值对形式的rdd

    然后我们通过reduceByKey方法对具有相同key的值进行func(_+_)的累加操作。

    输入结果如下

    (hive,1)
    (spark,2)
    (hadoop,1)
    list: List[String] = List(hadoop, spark, hive, spark)
    rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[127] at parallelize at command-3434610298353610:2
    pairRdd: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[128] at map at command-3434610298353610:3 
    pairRdd.collect.foreach(println) //打印pairRdd
    (hive,1)
    (spark,1)
    (hadoop,1)
    (spark,1)
    

    我们需要留意的事情是,我们调用了reduceByKey操作的返回的结果类型是

    org.apache.spark.rdd.RDD[(String, Int)]  

    注意,我们这里的collect()方法的作用是收集分布在各个worker的数据到driver节点。

    如果不使用这个方法,每个worker的数据只在自己本地显示,并不会在driver节点显示。

    2.groupByKey()

    功能:

      对具有相同key的value进行分组。

    示例:

    val list = List("hadoop","spark","hive","spark")
    val rdd = sc.parallelize(list)
    val pairRdd = rdd.map(x => (x,1))
    pairRdd.groupByKey().collect.foreach(println)
    

    我们同样是对跟上面同样的pairRdd进行groupByKey()操作

    得出的结果为

    (hive,CompactBuffer(1))
    (spark,CompactBuffer(1, 1))
    (hadoop,CompactBuffer(1))
    list: List[String] = List(hadoop, spark, hive, spark)
    rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[130] at parallelize at command-3434610298353610:2
    pairRdd: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[131] at map at command-3434610298353610:3
    

    可以看到,结果并不是把具有相同key值进行相加,而是就简单的进行了分组,生成一个sequence。

    其实,我们可以把groupByKey()当作reduceByKey(func)操作的一部分,

    reduceByKey(func)先是对rdd进行groupByKey()然后在对每个分组进行func操作。

    pairRdd.reduceByKey(_+_).collect.foreach(println)
    等同于
    pairRdd.groupByKey().map(t => (t._1,t._2.sum)).collect.foreach(println)
    

    我们这里通过groupByKey()后调用map遍历每个分组,然后通过t => (t._1,t._2.sum)对每个分组的值进行累加。

    因为groupByKey()操作是把具有相同类型的key收集到一起聚合成一个集合,集合中有个sum方法,对所有元素进行求和。

    注意,(k,v)形式的数据,我们可以通过 ._1,._2 来访问键和值,

    用占位符表示就是 _._1,_._2,这里前面的两个下划线的含义是不同的,前边下划线是占位符,后边的是访问方式。 

    我们记不记得 ._1,._2,._3 是元组的访问方式。我们可以把键值看成二维的元组。

    3.reduceByKey(func)和groupByKey()的区别

    reduceByKey()对于每个key对应的多个value进行了merge操作,最重要的是它能够先在本地进行merge操作。merge可以通过func自定义。

    groupByKey()也是对每个key对应的多个value进行操作,但是只是汇总生成一个sequence,本身不能自定义函数,只能通过额外通过map(func)来实现。

    使用reduceByKey()的时候,本地的数据先进行merge然后再传输到不同节点再进行merge,最终得到最终结果。

    而使用groupByKey()的时候,并不进行本地的merge,全部数据传出,得到全部数据后才会进行聚合成一个sequence,

    groupByKey()传输速度明显慢于reduceByKey()。

    虽然groupByKey().map(func)也能实现reduceByKey(func)功能,但是,优先使用reduceByKey(func)

  • 相关阅读:
    CodeForces 19D Points (线段树+set)
    FZU 2105 Digits Count
    HDU 5618 Jam's problem again(三维偏序,CDQ分治,树状数组,线段树)
    HDU 5634 Rikka with Phi (线段树)
    Java实现 蓝桥杯 算法提高 转圈游戏(暴力快速幂)
    Java实现 蓝桥杯 算法提高 转圈游戏(暴力快速幂)
    Java实现 蓝桥杯 算法提高 转圈游戏(暴力快速幂)
    Java实现 蓝桥杯 算法提高VIP Substrings(暴力)
    Java实现 蓝桥杯 算法提高VIP Substrings(暴力)
    Java实现 蓝桥杯 算法提高VIP Substrings(暴力)
  • 原文地址:https://www.cnblogs.com/zzhangyuhang/p/9001523.html
Copyright © 2011-2022 走看看