zoukankan      html  css  js  c++  java
  • Spark UDAF实现举例 -- average pooling

    1.UDAF定义

    spark中的UDF(UserDefinedFunction)大家都不会陌生, UDF其实就是将一个普通的函数, 包装为可以按 操作DataFrame中指定Columns的函数.

    例如, 对某一列的所有元素进行+1操作, 它对应mapreduce操作中的map操作. 这种操作有的主要特点是:

    • 行与行之间的操作是独立的, 可以非常方便的并行计算
    • 每一行的操作完成后, map的任务就完成了, 直接将结果返回就行, 它是一种”无状态的“

    但是UDAF(UserDefinedAggregateFunction)则不同, 由于存在聚合(Aggregate)操作, 它对应mapreduce操作中的reduce操作. SparkSQL中有很多现成的聚合函数, 常用的sum, count, avg等等都是. 这种操作的主要特点是:

    • 每一轮reduce之间可以是并行, 但是多轮reduce的执行是串行的, 下一轮依靠前一轮的结果, 它是一种“有状态的”, 需要记录中间的计算结果

    分析上图, 96 => (96, 1)这一步是一个map操作, 给每个样本添加一个1, 表示它的数量. 它们之间的计算是独立的, 也不影响数据的行数. 然后(96, 1)和(54, 1)求和, 得到(150, 2), 它是一轮reduce的其中一个中间结果, 等三个中间结果都结束了, 才能继续后续的reduce, 得到最终的reduce结果(303, 6), 因此完整的reduce需要记录并不断更新中间结果.

    2.向量平均(average pooling)

    向量平均是个很常用的操作, 比如我们现在有1000个64维的向量, 想要求这1000个点的中心点. 通常来说我们不会用64列float column去存储一个向量, 因此无法使用原生的avg函数.

    下面介绍如何自定义一个avgvector函数, 去处理array[float] column的平均值计算问题. 通过这个例子学会如何在spark下实现自定义的聚合函数

    2.1 average的并行化

    average算法非常简单, 求个和, 然后除以样本个数就好了. 它的并行化也很好理解

    • reduce的过程只进行sum的累积和样本数num的累积, 在最后一步将sum/num

    因此我们的在reduce的过程中, 需要时刻记录当前task处理的样本的个数, 和它们的和.

    由于这样的原因, 不像UDF只需要定义一个函数就可以, UDAF通常需要定义一个类, 用来保存中间结果

    2.2 代码实现

    // 从基类UserDefinedAggregateFunction继承
    class VectorMean64 extends UserDefinedAggregateFunction {
      // 定义输入的格式
      // 这个函数将会处理的那一列的数据类型, 因为是64维的向量, 因此是Array[Float]
      override def inputSchema: org.apache.spark.sql.types.StructType =
        StructType(StructField("vector", ArrayType(FloatType)) :: Nil)
    
      // 这个就是上面提到的状态
      // 在reduce过程中, 需要记录的中间结果. vector_count即为已经统计的向量个数, 而vector_sum即为已经统计的向量的和
      override def bufferSchema: StructType =
        StructType(
          StructField("vector_count", IntegerType) ::
            StructField("vector_sum", ArrayType(FloatType)) :: Nil)
    
      // 最终的输出格式
      // 既然是求平均, 最后当然还是一个向量, 依然是Array[Float]
      override def dataType: DataType = ArrayType(FloatType)
    
      override def deterministic: Boolean = true
    
      // 初始化
      // buffer的格式即为bufferSchema, 因此buffer(0)就是向量个数, 初始化当然是0, buffer(1)为向量和, 初始化为零向量
      override def initialize(buffer: MutableAggregationBuffer): Unit = {
        buffer(0) = 0
        buffer(1) = Array.fill[Float](64)(0).toSeq
      }
    
      // 定义reduce的更新操作: 如何根据一行新数据, 更新一个聚合buffer的中间结果
      // 一行数据是一个向量, 因此需要将count+1, 然后sum+新向量
      // addTwoEmb为向量相加的基本实现
      override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
        buffer(0) = buffer.getInt(0) + 1
    
        val inputVector = input.getAs[Seq[Float]](0)
        buffer(1) = addTwoEmb(buffer.getAs[Seq[Float]](1), inputVector)
      }
    
      // 定义reduce的merge操作: 两个buffer结果合并到其中一个bufer上
      // 两个buffer各自统计的样本个数相加; 两个buffer各自的sum也相加
      // 注意: 为什么buffer1和buffer2的数据类型不一样?一个是MutableAggregationBuffer, 一个是Row
      // 因为: 在将所有中间task的结果进行reduce的过程中, 两两合并时是将一个结果合到另外一个上面, 因此一个是mutable的, 它们两者的schema其实是一样的, 都对应bufferSchema
      override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
        buffer1(0) = buffer1.getInt(0) + buffer2.getInt(0)
        buffer1(1) = addTwoEmb(buffer1.getAs[Seq[Float]](1), buffer2.getAs[Seq[Float]](1))
      }
    
      // 最终的结果, 依赖最终的buffer中的数据计算的到, 就是将sum/count
      override def evaluate(buffer: Row): Any = {
        val result = buffer.getAs[Seq[Float]](1).toArray
        val count = buffer.getInt(0)
        for (i <- result.indices) {
          result(i) /= (count + 1)
        }
        result.toSeq
      }
    
    	// 向量相加
      private def addTwoEmb(emb1: Seq[Float], emb2: Seq[Float]): Seq[Float] = {
        val result = Array.fill[Float](emb1.length)(0)
        for (i <- emb1.indices) {
          result(i) = emb1(i) + emb2(i)
        }
        result.toSeq
      }
    

    解释可以参考上面的代码注释. 核心就是定义四个模块:

    • 中间结果的格式 - bufferSchema
    • 将一行数据更新到中间结果buffer中 - update
    • 将两个中间结果buffer合并 - merge
    • 从最后的buffer计算需要的结果 - evaluate

    2.3 使用

    // 注册一下, 使其可以在Spark SQL中使用
    spark.udf.register("avgVector64", new VectorMean64)
    spark.sql("""
    |select group_id, avgVector64(embedding) as avg_embedding
    |from embedding_table_name
    |group by group_id
    """.stripMargin)
    
    // 当然不注册也可以用, 只是不能在SQL中用, 可以直接用来操作DataFrame
    val avgVector64 = new VectorMean64
    val df = spark.sql("select group_id, embedding from embedding_table_name")
    df.groupBy("group_id").agg(avgVector64(col("embedding")))
    

    参考

    https://docs.databricks.com/spark/latest/spark-sql/udaf-scala.html

  • 相关阅读:
    svn环境搭建
    Svn正确的使用方法
    基于phpExcel写的excel类
    关于ecshop中jquery与js冲突解决的方案
    学习Javascript闭包(Closure)
    JS全局变量VAR和THIS
    python六十六课——单元测试(二)
    python六十五课——单元测试(一)
    python六十四课——高阶函数练习题(三)
    python六十四课——高阶函数练习题(二)
  • 原文地址:https://www.cnblogs.com/yeluzi/p/14218052.html
Copyright © 2011-2022 走看看