zoukankan      html  css  js  c++  java
  • spark自定义函数之——UDF使用详解及代码示例

    前言

    本文介绍如何在Spark Sql和DataFrame中使用UDF,如何利用UDF给一个表或者一个DataFrame根据需求添加几列,并给出了旧版(Spark1.x)和新版(Spark2.x)完整的代码示例。

    关于UDF:UDF:User Defined Function,用户自定义函数

    创建测试用DataFrame

    spark2.0创建DataFrame

    // 构造测试数据,有两个字段、名字和年龄
    val userData = Array(("A", 16), ("B", 21), ("B", 14), ("B", 18))
    
    //创建测试df
    val userDF = spark.createDataFrame(userData).toDF("name", "age")
    userDF.show
    +-----+---+
    | name|age|
    +-----+---+
    | A | 16|
    | B | 21|
    | C | 14|
    | D | 18|
    +-----+---+
    // 注册一张user表 
    userDF.createOrReplaceTempView("user")

    spark1.0创建DataFrame

     // 构造测试数据,有两个字段、名字和年龄
    val userData = Array(("A", 16), ("B", 21), ("C", 14), ("D", 18))
    //创建测试df
    val userDF = sc.parallelize(userData).toDF("name", "age")
    // 注册一张user表
     userDF.registerTempTable("user")

    spark-sql中SQL中UDF用法

    1. 通过匿名函数注册UDF

    下面的UDF的功能是计算某列的长度,该列的类型为String

    // Spark2.x:
    spark.udf.register("strLen", (str: String) => str.length())
    
    // Spark1.x:
    sqlContext.udf.register("strLen", (str: String) => str.length())
    
    // 仅以Spark2.x为例
    spark.sql("select name,strLen(name) as name_len from user").show

    2. 通过实名函数注册UDF

    实名函数的注册有点不同,要在后面加 _(注意前面有个空格) 

    // 定义一个实名函数
    
    /**
     * 根据年龄大小返回是否成年 成年:true,未成年:false
    */
    def isAdult(age: Int) = {
      if (age < 18) {
        false
      } else {
        true
      }
    }
    
    // 注册(仅以Spark2.x为例)
    spark.udf.register("isAdult", isAdult _)

    spark-sql中DataFrame中UDF用法

    DataFrame的udf方法虽然和Spark Sql的名字一样,但是属于不同的类,它在org.apache.spark.sql.functions里,下面是它的用法

    1. 注册

    import org.apache.spark.sql.functions._
    //方法一:注册自定义函数(通过匿名函数) val strLen = udf((str: String) => str.length()) //方法二:注册自定义函数(通过实名函数) val udf_isAdult = udf(isAdult _)

    2. 使用

    可通过withColumn和select使用,下面的代码已经实现了给user表添加两列的功能 
    * 通过看源码,下面的withColumn和select方法Spark2.0.0之后才有的,关于spark1.xDataFrame怎么使用注册好的UDF没有研究

    // 通过withColumn添加列
    userDF.withColumn("name_len", strLen(col("name"))).withColumn("isAdult", udf_isAdult(col("age"))).show
    
    //通过select添加列
    userDF.select(col("*"), strLen(col("name")) as "name_len", udf_isAdult(col("age")) as "isAdult").show
    
    +-----+---+--------+-------+
    | name|age|name_len|isAdult|
    +-----+---+--------+-------+
    | A | 16| 3| false|
    | B | 21| 5| true|
    | C | 14| 4| false|
    | D | 18| 3| true|
    +-----+---+--------+-------+

    withColumn和select的区别

    可通过withColumn的源码看出withColumn的功能是实现增加一列,或者替换一个已存在的列,他会先判断DataFrame里有没有这个列名,如果有的话就会替换掉原来的列,没有的话就用调用select方法增加一列,所以如果我们的需求是增加一列的话,两者实现的功能一样,且最终都是调用select方法,但是withColumn会提前做一些判断处理,所以withColumn的性能不如select好。 

    注:select方法和sql 里的select一样,如果新增的列名在表里已经存在,那么结果里允许出现两列列名相同但数据不一样,大家可以自己试一下。

     参考:https://dongkelun.com/2018/08/02/sparkUDF/

  • 相关阅读:
    121. Best Time to Buy and Sell Stock
    玩转算法2.3常见的算法复杂度分析
    数组中的逆序对
    一些基本的代码模板
    230. Kth Smallest Element in a BST
    42. Trapping Rain Water
    api token
    仿百度查询
    baidu jsonp
    How to fix Error: laravel.log could not be opened?
  • 原文地址:https://www.cnblogs.com/yyy-blog/p/10280657.html
Copyright © 2011-2022 走看看