Spark SQL: Using DataFrames

In pom.xml (14.0.1 is the Guava version Spark 2.x itself depends on, so pinning it avoids classpath conflicts):

    <!-- https://mvnrepository.com/artifact/com.google.guava/guava -->
        <dependency>
          <groupId>com.google.guava</groupId>
          <artifactId>guava</artifactId>
          <version>14.0.1</version>
        </dependency>

I. toDF

1. Naming all columns directly

    val spark = SparkSession.builder().master("local[2]").appName("app").getOrCreate()
    val df = spark.read.format("csv").load("file:///D:/idea/ideaProjects/spark_projects/myspark8/src/main/scala/com/njbdqn/DSDF/orders.csv")
          .toDF("id", "orddate", "itemid", "status")
// Treating the first row as the header:
val dfUsers = spark.read.format("csv").option("header","true")
  .load("hdfs://192.168.56.111:9000/party/data/users.csv").cache()
dfUsers.show(false)

2. Selecting only the columns you need

    val orders = spark.read.format("csv").load("file:///D:/idea/ideaProjects/spark_projects/myspark8/src/main/scala/com/njbdqn/DSDF/orders.csv")
          .select("_c0","_c2").toDF("orderid","userid")

II. Using SQL statements against the DataFrame

1. Temp views: a global temp view must be queried as global_temp.orders; a session-scoped temp view is queried by its bare name.

// df.createOrReplaceGlobalTempView("orders")   // would require "global_temp.orders" in SQL
df.createOrReplaceTempView("orders")
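
A minimal side-by-side sketch of the two scopes, reusing the same df:

    // Global: registered in the system database `global_temp` and visible
    // to every SparkSession in the same application.
    df.createOrReplaceGlobalTempView("orders")
    spark.sql("select * from global_temp.orders").show(false)

    // Session-scoped: queried by its bare name, gone when the session ends.
    df.createOrReplaceTempView("orders")
    spark.sql("select * from orders").show(false)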

2. Using spark.sql()

    spark.sql("select if(dayofweek(orddate)==1,'Sun',dayofweek(orddate)-1) as s ,count(id) from orders group by dayofweek(orddate)").show(false)

To write multi-line SQL inside spark.sql(), use stripMargin: the leading `|` marks where each line starts, so indentation differences don't corrupt the string.

        val sql="""select
                  |userid,eventid,
                  |max(case when statu='invited' then 1 else 0 end) invited,
                  |max(case when statu='yes' then 1 else 0 end) yes,
                  |max(case when statu='maybe' then 1 else 0 end) maybe,
                  |max(case when statu='no' then 1 else 0 end) no
                  |from
                  |dwd_events.dwd_eventAttendees group by userid,eventid""".stripMargin
spark.sql(sql).show(false)

III. Using the DataFrame API

[How to implement CASE WHEN, approach 1]

1. With a custom UDF:

1. How is it called? The argument is a Column object: wdchange($"weekdate")
2. What are the input/output types? Both of these work:
  udf{(sc:Int) => /* returns a String */}
  udf{(sc:String) => /* returns an Int */}
    import org.apache.spark.sql.functions._
    // date_format(col,"u") yields the day of week as "1" (Monday) .. "7" (Sunday),
    // so 1 maps to "一" (Monday) and the default case, 7, to "七" (used here for Sunday)
    val wdchange = udf{(num:Int)=>
      num match {
        case 1 => "一"
        case 2 => "二"
        case 3 => "三"
        case 4 => "四"
        case 5 => "五"
        case 6 => "六"
        case _ => "七"
      }
    }
    
// groupBy first, then agg
// every column you operate on shows up in the output unless you select at the end
// agg can be written in two ways (both shown below)
// withColumn appends a column; $ builds a Column object and needs spark.implicits._
df // which weekday? date_format(_, "u")
        .groupBy(date_format($"orddate","u").alias("weekdate"))
         // .agg("id"->"sum","id"->"count")  
        .agg(count("id").alias("countId"),sum("id").alias("sumId"))
        .withColumn("wd",wdchange($"weekdate"))
        .show(false)
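
As an aside, the same logic can be exposed to the SQL side via spark.udf.register; the SQL name wdchange_sql below is illustrative, not from the original post:

    // Register for use inside spark.sql(); the function body matches wdchange above.
    spark.udf.register("wdchange_sql", (num: Int) => num match {
      case 1 => "一"; case 2 => "二"; case 3 => "三"
      case 4 => "四"; case 5 => "五"; case 6 => "六"; case _ => "七"
    })
    spark.sql("select wdchange_sql(cast(date_format(orddate,'u') as int)) as wd, count(id) from orders group by date_format(orddate,'u')").show(false)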

[How to implement CASE WHEN, approach 2]

2. With when / otherwise

        import spark.implicits._
        df
          .groupBy(when(dayofweek($"orddate")===1,"七")
            .when(dayofweek($"orddate")===2,"一")
            .when(dayofweek($"orddate")===3,"二")
            .when(dayofweek($"orddate")===4,"三")
            .when(dayofweek($"orddate")===5,"四")
            .when(dayofweek($"orddate")===6,"五")
            .otherwise("六").alias("day"))
          .agg(count("id").alias("countId"),sum("id").alias("sumId"))
          .show(false)

Result:

+---+-------+------------+
|day|countId|sumId       |
+---+-------+------------+
|六 |9984   |3.35875381E8|
|三 |9758   |3.46913986E8|
|四 |9862   |3.32166696E8|
|二 |9964   |3.45180682E8|
|一 |9292   |3.23174788E8|
|五 |10288  |3.53314523E8|
|七 |9735   |3.3584223E8 |
+---+-------+------------+

3. Another example

Analysis: group by city and sum(countprice).

orderid - countprice : 1:n  =>  orderid - sum(countprice) : 1:1
orderid - userid     : n:1  =>  sum(countprice) - userid  : n:1
userid  - city       : 1:1  =>  sum(countprice) - city    : n:1  =>  sum(countprice) - groupBy(city) : 1:1

(orderitem, orders, and customers below are DataFrames loaded the same way as above.)

// groupBy(col) only accepts columns that exist in the current table

// groupBy takes either Column objects or column names; it returns a
// RelationalGroupedDataset, and only a DataFrame can show()

def groupBy(col1 : scala.Predef.String, cols : scala.Predef.String*)
def groupBy(cols : org.apache.spark.sql.Column*)
     orderitem
          //.select($"orderid",$"countprice".cast(DataTypes.DoubleType))
          .groupBy("orderid").agg(sum("countprice").alias("sumPrice"))
          .join(orders,Seq("orderid"),"inner")
          .join(customers,Seq("userid"),"inner").groupBy($"city").agg(sum("sumPrice").alias("cityPrice"))
          .orderBy(desc("cityPrice"))
          .show()
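
One detail worth noting: join(other, Seq("orderid"), "inner") is a USING-style join, so the shared column appears once in the result. A sketch of the contrast (a and b are placeholder DataFrames):

    // USING join: the result has a single "orderid" column
    a.join(b, Seq("orderid"), "inner")

    // Expression join: both a("orderid") and b("orderid") survive
    // and must be disambiguated when selected
    a.join(b, a("orderid") === b("orderid"), "inner")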

4. Window functions

// row_number over a global ordering can generate sequential ids:
// row_number().over(Window.orderBy("good_id")).alias("gid")
import org.apache.spark.sql.expressions.Window
val wnd = Window.partitionBy(month($"date")).orderBy("date")
df.select($"name", $"date", $"count", sum("count").over(wnd)).show(false)

Why window functions? Aggregates such as sum/min normally require a group by; if you don't want to collapse rows with group by, over() turns the aggregate into a standalone column.

    .withColumn("min_login_time",min($"last_login_time")over())       // 聚合函数,不是groupBy,就是over()
    .withColumn("lasttime",datediff($"last_login_time",$"min_login_time")) // 普通函数,不需要窗口

IV. Using Java's Calendar + RDDs

// Converting a date to a weekday name with plain Java
import java.text.SimpleDateFormat
import java.util.Calendar

  def daychange(dateStr: String): String = {
    val arr = Array("日","一","二","三","四","五","六")
    val cal = Calendar.getInstance()
    val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") // HH: 24-hour clock ("hh" would be 12-hour)
    cal.setTime(sdf.parse(dateStr))
    // Calendar.DAY_OF_WEEK runs 1 (Sunday) .. 7 (Saturday)
    arr(cal.get(Calendar.DAY_OF_WEEK) - 1)
  }
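
For example, 2020-10-01 fell on a Thursday:

    daychange("2020-10-01 12:00:00")   // returns "四"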
// Java function + RDD
// DataFrame.map produces a Dataset, which needs an encoder: import spark.implicits._
df.select("id", "orddate")
  .map(x => (daychange(x(1).toString), x(0).toString)) // (weekday, id)
  .rdd
  .groupByKey()
  .foreach(x => println(x._1, x._2.size))
// foreach runs on the executors: in local mode the println output appears on the
// console, but on a cluster it lands in the executor logs.
Original post: https://www.cnblogs.com/sabertobih/p/13749820.html