zoukankan      html  css  js  c++  java
  • 大数据算法:排位问题

    此文已由作者叶林宝授权网易云社区发布。

    欢迎访问网易云社区,了解更多网易技术产品运营经验。


    问题描述

    (1)如上图是一张表, 第一行是列名,除第一列是string类型外, 其它列都是double类型。第二行开始,每行代表一条记录

    (2)需求:输入一个数组例如[2, 4](后文中, 简称排位数组), 要求输出每列(除第一列)按升序取出第2、第4位的值。 结果如下图所示:

    解决算法

    一、迭代计算

    简述:

    简单来说, 就是迭代每一列, 然后迭代每一行, 然后找出满足相应排名的数据

    代码:

    /**
        * 筛选出每个列满足给定的排位数组中要求的位置上的取值, 简单粗暴方案
        *
        * @param dataFrame 数据源
        * @param ranks 排位数组
        * @return
        */
      def findRankStatistics(dataFrame: DataFrame,  ranks: List[Long]): Map[Int, Iterable[Double]] = {
        require(ranks.forall(_ > 0))    // 获取字段个数
        val numberOfColumns = dataFrame.schema.length
        var i = 0
        // 保存结果, key为列索引, value为该列满足排序数组的值构成的列表
        var result = Map[Int, Iterable[Double]]()    while (i < numberOfColumns) {      // 每轮迭代, 只filter出该列数据
          val col = dataFrame.rdd.map(row => row.getDouble(i))
          val sortedCol: RDD[(Double, Long)] = col.sortBy(v => v).zipWithIndex()      // 只过滤出满足排位的数值
          val ranksOnly = sortedCol.filter { case (colValue, index) => ranks.contains(index + 1)}.keys
          val list = ranksOnly.collect()
          result += (i -> list)
          i += 1
        }
        result
      }

    分析:

    缺点:效率低,  每列排位的计算是串行的。

    二、 groupByKey

    简述:

    解决方案一, 循环迭代每一列带来的效率问题, 列与列之间的计算本质上互不影响的, 所以, 方案二的改进方法是, 列索引作为key , 每行的值作为value,依据key的hash值shuffle到不同的partition, 并行计算每个partition

    代码:

    /**
        * 筛选出每个列满足给定的排位数组中要求的位置上的数值, groupByKey方案
        *
        * @param dataFrame 数据源
        * @param ranks  排位数组
        * @return
        */
      def findRankStatistics(dataFrame: DataFrame, ranks: List[Long]): Map[Int, Iterable[Double]] = {
        require(ranks.forall(_ > 0))    // 将源数据 依据列索引(后文简称field_index)和每行对应的数值(后文简称field_value)转换为pairRDD
        val pairRDD: RDD[(Int, Double)] = mapToKeyValuePairs(dataFrame)    // 依据field_index 聚合相应filed_value
        val groupColumns: RDD[(Int, Iterable[Double])] = pairRDD.groupByKey()    
        // 对每个field_index, 计算其相应的field_values, 过滤出满足排位要求的数值
        groupColumns.mapValues(
          iter => {        // 排序field_values
            val sortedIter = iter.toArray.sorted
            sortedIter.toIterable.zipWithIndex.flatMap({          case (colValue, index) =>            if (ranks.contains(index + 1)) {
                  Iterator(colValue)
                } else {
                  Iterator.empty
                }
            })
          }).collectAsMap()
      }

    分析:

    这种方案只使用数数据规模较小(指的是行数较少)的数据, 对于大规模数据, groupByKey操作, 容易oom。

    groupByKey执行效果如下图所示:

    groupByKey会在内存中暂存所有的<key, values>, 所以, 对于同一个key, value较多的情况, 容易引起executor端oom

    三、二次排序

    简述:

    方案二, 除了容易引起executor端oom问题, 还有另外一个问题, 排序操作时在shuffle后, 在executor进行的。spark sort based shuffle 支持在shuffle阶段,直接对key进行排序。因此,可通过二次排序提高效率。

    算法思想:

    (1)将df的每行数据展开为(field_index, field_value)格式, 再转换为((field_index, field_value), 1)

    (2)自定义分区器, 根据((field_index, field_value), 1)中的field_index分区

    (3)调用repartitionAndSortWithinPartitions函数, 依据field_index分区, 依据(field_index, field_value)排序

    (4)过滤出各列满足配位需求的值

    (5)转换为所需输出格式

    代码:

    /**
        * 筛选出每个列满足给定的排位数组中要求的位置上的数值, 二次排序方案
        * @param dataFrame 数据源
        * @param targetRanks 排位数组
        * @param partitions 分区数
        * @return
        */
      def findRankStatistics(dataFrame: DataFrame, targetRanks: List[Long], partitions: Int) = {    // 将df的每行数据展开为(field_index, field_value)格式, 再转换为((field_index, field_value), 1)
        // 即最终为pariRdd
        // 其中“1”只是pairRdd中value的一个占位符, 对最终计算结果不产生影响
        val pairRDD: RDD[((Int, Double), Int)] = mapToKeyValuePairs(dataFrame).map((_, 1))    //自定义分区器, 根据((field_index, field_value), 1)中的field_index分区
        val partitioner = new ColumnIndexPartition(partitions)    //根据((field_index, field_value), 1)中的(field_index, field_value)排序
        val sorted = pairRDD.repartitionAndSortWithinPartitions(partitioner)    //过滤出所需数据
        val filterForTargetIndex: RDD[(Int, Double)] =
          sorted.mapPartitions(iter => {
            var currentColumnIndex = -1
            var runningTotal = 0
             // 过滤出各列满足配位需求的值
            iter.filter({          case (((colIndex, value), _)) =>            // 同一个分区中的数据,可能包含多个field_index即多列数据, 当遍历到新的field_index时, 需要重置计数器runningTotal
                if (colIndex != currentColumnIndex) {
                  currentColumnIndex = colIndex
                  runningTotal = 1
                } else {
                  runningTotal += 1
                }            //保留满足排位的数值
                targetRanks.contains(runningTotal)
            })
          }.map(_._1), preservesPartitioning = true)    // 转换为所需输出格式
        groupSorted(filterForTargetIndex.collect())
      }  // 隐式转换, 对于二元数组, 先按第一排序, 再按第二个排序
      implicit val ordering: Ordering[(K, S)] = Ordering.Tuple2  /**
        * 将df的每行数据展开为(field_index, field_value)格式, 再转换为((field_index, field_value), 1)
        * @param dataFrame
        * @return
        */
      def mapToKeyValuePairs(dataFrame: DataFrame): RDD[(Int, Double)] = {    // 获取字段个数
        val rowLength = dataFrame.schema.length
        dataFrame.rdd.flatMap(
          row => Range(0, rowLength).map(i => (i, row.getDouble(i)))
        )
      }  /**
        * 自定义分区器, 根据((field_index, field_value), 1)中的field_index分区
        * @param numPartitions 分区个数
        */
      class ColumnIndexPartition(override val numPartitions: Int) extends Partitioner {
        require(numPartitions >= 0, s"Number of partitions " + s"($numPartitions) cannot be negative.")    override def getPartition(key: Any): Int = {
          val k = key.asInstanceOf[(Int, Double)]
          Math.abs(k._1) % numPartitions //hashcode of column index
        }
      }  /**
        * 转换为所需输出格式
        * 将it聚合为map, 其中key为field_index, value为同一个field_index下的field_value组成的数组
        * @param it 数组中的元素为(field_index, field_value)
        * @return
        */
      private def groupSorted(it: Array[(Int, Double)]): Map[Int, Iterable[Double]] = {
        val res = List[(Int, ArrayBuffer[Double])]()
        it.foldLeft(res)((list, next) => list match {      case Nil =>
            val (firstKey, value) = next        List((firstKey, ArrayBuffer(value)))      case head :: rest =>
            val (curKey, valueBuf) = head        val (firstKey, value) = next        if (!firstKey.equals(curKey)) {
              (firstKey, ArrayBuffer(value)) :: list
            } else {
              valueBuf.append(value)
              list
            }
        }).map { case (key, buf) => (key, buf.toIterable) }.toMap
      }


    分析:

    相比于方案二, 这种方案将数据排序下推到shuffle阶段, 然后对每个partitions的数据,迭代每条记录过滤出所需数据, 避免了executor将所有数据加载到内存中。但是, 在shuffle阶段, 也可能出现由于数据量过大(数据本身行数较多, 且不同的key都打到同一个executor), 特别是重复数据特别多的情况下, 导致在二次排序过程中oom。



    免费体验云安全(易盾)内容安全、验证码等服务

    更多网易技术、产品、运营经验分享请点击


    相关文章:
    【推荐】 django项目在uwsgi+nginx上部署遇到的坑
    【推荐】 使用QUIC

  • 相关阅读:
    一些业内有名的网站收集
    WCF重载
    FCKEditor fckconfig.js配置,添加字体和大小 附:中文字体乱码问题解决
    查询第几条到第几条的数据的SQL语句
    SPOJ 9939 Eliminate the Conflict
    UVA 10534 Wavio Sequence
    HDU 3474 Necklace
    POJ 2823 Sliding Window
    UVA 437 The Tower of Babylon
    UVA 825 Walking on the Safe Side
  • 原文地址:https://www.cnblogs.com/163yun/p/9881016.html
Copyright © 2011-2022 走看看