zoukankan      html  css  js  c++  java
  • Spark分区

    转载自:https://www.cnblogs.com/qingyunzong/p/8987065.html

    一:分区的概念

    分区是RDD内部并行计算的一个计算单元,RDD的数据集在逻辑上被划分为多个分片,每一个分片称为分区。

    分区的格式决定了并行计算的粒度,而每个分区的数值计算都是在一个任务task中进行的,因此任务的个数,也是由RDD(准确来说是作业最后一个RDD)的分区数决定。

    二:分区的目的

    数据分区,在分布式集群里,网络通信的代价很大,减少网络传输可以极大提升性能mapreduce框架的性能开支主要在io和网络传输,io因为要大量读写文件,它是不可避免的,但是网络传输是可以避免的,把大文件压缩变小文件,从而减少网络传输,但是增加了cpu的计算负载。

    (一)Spark里面io也是不可避免的,同Hadoop在网络传输spark里面进行了优化

    Spark把rdd进行分区(分片),放在集群上并行计算。同一个rdd分片100个,10个节点,平均一个节点10个分区,当进行sum型的计算的时候,先进行每个分区的sum,然后把sum值shuffle传输到主程序进行全局sum,所以进行sum型计算对网络传输非常小。但对于进行join型的计算的时候,需要把数据本身进行shuffle,网络开销很大。

    (二)我们进行mapreduce计算的时候为什么要进行shuffle?《重点》对比理解shuffle

    就是说mapreduce里面网络传输主要在shuffle阶段,shuffle的根本原因是相同的key存在不同的节点上,按key进行聚合的时候不得不进行shuffle。shuffle是非常影响网络的,它要把所有的数据混在一起走网络,然后它才能把相同的key走到一起。进行shuffle是存储决定的。

    (三)spark独有特点---优化改进

    Spark把key-value类型的RDD,通过key的hashcode进行分区,而且保证相同的key存储在同一个节点上。这样对改rdd进行key聚合时,就不需要shuffle过程。

    Spark为了改进mapreduce的shuffle机制,spark会把key进行分区,也就是key的hashcode进行分区,相同的key,hashcode肯定是一样的,所以它进行分区的时候100t的数据分成10分,每部分10个t,它能确保相同的key肯定在一个分区里面,而且它能保证存储的时候相同的key能够存在同一个节点上。比如一个rdd分成了100份,集群有10个节点,所以每个节点存10份,每一分称为每个分区,spark能保证相同的key存在同一个节点上,实际上相同的key存在同一个分区。

    key的分布不均决定了有的分区大有的分区小。没法分区保证完全相等,但它会保证在一个接近的范围。所以mapreduce里面做的某些工作里边,spark就不需要shuffle了,spark解决网络传输这块的根本原理就是这个。

    (四)spark多表分区

    进行join的时候是两个表,不可能把两个表都分区好,通常情况下是把用的频繁的大表事先进行分区,小表进行关联它的时候小表进行shuffle过程。----大表不需要shuffle。

    需要在工作节点间进行数据混洗的转换极大地受益于分区。这样的转换是 cogroup,groupWith,join,leftOuterJoin,rightOuterJoin,groupByKey,reduceByKey,combineByKey 和lookup。

    分区是可配置的,只要RDD是基于键值对的即可

    三:Spark分区原则及方法

    RDD分区的一个分区原则:尽可能是得分区的个数等于集群核心数目

    无论是本地模式、Standalone模式、YARN模式或Mesos模式,我们都可以通过spark.default.parallelism来配置其默认分区个数若没有设置该值,则根据不同的集群环境确定该值

    (一)本地模式

    1.默认方式---就一个分区

      def main(args:Array[String]):Unit={
        val conf = new SparkConf()
        //设置运行模式为本地运行,不然默认是集群模式
        //conf.setMaster("local")  //默认是集群模式
        //设置任务名
        conf.setAppName("WordCount").setMaster("local")
        //设置SparkContext,是SparkCore的程序入口
        val sc = new SparkContext(conf)
        
        val array = Array(1,2,3)
        val arrayRDD:RDD[Int] = sc.parallelize(array)  //默认是一个分区
        val numPartitions = arrayRDD.getNumPartitions
        println(numPartitions)
      }

    2.手动设置分区数

      def main(args:Array[String]):Unit={
        val conf = new SparkConf()
        //设置运行模式为本地运行,不然默认是集群模式
        //conf.setMaster("local")  //默认是集群模式
        //设置任务名
        conf.setAppName("WordCount").setMaster("local")
        //设置SparkContext,是SparkCore的程序入口
        val sc = new SparkContext(conf)
        
        val array = Array(1,2,3)
        val arrayRDD:RDD[Int] = sc.parallelize(array, numSlices=2)
        val numPartitions = arrayRDD.getNumPartitions
        println(numPartitions)
      }

    3.local[n]---n等于几默认就是几个分区,如果n=* 那么分区个数就等于cpu core的个数

      def main(args:Array[String]):Unit={
        val conf = new SparkConf()
        //设置运行模式为本地运行,不然默认是集群模式
        //conf.setMaster("local")  //默认是集群模式
        //设置任务名
        conf.setAppName("WordCount").setMaster("local[3]")
        //设置SparkContext,是SparkCore的程序入口
        val sc = new SparkContext(conf)
        
        val array = Array(1,2,3)
        val arrayRDD:RDD[Int] = sc.parallelize(array)
        val numPartitions = arrayRDD.getNumPartitions
        println(numPartitions)
      }

    conf.setAppName("WordCount").setMaster("local[*]")

    4.参数控制

      def main(args:Array[String]):Unit={
        val conf = new SparkConf()
        //设置运行模式为本地运行,不然默认是集群模式
        //conf.setMaster("local")  //默认是集群模式
        //设置任务名
        conf.setAppName("WordCount").setMaster("local")
        conf.set("spark.default.parallelism","5")
        //设置SparkContext,是SparkCore的程序入口
        val sc = new SparkContext(conf)
        
        val array = Array(1,2,3)
        val arrayRDD:RDD[Int] = sc.parallelize(array)
        val numPartitions = arrayRDD.getNumPartitions
        println(numPartitions)
      }

    (二)其他模式

     1.yarn模式

    最大分区数:所有执行节点上的核心总数或2个,以较大的为准

    2.Mesos细粒度模式

    最大分区数:8 

    四:分区器

    (一)设置分区器的场景

    1.如果是从HDFS里面读取出来的数据,不需要分区器。因为HDFS本来就分好区了

        分区数我们是可以控制的,但是没必要有分区器

    2.非key-value RDD分区没必要设置分区器。但是也可以设置

      def main(args:Array[String]):Unit={
        val conf = new SparkConf()
        //设置运行模式为本地运行,不然默认是集群模式
        //conf.setMaster("local")  //默认是集群模式
        //设置任务名
        conf.setAppName("WordCount").setMaster("local")
        conf.set("spark.default.parallelism","5")
        //设置SparkContext,是SparkCore的程序入口
        val sc = new SparkContext(conf)
        
        val testRDD = sc.textFile("E:\1_a1.txt").flatMap(line => line.split(",")).map(word => (word, 1)).partitionBy(new HashPartitioner(2))
      }

    3.Key-value形式的时候,我们就有必要了。

    (二)分区器---HashPartitioner (默认)

    val resultRDD = testRDD.reduceByKey(new HashPartitioner(2),(x:Int,y:Int) => x+ y)
    //如果不设置默认也是HashPartitoiner,分区数跟spark.default.parallelism一样
    println(resultRDD.partitioner)
    println("resultRDD"+resultRDD.getNumPartitions)

    (三)分区器---RangePartitioner

    val newresultRDD=resultRDD.partitionBy(new RangePartitioner[String,Int](3,resultRDD))  //String,Int是将要进行分区的resultRDD的键值对类型。(3,redultRDD)中3是分区数,resultRDD是要进行分区的RDD。详细见下面

    SparkCore中除了HashPartitioner分区器外,另外一个比较重要的已经实现的分区器,主要用于RDD的数据排序相关API中,比如sortByKey底层使用的数据分区器就是RangePartitioner分区器。

    该分区器的实现方式主要是通过两个步骤来实现的:

    第一步:先重整个RDD中抽取出样本数据,将样本数据排序,计算出每个分区的最大key值,形成一个Array[KEY]类型的数组变量rangeBounds;
    
    第二步:判断key在rangeBounds中所处的范围,给出该key值在下一个RDD中的分区id下标;

    该分区器要求RDD中的KEY类型必须是可以排序的,

    class RangePartitioner[K: Ordering : ClassTag, V](
                                                       partitions: Int,
                                                       rdd: RDD[_ <: Product2[K, V]],
                                                       private var ascending: Boolean = true)
      extends Partitioner {
    
      // We allow partitions = 0, which happens when sorting an empty RDD under the default settings.
      require(partitions >= 0, s"Number of partitions cannot be negative but found $partitions.")
    
      // 获取RDD中key类型数据的排序器
      private var ordering = implicitly[Ordering[K]]
    
      // An array of upper bounds for the first (partitions - 1) partitions
      private var rangeBounds: Array[K] = {
        if (partitions <= 1) {
          // 如果给定的分区数是一个的情况下,直接返回一个空的集合,表示数据不进行分区
          Array.empty
        } else {
          // This is the sample size we need to have roughly balanced output partitions, capped at 1M.
          // 给定总的数据抽样大小,最多1M的数据量(10^6),最少20倍的RDD分区数量,也就是每个RDD分区至少抽取20条数据
          val sampleSize = math.min(20.0 * partitions, 1e6)
          // Assume the input partitions are roughly balanced and over-sample a little bit.
          // 计算每个分区抽取的数据量大小, 假设输入数据每个分区分布的比较均匀
          // 对于超大数据集(分区数超过5万的)乘以3会让数据稍微增大一点,对于分区数低于5万的数据集,每个分区抽取数据量为60条也不算多
          val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.size).toInt
          // 从rdd中抽取数据,返回值:(总rdd数据量, Array[分区id,当前分区的数据量,当前分区抽取的数据])
          val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
          if (numItems == 0L) {
            // 如果总的数据量为0(RDD为空),那么直接返回一个空的数组
            Array.empty
          } else {
            // If a partition contains much more than the average number of items, we re-sample from it
            // to ensure that enough items are collected from that partition.
            // 计算总样本数量和总记录数的占比,占比最大为1.0
            val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
            // 保存样本数据的集合buffer
            val candidates = ArrayBuffer.empty[(K, Float)]
            // 保存数据分布不均衡的分区id(数据量超过fraction比率的分区)
            val imbalancedPartitions = mutable.Set.empty[Int]
            // 计算抽取出来的样本数据
            sketched.foreach { case (idx, n, sample) =>
              if (fraction * n > sampleSizePerPartition) {
                // 如果fraction乘以当前分区中的数据量大于之前计算的每个分区的抽象数据大小,那么表示当前分区抽取的数据太少了,该分区数据分布不均衡,需要重新抽取
                imbalancedPartitions += idx
              } else {
                // 当前分区不属于数据分布不均衡的分区,计算占比权重,并添加到candidates集合中
                // The weight is 1 over the sampling probability.
                val weight = (n.toDouble / sample.size).toFloat
                for (key <- sample) {
                  candidates += ((key, weight))
                }
              }
            }
    
            // 对于数据分布不均衡的RDD分区,重新进行数据抽样
            if (imbalancedPartitions.nonEmpty) {
              // Re-sample imbalanced partitions with the desired sampling probability.
              // 获取数据分布不均衡的RDD分区,并构成RDD
              val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
              // 随机种子
              val seed = byteswap32(-rdd.id - 1)
              // 利用rdd的sample抽样函数API进行数据抽样
              val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
              val weight = (1.0 / fraction).toFloat
              candidates ++= reSampled.map(x => (x, weight))
            }
    
            // 将最终的抽样数据计算出rangeBounds出来
            RangePartitioner.determineBounds(candidates, partitions)
          }
        }
      }
    
      // 下一个RDD的分区数量是rangeBounds数组中元素数量+ 1个
      def numPartitions: Int = rangeBounds.length + 1
    
      // 二分查找器,内部使用java中的Arrays类提供的二分查找方法
      private var binarySearch: ((Array[K], K) => Int) = CollectionsUtils.makeBinarySearch[K]
    
      // 根据RDD的key值返回对应的分区id。从0开始
      def getPartition(key: Any): Int = {
        // 强制转换key类型为RDD中原本的数据类型
        val k = key.asInstanceOf[K]
        var partition = 0
        if (rangeBounds.length <= 128) {
          // If we have less than 128 partitions naive search
          // 如果分区数据小于等于128个,那么直接本地循环寻找当前k所属的分区下标
          while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {
            partition += 1
          }
        } else {
          // Determine which binary search method to use only once.
          // 如果分区数量大于128个,那么使用二分查找方法寻找对应k所属的下标;
          // 但是如果k在rangeBounds中没有出现,实质上返回的是一个负数(范围)或者是一个超过rangeBounds大小的数(最后一个分区,比所有数据都大)
          partition = binarySearch(rangeBounds, k)
          // binarySearch either returns the match location or -[insertion point]-1
          if (partition < 0) {
            partition = -partition - 1
          }
          if (partition > rangeBounds.length) {
            partition = rangeBounds.length
          }
        }
    
        // 根据数据排序是升序还是降序进行数据的排列,默认为升序
        if (ascending) {
          partition
        } else {
          rangeBounds.length - partition
        }
      }

    按照范围进行分区的,如果是字符串,那么就按字典顺序的范围划分。如果是数字,就按数据自的范围划分

    def determineBounds[K: Ordering : ClassTag](
                                                   candidates: ArrayBuffer[(K, Float)],
                                                   partitions: Int): Array[K] = {
        val ordering = implicitly[Ordering[K]]
        // 按照数据进行数据排序,默认升序排列
        val ordered = candidates.sortBy(_._1)
        // 获取总的样本数量大小
        val numCandidates = ordered.size
        // 计算总的权重大小
        val sumWeights = ordered.map(_._2.toDouble).sum
        // 计算步长
        val step = sumWeights / partitions
        var cumWeight = 0.0
        var target = step
        val bounds = ArrayBuffer.empty[K]
        var i = 0
        var j = 0
        var previousBound = Option.empty[K]
        while ((i < numCandidates) && (j < partitions - 1)) {
          // 获取排序后的第i个数据及权重
          val (key, weight) = ordered(i)
          // 累计权重
          cumWeight += weight
          if (cumWeight >= target) {
            // Skip duplicate values.
            // 权重已经达到一个步长的范围,计算出一个分区id的值
            if (previousBound.isEmpty || ordering.gt(key, previousBound.get)) {
              // 上一个边界值为空,或者当前边界key数据大于上一个边界的值,那么当前key有效,进行计算
              // 添加当前key到边界集合中
              bounds += key
              // 累计target步长界限
              target += step
              // 分区数量加1
              j += 1
              // 上一个边界的值重置为当前边界的值
              previousBound = Some(key)
            }
          }
          i += 1
        }
        // 返回结果
        bounds.toArray
      }
    RangePartitioner的determineBounds函数的作用是根据样本数据记忆权重大小确定数据边界

    补充:RangePartitioner分区执行原理概述

    1.计算总体的数据抽样大小sampleSize,计算规则是:至少每个分区抽取20个数据或者最多1e6的样本的数据量。

    2.根据sampleSize和分区数量计算每个分区的数据抽样样本数量最大值sampleSizePrePartition。

    3.根据以上两个值进行水塘抽样,返回RDD的总数据量,分区中总元素的个数和每个分区的采样数据。

    4.计算出数据量较大的分区通过RDD.sample进行重新抽样。

    5.通过抽样数组 candidates: ArrayBuffer[(K, wiegth)]计算出分区边界的数组BoundsArray

    6.在取数据时,如果分区数小于128则直接获取,如果大于128则通过二分法,获取当前Key属于那个区间,返回对应的BoundsArray下标即为partitionsID。

    补充:https://www.zhihu.com/question/34672009

    (四)两种分区器的区别 

    1.HashPartitioner分区可能HashPartitioner导致每个分区中数据量的不均匀。

    2.RangePartitioner分区尽量保证每个分区中数据量的均匀,将一定范围内的数映射到某一个分区内。分区与分区之间数据是有序的,但分区内的元素是不能保证顺序的。

    五:自定义分区器

    package com.dt.spark
    
    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import org.apache.spark.HashPartitioner
    import org.apache.spark.Partitioner
    import java.net.URL
    import org.apache.spark.RangePartitioner
    
    class MyPartitioner(val numParts:Int) extends Partitioner{
      def numPartitions: Int = numParts  //设置分区数
      def getPartition(key: Any): Int = {  //返回分区号
        val domain = new URL(key.toString()).getHost
        val code = (domain.hashCode()%numParts)
        if(code<0){
          code+numParts
        }else{
          code
        }
      }
    }
    
    object WordCount {
      def main(args:Array[String]):Unit={
        val conf = new SparkConf()
        //设置运行模式为本地运行,不然默认是集群模式
        //conf.setMaster("local")  //默认是集群模式
        //设置任务名
        conf.setAppName("WordCount").setMaster("local")
        conf.set("spark.default.parallelism","5")
        //设置SparkContext,是SparkCore的程序入口
        val sc = new SparkContext(conf)
        
        val urlRDD = sc.makeRDD(Seq(("http://baidu.com/test", 2),("http://baidu.com/index", 2),("http://ali.com", 3), ("http://baidu.com/tmmmm", 4),("http://baidu.com/test", 4)))
        
        val newresultRDD = urlRDD.partitionBy(new RangePartitioner(2,urlRDD))
        
        val hashPartitionedRDD = urlRDD.partitionBy(new HashPartitioner(2))  //hashPartition
        val res = hashPartitionedRDD.glom().collect()
        
        val partitionedRDD = urlRDD.partitionBy(new MyPartitioner(2))  //使用自定义partition
        val array = partitionedRDD.glom().collect()
        println(array)
      }
    }
  • 相关阅读:
    三次请求(读-改-读)引出nibernate 一级缓存
    算法竞赛入门经典第一、二章摘记
    uva 10905 Children's Game
    uva 11205 The broken pedometer
    uva 10160 Servicing stations
    uva 208 Firetruck
    uva 167 The Sultan's Successors
    zoj 1016 Parencodings
    uva 307 Sticks
    uva 216 Getting in Line
  • 原文地址:https://www.cnblogs.com/ssyfj/p/12615298.html
Copyright © 2011-2022 走看看