  • Flink SQL JDBC connector

    This article follows the official documentation, Table & SQL Connectors: JDBC SQL Connector https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/jdbc.html#lookup-cache

    JDBC dependency

    ```xml
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-jdbc_2.11</artifactId>
      <version>1.12.0</version>
    </dependency>
    ```

    This example uses MySQL, so the MySQL driver dependency is also needed:

    ```xml
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>${mysql-connector.version}</version>
    </dependency>
    ```

    Scan source: bounded

    The job runs once and finishes as soon as the table has been read.

    drop table if exists mysql_user_log ;
    CREATE TABLE mysql_user_log (
        id int
      ,user_id VARCHAR
      ,item_id VARCHAR
      ,category_id VARCHAR
      ,behavior VARCHAR
      ,ts TIMESTAMP(3)
      ,create_time TIMESTAMP(3)
      ,insert_time TIMESTAMP(3)
      ,primary key (id) not enforced
    ) WITH (
       'connector' = 'jdbc'
       ,'url' = 'jdbc:mysql://venn:3306/venn'
       ,'table-name' = 'user_log'
       ,'username' = 'root'
       ,'password' = '123456'
    );

    Note: Flink does not store the data itself, so the primary key is declared 'not enforced'; Flink does not validate it.
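
    As a minimal sketch of the bounded behavior described above, the table can be copied into a sink with a single INSERT; the job should end once the JDBC scan is exhausted. The sink table print_user_log and its column subset are hypothetical, chosen only for illustration; 'print' is Flink's built-in debugging connector.

    ```sql
    -- Hypothetical sink for illustration: 'print' writes every row to the TaskManager stdout.
    CREATE TABLE print_user_log (
        id INT
      ,user_id VARCHAR
      ,behavior VARCHAR
      ,ts TIMESTAMP(3)
    ) WITH (
       'connector' = 'print'
    );

    -- Bounded read: scans mysql_user_log (defined above) once, then the job finishes.
    INSERT INTO print_user_log
    SELECT id, user_id, behavior, ts
    FROM mysql_user_log;
    ```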

    Temporal table join

    -- Lookup Source: Sync Mode
    -- kafka source
    CREATE TABLE user_log (
      user_id VARCHAR
      ,item_id VARCHAR
      ,category_id VARCHAR
      ,behavior INT
      ,ts TIMESTAMP(3)
      ,process_time as proctime()
      , WATERMARK FOR ts AS ts
    ) WITH (
      'connector' = 'kafka'
      ,'topic' = 'user_behavior'
      ,'properties.bootstrap.servers' = 'localhost:9092'
      ,'properties.group.id' = 'user_log'
      ,'scan.startup.mode' = 'group-offsets'
      ,'format' = 'json'
    );
    
    -- mysql source
    drop table if exists mysql_behavior_conf ;
    CREATE TABLE mysql_behavior_conf (
        id int
      ,behavior VARCHAR
      ,behavior_map VARCHAR
      ,update_time TIMESTAMP(3)
    --   ,process_time as proctime()
      ,primary key (id) not enforced
      , WATERMARK FOR update_time AS update_time
    ) WITH (
       'connector' = 'jdbc'
       ,'url' = 'jdbc:mysql://venn:3306/venn'
       ,'table-name' = 'behavior_conf'
       ,'username' = 'root'
       ,'password' = '123456'
       ,'scan.partition.column' = 'id'
       ,'scan.partition.num' = '1'
       ,'scan.partition.lower-bound' = '0'
       ,'scan.partition.upper-bound' = '9999'
       ,'lookup.cache.max-rows' = '1000'
       ,'lookup.cache.ttl' = '2 minute'
    );
    
    -- sink table
    CREATE TABLE kakfa_join_mysql_demo (
      user_id VARCHAR
      ,item_id VARCHAR
      ,category_id VARCHAR
      ,behavior INT
      ,behavior_map VARCHAR
      ,ts TIMESTAMP(3)
      ,primary key (user_id) not enforced
    ) WITH (
    'connector' = 'upsert-kafka'
      ,'topic' = 'user_behavior_sink'
      ,'properties.bootstrap.servers' = 'localhost:9092'
      ,'properties.group.id' = 'user_log'
      ,'key.format' = 'json'
      ,'key.json.ignore-parse-errors' = 'true'
      ,'value.format' = 'json'
      ,'value.json.fail-on-missing-field' = 'false'
      ,'value.fields-include' = 'ALL'
    );
    
    -- sink: the join uses the event-time field of the left table
    INSERT INTO kakfa_join_mysql_demo(user_id, item_id, category_id, behavior, behavior_map, ts)
    SELECT a.user_id, a.item_id, a.category_id, a.behavior, b.behavior_map, a.ts
    FROM user_log a
      left join mysql_behavior_conf for system_time as of a.ts as b on a.behavior = b.id
    where a.behavior is not null;

    Data in the mysql_behavior_conf table:

    id,behavior,behavior_map,update_time
    1,pv,pv-map,2021-02-01 06:10:54
    2,buy,buy-map,2021-02-01 06:10:56
    3,cart,cart-map,2021-02-01 06:10:58
    4,fav,fav-map,2021-02-01 06:11:00
    5,pv_0,map_0,2021-02-02 07:41:24
    6,pv_1,map_1,2021-02-02 07:41:25
    7,pv_2,map_2,2021-02-02 07:41:25
    8,pv_3,map_3,2021-02-02 07:41:26
    9,pv_4,map_4,2021-02-02 07:41:26
    10,pv_5,map_5,2021-02-02 07:41:26
    ... up to 10000
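
    To reproduce the test, the backing MySQL table could be created roughly as follows. This is only a sketch: the original MySQL DDL is not shown in this article, so the column types and lengths below are assumptions; the table name comes from the 'table-name' option above, and the column names and the first four rows come from the listing.

    ```sql
    -- Run in MySQL, not in Flink. Column types and lengths are assumptions.
    CREATE TABLE behavior_conf (
        id INT NOT NULL PRIMARY KEY,
        behavior VARCHAR(64),
        behavior_map VARCHAR(64),
        update_time DATETIME(3)
    );

    -- First four rows from the listing above.
    INSERT INTO behavior_conf (id, behavior, behavior_map, update_time) VALUES
        (1, 'pv',   'pv-map',   '2021-02-01 06:10:54'),
        (2, 'buy',  'buy-map',  '2021-02-01 06:10:56'),
        (3, 'cart', 'cart-map', '2021-02-01 06:10:58'),
        (4, 'fav',  'fav-map',  '2021-02-01 06:11:00');
    ```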

    user_log data:

    {"user_id": "652863", "item_id":"4967749", "category_id": "1320293", "behavior": "1", "ts": "2021-02-02 14:50:00"}
    {"user_id": "801610", "item_id":"900305", "category_id": "634390", "behavior": "2", "ts": "2021-02-02 14:50:11"}
    {"user_id": "411478", "item_id":"3259235", "category_id": "2667323", "behavior": "3", "ts": "2021-02-02 14:50:22"}
    {"user_id": "431664", "item_id":"764155", "category_id": "2520377", "behavior": "4", "ts": "2021-02-02 14:50:33"}
    {"user_id": "431664", "item_id":"764155", "category_id": "2520377", "behavior": "1001", "ts": "2021-02-02 16:51:58"}

    The output is as follows:

    {"user_id":"user_id_813","item_id":"item_id_813","category_id":"category_id_813","behavior":813,"behavior_map":"map_808","ts":"2021-02-02 16:50:40"}
    {"user_id":"user_id_633","item_id":"item_id_633","category_id":"category_id_633","behavior":633,"behavior_map":"map_628","ts":"2021-02-02 16:50:44"}
    {"user_id":"user_id_8425","item_id":"item_id_8425","category_id":"category_id_8425","behavior":8425,"behavior_map":null,"ts":"2021-02-02 16:50:48"}
    {"user_id":"user_id_8701","item_id":"item_id_8701","category_id":"category_id_8701","behavior":8701,"behavior_map":null,"ts":"2021-02-02 16:50:52"}
    {"user_id":"user_id_9983","item_id":"item_id_9983","category_id":"category_id_9983","behavior":9983,"behavior_map":null,"ts":"2021-02-02 16:50:56"}
    {"user_id":"431664","item_id":"764155","category_id":"2520377","behavior":7000,"behavior_map":null,"ts":"2021-02-02 16:51:56"}

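    The options 'scan.partition.lower-bound' and 'scan.partition.upper-bound' took effect; 'lookup.cache.max-rows' and 'lookup.cache.ttl' did not.

    Rows whose behavior_map is null are the ones that found no match in the dimension table.

    The official documentation says that a key missing from the cache is looked up in the database. In practice the table was not handled as a Lookup Source at all: it ran as an InputFormatSource that read all the MySQL data in one pass, after which the MySQL source exited. (Perhaps the query is written the wrong way and never reaches the Lookup Source path.)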

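    The result looks no different from a regular join (for which the user_log table must drop its event-time attribute): in both cases the dimension table is read once and the source then finishes.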
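
    The parenthetical above can be sketched as follows: the same Kafka table as before, redeclared without the WATERMARK clause so that ts is an ordinary TIMESTAMP(3) column rather than an event-time attribute. Keeping the process_time column is an assumption; the regular join query does not select it.

    ```sql
    -- Redeclare the Kafka source without an event-time attribute.
    DROP TABLE IF EXISTS user_log;
    CREATE TABLE user_log (
      user_id VARCHAR
      ,item_id VARCHAR
      ,category_id VARCHAR
      ,behavior INT
      ,ts TIMESTAMP(3)                -- plain timestamp: no WATERMARK declared on it
      ,process_time as proctime()     -- assumption: kept unchanged from the original DDL
    ) WITH (
      'connector' = 'kafka'
      ,'topic' = 'user_behavior'
      ,'properties.bootstrap.servers' = 'localhost:9092'
      ,'properties.group.id' = 'user_log'
      ,'scan.startup.mode' = 'group-offsets'
      ,'format' = 'json'
    );
    ```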

    INSERT INTO kakfa_join_mysql_demo(user_id, item_id, category_id, behavior, behavior_map, ts)
    SELECT a.user_id, a.item_id, a.category_id, a.behavior, b.behavior_map, a.ts
    FROM user_log a
      left join mysql_behavior_conf b on a.behavior = b.id
    where a.behavior is not null;
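
    One possible explanation for the question raised above, offered as an untested sketch: the lookup join example in the official JDBC connector documentation uses a processing-time attribute in FOR SYSTEM_TIME AS OF, whereas the temporal join above uses the event-time column a.ts. The variant below joins on a.process_time, which the original user_log table already declares. The table mysql_behavior_conf_lookup is hypothetical: a copy of mysql_behavior_conf without the WATERMARK and 'scan.partition.*' options, on the assumption that a plain dimension table is closest to the documented example.

    ```sql
    -- Hypothetical dimension table: same MySQL table, no watermark, no partitioned scan.
    CREATE TABLE mysql_behavior_conf_lookup (
        id int
      ,behavior VARCHAR
      ,behavior_map VARCHAR
      ,update_time TIMESTAMP(3)
      ,primary key (id) not enforced
    ) WITH (
       'connector' = 'jdbc'
       ,'url' = 'jdbc:mysql://venn:3306/venn'
       ,'table-name' = 'behavior_conf'
       ,'username' = 'root'
       ,'password' = '123456'
       ,'lookup.cache.max-rows' = '1000'
       ,'lookup.cache.ttl' = '2 minute'
    );

    -- Processing-time temporal join, following the form used in the official lookup join example.
    INSERT INTO kakfa_join_mysql_demo(user_id, item_id, category_id, behavior, behavior_map, ts)
    SELECT a.user_id, a.item_id, a.category_id, a.behavior, b.behavior_map, a.ts
    FROM user_log a
      left join mysql_behavior_conf_lookup for system_time as of a.process_time as b
        on a.behavior = b.id
    where a.behavior is not null;
    ```

    Whether this variant makes 'lookup.cache.max-rows' and 'lookup.cache.ttl' take effect was not verified in the test described in this article.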

  • Original article: https://www.cnblogs.com/Springmoon-venn/p/14419226.html