  • Commonly used Spark APIs for reading data

    Spark data-reading APIs

    spark.read.format("json").load(path)
    spark.read.format("text").load(path)
    spark.read.format("parquet").load(path)
    spark.read.format("json").option("...","...").load(path)
    

    Example

    package com.imooc.bigdata.chapter05
    
    import java.util.Properties
    
    import com.typesafe.config.ConfigFactory
    import org.apache.spark.sql.{DataFrame, Dataset, SaveMode, SparkSession}
    
    object DataSourceApp {
    
      def main(args: Array[String]): Unit = {
        val spark: SparkSession = SparkSession.builder()
          .master("local").getOrCreate()
    
        text(spark)
        json(spark)
        common(spark)
        parquet(spark)

        convert(spark)

        jdbc(spark)
        jdbc2(spark)
        spark.stop()
      }
    
      // Package the code and submit it to YARN or a Standalone cluster; pay attention to how the JDBC driver class is supplied
      def jdbc2(spark:SparkSession): Unit = {
        import spark.implicits._
    
        val config = ConfigFactory.load()
        val url = config.getString("db.default.url")
        val user = config.getString("db.default.user")
        val password = config.getString("db.default.password")
        val driver = config.getString("db.default.driver")
        val database = config.getString("db.default.database")
        val table = config.getString("db.default.table")
        val sinkTable = config.getString("db.default.sink.table")
    
        val connectionProperties = new Properties()
        connectionProperties.put("user", user)
        connectionProperties.put("password", password)
        connectionProperties.put("driver", driver) // pass the driver class from config along; this matters on a cluster
    
        val jdbcDF: DataFrame = spark.read.jdbc(url, s"$database.$table", connectionProperties)
    
        jdbcDF.filter($"game_top" > 100).show() //.write.jdbc(url, s"$database.$sinkTable", connectionProperties)
      }
      /**
        * Some data lives in MySQL; to process it with Spark, we first have to read it out of MySQL.
        * When the source is text/json, after Spark finishes processing, we write the statistics back to MySQL.
        */
      def jdbc(spark:SparkSession): Unit = {
        import spark.implicits._
    
    //    val jdbcDF = spark.read
    //      .format("jdbc")
    //      .option("url", "jdbc:mysql://10.133.3.10:3306")
    //      .option("dbtable", "spark.browser_stat")
    //      .option("user", "root")
    //      .option("password", "root")
    //      .load()
    //
    //    jdbcDF.filter($"cnt" > 100).show(100)
    
        // The "read it in, write it out" approach: load a table, transform it, write the result to another table
    
        val url = "jdbc:mysql://10.133.3.10:3306"
        val connectionProperties = new Properties()
        connectionProperties.put("user", "root")
        connectionProperties.put("password", "root")
    
        val jdbcDF: DataFrame = spark.read
          .jdbc(url, "spark.taptap", connectionProperties)
    
        jdbcDF.filter($"game_top" > 100)
          .write.jdbc(url, "spark.taptaps", connectionProperties)
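
        // For large tables, the format("jdbc") read above can be parallelized with
        // the standard Spark JDBC options (partitionColumn must be a numeric/date
        // column; the column and bounds below are illustrative, not from the actual schema):
        //
        //   spark.read.format("jdbc")
        //     .option("url", url)
        //     .option("dbtable", "spark.taptap")
        //     .option("user", "root")
        //     .option("password", "root")
        //     .option("partitionColumn", "id")
        //     .option("lowerBound", "1")
        //     .option("upperBound", "100000")
        //     .option("numPartitions", "4")
        //     .load()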
      }
    
      // Storage format conversion: JSON ==> Parquet
      def convert(spark:SparkSession): Unit = {
        import spark.implicits._
    
        // forward slashes in the path avoid Java string-escape problems on Windows
        val jsonDF: DataFrame = spark.read.format("json").load("E:/06-work/03-java/01-JavaCodeDome/SparkSqlCode/sparksql-train/data/people.json")
    //    jsonDF.show()
    
        jsonDF.filter("age>20").write.format("parquet").mode(SaveMode.Overwrite).save("out")
    
        spark.read.parquet("E:\06-work\03-java\01-JavaCodeDome\SparkSqlCode\sparksql-train\data\out").show()
    
      }
    
      // Parquet data source
      def parquet(spark:SparkSession): Unit = {
        import spark.implicits._
    
        val parquetDF: DataFrame = spark.read.parquet("E:/06-work/03-java/01-JavaCodeDome/SparkSqlCode/sparksql-train/data/users.parquet")
        parquetDF.printSchema()
        parquetDF.show()
    
    //    parquetDF.select("name","favorite_numbers")
    //      .write.mode("overwrite")
    //        .option("compression","none")
    //      .parquet("out")
    
    //    spark.read.parquet("out").show()
      }
    
      // Standard API style: format(...).load(...) / write.format(...).save(...)
      def common(spark:SparkSession): Unit = {
        import spark.implicits._
    
        // In front of the source code, there are no secrets
    //    val textDF: DataFrame = spark.read.format("text").load("file:///Users/rocky/IdeaProjects/imooc-workspace/sparksql-train/data/people.txt")
        val jsonDF: DataFrame = spark.read.format("json").load("E:/06-work/03-java/01-JavaCodeDome/SparkSqlCode/sparksql-train/data/people.json")
    //
    //    textDF.show()
    //    println("~~~~~~~~")
    //    jsonDF.show()
    
        jsonDF.write.format("json").mode("overwrite").save("out")
    
      }
    
      // JSON data source
      def json(spark:SparkSession): Unit = {
        import spark.implicits._
        val jsonDF: DataFrame = spark.read.json("E:/06-work/03-java/01-JavaCodeDome/SparkSqlCode/sparksql-train/data/people.json")
    
        //jsonDF.show()
    
        // TODO... keep only the rows with age > 20
        //jsonDF.filter("age > 20").select("name").write.mode(SaveMode.Overwrite).json("out")
    
        val jsonDF2: DataFrame = spark.read.json("E:/06-work/03-java/01-JavaCodeDome/SparkSqlCode/sparksql-train/data/people2.json")
        jsonDF2.select($"name",$"age",$"info.work".as("work"), $"info.home".as("home")).write.mode("overwrite").json("out")
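        // The select above pulls fields out of a nested struct with $"info.work" /
        // $"info.home"; people2.json is assumed to hold records along the lines of:
        //   {"name": "pk", "age": 30, "info": {"work": "beijing", "home": "shanghai"}}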
      }
    
      // Text data source
      def text(spark:SparkSession): Unit = {
    
        import spark.implicits._
        val textDF: DataFrame = spark.read.text("E:/06-work/03-java/01-JavaCodeDome/SparkSqlCode/sparksql-train/data/people.txt")
    
        // textDF.show()
        val result: Dataset[String] = textDF.map(x => {
          val splits: Array[String] = x.getString(0).split(",")
          splits(0).trim // adding splits(1).trim would make this a tuple, which the text sink rejects
        })
    
        // SaveMode.Append keeps the previous result and appends; Overwrite deletes the previous result first, then writes the new one
        result.write.mode("overwrite").text("out") // the text sink writes a single string column; see the sketch below for multi-column output
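
        // A sketch of multi-column text output: the text sink only accepts a single
        // string column, so concatenate the columns first (the separator is a placeholder):
        //
        //   import org.apache.spark.sql.functions.concat_ws
        //   textDF.map(x => {
        //     val splits = x.getString(0).split(",")
        //     (splits(0).trim, splits(1).trim)
        //   }).select(concat_ws("\t", $"_1", $"_2").as("value"))
        //     .write.mode("overwrite").text("out")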
      }
    }
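
    To run this on YARN or a Standalone cluster (see the note above jdbc2), the MySQL connector jar has to be shipped with the job. A sketch with spark-submit (jar paths are placeholders; depending on deploy mode, --driver-class-path may also be needed for the driver side):

    spark-submit \
      --master yarn \
      --class com.imooc.bigdata.chapter05.DataSourceApp \
      --jars /path/to/mysql-connector-java.jar \
      sparksql-train.jar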
    

      

  • Original source: https://www.cnblogs.com/yoyo1216/p/13534566.html