zoukankan      html  css  js  c++  java
  • Flink实例(119):FLINK-SQL应用场景(18)一文了解基于Flink构建流批一体数仓的技术点(二)

    来源:https://mp.weixin.qq.com/s/ECe_bn9HzFzXTlfEnAaLBg

    2 Hive Catalog与Hive Dialect

    2.1 什么是Hive Catalog

      我们知道,Hive使用Hive Metastore(HMS)存储元数据信息,使用关系型数据库来持久化存储这些信息。所以,Flink集成Hive需要打通Hive的metastore,去管理Flink的元数据,这就是Hive Catalog的功能。

      Hive Catalog的主要作用是使用Hive MetaStore去管理Flink的元数据。Hive Catalog可以将元数据进行持久化,这样后续的操作就可以反复使用这些表的元数据,而不用每次使用时都要重新注册。如果不去持久化catalog,那么在每个session中取处理数据,都要去重复地创建元数据对象,这样是非常耗时的。

    2.2 如何使用Hive Catalog

      HiveCatalog是开箱即用的,所以,一旦配置好Flink与Hive集成,就可以使用HiveCatalog。比如,我们通过FlinkSQL 的DDL语句创建一张kafka的数据源表,立刻就能查看该表的元数据信息。

      HiveCatalog可以处理两种类型的表:一种是Hive兼容的表,另一种是普通表(generic table)。其中Hive兼容表是以兼容Hive的方式来存储的,所以,对于Hive兼容表而言,我们既可以使用Flink去操作该表,又可以使用Hive去操作该表。

      普通表是对Flink而言的,当使用HiveCatalog创建一张普通表,仅仅是使用Hive MetaStore将其元数据进行了持久化,所以可以通过Hive查看这些表的元数据信息(通过DESCRIBE FORMATTED命令),但是不能通过Hive去处理这些表,因为语法不兼容。

      对于是否是普通表,Flink使用is_generic属性进行标识。默认情况下,创建的表是普通表,即is_generic=true,如果要创建Hive兼容表,需要在建表属性中指定is_generic=false

    尖叫提示:由于依赖Hive Metastore,所以必须开启Hive MetaStore服务

    代码中使用Hive Catalog

       EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().build();
            TableEnvironment tableEnv = TableEnvironment.create(settings);
    
            String name            = "myhive";
            String defaultDatabase = "default";
            String hiveConfDir = "/opt/modules/apache-hive-2.3.4-bin/conf";
    
            HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
            tableEnv.registerCatalog("myhive", hive);
            // 使用注册的catalog
            tableEnv.useCatalog("myhive");

    Flink SQLCli中使用Hive Catalog

    在FlinkSQL Cli中使用Hive Catalog很简单,只需要配置一下sql-cli-defaults.yaml文件即可。配置内容如下:

    catalogs:
       - name: myhive
         type: hive
         default-database: default
         hive-conf-dir: /opt/modules/apache-hive-2.3.4-bin/conf

    在FlinkSQL Cli中创建一张kafka表,该表默认为普通表,即is_generic=true

    CREATE TABLE user_behavior ( 
        `user_id` BIGINT, -- 用户id
        `item_id` BIGINT, -- 商品id
        `cat_id` BIGINT, -- 品类id
        `action` STRING, -- 用户行为
        `province` INT, -- 用户所在的省份
        `ts` BIGINT, -- 用户行为发生的时间戳
        `proctime` AS PROCTIME(), -- 通过计算列产生一个处理时间列
        `eventTime` AS TO_TIMESTAMP(FROM_UNIXTIME(ts, 'yyyy-MM-dd HH:mm:ss')), -- 事件时间
         WATERMARK FOR eventTime AS eventTime - INTERVAL '5' SECOND  -- 定义watermark
     ) WITH ( 
        'connector' = 'kafka', -- 使用 kafka connector
        'topic' = 'user_behavior', -- kafka主题
        'scan.startup.mode' = 'earliest-offset', -- 偏移量
        'properties.group.id' = 'group1', -- 消费者组
        'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092', 
        'format' = 'json', -- 数据源格式为json
        'json.fail-on-missing-field' = 'true',
        'json.ignore-parse-errors' = 'false'
    );

    我们可以在Hive客户端中查看该表的元数据信息

    hive (default)> desc formatted  user_behavior;
    Table Parameters:                
           ...
            is_generic              true                
          ...         

    从上面的元数据信息可以看出,is_generic=true,说明该表是一张普通表,如果在Hive中去查看该表,则会报错。

    上面创建的表是普通表,该表不能使用Hive去查询。那么,该如何创建一张Hive兼容表呢?我们只需要在建表的属性中显示指定is_generic=false即可,具体如下:

    CREATE TABLE hive_compatible_tbl ( 
        `user_id` BIGINT, -- 用户id
        `item_id` BIGINT, -- 商品id
        `cat_id` BIGINT, -- 品类id
        `action` STRING, -- 用户行为
        `province` INT, -- 用户所在的省份
        `ts` BIGINT -- 用户行为发生的时间戳
     ) WITH ( 
        'connector' = 'kafka', -- 使用 kafka connector
        'topic' = 'user_behavior', -- kafka主题
        'scan.startup.mode' = 'earliest-offset', -- 偏移量
        'properties.group.id' = 'group1', -- 消费者组
        'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092', 
        'format' = 'json', -- 数据源格式为json
        'json.fail-on-missing-field' = 'true',
        'json.ignore-parse-errors' = 'false',
        'is_generic' = 'false'
    );

    当我们在Hive中查看该表的元数据信息时,可以看出:is_generic =false

    hive (default)> desc formatted hive_compatible_tbl;
    Table Parameters:                
            ...           
            is_generic              false               
            ...

    我们可以使用FlinkSQL Cli或者HiveCli向该表中写入数据,然后分别通过FlinkSQL Cli和Hive Cli去查看该表数据的变化

    hive (default)> insert into hive_compatible_tbl select 2020,1221,100,'buy',11,1574330486;
    hive (default)> select * from hive_compatible_tbl;

    再在FlinkSQL Cli中查看该表,

    Flink SQL> select user_id,item_id,action from hive_compatible_tbl;
                       user_id                   item_id                    action
                          2020                      1221                       buy

    同样,我们可以在FlinkSQL Cli中去向该表中写入数据:

    Flink SQL>  insert into hive_compatible_tbl select 2020,1222,101,'fav',11,1574330486;
    Flink SQL> select user_id,item_id,action from hive_compatible_tbl;
    
                       user_id                   item_id                    action
                          2020                      1221                       buy
                          2020                      1222                       fav

    尖叫提示:对于Hive兼容的表,需要注意数据类型,具体的数据类型对应关系以及注意点如下

    Flink 数据类型Hive 数据类型
    CHAR(p) CHAR(p)
    VARCHAR(p) VARCHAR(p)
    STRING STRING
    BOOLEAN BOOLEAN
    TINYINT TINYINT
    SMALLINT SMALLINT
    INT INT
    BIGINT LONG
    FLOAT FLOAT
    DOUBLE DOUBLE
    DECIMAL(p, s) DECIMAL(p, s)
    DATE DATE
    TIMESTAMP(9) TIMESTAMP
    BYTES BINARY
    ARRAY LIST
    MAP<K, V> MAP<K, V>
    ROW STRUCT

    注意

    • Hive CHAR(p) 类型的最大长度为255
    • Hive VARCHAR(p)类型的最大长度为65535
    • Hive MAP类型的key仅支持基本类型,而Flink’s MAP 类型的key执行任意类型
    • Hive不支持联合数据类型,比如STRUCT
    • Hive’s TIMESTAMP 的精度是 9 , Hive UDFs函数只能处理 precision <= 9的 TIMESTAMP
    • Hive 不支持 Flink提供的 TIMESTAMP_WITH_TIME_ZONE,TIMESTAMP_WITH_LOCAL_TIME_ZONE, 及MULTISET类型
    • FlinkINTERVAL 类型与 Hive INTERVAL 类型不一样

    上面介绍了普通表和Hive兼容表,那么我们该如何使用Hive的语法进行建表呢?这个时候就需要使用Hive Dialect

    2.3 什么是Hive Dialect

    从Flink1.11.0开始,只要开启了Hive dialect配置,用户就可以使用HiveQL语法,这样我们就可以在Flink中使用Hive的语法使用一些DDL和DML操作。

    Flink目前支持两种SQL方言(SQL dialects),分别为:default和hive。默认的SQL方言是default,如果要使用Hive的语法,需要将SQL方言切换到hive

    2.4 如何使用Hive Dialect

    在SQL Cli中使用Hive dialect

    使用hive dialect只需要配置一个参数即可,该参数名称为:table.sql-dialect。我们就可以在sql-client-defaults.yaml配置文件中进行配置,也可以在具体的会话窗口中进行设定,对于SQL dialect的切换,不需要进行重启session。

    execution:
      planner: blink
      type: batch
      result-mode: table
    
    configuration:
      table.sql-dialect: hive

    如果我们需要在SQL Cli中进行切换hive dialect,可以使用如下命令:

    Flink SQL> set table.sql-dialect=hive; -- 使用hive dialect
    Flink SQL> set table.sql-dialect=default; -- 使用default dialect

    尖叫提示:一旦切换到了hive dialect,就只能使用Hive的语法建表,如果尝试使用Flink的语法建表,则会报错

    在Table API中配合dialect

    EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner()...build();
    TableEnvironment tableEnv = TableEnvironment.create(settings);
    // 使用hive dialect
    tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
    // 使用 default dialect
    tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

    操作示例

    Flink SQL> set table.sql-dialect=hive;
    -- 使用Hive语法创建一张表
    CREATE TABLE IF NOT EXISTS `hive_dialect_tbl` (
      `id` int COMMENT 'id',
      `name` string COMMENT '名称',
      `age` int COMMENT '年龄' 
    )
    COMMENT 'hive dialect表测试'
    ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';

    进入Hive客户端去查看该表的元数据信息

    desc formatted hive_dialect_tbl;
    col_name        data_type       comment
    # col_name              data_type               comment             
                     
    id                      int                                         
    name                    string                                      
    age                     int                                         
                     
    # Detailed Table Information             
    Database:               default                  
    Owner:                  null                     
    CreateTime:             Mon Dec 21 17:23:48 CST 2020     
    LastAccessTime:         UNKNOWN                  
    Retention:              0                        
    Location:               hdfs://kms-1.apache.com:8020/user/hive/warehouse/hive_dialect_tbl        
    Table Type:             MANAGED_TABLE            
    Table Parameters:                
            comment                 hive dialect表测试     
            is_generic              false               
            transient_lastDdlTime   1608542628          
                     
    # Storage Information            
    SerDe Library:          org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe       
    InputFormat:            org.apache.hadoop.mapred.TextInputFormat         
    OutputFormat:           org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat   
    Compressed:             No                       
    Num Buckets:            -1                       
    Bucket Columns:         []                       
    Sort Columns:           []                       
    Storage Desc Params:             
            field.delim             ,                   
            serialization.format    ,                   

    很明显,该表是一张Hive兼容表,即is_generic=false

    使用FlinkSQLCli向该表中写入一条数据:

    Flink SQL> insert into hive_dialect_tbl select 1,'tom',20;

    我们也可以在Hive的Cli中去操作该表

    hive (default)> select * from hive_dialect_tbl;
    hive (default)> insert into hive_dialect_tbl select 2,'jack',22;

    以下是使用Hive方言的一些注意事项。

    • Hive dialect只能用于操作Hive表,不能用于普通表。Hive方言应与HiveCatalog一起使用。
    • 虽然所有Hive版本都支持相同的语法,但是是否有特定功能仍然取决于使用的Hive版本。例如,仅在Hive-2.4.0或更高版本中支持更新数据库位置。
    • Hive和Calcite具有不同的保留关键字。例如,default在Calcite中是保留关键字,在Hive中是非保留关键字。所以,在使用Hive dialect时,必须使用反引号(`)引用此类关键字,才能将其用作标识符。
    • 在Hive中不能查询在Flink中创建的视图。

    当然,一旦开启了Hive dialect,我们就可以按照Hive的操作方式在Flink中去处理Hive的数据了,具体的操作与Hive一致,本文不再赘述。

  • 相关阅读:
    java1.8时间比较应用
    Window配置网络设定IPv4的固定IP自动被修改为169.254.*.*的问题
    osgi内嵌jetty容器添加过滤器
    jackson依赖的jar包
    Tomcat下ajax请求路径总结
    JavaScript中一个字符串变量突然变成了false的问题解析
    Ajax请求发送的FormData是"[object object]"
    jQuery的$.extend方法使用
    JS 异常:Uncaught RangeError: Maximum call stack size exceeded解析
    Java的重写equals但不重写hashCode方法的影响
  • 原文地址:https://www.cnblogs.com/qiu-hua/p/14328080.html
Copyright © 2011-2022 走看看