数据分析的方式:
命令式
在前面的 RDD
部分, 非常明显可以感觉的到是命令式的, 主要特征是通过一个算子, 可以得到一个结果, 通过结果再进行后续计算.
命令式的优点
-
-
操作粒度更细, 能够控制数据的每一个处理环节
-
操作更明确, 步骤更清晰, 容易维护
-
支持非结构化数据的操作
-
命令式的缺点
-
-
需要一定的代码功底
-
写起来比较麻烦
-
SQL
-
对于一些数据科学家, 要求他们为了做一个非常简单的查询, 写一大堆代码, 明显是一件非常残忍的事情, 所以
SQL on Hadoop
是一个非常重要的方向.SQL 的优点
-
-
表达非常清晰, 比如说这段
SQL
明显就是为了查询三个字段, 又比如说这段SQL
明显能看到是想查询年龄大于 10 岁的条目
-
SQL 的缺点
-
-
想想一下 3 层嵌套的
SQL
, 维护起来应该挺力不从心的吧 -
试想一下, 如果使用
SQL
来实现机器学习算法, 也挺为难的吧
-
-
SQL
擅长数据分析和通过简单的语法表示查询, 命令式操作适合过程式处理和算法性的处理. 在 Spark
出现之前, 对于结构化数据的查询和处理, 一个工具一向只能支持 SQL
或者命令式, 使用者被迫要使用多个工具来适应两种场景, 并且多个工具配合起来比较费劲.
而 Spark
出现了以后, 统一了两种数据处理范式, 是一种革新性的进步.
因为 SQL
是数据分析领域一个非常重要的范式, 所以 Spark
一直想要支持这种范式, 而伴随着一些决策失误, 这个过程其实还是非常曲折的
Hive
-
解决的问题
-
-
Hive
实现了SQL on Hadoop
, 使用MapReduce
执行任务 -
简化了
MapReduce
任务
-
新的问题
-
-
Hive
的查询延迟比较高, 原因是使用MapReduce
做调度
-
-
Shark
-
解决的问题
-
-
Shark
改写Hive
的物理执行计划, 使用Spark
作业代替MapReduce
执行物理计划 -
使用列式内存存储
-
以上两点使得
Shark
的查询效率很高
-
新的问题
-
-
Shark
重用了Hive
的SQL
解析, 逻辑计划生成以及优化, 所以其实可以认为Shark
只是把Hive
的物理执行替换为了Spark
作业 -
执行计划的生成严重依赖
Hive
, 想要增加新的优化非常困难 -
Hive
使用MapReduce
执行作业, 所以Hive
是进程级别的并行, 而Spark
是线程级别的并行, 所以Hive
中很多线程不安全的代码不适用于Spark
-
由于以上问题,
Shark
维护了Hive
的一个分支, 并且无法合并进主线, 难以为继 -
SparkSQL
-
解决的问题
-
-
Spark SQL
使用Hive
解析SQL
生成AST
语法树, 将其后的逻辑计划生成, 优化, 物理计划都自己完成, 而不依赖Hive
-
执行计划和优化交给优化器
Catalyst
-
内建了一套简单的
SQL
解析器, 可以不使用HQL
, 此外, 还引入和DataFrame
这样的DSL API
, 完全可以不依赖任何Hive
的组件 -
Shark
只能查询文件,Spark SQL
可以直接降查询作用于RDD
, 这一点是一个大进步
-
新的问题
-
对于初期版本的
SparkSQL
, 依然有挺多问题, 例如只能支持SQL
的使用, 不能很好的兼容命令式, 入口不够统一等
-
Dataset
-
SparkSQL
在 2.0 时代, 增加了一个新的API
, 叫做Dataset
,Dataset
统一和结合了SQL
的访问和命令式API
的使用, 这是一个划时代的进步在
Dataset
中可以轻易的做到使用SQL
查询并且筛选数据, 然后使用命令式API
进行探索式分析
重要性
|
总结: SparkSQL
是什么
SparkSQL
是一个为了支持 SQL
而设计的工具, 但同时也支持命令式的 API
SparkSQL 的适用场景:
结构化数据
一般指数据有固定的 Schema
, 例如在用户表中, name
字段是 String
型, 那么每一条数据的 name
字段值都可以当作 String
来使用
+----+--------------+---------------------------+-------+---------+
| id | name | url | alexa | country |
+----+--------------+---------------------------+-------+---------+
| 1 | Google | https://www.google.cm/ | 1 | USA |
| 2 | 淘宝 | https://www.taobao.com/ | 13 | CN |
| 3 | 菜鸟教程 | http://www.runoob.com/ | 4689 | CN |
| 4 | 微博 | http://weibo.com/ | 20 | CN |
| 5 | Facebook | https://www.facebook.com/ | 3 | USA |
+----+--------------+---------------------------+-------+---------+
半结构化数据
一般指的是数据没有固定的 Schema
, 但是数据本身是有结构的
没有固定 Schema
-
指的是半结构化数据是没有固定的
Schema
的, 可以理解为没有显式指定Schema
比如说一个用户信息的JSON
文件, 第一条数据的phone_num
有可能是String
, 第二条数据虽说应该也是String
, 但是如果硬要指定为BigInt
, 也是有可能的
因为没有指定Schema
, 没有显式的强制的约束
有结构
-
虽说半结构化数据是没有显式指定
Schema
的, 也没有约束, 但是半结构化数据本身是有有隐式的结构的, 也就是数据自身可以描述自身
例如JSON
文件, 其中的某一条数据是有字段这个概念的, 每个字段也有类型的概念, 所以说JSON
是可以描述自身的, 也就是数据本身携带有元信息
SparkSQL
处理什么数据的问题?
-
-
Spark
的RDD
主要用于处理 非结构化数据 和 半结构化数据 -
SparkSQL
主要用于处理 结构化数据
-
SparkSQL
相较于 RDD
的优势在哪?
-
-
SparkSQL
提供了更好的外部数据源读写支持-
因为大部分外部数据源是有结构化的, 需要在
RDD
之外有一个新的解决方案, 来整合这些结构化数据源
-
-
SparkSQL
提供了直接访问列的能力-
因为
SparkSQL
主要用做于处理结构化数据, 所以其提供的API
具有一些普通数据库的能力
-
-
总结: SparkSQL
适用于什么场景?
SparkSQL
适用于处理结构化数据的场景
本章总结
-
SparkSQL
是一个即支持SQL
又支持命令式数据处理的工具 -
SparkSQL
的主要适用场景是处理结构化数据
SparkSQL初体验(案例使用)命令式API:
@Test def rddIntro(): Unit = { val conf = new SparkConf().setMaster("local[6]").setAppName("rdd intro") val sc = new SparkContext(conf) sc.textFile("dataset/wordcount.txt") .flatMap( _.split(" ") ) .map( (_, 1) ) .reduceByKey( _ + _ ) .collect() .foreach( println(_) ) }
@Test def dsIntro(): Unit = { val spark = new SparkSession.Builder() .appName("ds intro") .master("local[6]") .getOrCreate() import spark.implicits._//导入一些隐式转换 val sourceRDD = spark.sparkContext.parallelize(Seq(Person("zhangsan", 10), Person("lisi", 15))) val personDS: Dataset[Person] = sourceRDD.toDS() val resultDS = personDS.where( 'age > 10 ) .where( 'age < 20 ) .select( 'name ) .as[String] resultDS.show() }
SparkSession
SparkContext
作为 RDD
的创建者和入口, 其主要作用有如下两点
-
-
创建
RDD
, 主要是通过读取文件创建RDD
-
监控和调度任务, 包含了一系列组件, 例如
DAGScheduler
,TaskSheduler
-
为什么无法使用 SparkContext
作为 SparkSQL
的入口?
-
-
SparkContext
在读取文件的时候, 是不包含Schema
信息的, 因为读取出来的是RDD
-
SparkContext
在整合数据源如Cassandra
,JSON
,Parquet
等的时候是不灵活的, 而DataFrame
和Dataset
一开始的设计目标就是要支持更多的数据源 -
SparkContext
的调度方式是直接调度RDD
, 但是一般情况下针对结构化数据的访问, 会先通过优化器优化一下
-
所以 SparkContext
确实已经不适合作为 SparkSQL
的入口, 所以刚开始的时候 Spark
团队为 SparkSQL
设计了两个入口点, 一个是 SQLContext
对应 Spark
标准的 SQL
执行, 另外一个是 HiveContext
对应 HiveSQL
的执行和 Hive
的支持.
在 Spark 2.0
的时候, 为了解决入口点不统一的问题, 创建了一个新的入口点 SparkSession
, 作为整个 Spark
生态工具的统一入口点, 包括了 SQLContext
, HiveContext
, SparkContext
等组件的功能
新的入口应该有什么特性?
-
-
能够整合
SQLContext
,HiveContext
,SparkContext
,StreamingContext
等不同的入口点 -
为了支持更多的数据源, 应该完善读取和写入体系
-
同时对于原来的入口点也不能放弃, 要向下兼容
-
DataFrame & Dataset
SparkSQL
最大的特点:就是它针对于结构化数据设计, 所以 SparkSQL
应该是能支持针对某一个字段的访问的, 而这种访问方式有一个前提, 就是 SparkSQL
的数据集中, 要 包含结构化信息, 也就是俗称的 Schema
而 SparkSQL
对外提供的 API
有两类, 一类是直接执行 SQL
, 另外一类就是命令式. SparkSQL
提供的命令式 API
就是 DataFrame
和 Dataset
, 暂时也可以认为 DataFrame
就是 Dataset
, 只是在不同的 API
中返回的是 Dataset
的不同表现形式
// RDD
rdd.map { case Person(id, name, age) => (age, 1) }
.reduceByKey {case ((age, count), (totalAge, totalCount)) => (age, count + totalCount)}
// DataFrame
df.groupBy("age").count("age")
通过上面的代码, 可以清晰的看到, SparkSQL
的命令式操作相比于 RDD
来说, 可以直接通过 Schema
信息来访问其中某个字段, 非常的方便
SparkSQL初体验(案例使用)声明式API:
@Test def dfIntro(): Unit = { val spark = new SparkSession.Builder() .appName("ds intro") .master("local[6]") .getOrCreate() import spark.implicits._ val sourceRDD = spark.sparkContext.parallelize(Seq(Person("zhangsan", 10), Person("lisi", 15))) val df = sourceRDD.toDF() df.createOrReplaceTempView("person") val resultDF = spark.sql("select name from person where age > 10 and age < 20") resultDF.show() }