zoukankan      html  css  js  c++  java
  • csv数据文件清洗【DataFrame】

    package march.sql
    
    import org.apache.spark.sql.{SaveMode, SparkSession}
    import org.apache.spark.sql.functions._
    
    /**
      * Description: Cleans per-city house-listing CSV files, merges them into
      * one DataFrame, derives structured columns (rooms, halls, area, ...)
      * from the raw house_info field, and writes the result back out as CSV.
      *
      * @Author: 留歌36
      * @Date: 2019/3/6 8:57
      */
    object ChengduHouseAPP {

      /** Columns shared by all per-city CSV files. `union` matches columns by
        * position, not by name, so every input is projected onto this exact
        * order before the union. */
      private val rawColumns = Seq("house_info", "region", "publishday", "visited",
        "attention", "total_price", "unit_price", "url")

      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName(this.getClass.getSimpleName)
          .master("local[2]")
          .getOrCreate()

        // Windows paths: backslashes must be escaped inside Scala string
        // literals (the original "f:\data\..." used illegal escapes like \d
        // and did not compile). The runtime values are f:\data\... as intended.
        val path  = "f:\\data\\chengdu_house.csv"
        val path2 = "f:\\data\\hangzhou_house.csv"
        val path3 = "f:\\data\\shanghai_house.csv"
        val path4 = "f:\\data\\kunming_house.csv"

        // Read one city CSV with a header row and inferred column types.
        def readCsv(p: String) =
          spark.read.option("header", "true").option("inferSchema", "true").csv(p)

        val DF  = readCsv(path)
        val DF2 = readCsv(path2)
        val DF3 = readCsv(path3)
        val DF4 = readCsv(path4)

        // Project ALL four inputs (including DF) onto the shared column order;
        // union is positional, so a differing natural order would silently
        // misalign columns.
        val DF5 = DF.select(rawColumns.map(col): _*)
          .union(DF2.select(rawColumns.map(col): _*))
          .union(DF3.select(rawColumns.map(col): _*))
          .union(DF4.select(rawColumns.map(col): _*))

        println("chengdu_houseDF:" + DF.count())
        println("hangzhou_houseDF2:" + DF2.count())
        println("shanghai_houseDF3:" + DF3.count())
        println("kunming_houseDF4:" + DF4.count())
        println("AllDF5:" + DF5.count())

        // split() takes a regex, so the pipe delimiter must be escaped: "\\|".
        val infoParts = split(col("house_info"), "\\|")

        val DF6 = DF5
          .drop("url")
          .drop("publishday")
          // Use unqualified col(...): DF6 derives from the unioned DF5, not
          // from DF, so referencing DF.col here can fail analysis.
          .filter(col("region").isNotNull)
          .filter(col("unit_price").isNotNull)
          // house_info looks like "...|3室2厅|120平米|南|精装|有电梯" — the
          // substr calls pick the digit before 室/厅. TODO confirm against data.
          .withColumn("rooms", infoParts.getItem(1).substr(2, 1))
          .withColumn("halls", infoParts.getItem(1).substr(4, 1))
          .withColumn("towards", infoParts.getItem(3))
          .withColumn("area", infoParts.getItem(2))
          .withColumn("decoration", infoParts.getItem(4))
          .withColumn("have_elevator", infoParts.getItem(5))
          .drop("house_info")

        val DF7 = DF6.select("region", "rooms", "halls", "towards", "decoration",
          "have_elevator", "visited", "attention", "unit_price", "area", "total_price")

        DF7.show(10, false)
        println(DF7.count())

        // Single output file. The original "f:\data\logout\" ended in \",
        // which escaped the closing quote and left the literal unterminated.
        DF7.coalesce(1).write
          .option("header", "true")
          .mode(SaveMode.Overwrite)
          .csv("f:\\data\\logout")

        spark.stop()
      }
    }
    
    
  • 相关阅读:
    1008 Elevator
    mysql---时间类型详解
    mysql导入导出
    mysql不能启动报error2013错误的解决办法
    mysql总结
    mysql安装图解
    Access连接数据源配置(新手必知)
    eclipse建包的一些细节
    数据库操作(存着用来复制省的每次写)
    (工具类)MD5算法|时间格式转换|字符串转数字
  • 原文地址:https://www.cnblogs.com/liuge36/p/12614754.html
Copyright © 2011-2022 走看看