SparkSQL 简介
SparkSQL 是一个即支持 SQL 又支持命令式数据处理的工具
SparkSQL 不只是一个 SQL 引擎, SparkSQL 也包含了一套对 结构化数据的命令式 API, 事实上, 所有 Spark 中常见的工具, 都是依赖和依照于 SparkSQL 的 API 设计的
发展历程

解决的问题
Spark SQL使用Hive解析SQL生成AST语法树, 将其后的逻辑计划生成, 优化, 物理计划都自己完成, 而不依赖Hive- 执行计划和优化交给优化器
Catalyst - 内建了一套简单的
SQL解析器, 可以不使用HQL, 此外, 还引入和DataFrame这样的DSL API, 完全可以不依赖任何Hive的组件 Shark只能查询文件,Spark SQL可以直接降查询作用于RDD, 这一点是一个大进步
适用场景
| 定义 | 特点 | 举例 | |
|---|---|---|---|
| 结构化数据 | 有固定的 Schema |
有预定义的 Schema |
关系型数据库的表 |
| **半结构化数据 ** | 没有固定的 Schema, 但是有结构 |
没有固定的 Schema, 有结构信息, 数据一般是自描述的 |
指一些有结构的文件格式, 例如 JSON |
| 非结构化数据 | 没有固定 Schema, 也没有结构 |
没有固定 Schema, 也没有结构 |
指文档图片之类的格式 |
SparkSQL 主要用于处理 结构化数据
SparkSQL 初体验
命令式 API 的入门案例
@Test
def dsIntro(): Unit = {
val spark = new sql.SparkSession.Builder()
.appName("ds_intro")
.master("local[2]")
.getOrCreate()
import spark.implicits._
var sourceRDD = spark.sparkContext.parallelize(Seq(Person("张三",18),Person("李四",10)))
var personDS = sourceRDD.toDS()
var resultDS = personDS.where('age>10)
.where('age<20)
.select('name)
.as[String]
resultDS.show()
}
SparkSQL 中有一个新的入口点, 叫做 SparkSession
SparkSQL 中有一个新的类型叫做 Dataset
SparkSQL 有能力直接通过字段名访问数据集, 说明 SparkSQL 的 API 中是携带 Schema 信息的
SparkSession
-
SparkContext 作为 RDD 的创建者和入口, 其主要作用有如下两点
- 创建
RDD, 主要是通过读取文件创建RDD - 监控和调度任务, 包含了一系列组件, 例如
DAGScheduler,TaskSheduler
- 创建
-
为什么无法使用
SparkContext作为SparkSQL的入口?SparkContext在读取文件的时候, 是不包含Schema信息的, 因为读取出来的是RDDSparkContext在整合数据源如Cassandra,JSON,Parquet等的时候是不灵活的, 而DataFrame和Dataset一开始的设计目标就是要支持更多的数据源SparkContext的调度方式是直接调度RDD, 但是一般情况下针对结构化数据的访问, 会先通过优化器优化一下
所以
SparkContext确实已经不适合作为SparkSQL的入口, 所以刚开始的时候Spark团队为SparkSQL设计了两个入口点, 一个是SQLContext对应Spark标准的SQL执行, 另外一个是HiveContext对应HiveSQL的执行和Hive的支持.在
Spark 2.0的时候, 为了解决入口点不统一的问题, 创建了一个新的入口点SparkSession, 作为整个Spark生态工具的统一入口点, 包括了SQLContext,HiveContext,SparkContext等组件的功能 -
新的入口应该有什么特性?
- 能够整合
SQLContext,HiveContext,SparkContext,StreamingContext等不同的入口点 - 为了支持更多的数据源, 应该完善读取和写入体系
- 同时对于原来的入口点也不能放弃, 要向下兼容
- 能够整合
Catalyst 优化器
RDD 和 SparkSQL 运行时的区别
-
RDD的运行流程
大致运行步骤先将 RDD解析为由Stage组成的DAG, 后将Stage转为Task直接运行问题任务会按照代码所示运行, 依赖开发者的优化, 开发者的会在很大程度上影响运行效率解决办法创建一个组件, 帮助开发者修改和优化代码, 但是这在RDD上是无法实现的 -
为什么
RDD无法自我优化?RDD没有Schema信息RDD可以同时处理结构化和非结构化的数据 -
SparkSQL提供了什么?
和 RDD不同,SparkSQL的Dataset和SQL并不是直接生成计划交给集群执行, 而是经过了一个叫做Catalyst的优化器, 这个优化器能够自动帮助开发者优化代码也就是说, 在SparkSQL中, 开发者的代码即使不够优化, 也会被优化为相对较好的形式去执行为什么SparkSQL提供了这种能力?首先,SparkSQL大部分情况用于处理结构化数据和半结构化数据, 所以SparkSQL可以获知数据的Schema, 从而根据其Schema来进行优化
Catalyst
为了解决过多依赖 Hive 的问题, SparkSQL 使用了一个新的 SQL 优化器替代 Hive 中的优化器, 这个优化器就是 Catalyst, 整个 SparkSQL 的架构大致如下
API层简单的说就是Spark会通过一些API接受SQL语句- 收到
SQL语句以后, 将其交给Catalyst,Catalyst负责解析SQL, 生成执行计划等 Catalyst的输出应该是RDD的执行计划- 最终交由集群运行

-
Step 1 : 解析
SQL, 并且生成AST(抽象语法树)
-
Step 2 : 在
AST中加入元数据信息, 做这一步主要是为了一些优化, 例如col = col这样的条件, 下图是一个简略图, 便于理解
score.id → id#1#L为score.id生成id为 1, 类型是 `Long``- ``score.math_score → math_score#2#L
为score.math_score生成id为 2, 类型为Long` - people.id → id#3#L
为people.id生成id为 3, 类型为Long - ``people.age → age#4#L
为people.age生成id为 4, 类型为Long`
-
Step 3 : 对已经加入元数据的
AST, 输入优化器, 进行优化, 从两种常见的优化开始, 简单介绍
- 谓词下推
Predicate Pushdown, 将Filter这种可以减小数据集的操作下推, 放在Scan的位置, 这样可以减少操作时候的数据量
- 列值裁剪
Column Pruning, 在谓词下推后,people表之上的操作只用到了id列, 所以可以把其它列裁剪掉, 这样可以减少处理的数据量, 从而优化处理速度 - 还有其余很多优化点, 大概一共有一二百种, 随着
SparkSQL的发展, 还会越来越多, 感兴趣的同学可以继续通过源码了解, 源码在org.apache.spark.sql.catalyst.optimizer.Optimizer
- 谓词下推
-
Step 4 : 上面的过程生成的
AST其实最终还没办法直接运行, 这个AST叫做逻辑计划, 结束后, 需要生成物理计划, 从而生成RDD来运行- 在生成
物理计划的时候, 会经过成本模型对整棵树再次执行优化, 选择一个更好的计划 - 在生成
物理计划以后, 因为考虑到性能, 所以会使用代码生成, 在机器中运行
可以使用
queryExecution方法查看逻辑执行计划, 使用explain方法查看物理执行计划 - 在生成
Catalyst 的主要运作原理是分为三步, 先对 SQL 或者 Dataset 的代码解析, 生成逻辑计划, 后对逻辑计划进行优化, 再生成物理计划, 最后生成代码到集群中以 RDD 的形式运行
Dataset 的特点
@Test
def dataset1() :Unit = {
//1. 创建SparkSession
val spark = new sql.SparkSession.Builder()
.master("local[2]")
.appName(this.getClass.getName)
.getOrCreate()
//2. 导入隐式转换
import spark.implicits._
//3. 演示
var sourceRDD = spark.sparkContext.parallelize(Seq(Person("张三",18),Person("李四",10)))
val dataset: Dataset[Person] = sourceRDD.toDS()
// Dataset支持强类型的API
dataset.filter(item => item.age>10).show()
// Dataset支持强类型的API
dataset.filter('age>10).show()
dataset.filter($"age">10).show()
dataset.filter("age>10").show()
spark.stop()
}
Dataset是一个新的Spark组件, 其底层还是RDDDataset提供了访问对象中某个特定字段的能力, 不用像RDD一样每次都要针对整个对象做操作Dataset和RDD不同, 如果想把Dataset[T]转为RDD[T], 则需要对Dataset底层的InternalRow做转换, 是一个比价重量级的操作- Dataset 是一个强类型, 并且类型安全的数据容器, 并且提供了结构化查询
API和类似RDD一样的命令式API - ``Dataset
具有RDD的方便, 同时也具有DataFrame的性能优势, 并且Dataset` 还是强类型的, 能做到类型安全.
DataFrame 的作用和常见操作
DataFrame是什么
DataFrame 是 SparkSQL 中一个表示关系型数据库中 表 的函数式抽象, 其作用是让 Spark 处理大规模结构化数据的时候更加容易. 一般 DataFrame 可以处理结构化的数据, 或者是半结构化的数据, 因为这两类数据中都可以获取到 Schema 信息. 也就是说 DataFrame 中有 Schema 信息, 可以像操作表一样操作 DataFrame.

DataFrame 由两部分构成, 一是 row 的集合, 每个 row 对象表示一个行, 二是描述 DataFrame 结构的 Schema.

创建DataFrame
通过隐式转换创建
@Test
def datafream1: Unit = {
val spark =new SparkSession.Builder()
.appName(this.getClass.getName)
.master("local[2]")
.getOrCreate()
// 必须要导入隐式转换
// 注意: spark 在此处不是包, 而是 SparkSession 对象
import spark.implicits._
val list = Seq(Person("张三",3),Person("lisi",18))
val df = Seq(Person("张三",3),Person("lisi",18)).toDF()
val df2 = spark.sparkContext.parallelize(list).toDF()
}

创建 DataFrame 的时候, 不仅可以包含样例类, 也可以只有普通数据类型, 后通过指定列名来创建
val df2: DataFrame = Seq(("a", 1), ("b", 1)).toDF("word", "count")
通过外部集合创建
@Test
def datafream2: Unit = {
val spark =new SparkSession.Builder()
.appName(this.getClass.getName)
.master("local[2]")
.getOrCreate()
import spark.implicits._
val df = spark.read
.csv("dataset/BeijingPM20100101_20151231_noheader.csv")
}
其他方式
val list = Seq(Person("张三",3),Person("lisi",18))
val df = spark.createDataFrame(list)
注意:不仅仅可以从 csv 文件创建 DataFrame, 还可以从 Table, JSON, Parquet 等中创建 DataFrame
常见操作
DataFrame 支持 SQL 中常见的操作, 例如: select, filter, join, group, sort, join 等
-
首先可以打印
DataFrame的Schema, 查看其中所包含的列, 以及列的类型val spark: SparkSession = new sql.SparkSession.Builder() .appName("hello") .master("local[6]") .getOrCreate() val df = spark.read .option("header", true) .csv("dataset/BeijingPM20100101_20151231.csv") df.printSchema() -
对于大部分计算来说, 可能不会使用所有的列, 所以可以选择其中某些重要的列
df.select('year, 'month, 'PM_Dongsi) -
可以针对某些列进行分组, 后对每组数据通过函数做聚合
df.select('year, 'month, 'PM_Dongsi) .where('PM_Dongsi =!= "Na") .groupBy('year, 'month) .count() .show() -
使用
SQL操作DataFrame//先将 DataFrame 注册为一张临时表 df.createOrReplaceTempView("temp_table") spark.sql("select year,month,count(PM_Dongsi) from temp_table where PM_Dongsi !='NA' group by year,month").show()
总结
DataFrame是一个类似于关系型数据库表的函数式组件DataFrame一般处理结构化数据和半结构化数据DataFrame具有数据对象的 Schema 信息- 可以使用命令式的
API操作DataFrame, 同时也可以使用SQL操作DataFrame DataFrame可以由一个已经存在的集合直接创建, 也可以读取外部的数据源来创建
Dataset 和 DataFrame 的异同
DataFrame 和 Dataset 所表达的语义不同
-
第一点:
DataFrame表达的含义是一个支持函数式操作的表, 而Dataset表达是是一个类似RDD的东西,Dataset可以处理任何对象 -
第二点:
DataFrame中所存放的是Row对象, 而Dataset中可以存放任何类型的对象@Test def datafream4: Unit = { val spark =new SparkSession.Builder() .appName(this.getClass.getName) .master("local[2]") .getOrCreate() import spark.implicits._ val list = Seq(Person("张三",3),Person("lisi",18)) val df: DataFrame = list.toDF() val ds: Dataset[Person] = list.toDS() spark.stop() }DataFrame就是Dataset[Row]Dataset的范型可以是任意类型
-
DataFrame的操作方式和Dataset是一样的, 但是对于强类型操作而言, 它们处理的类型不同DataFrame在进行强类型操作时候, 例如map算子, 其所处理的数据类型永远是 `Rowdf.map( (row: Row) => Row(row.get(0), row.getAs[Int](1) * 10) )(RowEncoder.apply(df.schema)).show()但是对于
Dataset来讲, 其中是什么类型, 它就处理什么类型ds.map( (item: People) => People(item.name, item.age * 10) ).show() -
DataFrame只能做到运行时类型检查,Dataset能做到编译和运行时都有类型检查DataFrame中存放的数据以Row表示, 一个Row代表一行数据, 这和关系型数据库类似DataFrame在进行map等操作的时候,DataFrame不能直接使用Person这样的Scala对象, 所以无法做到编译时检查Dataset表示的具体的某一类对象, 例如Person, 所以再进行map等操作的时候, 传入的是具体的某个Scala对象, 如果调用错了方法, 编译时就会被检查出来
ROW是什么
Row 对象表示的是一个行
Row 的操作类似于 Scala 中的 Map 数据类型
// 一个对象就是一个对象
val p = Person("张三",18)
// 同样一个对象, 还可以通过一个 Row 对象来表示
val row = Row("张三",18)
// 获取 Row 中的内容
println(row.get(1))
println(row(1))
// 获取时可以指定类型
println(row.getString(1))
println(row.getAs[Int](1))
// 同时 Row 也是一个样例类, 可以进行 match
row match {
case Row(name, age) => println(name, age)
}
两者之间的转化
DataFrame 和 Dataset 之间可以非常简单的相互转换
val spark: SparkSession = new sql.SparkSession.Builder()
.appName(this.getClass.getName)
.master("local[6]")
.getOrCreate()
import spark.implicits._
val df: DataFrame = Seq(People("张三", 15), People("李四", 15)).toDF()
val ds_fdf: Dataset[People] = df.as[People]
val ds: Dataset[People] = Seq(People("张三", 15), People("李四", 15)).toDS()
val df_fds: DataFrame = ds.toDF()
总结
DataFrame就是Dataset, 他们的方式是一样的, 也都支持API和SQL两种操作方式DataFrame只能通过表达式的形式, 或者列的形式来访问数据, 只有Dataset支持针对于整个对象的操作DataFrame中的数据表示为Row, 是一个行的概念