zoukankan      html  css  js  c++  java
  • 【sparkSQL】创建DataFrame及保存

    首先我们要创建SparkSession

    val spark = SparkSession.builder()
                            .appName("test")
                            .master("local")
                            .getOrCreate()
    import spark.implicits._ //将RDD转化成为DataFrame并支持SQL操作        
    

    然后我们通过SparkSession来创建DataFrame

    1.使用toDF函数创建DataFrame

     通过导入(importing)spark.implicits, 就可以将本地序列(seq), 数组或者RDD转为DataFrame。

     只要这些数据的内容能指定数据类型即可。

    import spark.implicits._
    val df = Seq(
      (1, "zhangyuhang", java.sql.Date.valueOf("2018-05-15")),
      (2, "zhangqiuyue", java.sql.Date.valueOf("2018-05-15"))
    ).toDF("id", "name", "created_time")

    注意:如果直接用toDF()而不指定列名字,那么默认列名为"_1", "_2"

    可以通过df.withColumnRenamed("_1", "newName1").withColumnRenamed("_2", "newName2")进行修改列名

    2.使用createDataFrame函数创建DataFrame

    通过schema + row 来创建

    我们可以通俗的理解为schema为表的表头,row为表的数据记录

    import org.apache.spark.sql.types._
    //定义dataframe的结构的schema
    val schema = StructType(List(
        StructField("id", IntegerType, nullable = false),
        StructField("name", StringType, nullable = true),
        StructField("create_time", DateType, nullable = true)
    ))
    //定义dataframe内容的rdd
    val rdd = sc.parallelize(Seq(
      Row(1, "zhangyuhang", java.sql.Date.valueOf("2018-05-15")),
      Row(2, "zhangqiuyue", java.sql.Date.valueOf("2018-05-15"))
    ))
    //创建dataframe
    val df = spark.createDataFrame(rdd, schema)
    

    不过,我们可以把文件结构当做参数来使用,通过rdd自动产生schema和row,不用自己手动生成。

    import org.apache.spark.sql.types._
    
    //传入属性参数
    val schemaString = " id name create_time" 
    //解析参数变成StructField
    val fields = schemaString.split(" ")
                             .map(fieldName => StructField(fieldname, StringType, nullable = true))
    //定义dataframe的结构的schema
    val schema = StructType(fields)
    
    //定义dataframe内容的rdd
    val lines = sc.textFile("file:///people.txt")
    val rdd = lines.spilt(_.split(","))
                   .map(attributes=>ROW(attributes(0),attributes(1).trim) )
    
    //创建dataframe
    val df = spark.createDataFrame(rdd, schema)       

    3.通过反射机制创建DataFrame

    首先要定义一个case class,因为只有case class才能被Spark隐式转化为DataFrame

    import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
    import org.apache.spark.sql.Encoder
    import spark.implicits._
    //创建匹配类
    case class Person(id:Int,name:String,age:Long)
    //读取文件生成rdd
    val rdd = sc.textFile("file:///")
    //通过匹配类把rdd转化成dataframe
    val df = rdd.map(_.split(","))
                .map(attributes => Person(attributes(0),attributes(1),attributes(2).trim.toInt)) .toDF()  

    4.通过文件直接创建DataFrame

     (1)使用parquet文件read创建  

    val df = spark.read.parquet("hdfs:/path/to/file")
    

     (2)使用json文件read创建

    val df = spark.read.json("examples/src/main/resources/people.json")
    

     (3)使用csv文件load创建

    val df = spark.read
            .format("com.databricks.spark.csv")
            .option("header", "true") //reading the headers
            .option("mode", "DROPMALFORMED")
            .load("csv/file/path")
    

     (4)使用Hive表创建

    spark.table("test.person") // 库名.表名 的格式
         .registerTempTable("person")  // 注册成临时表
    spark.sql(
          """
            | select *
            | from person
            | limit 10
          """.stripMargin).show()
    

    记得,最后我们要调用spark.stop()来关闭SparkSession。  

    5.保存

     (1)通过df.write.format().save("file:///")保存

      write.format()支持输出的格式有 JSON、parquet、JDBC、orc、csv、text等文件格式

      ,save()定义保存的位置

      当我们保存成功后可以在保存位置的目录下看到文件,但是这个文件并不是一个文件而是一个目录。

      里面的内容一般为

      

      不用担心,这是没错的。

      我们读取的时候,并不需要使用文件夹里面的part-xxxx文件,直接读取目录即可。

     (2)通过df.rdd.saveAsTextFile("file:///")转化成rdd再保存

    我们对于不同格式的文件读写来说,我们一般使用两套对应方式

    val df = spark.read.格式("file:///")//读取文件
    df.write.格式("file:///")//保存文件
    val df = spark.read.format("").load("file:///")//读取文件
    df.write.save("file:///")//保存文件
    

    具体read和load方法有什么不同,我还不是很清楚,弄明白了回来补充。

    6.通过JDBC创建DataFrame 

    我们在启动Spark-shell或者提交任务的时候需要添加相应的jar包

    spark-shell(spark-submit)

    --jars /usr/local/spark/mysql-connector-java-5.1.40/mysql-connector-java-5.1.40-bin.jar

    --driver-class-path /usr/local/spark/mysql-connector-java-5.1.40-bin.jar

    val jdbcDf = spark.read.format("jdbc")
        .option("driver", "com.mysql.jdbc.Driver")   //驱动
        .option("url", "jdbc:mysql://ip:3306")  //数据库地址
        .option("dbtable", "db.user_test") //表名:数据库名.表名
        .option("user", "test") //用户名
        .option("password", "123456")  //密码
        .load()
    jdbcDf.show()
  • 相关阅读:
    mysql性能优化
    pymysql模块
    mysql数据表约束
    MySQL数据库
    IO模型
    8451
    8946531
    6783
    256213
    27822
  • 原文地址:https://www.cnblogs.com/zzhangyuhang/p/9040442.html
Copyright © 2011-2022 走看看