  • Implementing Top N in Spark

    When the data set is small:

    scala> numrdd.sortBy(x=>x,false).take(3)

    res17: Array[Int] = Array(100, 99, 98)

    scala> numrdd.sortBy(x=>x,true).take(3)

    res18: Array[Int] = Array(1, 2, 3)
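    Spark also provides built-in actions that cover this small-data case directly; a minimal sketch (not run in the original session), which should return the same results as res17 and res18 above:

    // top(n) returns the n largest elements in descending order;
    // takeOrdered(n) returns the n smallest in ascending order.
    // Both keep only a bounded number of candidates per partition, so no full sort is needed.
    numrdd.top(3)
    numrdd.takeOrdered(3)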

    When the data set is very large and a single server's memory cannot hold the whole Top N computation: Spark reads the data from HDFS and, following the data-locality principle, loads it onto different nodes, so we can use mapPartitions to take the top N of each partition and then sort those per-partition candidates once more to obtain the top N of the entire file.

    scala> val numrdd=sc.makeRDD(1 to 100000000,20) // e.g. 100 million numbers; real data sets can be much larger
    numrdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at makeRDD at <console>:24


    scala> numrdd.mapPartitions(x=>{val arr=x.toArray;val aa=arr.sorted;aa.reverseIterator.take(5)}).collect
    res2: Array[Int] = Array(5000000, 4999999, 4999998, 4999997, 4999996, 10000000, 9999999, 9999998, 9999997, 9999996, 15000000, 14999999, 14999998, 14999997, 14999996, 20000000, 19999999, 19999998, 19999997, 19999996, 25000000, 24999999, 24999998, 24999997, 24999996, 30000000, 29999999, 29999998, 29999997, 29999996, 35000000, 34999999, 34999998, 34999997, 34999996, 40000000, 39999999, 39999998, 39999997, 39999996, 45000000, 44999999, 44999998, 44999997, 44999996, 50000000, 49999999, 49999998, 49999997, 49999996, 55000000, 54999999, 54999998, 54999997, 54999996, 60000000, 59999999, 59999998, 59999997, 59999996, 65000000, 64999999, 64999998, 64999997, 64999996, 70000000, 69999999, 69999998, 69999997, 69999996, 75000000, 74999999, 74999998, 74999997, 74999996, 80000000, 79999999, 79999998, 7...

    scala> val maprdd=numrdd.mapPartitions(x=>{val arr=x.toArray;val aa=arr.sorted;aa.reverseIterator.take(5)})
    maprdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[6] at mapPartitions at <console>:26

    scala> maprdd.sortBy(x=>x,false).take(5)

    [Stage 3:==========================================>              (15 + 5) / 20]
    18/08/31 18:05:20 WARN spark.HeartbeatReceiver: Removing executor 1 with no recent heartbeats: 166889 ms exceeds timeout 120000 ms
    18/08/31 18:05:25 ERROR scheduler.TaskSchedulerImpl: Lost executor 1 on 192.168.53.122: Executor heartbeat timed out after 166889 ms
    18/08/31 18:05:31 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 3.0 (TID 50, 192.168.53.122, executor 1): ExecutorLostFailure (executor 1 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 166889 ms
    18/08/31 18:05:31 WARN scheduler.TaskSetManager: Lost task 3.0 in stage 3.0 (TID 53, 192.168.53.122, executor 1): ExecutorLostFailure (executor 1 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 166889 ms
    [Stage 3:===================================================>     (18 + 2) / 20]
    18/08/31 18:06:19 WARN spark.HeartbeatReceiver: Removing executor 0 with no recent heartbeats: 156368 ms exceeds timeout 120000 ms
    18/08/31 18:06:23 ERROR scheduler.TaskSchedulerImpl: Lost executor 0 on 192.168.53.122: Executor heartbeat timed out after 156368 ms
    18/08/31 18:06:23 WARN scheduler.TaskSetManager: Lost task 5.0 in stage 3.0 (TID 55, 192.168.53.122, executor 0): ExecutorLostFailure (executor 0 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 156368 ms
    18/08/31 18:06:27 WARN scheduler.TaskSetManager: Lost task 2.0 in stage 3.0 (TID 52, 192.168.53.122, executor 0): ExecutorLostFailure (executor 0 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 156368 ms
    18/08/31 18:06:32 ERROR scheduler.TaskSchedulerImpl: Lost executor 1 on 192.168.53.122: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
    18/08/31 18:06:33 WARN master.Master: Got status update for unknown executor app-20180831175321-0000/1
    18/08/31 18:06:44 ERROR scheduler.TaskSchedulerImpl: Lost executor 0 on 192.168.53.122: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
    18/08/31 18:06:45 WARN master.Master: Got status update for unknown executor app-20180831175321-0000/0
    18/08/31 18:09:42 WARN master.Master: Removing worker-20180831175320-192.168.53.122-55296 because we got no heartbeat in 60 seconds
    18/08/31 18:09:43 WARN master.Master: Removing worker-20180831175320-192.168.53.122-59602 because we got no heartbeat in 60 seconds
    18/08/31 18:09:43 WARN master.Master: Removing worker-20180831175320-192.168.53.122-56119 because we got no heartbeat in 60 seconds
    18/08/31 18:09:44 WARN master.Master: Got heartbeat from unregistered worker worker-20180831175320-192.168.53.122-59602. Asking it to re-register.
    18/08/31 18:09:44 WARN master.Master: Got heartbeat from unregistered worker worker-20180831175320-192.168.53.122-55296. Asking it to re-register.
    18/08/31 18:09:44 WARN master.Master: Got heartbeat from unregistered worker worker-20180831175320-192.168.53.122-56119. Asking it to re-register.
    18/08/31 18:09:45 WARN master.Master: Got heartbeat from unregistered worker worker-20180831175320-192.168.53.122-59602. Asking it to re-register.
    18/08/31 18:09:45 WARN master.Master: Got heartbeat from unregistered worker worker-20180831175320-192.168.53.122-56119. Asking it to re-register.
    18/08/31 18:09:45 WARN master.Master: Got heartbeat from unregistered worker worker-20180831175320-192.168.53.122-55296. Asking it to re-register.
    18/08/31 18:09:45 ERROR scheduler.TaskSchedulerImpl: Lost executor 2 on 192.168.53.122: worker lost
    18/08/31 18:09:45 ERROR scheduler.TaskSchedulerImpl: Lost executor 3 on 192.168.53.122: worker lost
    18/08/31 18:09:45 ERROR scheduler.TaskSchedulerImpl: Lost executor 4 on 192.168.53.122: worker lost
    18/08/31 18:09:57 WARN nio.NioEventLoop: Selector.select() returned prematurely 512 times in a row; rebuilding Selector sun.nio.ch.EPollSelectorImpl@2972d788.
    18/08/31 18:09:57 WARN nio.NioEventLoop: Selector.select() returned prematurely 512 times in a row; rebuilding Selector sun.nio.ch.EPollSelectorImpl@6763b7c7.
    18/08/31 18:09:59 WARN master.Master: Got status update for unknown executor app-20180831175321-0000/3
    18/08/31 18:09:59 WARN master.Master: Got status update for unknown executor app-20180831175321-0000/2
    18/08/31 18:10:01 WARN master.Master: Got status update for unknown executor app-20180831175321-0000/4
    18/08/31 18:10:19 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 5.0 (TID 94, 192.168.53.122, executor 5): FetchFailed(null, shuffleId=0, mapId=-1, reduceId=0, message=
    org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0
            at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:697)
            at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:693)
            at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
            at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
            at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
            at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
            at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:693)
            at org.apache.spark.MapOutputTracker.getMapSizesByExecutorId(MapOutputTracker.scala:147)
            at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:49)
            at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:105)
            at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
            at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
            at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
            at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
            at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
            at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
            at org.apache.spark.scheduler.Task.run(Task.scala:108)
            at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
            at java.lang.Thread.run(Thread.java:745)
    )

    res3: Array[Int] = Array(100000000, 99999999, 99999998, 99999997, 99999996)

    Because this was a single-machine test with limited memory, the job only completed after several retries.

    Sorting each partition this way pulls the whole partition into memory, which can itself run out of memory. An improved version:

    scala> val numrdd=sc.makeRDD(1 to 1000000000,20)
    numrdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[24] at makeRDD at <console>:26


    scala>  import scala.collection.mutable.ArrayBuffer
    import scala.collection.mutable.ArrayBuffer

    scala> numrdd.mapPartitions(iteror=>{var lst=new ArrayBuffer[Int]();iteror.foreach(line=>{lst+=line;if(lst.length>5){lst=lst.sorted.reverse.take(5)}});lst.toIterator}).sortBy(x=>x,false).take(5)
    res58: Array[Int] = Array(1000000000, 999999999, 999999998, 999999997, 999999996)

    Here each partition's elements are appended to an array one at a time; whenever the array grows beyond N elements it is sorted and trimmed back to the top N. Processing is slower, but the job no longer fails and recomputes because of insufficient memory.
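    The same idea can be expressed with a bounded priority queue per partition, which keeps at most N candidates in memory and avoids re-sorting the buffer for every element. A sketch along those lines (N and the variable names are illustrative, not from the original session):

    import scala.collection.mutable.PriorityQueue

    val N = 5
    val topN = numrdd.mapPartitions { iter =>
      // A reversed ordering keeps the smallest current candidate on top,
      // so it can be evicted cheaply when a larger value arrives.
      val pq = PriorityQueue.empty[Int](Ordering[Int].reverse)
      iter.foreach { v =>
        if (pq.size < N) pq.enqueue(v)
        else if (v > pq.head) { pq.dequeue(); pq.enqueue(v) }
      }
      pq.iterator                      // at most N candidates per partition
    }.sortBy(x => x, false).take(N)    // merge the per-partition candidates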

    Top N per group


    scala> val accrdd=sc.textFile("/tmp/account.sql")
    accrdd: org.apache.spark.rdd.RDD[String] = /tmp/account.sql MapPartitionsRDD[67] at textFile at <console>:27

    scala> accrdd.take(3)
    res63: Array[String] = Array(1004210    1004210 6       20180116        2018-01-16 10:39:50, 20946754   20946754        0       20170913        2017-09-13 10:02:37, 20946766   20946766 0       20170901        2017-09-01 16:51:30)

    Group by the third column and take the top 3 values of the second column:

    scala> val platrdd=accrdd.map(x=>{val arr=x.split(" ");(arr(2),arr(1))})
    platrdd: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[4] at map at <console>:26

    scala> val rdd1=platrdd.groupByKey().map(x=>(x._1,x._2.toList.sorted.reverse.take(3)))
    rdd1: org.apache.spark.rdd.RDD[(String, List[String])] = MapPartitionsRDD[69] at map at <console>:31

    scala> rdd1.collect
    res64: Array[(String, List[String])] = Array((277,List(23160123, 23147727, 23145975)), (158,List(23152661, 23149311, 23123978)), (4,List(23164779, 23164767, 23164357)), (60,List(23164529, 23164524, 23162846)), (19,List(23163392, 23160323, 23159482)), (15,List(23164731, 23164723, 23164675)), (35,List(23164240, 23163934, 23163473)), (6,List(23164325, 23164005, 23163425)), (0,List(23134458, 23134450, 23134447)), (46,List(23160265, 23159171, 23158976)), (138,List(23157207, 23151175, 23145077)), (286,List(23063520)), (187,List(23162165, 23153133, 23147186)), (291,List(23147661, 23147644, 23037617)), (219,List(23000087, 22960754, 22953189)), (93,List(23157244, 23156623, 23152646)), (295,List(23029982, 23029518)), (33,List(23164663, 23164653, 23164341)), (17,List(22962791, 22869569, 22771519))...
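    Note that the values are Strings, so sorted compares them lexicographically; that coincides with numeric order only while all IDs have the same number of digits. A hedged variant that compares numerically (assuming the second column always parses as a number) could look like this:

    val rdd1num = platrdd.groupByKey()
      .map { case (k, vs) => (k, vs.toList.sortBy(_.toLong)(Ordering[Long].reverse).take(3)) }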

    Displaying the results in columns:

    scala> rdd1.collect.foreach(line=>{print(line._1);line._2.foreach(x=>print(" "+x));println();})
    277     23160123        23147727        23145975                               
    158     23152661        23149311        23123978
    4       23164779        23164767        23164357
    60      23164529        23164524        23162846
    19      23163392        23160323        23159482
    15      23164731        23164723        23164675
    35      23164240        23163934        23163473
    6       23164325        23164005        23163425
    0       23134458        23134450        23134447
    46      23160265        23159171        23158976
    138     23157207        23151175        23145077
    286     23063520
    187     23162165        23153133        23147186
    291     23147661        23147644        23037617
    219     23000087        22960754        22953189
    93      23157244        23156623        23152646
    295     23029982        23029518
    33      23164663        23164653        23164341
    17      22962791        22869569        22771519
    13      23164178        23160793        23157707
    11      23131196        23078538        23058051

    scala> rdd1.collect.foreach(line=>{line._2.foreach(x=>println(line._1+" "+x));})
    277     23160123                                                               
    277     23147727
    277     23145975
    158     23152661
    158     23149311
    158     23123978
    4       23164779
    4       23164767
    4       23164357
    60      23164529
    60      23164524
    60      23162846
    19      23163392
    19      23160323
    19      23159482
    15      23164731
    15      23164723
    15      23164675
    35      23164240
    35      23163934
    35      23163473
    6       23164325
    6       23164005
    6       23163425
    0       23134458
    0       23134450
    0       23134447
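    Both loops above run on the driver after collect. If the flattened (group, value) rows are needed as an RDD instead, for example to write them back to a file, a sketch (the output path is illustrative):

    val flat = rdd1.flatMap { case (k, vs) => vs.map(v => k + "\t" + v) }
    // flat.saveAsTextFile("/tmp/account_topn")   // hypothetical path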

     Using aggregateByKey to compute the top N per group

    def aggregateByKey[U](zeroValue: U)(seqOp: (U, String) => U,combOp: (U, U) => U)(implicit evidence$3: scala.reflect.ClassTag[U]): org.apache.spark.rdd.RDD[(String, U)]
    def aggregateByKey[U](zeroValue: U,numPartitions: Int)(seqOp: (U, String) => U,combOp: (U, U) => U)(implicit evidence$2: scala.reflect.ClassTag[U]): org.apache.spark.rdd.RDD[(String, U)]
    def aggregateByKey[U](zeroValue: U,partitioner: org.apache.spark.Partitioner)(seqOp: (U, String) => U,combOp: (U, U) => U)(implicit evidence$1: scala.reflect.ClassTag[U]): org.apache.spark.rdd.RDD[(String, U)]

    When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
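    For intuition about the three pieces: zeroValue is the starting accumulator for each key, seqOp folds one value into an accumulator within a partition, and combOp merges accumulators coming from different partitions. A minimal standalone sketch on made-up data (per-key maximum):

    val pairs = sc.parallelize(Seq(("a", 3), ("a", 1), ("b", 5), ("b", 2)))
    pairs.aggregateByKey(Int.MinValue)(
      (acc, v) => math.max(acc, v),   // seqOp: fold a value into the accumulator
      (a, b)   => math.max(a, b)      // combOp: merge two accumulators
    ).collect
    // expected (order may vary): Array((a,3), (b,5))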

    scala> val accrdd=sc.textFile("/tmp/account.sql",10)

    accrdd: org.apache.spark.rdd.RDD[String] = /tmp/account.sql MapPartitionsRDD[18] at textFile at <console>:26

    scala> val platrdd=accrdd.map(x=>{val arr=x.split(" ");(arr(2),arr(1))})

    platrdd: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[19] at map at <console>:28

    scala> import scala.collection.mutable.ArrayBuffer
    import scala.collection.mutable.ArrayBuffer


    scala> val rdd2=platrdd.aggregateByKey(ArrayBuffer[String]())((u,v)=>{u+=v;u.sorted.reverse.take(3)},(p1,p2)=>{p1++=p2;p2.sorted.reverse.take(3)})
    rdd2: org.apache.spark.rdd.RDD[(String, scala.collection.mutable.ArrayBuffer[String])] = ShuffledRDD[21] at aggregateByKey at <console>:31

    scala> rdd2.collect.foreach(line=>{line._2.foreach(x=>println(line._1+" "+x));})
    277     23160123
    277     23147727
    277     23145975
    286     23063520
    187     23162165
    187     23153133
    187     23147186
    295     23029982
    295     23029518
    13      23164178
    13      23160793
    13      23157707
    287     23056342
    3       23163429
    3       23160789
    3       23154568
    50      23154115
    50      23151895
    50      23148910


    scala> rdd2.collect.foreach(line=>{print(line._1);line._2.foreach(x=>print(" "+x));println()})
    277     23160123        23147727        23145975
    286     23063520
    187     23162165        23153133        23147186
    295     23029982        23029518
    13      23164178        23160793        23157707
    287     23056342
    3       23163429        23160789        23154568
    50      23154115        23151895        23148910
    10000   20989473
    24294   23128962        23127889        23095729
    4       23164779        23164767        23164357
    60      23164529        23164524        23162846
    15      23164731        23164723        23164675
    33      23164663        23164653        23164341
    288     23066764        23059048        23057539
    89      23134381        23130183        23127920
    289     23153710        23152126        23151804
    35      23164240        23163934        23163473
    6       23164325        23164005        23163425
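    One caveat about the combOp used above: after p1++=p2 it returns p2.sorted.reverse.take(3), i.e. only p2's own top 3, so candidates held in p1 can be dropped when partitions are merged; this can explain why a few groups above show fewer than three entries even though more exist. A corrected sketch returns the merged buffer's top 3:

    import scala.collection.mutable.ArrayBuffer

    val rdd2fixed = platrdd.aggregateByKey(ArrayBuffer[String]())(
      (u, v)   => { u += v; u.sorted.reverse.take(3) },      // fold a value, keep top 3
      (p1, p2) => { p1 ++= p2; p1.sorted.reverse.take(3) })  // merge buffers, keep top 3 of the merged buffer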

     Using combineByKey to compute the top N per group
    def combineByKey[C](createCombiner: String => C,mergeValue: (C, String) => C,mergeCombiners: (C, C) => C): org.apache.spark.rdd.RDD[(String, C)]
    def combineByKey[C](createCombiner: String => C,mergeValue: (C, String) => C,mergeCombiners: (C, C) => C,numPartitions: Int): org.apache.spark.rdd.RDD[(String, C)]
    def combineByKey[C](createCombiner: String => C,mergeValue: (C, String) => C,mergeCombiners: (C, C) => C,partitioner: org.apache.spark.Partitioner,mapSideCombine: Boolean,serializer: org.apache.spark.serializer.Serializer): org.apache.spark.rdd.RDD[(String, C)]
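    combineByKey is the more general primitive underneath aggregateByKey: createCombiner builds the initial accumulator from the first value seen for a key in a partition, mergeValue folds further values of that key into it, and mergeCombiners merges accumulators across partitions. A minimal standalone sketch on made-up data, collecting all values per key:

    val pairs = sc.parallelize(Seq(("a", "1"), ("a", "2"), ("b", "3")))
    pairs.combineByKey(
      (v: String) => List(v),                        // createCombiner: seed with the first value
      (acc: List[String], v: String) => v :: acc,    // mergeValue: add values within a partition
      (a: List[String], b: List[String]) => a ::: b  // mergeCombiners: merge across partitions
    ).collect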


    scala> val accrdd=sc.textFile("/tmp/account.sql",10)
    accrdd: org.apache.spark.rdd.RDD[String] = /tmp/account.sql MapPartitionsRDD[8] at textFile at <console>:25


    scala> import scala.collection.mutable.ArrayBuffer
    import scala.collection.mutable.ArrayBuffer

    scala> val platrdd=accrdd.map(x=>{val arr=x.split(" ");(arr(2),arr(1))})
    platrdd: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[11] at map at <console>:29

    scala> val rdd4=platrdd.combineByKey(v=>(ArrayBuffer[String]()),(m1:ArrayBuffer[String],v)=>{m1+=v;m1.sorted.reverse.take(3)},(n1:ArrayBuffer[String],n2:ArrayBuffer[String])=>{n1++=n2;n1.sorted.reverse.take(3)})
    rdd4: org.apache.spark.rdd.RDD[(String, scala.collection.mutable.ArrayBuffer[String])] = ShuffledRDD[12] at combineByKey at <console>:31

    scala> rdd4.collect.foreach(line=>{print(line._1);line._2.foreach(x=>print(" "+x));println()})
    277     23160123        23147727        23145975
    286
    187     23162165        23153133        23147186
    295     23029982
    13      23164178        23160793        23157707
    287     23048699        23036534        23027693
    3       23163429        23160789        23154568
    50      23154115        23151895        23148910
    10000
    24294   23128962        23127889        23095729
    4       23164779        23164767        23164357
    60      23164529        23164524        23162846
    15      23164731        23164723        23164675
    33      23164663        23164653        23164341
    288     23066764        23059048        23057539
    89      23134381        23130183        23127920
    289     23153710        23152126        23151804
    35      23164240        23163934        23163473
    6       23164325        23164005        23163425
    17      22284389        21289970
    18      23164773        23164736        23164716
    27      23164509        23164315        23164215
    36      23163187        23161563        23160212
    191     23164694        23163359        23163186
    19      23163392        23160323        23159482
    46      23160265        23159171        23158976
    138     23157207        23151175        23145077
    291     23147661        23037617        22996169
    219     23000087        22960754        22944546
    9       23138867        23123314        23112930
    83      23164087        23164063        23163347
    247
    38      23162205        23160277        23116227
    158     23152661        23149311        23123978
    0       23134458        23134450        23134447
    93      23157244        23156623        23152646
    11      23131196        23078538        23058051
    21      23163298        23160329        23159013
    1       23164650        23164615        23164588
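    Note that the createCombiner above, v=>(ArrayBuffer[String]()), ignores its argument, so the first value seen for each key in a partition is silently dropped; that is why groups such as 286 and 295 show fewer values here than in the earlier groupByKey output. A corrected sketch seeds the buffer with that first value:

    import scala.collection.mutable.ArrayBuffer

    val rdd4fixed = platrdd.combineByKey(
      (v: String) => ArrayBuffer(v),                                                 // keep the first value
      (m: ArrayBuffer[String], v: String) => { m += v; m.sorted.reverse.take(3) },
      (n1: ArrayBuffer[String], n2: ArrayBuffer[String]) => { n1 ++= n2; n1.sorted.reverse.take(3) })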

  • Original post: https://www.cnblogs.com/playforever/p/9567183.html