  • Spark: spark.sql

    spark.sql in spark-shell



    Data structure

    Orders table: badou.orders

    Fields (all string)

    • order_id: order ID
    • user_id: user ID
    • eval_set: one of prior (historical behavior), train (training set; products the test users have already bought), test (the set to predict)
    • order_number: the order's sequence number
    • order_dow: day of week the order was placed
    • order_hour_of_day: hour of the day (24-hour clock)
    • days_since_prior_order: days between this order and the previous one

    Behavior table: badou.priors

    Fields (all string)

    • order_id: order ID; joins to the orders table
    • product_id: ID of a product in the order
    • add_to_cart_order: position at which the product was added to the cart
    • reordered: whether the user purchased the product again; 0 for the first purchase, 1 for a repurchase
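
    The post doesn't show how the badou tables were created; for reference, a minimal Hive DDL sketch — the comma delimiter and plain-text storage are assumptions, only the table and column names come from the descriptions above:

    hive> create table badou.orders(order_id string, user_id string, eval_set string, order_number string, order_dow string, order_hour_of_day string, days_since_prior_order string) row format delimited fields terminated by ',';
    hive> create table badou.priors(order_id string, product_id string, add_to_cart_order string, reordered string) row format delimited fields terminated by ',';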

    Launching spark-shell

    • Run spark-shell on YARN
    [root@master ~]# /usr/local/src/spark-2.0.2-bin-hadoop2.6/bin/spark-shell --master yarn
    
    • Import the sql method and create DataFrames for the tables
    scala> import spark.sql
    scala> val orders=sql("select * from badou.orders")
    scala> val priors=sql("select * from badou.priors")
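
    A quick sanity check (not in the original post) confirms what was loaded; every column should come back as string, matching the table definitions above:

    scala> orders.printSchema
    scala> priors.printSchema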
    

    Exercises

    Product statistics/features

    Count how many times each product was purchased

    scala> priors.groupBy("product_id").agg(count("product_id") as "cnt_product").show(3)
    +----------+-----------+
    |product_id|cnt_product|
    +----------+-----------+
    |     48370|       3934|
    |     10096|        792|
    |     16974|       2773|
    +----------+-----------+
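
    show(3) returns an arbitrary sample; to see the best-selling products instead, the same aggregation can be sorted — a sketch using the standard orderBy and desc functions, not part of the original:

    scala> priors.groupBy("product_id").agg(count("product_id") as "cnt_product").orderBy(desc("cnt_product")).show(3)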
    

    Count how many times each product was reordered (purchased again), and the reorder rate

    scala> priors.selectExpr("product_id","cast(reordered as int) as reordered").groupBy("product_id").agg(sum("reordered") as "sum_r",count("product_id") as "cnt_product",avg("reordered") as "avg_reordered").show(3)
    +----------+-----+-----------+------------------+
    |product_id|sum_r|cnt_product|     avg_reordered|
    +----------+-----+-----------+------------------+
    |     48370| 2751|       3934| 0.699288256227758|
    |     10096|  503|        792|  0.63510101010101|
    |     16974| 1845|       2773|0.6653443923548503|
    +----------+-----+-----------+------------------+
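
    The same statistics can also be written directly in SQL rather than through the DataFrame API; an equivalent sketch:

    scala> sql("select product_id, sum(cast(reordered as int)) as sum_r, count(product_id) as cnt_product, avg(cast(reordered as int)) as avg_reordered from badou.priors group by product_id").show(3)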
    

    User statistics/features

    Average interval in days between each user's orders

    scala> val ord=orders.selectExpr("user_id","if(days_since_prior_order='',0,days_since_prior_order) as dspo")
    scala> ord.selectExpr("user_id","cast(dspo as int) as dspo").groupBy("user_id").agg(sum("dspo") as "sum_dspo",count("user_id") as "cnt_user_id",avg("dspo") as "avg_dspo").show(5)
    +-------+--------+-----------+------------------+
    |user_id|sum_dspo|cnt_user_id|          avg_dspo|
    +-------+--------+-----------+------------------+
    | 104454|     147|          6|              24.5|
    | 104603|     132|          6|              22.0|
    | 104665|     273|         18|15.166666666666666|
    | 104870|      90|          4|              22.5|
    | 105344|      60|          5|              12.0|
    +-------+--------+-----------+------------------+
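
    The if() guard matters because days_since_prior_order is empty for each user's first order, and casting an empty string to int yields NULL rather than 0, which would silently drop those rows from avg(); a quick demonstration:

    scala> sql("select cast('' as int) as empty_cast").show()
    +----------+
    |empty_cast|
    +----------+
    |      null|
    +----------+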
    

    Total number of orders per user

    scala> orders.groupBy("user_id").count().show(5)
    +-------+-----+
    |user_id|count|
    +-------+-----+
    | 104454|    6|
    | 104603|    6|
    | 104665|   18|
    | 104870|    4|
    | 105344|    5|
    +-------+-----+
    

    The deduplicated set of products purchased by each user

    scala> import spark.implicits._
    scala> val op=orders.join(priors,"order_id").selectExpr("user_id","product_id").distinct
    scala> val req=op.rdd.map(x=>(x(0).toString,x(1).toString)).groupByKey().mapValues(_.mkString(","))
    scala> req.toDF("user_id","product_id").filter(col("user_id")===124168).show(false)
    +-------+-----------------------------+
    |user_id|product_id                   |
    +-------+-----------------------------+
    |124168 |14303,20082,11323,46522,22108|
    +-------+-----------------------------+
    // The same aggregation, but returning the count of distinct products along with the list
    scala> val req=op.rdd.map(x=>(x(0).toString,x(1).toString)).groupByKey().mapValues{rs=>(rs.size,rs.mkString(","))}
    scala> req.toDF("user_id","value").selectExpr("user_id","value._1 as product_cnt","value._2 as product_id_list").show(1,false)
    +-------+-----------+-----------------------------+
    |user_id|product_cnt|product_id_list              |
    +-------+-----------+-----------------------------+
    |124168 |5          |14303,46522,20082,11323,22108|
    +-------+-----------+-----------------------------+
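
    The same result can be obtained without dropping to the RDD API; a sketch using collect_set, size, and concat_ws (the ordering inside the set may differ from the groupByKey version):

    scala> val req2=op.groupBy("user_id").agg(collect_set("product_id") as "products")
    scala> req2.selectExpr("user_id","size(products) as product_cnt","concat_ws(',',products) as product_id_list").show(1,false)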
    

    import spark.implicits._ brings in the implicit conversions needed to turn an RDD into a DataFrame.
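
    For example, with the implicits in scope, an RDD of tuples converts straight to a DataFrame via toDF; a minimal illustration with made-up data:

    scala> val demo=sc.parallelize(Seq(("1","49302"),("1","11109"))).toDF("user_id","product_id")
    scala> demo.show()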

    Average number of products per order for each user

    1. Number of products in each order
    2. Total number of products per user
    3. Number of orders per user
    scala> val product_cnt = priors.selectExpr("order_id","product_id").groupBy("order_id").count()
    scala> orders.join(product_cnt,"order_id").selectExpr("user_id","count as product_cnt").groupBy("user_id").agg(sum("product_cnt"),count("user_id"),avg("product_cnt")).withColumnRenamed("sum(product_cnt)","sum_product_cnt").show(3)
    +-------+---------------+--------------+------------------+
    |user_id|sum_product_cnt|count(user_id)|  avg(product_cnt)|
    +-------+---------------+--------------+------------------+
    | 115053|            414|            39|10.615384615384615|
    |  65897|           1811|            83|21.819277108433734|
    |  48147|            475|            95|               5.0|
    +-------+---------------+--------------+------------------+
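
    Aliasing each aggregate directly avoids the withColumnRenamed step (and names the other two columns as well); an equivalent sketch:

    scala> orders.join(product_cnt,"order_id").selectExpr("user_id","count as product_cnt").groupBy("user_id").agg(sum("product_cnt") as "sum_product_cnt",count("user_id") as "cnt_user_id",avg("product_cnt") as "avg_product_cnt").show(3)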
    