When the data set is small:
scala> numrdd.sortBy(x=>x,false).take(3)
res17: Array[Int] = Array(100, 99, 98)
scala> numrdd.sortBy(x=>x,true).take(3)
res18: Array[Int] = Array(1, 2, 3)
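When the data set is very large, a single server's memory cannot compute the top N. Because the data is large, Spark reads it from HDFS and, following the data-locality principle, loads it onto different nodes. We can use mapPartitions to get the top N of each partition, then sort those candidates once more to get the top N of the whole data file.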
scala> val numrdd=sc.makeRDD(1 to 100000000,20) // e.g. 100 million numbers; real data sets may be much larger
numrdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at makeRDD at <console>:24
scala> numrdd.mapPartitions(x=>{val arr=x.toArray;val aa=arr.sorted;aa.reverseIterator.take(5)}).collect
res2: Array[Int] = Array(5000000, 4999999, 4999998, 4999997, 4999996, 10000000, 9999999, 9999998, 9999997, 9999996, 15000000, 14999999, 14999998, 14999997, 14999996, 20000000, 19999999, 19999998, 19999997, 19999996, 25000000, 24999999, 24999998, 24999997, 24999996, 30000000, 29999999, 29999998, 29999997, 29999996, 35000000, 34999999, 34999998, 34999997, 34999996, 40000000, 39999999, 39999998, 39999997, 39999996, 45000000, 44999999, 44999998, 44999997, 44999996, 50000000, 49999999, 49999998, 49999997, 49999996, 55000000, 54999999, 54999998, 54999997, 54999996, 60000000, 59999999, 59999998, 59999997, 59999996, 65000000, 64999999, 64999998, 64999997, 64999996, 70000000, 69999999, 69999998, 69999997, 69999996, 75000000, 74999999, 74999998, 74999997, 74999996, 80000000, 79999999, 79999998, 7...
scala> val maprdd=numrdd.mapPartitions(x=>{val arr=x.toArray;val aa=arr.sorted;aa.reverseIterator.take(5)})
maprdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[6] at mapPartitions at <console>:26
scala> maprdd.sortBy(x=>x,false).take(5)
[Stage 3:==========================================> (15 + 5) / 20]
18/08/31 18:05:20 WARN spark.HeartbeatReceiver: Removing executor 1 with no recent heartbeats: 166889 ms exceeds timeout 120000 ms
18/08/31 18:05:25 ERROR scheduler.TaskSchedulerImpl: Lost executor 1 on 192.168.53.122: Executor heartbeat timed out after 166889 ms
18/08/31 18:05:31 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 3.0 (TID 50, 192.168.53.122, executor 1): ExecutorLostFailure (executor 1 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 166889 ms
18/08/31 18:05:31 WARN scheduler.TaskSetManager: Lost task 3.0 in stage 3.0 (TID 53, 192.168.53.122, executor 1): ExecutorLostFailure (executor 1 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 166889 ms
[Stage 3:===================================================> (18 + 2) / 20]
18/08/31 18:06:19 WARN spark.HeartbeatReceiver: Removing executor 0 with no recent heartbeats: 156368 ms exceeds timeout 120000 ms
18/08/31 18:06:23 ERROR scheduler.TaskSchedulerImpl: Lost executor 0 on 192.168.53.122: Executor heartbeat timed out after 156368 ms
18/08/31 18:06:23 WARN scheduler.TaskSetManager: Lost task 5.0 in stage 3.0 (TID 55, 192.168.53.122, executor 0): ExecutorLostFailure (executor 0 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 156368 ms
18/08/31 18:06:27 WARN scheduler.TaskSetManager: Lost task 2.0 in stage 3.0 (TID 52, 192.168.53.122, executor 0): ExecutorLostFailure (executor 0 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 156368 ms
[Stage 3:===================================================> (18 + 0) / 20]
18/08/31 18:06:32 ERROR scheduler.TaskSchedulerImpl: Lost executor 1 on 192.168.53.122: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
[Stage 3:===================================================> (18 + 2) / 20]
18/08/31 18:06:33 WARN master.Master: Got status update for unknown executor app-20180831175321-0000/1
[Stage 4:> (0 + 2) / 20]
18/08/31 18:06:44 ERROR scheduler.TaskSchedulerImpl: Lost executor 0 on 192.168.53.122: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
18/08/31 18:06:45 WARN master.Master: Got status update for unknown executor app-20180831175321-0000/0
[Stage 4:=========================================================(20 + 0) / 20]
18/08/31 18:09:42 WARN master.Master: Removing worker-20180831175320-192.168.53.122-55296 because we got no heartbeat in 60 seconds
18/08/31 18:09:43 WARN master.Master: Removing worker-20180831175320-192.168.53.122-59602 because we got no heartbeat in 60 seconds
18/08/31 18:09:43 WARN master.Master: Removing worker-20180831175320-192.168.53.122-56119 because we got no heartbeat in 60 seconds
18/08/31 18:09:44 WARN master.Master: Got heartbeat from unregistered worker worker-20180831175320-192.168.53.122-59602. Asking it to re-register.
18/08/31 18:09:44 WARN master.Master: Got heartbeat from unregistered worker worker-20180831175320-192.168.53.122-55296. Asking it to re-register.
18/08/31 18:09:44 WARN master.Master: Got heartbeat from unregistered worker worker-20180831175320-192.168.53.122-56119. Asking it to re-register.
18/08/31 18:09:44 WARN master.Master: Got heartbeat from unregistered worker worker-20180831175320-192.168.53.122-59602. Asking it to re-register.
18/08/31 18:09:44 WARN master.Master: Got heartbeat from unregistered worker worker-20180831175320-192.168.53.122-56119. Asking it to re-register.
18/08/31 18:09:44 WARN master.Master: Got heartbeat from unregistered worker worker-20180831175320-192.168.53.122-56119. Asking it to re-register.
18/08/31 18:09:44 WARN master.Master: Got heartbeat from unregistered worker worker-20180831175320-192.168.53.122-56119. Asking it to re-register.
18/08/31 18:09:44 WARN master.Master: Got heartbeat from unregistered worker worker-20180831175320-192.168.53.122-55296. Asking it to re-register.
18/08/31 18:09:44 WARN master.Master: Got heartbeat from unregistered worker worker-20180831175320-192.168.53.122-56119. Asking it to re-register.
18/08/31 18:09:44 WARN master.Master: Got heartbeat from unregistered worker worker-20180831175320-192.168.53.122-55296. Asking it to re-register.
18/08/31 18:09:45 WARN master.Master: Got heartbeat from unregistered worker worker-20180831175320-192.168.53.122-59602. Asking it to re-register.
18/08/31 18:09:45 WARN master.Master: Got heartbeat from unregistered worker worker-20180831175320-192.168.53.122-56119. Asking it to re-register.
18/08/31 18:09:45 WARN master.Master: Got heartbeat from unregistered worker worker-20180831175320-192.168.53.122-55296. Asking it to re-register.
18/08/31 18:09:45 ERROR scheduler.TaskSchedulerImpl: Lost executor 2 on 192.168.53.122: worker lost
18/08/31 18:09:45 ERROR scheduler.TaskSchedulerImpl: Lost executor 3 on 192.168.53.122: worker lost
18/08/31 18:09:45 ERROR scheduler.TaskSchedulerImpl: Lost executor 4 on 192.168.53.122: worker lost
18/08/31 18:09:45 WARN master.Master: Got heartbeat from unregistered worker worker-20180831175320-192.168.53.122-55296. Asking it to re-register.
18/08/31 18:09:45 WARN master.Master: Got heartbeat from unregistered worker worker-20180831175320-192.168.53.122-59602. Asking it to re-register.
18/08/31 18:09:45 WARN master.Master: Got heartbeat from unregistered worker worker-20180831175320-192.168.53.122-56119. Asking it to re-register.
18/08/31 18:09:45 WARN master.Master: Got heartbeat from unregistered worker worker-20180831175320-192.168.53.122-55296. Asking it to re-register.
18/08/31 18:09:45 WARN master.Master: Got heartbeat from unregistered worker worker-20180831175320-192.168.53.122-59602. Asking it to re-register.
18/08/31 18:09:45 WARN master.Master: Got heartbeat from unregistered worker worker-20180831175320-192.168.53.122-55296. Asking it to re-register.
18/08/31 18:09:45 WARN master.Master: Got heartbeat from unregistered worker worker-20180831175320-192.168.53.122-59602. Asking it to re-register.
18/08/31 18:09:45 WARN master.Master: Got heartbeat from unregistered worker worker-20180831175320-192.168.53.122-59602. Asking it to re-register.
18/08/31 18:09:57 WARN nio.NioEventLoop: Selector.select() returned prematurely 512 times in a row; rebuilding Selector sun.nio.ch.EPollSelectorImpl@2972d788.
18/08/31 18:09:57 WARN nio.NioEventLoop: Selector.select() returned prematurely 512 times in a row; rebuilding Selector sun.nio.ch.EPollSelectorImpl@6763b7c7.
18/08/31 18:09:59 WARN master.Master: Got status update for unknown executor app-20180831175321-0000/3
18/08/31 18:09:59 WARN master.Master: Got status update for unknown executor app-20180831175321-0000/2
18/08/31 18:10:01 WARN master.Master: Got status update for unknown executor app-20180831175321-0000/4
18/08/31 18:10:19 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 5.0 (TID 94, 192.168.53.122, executor 5): FetchFailed(null, shuffleId=0, mapId=-1, reduceId=0, message=
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0
    at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:697)
    at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:693)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
    at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:693)
    at org.apache.spark.MapOutputTracker.getMapSizesByExecutorId(MapOutputTracker.scala:147)
    at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:49)
    at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:105)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
)
res3: Array[Int] = Array(100000000, 99999999, 99999998, 99999997, 99999996)
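Because this was a single-machine test with too little memory, the job only completed after many retries.
Sorting per partition reads the whole partition into memory, which can exhaust it. An improved version: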
scala> val numrdd=sc.makeRDD(1 to 1000000000,20)
numrdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[24] at makeRDD at <console>:26
scala> import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.ArrayBuffer
scala> numrdd.mapPartitions(iteror=>{var lst=new ArrayBuffer[Int]();iteror.foreach(line=>{lst+=line;if(lst.length>5){lst=lst.sorted.reverse.take(5)}});lst.toIterator}).sortBy(x=>x,false).take(5)
res58: Array[Int] = Array(1000000000, 999999999, 999999998, 999999997, 999999996)
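Within each partition the elements are appended to an array one at a time; whenever the array holds more than N elements it is sorted and cut back to its top N. Processing is slow, but the job no longer fails and recomputes because it ran out of memory.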
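The per-element sort above can be avoided with a bounded min-heap, so each partition holds at most N candidates and pays only a logarithmic cost per element. The following is a minimal sketch in plain Scala, not the author's method: `topNPerPartition` is a hypothetical helper, and the partitions are simulated with a local range instead of an RDD.

```scala
import scala.collection.mutable

// Hypothetical helper: keep only the n largest elements of an iterator,
// holding at most n candidates in memory at any time.
def topNPerPartition(it: Iterator[Int], n: Int): Iterator[Int] = {
  // Reversing the ordering turns Scala's max-heap into a min-heap,
  // so heap.head is the smallest of the current candidates.
  val heap = mutable.PriorityQueue.empty[Int](Ordering[Int].reverse)
  it.foreach { x =>
    if (heap.size < n) heap.enqueue(x)
    else if (heap.nonEmpty && x > heap.head) {
      heap.dequeue() // evict the smallest candidate
      heap.enqueue(x)
    }
  }
  heap.toList.sorted(Ordering[Int].reverse).iterator
}

// Simulate 4 partitions of 250 elements each (stand-in for an RDD).
val partitions: List[Iterator[Int]] =
  (1 to 1000).grouped(250).map(_.iterator).toList

// Stage 1: top 5 of every partition. Stage 2: top 5 of the candidates.
val candidates: List[Int] = partitions.flatMap(p => topNPerPartition(p, 5))
val top5: List[Int] = candidates.sorted(Ordering[Int].reverse).take(5)
// top5 == List(1000, 999, 998, 997, 996)
```

In the shell the same helper should plug into `numrdd.mapPartitions(it => topNPerPartition(it, 5))`. Spark's RDD API also provides `top(n)`, which may be the simpler choice when no custom per-partition logic is needed.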
Top N per group
scala> val accrdd=sc.textFile("/tmp/account.sql")
accrdd: org.apache.spark.rdd.RDD[String] = /tmp/account.sql MapPartitionsRDD[67] at textFile at <console>:27
scala> accrdd.take(3)
res63: Array[String] = Array(1004210 1004210 6 20180116 2018-01-16 10:39:50, 20946754 20946754 0 20170913 2017-09-13 10:02:37, 20946766 20946766 0 20170901 2017-09-01 16:51:30)
Group by the third column and take the top 3 values of the second column:
scala> val platrdd=accrdd.map(x=>{val arr=x.split(" ");(arr(2),arr(1))})
platrdd: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[4] at map at <console>:26
scala> val rdd1=platrdd.groupByKey().map(x=>(x._1,x._2.toList.sorted.reverse.take(3)))
rdd1: org.apache.spark.rdd.RDD[(String, List[String])] = MapPartitionsRDD[69] at map at <console>:31
scala> rdd1.collect
res64: Array[(String, List[String])] = Array((277,List(23160123, 23147727, 23145975)), (158,List(23152661, 23149311, 23123978)), (4,List(23164779, 23164767, 23164357)), (60,List(23164529, 23164524, 23162846)), (19,List(23163392, 23160323, 23159482)), (15,List(23164731, 23164723, 23164675)), (35,List(23164240, 23163934, 23163473)), (6,List(23164325, 23164005, 23163425)), (0,List(23134458, 23134450, 23134447)), (46,List(23160265, 23159171, 23158976)), (138,List(23157207, 23151175, 23145077)), (286,List(23063520)), (187,List(23162165, 23153133, 23147186)), (291,List(23147661, 23147644, 23037617)), (219,List(23000087, 22960754, 22953189)), (93,List(23157244, 23156623, 23152646)), (295,List(23029982, 23029518)), (33,List(23164663, 23164653, 23164341)), (17,List(22962791, 22869569, 22771519))...
Print the result in columns:
scala> rdd1.collect.foreach(line=>{print(line._1);line._2.foreach(x=>print(" "+x));println();})
277 23160123 23147727 23145975
158 23152661 23149311 23123978
4 23164779 23164767 23164357
60 23164529 23164524 23162846
19 23163392 23160323 23159482
15 23164731 23164723 23164675
35 23164240 23163934 23163473
6 23164325 23164005 23163425
0 23134458 23134450 23134447
46 23160265 23159171 23158976
138 23157207 23151175 23145077
286 23063520
187 23162165 23153133 23147186
291 23147661 23147644 23037617
219 23000087 22960754 22953189
93 23157244 23156623 23152646
295 23029982 23029518
33 23164663 23164653 23164341
17 22962791 22869569 22771519
13 23164178 23160793 23157707
11 23131196 23078538 23058051
scala> rdd1.collect.foreach(line=>{line._2.foreach(x=>println(line._1+" "+x));})
277 23160123
277 23147727
277 23145975
158 23152661
158 23149311
158 23123978
4 23164779
4 23164767
4 23164357
60 23164529
60 23164524
60 23162846
19 23163392
19 23160323
19 23159482
15 23164731
15 23164723
15 23164675
35 23164240
35 23163934
35 23163473
6 23164325
6 23164005
6 23163425
0 23134458
0 23134450
0 23134447
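One caveat about the grouping above: the second column is kept as a String, so `sorted` compares values lexicographically. That gives the numeric order only when all values have the same number of digits. The sketch below uses plain Scala collections and partly hypothetical sample rows (the `999` and `88` rows are invented for illustration) to show the difference when the column is parsed to `Long` first.

```scala
// Sample rows in the same space-separated layout as /tmp/account.sql.
// The rows with ids 999 and 88 are hypothetical, added to show the effect.
val lines: List[String] = List(
  "1004210 1004210 6 20180116 2018-01-16 10:39:50",
  "999 999 6 20180101 2018-01-01 00:00:00",
  "88 88 6 20180101 2018-01-01 00:00:00",
  "20946754 20946754 0 20170913 2017-09-13 10:02:37",
  "20946766 20946766 0 20170901 2017-09-01 16:51:30"
)

// Key = third column, value = second column parsed as a number.
def parse(line: String): (String, Long) = {
  val arr = line.split(" ")
  (arr(2), arr(1).toLong)
}

// Group by key, then keep the n largest values of each group.
def topNPerGroup(rows: Seq[(String, Long)], n: Int): Map[String, List[Long]] =
  rows.groupBy(_._1).map { case (k, vs) =>
    k -> vs.map(_._2).sorted(Ordering[Long].reverse).take(n).toList
  }

val numericTop: Map[String, List[Long]] = topNPerGroup(lines.map(parse), 2)

// The same grouping with String values, for comparison.
val stringTop: Map[String, List[String]] =
  lines.map(l => l.split(" ")).groupBy(a => a(2)).map { case (k, vs) =>
    k -> vs.map(a => a(1)).sorted.reverse.take(2)
  }
// numericTop("6") == List(1004210, 999)
// stringTop("6")  == List("999", "88")
```

In the shell, the equivalent change would be to map to `(arr(2), arr(1).toLong)` before `groupByKey`, assuming every value in that column is a valid integer.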
Per-group top N with aggregateByKey
def aggregateByKey[U](zeroValue: U)(seqOp: (U, String) => U,combOp: (U, U) => U)(implicit evidence$3: scala.reflect.ClassTag[U]): org.apache.spark.rdd.RDD[(String, U)]
def aggregateByKey[U](zeroValue: U,numPartitions: Int)(seqOp: (U, String) => U,combOp: (U, U) => U)(implicit evidence$2: scala.reflect.ClassTag[U]): org.apache.spark.rdd.RDD[(String, U)]
def aggregateByKey[U](zeroValue: U,partitioner: org.apache.spark.Partitioner)(seqOp: (U, String) => U,combOp: (U, U) => U)(implicit evidence$1: scala.reflect.ClassTag[U]): org.apache.spark.rdd.RDD[(String, U)]
When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
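To make the roles of the zero value and the two functions concrete, here is a small local simulation in plain Scala. It is a sketch of the semantics described above, not Spark's implementation: `aggregateByKeyLocal` is a hypothetical helper, partitions are modelled as nested sequences, and the values are assumed to be numeric.

```scala
// Keep the three largest values of a buffer.
def top3(xs: Vector[Long]): Vector[Long] =
  xs.sorted(Ordering[Long].reverse).take(3)

val zero: Vector[Long] = Vector.empty[Long]
// seqOp folds one value into a partition-local accumulator.
val seqOp: (Vector[Long], Long) => Vector[Long] = (acc, v) => top3(acc :+ v)
// combOp merges two accumulators; it has to keep candidates from BOTH sides.
val combOp: (Vector[Long], Vector[Long]) => Vector[Long] = (a, b) => top3(a ++ b)

// Hypothetical local stand-in for aggregateByKey over simulated partitions.
def aggregateByKeyLocal(partitions: Seq[Seq[(String, Long)]]): Map[String, Vector[Long]] = {
  // Stage 1: inside each partition, start every key from `zero` and apply seqOp.
  val perPartition: Seq[Map[String, Vector[Long]]] = partitions.map { part =>
    part.foldLeft(Map.empty[String, Vector[Long]]) { case (m, (k, v)) =>
      m.updated(k, seqOp(m.getOrElse(k, zero), v))
    }
  }
  // Stage 2: merge the per-partition accumulators key by key with combOp.
  perPartition.foldLeft(Map.empty[String, Vector[Long]]) { (merged, m) =>
    m.foldLeft(merged) { case (acc, (k, buf)) =>
      acc.updated(k, acc.get(k).map(combOp(_, buf)).getOrElse(buf))
    }
  }
}

val sample: Seq[Seq[(String, Long)]] = Seq(
  Seq("a" -> 5L, "a" -> 1L, "b" -> 7L),
  Seq("a" -> 9L, "a" -> 8L, "b" -> 2L),
  Seq("a" -> 3L)
)
val result: Map[String, Vector[Long]] = aggregateByKeyLocal(sample)
// result == Map("a" -> Vector(9, 8, 5), "b" -> Vector(7, 2))
```

Because each accumulator never grows beyond three elements, only small buffers would need to cross the shuffle, which is the main advantage over grouping all values first.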
scala> val accrdd=sc.textFile("/tmp/account.sql",10)
accrdd: org.apache.spark.rdd.RDD[String] = /tmp/account.sql MapPartitionsRDD[18] at textFile at <console>:26
scala> val platrdd=accrdd.map(x=>{val arr=x.split(" ");(arr(2),arr(1))})
platrdd: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[19] at map at <console>:28
scala> import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.ArrayBuffer
scala> val rdd2=platrdd.aggregateByKey(ArrayBuffer[String]())((u,v)=>{u+=v;u.sorted.reverse.take(3)},(p1,p2)=>{p1++=p2;p1.sorted.reverse.take(3)})
rdd2: org.apache.spark.rdd.RDD[(String, scala.collection.mutable.ArrayBuffer[String])] = ShuffledRDD[21] at aggregateByKey at <console>:31
scala> rdd2.collect.foreach(line=>{line._2.foreach(x=>println(line._1+" "+x));})
277 23160123
277 23147727
277 23145975
286 23063520
187 23162165
187 23153133
187 23147186
295 23029982
295 23029518
13 23164178
13 23160793
13 23157707
287 23056342
3 23163429
3 23160789
3 23154568
50 23154115
50 23151895
50 23148910
scala> rdd2.collect.foreach(line=>{print(line._1);line._2.foreach(x=>print(" "+x));println()})
277 23160123 23147727 23145975
286 23063520
187 23162165 23153133 23147186
295 23029982 23029518
13 23164178 23160793 23157707
287 23056342
3 23163429 23160789 23154568
50 23154115 23151895 23148910
10000 20989473
24294 23128962 23127889 23095729
4 23164779 23164767 23164357
60 23164529 23164524 23162846
15 23164731 23164723 23164675
33 23164663 23164653 23164341
288 23066764 23059048 23057539
89 23134381 23130183 23127920
289 23153710 23152126 23151804
35 23164240 23163934 23163473
6 23164325 23164005 23163425
Per-group top N with combineByKey
def combineByKey[C](createCombiner: String => C,mergeValue: (C, String) => C,mergeCombiners: (C, C) => C): org.apache.spark.rdd.RDD[(String, C)]
def combineByKey[C](createCombiner: String => C,mergeValue: (C, String) => C,mergeCombiners: (C, C) => C,numPartitions: Int): org.apache.spark.rdd.RDD[(String, C)]
def combineByKey[C](createCombiner: String => C,mergeValue: (C, String) => C,mergeCombiners: (C, C) => C,partitioner: org.apache.spark.Partitioner,mapSideCombine: Boolean,serializer: org.apache.spark.serializer.Serializer): org.apache.spark.rdd.RDD[(String, C)]
scala> val accrdd=sc.textFile("/tmp/account.sql",10)
accrdd: org.apache.spark.rdd.RDD[String] = /tmp/account.sql MapPartitionsRDD[8] at textFile at <console>:25
scala> import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.ArrayBuffer
scala> val platrdd=accrdd.map(x=>{val arr=x.split(" ");(arr(2),arr(1))})
platrdd: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[11] at map at <console>:29
scala> val rdd4=platrdd.combineByKey(v=>ArrayBuffer[String](v),(m1:ArrayBuffer[String],v)=>{m1+=v;m1.sorted.reverse.take(3)},(n1:ArrayBuffer[String],n2:ArrayBuffer[String])=>{n1++=n2;n1.sorted.reverse.take(3)})
rdd4: org.apache.spark.rdd.RDD[(String, scala.collection.mutable.ArrayBuffer[String])] = ShuffledRDD[12] at combineByKey at <console>:31
scala> rdd4.collect.foreach(line=>{print(line._1);line._2.foreach(x=>print(" "+x));println()})
277 23160123 23147727 23145975
286
187 23162165 23153133 23147186
295 23029982
13 23164178 23160793 23157707
287 23048699 23036534 23027693
3 23163429 23160789 23154568
50 23154115 23151895 23148910
10000
24294 23128962 23127889 23095729
4 23164779 23164767 23164357
60 23164529 23164524 23162846
15 23164731 23164723 23164675
33 23164663 23164653 23164341
288 23066764 23059048 23057539
89 23134381 23130183 23127920
289 23153710 23152126 23151804
35 23164240 23163934 23163473
6 23164325 23164005 23163425
17 22284389 21289970
18 23164773 23164736 23164716
27 23164509 23164315 23164215
36 23163187 23161563 23160212
191 23164694 23163359 23163186
19 23163392 23160323 23159482
46 23160265 23159171 23158976
138 23157207 23151175 23145077
291 23147661 23037617 22996169
219 23000087 22960754 22944546
9 23138867 23123314 23112930
83 23164087 23164063 23163347
247
38 23162205 23160277 23116227
158 23152661 23149311 23123978
0 23134458 23134450 23134447
93 23157244 23156623 23152646
11 23131196 23078538 23058051
21 23163298 23160329 23159013
1 23164650 23164615 23164588
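With combineByKey it helps to remember that `createCombiner` receives the first value seen for a key in each partition, so the combiner it returns has to contain that value. The local simulation below is a sketch under that understanding, in plain Scala: `combineByKeyLocal` is a hypothetical helper, not Spark's implementation. It compares a combiner that keeps the first value with one that starts empty.

```scala
// Hypothetical local stand-in for combineByKey over simulated partitions.
def combineByKeyLocal[V, C](partitions: Seq[Seq[(String, V)]])(
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C): Map[String, C] = {
  val perPartition: Seq[Map[String, C]] = partitions.map { part =>
    part.foldLeft(Map.empty[String, C]) { case (m, (k, v)) =>
      m.get(k) match {
        case None    => m.updated(k, createCombiner(v)) // first value of k in this partition
        case Some(c) => m.updated(k, mergeValue(c, v))  // every later value of k
      }
    }
  }
  // Merge the per-partition combiners key by key.
  perPartition.foldLeft(Map.empty[String, C]) { (merged, m) =>
    m.foldLeft(merged) { case (acc, (k, c)) =>
      acc.updated(k, acc.get(k).map(mergeCombiners(_, c)).getOrElse(c))
    }
  }
}

def top3(xs: Vector[Long]): Vector[Long] =
  xs.sorted(Ordering[Long].reverse).take(3)

val parts: Seq[Seq[(String, Long)]] = Seq(
  Seq("a" -> 5L, "a" -> 1L),
  Seq("a" -> 9L),
  Seq("b" -> 7L)
)

// The combiner keeps the first value.
val correct = combineByKeyLocal[Long, Vector[Long]](parts)(
  v => Vector(v), (c, v) => top3(c :+ v), (x, y) => top3(x ++ y))

// The combiner starts empty, so the first value per key and partition is lost.
val lossy = combineByKeyLocal[Long, Vector[Long]](parts)(
  _ => Vector.empty[Long], (c, v) => top3(c :+ v), (x, y) => top3(x ++ y))
// correct == Map("a" -> Vector(9, 5, 1), "b" -> Vector(7))
// lossy   == Map("a" -> Vector(1), "b" -> Vector())
```

A key whose values each arrive as the first of their partition ends up with an empty list in the lossy variant, which is one possible reason for a key to be printed with no values at all.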