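repartitionAndSortWithinPartitions is an operator recommended by the official Spark documentation. If you need to sort records after repartitioning them, the documentation recommends calling repartitionAndSortWithinPartitions directly instead of calling repartition and then sorting. This operator sorts records during the repartitioning shuffle itself; the documentation describes this as pushing the sort down into the shuffle machinery. Because the shuffle and the sort happen together, it is typically more efficient than shuffling first and sorting afterwards.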
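The result the operator produces can be modelled on ordinary Scala collections. The following is a minimal sketch, not Spark code. It uses a hypothetical helper, `LocalRepartitionSort.repartitionAndSortWithinPartitions`, which routes each record to a partition with a caller-supplied partition function and then sorts each partition by key. Spark performs the sort inside the shuffle. This local model writes the two steps out separately and only shows what the output looks like.

```scala
// Hypothetical local model of repartitionAndSortWithinPartitions (not a Spark API).
object LocalRepartitionSort {
  // Route every record to a partition with `partitionOf`, then sort each partition by key.
  def repartitionAndSortWithinPartitions[K, V](
      records: Seq[(K, V)],
      numPartitions: Int,
      partitionOf: K => Int
  )(implicit ord: Ordering[K]): Vector[Vector[(K, V)]] = {
    val grouped = records.groupBy { case (k, _) => partitionOf(k) }
    Vector.tabulate(numPartitions) { i =>
      grouped.getOrElse(i, Seq.empty).sortBy(_._1).toVector
    }
  }

  def main(args: Array[String]): Unit = {
    val records = Seq(3 -> "c", 1 -> "a", 4 -> "d", 2 -> "b", 5 -> "e")
    // Even keys go to partition 0, odd keys to partition 1
    val parts = repartitionAndSortWithinPartitions(records, 2, (k: Int) => k % 2)
    parts.zipWithIndex.foreach { case (p, i) => println(s"partition $i: $p") }
  }
}
```

Each partition comes out sorted by key, but there is no global order across partitions. That is the same guarantee the Spark operator gives.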
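Example:
Secondary sort of student records (student ID, name, subject, score). Each record is keyed by its first field and its score. Records are ordered by the first field in descending order and, when the first fields are equal, by score in ascending order.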
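The same two-level ordering can also be built by combining standard `Ordering` instances instead of writing `compare` by hand. The following is a sketch under the assumption that a reversed `String` ordering followed by the `Int` ordering matches the intended rule. `SecondarySortOrdering` and its nested `StudentKey` are hypothetical names used only for this illustration.

```scala
object SecondarySortOrdering {
  // Hypothetical key mirroring the StudentKey used in the code example
  case class StudentKey(name: String, score: Int)

  // First field descending, then score ascending
  val byNameDescScoreAsc: Ordering[StudentKey] =
    Ordering.Tuple2[String, Int](Ordering.String.reverse, Ordering.Int)
      .on[StudentKey](k => (k.name, k.score))

  def main(args: Array[String]): Unit = {
    val keys = Seq(StudentKey("c001", 59), StudentKey("c002", 79), StudentKey("c002", 13), StudentKey("c001", 10))
    keys.sorted(byNameDescScoreAsc).foreach(println)
  }
}
```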
Code example
import org.apache.spark.{Partitioner, TaskContext}
import org.apache.spark.sql.SparkSession

import scala.collection.mutable.ListBuffer

// Composite key: the first field of a record (c001, c002, ...) plus its score
case class StudentKey(name: String, score: Int)

object SortingDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("demo").master("local[*]").getOrCreate()
    val student_array = Array(
      "c001,n003,chinese,59", "c002,n004,english,79", "c002,n004,chinese,13",
      "c001,n001,english,88", "c001,n002,chinese,10", "c002,n006,chinese,29",
      "c001,n001,chinese,54", "c001,n002,english,32", "c001,n003,english,43",
      "c002,n005,english,80", "c002,n005,chinese,48", "c002,n006,english,69"
    )
    // Key = StudentKey(first field, score); value = the full record as an Array[String]
    val data = spark.sparkContext.parallelize(student_array)
      .map(_.split(","))
      .map(p => (StudentKey(p(0), p(3).toInt), p))

    // Implicit ordering picked up by repartitionAndSortWithinPartitions:
    // first field in descending order, then score in ascending order
    implicit val my_self_Ordering: Ordering[StudentKey] = new Ordering[StudentKey] {
      override def compare(x: StudentKey, y: StudentKey): Int =
        if (x.name != y.name) y.name.compareTo(x.name)
        else Integer.compare(x.score, y.score)
    }

    // Option 1: partition and sort with the built-in org.apache.spark.HashPartitioner
    // data.repartitionAndSortWithinPartitions(new org.apache.spark.HashPartitioner(10))

    // Option 2: partition and sort with a custom partitioner (DataPartition below)
    data.repartitionAndSortWithinPartitions(new DataPartition(2))
      .map(p => (p._2(0), p._2.mkString(":")))
      .aggregateByKey(ListBuffer[String]())((buf, v) => buf += v, (b1, b2) => b1 ++ b2)
      .foreachPartition { iter =>
        // Tag each output line with its partition ID to show which lines share a partition
        val partitionId = TaskContext.getPartitionId()
        iter.foreach { case (key, values) =>
          println(s"$key ${values.mkString("&")} $partitionId")
        }
      }

    spark.stop()
  }
}

// Custom partitioner: records with the same first field always land in the same partition
class DataPartition(partition: Int) extends Partitioner {
  override def numPartitions: Int = partition
  override def getPartition(key: Any): Int = {
    val studentKey = key.asInstanceOf[StudentKey]
    // abs is needed because hashCode can be negative
    Math.abs(studentKey.name.hashCode % partition)
  }
}
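The partition index computed in `getPartition` can be checked without Spark. The following is a sketch using a hypothetical `PartitionIndex` object that copies the `Math.abs(hashCode % n)` logic from `DataPartition`. It also shows `Math.floorMod` as an alternative that always returns a value in `[0, n)`, similar to the non-negative modulo Spark's own `HashPartitioner` applies. The two functions are both valid partition functions, but they can assign a negative hash code to different partitions. For example, a hash of -7 with 3 partitions gives 1 with `abs` and 2 with `floorMod`.

```scala
// Hypothetical stand-alone copy of DataPartition.getPartition, without the Spark dependency.
object PartitionIndex {
  // abs of the remainder: safe because |h % n| < n for any Int h
  def viaAbs(key: String, n: Int): Int = Math.abs(key.hashCode % n)

  // floorMod: always in [0, n) for n > 0
  def viaFloorMod(key: String, n: Int): Int = Math.floorMod(key.hashCode, n)

  def main(args: Array[String]): Unit = {
    for (name <- Seq("c001", "c002")) {
      println(s"$name -> abs: ${viaAbs(name, 2)}, floorMod: ${viaFloorMod(name, 2)}")
    }
  }
}
```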