zoukankan      html  css  js  c++  java
  • spark2.1:使用df.select(when(a===b,1).otherwise(0))替换(case when a==b then 1 else 0 end)

    最近工作中把一些sql.sh脚本执行hive的语句升级为spark2.1版本,其中遇到将case when 替换为scala操作df的方式实现的问题:

    代码数据:

    scala>     import org.apache.spark.sql.functions._
    import org.apache.spark.sql.functions._
    
    scala>     import spark.implicits._
    import spark.implicits._
    
    scala>     case class fpb_servercls(gridid: String, height: Int, objectid: Int, rsrp: Double, calibrategridid: Int, calibartetype: String)
    defined class fpb_servercls
    
    scala>     
         |     val fpb_server_test = List(
         |       fpb_servercls("grid1", 0, 888888, -88, 53, null),
         |       fpb_servercls("grid1", 5, 888888, -99, 53, null),
         |       fpb_servercls("grid2", 0, 333333, -78, 53, null),
         |       fpb_servercls("grid4", 0, 444444, -78, 53, null)
         |     ).toDF
    fpb_server_test: org.apache.spark.sql.DataFrame = [gridid: string, height: int ... 4 more fields]
    
    scala>     val sampe_data_test = List(
         |       fpb_servercls("grid1", 0, 888888, -78, 53, "HOMEWIFI"),
         |       fpb_servercls("grid1", 5, 999999, -89, 53, null),
         |       fpb_servercls("grid2", 0, 333333, -87, 53, null)
         |     ).toDF
    sampe_data_test: org.apache.spark.sql.DataFrame = [gridid: string, height: int ... 4 more fields]

    错误代码:

    scala>         val temp_result = fpb_server_test.alias("fpb").join(sampe_data_test.alias("sample"),
         |           fpb_server_test("gridid") === sampe_data_test("gridid")
         |             && fpb_server_test("height") === sampe_data_test("height")
         |             && fpb_server_test("objectid") === sampe_data_test("objectid"), "left_outer")
     |     .select(
     |       fpb_server_test("gridid"),
     |       fpb_server_test("height"),
     |       fpb_server_test("objectid"),
     |       when(sampe_data_test("gridid") === lit(null), fpb_server_test("rsrp")).otherwise(sampe_data_test("rsrp")).alias("rsrp"),
     |       fpb_server_test("calibrategridid"),
     |       when(sampe_data_test("gridid") === lit(null), fpb_server_test("calibartetype")).otherwise(sampe_data_test("calibartetype")).alias("f_calibartetype")
     |     )
    temp_result: org.apache.spark.sql.DataFrame = [gridid: string, height: int ... 4 more fields]

    scala> temp_result.show
    +------+------+--------+-----+---------------+---------------+
    |gridid|height|objectid| rsrp|calibrategridid|f_calibartetype|
    +------+------+--------+-----+---------------+---------------+
    | grid1|     0|  888888|-78.0|             53|       HOMEWIFI|
    | grid1|     5|  888888| null|             53|           null|
    | grid2|     0|  333333|-87.0|             53|           null|
    | grid4|     0|  444444| null|             53|           null|
    +------+------+--------+-----+---------------+---------------+

    错误的原因就是这里判定是否为空的写法:`sampe_data_test("gridid") === lit(null)` 这种与 null 的等值比较永远不会成立,导致 otherwise 分支总被执行,应改用 `isNull`。

    正确用法:

    scala>  val temp_result = fpb_server_test.alias("fpb").join(sampe_data_test.alias("sample"),
         |       fpb_server_test("gridid") === sampe_data_test("gridid")
         |         && fpb_server_test("height") === sampe_data_test("height")
         |         && fpb_server_test("objectid") === sampe_data_test("objectid"), "left_outer")
     |     .select(
     |       fpb_server_test("gridid"),
     |       fpb_server_test("height"),
     |       fpb_server_test("objectid"),
     |       when(sampe_data_test("gridid").isNull, fpb_server_test("rsrp")).otherwise(sampe_data_test("rsrp")).alias("rsrp"),
     |       fpb_server_test("calibrategridid"),
     |       when(sampe_data_test("gridid").isNull, fpb_server_test("calibartetype")).otherwise(sampe_data_test("calibartetype")).alias("f_calibartetype")
     |     )
    temp_result: org.apache.spark.sql.DataFrame = [gridid: string, height: int ... 4 more fields]

    scala> temp_result.show
    +------+------+--------+-----+---------------+---------------+
    |gridid|height|objectid| rsrp|calibrategridid|f_calibartetype|
    +------+------+--------+-----+---------------+---------------+
    | grid1|     0|  888888|-78.0|             53|       HOMEWIFI|
    | grid1|     5|  888888|-99.0|             53|           null|
    | grid2|     0|  333333|-87.0|             53|           null|
    | grid4|     0|  444444|-78.0|             53|           null|
    +------+------+--------+-----+---------------+---------------+

    疑问代码,如下代码在spark-shell中执行没有问题,但是使用spark-submit提交脚本后就提示错误:

    scala>   val temp_result = fpb_server_test.alias("fpb").join(sampe_data_test.alias("sample"),
         |       fpb_server_test("gridid") === sampe_data_test("gridid")
         |         && fpb_server_test("height") === sampe_data_test("height")
         |         && fpb_server_test("objectid") === sampe_data_test("objectid"), "left_outer")
     |     .selectExpr("fpb.gridid", "fpb.height", "fpb.objectid",
     |       "(case when sample.gridid is null then fpb.rsrp else sample.rsrp end) as rsrp",
     |       "fpb.calibrategridid",
     |       "(case when sample.gridid is null then fpb.calibartetype else sample.calibartetype end) as calibartetype")
    temp_result: org.apache.spark.sql.DataFrame = [gridid: string, height: int ... 4 more fields]

    scala> temp_result.show
    +------+------+--------+-----+---------------+-------------+
    |gridid|height|objectid| rsrp|calibrategridid|calibartetype|
    +------+------+--------+-----+---------------+-------------+
    | grid1|     0|  888888|-78.0|             53|     HOMEWIFI|
    | grid1|     5|  888888|-99.0|             53|         null|
    | grid2|     0|  333333|-87.0|             53|         null|
    | grid4|     0|  444444|-78.0|             53|         null|
    +------+------+--------+-----+---------------+-------------+
  • 相关阅读:
    Python全栈开发之---mysql数据库
    python爬虫项目(scrapy-redis分布式爬取房天下租房信息)
    python多线程爬虫+批量下载斗图啦图片项目(关注、持续更新)
    python爬虫+数据可视化项目(关注、持续更新)
    超融合基本架构简单定义
    开启新生之路,,,学习网络
    Redhat7.2 ----team网卡绑定
    设计原则
    java应用程序的运行机制
    java三大版本和核心优势
  • 原文地址:https://www.cnblogs.com/yy3b2007com/p/8525499.html
Copyright © 2011-2022 走看看