zoukankan      html  css  js  c++  java
  • spark SQL之Catalog API使用

    Catalog API简介

        Spark中的DataSet和Dataframe API支持结构化分析。结构化分析的一个重要的方面是管理元数据。这些元数据可能是一些临时元数据(比如临时表)、SQLContext上注册的UDF以及持久化的元数据(比如Hivemeta store或者HCatalog)。

    Spark的早期版本是没有标准的API来访问这些元数据的。用户通常使用查询语句(比如show tables)来查询这些元数据。这些查询通常需要操作原始的字符串,而且不同元数据类型的操作也是不一样的。

    这种情况在Spark 2.0中得到改变。Spark 2.0中添加了标准的API(称为catalog)来访问Spark SQL中的元数据。这个API既可以操作Spark SQL,也可以操作Hive元数据。

    访问Catalog

    Catalog可以通过SparkSession获取,下面代码展示如何获取Catalog:

    import org.apache.spark.sql.SparkSession
    
    val sparkSession = SparkSession.builder.appName("example").enableHiveSupport().getOrCreate()
    
    val catalog = sparkSession.catalog

    访问databases

    我们一旦创建好catalog对象之后,我们可以使用它来查询元数据中的数据库,catalog上的API返回的结果全部都是dataset。

    scala> catalog.listDatabases().show(false)
    
    +----------+--------------------+--------------------+
    |      name|   description      |      locationUri   |
    +----------+--------------------+--------------------+
    |data_clean|                    |hdfs://asiainfo-1...|
    |default   |Default Hive data...|hdfs://asiainfo-1...|
    +----------+--------------------+--------------------+
    
    scala> catalog.listDatabases().select("name").show(false)
    +-----------------------+
    |name                   |
    +-----------------------+
    |iteblog                |
    |default                |
    +-----------------------+

    listDatabases返回元数据中所有的数据库。

    默认情况下,元数据仅仅只有名为default的数据库。如果是Hive元数据,那么它会从Hive元数据中获取所有的数据库。listDatabases返回的类型是dataset,所以我们可以使用Dataset上的所有操作来查询元数据。

    使用createTempView注册Dataframe

    在Spark的早期版本,我们使用registerTempTable来注册Dataframe。然而在Spark 2.0中,这个API已经被遗弃了。registerTempTable名字很让人误解,因为用户会认为这个函数会将Dataframe持久化并且保证这个临时表,但是实际上并不是这样的,所以社区才有意将它替换成createTempViewcreateTempView的使用方法如下:

    df.createTempView("temp")

    查询表

    正如我们可以展示出元数据中的所有数据库一样,我们也可以展示出元数据中某个数据库中的表。它会展示出Spark SQL中所有注册的临时表。同时可以展示出Hive中默认数据库(也就是default)中的表。如下:

    scala> catalog.listTables().select("name").show(false)
    +----------------------------------------+
    |name                                    |
    +----------------------------------------+
    |city_to_level                           |
    |table2                                  |
    |test                                    |
    |ticket_order                            |
    |tmp1_result                             |
    +----------------------------------------+

    判断某个表是否缓存

    我们可以使用Catalog提供的API来检查某个表是否缓存。如下:

    scala> println(catalog.isCached("temp"))
    false

    上面判断temp表是否缓存,结果输出false。默认情况下表是不会被缓存的,我们可以手动缓存某个表,如下:

    scala>  df.cache()
    res4: df.type = [_c0: string, _c1: string ... 2 more fields]
     
    scala> println(catalog.isCached("temp"))
    true

    现在iteblog表已经被缓存了,所有现在的输出结构是true。

    删除view

    我们可以使用catalog提供的API来删除view。如果是Spark SQL情况,那么它会删除事先注册好的view;如果是hive情况,那么它会从元数据中删除表。

     scala> catalog.dropTempView("iteblog"

    查询已经注册的函数

    我们不仅可以使用Catalog API操作表,还可以用它操作UDF。下面代码片段展示SparkSession上所有已经注册号的函数,当然也包括了Spark内置的函数。

    scala> catalog.listFunctions().select("name","className","isTemporary").show(100, false)

    +---------------------+-----------------------------------------------------------------------+-----------+
    |name                 |className                                                              |isTemporary|
    +---------------------+-----------------------------------------------------------------------+-----------+
    |!                    |org.apache.spark.sql.catalyst.expressions.Not                          |true       |
    |%                    |org.apache.spark.sql.catalyst.expressions.Remainder                    |true       |
    |&                    |org.apache.spark.sql.catalyst.expressions.BitwiseAnd                   |true       |
    |*                    |org.apache.spark.sql.catalyst.expressions.Multiply                     |true       |
    |+                    |org.apache.spark.sql.catalyst.expressions.Add                          |true       |
    +---------------------+-----------------------------------------------------------------------+-----------+

    参考:https://blog.csdn.net/pengzonglu7292/article/details/81044857

  • 相关阅读:
    Appium问题解决方案(2)- AttributeError:module 'appium.webdriver' has no attribute 'Remote'
    Azure Messaging-ServiceBus Messaging消息队列技术系列8-服务总线配额
    Azure Messaging-ServiceBus Messaging消息队列技术系列7-消息事务
    Azure Messaging-ServiceBus Messaging消息队列技术系列6-消息回执
    Azure Messaging-ServiceBus Messaging消息队列技术系列5-重复消息:at-least-once at-most-once
    Azure Messaging-ServiceBus Messaging消息队列技术系列4-复杂对象消息是否需要支持序列化和消息持久化
    Azure Messaging-ServiceBus Messaging消息队列技术系列3-消息顺序保证
    [博客迁移]探索Windows Azure 监控和自动伸缩系列3
    [博客迁移]探索Windows Azure 监控和自动伸缩系列2
    [博客迁移]探索Windows Azure 监控和自动伸缩系列1
  • 原文地址:https://www.cnblogs.com/yyy-blog/p/10281714.html
Copyright © 2011-2022 走看看