zoukankan      html  css  js  c++  java
  • sparkSQL--over窗口函数(实战案例)

    一、over(窗口函数)

    指的是对多行数据进行处理返回普通列和聚合列的过程

    详细语法:

    窗口函数sql语法:窗口函数名()over (partition by 划分窗口字段 order by 窗口内的排序规则 rows between (start,end))

    窗口函数分类:

    • 聚合窗口函数 aggregate 聚合类
    • 排名窗口函数 ranking 排名类
    • 数据分析窗口函数 analytic 分析类

    参考链接:https://www.cnblogs.com/abc8023/p/10910741.html

    Function Type SQL DataFrame API Description
    Ranking rank rank rank值可能是不连续的
    Ranking dense_rank denseRank rank值一定是连续的
    Ranking percent_rank percentRank 相同的分组中 (rank -1) / ( count(score) - 1 )
    Ranking ntile ntile 将同一组数据循环的往n个桶中放,返回对应的桶的index,index从1开始
    Ranking row_number rowNumber 很单纯的行号,类似excel的行号
    Analytic cume_dist cumeDist
    Analytic first_value firstValue 相同的分组中最小值
    Analytic last_value lastValue 相同的分组中最大值
    Analytic lag lag 取前n行数据
    Analytic lead lead 取后n行数据
    Aggregate min min 最小值
    Aggregate max max 最大值
    Aggregate sum sum 求和
    Aggregate avg avg 求平均

    二、具体用法如下

    count(...) over(partition by ... order by ...) --求分组后的总数。
    sum(...) over(partition by ... order by ...) --求分组后的和。
    max(...) over(partition by ... order by ...) --求分组后的最大值。
    min(...) over(partition by ... order by ...) --求分组后的最小值。
    avg(...) over(partition by ... order by ...) --求分组后的平均值。
    rank() over(partition by ... order by ...) --rank值可能是不连续的。
    dense_rank() over(partition by ... order by ...) --rank值是连续的。
    first_value(...) over(partition by ... order by ...) --求分组内的第一个值。
    last_value(...) over(partition by ... order by ...) --求分组内的最后一个值。
    lag() over(partition by ... order by ...) --取出前n行数据。
    lead() over(partition by ... order by ...) --取出后n行数据。
    ratio_to_report() over(partition by ... order by ...) --Ratio_to_report() 括号中就是分子,over() 括号中就是分母。
    percent_rank() over(partition by ... order by ...)

    三、应用案例

    问题

    某app访问页面的日志详细记录字段如下:day, user_id, page_id, time

    求某天每个用户访问页面次数前10的页面。

    ("2018-01-01",1,"www.baidu.com","10:01"),
    ("2018-01-01",2,"www.baidu.com","10:01"),
    ("2018-01-01",1,"www.sina.com","10:01"),
    ("2018-01-01",3,"www.baidu.com","10:01"),
    ("2018-01-01",3,"www.baidu.com","10:01"),
    ("2018-01-01",1,"www.sina.com","10:01")

    思路

    1. 每个用户访问不同页面的次数

    select user_id,page_id,count(page_id) from t_log group by user_id, page_id

    +-------+-------------+-----+---+---|
    |user_id| page_id|count|
    +-------+-------------+-----+----+--|
    | 2|www.baidu.com| 1|
    | 3|www.baidu.com| 2|
    | 1|www.baidu.com| 1|
    | 1| www.sina.com| 2|
    +-------+-------------+-----+----+--|

    1. 对每个用户点击页面次数降序排列,并且使用窗口函数中的排名函数,对点击页面进行排名

    w1:

    | 1| www.sina.com| 2| 1

    | 1|www.baidu.com| 1| 2

    w2:

    | 2|www.baidu.com| 1| 1

    w3:

    | 3|www.baidu.com| 2| 1

    +-------+-------------+-----+----+
    |user_id| page_id|count|rank|
    +-------+-------------+-----+----+
    | 1| www.sina.com| 2| 1|
    | 1|www.baidu.com| 1| 2|
    | 3|www.baidu.com| 2| 1|
    | 2|www.baidu.com| 1| 1|
    +-------+-------------+-----+----+

    1. 获得每个用户访问次数前10的页面

    where rank <= 10

    +-------+-------------+-----+----+
    |user_id| page_id|count|rank|
    +-------+-------------+-----+----+
    | 1| www.sina.com| 2| 1|
    | 1|www.baidu.com| 1| 2|
    | 3|www.baidu.com| 2| 1|
    | 2|www.baidu.com| 1| 1|
    +-------+-------------+-----+----+

    代码

    package method
    
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.expressions.Window
    
    object SQLDemo3 {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("sql operation").master("local[*]").getOrCreate()
        val rdd = spark.sparkContext.makeRDD(
          List(
            ("2018-01-01",1,"www.baidu.com","10:01"),
            ("2018-01-01",2,"www.baidu.com","10:01"),
            ("2018-01-01",1,"www.sina.com","10:01"),
            ("2018-01-01",3,"www.baidu.com","10:01"),
            ("2018-01-01",3,"www.baidu.com","10:01"),
            ("2018-01-01",1,"www.sina.com","10:01")
          )
        )
        import spark.implicits._
        val df = rdd.toDF("day","user_id","page_id","time")
      df.createTempView("t_log")
        //注意:""" 包裹内容 “”“自动进行字符串的拼接
        spark
          .sql(
            """
              |select *
              |from
              | (select user_id,page_id, num,
              |   rank() over(partition by user_id order by num desc) as rank
              |   from
              |     (select
              |       user_id,
              |       page_id,
              |       count(page_id) as num
              |       from t_log
              |       group by user_id,page_id))
              | where rank <= 10
              |
              |""".stripMargin
          )
          .show()
      spark.stop()
      }
    }
    //结果
    +-------+-------------+---+----+
    |user_id|      page_id|num|rank|
    +-------+-------------+---+----+
    |      1| www.sina.com|  2|   1|
    |      1|www.baidu.com|  1|   2|
    |      3|www.baidu.com|  2|   1|
    |      2|www.baidu.com|  1|   1|
    +-------+-------------+---+----+
    
  • 相关阅读:
    举例一个IO多路复用的C/S例子
    简单介绍协程
    生产者消费者模型
    多进程介绍
    有关多线程(同步锁,递归锁,同步对象,信号量)
    threading多线程模块
    开发一个支持多用户在线的FTP程序
    NTP时间服务器与客户端
    EF之增删改查
    返回新插入的数据的主键ID
  • 原文地址:https://www.cnblogs.com/wanpi/p/14969000.html
Copyright © 2011-2022 走看看