zoukankan      html  css  js  c++  java
  • flink sql join hbase demo

    lookup join mysql demo:  flink lookup join mysql demo

    ## join rowkey

    -- Lookup Source
    -- kafka source
    CREATE TABLE user_log (
    user_id STRING
    ,item_id STRING
    ,category_id STRING
    ,behavior STRING
    ,ts TIMESTAMP(3)
    ,process_time as proctime()
    , WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
    ) WITH (
    'connector' = 'kafka'
    ,'topic' = 'user_behavior'
    ,'properties.bootstrap.servers' = 'localhost:9092'
    ,'properties.group.id' = 'user_log'
    ,'scan.startup.mode' = 'group-offsets'
    ,'format' = 'json'
    );
    
    drop table if exists hbase_behavior_conf ;
    CREATE TEMPORARY TABLE hbase_behavior_conf (
    rowkey STRING
    ,cf ROW(item_id STRING
    ,category_id STRING
    ,behavior STRING
    ,ts TIMESTAMP(3))
    ) WITH (
    'connector' = 'hbase-2.2'
    ,'zookeeper.quorum' = 'thinkpad:12181'
    ,'table-name' = 'user_log'
    ,'lookup.cache.max-rows' = '10000'
    ,'lookup.cache.ttl' = '10 second'
    ,'lookup.async' = 'true'
    );
    
    ---sinkTable
    CREATE TABLE kakfa_join_mysql_demo (
    user_id STRING
    ,item_id STRING
    ,category_id STRING
    ,behavior STRING
    ,behavior_map STRING
    ,ts TIMESTAMP(3)
    -- ,primary key (user_id) not enforced
    ) WITH (
    'connector' = 'kafka'
    ,'topic' = 'user_behavior_1'
    ,'properties.bootstrap.servers' = 'localhost:9092'
    ,'properties.group.id' = 'user_log'
    ,'scan.startup.mode' = 'group-offsets'
    ,'format' = 'json'
    );
    
    INSERT INTO kakfa_join_mysql_demo(user_id, item_id, category_id, behavior, behavior_map, ts)
    SELECT a.user_id, a.item_id, a.category_id, a.behavior, concat('map_', c.cf.item_id), a.ts
    FROM user_log a
    left join hbase_behavior_conf FOR SYSTEM_TIME AS OF a.process_time AS c
    ON a.user_id = rowkey
    where a.behavior is not null;

    测试 hbase 维表Lookup 功能正常,可以正常缓存数据,缓存也会定时失效,透查Hbase


    * 注: 随便测试了一下性能,Hbase 维表有2 万多条数据,输入数据的关联字段都是Hbase 表主键,lookup.cache.ttl 为 1分钟,关联的 TPS 轻松到达: 2W

    ## join 非 rowkey

    join 非主键时,hbase 维表启动时一次性读取 hbase 表全部数据,缓存到内存中,hbase source 状态 finish

    INSERT INTO kakfa_join_mysql_demo(user_id, item_id, category_id, behavior, behavior_map, ts)
    SELECT a.user_id, a.item_id, a.category_id, a.behavior, concat('map_', c.cf.item_id), a.ts
    FROM user_log a
    left join hbase_behavior_conf c ON a.item_id = cf.item_id
    where a.behavior is not null;


    * 注: 这种情况在 flink 1.13 版本,不能完成 checkpoint

    2021-08-20 09:12:39,313 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: TableSourceScan(table=[[default_catalog, default_database, hbase_behavior_conf, project=[cf]]], fields=[cf]) -> Calc(select=[cf, cf.item_id AS $f2]) (1/1) (55be6048b81e2eef95f25d78b3705a34) switched from RUNNING to FINISHED.
    2021-08-20 09:13:30,356 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Failed to trigger checkpoint for job 971bbb75c7334b0c2b9d218005efbf00 since some tasks of job 971bbb75c7334b0c2b9d218005efbf00 has been finished, abort the checkpoint Failure reason: Not all required tasks are currently running.

    Hbase 关联只有在关联键是Hbase 表的主键的时候,才能应用 Lookup 功能,非主键一次性加载,维表数据没办法更新,而且不能做 Checkpoint 影响 Flink job 的一致性。

    在之前版本,SQL 功能不完善的时候,我们使用 UDF 的方式关联 Hbase,可以在 UDF 里面自己关联缓存、透查Hbase,也比较灵活。(没找到之前的代码,空了自己写一个,可以再水一篇博客)

    欢迎关注Flink菜鸟公众号,会不定期更新Flink(开发技术)相关的推文

  • 相关阅读:
    产品需求说明书PRD模版
    会编程的 AI + 会修 Bug 的 AI,等于什么 ?
    会编程的 AI + 会修 Bug 的 AI,等于什么 ?
    会编程的 AI + 会修 Bug 的 AI,等于什么 ?
    luogu P1164 小A点菜
    luogu P1347 排序
    luogu P1195 口袋的天空
    luogu P1182 数列分段Section II
    luogu P1332 血色先锋队
    luogu P1983 车站分级
  • 原文地址:https://www.cnblogs.com/Springmoon-venn/p/15165175.html
Copyright © 2011-2022 走看看