  • Two ways to use UDFs in Spark

    See also: https://www.cnblogs.com/itboys/p/9347403.html

    1) If you use spark.sql("...")

    => the statement is executed through the Hive-compatible SQL engine, so the UDF can only be referenced by a name registered via spark.udf.register("name", ...)

    For example:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._

    // Min-max normalization: scales a value into [0, 1]
    val maxandmin = udf {
      (cdata: Double, maxdata: Double, mindata: Double) =>
        (cdata - mindata) / (maxdata - mindata)
    }
    // Register it under a name so it can be called from SQL text
    spark.udf.register("maxandmin", maxandmin)
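As an alternative sketch (assuming the same Double column types), the function can also be registered directly as a plain Scala lambda, without building a UserDefinedFunction first:

```scala
// Alternative: register a plain Scala function by name.
// The name "maxandmin" is then usable inside spark.sql(...) exactly as above.
spark.udf.register("maxandmin",
  (cdata: Double, maxdata: Double, mindata: Double) =>
    (cdata - mindata) / (maxdata - mindata))
```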
    
    def getUserbaseinfo(spark: SparkSession) = {
      // calcage (used below) is assumed to be another UDF, registered elsewhere,
      // that converts birthyear into an age
      val sql = s"""select
                  |userid, locale, gender,
                  |location,
                  |maxandmin(cage, max_age, min_age) age,
                  |maxandmin(timezone, max_timezone, min_timezone) timezone,
                  |maxandmin(members, max_members, min_members) members
                  |from
                  |(select userid,
                  |case when l.locale is null then 0 else l.localeid end locale,
                  |gender, location,
                  |calcage(birthyear) cage, min_age, max_age,
                  |timezone, min_timezone, max_timezone,
                  |members, min_members, max_members
                  |from dwd_events.dwd_users u
                  |left join dwd_events.dwd_locale l
                  |on lower(u.locale) = lower(l.locale)
                  |cross join (select min(calcage(birthyear)) min_age,
                  |max(calcage(birthyear)) max_age, min(timezone) min_timezone,
                  |max(timezone) max_timezone, min(members) min_members, max(members) max_members
                  |from dwd_events.dwd_users) b ) c""".stripMargin
      spark.sql(sql)
    }

    2) If you use the DataFrame API

    => just call udf(); the returned function is applied directly to Column expressions, so no registration is needed
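A minimal sketch of this second approach (the DataFrame and its column names are hypothetical, mirroring the age columns from the SQL above). The UserDefinedFunction returned by udf() takes Columns as arguments, so it plugs straight into withColumn or select:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder()
  .appName("udf-demo")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// Same min-max normalization as before
val maxandmin = udf {
  (cdata: Double, maxdata: Double, mindata: Double) =>
    (cdata - mindata) / (maxdata - mindata)
}

// Hypothetical frame: a value together with its global max/min
val df = Seq((30.0, 80.0, 10.0), (80.0, 80.0, 10.0))
  .toDF("cage", "max_age", "min_age")

// Apply the UDF directly on Columns -- no spark.udf.register needed
df.withColumn("age", maxandmin($"cage", $"max_age", $"min_age")).show()
```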

  • Original post: https://www.cnblogs.com/sabertobih/p/14198800.html