Flink SQL dual-stream join demo

    See the official Flink documentation on joins in continuous queries.

    The Flink documentation describes two kinds of dual-stream joins: Regular Joins and Time-windowed Joins.

    The following is translated from the official documentation:

    Regular Joins

    Regular joins are the most generic type of join, in which any new record, or changes to either side of the join input, are visible and affect the whole join result. For example, if there is a new record on the left side, it will be joined with all previous and future records on the right side.

    SELECT * FROM Orders
    INNER JOIN Product
    ON Orders.productId = Product.id

    These semantics allow any kind of update (insert, update, delete) to the input tables.

    However, this operation has an important implication: it requires both sides of the join input to be kept in Flink's state forever. Thus, resource usage will also grow indefinitely if one or both input tables keep growing.
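    One practical mitigation (not part of the docs excerpt above) is to bound how long idle join state is kept via the table config's idle state retention. A minimal sketch against the Flink 1.10 API; the 12h/24h values are illustrative:

    import org.apache.flink.api.common.time.Time

    // Bound how long idle state from regular joins is retained. State that is
    // neither read nor updated within this window becomes eligible for cleanup,
    // trading exactness of the join result for bounded resource usage.
    tableEnv.getConfig.setIdleStateRetentionTime(
      Time.hours(12),  // minimum idle retention
      Time.hours(24)   // maximum idle retention
    )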

    Time-windowed Joins

    A time-windowed join is defined by a join predicate that checks whether the time attributes of the input records lie within certain time constraints, i.e., a time window.

    SELECT *
    FROM
      Orders o,
      Shipments s
    WHERE o.id = s.orderId AND
          o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime

    Compared to a regular join, this kind of join only supports append-only tables with time attributes. Since time attributes are quasi-monotonically increasing, Flink can remove old values from its state without affecting the correctness of the result.
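    The examples below declare a processing-time attribute with PROCTIME(). As a hedged aside, Flink 1.10 DDL can also declare an event-time attribute via a WATERMARK clause; the table and column names here are illustrative:

    -- Illustrative event-time variant: the WATERMARK clause turns order_time
    -- into an event-time attribute usable in time-windowed joins.
    CREATE TABLE t_order_eventtime (
        order_id VARCHAR,
        product_id VARCHAR,
        order_time TIMESTAMP(3),
        WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
    ) WITH (
        ...
    );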

    Note: after upgrading to Flink 1.10, we started to use SQL a lot more.

    First, the SQL submission program:

    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.table.api.EnvironmentSettings
    import org.apache.flink.table.api.scala.StreamTableEnvironment

    // create a blink table environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val settings = EnvironmentSettings
      .newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()
    
    val tableEnv = StreamTableEnvironment.create(env, settings)
    
    // read the SQL file and split it into statements on ';'
    val sqlList = SqlFileReadUtil.sqlFileReadUtil(sqlName)
    
    // register every statement with the planner
    for (sql <- sqlList) {
      logger.info("sql : {}", sql)
      tableEnv.sqlUpdate(sql)
    }
    
    tableEnv.execute(sqlName)

    My submission program is basically copied from Jark Wu (云邪)'s earlier blog post, with a few small changes: http://wuchong.me/blog/2019/09/02/flink-sql-1-9-read-from-kafka-write-into-mysql/
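    SqlFileReadUtil itself is not shown in this post; a minimal sketch of what such a helper could look like (the name matches the call above, but the implementation here is an assumption: read a classpath resource and split on ';'):

    import scala.io.Source

    // Hypothetical helper: load a SQL script from the classpath and split it
    // into individual statements on ';'. Comment handling is intentionally naive.
    object SqlFileReadUtil {
      def sqlFileReadUtil(sqlName: String): List[String] = {
        val stream = getClass.getClassLoader.getResourceAsStream(s"$sqlName.sql")
        val content = Source.fromInputStream(stream, "UTF-8").mkString
        content.split(";").map(_.trim).filter(_.nonEmpty).toList
      }
    }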

    The SQL file for the regular join:

    -- Regular join, i.e. a global join
    ---sourceTable
    -- order table
    CREATE TABLE t_order(
        order_id VARCHAR,         -- order id
        product_id VARCHAR,       -- product id
        create_time VARCHAR       -- order time
    ) WITH (
        'connector.type' = 'kafka',
        'connector.version' = 'universal',
        'connector.topic' = 'order',
        'connector.startup-mode' = 'earliest-offset',
        'connector.properties.zookeeper.connect' = 'venn:2181',
        'connector.properties.bootstrap.servers' = 'venn:9092',
        'update-mode' = 'append',
        'format.type' = 'json',
        'format.derive-schema' = 'true'
    );
    ---sourceTable
    -- product table
    CREATE TABLE t_product (
        product_id VARCHAR,       -- product id
        price DECIMAL(38,18),     -- price
        create_time VARCHAR       -- create time
    ) WITH (
        'connector.type' = 'kafka',
        'connector.version' = 'universal',
        'connector.topic' = 'shipments',
        'connector.startup-mode' = 'latest-offset',
        'connector.properties.zookeeper.connect' = 'venn:2181',
        'connector.properties.bootstrap.servers' = 'venn:9092',
        'update-mode' = 'append',
        'format.type' = 'json',
        'format.derive-schema' = 'true'
    );
    
    ---sinkTable
    -- join the order table with the product table into an order-detail sink
    CREATE TABLE order_detail (
        order_id VARCHAR,
        producer_id VARCHAR ,
        price DECIMAL(38,18),
        order_create_time VARCHAR,
        product_create_time VARCHAR
    ) WITH (
        'connector.type' = 'kafka',
        'connector.version' = 'universal',
        'connector.topic' = 'order_detail',
        'connector.startup-mode' = 'latest-offset',
        'connector.properties.zookeeper.connect' = 'venn:2181',
        'connector.properties.bootstrap.servers' = 'venn:9092',
        'update-mode' = 'append',
        'format.type' = 'json',
        'format.derive-schema' = 'true'
    );
    
    ---order_sink
    INSERT INTO order_detail
    SELECT a.order_id, a.product_id, b.price, a.create_time, b.create_time
    FROM t_order a
      INNER JOIN t_product b ON a.product_id = b.product_id
    WHERE a.order_id IS NOT NULL;

    Now the time-windowed join version:

    -- time-windowed join
    ---sourceTable
    -- order table
    CREATE TABLE t_order(
        order_id VARCHAR,         -- order id
        product_id VARCHAR,       -- product id
        create_time VARCHAR,      -- order time
        order_proctime as PROCTIME()
    ) WITH (
        'connector.type' = 'kafka',
        'connector.version' = 'universal',
        'connector.topic' = 'order',
        'connector.startup-mode' = 'latest-offset',
        'connector.properties.zookeeper.connect' = 'venn:2181',
        'connector.properties.bootstrap.servers' = 'venn:9092',
        'update-mode' = 'append',
        'format.type' = 'json',
        'format.derive-schema' = 'true'
    );
    ---sourceTable
    -- product table
    CREATE TABLE t_product (
        product_id VARCHAR,       -- product id
        price DECIMAL(38,18),     -- price
        create_time VARCHAR,      -- create time
        product_proctime as PROCTIME()
    ) WITH (
        'connector.type' = 'kafka',
        'connector.version' = 'universal',
        'connector.topic' = 'shipments',
        'connector.startup-mode' = 'latest-offset',
        'connector.properties.zookeeper.connect' = 'venn:2181',
        'connector.properties.bootstrap.servers' = 'venn:9092',
        'update-mode' = 'append',
        'format.type' = 'json',
        'format.derive-schema' = 'true'
    );
    
    ---sinkTable
    -- join the order table with the product table into an order-detail sink
    CREATE TABLE order_detail (
        order_id VARCHAR,
        producer_id VARCHAR ,
        price DECIMAL(38,18),
        order_create_time VARCHAR,
        product_create_time VARCHAR
    ) WITH (
        'connector.type' = 'kafka',
        'connector.version' = 'universal',
        'connector.topic' = 'order_detail',
        'connector.startup-mode' = 'latest-offset',
        'connector.properties.zookeeper.connect' = 'venn:2181',
        'connector.properties.bootstrap.servers' = 'venn:9092',
        'update-mode' = 'append',
        'format.type' = 'json',
        'format.derive-schema' = 'true'
    );
    
    ---order_sink
    INSERT INTO order_detail
    SELECT a.order_id, a.product_id, b.price, a.create_time, b.create_time
    FROM t_order a
      INNER JOIN t_product b
        ON a.product_id = b.product_id
       AND a.order_proctime BETWEEN b.product_proctime - INTERVAL '10' MINUTE
                                AND b.product_proctime + INTERVAL '10' MINUTE
    WHERE a.order_id IS NOT NULL;

    The two examples are basically identical; the time-windowed join just adds a processing-time field on each side, used to bound the time range of the join.

    The input data for both joins looks like this (a producer sketch follows the sample):

    send topic : order,message : {"create_time":"2020-04-27 13:19:23","product_id":"bh001","order_id":"2"}
    send topic : shipments,message : {"create_time":"2020-04-27 13:19:25","price":"3.5","product_id":"bh001"}
    send topic : order,message : {"create_time":"2020-04-27 13:19:27","product_id":"bh001","order_id":"3"}
    send topic : shipments,message : {"create_time":"2020-04-27 13:19:28","price":"3.5","product_id":"bh001"}
    send topic : order,message : {"create_time":"2020-04-27 13:19:29","product_id":"bh001","order_id":"4"}
    send topic : shipments,message : {"create_time":"2020-04-27 13:19:30","price":"3.5","product_id":"bh001"}
    send topic : order,message : {"create_time":"2020-04-27 13:19:31","product_id":"bh001","order_id":"5"}
    send topic : shipments,message : {"create_time":"2020-04-27 13:19:32","price":"3.5","product_id":"bh001"}
    send topic : order,message : {"create_time":"2020-04-27 13:19:33","product_id":"bh001","order_id":"6"}
    send topic : shipments,message : {"create_time":"2020-04-27 13:19:34","price":"3.5","product_id":"bh001"}

    The output data is also the same for both (only the time range over which records match differs: one is global, the other is bounded):

    {"order_id":"19","producer_id":"bh001","price":0,"order_create_time":"2020-04-27 13:21:18","product_create_time":"2020-04-27 13:23:59"}
    {"order_id":"91","producer_id":"bh001","price":0,"order_create_time":"2020-04-27 13:23:42","product_create_time":"2020-04-27 13:23:59"}
    {"order_id":"59","producer_id":"bh001","price":0,"order_create_time":"2020-04-27 13:22:38","product_create_time":"2020-04-27 13:23:59"}
    {"order_id":"49","producer_id":"bh001","price":0,"order_create_time":"2020-04-27 13:22:18","product_create_time":"2020-04-27 13:23:59"}
    {"order_id":"4","producer_id":"bh001","price":0,"order_create_time":"2020-04-27 13:20:48","product_create_time":"2020-04-27 13:23:59"}
    {"order_id":"81","producer_id":"bh001","price":0,"order_create_time":"2020-04-27 13:23:22","product_create_time":"2020-04-27 13:23:59"}
    {"order_id":"17","producer_id":"bh001","price":0,"order_create_time":"2020-04-27 13:21:14","product_create_time":"2020-04-27 13:23:59"}
    {"order_id":"71","producer_id":"bh001","price":0,"order_create_time":"2020-04-27 13:23:02","product_create_time":"2020-04-27 13:23:59"}
    {"order_id":"14","producer_id":"bh001","price":0,"order_create_time":"2020-04-27 13:21:08","product_create_time":"2020-04-27 13:23:59"}
    {"order_id":"90","producer_id":"bh001","price":0,"order_create_time":"2020-04-27 13:23:40","product_create_time":"2020-04-27 13:23:59"}
    {"order_id":"43","producer_id":"bh001","price":0,"order_create_time":"2020-04-27 13:22:06","product_create_time":"2020-04-27 13:23:59"}

    Comparing the two, the regular join looks more like a classic SQL join: it buffers all the data, and each time the join fires it matches the incoming record against everything buffered on the other side. Once the semantics fit, the remaining question is how much data it can support.

    In my tests, Flink kept its default configuration:

    JVM Parameters:
        -Xmx511mb
        -Xms511mb
        -XX:MaxDirectMemorySize=256mb
        -XX:MaxMetaspaceSize=96mb
    
    TaskManager Dynamic Configs:
        taskmanager.memory.framework.off-heap.size=128mb
        taskmanager.memory.network.max=128mb
        taskmanager.memory.network.min=128mb
        taskmanager.memory.framework.heap.size=128mb
        taskmanager.memory.managed.size=512mb
        taskmanager.cpu.cores=8.0
        taskmanager.memory.task.heap.size=383mb
        taskmanager.memory.task.off-heap.size=0b

    Even with small records on both sides, it could only hold about 300K rows per side: from job start, heap usage climbed linearly until an OutOfMemoryError was thrown and the job died. The time-windowed join behaves much the same once you raise the TPS.

    If you are interested, have a look at the source: org.apache.flink.table.runtime.operators.join.TimeBoundedStreamJoin

    From the code, every incoming record iterates over the buffered records on the other side. The difference between the two joins is that the time-windowed join bounds this scan by time (and deletes records that have fallen outside the time range); neither works like the DataStream API, where you can keyBy to shrink the set of records that must be scanned. With 500K rows on each side, every arriving record triggers a 500K-record scan, so an OOM is no surprise. Increasing the heap helps, of course, but only buys a modest increase in data volume.

    We initially chose the regular join, but real-world results were mediocre and relying on state TTL for cleanup did not really fit our case. After some pointers from community veterans we switched to the time-windowed join; as long as the time range stays modest, it is enough for our business. It has now run in our test environment for 3 days with about 5 million rows per side, and heap usage holds steady at around 80%. The volume is small, about 50 TPS per side, which covers our needs; at much higher TPS I would still implement the join with the DataStream API, keeping each side's state in a MapState, which can reach a much higher rate (we have tested up to 10K TPS).
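    A rough sketch of that DataStream-API approach, keyed by product_id with each side buffered in MapState. This is a hedged outline, not our production code: the case classes, the time bound, and the timer-based cleanup are all illustrative, and for brevity the record's create time is treated as a processing-time instant (mirroring PROCTIME() in the SQL examples):

    import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction
    import org.apache.flink.util.Collector
    import scala.collection.JavaConverters._

    case class Order(orderId: String, productId: String, createTime: Long)
    case class Product(productId: String, price: BigDecimal, createTime: Long)
    case class OrderDetail(orderId: String, productId: String, price: BigDecimal)

    // Interval-style join: each side is buffered in MapState (time -> record),
    // so a new record only scans the other side's buffer for its own key.
    class MapStateJoin(bound: Long)
        extends KeyedCoProcessFunction[String, Order, Product, OrderDetail] {

      private var orderState: MapState[java.lang.Long, Order] = _
      private var productState: MapState[java.lang.Long, Product] = _

      override def open(parameters: Configuration): Unit = {
        orderState = getRuntimeContext.getMapState(
          new MapStateDescriptor("orders", classOf[java.lang.Long], classOf[Order]))
        productState = getRuntimeContext.getMapState(
          new MapStateDescriptor("products", classOf[java.lang.Long], classOf[Product]))
      }

      override def processElement1(
          order: Order,
          ctx: KeyedCoProcessFunction[String, Order, Product, OrderDetail]#Context,
          out: Collector[OrderDetail]): Unit = {
        orderState.put(order.createTime, order)
        // match against buffered products within the time bound
        for (e <- productState.entries().asScala
             if math.abs(e.getKey - order.createTime) <= bound) {
          out.collect(OrderDetail(order.orderId, order.productId, e.getValue.price))
        }
        // schedule cleanup for when this record can no longer match
        ctx.timerService().registerProcessingTimeTimer(order.createTime + 2 * bound)
      }

      override def processElement2(
          product: Product,
          ctx: KeyedCoProcessFunction[String, Order, Product, OrderDetail]#Context,
          out: Collector[OrderDetail]): Unit = {
        productState.put(product.createTime, product)
        for (e <- orderState.entries().asScala
             if math.abs(e.getKey - product.createTime) <= bound) {
          out.collect(OrderDetail(e.getValue.orderId, product.productId, product.price))
        }
        ctx.timerService().registerProcessingTimeTimer(product.createTime + 2 * bound)
      }

      override def onTimer(
          timestamp: Long,
          ctx: KeyedCoProcessFunction[String, Order, Product, OrderDetail]#OnTimerContext,
          out: Collector[OrderDetail]): Unit = {
        // drop entries that have fallen out of the join window
        val cutoff = timestamp - 2 * bound
        orderState.keys().asScala.filter(_ <= cutoff).toList.foreach(orderState.remove)
        productState.keys().asScala.filter(_ <= cutoff).toList.foreach(productState.remove)
      }
    }

    Wiring it up would look roughly like orderStream.connect(productStream).keyBy(_.productId, _.productId).process(new MapStateJoin(10 * 60 * 1000L)).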

    Written quickly over a lunch break; over the May Day holiday I will write proper posts on dual-stream joins and on how join jobs behave after heap tuning.

    Welcome to follow the Flink 菜鸟 WeChat official account for occasional posts on Flink development.
