zoukankan      html  css  js  c++  java
  • Spark开发_Spark数据变换-透视(Pivot)

    数据变换

     长表和宽表的变换,使用Spark进行变换,SQL中使用 case when。涉及维度比较多的时候,采用数据透视的方式进行数据变换
     在Spark SQL 3.0.1 中有相关的子句实现了。(2020年6月15日 发布的版本)
      PIVOT ( { aggregate_expression [ AS aggregate_expression_alias ] } [ , ... ]FOR column_list IN ( expression_list ) )
     目前使用的版本不支持,所以采用了spark API在Dataset中的实现的透视功能,如下所示,开发使用的是Java语言。
    

    附录:

         /**
         *   groupBy 分组字段: 按照不需要转换的字段分组,本例中是年月;pivot 跟在groupby后
         *   pivot 维度转换字段:使用pivot函数进行透视,透视过程中可以提供第二个参数来明确指定使用哪些数据项;
         *   agg 汇总数字字段:本例中是 stu_cnt
         *   na() 空值处理 
         */
    
     package com.le;
    
     import org.apache.spark.sql.*;
     import org.apache.spark.sql.types.DataTypes;
     import org.apache.spark.sql.types.Metadata;
     import org.apache.spark.sql.types.StructField;
     import org.apache.spark.sql.types.StructType;
     import static org.apache.spark.sql.functions.sum;
     import java.util.ArrayList;
     import java.util.Arrays;
     import java.util.List;
    
    public class DataTransfer {
        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder()
                    .master("local[2]")
                    .appName("DataSetTransferExample")
                   // .enableHiveSupport()
                    .getOrCreate();
            List<Row> dimensionData = Arrays.asList(
                    RowFactory.create("CD165",   "2",  "Test", 1),
                    RowFactory.create("CD165",   "3",  "Action" ,1),
                    RowFactory.create("CD165",   "2",  "Learn",1),
                    RowFactory.create("CD165",   "1",  "Test",1)
            );
            StructType dimensionSchema = new StructType(new StructField[]{
                    new StructField("grade_id", DataTypes.StringType, false, Metadata.empty()),
                    new StructField("stu_state", DataTypes.StringType, false, Metadata.empty()),
                    new StructField("stu_detail", DataTypes.StringType, false, Metadata.empty()),
                    new StructField("stu_cnt", DataTypes.IntegerType, false, Metadata.empty())
            });
    
            Dataset<Row> dimensionDF = spark.createDataFrame(dimensionData, dimensionSchema);
            dimensionDF.show();
    
            ArrayList arrColName = new ArrayList();
            arrColName.add("Test");
            arrColName.add("Action");
            Dataset<Row> df_pivot = dimensionDF
                    .groupBy("grade_id","stu_state")
                    .pivot("stu_detail",arrColName)
                    .agg(sum("stu_cnt"))
                    .na().fill(0)
           ;
            df_pivot.show();
            Dataset<Row> df_pivot_2 = df_pivot
                    .groupBy("grade_id")
                    .pivot("stu_state")
                    .agg(sum("Test"))
                    .na().fill(0);
            Dataset<Row> df_pivot_3 = df_pivot
                    .groupBy("grade_id")
                    .pivot("stu_state")
                    .agg(sum("Action"))
                    .na().fill(0);
            Dataset<Row>  resultDF = df_pivot_2
                    .withColumnRenamed("stu_state", "post_id_acc")
                    .join(df_pivot_3,"grade_id");
            resultDF.show();
        }
    }
    

    其他

     stack 是内置函数
     SELECT * FROM person PIVOT ( SUM(age) AS a, AVG(class) AS c  FOR name IN ('John' AS john, 'Mike' AS mike) );
      在Spark SQL 中3.0.1版本中有相关的实现
       https://spark.apache.org/docs/3.0.1/sql-ref-syntax-qry-select-pivot.html
    

    参考

    Spark实现行列转换pivot和unpivot https://juejin.im/post/6844903619171631117
    Spark--透视函数pivot应用(行列转换) https://www.jianshu.com/p/36bdf156cbda
    http://spark.apache.org/docs/latest/sql-ref-syntax.html
    http://spark.apache.org/docs/latest/sql-ref-syntax-qry-select-pivot.html
    https://sparkbyexamples.com/spark/how-to-pivot-table-and-unpivot-a-spark-dataframe/#pivot-performance
  • 相关阅读:
    BadUSB 利用
    java 将函数作为参数传递
    odoo12 修行提升篇之 常用的高阶函数 (二)
    odoo12 修行提升篇之 异步定时任务 (一)
    odoo12 修行基础篇之 利用kanban做分析 点击跳转分析模型列表 (九)
    odoo12 修行基础篇之 kanban (八)
    odoo12 修行基础篇之 记录批处理 (七)
    odoo12 修行基础篇之 列表的筛选和分组 (六)
    odoo12 修行基础篇之 添加记录编码 (五)
    odoo12 修行基础篇之 添加工作流和操作记录 (四)
  • 原文地址:https://www.cnblogs.com/ytwang/p/13746148.html
Copyright © 2011-2022 走看看