Spark Data Reading API
spark.read.format("json").load(path)
spark.read.format("text").load(path)
spark.read.format("parquet").load(path)
spark.read.format("json").option("...","...").load(path)
Example
package com.imooc.bigdata.chapter05

import java.util.Properties

import com.typesafe.config.ConfigFactory
import org.apache.spark.sql.{DataFrame, Dataset, SaveMode, SparkSession}

object DataSourceApp {

  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .master("local").getOrCreate()

    text(spark)
    json(spark)
    common(spark)
    parquet(spark)
    convert(spark)
    jdbc(spark)
    jdbc2(spark)

    spark.stop()
  }
  // Package the code and submit it to a YARN or Standalone cluster; note how the JDBC driver is made available
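  // A hedged sketch of what that submission could look like. The class name comes from this file;
  // the jar names and master are placeholders, not part of the original notes:
  //   spark-submit \
  //     --master yarn \
  //     --class com.imooc.bigdata.chapter05.DataSourceApp \
  //     --jars mysql-connector-java-5.1.47.jar \
  //     sparksql-train.jar
  // --jars ships the MySQL JDBC driver to the executors; --driver-class-path can be added as well
  // if the driver process also needs it on its classpath.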
  def jdbc2(spark: SparkSession): Unit = {
    import spark.implicits._

    val config = ConfigFactory.load()
    val url = config.getString("db.default.url")
    val user = config.getString("db.default.user")
    val password = config.getString("db.default.password")
    val driver = config.getString("db.default.driver")
    val database = config.getString("db.default.database")
    val table = config.getString("db.default.table")
    val sinkTable = config.getString("db.default.sink.table")

    val connectionProperties = new Properties()
    connectionProperties.put("user", user)
    connectionProperties.put("password", password)

    val jdbcDF: DataFrame = spark.read.jdbc(url, s"$database.$table", connectionProperties)

    jdbcDF.filter($"game_top" > 100).show() //.write.jdbc(url, s"$database.$sinkTable", connectionProperties)
  }
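  // The keys above are read with Typesafe Config from an application.conf on the classpath.
  // A minimal sketch of that file, reusing the connection values from the jdbc() method below;
  // every value is a placeholder to adapt to your own environment:
  //   db.default.url = "jdbc:mysql://10.133.3.10:3306"
  //   db.default.user = "root"
  //   db.default.password = "root"
  //   db.default.driver = "com.mysql.jdbc.Driver"
  //   db.default.database = "spark"
  //   db.default.table = "taptap"
  //   db.default.sink.table = "taptaps"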
  /**
   * Some data lives in MySQL; to process it with Spark, we obviously have to read it out of MySQL first.
   * When the source is text/json, after processing it with Spark we want to write the aggregated
   * results back into MySQL.
   */
  def jdbc(spark: SparkSession): Unit = {
    import spark.implicits._

//    val jdbcDF = spark.read
//      .format("jdbc")
//      .option("url", "jdbc:mysql://10.133.3.10:3306")
//      .option("dbtable", "spark.browser_stat")
//      .option("user", "root")
//      .option("password", "root")
//      .load()
//
//    jdbcDF.filter($"cnt" > 100).show(100)

    // The grind-it-out approach: work it out from the source code
    val url = "jdbc:mysql://10.133.3.10:3306"
    val connectionProperties = new Properties()
    connectionProperties.put("user", "root")
    connectionProperties.put("password", "root")

    val jdbcDF: DataFrame = spark.read
      .jdbc(url, "spark.taptap", connectionProperties)

    jdbcDF.filter($"game_top" > 100)
      .write.jdbc(url, "spark.taptaps", connectionProperties)
  }
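  // The commented-out block above shows the format("jdbc") style on the read side; the same style
  // also exists on the write side. A minimal sketch, reusing the connection values and sink table
  // from jdbc() above; jdbcWriteFormat is a name introduced here, not part of the original example:
  def jdbcWriteFormat(df: DataFrame): Unit = {
    df.write.format("jdbc")
      .option("url", "jdbc:mysql://10.133.3.10:3306")
      .option("dbtable", "spark.taptaps")
      .option("user", "root")
      .option("password", "root")
      .mode(SaveMode.Append)
      .save()
  }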
  // Storage format conversion: JSON ==> Parquet
  def convert(spark: SparkSession): Unit = {
    import spark.implicits._

    val jsonDF: DataFrame = spark.read.format("json").load("E:\\06-work\\03-java\\01-JavaCodeDome\\SparkSqlCode\\sparksql-train\\data\\people.json")
//    jsonDF.show()

    jsonDF.filter("age>20").write.format("parquet").mode(SaveMode.Overwrite).save("out")

    // read back the Parquet output that was just written
    spark.read.parquet("out").show()
  }
  // Parquet data source
  def parquet(spark: SparkSession): Unit = {
    import spark.implicits._

    val parquetDF: DataFrame = spark.read.parquet("E:\\06-work\\03-java\\01-JavaCodeDome\\SparkSqlCode\\sparksql-train\\data\\users.parquet")
    parquetDF.printSchema()
    parquetDF.show()

//    parquetDF.select("name", "favorite_numbers")
//      .write.mode("overwrite")
//      .option("compression", "none")
//      .parquet("out")
//    spark.read.parquet("out").show()
  }
  // Standard API usage
  def common(spark: SparkSession): Unit = {
    import spark.implicits._

    // In front of the source code there are no secrets
//    val textDF: DataFrame = spark.read.format("text").load("file:///Users/rocky/IdeaProjects/imooc-workspace/sparksql-train/data/people.txt")
    val jsonDF: DataFrame = spark.read.format("json").load("E:\\06-work\\03-java\\01-JavaCodeDome\\SparkSqlCode\\sparksql-train\\data\\people.json")

//    textDF.show()
//    println("~~~~~~~~")
//    jsonDF.show()

    jsonDF.write.format("json").mode("overwrite").save("out")
  }
  // JSON
  def json(spark: SparkSession): Unit = {
    import spark.implicits._

    val jsonDF: DataFrame = spark.read.json("E:\\06-work\\03-java\\01-JavaCodeDome\\SparkSqlCode\\sparksql-train\\data\\people.json")
//    jsonDF.show()

    // TODO... keep only the rows where age > 20
//    jsonDF.filter("age > 20").select("name").write.mode(SaveMode.Overwrite).json("out")

    val jsonDF2: DataFrame = spark.read.json("E:\\06-work\\03-java\\01-JavaCodeDome\\SparkSqlCode\\sparksql-train\\data\\people2.json")
    jsonDF2.select($"name", $"age", $"info.work".as("work"), $"info.home".as("home")).write.mode("overwrite").json("out")
  }
  // Text
  def text(spark: SparkSession): Unit = {
    import spark.implicits._

    val textDF: DataFrame = spark.read.text("E:\\06-work\\03-java\\01-JavaCodeDome\\SparkSqlCode\\sparksql-train\\data\\people.txt")
//    textDF.show()

    val result: Dataset[String] = textDF.map(x => {
      val splits: Array[String] = x.getString(0).split(",")
      (splits(0).trim) //, splits(1).trim
    })

    // SaveMode.Append keeps the previous output and appends the new result;
    // overwrite deletes the previous output first and then writes the new result
    result.write.mode("overwrite").text("out") // How could we output multiple columns with the text source?
  }
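  // One possible answer to the question above (a sketch introduced here, not part of the original
  // notes): the text source can only write a single string column, so either concatenate the
  // columns into one string before writing, or switch to a source such as csv that handles
  // multiple columns natively. textMultiColumn and the output path "out_multi" are made-up names.
  def textMultiColumn(spark: SparkSession): Unit = {
    import spark.implicits._

    val textDF: DataFrame = spark.read.text("E:\\06-work\\03-java\\01-JavaCodeDome\\SparkSqlCode\\sparksql-train\\data\\people.txt")

    // parse the two fields, then join them back into one string column so text() can write them
    val result: Dataset[String] = textDF.map(x => {
      val splits: Array[String] = x.getString(0).split(",")
      s"${splits(0).trim},${splits(1).trim}"
    })

    result.write.mode("overwrite").text("out_multi")
  }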
}