zoukankan      html  css  js  c++  java
  • spark: 二次排序-2

    在上一篇文章,使用了###错误###的方法实现二次排序,导致排序按key字典排序,并非我们想要的结果

    现在,使用自定义排序方法来实现二次排序

    1, 思路

    输入数据
    aa 12
    bb 32
    aa 3,
    cc 43
    dd 23
    cc 5
    cc 8
    bb 23
    bb 12

    1. 自定义排序类:对二元组(key:String, value:Int)进行排序

    2. 把每一行数据map成二元组(key:String, value:Int)

    3. 排序 -- scala自动使用步骤1定义的排序类进行排序

       /**
        *  1. 自定义排序类:对二元组(key:String, value:Int)进行排序
        */
       class KVOrdering extends Ordering[Tuple2[String, Int]] with Serializable {
         override def compare(x: Tuple2[String, Int], y: Tuple2[String, Int]):Int = {
       	 val comp = x._1.compareTo(y._1)
       	 if (comp == 0) {    // key相等, 则比较value
       		 x._2.compareTo(y._2)
       	 } else {
       		comp 
       	 }       
         }
       }
      

    由于scala的隐式转换机制,这个类会对闭包内的所有Tuple2[String, Int]产生作用

        val inputFile = sc.textFile("/home/hadoop/second_sort")
    
    // 2. 把每一行数据map成二元组(key:String, value:Int)
    val splitRdd = inputFile.map{x=>val y = x.split(' '); (y(0), Integer.valueOf(y(1)))}
    
    // 3. 排序
    val sortedRdd = splitRdd.sortBy(x=>x)
    

    3 后记

    1. 自定义排序类需要继承 com.google.common.collect.Ordering, 这个类在guava.jar, 测试的时候需要导入这个包。如果是在shell测试,使用下面的命令

      bin/spark-shell --jars  "PATH_TO_GUAVA"
      
    2. 这个方法忽略了一个事实:9park中, RDD是可分区的。现在我们只针对一个RDD的一个分区来做排序。RDD的多个分区,在多个worker node上并行排序,产生多个结果集,如何对这些结果集最后做一次归并排序?后面的文章再继续分析9这个问题

  • 相关阅读:
    centos8.0 安装 jenkins
    No match for argument: mysql-community-server Error: Unable to find a match: mysql-community-server
    Navicat 远程连接 centos8.0
    centos8.0 安装 mysql
    -bash: java-version: command not found
    centos8.0 安装 JDK1.8
    [Err] 1062
    android之Fragment基础详解(一)
    Android之RecyclerView(一)
    Android之ProgressBar
  • 原文地址:https://www.cnblogs.com/ivanny/p/spark_secondary_sort_2.html
Copyright © 2011-2022 走看看