  • Repost: The difference between reduceByKey and groupByKey

    Original article: https://www.cnblogs.com/0xcafedaddy/p/7625358.html

    First, let's look at the source code of reduceByKey and groupByKey in PairRDDFunctions.scala:

    /**
     * Merge the values for each key using an associative reduce function. This will also perform
     * the merging locally on each mapper before sending results to a reducer, similarly to a
     * "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
     * parallelism level.
     */
    def reduceByKey(func: (V, V) => V): RDD[(K, V)] = {
      reduceByKey(defaultPartitioner(self), func)
    }
    
    
    /**
     * Group the values for each key in the RDD into a single sequence. Allows controlling the
     * partitioning of the resulting key-value pair RDD by passing a Partitioner.
     * The ordering of elements within each group is not guaranteed, and may even differ
     * each time the resulting RDD is evaluated.
     *
     * Note: This operation may be very expensive. If you are grouping in order to perform an
     * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
     * or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
     *
     * Note: As currently implemented, groupByKey must be able to hold all the key-value pairs for any
     * key in memory. If a key has too many values, it can result in an [[OutOfMemoryError]].
     */
    def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = {
      // groupByKey shouldn't use map side combine because map side combine does not
      // reduce the amount of data shuffled and requires all map side data be inserted
      // into a hash table, leading to more objects in the old gen.
      val createCombiner = (v: V) => CompactBuffer(v)
      val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
      val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
      val bufs = combineByKey[CompactBuffer[V]](
        createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine=false)
      bufs.asInstanceOf[RDD[(K, Iterable[V])]]
    }
     
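    To make the three functions in groupByKey concrete, here is a sketch that models combineByKey with plain Scala collections instead of Spark. It is an illustration under simplifying assumptions, not Spark's implementation: the helper `combineByKey` below is hypothetical, "partitions" are just nested `Seq`s, and an immutable `Vector` stands in for Spark's internal `CompactBuffer`. It compares what gets shipped with `mapSideCombine = false` (as groupByKey uses) and with `mapSideCombine = true`.

```scala
// Plain-Scala model of combineByKey, for illustration only (no Spark).
// Each inner Seq stands in for one map-side partition; the returned Int is the
// number of records that would cross the shuffle in this model.
object CombineByKeyModel {
  def combineByKey[K, V, C](
      partitions: Seq[Seq[(K, V)]],
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      mapSideCombine: Boolean): (Map[K, C], Int) =
    if (mapSideCombine) {
      // Map side: build one combiner per key inside each partition.
      val shipped: Seq[(K, C)] = partitions.flatMap { part =>
        part.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
          acc.updated(k, acc.get(k).fold(createCombiner(v))(c => mergeValue(c, v)))
        }
      }
      // Reduce side: merge the combiners arriving from different partitions.
      val result = shipped.foldLeft(Map.empty[K, C]) { case (acc, (k, c)) =>
        acc.updated(k, acc.get(k).fold(c)(prev => mergeCombiners(prev, c)))
      }
      (result, shipped.size)
    } else {
      // No map-side combine: every raw (K, V) record is shipped, and the
      // reduce side builds the combiners itself.
      val shipped: Seq[(K, V)] = partitions.flatten
      val result = shipped.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
        acc.updated(k, acc.get(k).fold(createCombiner(v))(c => mergeValue(c, v)))
      }
      (result, shipped.size)
    }

  // The same three functions groupByKey defines, with an immutable Vector
  // standing in for Spark's internal CompactBuffer.
  val createCombiner = (v: Int) => Vector(v)
  val mergeValue = (buf: Vector[Int], v: Int) => buf :+ v
  val mergeCombiners = (c1: Vector[Int], c2: Vector[Int]) => c1 ++ c2

  // Made-up input: two partitions of (key, value) pairs.
  val partitions: Seq[Seq[(String, Int)]] =
    Seq(Seq("a" -> 1, "b" -> 2, "a" -> 3), Seq("a" -> 4, "b" -> 5))

  val (grouped, shippedRecords) =
    combineByKey(partitions, createCombiner, mergeValue, mergeCombiners, mapSideCombine = false)
  val (groupedCombined, shippedCombiners) =
    combineByKey(partitions, createCombiner, mergeValue, mergeCombiners, mapSideCombine = true)

  def main(args: Array[String]): Unit = {
    println(grouped)          // Map(a -> Vector(1, 3, 4), b -> Vector(2, 5))
    println(shippedRecords)   // 5 raw records
    println(shippedCombiners) // 4 buffers, which still carry all 5 values
  }
}
```

    Both settings produce the same groups. Without map-side combining 5 raw records are shipped; with it, 4 buffers are shipped that together still hold all 5 values. This is the point of the comment in the Spark source: for grouping, a map-side combine does not reduce the amount of data shuffled.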

    The source code shows the following:

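    reduceByKey: before sending results to the reducers, reduceByKey merges values locally on each mapper, much like a combiner in MapReduce. After this map-side reduce, each partition sends one partial result per key instead of every individual value, so when keys repeat within a partition the data volume drops sharply. Less data goes over the network, and the reduce side can compute the final result faster.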
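    A minimal sketch of this effect, assuming a tiny made-up word count and using plain Scala collections in place of RDDs (`MapSideCombineDemo` and `localReduce` are hypothetical names, not Spark APIs). It counts how many records would be shuffled with and without first merging inside each partition:

```scala
// Plain-Scala model (no Spark) of reduceByKey's map-side combine on a word count.
// Each inner Seq stands in for one map-side partition; the data is made up.
object MapSideCombineDemo {
  // Merge values per key inside one partition, using the same associative
  // function that the reduce side applies later.
  def localReduce[K, V](part: Seq[(K, V)], func: (V, V) => V): Map[K, V] =
    part.groupMapReduce(_._1)(_._2)(func)

  val partitions: Seq[Seq[(String, Int)]] = Seq(
    Seq("spark" -> 1, "scala" -> 1, "spark" -> 1, "spark" -> 1),
    Seq("scala" -> 1, "spark" -> 1, "scala" -> 1))

  val sum = (a: Int, b: Int) => a + b

  // Without a combiner, every (word, 1) pair is shuffled.
  val shuffledWithoutCombine: Int = partitions.map(_.size).sum
  // With a combiner, each partition ships one partial sum per distinct key.
  val preCombined: Seq[Map[String, Int]] = partitions.map(localReduce(_, sum))
  val shuffledWithCombine: Int = preCombined.map(_.size).sum
  // Reduce side: merge the partial sums with the same function.
  val counts: Map[String, Int] = preCombined.flatten.groupMapReduce(_._1)(_._2)(sum)

  def main(args: Array[String]): Unit = {
    println(s"records shuffled without combine: $shuffledWithoutCombine") // 7
    println(s"records shuffled with combine: $shuffledWithCombine")       // 4
    println(s"spark=${counts("spark")}, scala=${counts("scala")}")        // spark=4, scala=3
  }
}
```

    Here the shuffle drops from 7 records to 4. The saving grows with how often each key repeats within a partition, and it is negligible when keys are mostly unique. Because the same function merges both partial and final results, it must be associative, which is why reduceByKey requires an associative reduce function.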

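    groupByKey: groupByKey collects all the values for each key into a single sequence (an Iterable). Because it calls combineByKey with mapSideCombine=false, the grouping happens only on the reduce side, so every record must be sent across the network, which is wasteful when the goal is an aggregation. In addition, if a single key has a very large number of values, it can cause an OutOfMemoryError.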
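    The sketch below, again plain Scala rather than Spark and with a made-up skewed input, models what the reduce side has to do when there is no map-side combine: every record is shipped, and all values of a key are buffered before they can be summed. `GroupByKeyDemo` is a hypothetical name used only for illustration.

```scala
// Plain-Scala model (no Spark) of the work groupByKey leaves to the reduce side.
// The skewed input is made up: one "hot" key dominates.
object GroupByKeyDemo {
  val partitions: Seq[Seq[(String, Int)]] = Seq(
    Seq.fill(4)("hot" -> 1) :+ ("cold" -> 1),
    Seq.fill(3)("hot" -> 1))

  // mapSideCombine = false: every record is shuffled as-is.
  val shuffled: Int = partitions.map(_.size).sum

  // Reduce side: all values for a key are buffered together before any
  // aggregation can run.
  val groups: Map[String, Seq[Int]] = partitions.flatten.groupMap(_._1)(_._2)

  // The largest buffer grows with the number of values for the hottest key;
  // per the Spark doc comment, all of them must fit in memory at once.
  val largestGroup: Int = groups.values.map(_.size).max

  // Summing afterwards gives the correct per-key totals, but only after
  // every value has been shuffled and buffered.
  val counts: Map[String, Int] = groups.map { case (k, vs) => k -> vs.sum }

  def main(args: Array[String]): Unit = {
    println(s"records shuffled: $shuffled")           // 8
    println(s"largest per-key buffer: $largestGroup") // 7
    println(s"hot=${counts("hot")}, cold=${counts("cold")}")
  }
}
```

    The totals are correct, but all 8 records are shipped and one buffer holds 7 values. With real skewed data, that single per-key buffer is what can grow large enough to trigger the OutOfMemoryError mentioned above.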

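    The comparison above shows that for per-key reduce operations over large data sets (sums, counts, and similar aggregations), reduceByKey is the better choice, or aggregateByKey as the Spark doc comment suggests. It is faster, and it avoids the out-of-memory risk that comes from groupByKey buffering every value of a key.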

  • Source: https://www.cnblogs.com/QuestionsZhang/p/10520131.html