zoukankan      html  css  js  c++  java
  • Flink 1.10 SQL 写HBase

    Hbase 也是我们很常用的数据存储组件,所以提前尝试下用SQL 写Hbase,中间也遇到一些坑,跟大家分享一下。

    官网地址:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#hbase-connector

    --------------------------20200508--------------

    新增: Flink 1.10 SQL 写 Hbase 数据无法写入hbase问题

    ---------------------------------------------------

    HBase Connector 支持这些操作: Source: Batch Sink: Batch Sink: Streaming Append Mode Sink: Streaming Upsert Mode Temporal Join: Sync Mode

    (选择性忽略Batch 的操作了,上次跟一个朋友说,HBase connector 只支持 sink 操作。)

    HBase connector  可以在upsert模式下运行,以使用查询定义的密钥与外部系统交换UPSERT / DELETE消息。

    对于 append-only 查询,connector 还可以在 append  模式下操作,仅与外部系统交换INSERT消息。

    官网示例如下:

    CREATE TABLE MyUserTable (
      hbase_rowkey_name rowkey_type,
      hbase_column_family_name1 ROW<...>,
      hbase_column_family_name2 ROW<...>
    ) WITH (
      'connector.type' = 'hbase', -- required: specify this table type is hbase
      
      'connector.version' = '1.4.3',          -- required: valid connector versions are "1.4.3"
      
      'connector.table-name' = 'hbase_table_name',  -- required: hbase table name
      
      'connector.zookeeper.quorum' = 'localhost:2181', -- required: HBase Zookeeper quorum configuration
      'connector.zookeeper.znode.parent' = '/test',    -- optional: the root dir in Zookeeper for HBase cluster.
                                                       -- The default value is "/hbase".
    
      'connector.write.buffer-flush.max-size' = '10mb', -- optional: writing option, determines how many size in memory of buffered
                                                        -- rows to insert per round trip. This can help performance on writing to JDBC
                                                        -- database. The default value is "2mb".
    
      'connector.write.buffer-flush.max-rows' = '1000', -- optional: writing option, determines how many rows to insert per round trip.
                                                        -- This can help performance on writing to JDBC database. No default value,
                                                        -- i.e. the default flushing is not depends on the number of buffered rows.
    
      'connector.write.buffer-flush.interval' = '2s',   -- optional: writing option, sets a flush interval flushing buffered requesting
                                                        -- if the interval passes, in milliseconds. Default value is "0s", which means
                                                        -- no asynchronous flush thread will be scheduled.
    )

    Columns: HBase表中的 column families 必须声明为ROW类型字段名称映射到column families 名称而嵌套的字段名称映射到 column qualifier 名称。 无需在架构中声明所有 families 和 qualifiers ,用户可以声明必要的内容。 除ROW type字段外,原子类型的唯一 一个字段(例如STRING,BIGINT)将被识别为表的 rowkey。 row key 字段的名称没有任何限制。

    Temporary join: 针对HBase的 Lookup join 不使用任何缓存; 始终总是通过HBase客户端直接查询数据。

    之前一直看英文,上面的描述看得似是而非的,没能理解到,Flink 中 建HBase 表的 DDL 的规则,简单列下:

      1、Flink HBase 表只能有一个原子类型的字段,就是 rowkey(习惯是放在第一个字段,名字随意)

      2、Flink HBase 表的其他字段都是ROW 类型的,并且字段名与 HBase 表中 的 column family 名一样(如果只有一个列族,除了rowkey 就只有一个字段)

      3、ROW 类型的字段嵌套的字段名称就是该列族下的列名

    下面看个实例:

    首先需要添加flink-hbase connector 对应的依赖:

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-hbase_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>

    选择对应版本,Flink 版本是 1.10

    必须要说下,现在 HBase 版本只支持 1.4.3,我的HBase 是2.1.4 的,懒得换了,直接修改Flink 代码,绕过版本验证(可以正常写数,没有经过版本匹配和严格的测试,可能会有未知的问题)

    sql 文件如下:

    -- 读 kafka write to json
    --sourceTable
    CREATE TABLE user_log(
        user_id VARCHAR,
        item_id VARCHAR,
        category_id VARCHAR,
        behavior VARCHAR,
        ts TIMESTAMP(3)
    ) WITH (
        'connector.type' = 'kafka',
        'connector.version' = 'universal',
        'connector.topic' = 'user_behavior',
        'connector.properties.zookeeper.connect' = 'venn:2181',
        'connector.properties.bootstrap.servers' = 'venn:9092',
        'connector.startup-mode' = 'earliest-offset',
        'format.type' = 'json'
    );
    
    --sinkTable
    CREATE TABLE user_log_sink (
        rowkey string,
        cf ROW(item_id VARCHAR, category_id VARCHAR, behavior VARCHAR, ts TIMESTAMP(3))
    ) WITH (
        'connector.type' = 'hbase',
        -- 目前只支持 1.4.3 ,   HBaseValidator.CONNECTOR_VERSION_VALUE_143 写死了 1.4.3, 改成 2.1.4 可以正常写数到 hbase
        -- 生产慎用
        'connector.version' = '2.1.4',                    -- hbase vesion
        'connector.table-name' = 'venn',                  -- hbase table name
        'connector.zookeeper.quorum' = 'venn:2181',       -- zookeeper quorum
        'connector.zookeeper.znode.parent' = '/hbase',    -- hbase znode in zookeeper
        'connector.write.buffer-flush.max-size' = '10mb', -- max flush size
        'connector.write.buffer-flush.max-rows' = '1000', -- max flush rows
        'connector.write.buffer-flush.interval' = '2s'    -- max flush interval
    );
    --insert
    INSERT INTO user_log_sink
    SELECT user_id,
      ROW(item_id, category_id, behavior, ts ) as cf
    FROM user_log;

    简单描述下 sink 表:

    有个string类型的rowkey, 还有一个 列 cf (HBase 表的列族),cf 下面有 item_id/category_id/behavior/ts 4个列

    执行的sql 就很简单了,从Kafka 的 source 表读数据,写到 sink 表。 

     查看写入到 HBase 中的数据:

    hbase(main):001:0> count 'venn'
    Current count: 1000, row: 561558                                                                                                                           
    1893 row(s)
    Took 1.8662 seconds                                                                                                                                        
    => 1893
    hbase(main):002:0> scan 'venn',{LIMIT=>1}
    ROW                                     COLUMN+CELL                                                                                                        
     1000034                                column=cf:behavior, timestamp=1584615437261, value=pv                                                              
     1000034                                column=cf:category_id, timestamp=1584615437261, value=982926                                                       
     1000034                                column=cf:item_id, timestamp=1584615437261, value=800784                                                           
     1000034                                column=cf:ts, timestamp=1584615437261, value=x00x00x01_xF4x1F`xD0                                            
    1 row(s)
    Took 0.0834 seconds   

    ------------20200427 改-----------

    根本不用改源码,直接将 sql  properties 写成 '1.4.3' 就可以了,执行的时候,不会去校验hbase 的版本

    ----------------------------------------

    搞定 

    欢迎关注Flink菜鸟公众号,会不定期更新Flink(开发技术)相关的推文

  • 相关阅读:
    java基础(十九)IO流(二)
    java基础(十八)IO流(一)
    java基础(十七)集合(四)
    java基础(十六)集合(三)
    java基础(十五)集合(二)
    java基础(十四)集合(一)
    oracle中trim,ltrim,rtrim函数用法
    最详细的Log4j使用教程
    SAP ERP和ORACLE ERP的区别是哪些?
    PLS-00221: 'function' 不是过程或尚未定义
  • 原文地址:https://www.cnblogs.com/Springmoon-venn/p/12544251.html
Copyright © 2011-2022 走看看