zoukankan      html  css  js  c++  java
  • Flink实战(七十六):FLINK-SQL使用基础(四)Catalogs

    1 简介

    Catalog 提供了元数据信息,例如数据库、表、分区、视图以及数据库或其他外部系统中存储的函数和信息。

    数据处理最关键的方面之一是管理元数据。 元数据可以是临时的,例如临时表、或者通过 TableEnvironment 注册的 UDF。 元数据也可以是持久化的,例如 Hive Metastore 中的元数据。Catalog 提供了一个统一的API,用于管理元数据,并使其可以从 Table API 和 SQL 查询语句中来访问。

     2 yaml配置

    Catalogs 可以由 YAML 属性集合定义,并且在 SQL 客户端启动之前自动注册到运行环境中。

    用户可以指定在 SQL CLI 中哪些 catalog 要被作为当前的 catalog,以及哪个数据库的 catalog 可以用于当前数据库。

    catalogs:
       - name: catalog_1
         type: hive
         property-version: 1
         default-database: mydb2
         hive-conf-dir: <path of Hive conf directory>
       - name: catalog_2
         type: hive
         property-version: 1
         hive-conf-dir: <path of Hive conf directory>
    
    execution:
       ...
       current-catalog: catalog_1
       current-database: mydb1

    3 Catalog 类型

    3.1 GenericInMemoryCatalog

    GenericInMemoryCatalog 是基于内存实现的 Catalog,所有元数据只在 session 的生命周期内可用。

    3.2 JdbcCatalog

    JdbcCatalog 使得用户可以将 Flink 通过 JDBC 协议连接到关系数据库。PostgresCatalog 是当前实现的唯一一种 JDBC Catalog。 参考 JdbcCatalog 文档 获取关于配置 JDBC catalog 的详细信息。

    3.3 HiveCatalog

    HiveCatalog 有两个用途:作为原生 Flink 元数据的持久化存储,以及作为读写现有 Hive 元数据的接口。 Flink 的 Hive 文档 提供了有关设置 HiveCatalog 以及访问现有 Hive 元数据的详细信息。

    警告 Hive Metastore 以小写形式存储所有元数据对象名称。而 GenericInMemoryCatalog 区分大小写。

    3.4 用户自定义 Catalog

    Catalog 是可扩展的,用户可以通过实现 Catalog 接口来开发自定义 Catalog。 想要在 SQL CLI 中使用自定义 Catalog,用户除了需要实现自定义的 Catalog 之外,还需要为这个 Catalog 实现对应的 CatalogFactory 接口。

    CatalogFactory 定义了一组属性,用于 SQL CLI 启动时配置 Catalog。 这组属性集将传递给发现服务,在该服务中,服务会尝试将属性关联到 CatalogFactory 并初始化相应的 Catalog 实例。

    4.1 使用 SQL DDL

    用户可以使用 DDL 通过 Table API 或者 SQL Client 在 Catalog 中创建表。

    val tableEnv = ...
    
    // Create a HiveCatalog 
    val catalog = new HiveCatalog("myhive", null, "<path_of_hive_conf>");
    
    // Register the catalog
    tableEnv.registerCatalog("myhive", catalog);
    
    // Create a catalog database
    tableEnv.executeSql("CREATE DATABASE mydb WITH (...)");
    
    // Create a catalog table
    tableEnv.executeSql("CREATE TABLE mytable (name STRING, age INT) WITH (...)");
    
    tableEnv.listTables(); // should return the tables in current catalog and database.

    4.2 使用 Java/Scala

    用户可以用编程的方式使用Java 或者 Scala 来创建 Catalog 表。

    import org.apache.flink.table.api._
    import org.apache.flink.table.catalog._
    import org.apache.flink.table.catalog.hive.HiveCatalog
    import org.apache.flink.table.descriptors.Kafka
    
    val tableEnv = TableEnvironment.create(EnvironmentSettings.newInstance.build)
    
    // Create a HiveCatalog
    val catalog = new HiveCatalog("myhive", null, "<path_of_hive_conf>")
    
    // Register the catalog
    tableEnv.registerCatalog("myhive", catalog)
    
    // Create a catalog database
    catalog.createDatabase("mydb", new CatalogDatabaseImpl(...))
    
    // Create a catalog table
    val schema = TableSchema.builder()
        .field("name", DataTypes.STRING())
        .field("age", DataTypes.INT())
        .build()
    
    catalog.createTable(
            new ObjectPath("mydb", "mytable"),
            new CatalogTableImpl(
                schema,
                new Kafka()
                    .version("0.11")
                    ....
                    .startFromEarlist()
                    .toProperties(),
                "my comment"
            ),
            false
        )
    
    val tables = catalog.listTables("mydb") // tables should contain "mytable"

    5 Catalog API

    注意:这里只列出了编程方式的 Catalog API,用户可以使用 SQL DDL 实现许多相同的功能。 关于 DDL 的详细信息请参考 SQL CREATE DDL

    5.1 数据库操作

    // create database
    catalog.createDatabase("mydb", new CatalogDatabaseImpl(...), false);
    
    // drop database
    catalog.dropDatabase("mydb", false);
    
    // alter database
    catalog.alterDatabase("mydb", new CatalogDatabaseImpl(...), false);
    
    // get databse
    catalog.getDatabase("mydb");
    
    // check if a database exist
    catalog.databaseExists("mydb");
    
    // list databases in a catalog
    catalog.listDatabases("mycatalog");

    5.2 表操作

    // create table
    catalog.createTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false);
    
    // drop table
    catalog.dropTable(new ObjectPath("mydb", "mytable"), false);
    
    // alter table
    catalog.alterTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false);
    
    // rename table
    catalog.renameTable(new ObjectPath("mydb", "mytable"), "my_new_table");
    
    // get table
    catalog.getTable("mytable");
    
    // check if a table exist or not
    catalog.tableExists("mytable");
    
    // list tables in a database
    catalog.listTables("mydb");

    5.3 视图操作

    // create view
    catalog.createTable(new ObjectPath("mydb", "myview"), new CatalogViewImpl(...), false);
    
    // drop view
    catalog.dropTable(new ObjectPath("mydb", "myview"), false);
    
    // alter view
    catalog.alterTable(new ObjectPath("mydb", "mytable"), new CatalogViewImpl(...), false);
    
    // rename view
    catalog.renameTable(new ObjectPath("mydb", "myview"), "my_new_view", false);
    
    // get view
    catalog.getTable("myview");
    
    // check if a view exist or not
    catalog.tableExists("mytable");
    
    // list views in a database
    catalog.listViews("mydb");

    5.4 分区操作

    // create view
    catalog.createPartition(
        new ObjectPath("mydb", "mytable"),
        new CatalogPartitionSpec(...),
        new CatalogPartitionImpl(...),
        false);
    
    // drop partition
    catalog.dropPartition(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...), false);
    
    // alter partition
    catalog.alterPartition(
        new ObjectPath("mydb", "mytable"),
        new CatalogPartitionSpec(...),
        new CatalogPartitionImpl(...),
        false);
    
    // get partition
    catalog.getPartition(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));
    
    // check if a partition exist or not
    catalog.partitionExists(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));
    
    // list partitions of a table
    catalog.listPartitions(new ObjectPath("mydb", "mytable"));
    
    // list partitions of a table under a give partition spec
    catalog.listPartitions(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));
    
    // list partitions of a table by expression filter
    catalog.listPartitions(new ObjectPath("mydb", "mytable"), Arrays.asList(epr1, ...));

    5.5 函数操作

    // create function
    catalog.createFunction(new ObjectPath("mydb", "myfunc"), new CatalogFunctionImpl(...), false);
    
    // drop function
    catalog.dropFunction(new ObjectPath("mydb", "myfunc"), false);
    
    // alter function
    catalog.alterFunction(new ObjectPath("mydb", "myfunc"), new CatalogFunctionImpl(...), false);
    
    // get function
    catalog.getFunction("myfunc");
    
    // check if a function exist or not
    catalog.functionExists("myfunc");
    
    // list functions in a database
    catalog.listFunctions("mydb");

    6 通过 Table API 和 SQL Client 操作 Catalog

    6.1 注册 Catalog

    用户可以访问默认创建的内存 Catalog default_catalog,这个 Catalog 默认拥有一个默认数据库 default_database。 用户也可以注册其他的 Catalog 到现有的 Flink 会话中。

    tableEnv.registerCatalog(new CustomCatalog("myCatalog"));

    6.2 修改当前的 Catalog 和数据库

    Flink 始终在当前的 Catalog 和数据库中寻找表、视图和 UDF。

    tableEnv.useCatalog("myCatalog");
    tableEnv.useDatabase("myDb");

    通过提供全限定名 catalog.database.object 来访问不在当前 Catalog 中的元数据信息。

    tableEnv.from("not_the_current_catalog.not_the_current_db.my_table");

    6.3 列出可用的 Catalog

    tableEnv.listCatalogs();

    6.4 列出可用的数据库

    tableEnv.listDatabases();

    6.5 列出可用的表

    tableEnv.listTables();

    本文来自博客园,作者:秋华,转载请注明原文链接:https://www.cnblogs.com/qiu-hua/p/13934764.html

  • 相关阅读:
    INF文件的语法解说转
    VC的拨号上网程序
    sql修复
    flash media server 2 下载 + 无限制序列号
    [原] ASPNET2.0中如何历遍GRIDVIEW
    [net2.0] ASPNET内置安全认证架构 的灵异问题~~~~
    SnagIt的Visual Studio Team System插件
    [翻译]使用HtmlAgilityPack更好的HTML分析和验证
    哪些自动化测试工具支持AJAX
    VSTT Rosario CTP
  • 原文地址:https://www.cnblogs.com/qiu-hua/p/13934764.html
Copyright © 2011-2022 走看看