zoukankan      html  css  js  c++  java
  • Spark笔记之Catalog

    一、什么是Catalog

    Spark SQL提供了执行sql语句的支持,sql语句是以表的方式组织使用数据的,而表本身是如何组织存储的呢,肯定是存在一些元数据之类的东西了,Catalog就是Spark 2.0之后提供的访问元数据的类:

    image

    Catalog提供一些API用来对数据库、表、视图、缓存、列、函数(UDF/UDAF)进行操作,下文将一一介绍。

    二、如何使用Catalog

    得到Catalog:

    val spark = SparkSession.builder().master("local[*]").appName("catalog-study").getOrCreate()
    val catalog = spark.catalog 

    Catalog相关的代码存放在org.apache.spark.sql.catalog下:

    image

    上面的Catalog只是一个接口定义规范,具体实现还有一个org.apache.spark.sql.internal.CatalogImpl,如果只是使用Spark完成工作的话只阅读接口定义基本够用了。

    三、相关API

    数据库相关

    看数据库相关的操作之前先看一下Catalog对数据库的表示:

    /**
     * A database in Spark, as returned by the `listDatabases` method defined in [[Catalog]].
     *
     * @param name name of the database.
     * @param description description of the database.
     * @param locationUri path (in the form of a uri) to data files.
     * @since 2.0.0
     */
    @InterfaceStability.Stable
    class Database(
        val name: String,
        @Nullable val description: String,
        val locationUri: String)
      extends DefinedByConstructorParams {
    
      override def toString: String = {
        "Database[" +
          s"name='$name', " +
          Option(description).map { d => s"description='$d', " }.getOrElse("") +
          s"path='$locationUri']"
      }
    
    }

    Catalog使用三个字段表示一个数据库:

    name:数据库名字

    descripttion:数据库描述,可以认为是注释

    locationUri:数据库的数据保存位置

    currentDatabase: String

    返回当前使用的数据库,相当于select database();

    setCurrentDatabase(dbName: String): Unit
    设置当前使用的数据库,相当于use database_name;
    listDatabases(): Dataset[Database]

    查看所有数据库,相当于show databases;

    getDatabase(dbName: String): Database

    获取某数据库的元数据,返回值是Database类型的,如果指定的数据库不存在则会@throws[AnalysisException]("database does not exist")

    databaseExists(dbName: String): Boolean

    判断某个数据库是否已经存在,返回boolean值。

    为了避免抛异常对单个数据库进行getDatabase获取元数据之前还是先使用databaseExists确定数据库已经存在。

    表/视图相关

    同样的,对表或视图Catalog也用一个class来表示:

    /**
     * A table in Spark, as returned by the `listTables` method in [[Catalog]].
     *
     * @param name name of the table.
     * @param database name of the database the table belongs to.
     * @param description description of the table.
     * @param tableType type of the table (e.g. view, table).
     * @param isTemporary whether the table is a temporary table.
     * @since 2.0.0
     */
    @InterfaceStability.Stable
    class Table(
        val name: String,
        @Nullable val database: String,
        @Nullable val description: String,
        val tableType: String,
        val isTemporary: Boolean)
      extends DefinedByConstructorParams {
    
      override def toString: String = {
        "Table[" +
          s"name='$name', " +
          Option(database).map { d => s"database='$d', " }.getOrElse("") +
          Option(description).map { d => s"description='$d', " }.getOrElse("") +
          s"tableType='$tableType', " +
          s"isTemporary='$isTemporary']"
      }
    
    }

    name:表的名字

    database:表所属的数据库的名字

    description:表的描述信息

    tableType:用于区分是表还是视图,两个取值:table或view。

    isTemporary:是否是临时表或临时视图,解释一下啥是临时表,临时表就是使用Dataset或DataFrame的createOrReplaceTempView等类似的API注册的视图或表,当此次Spark任务结束后这些表就没了,再次使用的话还要再进行注册,而非临时表就是在Hive中真实存在的,开启Hive支持就能够直接使用的,本次Spark任务结束后表仍然能存在,下次启动不需要重新做任何处理就能够使用,表是持久的,这种不是临时表。

    listTables(): Dataset[Table]

    查看所有表或视图,相当于show tables;

    listTables(dbName: String): Dataset[Table]

    返回指定数据库下的表或视图,如果指定的数据库不存在则会抛出@throws[AnalysisException]("database does not exist")表示数据库不存在。

    getTable(tableName: String): Table
    getTable(dbName: String, tableName: String): Table

    获取表的元信息,不存在则会抛出异常。

    tableExists(tableName: String): Boolean
    tableExists(dbName: String, tableName: String): Boolean

    判断表或视图是否存在,返回boolean值。

    dropTempView(viewName: String): Boolean
    dropGlobalTempView(viewName: String): Boolean

    使用createOrReplaceTempView类似API注册的临时视图可以使用此方法删除,如果这个视图已经被缓存过的话会自动清除缓存。

    recoverPartitions(tableName: String): Unit
    isCached(tableName: String): Boolean

    用于判断一个表否已经缓存过了。

    cacheTable(tableName: String): Unit
    cacheTable(tableName: String, storageLevel: StorageLevel): Unit
    用于缓存表
    uncacheTable(tableName: String): Unit

    对表取消缓存

    clearCache(): Unit

    清空所有缓存

    refreshTable(tableName: String): Unit

    Spark为了性能考虑,对表的元数据做了缓存,所以当被缓存的表已经改变时也必须刷新元数据重新缓存。

    refreshByPath(path: String): Unit
    createTable(tableName: String, path: String): DataFrame
    createTable(tableName: String, path: String, source: String): DataFrame
    createTable(tableName: String, source: String, options: java.util.Map[String, String]): DataFrame
    createTable(tableName: String, source: String, options: Map[String, String]): DataFrame
    createTable(tableName: String, source: String, schema: StructType, options: java.util.Map[String, String]): DataFrame
    createTable(tableName: String, source: String, schema: StructType, options: Map[String, String]): DataFrame  
    
     

    函数相关

    Catalog对函数的表示:

    /**
     * A user-defined function in Spark, as returned by `listFunctions` method in [[Catalog]].
     *
     * @param name name of the function.
     * @param database name of the database the function belongs to.
     * @param description description of the function; description can be null.
     * @param className the fully qualified class name of the function.
     * @param isTemporary whether the function is a temporary function or not.
     * @since 2.0.0
     */
    @InterfaceStability.Stable
    class Function(
        val name: String,
        @Nullable val database: String,
        @Nullable val description: String,
        val className: String,
        val isTemporary: Boolean)
      extends DefinedByConstructorParams {
    
      override def toString: String = {
        "Function[" +
          s"name='$name', " +
          Option(database).map { d => s"database='$d', " }.getOrElse("") +
          Option(description).map { d => s"description='$d', " }.getOrElse("") +
          s"className='$className', " +
          s"isTemporary='$isTemporary']"
      }
    
    }

    name:函数的名字

    database:函数注册在哪个数据库下,函数是跟数据库绑定的

    description:对函数的描述信息,可以理解成注释

    className:函数其实就是一个class,调用函数就是调用类的方法,className表示函数对应的class的全路径类名

    isTemporary:是否是临时函数。

    listFunctions(): Dataset[Function]

    列出当前数据库下的所有函数,包括注册的临时函数。

    listFunctions(dbName: String): Dataset[Function]

    列出指定数据库下注册的所有函数,包括临时函数,如果指定的数据库不存在的话则会抛出@throws[AnalysisException]("database does not exist")表示数据库不存在。

    getFunction(functionName: String): Function
    getFunction(dbName: String, functionName: String): Function

    获取函数的元信息,函数不存在则会抛出异常。

    functionExists(functionName: String): Boolean
    functionExists(dbName: String, functionName: String): Boolean
    判断函数是否存在,返回boolean值。

    对表或视图的列相关的操作

    Catalog对列的表示:

    /**
     * A column in Spark, as returned by `listColumns` method in [[Catalog]].
     *
     * @param name name of the column.
     * @param description description of the column.
     * @param dataType data type of the column.
     * @param nullable whether the column is nullable.
     * @param isPartition whether the column is a partition column.
     * @param isBucket whether the column is a bucket column.
     * @since 2.0.0
     */
    @InterfaceStability.Stable
    class Column(
        val name: String,
        @Nullable val description: String,
        val dataType: String,
        val nullable: Boolean,
        val isPartition: Boolean,
        val isBucket: Boolean)
      extends DefinedByConstructorParams {
    
      override def toString: String = {
        "Column[" +
          s"name='$name', " +
          Option(description).map { d => s"description='$d', " }.getOrElse("") +
          s"dataType='$dataType', " +
          s"nullable='$nullable', " +
          s"isPartition='$isPartition', " +
          s"isBucket='$isBucket']"
      }
    
    }

    name:列的名字

    description:列的描述信息,与注释差不多

    dataType:列的数据类型

    nullable:列是否允许为null

    isPartition:是否是分区列

    isBucket:是否是桶列

    listColumns(tableName: String): Dataset[Column]
    listColumns(dbName: String, tableName: String): Dataset[Column]

    列出指定的表或视图有哪些列,表不存在则抛异常。

    相关资料:

    1.  Spark 2.0介绍:Catalog API介绍和使用 

    2. Java Doc: Class Catalog

    .

  • 相关阅读:
    【AtCoder】ARC067 F
    【AtCoder】ARC095 E
    【BZOJ】4559: [JLoi2016]成绩比较 计数DP+排列组合+拉格朗日插值
    【CodeForces】961 F. k-substrings 字符串哈希+二分
    【CodeForces】961 G. Partitions 斯特林数
    【BZOJ】2310: ParkII 插头DP
    【BZOJ】2331: [SCOI2011]地板 插头DP
    webpack从0开始---(二)
    webpack从0开始---(一)
    前端基础知识(不应需要思考的知识点)三
  • 原文地址:https://www.cnblogs.com/cc11001100/p/9463578.html
Copyright © 2011-2022 走看看