zoukankan      html  css  js  c++  java
  • 大数据算法:排位问题(2)

    此文已由作者叶林宝授权网易云社区发布。

    欢迎访问网易云社区,了解更多网易技术产品运营经验。


    方案四:Sort on Cell Values

    简述:

    上述方案三, 当数据行数较多, 情况下, 在二次排序还是可能出现oom情况, 而且, 不同的field_index的数据可能shuffle到同一个分区,这样就加大了oom的概率。当field_index本身取值较多 情况下, 增加分区数是其中一种解决方法。但是field_index取值本身就少于分区数的情况下, 增加分区数对缓解oom就没任何作用了。 如果 当field_value相比field_index较为分散, 且值较多的情况下, 不妨换个思维, 按field_value分区。 具体算法如下:

    算法:

    (1)将df 转换为(field_value, field_index)

    (2)对分区内的数据, 用sortByKey根据 field_value排序 (rangPartition排序)

    (3)利用mapPartitions确定每个分区内的每个field_index共有多少数据(不同分区中的filed_value相对有序, 例如partiiton1 中的filed_value比partition2中的field_value小)

    (4)利用第(3)步数据, 确定每个field_index中所需要的排名的数据在哪个分区以及分区内第几条数据。例如要输出field_index_6的13th位数据,假设第一个分区已经包含10条数据, 则目标数据在第二个分区的第3条数据

    (5)转换(4)计算结果为标准输出格式

    代码:

    (1)

    /**
        * 将数据源df转换为(field_value, field_index)格式的rdd
        * @param dataFrame
        * @return
        */
      def getValueColumnPairs(dataFrame : DataFrame): RDD[(Double, Int)] =
      {
        dataFrame.rdd.flatMap{
          row: Row => row.toSeq.zipWithIndex
            .map{case (v, index) => (v.toString.toDouble, index)}
        }
      }

    (3)

    /**
        * 对按照field_value排序后的sortedValueColumnPairs, 计算出每个分区上, 每个field_index分别有多少数据
        * @param sortedValueColumnPairs
        * @param numOfColumns
        * @return
        */
      def getColumnsFreqPerPartition(sortedValueColumnPairs: RDD[(Double, Int)],numOfColumns : Int): Array[(Int, Array[Long])] = {
        val zero = Array.fill[Long](numOfColumns)(0)    def aggregateColumnFrequencies (partitionIndex : Int, valueColumnPairs : Iterator[(Double, Int)]) = {
          val columnsFreq : Array[Long] = valueColumnPairs.aggregate(zero)(
            (a : Array[Long], v : (Double, Int)) => {
              val (value, colIndex) = v          //increment the cell in the zero array corresponding to this column
              a(colIndex) = a(colIndex) + 1L
              a
            },
            (a : Array[Long], b : Array[Long]) => {
              a.zip(b).map{ case(aVal, bVal) => aVal + bVal}
            })
          Iterator((partitionIndex, columnsFreq))
        }
        sortedValueColumnPairs.mapPartitionsWithIndex(aggregateColumnFrequencies).collect()
      }

    举例说明:

    假设对(1)中转换后的数据, 按照field_value排序后, 各个分区的数据如下所示

    Partition 1: (1.5, 0) (1.75, 1) (2.0, 2) (5.25, 0)

    Partition 2: (7.5, 1) (9.5, 2)

    则(2)的输出结果为:

    [(0, [2, 1, 1]), (1, [0, 1, 1])]

    (4)

    /**
        * 计算每个field_index所需排位数据在第几个分区的第几条数据
        * @param targetRanks 排位数组
        * @param partitionColumnsFreq 每个分区的每个field_index包含多少数据
        * @param numOfColumns field个数
        * @return
        */
      def getRanksLocationsWithinEachPart(targetRanks : List[Long],
                                          partitionColumnsFreq : Array[(Int, Array[Long])],
                                          numOfColumns : Int) : Array[(Int, List[(Int, Long)])] = {    // 二维数组, 存储当前每个field_index, 遍历到到第几条数据
        val runningTotal = Array.fill[Long](numOfColumns)(0)    // The partition indices are not necessarily in sorted order, so we need
        // to sort the partitionsColumnsFreq array by the partition index (the
        // first value in the tuple).
        partitionColumnsFreq.sortBy(_._1).map {      // relevantIndexList 存储分区上, 满足排位数组的field_index在该分区的第几条数据
          case (partitionIndex, columnsFreq) => val relevantIndexList = new mutable.MutableList[(Int, Long)]()
            columnsFreq.zipWithIndex.foreach{ case (colCount, colIndex) =>          // 当天field_index(即colIndex), 遍历到第几条数据
              val runningTotalCol = runningTotal(colIndex)          //  当前field_index(即colIndex),排位数组中哪些排位位于当前分区
              val ranksHere: List[Long] = targetRanks.filter(rank =>
                runningTotalCol < rank && runningTotalCol + colCount >= rank)          // 计算出当前分区,当前field_index(即colIndex), 满足排位数组的field_value在当前分区的位置
              relevantIndexList ++= ranksHere.map(rank => (colIndex, rank - runningTotalCol))
              runningTotal(colIndex) += colCount
            }
            (partitionIndex, relevantIndexList.toList)
        }
      }

    举个例子:

    假如目标排位:targetRanks: [5]

    各分区各feild_index数据量:partitionColumnsFreq: [(0, [2, 3]), (1, [4, 1]), (2, [5, 2])]

    字段个数:numOfColumns: 2

    输出结果: [(0, []), (1, [(0, 3)]), (2, [(1, 1)])]

    (5)

    /**
        * 过滤出每个field_index 所需排位的数值
        * @param sortedValueColumnPairs
        * @param ranksLocations (4)中计算出的满足排位数组要求的每个分区上,每个field_index在该分区的第几条数据
        * @return
        */
      def findTargetRanksIteratively( sortedValueColumnPairs : RDD[(Double, Int)], ranksLocations : Array[(Int, List[(Int, Long)])]):
      RDD[(Int, Double)] = {
        sortedValueColumnPairs.mapPartitionsWithIndex(
          (partitionIndex : Int, valueColumnPairs : Iterator[(Double, Int)]) => {        // 当前分区上, 满足排位数组的feild_index及其在该分区上的位置
            val targetsInThisPart: List[(Int, Long)] = ranksLocations(partitionIndex)._2        if (targetsInThisPart.nonEmpty) {          // map中的key为field_index, value为该feild_index在当前分区中的哪些位置上的数据满足排位数组要求
              val columnsRelativeIndex: Map[Int, List[Long]] = targetsInThisPart.groupBy(_._1).mapValues(_.map(_._2))
              val columnsInThisPart = targetsInThisPart.map(_._1).distinct          // 存储各个field_index, 在分区遍历了多少条数据
              val runningTotals : mutable.HashMap[Int, Long]= new mutable.HashMap()
              runningTotals ++= columnsInThisPart.map(columnIndex => (columnIndex, 0L)).toMap          // 遍历当前分区的数据源, 格式为(field_value, field_index), 过滤出满足排位数据要求的数据
              valueColumnPairs.filter{            case(value, colIndex) =>
                  lazy val thisPairIsTheRankStatistic: Boolean = {                // 每遍历一条数据, runningTotals上对应的field_index 当前已遍历数据量+1
                    val total = runningTotals(colIndex) + 1L
                    runningTotals.update(colIndex, total)
                    columnsRelativeIndex(colIndex).contains(total)
                  }
                  (runningTotals contains colIndex) && thisPairIsTheRankStatistic
              }.map(_.swap)
            } else {
              Iterator.empty
            }
          })
      }


    分析:

    (1)这种方法代码可读性较差

    (2)需要遍历两遍原始数据

    (3)相比于方案三, 更加有效避免executor内oom

    (4)当field_value分布较离散的情况下, 这种方案相比于前三种, 效率更高

    (5)上述算法中, 有两个潜在的问题, 当field_value倾斜情况下(即某个范围的值特别多),算法效率严重依赖于算法描述中的步骤(2)是否能将所有的field_value均匀的分配到各个partition;另一个问题是,当某些field_value重复现象比较多时, 是否可以合并对这些field_value的计数,而不是在一个partition中的iterator中挨个遍历这些重复数据。

    备注:上述内容(问题背景、解决算法)取自《High Performance Spark Best Practices for Scaling and Optimizing Apache Spark》(作者: Holden Karau and Rachel Warren)


    免费体验云安全(易盾)内容安全、验证码等服务

    更多网易技术、产品、运营经验分享请点击


    相关文章:
    【推荐】 [翻译]pytest测试框架(一)
    【推荐】 浅谈js拖拽
    【推荐】 HBase最佳实践-集群规划

  • 相关阅读:
    多边形游戏
    大整数乘法
    矩阵连乘
    最长公共子序列
    动态规划
    快速排序
    二分搜索技术
    动态规划基本要素
    合并排序
    最大子段和
  • 原文地址:https://www.cnblogs.com/163yun/p/9881058.html
Copyright © 2011-2022 走看看