zoukankan      html  css  js  c++  java
  • spark sql

    Spark 用来操作结构化和半结构化数据的接口——Spark SQL。结构化数据是指任何有结构信息的数据。所谓结构信息,就是每条记录共用的已知的字段集合。当数据符合这样的条件时,Spark SQL 就会使得针对这些数据的读取和查询变得更加简单高效。

    Spark SQL 提供了以下三大功能:

    (1) Spark SQL 可以从各种结构化数据源(例如JSON、Hive、Parquet 等)中读取数据。
    (2) Spark SQL 不仅支持在Spark 程序内使用SQL 语句进行数据查询,也支持从类似商业智能软件Tableau 这样的外部工具中通过标准数据库连接器(JDBC/ODBC)连接SparkSQL 进行查询。
    (3) 当在Spark 程序内使用Spark SQL 时,Spark SQL 支持SQL 与常规的Python/Java/Scala代码高度整合,包括连接RDD 与SQL 表、公开的自定义SQL 函数接口等。这样一来,许多工作都更容易实现了。

    为了实现这些功能,Spark SQL 提供了一种特殊的RDD,叫作SchemaRDD。SchemaRDD是存放Row 对象的RDD,每个Row 对象代表一行记录。SchemaRDD 还包含记录的结构信息(即数据字段)。SchemaRDD 看起来和普通的RDD 很像,但是在内部,SchemaRDD 可以利用结构信息更加高效地存储数据。此外,SchemaRDD 还支持RDD 上所没有的一些新操作,比如运行SQL 查询。SchemaRDD 可以从外部数据源创建,也可以从查询结果或普通RDD 中创建。1.3.0 及后续版本中,SchemaRDD 已经被DataFrame 所取代。

     Spark SQL 最强大之处就是可以在Spark 应用内使用。这种方式让我们可以轻松读取数据并使用SQL 查询,同时还能把这一过程和普通的Python/Java/Scala 程序代码结合在一起。

    在应用中使用Spark SQL

    scala> import org.apache.spark.sql.hive.HiveContext
    import org.apache.spark.sql.hive.HiveContext

    scala> val hivectx=new HiveContext(sc)
    warning: there was one deprecation warning; re-run with -deprecation for details
    hivectx: org.apache.spark.sql.hive.HiveContext = org.apache.spark.sql.hive.HiveContext@6a04da88

    scala> val custdf=hivectx.sql("select * from gamedw.cust")
    custdf: org.apache.spark.sql.DataFrame = [custname: string, sex: int ... 1 more field]

    scala> custdf.count
    res53: Long = 9  


    scala> custdf.where("sex==1")
    res55: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [custname: string, sex: int ... 1 more field]

    scala> custdf.where("sex==1").show
    +---------------+---+--------+
    |       custname|sex|nianling|
    +---------------+---+--------+
    |tianyt_touch100|  1|      50|
    |         wangwu|  1|      85|
    |       zhangsan|  1|      20|
    |        liuyang|  1|      32|
    |      mahuateng|  1|    1001|
    |          nihao|  1|       5|
    +---------------+---+--------+


    scala> custdf.take(1)
    res58: Array[org.apache.spark.sql.Row] = Array([tianyt_touch100,1,50])

    scala> custdf.take(2)
    res59: Array[org.apache.spark.sql.Row] = Array([tianyt_touch100,1,50], [wangwu,1,85])

    scala> custdf.filter(line=>line(1)==1).show
    +---------------+---+--------+
    |       custname|sex|nianling|
    +---------------+---+--------+
    |tianyt_touch100|  1|      50|
    |         wangwu|  1|      85|
    |       zhangsan|  1|      20|
    |        liuyang|  1|      32|
    |      mahuateng|  1|    1001|
    |          nihao|  1|       5|
    +---------------+---+--------+

    scala> val custnamedf=custdf.map(line=>line.getString(0))  //getString(0) 会把字段0 的值作为字符串返回

    custnamedf: org.apache.spark.sql.Dataset[String] = [value: string]

    scala> val custnamedf=custdf.map(line=>line(0).toString)

    custnamedf: org.apache.spark.sql.Dataset[String] = [value: string]

    scala> custnamedf.show
    +---------------+
    |          value|
    +---------------+
    |tianyt_touch100|
    |         wangwu|
    |       zhangsan|
    |         liuqin|
    |         wangwu|
    |        liuyang|
    |          hello|
    |      mahuateng|
    |          nihao|
    +---------------+

     scala> custdf.select("custname").show
    +---------------+
    |       custname|
    +---------------+
    |tianyt_touch100|
    |         wangwu|
    |       zhangsan|
    |         liuqin|
    |         wangwu|
    |        liuyang|
    |          hello|
    |      mahuateng|
    |          nihao|
    +---------------+

    scala> custdf.select("custname")
    res110: org.apache.spark.sql.DataFrame = [custname: string]

    scala> custdf.select("custname").show
    +---------------+
    |       custname|
    +---------------+
    |tianyt_touch100|
    |         wangwu|
    |       zhangsan|
    |         liuqin|
    |         wangwu|
    |        liuyang|
    |          hello|
    |      mahuateng|
    |          nihao|
    +---------------+

    scala> custdf.printSchema
    root
     |-- custname: string (nullable = true)
     |-- sex: integer (nullable = true)
     |-- nianling: integer (nullable = true)

    Row 对象表示SchemaRDD(DataFrame) 中的记录,其本质就是一个定长的字段数组

    缓存

    Spark SQL 的缓存机制与Spark 中的稍有不同。由于我们知道每个列的类型信息,所以Spark 可以更加高效地存储数据。为了确保使用更节约内存的表示方式进行缓存而不是储存整个对象,应当使用专门的hiveCtx.cacheTable("tableName") 方法。当缓存数据表时,Spark SQL 使用一种列式存储格式在内存中表示数据。这些缓存下来的表只会在驱动器程序的生命周期里保留在内存中,所以如果驱动器进程退出,就需要重新缓存数据。和缓存RDD 时的动机一样,如果想在同样的数据上多次运行任务或查询时,就应把这些数据表缓存起来。

    你也可以使用HiveQL/SQL 语句来缓存表。只需要运行CACHE TABLEtableName 或UNCACHETABLEtableName 来缓存表或者删除已有的缓存即可。

    scala> hivectx.cacheTable("gamedw.cust")

    scala> hivectx.sql("select * from gamedw.cust").show
    +---------------+---+--------+
    |       custname|sex|nianling|
    +---------------+---+--------+
    |tianyt_touch100|  1|      50|
    |         wangwu|  1|      85|
    |       zhangsan|  1|      20|
    |         liuqin|  0|      56|
    |         wangwu|  0|      47|
    |        liuyang|  1|      32|
    |          hello|  0|     100|
    |      mahuateng|  1|    1001|
    |          nihao|  1|       5|
    +---------------+---+--------+

    然后在Spark 的应用用户界面看到了In-memory table gamedw.cust的缓存

    scala> hivectx.uncacheTable("gamedw.cust") // 删除缓存

     读取和存储数据

    Spark SQL 支持很多种结构化数据源,可以让你跳过复杂的读取过程,轻松从各种数据源中读取到Row 对象。这些数据源包括Hive 表、JSON 和Parquet 文件。此外,当你使用SQL 查询这些数据源中的数据并且只用到了一部分字段时,Spark SQL 可以智能地只扫描这些用到的字段,而不是像SparkContext.hadoopFile 中那样简单粗暴地扫描全部数据。

    Apache Hive

    当从Hive 中读取数据时,Spark SQL 支持任何Hive 支持的存储格式(SerDe),包括文本文件、RCFiles、ORC、Parquet、Avro,以及Protocol Buffer。

    JSON

     [root@host ~]# hdfs dfs -cat /root/name.json
    {"name": "Holden"}
    {"name": "Sparky The Bear", "lovesPandas":true,"knows": {"friends":["holden","bajie"]}}

    scala> val hivectx=new HiveContext(sc)
    warning: there was one deprecation warning; re-run with -deprecation for details
    hivectx: org.apache.spark.sql.hive.HiveContext = org.apache.spark.sql.hive.HiveContext@333411ee


    scala> val namejsdf=hivectx.jsonFile("hdfs://localhost:9000/root/name.json")
    warning: there was one deprecation warning; re-run with -deprecation for details
    namejsdf: org.apache.spark.sql.DataFrame = [knows: struct<friends: array<string>>, lovesPandas: boolean ... 1 more field]

    scala> namejsdf.show
    +--------------------+-----------+---------------+
    |               knows|lovesPandas|           name|
    +--------------------+-----------+---------------+
    |                null|       null|         Holden|
    |[WrappedArray(hol...|       true|Sparky The Bear|
    +--------------------+-----------+---------------+


    scala> namejsdf.printSchema
    root
     |-- knows: struct (nullable = true)
     |    |-- friends: array (nullable = true)
     |    |    |-- element: string (containsNull = true)
     |-- lovesPandas: boolean (nullable = true)
     |-- name: string (nullable = true)


    scala> namejsdf.select("name","knows.friends").show
    +---------------+---------------+
    |           name|        friends|
    +---------------+---------------+
    |         Holden|           null|
    |Sparky The Bear|[holden, bajie]|
    +---------------+---------------+

    scala> namejsdf.registerTempTable("name")
    warning: there was one deprecation warning; re-run with -deprecation for details


    scala> hivectx.sql("select name,knows.friends from name where lovesPandas is not null").show
    +---------------+---------------+
    |           name|        friends|
    +---------------+---------------+
    |Sparky The Bear|[holden, bajie]|
    +---------------+---------------+


    scala> hivectx.sql("select name,knows.friends[0] from name where lovesPandas is not null").show
    +---------------+-----------------------------+
    |           name|knows.friends AS `friends`[0]|
    +---------------+-----------------------------+
    |Sparky The Bear|                       holden|
    +---------------+-----------------------------+

    scala> hivectx.sql("select name,knows.friends[1] from name where lovesPandas is not null").show
    +---------------+-----------------------------+
    |           name|knows.friends AS `friends`[1]|
    +---------------+-----------------------------+
    |Sparky The Bear|                        bajie|
    +---------------+-----------------------------+

     基于RDD

    除了读取数据,也可以基于RDD 创建SchemaRDD(DataFrame)。在Scala 中,带有case class 的RDD可以隐式转换成SchemaRDD(DataFrame)。
    scala> val custrdd=sc.textFile("/root/customers.txt")
    custrdd: org.apache.spark.rdd.RDD[String] = /root/customers.txt MapPartitionsRDD[221] at textFile at <console>:41
    scala> custrdd.foreach(println)
    tianyt_touch100 1 50
    wangwu 1 85
    zhangsan 1 20
    liuqin 0 56
    wangwu 0 47
    liuyang 1 32
    hello 0 100
    scala> case class customer(name:String,sex:Int,age:Int)
    defined class customer


    scala> val custdf=custrdd.map(x=>x.split(' ')).map(x=>customer(x(0),x(1).trim.toInt,x(2).trim.toInt)).toDF
    custdf: org.apache.spark.sql.DataFrame = [name: string, sex: int ... 1 more field]

    scala> custdf.show
    +---------------+---+---+
    |           name|sex|age|
    +---------------+---+---+
    |tianyt_touch100|  1| 50|
    |         wangwu|  1| 85|
    |       zhangsan|  1| 20|
    |         liuqin|  0| 56|
    |         wangwu|  0| 47|
    |        liuyang|  1| 32|
    |          hello|  0|100|
    +---------------+---+---+

    scala> custdf.select("name","sex").where("sex==0").show
    +------+---+
    |  name|sex|
    +------+---+
    |liuqin|  0|
    |wangwu|  0|
    | hello|  0|
    +------+---+


    scala> custdf.registerTempTable("t_customer")
    warning: there was one deprecation warning; re-run with -deprecation for details


    scala> hivectx.sql("select * from t_customer where sex=0").show
    +------+---+---+
    |  name|sex|age|
    +------+---+---+
    |liuqin|  0| 56|
    |wangwu|  0| 47|
    | hello|  0|100|
    +------+---+---+

     用户自定义函数

    用户自定义函数,也叫UDF,可以让我们使用Python/Java/Scala 注册自定义函数,并在SQL中调用。在Spark SQL 中,编写UDF 尤为简单。Spark SQL 不仅有自己的UDF 接口,也支持已有的Apache Hive UDF。

    Spark SQL UDF

    我们可以使用Spark 支持的编程语言编写好函数,然后通过Spark SQL 内建的方法传递进来,非常便捷地注册我们自己的UDF。
    scala> def strlen(str:String):Int={str.length}
    strlen: (str: String)Int
    scala> hivectx.udf.register("strlength",strlen _)
    res202: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,IntegerType,Some(List(StringType)))

    scala> hivectx.sql("select name,strlength(name) len from t_customer").show
    +---------------+---+
    |           name|len|
    +---------------+---+
    |tianyt_touch100| 15|
    |         wangwu|  6|
    |       zhangsan|  8|
    |         liuqin|  6|
    |         wangwu|  6|
    |        liuyang|  7|
    |          hello|  5|
    +---------------+---+

    Hive UDF

    Spark SQL 也支持已有的Hive UDF。标准的Hive UDF 已经自动包含在了Spark SQL 中。如果需要支持自定义的Hive UDF,我们要确保该UDF 所在的JAR 包已经包含在了应用中。

    Spark SQL性能

    Spark SQL 可以利用其对类型的了解来高效地表示数据。当缓存数据时,Spark SQL 使用内存式的列式存储。这不仅仅节约了缓存的空间,而且尽可能地减少了后续查询中针对某
    几个字段查询时的数据读取。


    scala> conf.set("spark.sql.codegen","true")

    一些选项的配置需要给予特别的考量。第一个是spark.sql.codegen,这个选项可以让Spark SQL 把每条查询语句在运行前编译为Java 二进制代码。由于生成了专门运行指定查询的代码,codegen 可以让大型查询或者频繁重复的查询明显变快。然而,在运行特别快(1 ~ 2 秒)的即时查询语句时,codegen 有可能会增加额外开销,因为codegen 需要让每条查询走一遍编译的过程。5codegen 还是一个试验性的功能,但是我们推荐在所有大型的或者是重复运行的查询中使用codegen。

  • 相关阅读:
    ASP.NET WebApi 中使用swagger 构建在线帮助文档
    TortoiseSVN 分支创建与合并
    C# 图片处理
    使用Vue构建单页应用一
    使用nuget 打包并上传 nuget.org
    SignalR 教程二 服务端广播
    SignalR 教程一
    Visual Studio 中常用的快捷键
    EF for Mysql
    Advanced Plugin Concepts
  • 原文地址:https://www.cnblogs.com/playforever/p/9234746.html
Copyright © 2011-2022 走看看