背景
做过数据清洗ETL工作的都知道,行列转换是一个常见的数据整理需求。在不同的编程语言中有不同的实现方法,比如SQL中使用case+group,或者Power BI的M语言中用拖放组件实现。今天正好需要在pyspark中处理一个数据行列转换,就把这个方法记录下来。
首先明确一下啥叫行列转换,因为这个叫法也不是很统一,有的地方叫转置,有的地方叫透视,不一而足。我们就以下图为例,定义如下:
- 从左边这种变成右边这种,叫透视(pivot)
- 反之叫逆透视(unpivot)
Spark实现
构造样本数据
首先我们构造一个以行格式保存数据的数据集
from pyspark.sql import SparkSession spark = SparkSession.builder.appName('JupyterPySpark').enableHiveSupport().getOrCreate() import pyspark.sql.functions as F # 原始数据 df = spark.createDataFrame([('2018-01','项目1',100), ('2018-01','项目2',200), ('2018-01','项目3',300), ('2018-02','项目1',1000), ('2018-02','项目2',2000), ('2018-03','项目x',999) ], ['年月','项目','收入'])
样本数据如下,我们可以看到,每一个项目在指定月份都只有一行记录,并且项目是稀疏的。即,不是每个项目都会出现在每一个月份中,如项目2仅出现在2018-01当中。
+-------+---+----+ | 年月| 项目| 收入| +-------+---+----+ |2018-01|项目1| 100| |2018-01|项目2| 200| |2018-01|项目3| 300| |2018-02|项目1|1000| |2018-02|项目2|2000| |2018-03|项目x| 999| +-------+---+----+
透视Pivot
透视操作简单直接,逻辑如下
- 按照不需要转换的字段分组,本例中是年月;
- 使用pivot函数进行透视,透视过程中可以提供第二个参数来明确指定使用哪些数据项;
- 汇总数字字段,本例中是收入;
代码如下
df_pivot = df.groupBy('年月') .pivot('项目', ['项目1','项目2','项目3','项目x']) .agg(F.sum('收入')) .fillna(0)
结果如下
+-------+----+----+---+---+ | 年月| 项目1| 项目2|项目3|项目x| +-------+----+----+---+---+ |2018-03| 0| 0| 0|999| |2018-02|1000|2000| 0| 0| |2018-01| 100| 200|300| 0| +-------+----+----+---+---+
逆透视Unpivot
Spark没有提供内置函数来实现unpivot操作,不过我们可以使用Spark SQL提供的stack函数来间接实现需求。有几点需要特别注意:
- 使用selectExpr在Spark中执行SQL片段;
- 如果字段名称有中文,要使用反引号**`** 把字段包起来;
代码如下
df_pivot.selectExpr("`年月`", "stack(4, '项目1', `项目1`,'项目2', `项目2`, '项目3', `项目3`, '项目x', `项目x`) as (`项目`,`收入`)") .filter("`收入` > 0 ") .orderBy(["`年月`", "`项目`"]) .show()
结果如下
+-------+---+----+ | 年月| 项目| 收入| +-------+---+----+ |2018-01|项目1| 100| |2018-01|项目2| 200| |2018-01|项目3| 300| |2018-02|项目1|1000| |2018-02|项目2|2000| |2018-03|项目x| 999| +-------+---+----+
Reference:
https://juejin.im/post/5b1e343f518825137c1c6a27 掘金