zoukankan      html  css  js  c++  java
  • flink lookup join mysql demo

    Flink 1.12 的时候尝试使用 JDBC SQL Connector kafka 流关联 mysql 表,使用 lookup cache 缓存 mysql 数据,测试在关联性能和更新时效的平衡。不过遭遇了失败,尝试各种 join 也无法实现,mysql source 使用 InputFormatSource 一次性把 mysql 的数据读完,mysql source 就退出了。

    在 Flink 1.13 的 SQL 文档看到这个: https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/queries/joins/#lookup-join

    -- Customers is backed by the JDBC connector and can be used for lookup joins
    CREATE TEMPORARY TABLE Customers (
      id INT,
      name STRING,
      country STRING,
      zip STRING
    ) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:mysql://mysqlhost:3306/customerdb',
      'table-name' = 'customers'
    );
    
    -- enrich each order with customer information
    SELECT o.order_id, o.total, c.country, c.zip
    FROM Orders AS o
      JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
        ON o.customer_id = c.id;

    jdbc connector 关于 Lookup Cache 的描述还是和上一版本一样的

    JDBC 连接器可以用在时态表关联中作为一个可 lookup 的 source (又称为维表),当前只支持同步的查找模式。
    
    默认情况下,lookup cache 是未启用的,你可以设置 lookup.cache.max-rows and lookup.cache.ttl 参数来启用。
    
    lookup cache 的主要目的是用于提高时态表关联 JDBC 连接器的性能。默认情况下,lookup cache 不开启,所以所有请求都会发送到外部数据库。 
    当 lookup cache 被启用时,每个进程(即 TaskManager)将维护一个缓存。
    Flink 将优先查找缓存,只有当缓存未查找到时才向外部数据库发送请求,并使用返回的数据更新缓存。
    当缓存命中最大缓存行 lookup.cache.max
    -rows 或当行超过最大存活时间 lookup.cache.ttl 时,缓存中最老的行将被设置为已过期。
    缓存中的记录可能不是最新的,用户可以将 lookup.cache.ttl 设置为一个更小的值以获得更好的刷新数据,但这可能会增加发送到数据库的请求数。
    所以要做好吞吐量和正确性之间的平衡。

    测试SQL 如下

    CREATE TABLE user_log (
      user_id STRING
      ,item_id STRING
      ,category_id STRING
      ,behavior STRING
      ,ts TIMESTAMP(3)
      ,process_time as proctime()
      , WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
    ) WITH (
      'connector' = 'kafka'
      ,'topic' = 'user_behavior'
      ,'properties.bootstrap.servers' = 'localhost:9092'
      ,'properties.group.id' = 'user_log'
      ,'scan.startup.mode' = 'group-offsets'
      ,'format' = 'json'
    );
    
    CREATE TEMPORARY TABLE mysql_behavior_conf (
       id int
      ,code STRING
      ,map_val STRING
      ,update_time TIMESTAMP(3)
    --   ,primary key (id) not enforced
    --   ,WATERMARK FOR update_time AS update_time - INTERVAL '5' SECOND
    ) WITH (
       'connector' = 'jdbc'
       ,'url' = 'jdbc:mysql://localhost:3306/venn'
       ,'table-name' = 'lookup_join_config'
       ,'username' = 'root'
       ,'password' = '123456'
       ,'lookup.cache.max-rows' = '1000'
       ,'lookup.cache.ttl' = '1 minute' -- 缓存时间,即使一直在访问也会删除
    );
    
    SELECT a.user_id, a.item_id, a.category_id, a.behavior, c.map_val, a.ts
    FROM user_log a
      left join mysql_behavior_conf FOR SYSTEM_TIME AS OF a.process_time AS c
      ON a.behavior = c.code
    where a.behavior is not null;

    Lookup Cache 尝试成功

    查看 mysql Lookup Join 的源码: LookupJoinRunner

    @Override
    public void processElement(RowData in, Context ctx, Collector<RowData> out) throws Exception {
        collector.setCollector(out);
        collector.setInput(in);
        collector.reset();
    
        // fetcher has copied the input field when object reuse is enabled
        fetcher.flatMap(in, getFetcherCollector());
    
        if (isLeftOuterJoin && !collector.isCollected()) {
            outRow.replace(in, nullRow);
            outRow.setRowKind(in.getRowKind());
            out.collect(outRow);
        }
    }

    getFetcherCollector 方法调用 JdbcRowDataLookupFunction.java,查询和返回缓存的地方

    /**
     * This is a lookup method which is called by Flink framework in runtime.
     *
     * @param keys lookup keys
     */
    public void eval(Object... keys) {
        // 获取 key
        RowData keyRow = GenericRowData.of(keys);
        // 如果没有缓存
        if (cache != null) {
            // 获取缓存
            List<RowData> cachedRows = cache.getIfPresent(keyRow);
            if (cachedRows != null) {
                for (RowData cachedRow : cachedRows) {
                    collect(cachedRow);
                }
                return;
            }
        }
        // 从源端重新获取数据
        for (int retry = 0; retry <= maxRetryTimes; retry++) {
            try {
                statement.clearParameters();
                statement = lookupKeyRowConverter.toExternal(keyRow, statement);
                // 查询 数据库
                try (ResultSet resultSet = statement.executeQuery()) {
                    if (cache == null) {
                        while (resultSet.next()) {
                            collect(jdbcRowConverter.toInternal(resultSet));
                        }
                    } else {
                        ArrayList<RowData> rows = new ArrayList<>();
                        while (resultSet.next()) {
                            RowData row = jdbcRowConverter.toInternal(resultSet);
                            rows.add(row);
                            // 返回数据
                            collect(row);
                        }
                        rows.trimToSize();
                        // 放入缓存
                        cache.put(keyRow, rows);
                    }
                }
                break;
            } catch (SQLException e) {
                ...略...
            }
        }
    }

    缓存使用 guava 的 LocalCache

    注: ttl 时间是 expireAfterWrite,写入后固定时间时效

    this.cache =
            cacheMaxSize == -1 || cacheExpireMs == -1
                    ? null
                    : CacheBuilder.newBuilder()
                            .expireAfterWrite(cacheExpireMs, TimeUnit.MILLISECONDS)
                            .maximumSize(cacheMaxSize)
                            .build();

    欢迎关注Flink菜鸟公众号,会不定期更新Flink(开发技术)相关的推文

  • 相关阅读:
    UVA 10066 The Twin Towers
    UVA 10192 Vacation
    hdu 5018 Revenge of Fibonacci
    hdu 5108 Alexandra and Prime Numbers
    UVA 10252
    UVA 10405最长公共子序列
    数塔
    hdu 2602
    面向对象(五)
    面向对象(三)
  • 原文地址:https://www.cnblogs.com/Springmoon-venn/p/14862865.html
Copyright © 2011-2022 走看看