  • Implementing a custom partitioner in Spark

    In Spark, the framework uses HashPartitioner by default to partition an RDD. In real production work, however, the built-in partitioners can cause problems such as data skew, and in those cases we need to define our own partitioner that splits the data by a field of our choosing.
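    For reference, Spark's default hash partitioning boils down to sending a key to hashCode mod numPartitions. A minimal sketch of the idea (the class name is illustrative, not Spark's actual source):

    import org.apache.spark.Partitioner

    // Illustrative sketch of default hash partitioning: a key lands in
    // partition hashCode mod numPartitions, so a skewed key distribution
    // produces equally skewed partitions.
    class NaiveHashPartitioner(parts: Int) extends Partitioner {
      override def numPartitions: Int = parts
      override def getPartition(key: Any): Int = {
        val mod = key.hashCode % parts
        if (mod < 0) mod + parts else mod // keep the partition id non-negative
      }
    }

    The steps for a custom partitioner are as follows: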

    1. Create a custom partitioner class that extends Partitioner. Note that this must be Spark's Partitioner (org.apache.spark.Partitioner).

    2. Override the two abstract methods of Partitioner:

      override def numPartitions: Int = ???          // total number of partitions
      override def getPartition(key: Any): Int = ??? // partition id in [0, numPartitions) for a given key

    Implementation:
    Test data set:
    cookieid,createtime,pv
    cookie1,2015-04-10,1
    cookie1,2015-04-11,5
    cookie1,2015-04-12,7
    cookie1,2015-04-13,3
    cookie1,2015-04-14,2
    cookie1,2015-04-15,4
    cookie1,2015-04-16,4
    cookie2,2015-04-10,2
    cookie2,2015-04-11,3
    cookie2,2015-04-12,5
    cookie2,2015-04-13,6
    cookie2,2015-04-14,3
    cookie2,2015-04-15,9
    cookie2,2015-04-16,7
    

      We will partition by the first field (cookieid).

    Step 1:
    package _core.sourceCodeLearning.partitioner
    
    import org.apache.spark.Partitioner
    import scala.collection.mutable.HashMap
    
    /**
      * Author Mr. Guo
      * Create 2019/6/23 - 12:19
      */
    class UDFPartitioner(args: Array[String]) extends Partitioner {
    
      // Map each distinct key to a partition id, assigned in first-seen order
      private val partitionMap: HashMap[String, Int] = new HashMap[String, Int]()
      private var parId = 0
      for (arg <- args) {
        if (!partitionMap.contains(arg)) {
          partitionMap(arg) = parId
          parId += 1
        }
      }
    
      // One partition per distinct key
      override def numPartitions: Int = partitionMap.size
    
      // Look up the partition id; every key must have been passed to the
      // constructor, otherwise this throws NoSuchElementException
      override def getPartition(key: Any): Int = partitionMap(key.asInstanceOf[String])
    }
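    A quick sanity check of the class in isolation (a hypothetical spark-shell session; partition ids are assigned in first-seen order):

    val p = new UDFPartitioner(Array("cookie1", "cookie2", "cookie1"))
    p.numPartitions           // 2  (the duplicate "cookie1" is ignored)
    p.getPartition("cookie1") // 0
    p.getPartition("cookie2") // 1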
    

      Step 2:

    Driver class for testing:

    package _core.sourceCodeLearning.partitioner
    
    import org.apache.spark.{SparkConf, TaskContext}
    import org.apache.spark.sql.SparkSession
    
    /**
      * Author Mr. Guo
      * Create 2019/6/23 - 12:21
      */
    object UDFPartitionerMain {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName(this.getClass.getSimpleName)
        val ssc = SparkSession
          .builder()
          .config(conf)
          .getOrCreate()
        val sc = ssc.sparkContext
        sc.setLogLevel("WARN")
    
        // Use forward slashes (or escaped backslashes) in the path; "\T" and
        // "\a" are not valid escape sequences in a Scala string literal
        val rdd = ssc.sparkContext.textFile("file:///E:/TestFile/analyfuncdata.txt")
        val transform = rdd.filter(_.split(",").length == 3).map(x => {
          val arr = x.split(",")
          (arr(0), (arr(1), arr(2))) // (cookieid, (createtime, pv))
        })
        // Collect the keys; duplicates are fine, the partitioner de-duplicates them
        val keys: Array[String] = transform.map(_._1).collect()
        val partitioned = transform.partitionBy(new UDFPartitioner(keys))
        partitioned.foreachPartition(iter => {
          println(s"********** partition id: ${TaskContext.getPartitionId()} ***************")
          iter.foreach(r => {
            println(s"partition ${TaskContext.getPartitionId()}###" + r._1 + "\t" + r._2 + "::" + r._2._1)
          })
        })
        ssc.stop()
      }
    }
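    Instead of printing every record, an optional compact check (a sketch reusing the partitioned RDD from the driver above) is to count the records per partition:

    partitioned.mapPartitionsWithIndex((idx, iter) => Iterator((idx, iter.size)))
      .collect()
      .foreach { case (idx, n) => println(s"partition $idx -> $n records") }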
    

      Run result:

    The data is now partitioned by the first field. Inside getPartition you are of course free to transform the key however your use case requires, for example by appending a random salt to spread a hot key across partitions, as sketched below.
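    A hedged sketch of key salting (the salt width of 4 and all names here are illustrative assumptions; transform is the pair RDD from the driver above):

    import scala.util.Random

    // Salt the key before partitioning, so a hot "cookie1" becomes
    // "cookie1_0" .. "cookie1_3" and no single partition takes all of it.
    val salted = transform.map { case (k, v) => (s"${k}_${Random.nextInt(4)}", v) }
    val saltedKeys = salted.map(_._1).distinct().collect()
    val spread = salted.partitionBy(new UDFPartitioner(saltedKeys))
    // After any per-key aggregation, strip the "_n" suffix to recover the
    // original key, e.g. k.substring(0, k.lastIndexOf('_'))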

  • Original post: https://www.cnblogs.com/Gxiaobai/p/11073381.html