zoukankan      html  css  js  c++  java
  • Spark 中在处理大批量数据排序问题时,如何避免OOM

    错误思想

    举个列子,当我们想要比较  一个  类型为  RDD[(Long, (String, Int))]   的RDD,让它先按Long分组,然后按int的值进行倒序排序,最容易想到的思维就是先分组,然后把Iterable  转换为 list,然后sortby,但是这样却有一个致命的缺点,就是Iterable 在内存中是一个指针,不占内存,而list是一个容器,占用内存,如果Iterable 含有元素过多,那么极易引起OOM

     val cidAndSidCountGrouped: RDD[(Long, Iterable[(String, Int)])] = cidAndSidCount.groupByKey()
            // 4. 排序, 取top10
            val result: RDD[(Long, List[(String, Int)])] = cidAndSidCountGrouped.map {
                case (cid, sidCountIt) =>
                    // sidCountIt 排序, 取前10
                    // Iterable转成容器式集合的时候, 如果数据量过大, 极有可能导致oom
                    (cid, sidCountIt.toList.sortBy(-_._2).take(5))
            }

    方法一:利用RDD排序特点

    首先,我们要知道,RDD 的排序需要 shuffle, 是采用了内存+磁盘来完成的排序.这样能有效避免OOM的风险,但是RDD是全部排序,所以需要针对性的过滤Key值来进行排序

     //把long(即key值)提取出来
            val cids: List[Long] = categoryCountList.map(_.cid.toLong)
            val buffer: ListBuffer[(Long, List[(String, Int)])] = ListBuffer[(Long, List[(String, Int)])]()
            //根据每个key来过滤RDD
            for (cid <- cids) {
                /*
                List((15,(632972a4-f811-4000-b920-dc12ea803a41,10)), (15,(f34878b8-1784-4d81-a4d1-0c93ce53e942,8)), (15,(5e3545a0-1521-4ad6-91fe-e792c20c46da,8)), (15,(66a421b0-839d-49ae-a386-5fa3ed75226f,8)), (15,(9fa653ec-5a22-4938-83c5-21521d083cd0,8)))
                目标:
                (9,List((199f8e1d-db1a-4174-b0c2-ef095aaef3ee,9), (329b966c-d61b-46ad-949a-7e37142d384a,8), (5e3545a0-1521-4ad6-91fe-e792c20c46da,8), (e306c00b-a6c5-44c2-9c77-15e919340324,7), (bed60a57-3f81-4616-9e8b-067445695a77,7)))
                 */
                val arr: Array[(String, Int)] = cidAndSidCount.filter(cid == _._1)
                    .sortBy(-_._2._2)
                    .take(5)
                    .map(_._2)
                buffer += ((cid, arr.toList))
            }
            buffer.foreach(println)

    这样做也有缺点:即有多少个key,就有多少个Job,占用资源

    方法二:利用TreeSet自动排序的特性

     def statCategoryTop10Session_3(sc: SparkContext,
                                       categoryCountList: List[CategroyCount],
                                       userVisitActionRDD: RDD[UserVisitAction]) = {
            // 1. 过滤出来 top10品类的所有点击记录
            // 1.1 先map出来top10的品类id
            val cids = categoryCountList.map(_.cid.toLong)
            val topCategoryActionRDD: RDD[UserVisitAction] = userVisitActionRDD.filter(action => cids.contains(action.click_category_id))
    
    
            // 2. 计算每个品类 下的每个session 的点击量  rdd ((cid, sid) ,1)
            val cidAndSidCount: RDD[(Long, (String, Int))] = topCategoryActionRDD
                .map(action => ((action.click_category_id, action.session_id), 1))
                // 使用自定义分区器  重点理解分区器的原理
                .reduceByKey(new CategoryPartitioner(cids), _ + _)
                .map {
                    case ((cid, sid), count) => (cid, (sid, count))
                }
            
            // 3. 排序取top10
    //因为已经按key分好了区,所以用Mappartitions ,在每个分区中新建一个TreeSet即可
            val result: RDD[(Long, List[SessionInfo])] = cidAndSidCount.mapPartitions((it: Iterator[(Long, (String, Int))]) => {
    //new 一个TreeSet,并同时指定排序规则
         var treeSet: mutable.TreeSet[CategorySession] = new mutable.TreeSet[CategorySession]()(new Ordering[CategorySession] {
                        override def compare(x: CategorySession, y: CategorySession): Int = {
                            if (x.clickCount >= y.clickCount) -1 else 1
                        }
                    })
         var id = 0l
        iter.foreach({
            case (l, session) => {
                id = l
                treeSet.add(session)
            if (treeSet.size > 10) treeSet = treeSet.take(10)
                        }
                    })
                    Iterator(id, treeSet)
                })
        
            result.collect.foreach(println)
            
            Thread.sleep(1000000)
        }
    }
    
    /*
    根据传入的key值来决定分区号,让相同key进入相同的分区,能够避免多次shuffle
     */
    class CategoryPartitioner(cids: List[Long]) extends Partitioner {
        // 用cid索引, 作为将来他的分区索引.
        private val cidWithIndex: Map[Long, Int] = cids.zipWithIndex.toMap
        
        // 返回集合的长度
        override def numPartitions: Int = cids.length
        
        // 根据key返回分区的索引
        override def getPartition(key: Any): Int = {
            key match {
                // 根据品类id返回分区的索引!    0-9
                case (cid: Long, _) =>
                    cidWithIndex(cid)
            }
        }
    }

    巧妙利用分区器可以避免多次shuffle

  • 相关阅读:
    Eclipse+EPIC+PadWalker
    Commit message 和 Change log 编写指南
    把perl脚本编译成exe
    Qt使用中碰到的问题
    Python——函数 7、位置参数与默认参数之间的关系
    Python——函数 6、默认参数
    Python——函数 5、位置参数与关键字参数
    Python——函数 4、形参的应用
    Python——函数 3、实参与形参
    Python——函数 2、返回值
  • 原文地址:https://www.cnblogs.com/yangxusun9/p/12912456.html
Copyright © 2011-2022 走看看