SparkSQL简介
SparkSQL 的出现契机
SparkSQL
-
- 解决的问题
-
-
Spark SQL
使用Hive
解析SQL
生成AST
语法树, 将其后的逻辑计划生成, 优化, 物理计划都自己完成, 而不依赖Hive
-
执行计划和优化交给优化器
Catalyst
-
内建了一套简单的
SQL
解析器, 可以不使用HQL
, 此外, 还引入和DataFrame
这样的DSL API
, 完全可以不依赖任何Hive
的组件 -
Shark
只能查询文件,Spark SQL
可以直接降查询作用于RDD
, 这一点是一个大进步
-
- 新的问题
-
对于初期版本的
SparkSQL
, 依然有挺多问题, 例如只能支持SQL
的使用, 不能很好的兼容命令式, 入口不够统一等
Dataset
-
SparkSQL
在 2.0 时代, 增加了一个新的API
, 叫做Dataset
,Dataset
统一和结合了SQL
的访问和命令式API
的使用, 这是一个划时代的进步在
Dataset
中可以轻易的做到使用SQL
查询并且筛选数据, 然后使用命令式API
进行探索式分析
重要性
![]()
|
SparkSQL
是什么SparkSQL
是一个为了支持 SQL
而设计的工具, 但同时也支持命令式的 API
SparkSQL 的适用场景
定义 | 特点 | 举例 | |
---|---|---|---|
结构化数据 |
有固定的 |
有预定义的 |
关系型数据库的表 |
半结构化数据 |
没有固定的 |
没有固定的 |
指一些有结构的文件格式, 例如 |
非结构化数据 |
没有固定 |
没有固定 |
指文档图片之类的格式 |
- 结构化数据
-
一般指数据有固定的
Schema
, 例如在用户表中,name
字段是String
型, 那么每一条数据的name
字段值都可以当作String
来使用+----+--------------+---------------------------+-------+---------+ | id | name | url | alexa | country | +----+--------------+---------------------------+-------+---------+ | 1 | Google | https://www.google.cm/ | 1 | USA | | 2 | 淘宝 | https://www.taobao.com/ | 13 | CN | | 3 | 菜鸟教程 | http://www.runoob.com/ | 4689 | CN | | 4 | 微博 | http://weibo.com/ | 20 | CN | | 5 | Facebook | https://www.facebook.com/ | 3 | USA | +----+--------------+---------------------------+-------+---------+
- 半结构化数据
一般指的是数据没有固定的 Schema
, 但是数据本身是有结构的
{
"firstName": "John",
"lastName": "Smith",
"age": 25,
"phoneNumber":
[
{
"type": "home",
"number": "212 555-1234"
},
{
"type": "fax",
"number": "646 555-4567"
}
]
}
- 没有固定
Schema
-
指的是半结构化数据是没有固定的
Schema
的, 可以理解为没有显式指定Schema
比如说一个用户信息的JSON
文件, 第一条数据的phone_num
有可能是String
, 第二条数据虽说应该也是String
, 但是如果硬要指定为BigInt
, 也是有可能的
因为没有指定Schema
, 没有显式的强制的约束
SparkSQL
处理什么数据的问题?-
-
Spark
的RDD
主要用于处理 非结构化数据 和 半结构化数据 -
SparkSQL
主要用于处理 结构化数据
-
SparkSQL
相较于RDD
的优势在哪?-
-
SparkSQL
提供了更好的外部数据源读写支持-
因为大部分外部数据源是有结构化的, 需要在
RDD
之外有一个新的解决方案, 来整合这些结构化数据源
-
-
SparkSQL
提供了直接访问列的能力-
因为
SparkSQL
主要用做于处理结构化数据, 所以其提供的API
具有一些普通数据库的能力
-
-
SparkSQL
适用于什么场景?SparkSQL
适用于处理结构化数据的场景
SparkSQL 初体验
命令式 API 的入门案例
@Test def dsIntro(): Unit ={ val spark=new SparkSession.Builder() .appName("ds intro") .master("local[6]") .getOrCreate() import spark.implicits._ val sourceRDD = spark.sparkContext.parallelize(Seq(Person("zhangsan", 10), Person("lisi", 15), Person("wangwu", 30))) val personDS = sourceRDD.toDS() val result=personDS.where('age>10) .where('age<20) .select('name) .as[String] result.show() }
SparkSQL 中有一个新的入口点, 叫做 SparkSessionSparkSQL 中有一个新的类型叫做 DatasetSparkSQL 有能力直接通过字段名访问数据集, 说明 SparkSQL 的 API 中是携带 Schema 信息的
DataFrame & Dataset
![eca0d2e1e2b5ce678161438d87707b61](https://doc-1256053707.cos.ap-beijing.myqcloud.com/eca0d2e1e2b5ce678161438d87707b61.png)
sparkSQL
对外提供的 API
有两类:SQL
, 另外一类就是命令式. SparkSQL
提供的命令式 API
就是 DataFrame
和 Dataset
, 暂时也可以认为 DataFrame
就是 Dataset
, 只是在不同的 API
中返回的是 Dataset
的不同表现形式// RDD
rdd.map { case Person(id, name, age) => (age, 1) }
.reduceByKey {case ((age, count), (totalAge, totalCount)) => (age, count + totalCount)}
// DataFrame
df.groupBy("age").count("age")
通过上面的代码, 可以清晰的看到, SparkSQL
的命令式操作相比于 RDD
来说, 可以直接通过 Schema
信息来访问其中某个字段, 非常的方便
SQL版本方式
@Test def dfIntro(): Unit ={ val spark=new SparkSession.Builder() .appName("ds intro") .master("local[6]") .getOrCreate() import spark.implicits._ val sourceRDD = spark.sparkContext.parallelize(Seq(Person("zhangsan", 10), Person("lisi", 15), Person("wangwu", 30))) val df = sourceRDD.toDF() df.createOrReplaceTempView("person") val result = spark.sql("select name from person where age>10 and age <20") result.show() }
以往使用 SQL
肯定是要有一个表的, 在 Spark
中, 并不存在表的概念, 但是有一个近似的概念, 叫做 DataFrame
, 所以一般情况下要先通过 DataFrame
或者 Dataset
注册一张临时表, 然后使用 SQL
操作这张临时表
SparkSQL
提供了 SQL
和 命令式 API
两种不同的访问结构化数据的形式, 并且它们之间可以无缝的衔接
命令式 API
由一个叫做 Dataset
的组件提供, 其还有一个变形, 叫做 DataFrame
Dataset 的特点
Dataset
是什么?
@Test def dataset1(): Unit ={ //1.创建SparkSession val spark=new SparkSession.Builder() .appName("dataset1") .master("local[6]") .getOrCreate() //2.导入隐式转换 import spark.implicits._ //3.演示 val sourceRDD = spark.sparkContext.parallelize(Seq(Person("zhangsan", 10), Person("lisi", 15), Person("wangwu", 30))) val ds = sourceRDD.toDS() //DataSet 支持强类型的API ds.filter( item => item.age>10).show() //DataSet 支持弱类型的API ds.filter('age>10).show() ds.filter($"age">10).show() //DataSet 可以直接编写SQL 表达式 ds.filter("age >10").show() }
Dataset
是一个强类型, 并且类型安全的数据容器, 并且提供了结构化查询 API
和类似 RDD
一样的命令式 API
可以获取 Dataset
对应的 RDD
表示:
- 使用
Dataset.rdd
将Dataset
转为RDD
的形式 Dataset
的执行计划底层的RDD
DataFrame 的作用和常见操作
DataFrame
是什么?
DataFrame
中有 Schema
信息, 可以像操作表一样操作 DataFrame
.![](https://img2020.cnblogs.com/blog/1717524/202101/1717524-20210112201821551-572514819.png)
DataFrame
由两部分构成, 一是 row
的集合, 每个 row
对象表示一个行, 二是描述 DataFrame
结构的 Schema
.
DataFrame
支持 SQL
中常见的操作, 例如: select
, filter
, join
, group
, sort
, join
等
@Test def dataframe1(): Unit ={ //1.创建SparkSession val spark=SparkSession.builder() .appName("datafreame1") .master("local[6]") .getOrCreate() //2.创建DataFrame import spark.implicits._ val dataFrame = Seq(Person("zhangsan", 15), Person("lisi", 20)).toDF() //3.DataFrame花样 dataFrame.where('age>10) .select('name) .show() }
通过集合创建 DataFrame
的时候, 集合中不仅可以包含样例类, 也可以只有普通数据类型, 后通过指定列名来创建
//1.创建SparkSession val spark=SparkSession.builder() .appName("datafreame1") .master("local[6]") .getOrCreate() //2.创建DataFrame import spark.implicits._ val personList = Seq(Person("zhangsan", 15), Person("lisi", 20)) //1.toDF val df1=personList.toDF() val df2=spark.sparkContext.parallelize(personList).toDF()
在 DataFrame
上可以使用的常规操作
DataFrame
的 Schema
, 查看其中所包含的列, 以及列的类型val spark: SparkSession = new sql.SparkSession.Builder() .appName("hello") .master("local[6]") .getOrCreate() val df = spark.read .option("header", true) .csv("dataset/BeijingPM20100101_20151231.csv") df.printSchema()
Step 2: 对于大部分计算来说, 可能不会使用所有的列, 所以可以选择其中某些重要的列
df.select('year, 'month, 'PM_Dongsi)
Step 3: 可以针对某些列进行分组, 后对每组数据通过函数做聚合
df.select('year, 'month, 'PM_Dongsi)
.where('PM_Dongsi =!= "Na")
.groupBy('year, 'month)
.count()
.show()
使用 SQL
操作 DataFrame
@Test def dataframe3(): Unit ={ //1.创建SparkSession val spark=SparkSession.builder() .appName("datafreame1") .master("local[6]") .getOrCreate() //2.读取数据集 import spark.implicits._ val dataFrame = spark.read .option("header",value = true) .csv("dataset/BeijingPM20100101_20151231.csv") //3.处理 //处理一 // dataFrame.select('year,'month,'PM_Dongsi) // .where('PM_Dongsi =!= "NA") // .groupBy('year,'month) // .count() // .show() //处理二 dataFrame.createOrReplaceTempView("pm") val result=spark.sql("select year,month,count(PM_Dongsi) from pm where PM_Dongsi != 'NA' group by year,month") result.show() spark.stop() //4.得出结论 }
-
DataFrame
是一个类似于关系型数据库表的函数式组件 -
DataFrame
一般处理结构化数据和半结构化数据 -
DataFrame
具有数据对象的 Schema 信息 -
可以使用命令式的
API
操作DataFrame
, 同时也可以使用SQL
操作DataFrame
-
DataFrame
可以由一个已经存在的集合直接创建, 也可以读取外部的数据源来创建
Dataset 和 DataFrame 的异同
DataFrame
就是 Dataset
第一点: DataFrame
表达的含义是一个支持函数式操作的 表
, 而 Dataset
表达是是一个类似 RDD
的东西, Dataset
可以处理任何对象
- 第二点:
DataFrame
中所存放的是Row
对象, 而Dataset
中可以存放任何类型的对象 - 第三点:
DataFrame
的操作方式和Dataset
是一样的, 但是对于强类型操作而言, 它们处理的类型不同 - 第四点:
DataFrame
只能做到运行时类型检查,Dataset
能做到编译和运行时都有类型检查
@Test def qubie(): Unit ={ //1.创建SparkSession val spark=SparkSession.builder() .appName("datafreame1") .master("local[6]") .getOrCreate() //2.创建DataFrame import spark.implicits._ val personList = Seq(Person("zhangsan", 15), Person("lisi", 20)) //DataFrame 是弱类型 val df=personList.toDF() df.map( (row:Row) =>Row(row.get(0),row.getAs[Int](1)*2) )(RowEncoder.apply(df.schema)) .show() //DataFrame 所代表的弱类型操作编译不安全 df.groupBy("name,age") //Dataset 是强类型 val ds=personList.toDS() ds.map( (person:Person) => Person(person.name,person.age*2) ) .show() //Dataset 所代表的强类型操作编译运行都安全 // ds.filter(person =>person.age) }
Row
是什么?
Row
对象表示的是一个 行
Row
的操作类似于 Scala
中的 Map
数据类型
@Test def row(): Unit ={ //1.row如何创建,他是什么 val p=Person("zhangsan",14) val row=Row("zhangsan",14) //2.如何从row中获取数据 println(row.getString(0)) println(row.getInt(1)) //3.Row是样例类 row match { case Row(name,age) => println(name,age) }
DataFrame
和 Dataset
之间可以非常简单的相互转换
val spark: SparkSession = new sql.SparkSession.Builder() .appName("hello") .master("local[6]") .getOrCreate() import spark.implicits._ val df: DataFrame = Seq(People("zhangsan", 15), People("lisi", 15)).toDF() val ds_fdf: Dataset[People] = df.as[People] val ds: Dataset[People] = Seq(People("zhangsan", 15), People("lisi", 15)).toDS() val df_fds: DataFrame = ds.toDF()
总结
-
DataFrame
就是Dataset
, 他们的方式是一样的, 也都支持API
和SQL
两种操作方式 -
DataFrame
只能通过表达式的形式, 或者列的形式来访问数据, 只有Dataset
支持针对于整个对象的操作 -
DataFrame
中的数据表示为Row
, 是一个行的概念
数据读写
DataFrameReader
DataFrameReader
由如下几个组件组成
组件 | 解释 |
---|---|
|
结构信息, 因为 |
|
连接外部数据源的参数, 例如 |
|
外部数据源的格式, 例如 |
@Test def reader1(): Unit ={ //1.创建SparkSession val spark=SparkSession.builder() .appName("reader1") .master("local[6]") .getOrCreate() //2.框架 val read: DataFrameReader = spark.read }
DataFrameReader
有两种访问方式, 一种是使用 load
方法加载, 使用 format
指定加载格式, 还有一种是使用封装方法, 类似 csv
, json
, jdbc
等
@Test def reader2(): Unit ={ //1.创建SparkSession val spark=SparkSession.builder() .appName("reader1") .master("local[6]") .getOrCreate() //2.第一种形式 spark.read .format("csv") .option("header",value = true) .option("inferSchema",value = true) .load("dataset/BeijingPM20100101_20151231.csv") .show() //3.第二种形式 spark.read .option("header",value = true) .option("inferSchema",value = true) .csv("dataset/BeijingPM20100101_20151231.csv") .show(10) }
DataFrameWriter
DataFrameWriter
中由如下几个部分组成
组件 | 解释 |
---|---|
|
写入目标, 文件格式等, 通过 |
|
写入模式, 例如一张表已经存在, 如果通过 |
|
外部参数, 例如 |
|
类似 |
|
类似 |
|
用于排序的列, 通过 |
mode
指定了写入模式, 例如覆盖原数据集, 或者向原数据集合中尾部添加等
Scala 对象表示 | 字符串表示 | 解释 |
---|---|---|
|
|
将 |
|
|
将 |
|
|
将 |
|
|
将
|
DataFrameWriter
也有两种使用方式, 一种是使用 format
配合 save
, 还有一种是使用封装方法, 例如 csv
, json
, saveAsTable
等@Test def writer1(): Unit = { //2.读取数据集 val df=spark.read.option("header",true).csv("dataset/BeijingPM20100101_20151231.csv") //3.写入数据集 df.write.json("dataset/bejing_pm.json") df.write.format("json").save("dataset/beijing_pm2.json") }
读写 Parquet 格式文件
读写 Parquet
文件:
@Test def parquet(): Unit = { //2.读取数据集 val df = spark.read.option("header", true).csv("dataset/BeijingPM20100101_20151231.csv") df.write .mode(SaveMode.Overwrite) .format("parquet") .save("dataset/beijing_pm3") spark.read .load("dataset/beijing_pm3") .show() }
写入 Parquet
的时候可以指定分区
Spark
在写入文件的时候是支持分区的, 可以像 Hive
一样设置某个列为分区列
@Test def parquetPartitions(): Unit ={ val df = spark.read.option("header", true).csv("dataset/BeijingPM20100101_20151231.csv") //2.写文件 表分区 //写分区的时候,分区列不会包含在生成的文件中 //直接通过文件来读取,分区信息会丢失 //spark sql 会进行自动分区发现 df.write .partitionBy("year","month") .save("dataset/beijing_pm4") //3.读文件,自动发现分区 spark.read .parquet("dataset/beijing_pm4") .printSchema() }
读写 JSON 格式文件
@Test def json(){ val df = spark.read.option("header", true).csv("dataset/BeijingPM20100101_20151231.csv") df.write .json("dataset/beijing_pm5.json") spark.read .json("dataset/beijing_pm5.json") .show() }
Spark
可以从一个保存了 JSON
格式字符串的 Dataset[String]
中读取 JSON
信息, 转为 DataFrame
@Test def json2(): Unit ={ val df = spark.read.option("header", true).csv("dataset/BeijingPM20100101_20151231.csv") //直接从rdd读取json的DataFrame val jsonrdd = df.toJSON.rdd spark.read.json(jsonrdd).show() }
访问 Hive
启动 Hive MetaStore
nohup /export/servers/hive/bin/hive --service metastore 2>&1 >> /var/log.log &
访问 Hive 表
在 Hive
中创建表
第一步, 需要先将文件上传到集群中, 使用如下命令上传到 HDFS
中
hdfs dfs -mkdir -p /dataset
hdfs dfs -put studenttabl10k /dataset/
第二步, 使用 Hive
或者 Beeline
执行如下 SQL
CREATE DATABASE IF NOT EXISTS spark_integrition; USE spark_integrition; CREATE EXTERNAL TABLE student ( name STRING, age INT, gpa string ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ' ' LINES TERMINATED BY ' ' STORED AS TEXTFILE LOCATION '/dataset/hive';
第三步:将hdfs上的文件,加载到该表
LOAD DATA INPATH '/dataset/studenttab10k' OVERWRITE INTO TABLE student;
通过 SparkSQL
查询 Hive
的表
scala> spark.sql("use spark_integrition") scala> val resultDF = spark.sql("select * from student limit 10") scala> resultDF.show()
通过 SparkSQL
创建 Hive
表
val createTableStr = """ |CREATE EXTERNAL TABLE student |( | name STRING, | age INT, | gpa string |) |ROW FORMAT DELIMITED | FIELDS TERMINATED BY ' ' | LINES TERMINATED BY ' ' |STORED AS TEXTFILE |LOCATION '/dataset/hive' """.stripMargin spark.sql("CREATE DATABASE IF NOT EXISTS spark03") spark.sql("USE spark03") spark.sql(createTableStr) spark.sql("LOAD DATA INPATH '/dataset/studenttab10k' OVERWRITE INTO TABLE student") spark.sql("select * from student limit 100").show()
使用 SparkSQL
处理数据并保存进 Hive 表
package cn.itcast.spark.sql import org.apache.spark.sql.{SaveMode, SparkSession} import org.apache.spark.sql.types.{FloatType, IntegerType, StringType, StructField, StructType} object HiveAccess { def main(args: Array[String]): Unit = { val spark = SparkSession .builder() .appName("hive example") .config("spark.sql.warehouse.dir", "hdfs://hadoop101:8020/dataset/hive") .config("hive.metastore.uris", "thrift://hadoop101:9083") .enableHiveSupport() .getOrCreate() import spark.implicits._ //读取数据 val schema = StructType( List( StructField("name", StringType), StructField("age", IntegerType), StructField("gpa", FloatType) ) ) val studentDF = spark.read .option("delimiter", " ") .schema(schema) .csv("hdfs://hadoop101:8020/dataset/studenttab10k") val resultDF = studentDF.where('age < 50) //写入 resultDF.write.mode(SaveMode.Overwrite).saveAsTable("spark03.student") } }
打包后:通过spark-submit来运行
bin/spark-submit --class cn.itcast.spark.rdd.WordCount --master spark://hadoop101:7077 ~/original-spark-1.0-SNAPSHOT.jar
然后去hive中查看:
JDBC
package cn.itcast.spark.sql import org.apache.spark.sql.{SaveMode, SparkSession} import org.apache.spark.sql.types.{FloatType, IntegerType, StringType, StructField, StructType} object Mysql { def main(args: Array[String]): Unit = { val spark = SparkSession .builder() .appName("mysql example") .master("local[6]") .getOrCreate() val schema = StructType( List( StructField("name", StringType), StructField("age", IntegerType), StructField("gpa", FloatType) ) ) val studentDF = spark.read //分隔符:制表符 .option("delimiter", " ") .schema(schema) .csv("dataset/studenttab10k") studentDF.write .format("jdbc") .mode(SaveMode.Overwrite) .option("url", "jdbc:mysql://hadoop101:3306/spark02") .option("dbtable", "student") .option("user", "spark") .option("password", "fengge666") .save() } }
SparkSQL
中并没有直接提供按照 SQL
进行筛选读取数据的 API
和参数, 但是可以通过 dbtable
来曲线救国, dbtable
指定目标表的名称, 但是因为 dbtable
中可以编写 SQL
, 所以使用子查询即可做到
属性 | 含义 |
---|---|
|
指定按照哪一列进行分区, 只能设置类型为数字的列, 一般指定为 |
|
确定步长的参数, |
|
分区数量 |
spark.read.format("jdbc") .option("url", "jdbc:mysql://node01:3306/spark_test") .option("dbtable", "(select name, age from student where age > 10 and age < 20) as stu") .option("user", "spark") .option("password", "Spark123!") .option("partitionColumn", "age") .option("lowerBound", 1) .option("upperBound", 60) .option("numPartitions", 10) .load() .show()