zoukankan      html  css  js  c++  java
  • 大数据算法:排位问题(2)

    此文已由作者叶林宝授权网易云社区发布。

    欢迎访问网易云社区,了解更多网易技术产品运营经验。


    方案四:Sort on Cell Values

    简述:

    上述方案三, 当数据行数较多, 情况下, 在二次排序还是可能出现oom情况, 而且, 不同的field_index的数据可能shuffle到同一个分区,这样就加大了oom的概率。当field_index本身取值较多 情况下, 增加分区数是其中一种解决方法。但是field_index取值本身就少于分区数的情况下, 增加分区数对缓解oom就没任何作用了。 如果 当field_value相比field_index较为分散, 且值较多的情况下, 不妨换个思维, 按field_value分区。 具体算法如下:

    算法:

    (1)将df 转换为(field_value, field_index)

    (2)对分区内的数据, 用sortByKey根据 field_value排序 (rangPartition排序)

    (3)利用mapPartitions确定每个分区内的每个field_index共有多少数据(不同分区中的filed_value相对有序, 例如partiiton1 中的filed_value比partition2中的field_value小)

    (4)利用第(3)步数据, 确定每个field_index中所需要的排名的数据在哪个分区以及分区内第几条数据。例如要输出field_index_6的13th位数据,假设第一个分区已经包含10条数据, 则目标数据在第二个分区的第3条数据

    (5)转换(4)计算结果为标准输出格式

    代码:

    (1)

    /**
        * 将数据源df转换为(field_value, field_index)格式的rdd
        * @param dataFrame
        * @return
        */
      def getValueColumnPairs(dataFrame : DataFrame): RDD[(Double, Int)] =
      {
        dataFrame.rdd.flatMap{
          row: Row => row.toSeq.zipWithIndex
            .map{case (v, index) => (v.toString.toDouble, index)}
        }
      }

    (3)

    /**
        * 对按照field_value排序后的sortedValueColumnPairs, 计算出每个分区上, 每个field_index分别有多少数据
        * @param sortedValueColumnPairs
        * @param numOfColumns
        * @return
        */
      def getColumnsFreqPerPartition(sortedValueColumnPairs: RDD[(Double, Int)],numOfColumns : Int): Array[(Int, Array[Long])] = {
        val zero = Array.fill[Long](numOfColumns)(0)    def aggregateColumnFrequencies (partitionIndex : Int, valueColumnPairs : Iterator[(Double, Int)]) = {
          val columnsFreq : Array[Long] = valueColumnPairs.aggregate(zero)(
            (a : Array[Long], v : (Double, Int)) => {
              val (value, colIndex) = v          //increment the cell in the zero array corresponding to this column
              a(colIndex) = a(colIndex) + 1L
              a
            },
            (a : Array[Long], b : Array[Long]) => {
              a.zip(b).map{ case(aVal, bVal) => aVal + bVal}
            })
          Iterator((partitionIndex, columnsFreq))
        }
        sortedValueColumnPairs.mapPartitionsWithIndex(aggregateColumnFrequencies).collect()
      }

    举例说明:

    假设对(1)中转换后的数据, 按照field_value排序后, 各个分区的数据如下所示

    Partition 1: (1.5, 0) (1.75, 1) (2.0, 2) (5.25, 0)

    Partition 2: (7.5, 1) (9.5, 2)

    则(2)的输出结果为:

    [(0, [2, 1, 1]), (1, [0, 1, 1])]

    (4)

    /**
        * 计算每个field_index所需排位数据在第几个分区的第几条数据
        * @param targetRanks 排位数组
        * @param partitionColumnsFreq 每个分区的每个field_index包含多少数据
        * @param numOfColumns field个数
        * @return
        */
      def getRanksLocationsWithinEachPart(targetRanks : List[Long],
                                          partitionColumnsFreq : Array[(Int, Array[Long])],
                                          numOfColumns : Int) : Array[(Int, List[(Int, Long)])] = {    // 二维数组, 存储当前每个field_index, 遍历到到第几条数据
        val runningTotal = Array.fill[Long](numOfColumns)(0)    // The partition indices are not necessarily in sorted order, so we need
        // to sort the partitionsColumnsFreq array by the partition index (the
        // first value in the tuple).
        partitionColumnsFreq.sortBy(_._1).map {      // relevantIndexList 存储分区上, 满足排位数组的field_index在该分区的第几条数据
          case (partitionIndex, columnsFreq) => val relevantIndexList = new mutable.MutableList[(Int, Long)]()
            columnsFreq.zipWithIndex.foreach{ case (colCount, colIndex) =>          // 当天field_index(即colIndex), 遍历到第几条数据
              val runningTotalCol = runningTotal(colIndex)          //  当前field_index(即colIndex),排位数组中哪些排位位于当前分区
              val ranksHere: List[Long] = targetRanks.filter(rank =>
                runningTotalCol < rank && runningTotalCol + colCount >= rank)          // 计算出当前分区,当前field_index(即colIndex), 满足排位数组的field_value在当前分区的位置
              relevantIndexList ++= ranksHere.map(rank => (colIndex, rank - runningTotalCol))
              runningTotal(colIndex) += colCount
            }
            (partitionIndex, relevantIndexList.toList)
        }
      }

    举个例子:

    假如目标排位:targetRanks: [5]

    各分区各feild_index数据量:partitionColumnsFreq: [(0, [2, 3]), (1, [4, 1]), (2, [5, 2])]

    字段个数:numOfColumns: 2

    输出结果: [(0, []), (1, [(0, 3)]), (2, [(1, 1)])]

    (5)

    /**
        * 过滤出每个field_index 所需排位的数值
        * @param sortedValueColumnPairs
        * @param ranksLocations (4)中计算出的满足排位数组要求的每个分区上,每个field_index在该分区的第几条数据
        * @return
        */
      def findTargetRanksIteratively( sortedValueColumnPairs : RDD[(Double, Int)], ranksLocations : Array[(Int, List[(Int, Long)])]):
      RDD[(Int, Double)] = {
        sortedValueColumnPairs.mapPartitionsWithIndex(
          (partitionIndex : Int, valueColumnPairs : Iterator[(Double, Int)]) => {        // 当前分区上, 满足排位数组的feild_index及其在该分区上的位置
            val targetsInThisPart: List[(Int, Long)] = ranksLocations(partitionIndex)._2        if (targetsInThisPart.nonEmpty) {          // map中的key为field_index, value为该feild_index在当前分区中的哪些位置上的数据满足排位数组要求
              val columnsRelativeIndex: Map[Int, List[Long]] = targetsInThisPart.groupBy(_._1).mapValues(_.map(_._2))
              val columnsInThisPart = targetsInThisPart.map(_._1).distinct          // 存储各个field_index, 在分区遍历了多少条数据
              val runningTotals : mutable.HashMap[Int, Long]= new mutable.HashMap()
              runningTotals ++= columnsInThisPart.map(columnIndex => (columnIndex, 0L)).toMap          // 遍历当前分区的数据源, 格式为(field_value, field_index), 过滤出满足排位数据要求的数据
              valueColumnPairs.filter{            case(value, colIndex) =>
                  lazy val thisPairIsTheRankStatistic: Boolean = {                // 每遍历一条数据, runningTotals上对应的field_index 当前已遍历数据量+1
                    val total = runningTotals(colIndex) + 1L
                    runningTotals.update(colIndex, total)
                    columnsRelativeIndex(colIndex).contains(total)
                  }
                  (runningTotals contains colIndex) && thisPairIsTheRankStatistic
              }.map(_.swap)
            } else {
              Iterator.empty
            }
          })
      }


    分析:

    (1)这种方法代码可读性较差

    (2)需要遍历两遍原始数据

    (3)相比于方案三, 更加有效避免executor内oom

    (4)当field_value分布较离散的情况下, 这种方案相比于前三种, 效率更高

    (5)上述算法中, 有两个潜在的问题, 当field_value倾斜情况下(即某个范围的值特别多),算法效率严重依赖于算法描述中的步骤(2)是否能将所有的field_value均匀的分配到各个partition;另一个问题是,当某些field_value重复现象比较多时, 是否可以合并对这些field_value的计数,而不是在一个partition中的iterator中挨个遍历这些重复数据。

    备注:上述内容(问题背景、解决算法)取自《High Performance Spark Best Practices for Scaling and Optimizing Apache Spark》(作者: Holden Karau and Rachel Warren)


    免费体验云安全(易盾)内容安全、验证码等服务

    更多网易技术、产品、运营经验分享请点击


    相关文章:
    【推荐】 从互联网+角度看云计算的现状与未来

  • 相关阅读:
    自定义组件要加@click方法
    绑定样式
    647. Palindromic Substrings
    215. Kth Largest Element in an Array
    448. Find All Numbers Disappeared in an Array
    287. Find the Duplicate Number
    283. Move Zeroes
    234. Palindrome Linked List
    202. Happy Number
    217. Contains Duplicate
  • 原文地址:https://www.cnblogs.com/zyfd/p/9881060.html
Copyright © 2011-2022 走看看