File to sort (each line is "first,second"):
3,2
5,2
5,3
5,9
6,2
9,1
9,3
8,4
Method 1:
package spark.rdd

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

// Composite key: compare on the first field, then on the second field, both ascending.
class SecondarySortByKey(val first: Int, val second: Int) extends Ordered[SecondarySortByKey] with Serializable {
  override def compare(that: SecondarySortByKey): Int = {
    if (this.first - that.first != 0) {
      this.first - that.first
    } else {
      this.second - that.second
    }
  }
}

object SecondarySortApp {
  // Sort ascending on the first column and, within equal first columns, ascending on the second column.
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SortByKey").setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")
    val data = sc.textFile("/test/file/secondarySort.txt")
    // Wrap both fields in the composite key and keep the original line as the value.
    val lines = data.map(line => (new SecondarySortByKey(line.split(",")(0).toInt, line.split(",")(1).toInt), line))
    val sorted = lines.sortByKey(true)
    sorted.map(line => line._2).collect().foreach(println(_))
  }
}
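For this ascending/ascending case the custom Ordered key class is not strictly required. A minimal alternative sketch (assuming the same data RDD and SparkContext as above) uses RDD.sortBy with a tuple key, relying on Scala's built-in Ordering for (Int, Int):

// Sketch: same ascending/ascending secondary sort via sortBy and the implicit tuple Ordering.
val sortedByTuple = data.sortBy(line => (line.split(",")(0).toInt, line.split(",")(1).toInt), ascending = true)
sortedByTuple.collect().foreach(println)

The custom key class becomes useful when the ordering is mixed (for example ascending on one column and descending on the other) or more complex than a plain tuple comparison.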
Method 2:
// Uses the same imports as Method 1 (SparkConf, SparkContext, RDD).
def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("SortByKey").setMaster("local[*]")
  val sc = new SparkContext(conf)
  sc.setLogLevel("ERROR")
  val data = sc.textFile("/test/file/secondarySort.txt", 1)
  // (Unused below) whole lines sorted lexicographically as their own key, on a single partition.
  val value: RDD[(String, String)] = data.coalesce(1, false).map(line => (line, line)).sortByKey(true)
  // Group by the first column, sort the groups ascending, then sort each group's lines
  // descending, which puts the second column in descending order within each group.
  val value1: RDD[(String, List[String])] = data
    .map(line => (line.split(",")(0), line))
    .groupByKey(1)
    .sortByKey(true)
    .map(line => (line._1, line._2.toList.sortWith(_.compareTo(_) > 0)))
  value1.map(_._2).flatMap(_.mkString("@").split("@")).foreach(println)
}
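If the goal is simply first column ascending and second column descending, the groupByKey plus in-memory list sort can be avoided. A hedged sketch (reusing the data RDD defined above, and assuming the second field is a plain integer so that negating it flips its order):

// Sketch: first column ascending, second column descending, with no grouping step.
val secondDesc = data.sortBy(line => {
  val fields = line.split(",")
  (fields(0).toInt, -fields(1).toInt) // negating the second field reverses its order
}, ascending = true)
secondDesc.collect().foreach(println)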
Note: be aware of how the number of partitions affects the printed result. foreach on an RDD prints per partition with no global ordering guarantee, which is why the code above forces a single partition (textFile(..., 1), coalesce(1, false), groupByKey(1)).
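As a small illustration of that note (assuming the value1 RDD from Method 2), the partition count can be left alone if the results are brought back to the driver before printing, since collect returns partitions in order and so preserves the range ordering produced by sortByKey:

// Sketch: keep multiple partitions during the sort, but print on the driver so the
// global order produced by sortByKey is preserved.
value1
  .flatMap(_._2)  // drop the key, keep the grouped-and-sorted lines
  .collect()      // partitions come back in order, so the sort survives
  .foreach(println)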