Spark: Several Ways to Add Columns to a Dataset, Drop Columns, and Replace null Values

    Several ways to add columns to a Dataset

    First, create a DataFrame:

    scala> spark.version
    res0: String = 2.2.0.cloudera1
    
    scala> val df = spark.createDataset(Seq(("key1", 23, 1.0), ("key1", 10, 2.0))).toDF("id", "rsrp", "rsrq")
    df: org.apache.spark.sql.DataFrame = [id: string, rsrp: int ... 1 more field]
    
    scala> df.show
    +----+----+----+
    |  id|rsrp|rsrq|
    +----+----+----+
    |key1|  23| 1.0|
    |key1|  10| 2.0|
    +----+----+----+
    
    
    scala> df.printSchema
    root
     |-- id: string (nullable = true)
     |-- rsrp: integer (nullable = false)
     |-- rsrq: double (nullable = false)

    Method 1: add a constant (fixed value) with lit()

    The literal can be a string, an integer, or any other supported type.

    scala> df.withColumn("sinurl", lit(12)).show 
    +----+----+----+------+
    |  id|rsrp|rsrq|sinurl|
    +----+----+----+------+
    |key1|  23| 1.0|    12|
    |key1|  10| 2.0|    12|
    +----+----+----+------+
    
    scala> df.withColumn("type", lit("mr")).show 
    +----+----+----+----+
    |  id|rsrp|rsrq|type|
    +----+----+----+----+
    |key1|  23| 1.0|  mr|
    |key1|  10| 2.0|  mr|
    +----+----+----+----+

    Note:

    lit() is a built-in Spark function; it requires import org.apache.spark.sql.functions._.

    Since 1.3.0
    def lit(literal: Any): Column
    Creates a Column of literal value. The passed in object is returned directly if it is already a Column. If the object is a Scala Symbol, it is converted into a Column also. Otherwise, a new Column is created to represent the literal value.
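
    A minimal import sketch, plus lit() literals of a few other types (the new column names are made up for illustration):

    import org.apache.spark.sql.functions._

    // lit() wraps a literal in a Column; the column type is inferred from the value
    df.withColumn("flag", lit(true)).show                  // boolean constant
    df.withColumn("pi", lit(3.14)).show                    // double constant
    df.withColumn("note", lit(null).cast("string")).show   // typed null column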

    Method 2: derive a new column by transforming an existing column

    scala> df.withColumn("rsrp2", $"rsrp"*2).show 
    +----+----+----+-----+
    |  id|rsrp|rsrq|rsrp2|
    +----+----+----+-----+
    |key1|  23| 1.0|   46|
    |key1|  10| 2.0|   20|
    +----+----+----+-----+
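
    The transformation is not limited to scaling a single column; any Column expression built from existing columns works. A small sketch (the derived column names are made up):

    // Combine two existing columns arithmetically
    df.withColumn("rsrp_plus_rsrq", $"rsrp" + $"rsrq").show

    // Build a string column from several existing ones
    df.withColumn("id_rsrp", concat($"id", lit("_"), $"rsrp".cast("string"))).show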

    Method 3: add a column with select

    Java version:

    import static org.apache.spark.sql.functions.col;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.functions;
    import org.apache.spark.sql.api.java.UDF1;
    import org.apache.spark.sql.types.DataTypes;
    ...
        private final SimpleDateFormat srcSdf = new SimpleDateFormat("yyyy-MM-dd HH:00:00");
        private final SimpleDateFormat destSdf = new SimpleDateFormat("yyyy-MM-dd 00:00:00");
        
        public Dataset<Row> handler(Dataset<Row> esDataset){
        UDF1<String, String> date_fomat = new UDF1<String, String>() {
                private static final long serialVersionUID = 1L;
    
                public String call(final String value) throws Exception {
                    Date date = srcSdf.parse(value);
                    return destSdf.format(date);
                }
            };
            sparkSession.udf().register("date_fomat_func", date_fomat, DataTypes.StringType);
    
        UDF1<Long, Long> to_long = new UDF1<Long, Long>() {
                private static final long serialVersionUID = 1L;
    
                public Long call(final Long value) throws Exception {
                    Date date = srcSdf.parse(String.valueOf(value));
                    return destSdf.parse(destSdf.format(date)).getTime();
                }
            };
            sparkSession.udf().register("to_long_func", to_long, DataTypes.LongType);
    
            esDataset=esDataset.withColumn("scan_start_time", functions.callUDF("date_fomat_func", col("scan_start_time")));
            esDataset=esDataset.withColumn("scan_stop_time", functions.callUDF("date_fomat_func", col("scan_stop_time")));
            esDataset=esDataset.withColumn("timestamp", functions.callUDF("to_long_func", col("timestamp")));
            
            return esDataset;
        }
    ...

    Scala version:

    scala> import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.DataTypes
    scala> df.select(col("*"), 
         |     udf{
         |         (e:Int) =>
         |             if(e == "23") {
         |                 1
         |             } else {
         |                 2
         |             }
         |     }.apply(df("rsrp")).cast(DataTypes.DoubleType).as("rsrp_udf")
         | ).show
    +----+----+----+--------+
    |  id|rsrp|rsrq|rsrp_udf|
    +----+----+----+--------+
    |key1|  23| 1.0|     2.0|
    |key1|  10| 2.0|     2.0|
    +----+----+----+--------+
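
    Note that inside the UDF above, e == "23" compares an Int with a String, which is always false, so both rows fall into the else branch and get 2.0. Comparing against the integer literal gives the intended split; a corrected sketch:

    // Compare Int against Int so the first branch can match
    df.select(col("*"),
        udf { (e: Int) => if (e == 23) 1 else 2 }
          .apply(df("rsrp")).cast(DataTypes.DoubleType).as("rsrp_udf")
    ).show
    // expected: the rsrp = 23 row gets 1.0, the rsrp = 10 row gets 2.0
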
    scala> df.select(col("*"),
         |     when(df("rsrp") > 10, lit(">10")).when(df("rsrp") === 10, "=10").otherwise("<10").as("rsrp_compare10")
         | ).show
    +----+----+----+--------------+
    |  id|rsrp|rsrq|rsrp_compare10|
    +----+----+----+--------------+
    |key1|  23| 1.0|           >10|
    |key1|  10| 2.0|           =10|
    +----+----+----+--------------+

    Method 4: nest a UDF inside case when

    // udf1 stands for a previously defined UserDefinedFunction (a self-contained sketch follows below)
    df.withColumn("r",
       when($"rsrp".isNull, lit(null))
           .otherwise(udf1($"rsrp"))
           .cast(DataTypes.IntegerType)
    )
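
    A self-contained sketch of the same pattern, with a stand-in UDF invented purely for illustration:

    import org.apache.spark.sql.functions.{udf, when, lit}
    import org.apache.spark.sql.types.DataTypes

    // Hypothetical UDF playing the role of udf1 above: cap rsrp at 20
    val capUdf = udf((v: Int) => math.min(v, 20))

    df.withColumn("r",
        when($"rsrp".isNull, lit(null))
            .otherwise(capUdf($"rsrp"))
            .cast(DataTypes.IntegerType)
    ).show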

    Method 5: use the expr() function

    scala> df.withColumn("rsrp4", expr("rsrp * 4")).show
    +----+----+----+-----+
    |  id|rsrp|rsrq|rsrp4|
    +----+----+----+-----+
    |key1|  23| 1.0|   92|
    |key1|  10| 2.0|   40|
    +----+----+----+-----+
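
    expr() parses a SQL expression string into a Column; selectExpr is the matching shorthand on the Dataset itself. An equivalent sketch:

    // Keep every existing column and add the computed one in a single call
    df.selectExpr("*", "rsrp * 4 as rsrp4").show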

    Dropping Dataset columns

    scala> df.drop("rsrp").show
    +----+----+
    |  id|rsrq|
    +----+----+
    |key1| 1.0|
    |key1| 2.0|
    +----+----+
    
    
    scala> df.drop("rsrp","rsrq").show
    +----+
    |  id|
    +----+
    |key1|
    |key1|
    +----+
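
    drop also accepts a Column reference, and dropping a name that is not in the schema is a no-op rather than an error. A short sketch:

    df.drop(df("rsrp")).show        // drop by Column reference
    df.drop("no_such_column").show  // no-op: the Dataset is returned unchanged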

    Replacing null values in a Dataset

    First, there is a test file at /user/spark/test.csv on HDFS:

    [spark@master ~]$ hadoop fs -text /user/spark/test.csv
    key1,key2,key3,key4,key5
    aaa,1,2,t1,4
    bbb,5,3,t2,8
    ccc,2,2,,7
    ,7,3,t1,
    bbb,1,5,t3,0
    ,4,,t1,8 

    Note: to run spark-shell from any directory, add the Spark installation directory to /etc/profile:

    export SPARK_HOME=/opt/spark-2.2.1-bin-hadoop2.7
    export PATH=$PATH:$SPARK_HOME/bin

    Load /user/spark/test.csv with spark-shell:

    [spark@master ~]$ spark-shell
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    18/10/29 21:50:32 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Spark context Web UI available at http://192.168.0.120:4040
    Spark context available as 'sc' (master = local[*], app id = local-1540821032565).
    Spark session available as 'spark'.
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /___/ .__/\_,_/_/ /_/\_\   version 2.2.1
          /_/
             
    Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_171)
    Type in expressions to have them evaluated.
    Type :help for more information.
    
    scala> val df = spark.read.option("header","true").csv("/user/spark/test.csv")
    18/10/29 21:51:16 WARN metastore.ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
    18/10/29 21:51:16 WARN metastore.ObjectStore: Failed to get database default, returning NoSuchObjectException
    18/10/29 21:51:37 WARN metastore.ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
    df: org.apache.spark.sql.DataFrame = [key1: string, key2: string ... 3 more fields]
    
    scala> df.show
    +----+----+----+----+----+
    |key1|key2|key3|key4|key5|
    +----+----+----+----+----+
    | aaa|   1|   2|  t1|   4|
    | bbb|   5|   3|  t2|   8|
    | ccc|   2|   2|null|   7|
    |null|   7|   3|  t1|null|
    | bbb|   1|   5|  t3|   0|
    |null|   4|null|  t1|  8 |
    +----+----+----+----+----+
    
    scala> df.schema
    res3: org.apache.spark.sql.types.StructType = StructType(StructField(key1,StringType,true), StructField(key2,StringType,true), 
    StructField(key3,StringType,true), StructField(key4,StringType,true), StructField(key5,StringType,true))

    scala> df.printSchema
    root
     |-- key1: string (nullable = true)
     |-- key2: string (nullable = true)
     |-- key3: string (nullable = true)
     |-- key4: string (nullable = true)
     |-- key5: string (nullable = true)

    An example of filling several columns of the same type at once: every null in key3 and key5 is replaced with 1024. Columns read from CSV default to string; if the columns were integers the call would look the same, since na.fill has overloads for each type.

    scala>  df.na.fill("1024",Seq("key3","key5")).show
    +----+----+----+----+----+
    |key1|key2|key3|key4|key5|
    +----+----+----+----+----+
    | aaa|   1|   2|  t1|   4|
    | bbb|   5|   3|  t2|   8|
    | ccc|   2|   2|null|   7|
    |null|   7|   3|  t1|1024|
    | bbb|   1|   5|  t3|   0|
    |null|   4|1024|  t1|  8 |
    +----+----+----+----+----+

    An example of filling several columns of different types at once, using the Map overload with one replacement value per column. Again, CSV columns default to string; the other types have matching overloads.

    scala> df.na.fill(Map(("key1"->"yyy"),("key3","1024"),("key4","t88"),("key5","4096"))).show
    +----+----+----+----+----+
    |key1|key2|key3|key4|key5|
    +----+----+----+----+----+
    | aaa|   1|   2|  t1|   4|
    | bbb|   5|   3|  t2|   8|
    | ccc|   2|   2| t88|   7|
    | yyy|   7|   3|  t1|4096|
    | bbb|   1|   5|  t3|   0|
    | yyy|   4|1024|  t1|  8 |
    +----+----+----+----+----+

    Instead of filling, you can drop the rows that contain null. Here, rows with a null in key3 or key5 are dropped:

    scala>  df.na.drop(Seq("key3","key5")).show
    +----+----+----+----+----+
    |key1|key2|key3|key4|key5|
    +----+----+----+----+----+
    | aaa|   1|   2|  t1|   4|
    | bbb|   5|   3|  t2|   8|
    | ccc|   2|   2|null|   7|
    | bbb|   1|   5|  t3|   0|
    +----+----+----+----+----+

    Drop rows that have fewer than n non-null values among the specified columns. Here, rows with fewer than 2 non-null values among key1, key2 and key3 are dropped; in the last row two of those three columns are null, so it is removed.

    scala> df.na.drop(2,Seq("key1","key2","key3")).show
    +----+----+----+----+----+
    |key1|key2|key3|key4|key5|
    +----+----+----+----+----+
    | aaa|   1|   2|  t1|   4|
    | bbb|   5|   3|  t2|   8|
    | ccc|   2|   2|null|   7|
    |null|   7|   3|  t1|null|
    | bbb|   1|   5|  t3|   0|
    +----+----+----+----+----+

    As above, but if no column list is given, all columns are considered:

    scala> df.na.drop(4).show
    +----+----+----+----+----+
    |key1|key2|key3|key4|key5|
    +----+----+----+----+----+
    | aaa|   1|   2|  t1|   4|
    | bbb|   5|   3|  t2|   8|
    | ccc|   2|   2|null|   7|
    | bbb|   1|   5|  t3|   0|
    +----+----+----+----+----+
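
    na.drop also takes a "how" argument: "any" (the default used above) drops a row if any of the considered columns is null, while "all" drops it only when every one of them is. A short sketch:

    df.na.drop("all", Seq("key3", "key5")).show  // drop rows where key3 and key5 are both null
    df.na.drop("any").show                       // same as df.na.drop()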

    References:

    https://blog.csdn.net/coding_hello/article/details/75211995

    https://blog.csdn.net/xuejianbest/article/details/81666065
