Spark UDAF

    Thanks to my colleague Li Zhen (李震) for explaining UDAFs to me.

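    Most of what I found online is just code with little explanation. The official API docs do explain things, but I found them hard to follow. So I am writing it down myself; maybe it will help someone else.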

    Below, I use an example that computes the geometric mean to show how to implement your own UDAF.
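In formula form, the geometric mean of n values x_1, ..., x_n (assumed positive) is the n-th root of their product. A buffer that holds only the count n and the product P is therefore enough. Two partial buffers combine by adding the counts and multiplying the products. The ⊕ symbol below is just shorthand for that merge step, not notation from the original article:

```latex
\mathrm{GM}(x_1, \dots, x_n) = \Bigl( \prod_{i=1}^{n} x_i \Bigr)^{1/n},
\qquad \text{e.g. } \mathrm{GM}(2, 8) = (2 \cdot 8)^{1/2} = 4

(n_1, P_1) \oplus (n_2, P_2) = (n_1 + n_2,\; P_1 P_2),
\qquad \mathrm{GM} = P^{1/n}
```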

    First, import these packages:

    import org.apache.spark.sql.expressions.MutableAggregationBuffer
    import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types._
    
    Next, extend the abstract class UserDefinedAggregateFunction and implement its members. Start with inputSchema, which gives the type(s) of the input column(s). A UDAF can take several columns; a two-column schema would be written like this:

    /*
    StructType(StructField("slot", IntegerType) :: StructField("score", IntegerType) :: Nil)
    */

    class GeometricMean extends UserDefinedAggregateFunction {
      // These are the input fields for your aggregate function.
      override def inputSchema: StructType =
        StructType(StructField("value", DoubleType) :: Nil)
    

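    bufferSchema defines the intermediate buffer that stores the partial aggregation result. For the geometric mean, it holds a running count and a running product:

      // These are the internal fields you keep for computing your aggregate.
      override def bufferSchema: StructType = StructType(
        StructField("count", LongType) ::
        StructField("product", DoubleType) :: Nil
      )

    dataType is the type of the returned result. The geometric mean returns a single Double:

      // This is the output type of your aggregation function.
      override def dataType: DataType = DoubleType

    deterministic tells Spark whether the function always returns the same output for the same input. This one does, so it returns true:

      override def deterministic: Boolean = true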

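    initialize sets the buffer to its starting values: a count of 0 and a product of 1.0, the identity for multiplication:

      // This is the initial value for your buffer schema.
      override def initialize(buffer: MutableAggregationBuffer): Unit = {
        buffer(0) = 0L
        buffer(1) = 1.0
      }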
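
    update defines how each new input row is folded into the buffer: increment the count and multiply the product by the new value. Null inputs are skipped. For a primitive type, Row.getAs returns the zero value (0.0) when the field is null, which would silently zero out the product:

      // This is how to update your buffer schema given an input.
      override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
        if (!input.isNullAt(0)) {
          buffer(0) = buffer.getAs[Long](0) + 1
          buffer(1) = buffer.getAs[Double](1) * input.getAs[Double](0)
        }
      }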
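
    merge combines two buffers. Spark splits the data into multiple partitions and aggregates each partition separately. It then merges the per-partition buffers: counts are added and products are multiplied:

      // This is how to merge two objects with the bufferSchema type.
      override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
        buffer1(0) = buffer1.getAs[Long](0) + buffer2.getAs[Long](0)
        buffer1(1) = buffer1.getAs[Double](1) * buffer2.getAs[Double](1)
      }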
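
    evaluate produces the final result from the finished buffer: the count-th root of the product. If the count is 0 (no rows were counted for the group), the formula would compute 1.0 raised to 1/0 = Infinity, which is NaN. This version returns SQL NULL in that case instead:

      // This is where you output the final value, given the final value of your bufferSchema.
      override def evaluate(buffer: Row): Any = {
        val count = buffer.getLong(0)
        if (count == 0L) null
        else math.pow(buffer.getDouble(1), 1.0 / count)
      }
    }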
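To see how initialize, update, merge and evaluate fit together, here is a Spark-free sketch that models the buffer as a plain Scala case class. Buf, initial and the helper functions are hypothetical stand-ins, not Spark APIs. Nulls are modelled as None and skipped, and chunking the rows with grouped(2) merely imitates Spark's partitioning. The values are powers of two, so every product is exact and the merged buffer can be compared with the single-pass buffer using ==. With arbitrary doubles, a different multiplication order can change the last bits.

```scala
// Hypothetical plain-Scala model of the GeometricMean buffer lifecycle (no Spark involved).
object LifecycleSketch {
  final case class Buf(count: Long, product: Double)

  // Mirrors initialize(): count 0, product 1.0; also the identity element for merge.
  val initial: Buf = Buf(0L, 1.0)

  // Mirrors update(): skip nulls (None), otherwise count the row and multiply it in.
  def update(b: Buf, input: Option[Double]): Buf = input match {
    case Some(x) => Buf(b.count + 1, b.product * x)
    case None    => b
  }

  // Mirrors merge(): add the counts, multiply the products.
  def merge(b1: Buf, b2: Buf): Buf =
    Buf(b1.count + b2.count, b1.product * b2.product)

  // Mirrors evaluate(): count-th root of the product; None for an empty group.
  def evaluate(b: Buf): Option[Double] =
    if (b.count == 0L) None else Some(math.pow(b.product, 1.0 / b.count))

  def main(args: Array[String]): Unit = {
    val rows: Seq[Option[Double]] =
      Seq(Some(1.0), Some(2.0), None, Some(4.0), Some(8.0), Some(16.0))
    // Imitate partitioning: fold each chunk of two rows into its own buffer.
    val partials = rows.grouped(2).toSeq.map(_.foldLeft(initial)(update))
    // Combine the partial buffers, starting from the identity buffer.
    val merged = partials.foldLeft(initial)(merge)
    // Reference result: fold all rows in a single pass.
    val sequential = rows.foldLeft(initial)(update)
    println(merged)               // Buf(5,1024.0)
    println(merged == sequential) // true
    println(evaluate(merged))
  }
}
```

Starting each fold from initial works because (0, 1.0) leaves any buffer unchanged under merge. Spark relies on the same property when it combines buffers from partitions that saw no rows.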

    Usage:

    First, register the function:

    sqlContext.udf.register("gm", new GeometricMean)


    Then call the custom UDAF from SQL (%sql marks a SQL cell in a Databricks notebook):

    %sql
    -- Use a group_by statement and call the UDAF.
    select group_id, gm(id) from simple group by group_id
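The buffer keeps a raw running product, so on a large group gm can overflow to Infinity, or underflow to 0.0, even when every value is moderate. A common alternative, which the original code does not use, is to keep a running sum of natural logarithms and exponentiate the mean log at the end. This only works for strictly positive inputs. Below is a plain-Scala sketch comparing the two approaches; gmProduct and gmLog are hypothetical names, not part of the article or of Spark:

```scala
// Hypothetical comparison of product-based vs. log-space geometric mean (no Spark involved).
object LogSpaceSketch {
  // Same approach as GeometricMean: multiply everything, then take the n-th root.
  def gmProduct(xs: Seq[Double]): Double =
    math.pow(xs.product, 1.0 / xs.size)

  // Log-space variant: average the natural logs, then exponentiate. Assumes all xs > 0.
  def gmLog(xs: Seq[Double]): Double =
    math.exp(xs.map(math.log).sum / xs.size)

  def main(args: Array[String]): Unit = {
    // 400 copies of 10.0: the product 1e400 exceeds Double.MaxValue (about 1.8e308).
    val xs = Seq.fill(400)(10.0)
    println(gmProduct(xs)) // Infinity
    println(gmLog(xs))     // close to 10.0, up to floating-point rounding
  }
}
```

In the UDAF, this would mean a DoubleType buffer field holding the log sum instead of the product. It would start at 0.0, update would add math.log of each input, and merge would add the two sums. That adaptation is a sketch and has not been checked against a Spark build here.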
     
     
     

    References:

    https://docs.databricks.com/spark/latest/spark-sql/udaf-scala.html

Original article: https://www.cnblogs.com/earendil/p/8510680.html