zoukankan      html  css  js  c++  java
  • 基于 Algebird 谈一谈代数数据类型在数据聚合中的应用

    此文已由作者肖乃同授权网易云社区发布。

    欢迎访问网易云社区,了解更多网易技术产品运营经验。




    代数数据类型是指满足一定数学特性的数据类型, 这些特性使得计算能够很方便的并行化,在Scalding和  Spark等数据计算框架中有着广泛的应用。代数数据类型是一个通用的概念, 其实现不限于Algebird,  本文主要结合近期处理的一个数据任务, 介绍一下这一技术及Algebird这个函数库。   文中代码示例都是基于Scala, 如有纰漏欢迎指正。

    应用场景:云阅读用户流失模型特征体取

    近期接到这样一个任务, 提取一个特定时间窗口登陆用户的95个特征,用于训练预测用户流失的模型。抽取  任何一个单独特征并不复杂, 不过特征众多,数据分布在多个数据源。我计划延续前期的代码, 使用scalding  处理这一任务。 这些特征大致分为四类:

    1. 功能使用的次数

    2. 功能使用的天数

    3. 用户某一属性最新的非空值

    4. 用户某一属性的集合数量

    使用次数就是常规的数值累加,2和4都要考虑集合的去重,3是时间上的maxBy同时要考虑空值处理问题。 为了方便统一处理,我考虑将这些数据都转化成可加的代数数据类型, 然后基于这些类型做聚合。

    Scalding 及 Spark 类型安全聚合接口介绍

    先来看一下Scalding提供的聚合接口,直接使用Algebird提供的聚合器:

      import com.twitter.algebird.Aggregator.count
      val users: TypedPipe[User] = ???
      users.groupBy(_.userId).aggregate(count)

    Spark在2.0后增加了DataSet这一新的API, 简单讲就是类型安全的DataFrame (基本等同于与scalding type safe api 之于 field based api)。

     import org.apache.spark.sql.expressions.scalalang.count
     spark.createDataset(Seq(1,2,3))
          .groupByKey(_)
          .agg(count)

    Spark 没有直接使用 algebird, 其参考algebird代码(看了一下spark的代码注释),写了一套类似接口。 后面会讲到如何在Spark使用Algebird。

    Aggregator提供了可复用的聚合组件,不再限于特定的字段。Algebird的Aggregator是基于半群和幺半群的代数数据类型聚合。

    代数数据类型理论

    先来补习一下数学知识,群是一个二元操作下满足一定特征的集合:

    1. 闭包性: 集合中的任意两个元素A和B, A op B 结果依然是集合的元素

    2. 结合律: 集合中的任意两个元素A、B和C, (A op B) op C 等价于 A op (B op C)

    3. 幺元(也可以叫零元 Unit):集合存在元素e, 使得任意的元素A有 e op A 等价于 A op e 等价于 A

    4. 逆元: 任意元素A, 存在集合元素B, 使得 A op B = e  (e 为幺元)

    满足全部条件的是群, 满足1和2的是半群, 满足1、2和3的是幺半群(有幺元存在)

    举例几个具体的例子说明一下:

    • 自然数在加法下是一个群,满足闭包和结合律,0是幺元,负数是逆元

    • 偶数在加法下是一个群, 奇数不是, 不满足闭包性,奇数相加为偶数

    • 奇数在乘法下是一个幺半群, 不存在逆元

    • 正整数在加法下是一个半群, 不存在幺元,0不属于正整数

    Algebird 实现介绍

    Algebird 是twitter开源的scala的抽象代数库,实现了常见数据类型的半群、幺半群等支持,  是从scalding分离出来的通用库。

    通过例子比较好理解:

     import com.twitter.algebird._ import com.twitter.algebird.Operators._ Max(3) + Max(5) + Max(10)  // result: Max(10)
     Map(1 -> 2) + Map(1 -> 3) // result: Map(1 -> 5) 
     Map(1 -> Max(3), 2 -> Max(7)) + Map(1 -> Max(-10), 2 -> Max(20)) 
     // result: Map(1 -> Max(3), 2 -> Max(20))

    Max是个Semigroup(半群), Map是个Monoid(幺半群),  Algebird有很大的灵活性,从上面示例可以看到Map的值是半群,可以实现相同key的值的聚合。

    Algebird除了基本Semigroup和Monoid, Map、IndexedSeq、Tuple等高阶的群 (参数类型是群的群,我这样称谓),可以组合出非常灵活的使用。

    用户流失模型中的应用

    回到我要处理的问题上来, 需要按照用户去计算4类不同的特征值, 这些值很稀疏, 可以把上述问题转化成聚合问题。

    以搜索这个事件来说明, 假设要统计用户搜索的次数、天数、关键词数量,那么

      Map("搜索次数" -> 1)
      Map("搜索天数" -> date)
      Map("搜索关键词数量" -> keyword)

    Map是幺半群,需要值类型是半群, date和keyword需要转换成半群的数据结构。 关键词数量需要去重, 可以使用Set来做, 使用Set求集合, 最后取集合数量, 聚合器如下:

      import com.twitter.algebird.Aggregator.{const, toSet, prepareMonoid => sumAfter}
    
      val searchCountAgg = sumAfter[MdaEvent, Map[String, Int]](_.searchCount)
    
      val keywordCountAgg = toSet[String]
       .composePrepare[ClientEvent](_.keyword)
       .andThenPresent(_.size)

    搜索天数统计,日期也可以使用上述集合,不过天数的统计非常多, 集合开销比较大, 我把它转成一个bitset, , 我统计的窗口只有1个月, 所以用Long型记下相对于开始日期, 这一天是不是有使用:

      import com.twitter.algebird.Monoid
      import com.github.nscala_time.time.Imports._
      import org.joda.time.Days
    
      class Bits(val value: Long) extends AnyVal {
        def count: Int = java.lang.Long.bitCount(value)
        def get(b: Int): Int = if((value & (1 << b)) > 0) 1 else 0
        override def toString: String = value.toBinaryString
      }
    
      object BitsMonoid extends Monoid[Bits] {
        override def zero = new Bits(0L)
        override def plus(left: Bits, right: Bits) = new Bits(left.value | right.value)
        override def sumOption(iter: TraversableOnce[Bits]): Option[Bits] = {
          if(iter.isEmpty) None
          Some(iter.reduce((a, b) => new Bits(a.value | b.value)))
        }
      }
    
      def dateDiffToBits(fromDate: DateTime): Long => Bits = {
        val base = fromDate.withTimeAtStartOfDay()
        (timestamp: Long) => {
          val theDay = new DateTime(timestamp).withTimeAtStartOfDay()
          val days = Days.daysBetween(base, theDay).getDays
          require(days < 64, s"only 64 bits long is supported, got day diff: $days")
          new Bits(1 << days)
        }
      }
    
      val toBitsFun = dateDiffToBits(sampleStartDate)
    
      val searchDaysAgg = {
          implicit val m = BitsMonoid
          sumAfter[MdaEvent, Map[String, Bits]] { event =>
            searchTime(event).mapValues(toBitsFun)
          }.andThenPresent(_.mapValues(b => b.count))
        }

    最后来处理第三类特征非空最新属性,这个属性是取按时间的最大值,空值需要特别处理一下, 使用Max, 把排序函数修改一下:

       import com.twitter.algebird.Aggregator.max
      def latestStringProperty[U <: ClientEvent](fn: U => String): Aggregator[U, U, String] = {
          import com.twitter.algebird.Aggregator.max
          implicit val ordU = Ordering.by { u: U =>
            val p = fn(u)
            val isEmpty = if (p.isEmpty) 0 else 1
            (isEmpty, u.opTime) // empty property always be covered by value property
          }
          max[U].andThenPresent(e => fn(e))
      }

    最后就能够使用这些聚合器, 提取所需的特征值了

      val multiOps = MultiAggregator(
        searchCountAgg,
        keywordCountAgg,
        searchDaysAgg,
        latestStringProperty(_.productVersion)
     )
      val daReport = daEvents.groupBy(_.userId).aggregate(multiOps)

    如何在Spark中的使用

    最后来讲讲如果在Spark中使用Algebird聚合器, 这个特征提取本来应该在Spark处理更为方便, 来研究了一下Spark的聚合器。

    Spark没有直接使用Algebird, 但其聚合器基本参照Algebird的, 我写了一个适配的类来方便直接在 Spark中使用上述的聚合器:

        import com.twitter.algebird.MonoidAggregator
      import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
      import org.apache.spark.sql.expressions.Aggregator
      import org.apache.spark.sql.{Encoder, SparkSession, TypedColumn}
    
      implicit class MonoidToTypedColumn[-A,B: Encoder,C: Encoder](val m: MonoidAggregator[A,B,C]) {
        def toColumn: TypedColumn[A,C] = new MonoidAggregatorAdaptor(m).toColumn
      }
    
      class MonoidAggregatorAdaptor[-A,B: Encoder,C: Encoder](val m: MonoidAggregator[A,B,C]) extends  Aggregator[A,B,C] {
          override def zero = m.monoid.zero
          override def reduce(b: B, a: A) = m.reduce(b, m.prepare(a))
          override def finish(reduction: B) =  m.present(reduction)
          override def merge(b1: B, b2: B) = m.reduce(b1, b2)
    
          override def bufferEncoder = implicitly[Encoder[B]]
          override def outputEncoder = implicitly[Encoder[C]]
     }

    这里只贴了适配Monoid的聚合器, Semigroup的会稍麻烦,代码比较多, 基本参考org.apache.spark.sql.expressions.ReduceAggregator。

    最后我们就可以直接再Spark使用Algebird:

      val latest = maxBy[DeviceEvent, Long](_.timestamp).toColumn.name("latest")
      val count = size.toColumn.name("count")
    
      spark.createDataset(Seq(DeviceEvent("a", "iphone", 10L), DeviceEvent("a", "android", 100L), DeviceEvent("a", "iphone", 123L)))
          .groupByKey(_.id)
          .agg(count, latest)
          .collect

    总结

    使用代数数据类型, 我们数据计算的代码更接近于问题描述语言, 表达力更强,避免了命令式的操作,bug更少。


    网易云免费体验馆,0成本体验20+款云产品! 

    更多网易技术、产品、运营经验分享请点击




    相关文章:
    【推荐】 一文了解安卓APP逆向分析与保护机制
    【推荐】 常用数据清洗方法大盘点
    【推荐】 selenium下拉框踩坑埋坑

  • 相关阅读:
    Candy leetcode java
    Trapping Rain Water leetcode java
    Best Time to Buy and Sell Stock III leetcode java
    Best Time to Buy and Sell Stock II leetcode java
    Best Time to Buy and Sell Stock leetcode java
    Maximum Subarray leetcode java
    Word Break II leetcode java
    Word Break leetcode java
    Anagrams leetcode java
    Clone Graph leetcode java(DFS and BFS 基础)
  • 原文地址:https://www.cnblogs.com/163yun/p/9871614.html
Copyright © 2011-2022 走看看