1、DataFrameReader
SparkSQL 的一个非常重要的目标就是完善数据读取,所以SparkSQL 中增加了一个新的框架,专门用于读取外部数据源,叫做DataFrameReader。
例如:通过代码 val reader: DataFrameReader = spark.read 可以看出 spark.read 的框架类型
DataFrameReader
由如下几个组件组成
组件 | 解释 |
---|---|
|
结构信息, 因为 |
|
连接外部数据源的参数, 例如 |
|
外部数据源的格式, 例如 |
DataFrameReader
有两种访问方式, 一种是使用 load
方法加载, 使用 format
指定加载格式, 还有一种是使用封装方法, 类似 csv
, json
, jdbc
等
/** * 初体验 Reader: Reader 读取的两种方式 * 1. load方法加载 * 2. 封装方法 */ @Test def reader2(): Unit ={ val spark = SparkSession.builder().master("local[6]").appName("reader2").getOrCreate() // 第一种形式 spark.read .format("csv") // 告诉程序是一个 csv 文件 .option("header", value = true) // 告诉程序文件数据第一行是一个 header .option("inferSchema", value = true) // 告诉程序根据文件内容推断数据字段的类型 .load("../dataset/BeijingPM20100101_20151231.csv") .show() // 第二种形式 spark.read .option("header", value = true) .option("inferSchema", value = true) .csv("../dataset/BeijingPM20100101_20151231.csv") .show() }
2、DataFrameWriter
DataFrameWriter
中由如下几个部分组成
组件 | 解释 |
---|---|
|
写入目标, 文件格式等, 通过 |
|
写入模式, 例如一张表已经存在, 如果通过 |
|
外部参数, 例如 |
|
类似 |
|
类似 |
|
用于排序的列, 通过 |
mode
指定了写入模式, 例如覆盖原数据集, 或者向原数据集合中尾部添加等
Scala 对象表示 | 字符串表示 | 解释 |
---|---|---|
|
|
将 |
|
|
将 |
|
|
将 |
|
|
将
|
DataFrameWriter
也有两种使用方式, 一种是使用 format
配合 save
, 还有一种是使用封装方法, 例如 csv
, json
, saveAsTable
等
例如:读取文件并以json格式存储。
val spark = SparkSession.builder().master("local[6]").appName("write1").getOrCreate() val df = spark.read.option("header", true).csv("../dataset/BeijingPM20100101_20151231.csv") df.write.json("../dataset/Beijing_PM1") df.write.format("json").save("../dataset/Beijing_PM2")
3、Parquet
什么时候会用到 Parquet ?
在ETL中,Spark经常扮演T的职务,也就是进行数据清洗和数据转换.
为了能够保存比较复杂的数据,并且保证性能和压缩率,通常使用Parquet是一个比较不错的选择.
所以外部系统收集过来的数据,有可能会使用Parquet ,而 Spark 进行读取和转换的时候,就需要支持对Parquet格式的文件的支持.
读写 Parquet 格式的文件
通过读取一个csv文件,实现写入 parquet 格式文件,并读取该 parquet 文件。
// 读取 csv 文件 val spark = SparkSession.builder().master("local[6]").appName("write1").getOrCreate() val df = spark.read.option("header", true).csv("../dataset/BeijingPM20100101_20151231.csv") // 把数据写为 parquet 格式 df.write .format("parquet") // 也可以不指定,默认就是 parquet .mode(SaveMode.Overwrite) // 指定写入方式为 overwrite(覆盖),还有Ignore(如果有文件,什么都不做) Append(追加) ... .save("../dataset/Beijing_PM3") // 读取 parquet 格式的文件 spark.read.load("../dataset/Beijing_PM3") // 默认读取的格式是 parquet, 并且可以直接指定文件夹 .show()
可以知道,不指定format的读写默认都为parquet格式。
4、分区
写入 Parquet
的时候可以指定分区,分区后文件夹名上带有分区信息,例如 year=2010,month=1 等等
Spark
在写入文件的时候是支持分区的, 可以像 Hive
一样设置某个列为分区列。Hive 很重要,Spark 经常要和 Hive 配合,所以 Spark 支持分区的读写。
// 1. 读写数据 val spark = SparkSession.builder().master("local[6]").appName("write1").getOrCreate() val df = spark.read.option("header", value = true).csv("../dataset/BeijingPM20100101_20151231.csv") // 2. 写文件,指定进行分区 df.write .partitionBy("year", "month") // 按照 year 和 month 分区 .save("../dataset/Beijing_PM4") // 3. 读文件 spark.read // .parquet("../dataset/Beijing_PM4/year=2010/month=1") // 直接通过文件进行读取,分区信息会丢失 .parquet("../dataset/Beijing_PM4") // 自动发现分区 .printSchema() // 打印schema信息
5、JSON
在ETL 中,Spark经常扮演T的职务,也就是进行数据清洗和数据转换.
在业务系统中,JSON是一个非常常见的数据格式,在前后端交互的时候也往往会使用JSON ,所以从业务系统获取的数据很大可能性是使用JSON格式,所以就需要Spark能够支持JSON格式文件的读取
在 Spark 中对JSON 的读写很容易。
JSON 读写
df.write.json("dataset/beijing_pm5.json")
spark.read.json("dataset/beijing_pm5.json").show()
需要注意,这种方式写入的JSON文件,并不是完整意义的 json 文件,它是以每行作为一个单独的 JSON 格式的,也称为 JSON 行文件。
JSON 格式
df.toJSON.show() // 将读取的csv文件内容直接转换为 JSON 格式 // Spark 可以从一个保存了 JSON 格式字符串的 Dataset[String] 中读取 JSON 信息, 转为 DataFrame //直接从rdd读取json的DataFrame val jsonRdd = df.toJSON.rdd spark.read.json(jsonRdd).show()
6、连接 MySQL
package com.thorine import org.apache.spark.sql.{SaveMode, SparkSession} import org.apache.spark.sql.types.{FloatType, IntegerType, StringType, StructField, StructType} /** * MySQL 的操作方式有两种: 使用本地 和 提交到集群上运行 * 写入MySQL数据时,使用本地运行,读取的时候使用集群 */ object MySQLWrite { def main(args: Array[String]): Unit = { // 创建 SparkSession 对象 val spark = SparkSession.builder().master("local[6]").appName("MySQLWrite").getOrCreate() // 读取数据创建 DataFrame // 1. 创建 schema 结构信息 val schema = StructType( List( StructField("name", StringType), StructField("age", IntegerType), StructField("gpa", FloatType) ) ) // 2. 文件读取 val df = spark.read .schema(schema) .option("delimiter", " ") // 指定分隔符 .csv("dataset/studenttab10k") // 处理数据 val resultDF = df.where(" age < 30 ") // 落地数据到 MySQL resultDF.write .format("jdbc") .option("url","jdbc:mysql://bigdata3:3306/spark01") .option("dbtable","student") .option("user","spark01") .option("password", "spark01") .mode(SaveMode.Overwrite) .save() } }
SparkSQL
中并没有直接提供按照 SQL
进行筛选读取数据的 API
和参数, 但是可以通过 dbtable
来曲线救国, dbtable
指定目标表的名称, 但是因为 dbtable
中可以编写 SQL
, 所以使用子查询即可做到
属性 | 含义 |
---|---|
|
指定按照哪一列进行分区, 只能设置类型为数字的列, 一般指定为 |
|
确定步长的参数, |
|
分区数量 |
spark.read.format("jdbc") .option("url", "jdbc:mysql://node01:3306/spark_test") .option("dbtable", "(select name, age from student where age > 10 and age < 20) as stu") .option("user", "spark") .option("password", "Spark123!") .option("partitionColumn", "age") .option("lowerBound", 1) .option("upperBound", 60) .option("numPartitions", 10) .load() .show()