zoukankan      html  css  js  c++  java
  • Spark Shuffle 过程

    本文参考:http://www.cnblogs.com/cenyuhai/p/3826227.html

    在数据流动的整个过程中,最复杂最影响性能的环节,就是 Shuffle 过程,本文将参考大神的博客,根据 Spark-1.5 的代码,再次走读一遍。

    Shuffle 过程

    Spark 中最经典的 Shuffle 过程发生在函数 reduceByKey、groupByKey。这里以 reduceByKey 为例分析。举个例子:

    val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
    val sums = pairs.reduceByKey(_ + _).collect()
    sums.foreach(println)

    结果为:

    (1,7)
    (2,1)

    相关代码如下:

    def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = self.withScope {
      reduceByKey(new HashPartitioner(numPartitions), func)
    }
    
    /**
     * Merge the values for each key using an associative reduce function. This will also perform
     * the merging locally on each mapper before sending results to a reducer, similarly to a
     * "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
     * parallelism level.
     */
    def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
      reduceByKey(defaultPartitioner(self), func)
    }

    注释说的挺清楚的,翻译一下:使用 reduce 函数 merge 同一个 key 的 values。这里会在每个 mapper 端执行本地的 merge,然后将结果发送到 reducer 端,作用类似于 MapReduce 中的 combiner。输出结果会被 hash-partitioned。之后的代码也会解释这个步骤。

    第一个 reduceByKey 的分区数目是传入的,第二个则使用默认方法:

    def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
      val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse
      for (r <- bySize if r.partitioner.isDefined && r.partitioner.get.numPartitions > 0) {
        return r.partitioner.get
      }
      if (rdd.context.conf.contains("spark.default.parallelism")) {
        new HashPartitioner(rdd.context.defaultParallelism)
      } else {
        new HashPartitioner(bySize.head.partitions.size)
      }
    }

    默认的计算方式为:

    1. 优先使用自定义的分区函数

    2. 次而使用参数 spark.default.parallelism 作为分区数,创建 HashPartition

    3. 最后选择输入数据的分区数,创建 HashPartition

    ==== 未完待续

  • 相关阅读:
    对一个或多个实体的验证失败。有关详细信息,请参阅“EntityValidationErrors”属性。
    Java基础学习笔记四 Java基础语法
    Java基础学习笔记一 Java介绍
    Java基础学习笔记二 Java基础语法
    Elasticsearch重要配置
    Elasticsearch配置
    Elasticsearch安装详解
    Elasticsearch文档查询
    Elasticsearch索引和文档操作
    Angular4项目,默认的package.json创建及配置
  • 原文地址:https://www.cnblogs.com/keepthinking/p/4870767.html
Copyright © 2011-2022 走看看