zoukankan      html  css  js  c++  java
  • Spark(九)【RDD的分区和自定义Partitioner】

    spark的分区

    ​ Spark目前支持Hash分区和Range分区,用户也可以自定义分区,Hash分区为当前的默认分区,Spark中分区器直接决定了RDD中分区的个数、RDD中每条数据经过Shuffle过程属于哪个分区和Reduce的个数。

    注意

    (1)只有Key-Value类型的RDD才有分区器的,非Key-Value类型的RDD,分区器的值是None
    (2)每个RDD的分区ID范围:0~numPartitions-1,决定这个值是属于那个分区的。

    查看RDD的分区器

    scala> val pairs = sc.parallelize(List((1,1),(2,2),(3,3)))
    pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[3] at 
    scala> pairs.partitioner
    res1: Option[org.apache.spark.Partitioner] = None
    

    对RDD进行重新分区

    val partitioned = pairs.partitionBy(new HashPartitioner(2))
    partitioned: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[4] at partitionBy at <console>:27
    

    一. Hash分区

    HashPartitioner分区的原理:对于给定的key,计算其hashCode,并除以分区的个数取余,如果余数小于0,则用余数+分区的个数(否则加0),最后返回的值就是这个key所属的分区ID。

    聚类! key相同,hashCode相同,分配到同一个区

    问题:数据倾斜,每个分区中数据量的不均匀

    二. Ranger分区

    ​ 将一定范围内的数映射到某一个分区内,尽量保证每个分区中数据量的均匀,而且分区与分区之间是有序的,一个分区中的元素肯定都是比另一个分区内的元素小或者大,但是分区内的元素是不能保证顺序的

    实现过程:

    ​ ①抽样产生边界数组

    ​ ②将元素根据边界数组判断属于哪个区

    三. 自定义Partitioner

    实现过程

    要实现自定义的分区器,你需要继承 org.apache.spark.Partitioner 类并实现下面三个方法。

    (1)numPartitions: Int:返回创建出来的分区数。

    (2)getPartition(key: Any): Int:返回给定键的分区编号(0到numPartitions-1)。

    使用

    使用自定义的 Partitioner 是很容易的:只要把它传给 partitionBy() 方法即可。

    使用自定义分区器,传给 partitionBy() 方法
    scala> val par = data.partitionBy(new MyCustomerPartitioner(2))
    par: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[2] at partitionBy at <console>:27
    查看重新分区后的数据分布
    scala> par.mapPartitionsWithIndex((index,items)=>items.map((index,_))).collect
    res3: Array[(Int, (Int, Int))] = Array((0,(2,2)), (0,(4,4)), (0,(6,6)), (1,(1,1)), (1,(3,3)), (1,(5,5)))
    

    案例

    需求:有以下数据,希望年龄相同的进入同一个区。

    User("tom", 12), User("kobe", 12), User("mick", 22), User("jack", 23)
    
    import org.apache.spark.{Partitioner, SparkConf, SparkContext}
    
    /**
     * @description: TODO
     * @author: HaoWu
     * @create: 2020年08月03日
     */
    object MyPartitionerTest {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("RDDTest").setMaster("local[*]")
        val sc = new SparkContext(conf)
        val list = List(User("tom", 12), User("kobe", 12), User("mick", 22), User("jack", 23))
        val result = sc.makeRDD(list).map {
          case User(name, age) => (age, name)
        }.partitionBy(new MyPartitioner(3))
        result.saveAsTextFile("output")
      }
    }
    
    /**
     * User样例类
     */
    case class User(name: String, age: Int)
    
    /**
     * 自定义分区器
     */
    class MyPartitioner(num: Int) extends Partitioner {
      //设置分区数
      override def numPartitions: Int = num
    
      //分区规则
      override def getPartition(key: Any): Int = {
        //判断是否为Int类型
        if (!key.isInstanceOf[Int]) {
          0
        } else {
          //Hash分区具有聚类的作用,相同age的会被分如同一个区
          key.asInstanceOf[Int] % numPartitions
        }
      }
    }
    
  • 相关阅读:
    字符串去重
    你必须懂的 T4 模板:深入浅出
    解决T4模板的程序集引用的五种方案
    table 合并行和列
    porwedesigner 去掉引号
    面向对象JS基础
    19套最新的免费图标字体集
    推荐 15 款很棒的文本编辑器
    13个JavaScript图表(JS图表)图形绘制插件
    推荐10款免费而优秀的图表插件
  • 原文地址:https://www.cnblogs.com/wh984763176/p/13442205.html
Copyright © 2011-2022 走看看