zoukankan      html  css  js  c++  java
  • Pyspark 最近使用的一些有趣姿势的梳理

    之前对 SQL 还是不是非常熟悉的,但是现在或多或少还是会写一些计算任务。比如最近在推送将所有天级的耗时任务都从传统关系型数据库迁移至 Spark 集群当中进行计算,中间遇到一些有趣的小问题在这里记录一下。

    Q: 我想按照某个字段分组并且把一组查询字段连起来得到一个 json 然后把结果作为一个字段应该怎么弄?

    A: 这里我的思路是将我们需要 dumps 的字段给拼接起来,然后使用列表将同一个分组里面的是数据组合起来。然后过一个 udf 把列表中的记录处理成数组最后 json.dumps 一下即可。来看个栗子

    # 先查询出要操作的目标信息 这一步可以和下面的操作合并,我这里为了方便看拆开了
    df = ss.sql("""
                            SELECT 
                                t1.pay_id, 
                                t1.pay_money, 
                                t1.user_id
                            FROM
                                analytics_db.hd_day_order_record t1 
                        """)
    
    # 拼接目标字符串并且组合
    df = df.select(
                   df.pay_id,
                   df.pay_money,
                   df.pay_user_id,
                   f.concat_ws('01', df.pay_id,  df.pay_user_id, df.pay_money).alias('sku_buys'))
    )
    
    # 拼接一个重复 user_id 的 list
    df = df.groupBy(df.pay_user_id).agg(f.collect_list('sku_buys').alias('sku_buys'))
    
    # 将 sku_buys 丢给一个 jsondump 的 udf 就可以得到结果了
    df = df.select(df.pay_user_id, sb_json(df.sku_buys).alias('sku_buys'))

    Q: 如果我想对目标进行分组,并且让他在组内有序应该怎么做?

    A: 这通常被称为进行组内排序。其实我之前一直尝试用类似的语法来达到这种效果

    df = ss.sql("""
            SELECT
                first(t1.sku_mode) AS sku_mode,
                first(t1.exchange_type_t01) AS exchange_type_t01,
                first(t1.user_id) AS user_id,
                first(t1.pay_id) AS pay_id,
                first(t1.charge_time) AS charge_time,
                first(t2.has_yxs_payment) AS has_yxs_payment,
                first(t2.has_sxy_payment) AS has_sxy_payment,
                first(t2.has_cxy_payment) AS has_cxy_payment,
                first(t2.has_sxy19_payment) AS has_sxy19_payment,
                first(t2.sxy19_join_time) AS sxy19_join_time,
                first(t2.yxs_join_time) AS yxs_join_time
            FROM
                d_exchange_info t1
            JOIN
                analytics_db.md_day_dump_users t2
            ON
                t2.the_day = '{}'
                AND t1.user_id = t2.user_id
            GROUP BY
                t1.user_id
            ORDER BY
                charge_time
    """.format(st))

    其实这是错的,这里的 order by 并不能达到一个组内排序的效果,而是一个外部排序。所以这里取 first 是一个不稳定的结果。有时候你拿到的是一个结果,也许有时候你拿到的是另外一个结果。要进行组内排序,我们可以先用这样的思路,先对需要 order by 字段的表进行组内排序,然后再让他与其他表 join 获得更多的信息,这样就能保证是有序的。

    这里我引用一个窗口函数来达到这样的效果。

            _df = ss.sql("""
                            SELECT 
                                t1.user_id,
                                t1.pay_id,
                                t1.sku_mode,
                                t1.charge_time,
                                t1.exchange_type_t01,
                                ROW_NUMBER() OVER(PARTITION BY t1.user_id ORDER BY t1.charge_time) as rid
                            FROM 
                                {} t1 
                            WHERE 
                                t1.refund_state = 0
                        """.format(exchange_info_table))
        _df = _df.filter(_df.rid==1)

    我先使用窗口函数 ROW_NUMBER 以 user_id 分组并且根据 charge_time 对表一进行组内排序。得到结果之后,使用 filter 过滤一下 rid =1 的结果。再与另外一张表 join 得到补充信息就能达到想要的效果。

    Q: 我想对结果进行转列应该怎么做?

    A: 行转列 列转行可能是 SQL 计算里面会经常使用到的方法,但是对于 SQL 并不熟悉的同学(比如我)就不知道该怎么整来看个例子

    df = ss.sql("""
        SELECT
            user_id,
            sku_mode,
            credit_score
        FROM
            analytics_db.hd_day_user_credit
        WHERE
            gain_time >= '{}'
            AND gain_time < '{}'
            AND the_day = '{}'
    """.format(start_time, end_time, st))
    # df.show(10)

    展示的数据类似于

    +--------------------+--------+------------+
    |             user_id|sku_mode|credit_score|
    +--------------------+--------+------------+
    |d394899919216bc10...|     yxs|           3|
    |625002ad625bc9a69...|     yxs|           3|
    |8dd11e29bf50cb8c8...|     cxy|           3|
    |0f0b88ff589cb46cd...|     yxs|           3|
    |eeb8e839139876971...|     yxs|           1|
    |f63b2b9c8340d3c80...|     cxy|           1|
    |806c9f0349e7e8389...|     cxy|           1|
    |bf312181eaaa0ec9e...|     yxs|           1|
    |ee4a7984dc40cabbf...|     yxs|           3|
    |7a6b15f16c1f782de...|   sxy19|           3|
    +--------------------+--------+------------+
    only showing top 10 rows

    我们可以基于此将 sku_mode 一样的类型合并进行行转列变换

    df = df.groupby('user_id').pivot(
        'sku_mode', ['yxs', 'cxy', 'sxy', 'sxy19']
    ).agg(
        f.sum('credit_score')
    ).fillna(0)

    这句话的意思是根据 user_id 进行分组,并且将 sku_mode 的行转列,需要转列的类型需要在后面的 list 中添加,并且列里记录 各sku_mode credit_score 汇总的量。

    +--------------------+---+---+---+-----+
    |             user_id|yxs|cxy|sxy|sxy19|
    +--------------------+---+---+---+-----+
    |5ec336994e7b5d73f...|  0|  0|  0|    2|
    |06b1120a4544b1b8a...|  0|  0|  0|    2|
    |6fe19e193ad43bafc...|  0|  0|  0|    3|
    |3e5f9fc4550ae7cba...|  1|  0|  0|    0|
    |b1d1d856e28908f5a...|  1|  0|  0|    3|
    |7a065e02ed1693cf4...|  2|  0|  0|    0|
    |651f9f0b11de08003...|  0|  0|  0|    3|
    |d02491502946aba2f...|  0|  0|  0|    2|
    |e24b58cb87762b2da...|  0|  6|  0|   15|
    |925f6a832b1a95c45...|  1|  0|  0|    0|
    +--------------------+---+---+---+-----+
    only showing top 10 rows

    Q: 我想对结果进行列转行应该怎么做?

    A: 我们接着上面的例子来讲 unpivot 行转列的逆操作。还是接着刚才那个栗子。

    df2 = df
    df2 = df2.selectExpr("user_id", 
                         "stack(4, 'yxs', yxs, 'cxy', cxy, 'sxy', sxy, 'sxy19', sxy19) AS (sku_mode, credit_score)")
    
    df.where(df.user_id=='e24b58cb87762b2da9fa118316e9c91a').show(10, False)
    df2.filter(df2.user_id=='e24b58cb87762b2da9fa118316e9c91a').show(10, False)
    
    
    +--------------------------------+---+---+---+-----+
    |user_id                         |yxs|cxy|sxy|sxy19|
    +--------------------------------+---+---+---+-----+
    |e24b58cb87762b2da9fa118316e9c91a|0  |6  |0  |15   
    +--------------------------------+---+---+---+-----+
    
    +--------------------------------+--------+------------+
    |user_id                         |sku_mode|credit_score|
    +--------------------------------+--------+------------+
    |e24b58cb87762b2da9fa118316e9c91a|yxs     |0           |
    |e24b58cb87762b2da9fa118316e9c91a|cxy     |6           |
    |e24b58cb87762b2da9fa118316e9c91a|sxy     |0           |
    |e24b58cb87762b2da9fa118316e9c91a|sxy19   |15          |
    +--------------------------------+--------+------------+

    可以看到我们通过这种办法将列重新组合成行记录。这里需要多延伸一下,这里使用的 selectExpr 语句的语意是将里面的参数直接解析成 select 里面的内容。

    stack 函数是 spark 中的 func.他接收无数个参数,第一个参数 n 的意义是转换的行数,对二个开始到后面的参数都是内容。

    stack 的作用是将第二个开始的到后面的参数 塞进 n 行中。

    举个栗子来说哦,就是上文使用的

    stack(4, 'yxs', yxs, 'cxy', cxy, 'sxy', sxy, 'sxy19', sxy19) AS (sku_mode, credit_score)

    这里的语意就是切分成 4 行。从第二个字段开始 字符串部分表达的是匹配的 sku_mode 分辨是('yxs', 'cxy', 'sxy', 'sxy19')然后跟在他们后面的分别是credit_score 的值  然后展现成两列两个字段。有点绕需要多理解一下。最好是在 spark 终端中试一试比较有感觉。

    之后还有有意思的姿势会继续补充在这里。

    Reference:

    https://sparkbyexamples.com/how-to-pivot-table-and-unpivot-a-spark-dataframe/   How to Pivot and Unpivot a Spark SQL DataFrame

    https://stackoverflow.com/questions/56371391/in-group-sort-table-join-a-another-table-use-first-func/56371513#56371513

  • 相关阅读:
    完成后台管理系统功能(三)查询商品信息列表
    完成后台管理系统功能(二)有关SSM的整合
    实现后台管理系统功能(一)maven工程的建立
    开始‘京西商城’的电商项目(SSM)
    到此,使用struts2+hibernate实现登陆以及学生列表的增删改查 结束 -------------------------------------------------------
    chrome 中 preview 和 response 数据不一致
    单元测试执行过程中忽略出错的单元测试
    使用Maven 构建时跳过单元测试
    使用 AutoIt3 加密解密数据
    SpringBoot 启动流程
  • 原文地址:https://www.cnblogs.com/piperck/p/10917368.html
Copyright © 2011-2022 走看看