zoukankan      html  css  js  c++  java
  • 键值对RDD数据分区器

                             键值对RDD数据分区器

                                         作者:尹正杰

    版权声明:原创作品,谢绝转载!否则将追究法律责任。

    一.键值对RDD数据分区器概述

      Spark目前支持Hash分区和Range分区,用户也可以自定义分区,Hash分区为当前的默认分区,Spark中分区器直接决定了RDD中分区的个数、RDD中每条数据经过Shuffle过程属于哪个分区和Reduce的个数。
    
      温馨提示:
        1>.只有Key-Value类型的RDD才有分区器的,非Key-Value类型的RDD分区器的值是None
        2>.每个RDD的分区ID范围:0~numPartitions-1,决定这个值是属于那个分区的。

    二.获取RDD分区方式 

    package com.yinzhengjie.bigdata.spark.partitioner
    
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.HashPartitioner
    import org.apache.spark.rdd.RDD
    
    /**
      *   可以通过使用RDD的partitioner 属性来获取 RDD 的分区方式。它会返回一个 scala.Option 对象,通过get方法获取其中的值。
      */
    object GetRDDPartition {
      def main(args: Array[String]): Unit = {
        //初始化配置信息及SparkContext
        val sparkConf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
        val sc = new SparkContext(sparkConf)
    
        val listRDD:RDD[(Int,Int)] = sc.parallelize(List((1,1),(2,2),(3,3)))
    
        //查看RDD的分区器
        println(listRDD.partitioner)
    
        //使用HashPartitioner算子对RDD进行重新分区
        val partitionByRDD = listRDD.partitionBy(new HashPartitioner(2))
    
        //查看重新分区后RDD的分区器
        println(partitionByRDD.partitioner)
      }
    }

    三.Hash分区

    package com.yinzhengjie.bigdata.spark.partitioner
    
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.rdd.RDD
    
    /**
      *   HashPartitioner分区的原理:
      *       对于给定的key,计算其hashCode,并除以分区的个数取余,如果余数小于0,则用余数+分区的个数(否则加0),最后返回的值就是这个key所属的分区ID。
      */
    object HashPartition {
      def main(args: Array[String]): Unit = {
        //初始化配置信息及SparkContext
        val sparkConf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
        val sc = new SparkContext(sparkConf)
    
        val listRDD:RDD[(Int,Int)] =  sc.makeRDD(List((1,3),(1,2),(2,4),(2,3),(3,6),(3,8),(1,5),(2,6)),12)
    
        //查看RDD的分区器(可以通过使用RDD的partitioner 属性来获取 RDD 的分区方式。它会返回一个 scala.Option 对象,)
        println(listRDD.partitioner)
    
        listRDD.mapPartitionsWithIndex((index,iter)=>{ Iterator(index.toString+" : "+iter.mkString("|")) }).collect.foreach(println)
    
        val hashpar:RDD[(Int, Int)]  = listRDD.partitionBy(new org.apache.spark.HashPartitioner(7))
        println(hashpar.count)
        println(hashpar.partitioner)
        hashpar.mapPartitions(iter => Iterator(iter.length)).collect().foreach(println)
      }
    }

    四.Ranger分区

      HashPartitioner分区弊端:
        可能导致每个分区中数据量的不均匀,极端情况下会导致某些分区拥有RDD的全部数据。
      RangePartitioner作用:
        将一定范围内的数映射到某一个分区内,尽量保证每个分区中数据量的均匀,而且分区与分区之间是有序的,一个分区中的元素肯定都是比另一个分区内的元素小或者大,但是分区内的元素是不能保证顺序的。简单的说就是将一定范围内的数映射到某一个分区内。
        实现过程为:       第一步:先重整个RDD中抽取出样本数据,将样本数据排序,计算出每个分区的最大key值,形成一个Array[KEY]类型的数组变量rangeBounds;       第二步:判断key在rangeBounds中所处的范围,给出该key值在下一个RDD中的分区id下标;该分区器要求RDD中的KEY类型必须是可以排序的。

      温馨提示:
        如果想要使用Ranger方式进行分区
    (这种方式和Hbase的预分区优点类似),那么对数据应该有两个要求,即数据是可以排序和比较。因此Spark基本上很少使用这种方式。

    五.自定义分区

    package com.yinzhengjie.bigdata.spark.partitioner
    
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.Partitioner
    
    /**
      *     要实现自定义的分区器,你需要继承 org.apache.spark.Partitioner 类并实现下面三个方法。
      *       numPartitions: Int:
      *         返回创建出来的分区数。
      *       getPartition(key: Any): Int:
      *         返回给定键的分区编号(0到numPartitions-1)。
      *       equals():
      *          Java 判断相等性的标准方法。这个方法的实现非常重要,Spark 需要用这个方法来检查你的分区器对象是否和其他分区器实例相同,这样 Spark 才可以判断两个 RDD 的分区方式是否相同。
      *
      */
    class CustomerPartitioner(numParts:Int) extends Partitioner{
    
      //覆盖分区数
      override def numPartitions: Int = numParts
    
      //覆盖分区号获取函数
      override def getPartition(key: Any): Int = {
        val ckey: String = key.toString
        ckey.substring(ckey.length-1).toInt%numParts
      }
    }
    
    
    /**
      *     使用自定义的 Partitioner 是很容易的:只要把它传给 partitionBy() 方法即可。
      *
      *     Spark 中有许多依赖于数据混洗的方法,比如 join() 和 groupByKey(),它们也可以接收一个可选的 Partitioner 对象来控制输出数据的分区方式。
      */
    object CustomPartition {
      def main(args: Array[String]): Unit = {
        //初始化配置信息及SparkContext
        val sparkConf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
        val sc = new SparkContext(sparkConf)
    
        //创建listRDD,建设你的服务器是32core,我们就使用32个切片,但由于数据仅有8条,因此只有32个分区中仅有8个分区有数据哟~
        val listRDD:RDD[(Int,Int)] =  sc.makeRDD(List((1,3),(1,2),(2,4),(2,3),(3,6),(3,8),(1,5),(2,6)),32)
    
        //查看数据分布情况
        listRDD.mapPartitionsWithIndex((index,items)=>items.map((index,_))).collect.foreach(println)
    
        //将RDD使用自定义的分区类进行重新分区
        val par = listRDD.partitionBy(new CustomerPartitioner(3))
    
        //查看重新分区后的数据分布
        par.mapPartitionsWithIndex((index,items)=>items.map((index,_))).collect.foreach(println)
      }
    }
  • 相关阅读:
    Recommended Books for Algo Trading in 2020
    Market Making is simpler than you think!
    Top Crypto Market Makers of 2020
    Top Crypto Market Makers, Rated and Reviewed
    爬取伯乐在线文章(五)itemloader
    爬取伯乐在线文章(四)将爬取结果保存到MySQL
    爬取伯乐在线文章(三)爬取所有页面的文章
    爬取伯乐在线文章(二)通过xpath提取源文件中需要的内容
    爬取伯乐在线文章(一)
    爬虫去重策略
  • 原文地址:https://www.cnblogs.com/yinzhengjie2020/p/13166989.html
Copyright © 2011-2022 走看看