  • Spark: implementing row-to-column transformation

    Sample Java code:

    import static org.apache.spark.sql.functions.col;
    import static org.apache.spark.sql.functions.split;
    import static org.apache.spark.sql.functions.explode;
    
    import java.util.ArrayList;
    import java.util.List;
    
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    
    public class TestSparkSqlSplit {
        public static void main(String[] args){
            SparkSession sparkSession = SparkSession.builder().appName("test").master("local[*]").getOrCreate();
            List<MyEntity> items = new ArrayList<MyEntity>();
            MyEntity myEntity = new MyEntity();
            myEntity.setId("scene_id1,scene_name1;scene_id2,scene_name2|id1");
            myEntity.setName("name");
            myEntity.setFields("other");
            items.add(myEntity);

            sparkSession.createDataFrame(items, MyEntity.class).createOrReplaceTempView("test");

            Dataset<Row> rows = sparkSession.sql("select * from test");
            // split() takes a regex, so the pipe must be escaped ("\\|"); keep the part
            // before "|", then explode the ";"-separated pairs into one row each
            rows = rows.withColumn("id", explode(split(split(col("id"), "\\|").getItem(0), ";")));

            // each exploded value is "scene_id,scene_name": split it into two temp columns
            rows = rows.withColumn("id1", split(rows.col("id"), ",").getItem(0))
                    .withColumn("name1", split(rows.col("id"), ",").getItem(1));

            // overwrite the original columns with the parsed values
            rows = rows.withColumn("id", rows.col("id1"))
                    .withColumn("name", rows.col("name1"));

            rows = rows.drop("id1", "name1");

            rows.show();

            sparkSession.stop();
        }
    }

    MyEntity.java

    import java.io.Serializable;
    
    public class MyEntity implements Serializable{
        private String id;
        private String name;
        private String fields;
        public String getId() {
            return id;
        }
        public void setId(String id) {
            this.id = id;
        }
        public String getName() {
            return name;
        }
        public void setName(String name) {
            this.name = name;
        }
        public String getFields() {
            return fields;
        }
        public void setFields(String fields) {
            this.fields = fields;
        }
        
    }

    Output:

    18/12/05 17:28:53 INFO codegen.CodeGenerator: Code generated in 36.359731 ms
    +------+---------+-----------+
    |fields|       id|       name|
    +------+---------+-----------+
    | other|scene_id1|scene_name1|
    | other|scene_id2|scene_name2|
    +------+---------+-----------+

    Scala implementation:

    [boco@CDH-143 ~]$ spark2-shell
    Setting default log level to "WARN".
    ...
    Spark context available as 'sc' (master = yarn, app id = application_1552012317155_0189).
    Spark session available as 'spark'.
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /___/ .__/\_,_/_/ /_/\_\   version 2.2.0.cloudera1
          /_/
             
    Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_171)
    Type in expressions to have them evaluated.
    Type :help for more information.
    
    scala> import org.apache.spark.sql.functions.{col, explode, split}
    import org.apache.spark.sql.functions.{col, explode, split}

    scala> val df = Seq(
         |   (1, "scene_id1,scene_name1;scene_id2,scene_name2",""),
         |   (2, "scene_id1,scene_name1;scene_id2,scene_name2;scene_id3,scene_name3",""),
         |   (3, "scene_id4,scene_name4;scene_id2,scene_name2",""),
         |   (4, "scene_id6,scene_name6;scene_id5,scene_name5","")
         | ).toDF("id", "int_id","name");
    df: org.apache.spark.sql.DataFrame = [id: int, int_id: string ... 1 more field]
    scala> df.show;
    +---+--------------------+----+
    | id|              int_id|name|
    +---+--------------------+----+
    |  1|scene_id1,scene_n...|    |
    |  2|scene_id1,scene_n...|    |
    |  3|scene_id4,scene_n...|    |
    |  4|scene_id6,scene_n...|    |
    +---+--------------------+----+
    scala> df.withColumn("int_id", explode(split(col("int_id"), ";")));
    res1: org.apache.spark.sql.DataFrame = [id: int, int_id: string ... 1 more field]
    scala> res1.show();
    +---+--------------------+----+
    | id|              int_id|name|
    +---+--------------------+----+
    |  1|scene_id1,scene_n...|    |
    |  1|scene_id2,scene_n...|    |
    |  2|scene_id1,scene_n...|    |
    |  2|scene_id2,scene_n...|    |
    |  2|scene_id3,scene_n...|    |
    |  3|scene_id4,scene_n...|    |
    |  3|scene_id2,scene_n...|    |
    |  4|scene_id6,scene_n...|    |
    |  4|scene_id5,scene_n...|    |
    +---+--------------------+----+
    scala> res1.withColumn("int_id", split(col("int_id"), ",")(0)).withColumn("name", split(col("int_id"), ",")(1));
    res5: org.apache.spark.sql.DataFrame = [id: int, int_id: string ... 1 more field]
    scala> res5.show
    +---+---------+----+
    | id|   int_id|name|
    +---+---------+----+
    |  1|scene_id1|null|
    |  1|scene_id2|null|
    |  2|scene_id1|null|
    |  2|scene_id2|null|
    |  2|scene_id3|null|
    |  3|scene_id4|null|
    |  3|scene_id2|null|
    |  4|scene_id6|null|
    |  4|scene_id5|null|
    +---+---------+----+
    name is null here because the first withColumn already overwrote int_id with element 0, so the second split finds no comma and index 1 falls outside the array. Computing name before overwriting int_id gives the expected result:

    scala> res1.withColumn("name", split(col("int_id"), ",")(1)).withColumn("int_id", split(col("int_id"), ",")(0));
    res7: org.apache.spark.sql.DataFrame = [id: int, int_id: string ... 1 more field]
    scala> res7.show
    +---+---------+-----------+
    | id|   int_id|       name|
    +---+---------+-----------+
    |  1|scene_id1|scene_name1|
    |  1|scene_id2|scene_name2|
    |  2|scene_id1|scene_name1|
    |  2|scene_id2|scene_name2|
    |  2|scene_id3|scene_name3|
    |  3|scene_id4|scene_name4|
    |  3|scene_id2|scene_name2|
    |  4|scene_id6|scene_name6|
    |  4|scene_id5|scene_name5|
    +---+---------+-----------+
    scala> 
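    The same result can be had in one pass, splitting only once; a minimal sketch (not from the session above) that projects both array elements in a single select:

    val parts = split(col("int_id"), ",")
    res1.select(col("id"), parts(0).as("int_id"), parts(1).as("name")).show()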

    When int_id (a string column) is null, any comparison against it evaluates to null rather than true or false (SQL three-valued logic). A filter condition written as col("int_id").notEqual(null) therefore matches nothing and filters out all rows; comparing against the empty string drops the null rows as well:

    scala> val df = Seq(
         |             (1, null,""),
         |             (2, "-1",""), 
         |             (3, "scene_id4,scene_name4;scene_id2,scene_name2",""),
         |             (4, "scene_id6,scene_name6;scene_id5,scene_name5","")
         |           ).toDF("id", "int_id","name");
    df: org.apache.spark.sql.DataFrame = [id: int, int_id: string ... 1 more field]
    
    scala> df.filter(col("int_id").notEqual(null).and(col("int_id").notEqual("-1")));
    res5: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, int_id: string ... 1 more field]
    
    scala> res5.show;
    +---+------+----+
    | id|int_id|name|
    +---+------+----+
    +---+------+----+
    
    scala> df.filter(col("int_id").notEqual("").and(col("int_id").notEqual("-1")));
    res7: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, int_id: string ... 1 more field]
    
    scala> res7.show;
    +---+--------------------+----+
    | id|              int_id|name|
    +---+--------------------+----+
    |  3|scene_id4,scene_n...|    |
    |  4|scene_id6,scene_n...|    |
    +---+--------------------+----+
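    The idiomatic way to express the intended null check is isNotNull rather than a comparison with null; a minimal sketch, assuming the same df:

    // keep rows whose int_id is present and not the sentinel "-1"
    val kept = df.filter(col("int_id").isNotNull.and(col("int_id").notEqual("-1")))
    kept.show()  // keeps rows 3 and 4, matching the notEqual("") variant above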

    If int_id does not contain the delimiters used for the split, no data is lost; the rows survive with null in place of the missing pieces:

    scala> 
    
    scala> val df = Seq(
         | (1, null,""),
         | (2, "-1",""), 
         | (3, "scene_id4,scene_name4;scene_id2,scene_name2",""),
         | (4, "scene_id6,scene_name6;scene_id5,scene_name5","")
         | ).toDF("id", "int_id","name");
    df: org.apache.spark.sql.DataFrame = [id: int, int_id: string ... 1 more field]
    
    scala> 
    
    scala> df.withColumn("name", split(col("int_id"), ",")(1)).withColumn("int_id", split(col("int_id"), ",")(0));
    res0: org.apache.spark.sql.DataFrame = [id: int, int_id: string ... 1 more field]
    
    scala> res0.show;
    +---+---------+--------------------+
    | id|   int_id|                name|
    +---+---------+--------------------+
    |  1|     null|                null|
    |  2|       -1|                null|
    |  3|scene_id4|scene_name4;scene...|
    |  4|scene_id6|scene_name6;scene...|
    +---+---------+--------------------+
    
    
    scala> 
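    Putting the pieces together; a sketch of the full pipeline over the df above, filtering the null/"-1" rows first so that explode and the positional split only see well-formed "id,name" pairs:

    import org.apache.spark.sql.functions.{col, explode, split}

    val result = df
      .filter(col("int_id").isNotNull.and(col("int_id").notEqual("-1")))
      // one output row per ";"-separated pair
      .withColumn("int_id", explode(split(col("int_id"), ";")))
      // compute name before overwriting int_id, otherwise name comes back null
      .withColumn("name", split(col("int_id"), ",")(1))
      .withColumn("int_id", split(col("int_id"), ",")(0))
    result.show()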
  • Original post: https://www.cnblogs.com/yy3b2007com/p/10072234.html