Spark SQL
发家史
熟悉spark sql的都知道,spark sql是从shark发展而来。Shark为了实现Hive兼容,在HQL方面重用了Hive中HQL的解析、逻辑执行计划翻译、执行计划优化等逻辑,可以近似认为仅将物理执行计划从MR作业替换成了Spark作业(辅以内存列式存储等各种和Hive关系不大的优化);
同时还依赖Hive Metastore和Hive SerDe(用于兼容现有的各种Hive存储格式)。
Spark SQL在Hive兼容层面仅依赖HQL parser、Hive Metastore和Hive SerDe。也就是说,从HQL被解析成抽象语法树(AST)起,就全部由Spark SQL接管了。执行计划生成和优化都由Catalyst负责。借助Scala的模式匹配等函数式语言特性,利用Catalyst开发执行计划优化策略比Hive要简洁得多。
Spark SQL中的DataFrame类似于一张关系型数据表。在关系型数据库中对单表或进行的查询操作,在DataFrame中都可以通过调用其API接口来实现。
一、DataFrame对象的生成
Spark-SQL可以以其他RDD对象、parquet文件、json文件、hive表,以及通过JDBC连接到其他关系型数据库作为数据源来生成DataFrame对象。
以MySQL数据库为数据源,生成DataFrame对象后进行相关的DataFame之上的操作。
文中生成DataFrame的代码如下:
1 object DataFrameOperations { 2 def main (args: Array[String ]) { 3 val sparkConf = new SparkConf().setAppName( "Spark SQL DataFrame Operations").setMaster( "local[2]" ) 4 val sparkContext = new SparkContext(sparkConf) 5 6 val sqlContext = new SQLContext(sparkContext) 7 val url = "jdbc:mysql://k131:3306/test" 8 9 val jdbcDF = sqlContext.read.format( "jdbc" ).options( 10 Map( "url" -> url, 11 "user" -> "root", 12 "password" -> "root", 13 "dbtable" -> "spark_sql_test" )).load() 14 15 val joinDF1 = sqlContext.read.format( "jdbc" ).options( 16 Map("url" -> url , 17 "user" -> "root", 18 "password" -> "root", 19 "dbtable" -> "spark_sql_join1" )).load() 20 21 val joinDF2 = sqlContext.read.format( "jdbc" ).options( 22 Map ( "url" -> url , 23 "user" -> "root", 24 "password" -> "root", 25 "dbtable" -> "spark_sql_join2" )).load() 26 27 ... ... 28 } 29 }
DataFrame对象上Action操作
1、show:展示数据
以表格的形式在输出中展示jdbcDF中的数据,类似于select * from spark_sql_test的功能。
show方法有四种调用方式,分别为,
(1)show
只显示前20条记录。
示例:
jdbcDF.show
(2)show(numRows: Int)
显示numRows
条
示例:
jdbcDF.show(3)
(3)show(truncate: Boolean)
是否最多只显示20个字符,默认为true
。
示例:
jdbcDF.show(true)
jdbcDF.show(false)
(4)show(numRows: Int, truncate: Boolean)
综合前面的显示记录条数,以及对过长字符串的显示格式。
示例:
jdbcDF.show(3, false)
2、collect
:获取所有数据到数组
不同于前面的show
方法,这里的collect
方法会将jdbcDF
中的所有数据都获取到,并返回一个Array
对象。
jdbcDF.collect()
3、collectAsList
:获取所有数据到List
功能和collect
类似,只不过将返回结构变成了List
对象,使用方法如下
jdbcDF.collectAsList()
4、describe(cols: String*):获取指定字段的统计信息
这个方法可以动态的传入一个或多个String类型的字段名,结果仍然为DataFrame对象,用于统计数值类型字段的统计值,比如count, mean, stddev, min, max等。
使用方法如下,其中c1字段为字符类型,c2字段为整型,c4字段为浮点型
jdbcDF .describe("c1" , "c2", "c4" ).show()
5、first, head, take, takeAsList:获取若干行记录
这里列出的四个方法比较类似,其中
(1)first获取第一行记录
(2)head获取第一行记录,head(n: Int)获取前n行记录
(3)take(n: Int)获取前n行数据
(4)takeAsList(n: Int)获取前n行数据,并以List的形式展现
以Row或者Array[Row]的形式返回一行或多行数据。first和head功能相同。
take和takeAsList方法会将获得到的数据返回到Driver端,所以,使用这两个方法时需要注意数据量,以免Driver发生OutOfMemoryError
二、DataFrame对象上的条件查询和join等操作
以下返回为DataFrame类型的方法,可以连续调用。
1、where条件相关
(1)where(conditionExpr: String):SQL语言中where关键字后的条件
传入筛选条件表达式,可以用and和or。得到DataFrame类型的返回结果,
示例:
jdbcDF .where("id = 1 or c1 = 'b'" ).show()
(2)filter
:根据字段进行筛选
传入筛选条件表达式,得到DataFrame类型的返回结果。和where
使用条件相同
示例:
jdbcDF .filter("id = 1 or c1 = 'b'" ).show()
2、查询指定字段
(1)select
:获取指定字段值
根据传入的String
类型字段名,获取指定字段的值,以DataFrame类型返回
示例:
jdbcDF.select( "id" , "c3" ).show( false)