zoukankan      html  css  js  c++  java
  • spark之 避免数据倾斜之 给名字分区(百家姓)

    一、如何确定按姓分区的Partitioner?为什么不能用HashPartitioner?

    1. 根据余数测试,得知:   结论:[1到num]% num 一定是不一样的分区值,[>num] % num 的分区一定会有重复  

    2. 为什么不能用HashPartitioner -> 应该取substring姓决定分区,而不是整个姓名;而且有复姓的可能,需要把复姓单独放在一个分区中

    3. 为什么不能自定义Partitioner中用substring(0,1)取姓第一个字后hashcode % num 进行分区?

    -> hashcode % num => 姓的hashcode值 > 486,所以,不同姓的分区存在相同的可能

    4.假如给每个姓自定义一个值。一共485个数 % 485个分区 => 根据1的结论,可以保证姓的分区唯一

    二、自定义Partitioner

    SparkContext不可以成为入参,因为需要被序列化,extends Serialization,由于SparkContext是我们获取的类,无法修改,故不可以作为传参

    可以把Map(李->0,王->1,...)作为入参

    package com.njbdqn.myname.TeachersDemo
    
    import org.apache.spark.Partitioner
    
    class MyNamePartition(num:Int,ns:Map[String,Int]) extends Partitioner{
      var names:Map[String,Int] = ns
      override def numPartitions: Int = {
        num
      }
      override def getPartition(key: Any): Int = {
        var name =key.asInstanceOf[String]
        // 如果能取到两位数就 _
        // 如果姓只有一个字就 -1
        names.getOrElse(name.substring(0,2),-1) match {
          case -1 =>names.getOrElse(name.substring(0,1),0)%num
          case _=> 5000%num
        }
      }
    }
    object MyNamePartition{
      def apply(num: Int, ns: Map[String, Int]): MyNamePartition = new MyNamePartition(num, ns)
    }

    三、如何用文件生成入参:Map(李->0,王->1,...)

    源头文件1:firstname.txt(第一行单姓,第二行复姓)

    赵钱孙李周吴郑王冯陈褚卫蒋沈韩杨朱秦尤许何吕施张孔曹严华金魏陶姜戚谢邹喻柏水窦章云苏潘葛奚范彭郎鲁韦昌马苗凤花方俞任袁柳酆鲍史唐费廉岑薛雷贺倪汤滕殷罗毕郝邬安常乐于时傅皮卞齐康伍余元卜顾孟平黄和穆萧尹姚邵湛汪祁毛禹狄米贝明臧计伏成戴谈宋茅庞熊纪舒屈项祝董梁杜阮蓝闵席季麻强贾路娄危江童颜郭梅盛林刁钟徐邱骆高夏蔡田樊胡凌霍虞万支柯昝管卢莫经房裘缪干解应宗丁宣贲邓郁单杭洪包诸左石崔吉钮龚程嵇邢滑裴陆荣翁荀羊於惠甄曲家封芮羿储靳汲邴糜松井段富巫乌焦巴弓牧隗山谷车侯宓蓬全郗班仰秋仲伊宫宁仇栾暴甘钭厉戎祖武符刘景詹束龙叶幸司韶郜黎蓟薄印宿白怀蒲邰从鄂索咸籍赖卓蔺屠蒙池乔阴鬱胥能苍双闻莘党翟谭贡劳逄姬申扶堵冉宰郦雍卻璩桑桂濮牛寿通边扈燕冀郏浦尚农温别庄晏柴瞿阎充慕连茹习宦艾鱼容向古易慎戈廖庾终暨居衡步都耿满弘匡国文寇广禄阙东欧殳沃利蔚越夔隆师巩厍聂晁勾敖融冷訾辛阚那简饶空曾毋沙乜养鞠须丰巢关蒯相查后荆红游竺权逯盖益桓公
    万俟司马上官欧阳夏侯诸葛闻人东方赫连皇甫尉迟公羊澹台公冶宗政濮阳淳于单于太叔申屠公孙仲孙轩辕令狐钟离宇文长孙慕容鲜于闾丘司徒司空丌官司寇仉督子车颛孙端木巫马公西漆雕乐正壤驷公良拓跋夹谷宰父谷梁晋楚闫法汝鄢涂钦段干百里东郭南门呼延归海羊舌微生岳帅缑亢况郈有琴梁丘左丘东门西门商牟佘佴伯赏南宫墨哈谯笪年爱阳佟第五言福

    关键转换Map语句:

    // 1.Array(两个Array[String]) => Array(第一个Array[String]) => Array[String]
    val singlename = na.map(_.split("")).take(1).flatten
    // 2.      => Array(第二个Array[String])  => Array[String] => Iterator[Array[String]] => (上,官) 合并成(上官)【Iterator需要转换成Array】
    val doublename = na.map(_.split("")).take(2).takeRight(1).flatten.sliding(2,2).map(_.mkString).toArray 
    // 3.Array(1,2,3).zipWithIndex = Array((1,0),(2,1),(3,2))
    val allnameMap = (singlename ++ doublename).zipWithIndex.toMap

    需要递增数值可以考虑zipwithIndex,需要Map可以考虑Array(2维元组).toMap

    源头文件2:百家姓.txt

    赵
    钱
    孙
    李
    ....
    val map = rdd.map(x => {
          (1, x)
        }).groupBy(x => x._1).mapValues(x => {
          var lst:ListBuffer[(String,Int)] = ListBuffer[(String,Int)]()
          var i: Int = 0
          val iterator = x.toIterator
          while (iterator.hasNext) {
            i += 1
            lst.append((iterator.next()._2, i))
          }
          lst
        }).map(x=>x._2.toMap).collect()(0)
        map

    四、使用自定义的Partitioner干活

    处理文件:name.txt

    李世民
    李四
    张三
    王五
    王杰
    赵敏
    上官我
    欧阳杨

    关键代码:

    rdd.map((_,1)).partitionBy(MyNamePartition(465,allnameMap)).foreachPartition(x=>println(x.toList))

    可以保证每一个姓都在一个单独的分区中,避免了数据倾斜

  • 相关阅读:
    c# 集合运算
    Nuxt
    引入js,不共享变量
    sourcetree将存在的本地项目提交到远程仓库
    c#DateTime与unix时间戳互相转换
    IfcBoundingBox
    IfcBooleanResult
    IfcAnnotationFillArea
    IfcGeometricRepresentationItem
    IfcRepresentationItem
  • 原文地址:https://www.cnblogs.com/sabertobih/p/13731487.html
Copyright © 2011-2022 走看看