zoukankan      html  css  js  c++  java
  • spark RDD 键值算子——repartitionAndSortWithinPartitions算子

    repartitionAndSortWithinPartitions是Spark官网推荐的一个算子,官方建议,如果需要在repartition重分区之后,还要进行sort 排序,建议直接使用repartitionAndSortWithinPartitions算子。因为该算子可以一边进行重分区的shuffle操作,一边进行排序。shuffle与sort两个操作同时进行,比先shuffle再sort来说,性能可能是要高的。 

    实例:

    对学生(学号,姓名,学科,分数)进行二次排序

    代码实例

    case class StudentKey(name: String, score: Int)
    
    object SortingDemo {
    
      def main(args: Array[String]) {
    
        val spark = SparkSession.builder().appName("demo").master("local[*]").getOrCreate()
    
        val student_array = Array(
          "c001,n003,chinese,59",
          "c002,n004,english,79",
          "c002,n004,chinese,13",
          "c001,n001,english,88",
          "c001,n002,chinese,10",
          "c002,n006,chinese,29",
          "c001,n001,chinese,54",
          "c001,n002,english,32",
          "c001,n003,english,43",
          "c002,n005,english,80",
          "c002,n005,chinese,48",
          "c002,n006,english,69"
        )
    
        val data = spark.sparkContext.parallelize(student_array).map(_.split(","))
          .map(p => (StudentKey(p(0), p(3).toInt), p))
    
    // 隐式声明一个排序规则
    implicit val my_self_Ordering = new Ordering[StudentKey] { override def compare(x: StudentKey, y: StudentKey): Int = { x.name == y.name match { case false => -x.name.compareTo(y.name) case _ => x.score - y.score } } }
    // 实现一:使用自带
    org.apache.spark.HashPartitioner实现分区排序
        // data.repartitionAndSortWithinPartitions(new org.apache.spark.HashPartitioner(10))

        //  实现二:使用自定义分区函数进行分区排序
      data.repartitionAndSortWithinPartitions(new DataPartition(2)).map(p => (p._2(0), p._2.mkString(":")))
    .aggregateByKey(ListBuffer[String]())((x, y) => x += y, (x, y) => x ++ y)
    .foreachPartition(f => {
    val random = new Random().nextInt(9)
    f.foreach(f => {
    println(s"${f._1} ${f._2.mkString("&")} ${random}")
    })
    })
    }

    class DataPartition(partition: Int) extends Partitioner {
    override def numPartitions: Int = partition

    override def getPartition(key: Any): Int = {
    val keyString = key.asInstanceOf[StudentKey]
    Math.abs(keyString.name.hashCode() % partition) // 需要abs绝对值,hashcode可能为负数
    }
    }
    
    
  • 相关阅读:
    【Quartz】Quartz将Job保存在数据库中所需表的说明
    第十章 springboot + logback
    第二章 rabbitmq在mac上的安装
    第九章 springboot + mybatis + 多数据源 (AOP实现)
    第一章 AOP
    第八章 springboot + mybatis + 多数据源
    第三章 线程安全的DateFormat工具类
    第六章 consul UI
    第五章 consul key/value
    附1 consul常用命令+常用选项
  • 原文地址:https://www.cnblogs.com/yyy-blog/p/12553723.html
Copyright © 2011-2022 走看看