
                 Spark SQL Programming: DataFrames

                                         Author: Yin Zhengjie

    Copyright notice: this is an original work; reproduction without permission is prohibited, and violators will be held legally liable.

    I. Creating a DataFrame

      In Spark SQL, SparkSession is the entry point for creating DataFrames and executing SQL. There are three ways to create a DataFrame:
        (1) from a Spark data source;
        (2) by converting an existing RDD;
        (3) by querying a Hive table.

    1>. Creating from a Spark data source

    [root@hadoop101.yinzhengjie.org.cn ~]# vim /tmp/user.json
    [root@hadoop101.yinzhengjie.org.cn ~]# 
    [root@hadoop101.yinzhengjie.org.cn ~]# cat /tmp/user.json
    {"name":"yinzhengjie","passwd":"2020"}
    {"name":"Jason","passwd":"666666"}
    {"name":"Liming","passwd":"123"}
    {"name":"Jenny","passwd":"456"}
    {"name":"Danny","passwd":"789"}
    [root@hadoop101.yinzhengjie.org.cn ~]# 
    [root@hadoop101.yinzhengjie.org.cn ~]# vim /tmp/user.json        # create the test data
    [root@hadoop101.yinzhengjie.org.cn ~]# spark-shell 
    20/07/13 03:03:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    Spark context Web UI available at http://hadoop101.yinzhengjie.org.cn:4040
    Spark context available as 'sc' (master = local[*], app id = local-1594580701441).
    Spark session available as 'spark'.
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /___/ .__/\_,_/_/ /_/\_\   version 2.4.6
          /_/
             
    Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_201)
    Type in expressions to have them evaluated.
    Type :help for more information.
    
    scala> sc                            # the sc variable built into spark-shell
    res2: org.apache.spark.SparkContext = org.apache.spark.SparkContext@40cd02fc
    
    scala> spark                          # the spark variable built into spark-shell
    res3: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@16ef07fa
    
    scala> val df = spark.read.json("/tmp/user.json")     # reading a local JSON file returns a DataFrame object, which we name df
    df: org.apache.spark.sql.DataFrame = [name: string, passwd: string]
    
    scala> df.show                         # display the loaded data
    +-----------+------+
    |       name|passwd|
    +-----------+------+
    |yinzhengjie|  2020|
    |      Jason|666666|
    |     Liming|   123|
    |      Jenny|   456|
    |      Danny|   789|
    +-----------+------+
    
    
    scala> 

    2>. Converting from an RDD

      Recommended reading:
        https://www.cnblogs.com/yinzhengjie2020/p/13185272.html
        https://www.cnblogs.com/yinzhengjie2020/p/13200300.html
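
      For quick reference, here is a minimal sketch of the reflection-based approach covered there: an RDD of a case class is converted with toDF, so the column names come from the class fields. The User class and the sample data below are illustrative, not taken from the linked posts.

    case class User(name: String, passwd: String)          // the field names become the column names

    import spark.implicits._                               // required for the .toDF conversion

    val userRDD = sc.makeRDD(List(("yinzhengjie", "2020"), ("Jason", "666666")))
    val userDF  = userRDD.map { case (name, passwd) => User(name, passwd) }.toDF()
    userDF.show()                                          // same two-column layout as the JSON example above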

    3>. Querying a Hive table

      Recommended reading:
        https://www.cnblogs.com/yinzhengjie2020/p/13211015.html
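
      As a minimal sketch (assuming a Hive deployment with hive-site.xml on Spark's classpath and an existing table; the database and table names below are illustrative), querying Hive returns a DataFrame directly:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("HiveRead")
      .enableHiveSupport()                                 // requires a Hive-enabled Spark build
      .getOrCreate()

    val hiveDF = spark.sql("SELECT * FROM default.user")   // spark.sql returns a DataFrame
    hiveDF.show()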

    II. SQL-style syntax

      A temporary view is scoped to the session that created it; once that session ends, the view is gone. For a view that stays valid across the entire application, use a global view.
    
      Note that a global view must be referenced by its fully qualified name, e.g. "global_temp.user2".

    1>. Creating a temporary view

    scala> val df = spark.read.json("/tmp/user.json")     # reading a local JSON file returns a DataFrame object, which we name df
    df: org.apache.spark.sql.DataFrame = [name: string, passwd: string]
    
    scala> df.createTempView("user")              # create a temporary view
    
    scala> spark.sql("select * from user").show        #使用Spark SQL来查询数据
    +-----------+------+
    |       name|passwd|
    +-----------+------+
    |yinzhengjie|  2020|
    |      Jason|666666|
    |     Liming|   123|
    |      Jenny|   456|
    |      Danny|   789|
    +-----------+------+
    
    
    scala> spark.sql("select * from user where passwd=2020").show      #当然,我们也可以进行过滤操作。
    +-----------+------+
    |       name|passwd|
    +-----------+------+
    |yinzhengjie|  2020|
    +-----------+------+
    
    
    scala> 
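
      One practical note: createTempView fails if a view with the same name already exists in the session. When re-running code, createOrReplaceTempView (also part of the DataFrame API) is the safer choice; a small sketch:

    df.createOrReplaceTempView("user")               // silently overwrites any existing "user" view
    spark.sql("select count(*) from user").show()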

    2>. Creating a global view

    scala> df.createGlobalTempView("user2")                    # create a global view
    
    scala> spark.sql("select * from global_temp.user2").show          #默认使用当前的session查询全局视图数据
    +-----------+------+
    |       name|passwd|
    +-----------+------+
    |yinzhengjie|  2020|
    |      Jason|666666|
    |     Liming|   123|
    |      Jenny|   456|
    |      Danny|   789|
    +-----------+------+
    
    
    scala> spark.sql("select * from global_temp.user2 user where passwd=2020").show
    +-----------+------+
    |       name|passwd|
    +-----------+------+
    |yinzhengjie|  2020|
    +-----------+------+
    
    
    scala> spark.newSession().sql("select * from global_temp.user2").show      # query the global view from a new session
    +-----------+------+
    |       name|passwd|
    +-----------+------+
    |yinzhengjie|  2020|
    |      Jason|666666|
    |     Liming|   123|
    |      Jenny|   456|
    |      Danny|   789|
    +-----------+------+
    
    
    scala> spark.newSession().sql("select * from global_temp.user2 user where passwd=2020").show
    +-----------+------+
    |       name|passwd|
    +-----------+------+
    |yinzhengjie|  2020|
    +-----------+------+
    
    
    scala> 

    III. DSL-style syntax

    1>. Viewing the DataFrame's schema

    scala> val df = spark.read.json("/tmp/user.json")        # create a DataFrame
    df: org.apache.spark.sql.DataFrame = [name: string, passwd: string]
    
    scala> df.printSchema                        # print the DataFrame's schema
    root
     |-- name: string (nullable = true)
     |-- passwd: string (nullable = true)
    
    
    scala> 

    2>. Selecting only the "name" column

    scala> df.select("name").show()
    +-----------+
    |       name|
    +-----------+
    |yinzhengjie|
    |      Jason|
    |     Liming|
    |      Jenny|
    |      Danny|
    +-----------+
    
    
    scala> 

    3>. Selecting the "name" column together with "passwd + 10"

    scala> df.select($"name", $"passwd" + 10).show()
    +-----------+-------------+
    |       name|(passwd + 10)|
    +-----------+-------------+
    |yinzhengjie|       2030.0|
    |      Jason|     666676.0|
    |     Liming|        133.0|
    |      Jenny|        466.0|
    |      Danny|        799.0|
    +-----------+-------------+
    
    
    scala> 
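
      The result column has type double because Spark implicitly casts the string column to a numeric type before doing arithmetic. If integer results are preferred, the column can be cast explicitly; a small sketch (the passwd_plus_10 alias is illustrative):

    df.select($"name", ($"passwd".cast("int") + 10).as("passwd_plus_10")).show()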

    4>. Filtering rows where "passwd" is greater than 2020

    scala> df.filter($"passwd" > 2020).show()
    +-----+------+
    | name|passwd|
    +-----+------+
    |Jason|666666|
    +-----+------+
    
    
    scala> 

    5>. Grouping by "passwd" and counting rows

    scala> df.groupBy("passwd").count().show()
    +------+-----+
    |passwd|count|
    +------+-----+
    |  2020|    1|
    |   789|    1|
    |666666|    1|
    |   456|    1|
    |   123|    1|
    +------+-----+
    
    
    scala> 
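
      Beyond count(), groupBy can feed arbitrary aggregates through agg; a brief illustrative extension of the example above:

    import org.apache.spark.sql.functions._

    df.groupBy("passwd").agg(count("name").as("cnt")).show()   // same counts, with an explicit column alias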

    IV. Converting an RDD to a DataFrame

      Tip:
        Any operation between an RDD and a DataFrame or Dataset requires "import spark.implicits._" (here spark is not a package name but the name of the SparkSession object). A concrete example follows.


    scala> import spark.implicits._                    # import the implicit conversions
    import spark.implicits._
    
    scala> val listRDD = sc.makeRDD(List((1,"YinZhengjie",18),(2,"Jason Yin",20),(3,"Danny",28)))      # create an RDD
    listRDD: org.apache.spark.rdd.RDD[(Int, String, Int)] = ParallelCollectionRDD[84] at makeRDD at <console>:27
    
    scala> val df = listRDD.toDF("Id","Name","Age")      # convert the RDD into a DataFrame
    df: org.apache.spark.sql.DataFrame = [Id: int, Name: string ... 1 more field]
    
    scala> df.show                         # view the data after converting the RDD to a DataFrame
    +---+-----------+---+
    | Id|       Name|Age|
    +---+-----------+---+
    |  1|YinZhengjie| 18|
    |  2|  Jason Yin| 20|
    |  3|      Danny| 28|
    +---+-----------+---+
    
    
    scala> 
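
      For comparison, calling toDF() with no arguments keeps the tuple's default field names (_1, _2, _3); a quick sketch:

    val df2 = listRDD.toDF()      // columns default to _1: int, _2: string, _3: int
    df2.printSchema()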

    V. Converting a DataFrame to an RDD

    scala> df          # note that df is still a DataFrame at this point
    res33: org.apache.spark.sql.DataFrame = [Id: int, Name: string ... 1 more field]
    
    scala> df.show
    +---+-----------+---+
    | Id|       Name|Age|
    +---+-----------+---+
    |  1|YinZhengjie| 18|
    |  2|  Jason Yin| 20|
    |  3|      Danny| 28|
    +---+-----------+---+
    
    
    scala> df.rdd       # simply call the rdd method to convert the DataFrame into an RDD
    res35: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[97] at rdd at <console>:29
    
    scala> res35.collect   # view the data after the conversion (note: res35 is the value returned by the previous command)
    res36: Array[org.apache.spark.sql.Row] = Array([1,YinZhengjie,18], [2,Jason Yin,20], [3,Danny,28])
    
    scala> 
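
      Each element of the resulting RDD is a Row; individual values can be pulled out by position or by field name via the Row API, e.g.:

    val names = df.rdd.map(row => row.getAs[String]("Name"))      // extract the Name field from each Row
    names.collect()                                               // Array(YinZhengjie, Jason Yin, Danny)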