Spark SQL编程之DataFrame篇
作者:尹正杰
版权声明:原创作品,谢绝转载!否则将追究法律责任。
一.DataFrame的创建
在Spark SQL中SparkSession是创建DataFrame和执行SQL的入口,创建DataFrame有三种方式:
(1)通过Spark的数据源进行创建;
(2)从一个存在的RDD进行转换;
(3)还可以从Hive Table进行查询返回。
1>.从Spark数据源进行创建
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
[root@hadoop101.yinzhengjie.org.cn ~]# vim /tmp/user.json [root@hadoop101.yinzhengjie.org.cn ~]# [root@hadoop101.yinzhengjie.org.cn ~]# cat /tmp/user.json {"name":"yinzhengjie","passwd":"2020"} {"name":"Jason","passwd":"666666"} {"name":"Liming","passwd":"123"} {"name":"Jenny","passwd":"456"} {"name":"Danny","passwd":"789"} [root@hadoop101.yinzhengjie.org.cn ~]#
[root@hadoop101.yinzhengjie.org.cn ~]# spark-shell 20/07/13 03:03:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). Spark context Web UI available at http://hadoop101.yinzhengjie.org.cn:4040 Spark context available as 'sc' (master = local[*], app id = local-1594580701441). Spark session available as 'spark'. Welcome to ____ __ / __/__ ___ _____/ /__ _ / _ / _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_ version 2.4.6 /_/ Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_201) Type in expressions to have them evaluated. Type :help for more information. scala> sc #Spark-shell内置的sc变量 res2: org.apache.spark.SparkContext = org.apache.spark.SparkContext@40cd02fc scala> spark #spark-shell内置的spark变量 res3: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@16ef07fa scala> val df = spark.read.json("/tmp/user.json") #读取本地的json文件会返回一个DataFrame对象,我们命名为df。 df: org.apache.spark.sql.DataFrame = [name: string, passwd: string] scala> df.show #展示读取到的结果 +-----------+------+ | name|passwd| +-----------+------+ |yinzhengjie| 2020| | Jason|666666| | Liming| 123| | Jenny| 456| | Danny| 789| +-----------+------+ scala>
2>.从RDD进行转换
博主推荐阅读: https://www.cnblogs.com/yinzhengjie2020/p/13185272.html https://www.cnblogs.com/yinzhengjie2020/p/13200300.html
3>.从Hive Table进行查询返回
博主推荐阅读: https://www.cnblogs.com/yinzhengjie2020/p/13211015.html
二.SQL风格语法
临时表是Session范围内的,Session退出后,表就失效了。如果想应用范围内有效,可以使用全局表。
需要注意的是,使用全局表时需要全路径访问,如:"global_temp.user2"
1>.创建临时视图
scala> val df = spark.read.json("/tmp/user.json") #读取本地的json文件会返回一个DataFrame对象,我们命名为df。 df: org.apache.spark.sql.DataFrame = [name: string, passwd: string] scala> df.createTempView("user") #创建临时视图 scala> spark.sql("select * from user").show #使用Spark SQL来查询数据 +-----------+------+ | name|passwd| +-----------+------+ |yinzhengjie| 2020| | Jason|666666| | Liming| 123| | Jenny| 456| | Danny| 789| +-----------+------+ scala> spark.sql("select * from user where passwd=2020").show #当然,我们也可以进行过滤操作。 +-----------+------+ | name|passwd| +-----------+------+ |yinzhengjie| 2020| +-----------+------+ scala>
2>.创建全局视图
scala> df.createGlobalTempView("user2") #创建一个全局视图 scala> spark.sql("select * from global_temp.user2").show #默认使用当前的session查询全局视图数据 +-----------+------+ | name|passwd| +-----------+------+ |yinzhengjie| 2020| | Jason|666666| | Liming| 123| | Jenny| 456| | Danny| 789| +-----------+------+ scala> spark.sql("select * from global_temp.user2 user where passwd=2020").show +-----------+------+ | name|passwd| +-----------+------+ |yinzhengjie| 2020| +-----------+------+ scala> spark.newSession().sql("select * from global_temp.user2").show #使用一个新session来查询全局视图数据 +-----------+------+ | name|passwd| +-----------+------+ |yinzhengjie| 2020| | Jason|666666| | Liming| 123| | Jenny| 456| | Danny| 789| +-----------+------+ scala> spark.newSession().sql("select * from global_temp.user2 user where passwd=2020").show +-----------+------+ | name|passwd| +-----------+------+ |yinzhengjie| 2020| +-----------+------+ scala>
三.DSL风格语法
1>.查看DataFrame的Schema信息
scala> val df = spark.read.json("/tmp/user.json") #创建一个DataFrame df: org.apache.spark.sql.DataFrame = [name: string, passwd: string] scala> df.printSchema #查看DataFrame的Schema信息 root |-- name: string (nullable = true) |-- passwd: string (nullable = true) scala>
2>.只查看"name"列数据
scala> df.select("name").show() +-----------+ | name| +-----------+ |yinzhengjie| | Jason| | Liming| | Jenny| | Danny| +-----------+ scala>
3>.查看”name”列数据以及”passwd+30”数据
scala> df.select($"name", $"passwd" + 10).show() +-----------+-------------+ | name|(passwd + 10)| +-----------+-------------+ |yinzhengjie| 2030.0| | Jason| 666676.0| | Liming| 133.0| | Jenny| 466.0| | Danny| 799.0| +-----------+-------------+ scala>
4>.查看”passwd”大于”2020”的数据
scala> df.filter($"passwd" > 2020).show() +-----+------+ | name|passwd| +-----+------+ |Jason|666666| +-----+------+ scala>
5>.按照”passwd”分组,查看数据条数
scala> df.groupBy("passwd").count().show() +------+-----+ |passwd|count| +------+-----+ | 2020| 1| | 789| 1| |666666| 1| | 456| 1| | 123| 1| +------+-----+ scala>
四.RDD转换为DataFrame
温馨提示: 如果需要RDD与DF或者DS之间操作,那么都需要引入"import spark.implicits._"(spark不是包名,而是sparkSession对象的名称),下面是具体的案例。
scala> import spark.implicits._ #导入隐式转换 import spark.implicits._ scala> val listRDD = sc.makeRDD(List((1,"YinZhengjie",18),(2,"Jason Yin",20),(3,"Danny",28))) #创建一个RDD listRDD: org.apache.spark.rdd.RDD[(Int, String, Int)] = ParallelCollectionRDD[84] at makeRDD at <console>:27 scala> val df = listRDD.toDF("Id","Name","Age") #将RDD转换成DataFrame df: org.apache.spark.sql.DataFrame = [Id: int, Name: string ... 1 more field] scala> df.show #查看将RDD转换成DataFrame后的数据 +---+-----------+---+ | Id| Name|Age| +---+-----------+---+ | 1|YinZhengjie| 18| | 2| Jason Yin| 20| | 3| Danny| 28| +---+-----------+---+ scala>
五.DataFrame转换为RDD
scala> df #注意观察此时df是DataFrame res33: org.apache.spark.sql.DataFrame = [Id: int, Name: string ... 1 more field] scala> df.show +---+-----------+---+ | Id| Name|Age| +---+-----------+---+ | 1|YinZhengjie| 18| | 2| Jason Yin| 20| | 3| Danny| 28| +---+-----------+---+ scala> df.rdd #直接调用rdd方法即可将DataFrame转换为RDD res35: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[97] at rdd at <console>:29 scala> res35.collect #查看DataFrame转换rdd后的数据(注意哈,这个res36是上一条命令执行的返回结果) res36: Array[org.apache.spark.sql.Row] = Array([1,YinZhengjie,18], [2,Jason Yin,20], [3,Danny,28]) scala>