  • Spark 2: loading and saving files, and converting data files into DataFrames

    hadoop fs -put /home/wangxiao/data/ml/Affairs.csv /datafile/wangxiao/

    hadoop fs -ls -R /datafile
    drwxr-xr-x - wangxiao supergroup 0 2016-10-15 10:46 /datafile/wangxiao
    -rw-r--r-- 3 wangxiao supergroup 16755 2016-10-15 10:46 /datafile/wangxiao/Affairs.csv
    -rw-r--r-- 3 wangxiao supergroup 16755 2016-10-13 21:48 /datafile/wangxiao/Affairs.txt
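
    With the files in place on HDFS, a quick sanity check from spark-shell (where the spark session is predefined) confirms they are readable before any DataFrame conversion; this is a minimal sketch, using the same ns1 nameservice URI as the code below:

    // Peek at the raw CSV as a Dataset[String]
    val raw = spark.read.textFile("hdfs://ns1/datafile/wangxiao/Affairs.csv")
    raw.show(3, false)     // truncate = false, so full lines are printed
    println(raw.count())   // total number of records in the file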


    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.DataFrame
    import org.apache.spark.rdd.RDD

    object ML1 {

      // Case classes used with toDF() must be defined outside main(); a case class
      // that is local to a method cannot have an encoder generated for it
      // ("No TypeTag available" at compile time).
      case class Affairs(affairs: Int, gender: String, age: Int,
                         yearsmarried: Double, children: String, religiousness: Int,
                         education: Double, occupation: Double, rating: Int)

      case class Affairs1(affairs: Int, gender: String, age: Int,
                          yearsmarried: Double, children: String, religiousness: Int,
                          education: Double, occupation: Double, rating: Int)

      def main(args: Array[String]): Unit = {

        val spark = SparkSession.builder()
          .appName("Spark SQL basic example")
          .config("spark.some.config.option", "some-value")
          .getOrCreate()

        // For implicit conversions like converting RDDs to DataFrames
        import spark.implicits._

        // Create a DataFrame from the CSV file
        // val data1: DataFrame = spark.read.csv("hdfs://ns1/datafile/wangxiao/Affairs.csv")
        val data1: DataFrame = spark.read.format("csv").load("hdfs://ns1/datafile/wangxiao/Affairs.csv")

        // Rename the default _c0 ... _c8 columns
        val df = data1.toDF("affairs", "gender", "age", "yearsmarried", "children",
          "religiousness", "education", "occupation", "rating")

        df.printSchema()

        //##############################################
        // Specify column names and types by mapping each Row onto a case class
        val res1 = data1.rdd.map { r =>
          Affairs(r(0).toString.toInt, r(1).toString, r(2).toString.toInt,
            r(3).toString.toDouble, r(4).toString, r(5).toString.toInt,
            r(6).toString.toDouble, r(7).toString.toDouble, r(8).toString.toInt)
        }.toDF()

        res1.printSchema()

        //################################################
        // Create an RDD from a plain text file
        val data2: RDD[String] = spark.sparkContext.textFile("hdfs://ns1/datafile/wangxiao/Affairs.txt")

        // Convert the RDD to a DataFrame
        val res2 = data2.map(_.split(" ")).map { line =>
          Affairs1(line(0).toInt, line(1).trim, line(2).toInt,
            line(3).toDouble, line(4).trim, line(5).toInt,
            line(6).toDouble, line(7).toDouble, line(8).toInt)
        }.toDF()

        //###############################################
        // Register the DataFrame as a temporary view
        df.createOrReplaceTempView("Affairs")

        // Subquery
        // val df1 = spark.sql("SELECT * FROM Affairs WHERE age BETWEEN 20 AND 25")
        val df1 = spark.sql("SELECT gender, age, rating FROM ( SELECT * FROM Affairs WHERE age BETWEEN 20 AND 25 ) t")

        df1.show

        // Save selected columns of the DataFrame to a file
        df.select("gender", "age", "education").write.format("csv").save("hdfs://ns1/datafile/wangxiao/data123.csv")

        // Make sure the job has finished, then stop Spark to release resources
        spark.stop
      }
    }
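
    Spark 2's CSV reader can also assign column types directly instead of going through the Row-to-case-class mapping above. A minimal alternative sketch, assuming the file has no header row (consistent with the parsing above):

    // Alternative: let Spark infer the column types, then rename the columns
    val inferred = spark.read
      .option("header", "false")
      .option("inferSchema", "true")
      .csv("hdfs://ns1/datafile/wangxiao/Affairs.csv")
      .toDF("affairs", "gender", "age", "yearsmarried", "children",
        "religiousness", "education", "occupation", "rating")
    inferred.printSchema()

    Note that inferSchema triggers an extra pass over the data, so for large files supplying an explicit schema is usually preferable.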
    

      

    hadoop fs -ls -R /datafile
    drwxr-xr-x - wangxiao supergroup 0 2016-10-15 11:43 /datafile/wangxiao
    -rw-r--r-- 3 wangxiao supergroup 16755 2016-10-15 10:46 /datafile/wangxiao/Affairs.csv
    -rw-r--r-- 3 wangxiao supergroup 16755 2016-10-13 21:48 /datafile/wangxiao/Affairs.txt
    drwxr-xr-x - wangxiao supergroup 0 2016-10-15 11:43 /datafile/wangxiao/data123.csv
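
    Note that data123.csv in the listing above is a directory rather than a single file: Spark writes one part file per partition of the DataFrame. A minimal sketch for reading the saved output back (column names are reassigned, since the CSV was written without a header):

    // Load the directory of part files back into a DataFrame
    val saved = spark.read.format("csv")
      .load("hdfs://ns1/datafile/wangxiao/data123.csv")
      .toDF("gender", "age", "education")
    saved.show(5)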

  • Original post: https://www.cnblogs.com/wwxbi/p/6014276.html