zoukankan      html  css  js  c++  java
  • Spark Shuffle 过程

    本文参考:http://www.cnblogs.com/cenyuhai/p/3826227.html

    在数据流动的整个过程中,最复杂最影响性能的环节,就是 Shuffle 过程,本文将参考大神的博客,根据 Spark-1.5 的代码,再次走读一遍。

    Shuffle 过程

    Spark 中最经典的 Shuffle 过程发生在函数 reduceByKey、groupByKey。这里以 reduceByKey 为例分析。举个例子:

    val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
    val sums = pairs.reduceByKey(_ + _).collect()
    sums.foreach(println)

    结果为:

    (1,7)
    (2,1)

    相关代码如下:

    def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = self.withScope {
      reduceByKey(new HashPartitioner(numPartitions), func)
    }
    
    /**
     * Merge the values for each key using an associative reduce function. This will also perform
     * the merging locally on each mapper before sending results to a reducer, similarly to a
     * "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
     * parallelism level.
     */
    def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
      reduceByKey(defaultPartitioner(self), func)
    }

    注释说的挺清楚的,翻译一下:使用 reduce 函数 merge 同一个 key 的 values。这里会在每个 mapper 端执行本地的 merge,然后将结果发送到 reducer 端,作用类似于 MapReduce 中的 combiner。输出结果会被 hash-partitioned。之后的代码也会解释这个步骤。

    第一个 reduceByKey 的分区数目是传入的,第二个则使用默认方法:

    def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
      val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse
      for (r <- bySize if r.partitioner.isDefined && r.partitioner.get.numPartitions > 0) {
        return r.partitioner.get
      }
      if (rdd.context.conf.contains("spark.default.parallelism")) {
        new HashPartitioner(rdd.context.defaultParallelism)
      } else {
        new HashPartitioner(bySize.head.partitions.size)
      }
    }

    默认的计算方式为:

    1. 优先使用自定义的分区函数

    2. 次而使用参数 spark.default.parallelism 作为分区数,创建 HashPartition

    3. 最后选择输入数据的分区数,创建 HashPartition

    ==== 未完待续

  • 相关阅读:
    通过kettle数据导入mysql时,空值的处理在插入mysql时,会自动转转换为null值,无法插入
    centos 安装配置kettle
    centos服务器安装配置Postgre9.6
    python 获取随机字母
    defaultdict & Counter
    PIL 安装及使用
    python之yield和Generator
    Win7系统中用anaconda配置tensorflow运行环境
    #基础概念#之tensor
    python 生成特定间隔数列的方法
  • 原文地址:https://www.cnblogs.com/keepthinking/p/4870767.html
Copyright © 2011-2022 走看看