zoukankan      html  css  js  c++  java
  • csv数据文件清洗【DataFrame】

    package march.sql
    
    import org.apache.spark.sql.{SaveMode, SparkSession}
    import org.apache.spark.sql.functions._
    
    /**
      * Description: TODO
      *
      * @Author: 留歌36
      * @Date: 2019/3/6 8:57
      */
    object ChengduHouseAPP {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName(this.getClass.getSimpleName).master("local[2]").getOrCreate()
        // 隐式转换
        val path = "f:\data\chengdu_house.csv"
        val path2 = "f:\data\hangzhou_house.csv"
        val path3 = "f:\data\shanghai_house.csv"
        val path4 = "f:\data\kunming_house.csv"
        val DF = spark.read.option("header","true").option("inferSchema","true").csv(path)
        val DF2 = spark.read.option("header","true").option("inferSchema","true").csv(path2)
        val DF3 = spark.read.option("header","true").option("inferSchema","true").csv(path3)
        val DF4 = spark.read.option("header","true").option("inferSchema","true").csv(path4)
    
        val DF5 = DF
          .union(DF2.select("house_info","region","publishday","visited","attention","total_price","unit_price","url"))
          .union(DF3.select("house_info","region","publishday","visited","attention","total_price","unit_price","url"))
          .union(DF4.select("house_info","region","publishday","visited","attention","total_price","unit_price","url"))
    
        println("chengdu_houseDF:"+DF.count())
        println("hangzhou_houseDF2:"+DF2.count())
        println("shanghai_houseDF3:"+DF3.count())
        println("kunming_houseDF4:"+DF4.count())
        println("AllDF5:"+DF5.count())
    
        val DF6 = DF5
          .drop("url")
          .drop("publishday")
          .filter(DF.col("region").isNotNull)
          .filter(DF.col("unit_price").isNotNull)
          //      .filter(DF.col("visited").isNotNull)
          //      .filter(DF.col("attention").isNotNull)
    //      .filter(size(split(col("house_info") ,"\|")) === 6)
          .withColumn("rooms", split(col("house_info"), "\|").getItem(1).substr(2,1))
          .withColumn("halls", split(col("house_info"), "\|").getItem(1).substr(4,1))
          .withColumn("towards", split(col("house_info"), "\|").getItem(3))
          .withColumn("area", split(col("house_info"), "\|").getItem(2))
          .withColumn("decoration", split(col("house_info"), "\|").getItem(4))
          .withColumn("have_elevator", split(col("house_info"), "\|").getItem(5))
          .drop("house_info")
    
        val DF7 = DF6.select("region","rooms","halls","towards","decoration","have_elevator","visited"
          ,"attention","unit_price","area","total_price"
        )
    
        DF7.show(10,false)
        println(DF7.count())
        DF7.coalesce(1).write.option("header", "true").mode(SaveMode.Overwrite).csv("f:\data\logout\")
        spark.stop()
      }
    //  private def NoNull(: SparkSession) = {
    //    AllDF.col("").isNotNull
    //  }
    
    
    }
    
    
  • 相关阅读:
    VS2017 无法连接到Web服务器 IIS Express ,IIS Express可以启动,但是无法连接
    ADO.Net实体数据模型添加DB-First/Code First报错
    VS2017+EF6+MySQL8.0配置(.Net Framework 4.5)
    C#对象、List<>转DataTable
    ubuntu 16 安装django nginx uWSGI
    iOS真机测试could not find developer disk image
    iOS App 不支持http协议 App Transport Security has blocked a cleartext HTTP (http://)
    Objective-C Mojo和Django 对接
    第一课 ionic 日志输出
    ionic 使用sqlite
  • 原文地址:https://www.cnblogs.com/liuge36/p/12614754.html
Copyright © 2011-2022 走看看