  • Spark: splitting a delimited column into multiple rows

    Sample Java code:

    import static org.apache.spark.sql.functions.col;
    import static org.apache.spark.sql.functions.split;
    import static org.apache.spark.sql.functions.explode;
    
    import java.util.ArrayList;
    import java.util.List;
    
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    
    public class TestSparkSqlSplit {
        public static void main(String[] args){
            SparkSession sparkSession = SparkSession.builder().appName("test").master("local[*]").getOrCreate();
            List<MyEntity> items=new ArrayList<MyEntity>();
            MyEntity myEntity=new MyEntity();
            myEntity.setId("scene_id1,scene_name1;scene_id2,scene_name2|id1");
            myEntity.setName("name");
            myEntity.setFields("other");
            items.add(myEntity);
            
            sparkSession.createDataFrame(items, MyEntity.class).createOrReplaceTempView("test");
            
            Dataset<Row> rows = sparkSession.sql("select * from test");
            // Take the part before '|', split it on ';', and explode each "id,name" pair into its own row.
            // Note: '|' is a regex metacharacter and must be escaped as "\\|" in a Java string literal.
            rows = rows.withColumn("id", explode(split(split(col("id"), "\\|").getItem(0), ";")));
            
            // Split each exploded "id,name" pair into its two parts.
            rows = rows.withColumn("id1", split(rows.col("id"), ",").getItem(0))
                    .withColumn("name1", split(rows.col("id"), ",").getItem(1));
            
            // Copy the parts back into the original columns and drop the temporaries.
            rows = rows.withColumn("id", rows.col("id1"))
                    .withColumn("name", rows.col("name1"));
            
            rows = rows.drop("id1", "name1");
            
            rows.show();
            
            sparkSession.stop();
        }
    }

    MyEntity.java

    import java.io.Serializable;
    
    public class MyEntity implements Serializable{
        private String id;
        private String name;
        private String fields;
        public String getId() {
            return id;
        }
        public void setId(String id) {
            this.id = id;
        }
        public String getName() {
            return name;
        }
        public void setName(String name) {
            this.name = name;
        }
        public String getFields() {
            return fields;
        }
        public void setFields(String fields) {
            this.fields = fields;
        }
        
    }

    Output:

    18/12/05 17:28:53 INFO codegen.CodeGenerator: Code generated in 36.359731 ms
    +------+---------+-----------+
    |fields|       id|       name|
    +------+---------+-----------+
    | other|scene_id1|scene_name1|
    | other|scene_id2|scene_name2|
    +------+---------+-----------+
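    The same reshaping can also be expressed as a single Spark SQL statement using LATERAL VIEW explode. Below is a minimal Scala sketch against the "test" view registered above (spark is the session as exposed in spark2-shell; the alias pair is made up for illustration):

    spark.sql(
      """SELECT fields,
        |       split(pair, ',')[0] AS id,
        |       split(pair, ',')[1] AS name
        |FROM test
        |LATERAL VIEW explode(split(split(id, '\\|')[0], ';')) t AS pair""".stripMargin).show()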

    Scala implementation:

    [dx@CDH-143 ~]$ spark2-shell
    Setting default log level to "WARN".
    ...
    Spark context available as 'sc' (master = yarn, app id = application_1552012317155_0189).
    Spark session available as 'spark'.
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /___/ .__/\_,_/_/ /_/\_\   version 2.2.0.cloudera1
          /_/
             
    Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_171)
    Type in expressions to have them evaluated.
    Type :help for more information.
    
    scala> 
    scala> val df = Seq(
         |   (1, "scene_id1,scene_name1;scene_id2,scene_name2",""),
         |   (2, "scene_id1,scene_name1;scene_id2,scene_name2;scene_id3,scene_name3",""),
         |   (3, "scene_id4,scene_name4;scene_id2,scene_name2",""),
         |   (4, "scene_id6,scene_name6;scene_id5,scene_name5","")
         | ).toDF("id", "int_id","name");
    df: org.apache.spark.sql.DataFrame = [id: int, int_id: string ... 1 more field]
    scala> df.show;
    +---+--------------------+----+
    | id|              int_id|name|
    +---+--------------------+----+
    |  1|scene_id1,scene_n...|    |
    |  2|scene_id1,scene_n...|    |
    |  3|scene_id4,scene_n...|    |
    |  4|scene_id6,scene_n...|    |
    +---+--------------------+----+
    scala> df.withColumn("int_id", explode(split(col("int_id"), ";")));
    res1: org.apache.spark.sql.DataFrame = [id: int, int_id: string ... 1 more field]
    scala> res1.show();
    +---+--------------------+----+
    | id|              int_id|name|
    +---+--------------------+----+
    |  1|scene_id1,scene_n...|    |
    |  1|scene_id2,scene_n...|    |
    |  2|scene_id1,scene_n...|    |
    |  2|scene_id2,scene_n...|    |
    |  2|scene_id3,scene_n...|    |
    |  3|scene_id4,scene_n...|    |
    |  3|scene_id2,scene_n...|    |
    |  4|scene_id6,scene_n...|    |
    |  4|scene_id5,scene_n...|    |
    +---+--------------------+----+
    scala> res1.withColumn("int_id", split(col("int_id"), ",")(0)).withColumn("name", split(col("int_id"), ",")(1));
    res5: org.apache.spark.sql.DataFrame = [id: int, int_id: string ... 1 more field]
    scala> res5.show
    +---+---------+----+
    | id|   int_id|name|
    +---+---------+----+
    |  1|scene_id1|null|
    |  1|scene_id2|null|
    |  2|scene_id1|null|
    |  2|scene_id2|null|
    |  2|scene_id3|null|
    |  3|scene_id4|null|
    |  3|scene_id2|null|
    |  4|scene_id6|null|
    |  4|scene_id5|null|
    +---+---------+----+
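    Note that name comes back null here: the first withColumn already overwrote int_id with the part before the comma, so the second split has no comma left to take item 1 from. Computing name before overwriting int_id fixes this: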
    scala> res1.withColumn("name", split(col("int_id"), ",")(1)).withColumn("int_id", split(col("int_id"), ",")(0));
    res7: org.apache.spark.sql.DataFrame = [id: int, int_id: string ... 1 more field]
    scala> res7.show
    +---+---------+-----------+
    | id|   int_id|       name|
    +---+---------+-----------+
    |  1|scene_id1|scene_name1|
    |  1|scene_id2|scene_name2|
    |  2|scene_id1|scene_name1|
    |  2|scene_id2|scene_name2|
    |  2|scene_id3|scene_name3|
    |  3|scene_id4|scene_name4|
    |  3|scene_id2|scene_name2|
    |  4|scene_id6|scene_name6|
    |  4|scene_id5|scene_name5|
    +---+---------+-----------+
    scala> 
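    Alternatively, both pieces can be computed from the original int_id in a single select, so the order of the expressions no longer matters (a sketch against the same res1 as above):

    scala> res1.select(
         |   col("id"),
         |   split(col("int_id"), ",")(0).as("int_id"),
         |   split(col("int_id"), ",")(1).as("name")
         | ).show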

    When filtering, beware of null semantics: if int_id (a string column) is null, any comparison against it evaluates to null rather than true (SQL three-valued logic). In particular, the filter condition col("int_id").notEqual(null) evaluates to null for every row, so it filters out all data:
     
    scala> val df = Seq(
         |             (1, null,""),
         |             (2, "-1",""), 
         |             (3, "scene_id4,scene_name4;scene_id2,scene_name2",""),
         |             (4, "scene_id6,scene_name6;scene_id5,scene_name5","")
         |           ).toDF("id", "int_id","name");
    df: org.apache.spark.sql.DataFrame = [id: int, int_id: string ... 1 more field]
    
    scala> df.filter(col("int_id").notEqual(null).and(col("int_id").notEqual("-1")));
    res5: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, int_id: string ... 1 more field]
    
    scala> res5.show;
    +---+------+----+
    | id|int_id|name|
    +---+------+----+
    +---+------+----+
    
    scala> df.filter(col("int_id").notEqual("").and(col("int_id").notEqual("-1")));
    res7: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, int_id: string ... 1 more field]
    
    scala> res7.show;
    +---+--------------------+----+
    | id|              int_id|name|
    +---+--------------------+----+
    |  3|scene_id4,scene_n...|    |
    |  4|scene_id6,scene_n...|    |
    +---+--------------------+----+
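    The notEqual("") version keeps the intended rows, but it also excludes the null row for the same three-valued-logic reason: null compared with "" yields null, not true. If the intent is "not null and not -1", isNotNull states it explicitly (a minimal sketch against the same df):

    scala> df.filter(col("int_id").isNotNull.and(col("int_id").notEqual("-1"))).show

    This keeps rows 3 and 4, the same result as the notEqual("") variant, but makes the null check explicit.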

    If int_id does not contain the delimiters used for the split, no rows are lost: split simply returns a one-element array, so item (0) yields the original value and item (1) yields null instead of throwing an error:

    scala> 
    
    scala> val df = Seq(
         | (1, null,""),
         | (2, "-1",""), 
         | (3, "scene_id4,scene_name4;scene_id2,scene_name2",""),
         | (4, "scene_id6,scene_name6;scene_id5,scene_name5","")
         | ).toDF("id", "int_id","name");
    df: org.apache.spark.sql.DataFrame = [id: int, int_id: string ... 1 more field]
    
    scala> 
    
    scala> df.withColumn("name", split(col("int_id"), ",")(1)).withColumn("int_id", split(col("int_id"), ",")(0));
    res0: org.apache.spark.sql.DataFrame = [id: int, int_id: string ... 1 more field]
    
    scala> res0.show;
    +---+---------+--------------------+
    | id|   int_id|                name|
    +---+---------+--------------------+
    |  1|     null|                null|
    |  2|       -1|                null|
    |  3|scene_id4|scene_name4;scene...|
    |  4|scene_id6|scene_name6;scene...|
    +---+---------+--------------------+
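    If placeholder rows like the null and "-1" values should be excluded before splitting, one option is to keep only values that actually contain the pair delimiter (a sketch, assuming a comma only appears in well-formed "id,name" pairs):

    scala> df.filter(col("int_id").isNotNull && col("int_id").contains(",")).show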
    
    
  • Original article: https://www.cnblogs.com/yy3b2007com/p/10072234.html