lookup join mysql demo: flink lookup join mysql demo
## join rowkey
-- Lookup Source -- kafka source CREATE TABLE user_log ( user_id STRING ,item_id STRING ,category_id STRING ,behavior STRING ,ts TIMESTAMP(3) ,process_time as proctime() , WATERMARK FOR ts AS ts - INTERVAL '5' SECOND ) WITH ( 'connector' = 'kafka' ,'topic' = 'user_behavior' ,'properties.bootstrap.servers' = 'localhost:9092' ,'properties.group.id' = 'user_log' ,'scan.startup.mode' = 'group-offsets' ,'format' = 'json' ); drop table if exists hbase_behavior_conf ; CREATE TEMPORARY TABLE hbase_behavior_conf ( rowkey STRING ,cf ROW(item_id STRING ,category_id STRING ,behavior STRING ,ts TIMESTAMP(3)) ) WITH ( 'connector' = 'hbase-2.2' ,'zookeeper.quorum' = 'thinkpad:12181' ,'table-name' = 'user_log' ,'lookup.cache.max-rows' = '10000' ,'lookup.cache.ttl' = '10 second' ,'lookup.async' = 'true' ); ---sinkTable CREATE TABLE kakfa_join_mysql_demo ( user_id STRING ,item_id STRING ,category_id STRING ,behavior STRING ,behavior_map STRING ,ts TIMESTAMP(3) -- ,primary key (user_id) not enforced ) WITH ( 'connector' = 'kafka' ,'topic' = 'user_behavior_1' ,'properties.bootstrap.servers' = 'localhost:9092' ,'properties.group.id' = 'user_log' ,'scan.startup.mode' = 'group-offsets' ,'format' = 'json' ); INSERT INTO kakfa_join_mysql_demo(user_id, item_id, category_id, behavior, behavior_map, ts) SELECT a.user_id, a.item_id, a.category_id, a.behavior, concat('map_', c.cf.item_id), a.ts FROM user_log a left join hbase_behavior_conf FOR SYSTEM_TIME AS OF a.process_time AS c ON a.user_id = rowkey where a.behavior is not null;
测试 hbase 维表Lookup 功能正常,可以正常缓存数据,缓存也会定时失效,透查Hbase
* 注: 随便测试了一下性能,Hbase 维表有2 万多条数据,输入数据的关联字段都是Hbase 表主键,lookup.cache.ttl 为 1分钟,关联的 TPS 轻松到达: 2W
## join 非 rowkey
join 非主键时,hbase 维表启动时一次性读取 hbase 表全部数据,缓存到内存中,hbase source 状态 finish
INSERT INTO kakfa_join_mysql_demo(user_id, item_id, category_id, behavior, behavior_map, ts) SELECT a.user_id, a.item_id, a.category_id, a.behavior, concat('map_', c.cf.item_id), a.ts FROM user_log a left join hbase_behavior_conf c ON a.item_id = cf.item_id where a.behavior is not null;
* 注: 这种情况在 flink 1.13 版本,不能完成 checkpoint
2021-08-20 09:12:39,313 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: TableSourceScan(table=[[default_catalog, default_database, hbase_behavior_conf, project=[cf]]], fields=[cf]) -> Calc(select=[cf, cf.item_id AS $f2]) (1/1) (55be6048b81e2eef95f25d78b3705a34) switched from RUNNING to FINISHED. 2021-08-20 09:13:30,356 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Failed to trigger checkpoint for job 971bbb75c7334b0c2b9d218005efbf00 since some tasks of job 971bbb75c7334b0c2b9d218005efbf00 has been finished, abort the checkpoint Failure reason: Not all required tasks are currently running.
Hbase 关联只有在关联键是Hbase 表的主键的时候,才能应用 Lookup 功能,非主键一次性加载,维表数据没办法更新,而且不能做 Checkpoint 影响 Flink job 的一致性。
在之前版本,SQL 功能不完善的时候,我们使用 UDF 的方式关联 Hbase,可以在 UDF 里面自己关联缓存、透查Hbase,也比较灵活。(没找到之前的代码,空了自己写一个,可以再水一篇博客)
欢迎关注Flink菜鸟公众号,会不定期更新Flink(开发技术)相关的推文