zoukankan      html  css  js  c++  java
  • Spark SQL中UDF和UDAF

    转载自:https://blog.csdn.net/u012297062/article/details/52227909

    UDF: User Defined Function,用户自定义的函数,函数的输入是一条具体的数据记录,实现上讲就是普通的Scala函数;
    UDAF:User Defined Aggregation Function,用户自定义的聚合函数,函数本身作用于数据集合,能够在聚合操作的基础上进行自定义操作;

    实质上讲,例如说UDF会被Spark SQL中的Catalyst封装成为Expression,最终会通过eval方法来计算输入的数据Row(此处的Row和DataFrame中的Row没有任何关系)

    不说太多直接上代码

    1、创建Spark的配置对象SparkConf,设置Spark程序的运行时的配置信息

    val conf = new SparkConf() //创建SparkConf对象  
    conf.setAppName("SparkSQLUDFUDAF") //设置应用程序的名称,在程序运行的监控界面可以看到名称  
    //conf.setMaster("spark://DaShuJu-040:7077") //此时,程序在Spark集群  
    conf.setMaster("local[4]")  

    2、创建SparkContext对象和SQLContext对象

    //创建SparkContext对象,通过传入SparkConf实例来定制Spark运行的具体参数和配置信息  
    val sc = new SparkContext(conf)  
    val sqlContext = new SQLContext(sc) //构建SQL上下文  

    3、模拟实际使用的数据

    val bigData = Array("Spark", "Spark", "Hadoop", "Spark", "Hadoop", "Spark", "Spark", "Hadoop", "Spark", "Hadoop")

    4、基于提供的数据创建DataFrame

    val bigDataRDD =  sc.parallelize(bigData)  
    val bigDataRDDRow = bigDataRDD.map(item => Row(item))  
    val structType = StructType(Array(StructField("word", StringType, true)))  
    val bigDataDF = sqlContext.createDataFrame(bigDataRDDRow,structType) 

    5、注册成为临时表

    bigDataDF.registerTempTable("bigDataTable") 

    6、通过SQLContext注册UDF,在Scala 2.10.x版本UDF函数最多可以接受22个输入参数

    sqlContext.udf.register("computeLength", (input: String) => input.length)  
    //直接在SQL语句中使用UDF,就像使用SQL自动的内部函数一样  
    sqlContext.sql("select word, computeLength(word) as length from bigDataTable").show 

    7、通过SQLContext注册UDAF

    sqlContext.udf.register("wordCount", new MyUDAF)  
    sqlContext.sql("select word,wordCount(word) as count,computeLength(word) as length" +  
    " from bigDataTable group by word").show()  

    8、按照模板实现UDAF

    class  MyUDAF extends UserDefinedAggregateFunction {  
      // 该方法指定具体输入数据的类型  
      override def inputSchema: StructType = StructType(Array(StructField("input", StringType, true)))  
      //在进行聚合操作的时候所要处理的数据的结果的类型  
      override def bufferSchema: StructType = StructType(Array(StructField("count", IntegerType, true)))  
      //指定UDAF函数计算后返回的结果类型  
      override def dataType: DataType = IntegerType  
      // 确保一致性 一般用true  
      override def deterministic: Boolean = true  
      //在Aggregate之前每组数据的初始化结果  
      override def initialize(buffer: MutableAggregationBuffer): Unit = {buffer(0) =0}  
      // 在进行聚合的时候,每当有新的值进来,对分组后的聚合如何进行计算  
      // 本地的聚合操作,相当于Hadoop MapReduce模型中的Combiner  
      override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {  
        buffer(0) = buffer.getAs[Int](0) + 1  
      }  
      //最后在分布式节点进行Local Reduce完成后需要进行全局级别的Merge操作  
      override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {  
        buffer1(0) = buffer1.getAs[Int](0) + buffer2.getAs[Int](0)  
      }  
      //返回UDAF最后的计算结果  
      override def evaluate(buffer: Row): Any = buffer.getAs[Int](0)  
    }  
  • 相关阅读:
    Reporting Services系列三:几个细节
    Tips&Tricks系列一:更改VS2005设置
    Reporting Services系列二:引用.NET DLL
    数据库基础系列之六:因空间不足导致IMP失败
    .NET基础示例系列之十四:C#导出建表语句及数据
    Reporting Services系列四:折叠报表
    数据库基础系列之七:IMP数据到指定的表空间
    .NET基础示例系列之十六:制做进程监视器
    .NET基础示例系列之十九:Dundas For ASP.NET
    .NET基础示例系列之十五:操作Excel
  • 原文地址:https://www.cnblogs.com/itboys/p/8922205.html
Copyright © 2011-2022 走看看