Lightweight OLAP (1): Cube Computation

    We have a multi-dimensional data analysis task at hand:

    • weekly UV of the logs;
    • collection volume and labeled volume of APPs, TOP 20 APPs (weekly UV), TOP 20 APP label categories (weekly UV);
    • collection volume and labeled volume of phone models, TOP 20 models (weekly UV), TOP 20 phone vendors (weekly UV);

    The initial solution: have Spark read the data logs, then run map, distinct, and reduceByKey for each analysis requirement in turn to get the results. This approach has one glaring drawback: it scans the data source over and over, once per requirement.
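
    For illustration, a minimal sketch of that naive approach (the logRdd input and its field names are assumptions borrowed from the flatAppDvc snippet below, not the original job):

    // Naive plan: every metric triggers its own scan of the source.
    // logRdd: RDD[(String, CaseClasses.LogFact)] — (uid, parsed log record).
    val weeklyUv = logRdd.map(_._1).distinct().count()               // scan 1
    val appUv = logRdd.map(e => (e._2.appName, e._1)).distinct()     // scan 2
      .mapValues(_ => 1).reduceByKey(_ + _)
    val modelUv = logRdd.map(e => (e._2.dvcModel, e._1)).distinct()  // scan 3
      .mapValues(_ => 1).reduceByKey(_ + _)
    // ... and one more pass for every remaining metric, unless the RDD is cached.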

    1. Pig

    Pig provides the cube keyword for OLAP and divides dimensions into two classes:

    • normal, corresponding to the cube operation: n such dimensions yield 2^n combinations
    • hierarchical ordering, corresponding to the rollup operation: n such dimensions yield n+1 combinations

    For example, CUBE(product, year) yields the 2^2 = 4 groupings (product, year), (product), (year), (), while ROLLUP(region, state, city) yields the 3 + 1 = 4 groupings (region, state, city), (region, state), (region), ().

    An example from the official docs:

    salesinp = LOAD '/pig/data/salesdata' USING PigStorage(',') AS
        (product:chararray, year:int, region:chararray, state:chararray, city:chararray, sales:long);
    cubedinp = CUBE salesinp BY CUBE(product,year);
    result = FOREACH cubedinp GENERATE FLATTEN(group), SUM(cube.sales) AS totalsales;
    
    salesinp = LOAD '/pig/data/salesdata' USING PigStorage(',') AS
        (product:chararray, year:int, region:chararray, state:chararray, city:chararray, sales:long);
    rolledup = CUBE salesinp BY ROLLUP(region,state,city);
    result = FOREACH rolledup GENERATE FLATTEN(group), SUM(cube.sales) AS totalsales;
    

    In the examples, the CUBE operation expands each record into one row per dimension combination and groups by those dimensions; together with the following FOREACH statement, it produces output in the Dimensions + Measure format.
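
    As an aside, Spark's DataFrame API offers equivalent cube and rollup operators; a minimal sketch, assuming a DataFrame named sales with the same columns as the Pig example:

    import org.apache.spark.sql.functions.sum

    // CUBE over (product, year): all 2^2 dimension combinations
    val cubed = sales.cube("product", "year").agg(sum("sales").as("totalsales"))
    // ROLLUP over (region, state, city): the 3 + 1 hierarchical prefixes
    val rolledUp = sales.rollup("region", "state", "city").agg(sum("sales").as("totalsales"))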

    2. Spark

    Naive multi-dimensional analysis

    The Pig OLAP scheme above suggests an approach to the requirements from the start of this post. We can likewise expand every record according to the Dimensions + Measure rule:

    /**
     * @param e (uid, LogFact)
     * @return Array[((dimension order No, dimension), measure)]
     */
    def flatAppDvc(e: (String, CaseClasses.LogFact)): Array[((String, String), String)] = {
      // "0x": log source dimension; measure = uid (weekly UV)
      val source = (("00", e._2.source), e._1)
      // "1x": APP dimensions; measure = uid (UV) or appName (collection / label volume)
      val appName = (("11", e._2.appName), e._1)
      val appTag = (("12", e._2.appTag), e._1)
      val appAll = (("13", "a"), e._1)
      val appCollect = (("14", "a"), e._2.appName)
      val appLabel = e._2.appTag match {
        case "EMPTY" => (("15", "a"), "useless") // unlabeled: dropped by the filter below
        case _ => (("15", "a"), e._2.appName)
      }
      // "2x": device dimensions; measure = uid (UV) or dvcModel (collection / label volume)
      val dvcModel = (("21", e._2.dvcModelLabel), e._1)
      val vendor = (("22", e._2.vendor), e._1)
      val (osAll, osCollect) = ((("23", e._2.osType), e._1), (("24", e._2.osType), e._2.dvcModel))
      val osLabel = e._2.dvcModelLabel match {
        case "EMPTY" => (("25", e._2.osType), "useless")
        case _ => (("25", e._2.osType), e._2.dvcModel)
      }
    
      Array(source, appName, appTag, appAll, appCollect, appLabel, dvcModel, vendor,
        osAll, osCollect, osLabel).filter(_._2 != "useless")
    }
    

    To tell the different dimension combinations apart, the code takes a rather crude approach of numbering each combination. Spark's flatMap API expands one row into many rows, which fits the dimension expansion perfectly; a group-by-key plus distinct count then yields the result:

    val flatRdd = logRdd.flatMap(flatAppDvc)
    // dedupe (dimension key, measure) pairs, then count measures per key
    val result = flatRdd.distinct()
      .mapValues(_ => 1)
      .reduceByKey(_ + _)
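
    The TOP 20 lists in the original requirements can then be read straight off result; a small hedged sketch ("11" is the APP-name dimension number assigned in flatAppDvc above):

    // Top 20 APPs by weekly UV: keep the app-name dimension and take the
    // 20 entries with the largest distinct counts.
    val top20Apps = result
      .filter(_._1._1 == "11")
      .top(20)(Ordering.by(_._2))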
    

    Multiple measures

    The earlier requirements were fairly simple: every measure is a distinct count, so there was no need to align Dimensions + Measure. For a more complex set of requirements, however:

    • (overall) the collection volume, labeled volume, and PV of ad materials;
    • per 2nd-level label category (of ad materials): ad-material count, UV, and PV;
    • per 1st-level label category (of ad materials): ad-material count, UV, and PV;

    the measures include both distinct count (UV) and plain count (PV), so Dimensions + Measure must be kept aligned; the dimension flatMap looks like this:

    /**
     * @param e ((adid, 2nd ad-category, 1st ad-category), uid)
     * @return Array[((dimension order No, dimension), measure: (adid, uid or adid, 1))]
     */
    def flatAd(e: ((String, String, String), String)) = {
      // "0": overall; unlabeled materials ("EMPTY") get the placeholder id "non" and PV 0
      val all = e._1._2 match {
        case "EMPTY" => (("0", "all"), (e._1._1, "non", 0))
        case _ => (("0", "all"), (e._1._1, e._1._1, 1))
      }
      // "1" / "2": 2nd- and 1st-level label categories; measure = (adid, uid, 1)
      val adCate = (("1", e._1._2), (e._1._1, e._2, 1))
      val adParent = (("2", e._1._3), (e._1._1, e._2, 1))
    
      Array(all, adCate, adParent)
    }
    

    Afterwards, compute each dimension's measures (the distinct counts use stream-lib's implementation of the HyperLogLogPlus algorithm):

    import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus
    import org.apache.spark.rdd.RDD
    
    val createHLL = (v: String) => {
      val hll = new HyperLogLogPlus(14, 0) // p = 14, relative standard error ≈ 1.04 / sqrt(2^14) ≈ 0.008
      hll.offer(v)
      hll
    }
    
    def computeAdDimension(rdd: RDD[((String, String), (String, String, Int))]) = {
      rdd.combineByKey[(HyperLogLogPlus, HyperLogLogPlus, Int)](
        // createCombiner: one HLL per distinct-count measure, plus the PV counter
        (v: (String, String, Int)) => (createHLL(v._1), createHLL(v._2), v._3),
        // mergeValue: fold one more record into the per-partition combiner
        (m: (HyperLogLogPlus, HyperLogLogPlus, Int), v: (String, String, Int)) => {
          m._1.offer(v._1)
          m._2.offer(v._2)
          val pv = m._3 + v._3
          (m._1, m._2, pv)
        },
        // mergeCombiners: merge combiners across partitions
        (m1: (HyperLogLogPlus, HyperLogLogPlus, Int),
         m2: (HyperLogLogPlus, HyperLogLogPlus, Int)) => {
          m1._1.addAll(m2._1)
          m1._2.addAll(m2._2)
          val pv = m1._3 + m2._3
          (m1._1, m1._2, pv)
        }
      )
        .mapValues(t => (t._1.cardinality().toInt, t._2.cardinality().toInt, t._3))
    }
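
    Wiring the two pieces together, a minimal usage sketch (adRdd is an assumed RDD[((String, String, String), String)] of ((adid, 2nd category, 1st category), uid) records):

    // One pass over the source: expand records into dimension rows,
    // then aggregate all three measures per dimension key.
    val adResult = computeAdDimension(adRdd.flatMap(flatAd))
    // adResult: RDD[((dimension No, dimension), (distinct adids, distinct uids, PV))]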
    

    Truth be told, this post's title is a bit of clickbait: it only borrows the shell of OLAP to do multi-dimensional data analysis, and is still quite far from real OLAP...
