zoukankan      html  css  js  c++  java
  • Flink实例(117):FLINK-SQL应用场景(16)以upsert的方式读写Kafka数据——以Flink1.12为例(二)

    来源:https://mp.weixin.qq.com/s/Tb8GtabOVBvx88de0C4ncw

    使用案例

    本文以实时地统计网页PV和UV的总量为例,介绍upsert-kafka基本使用方式:

    • Kafka 数据源

    用户的ippv信息,一个用户在一天内可以有很多次pv

    CREATE TABLE source_ods_fact_user_ippv (
        user_id      STRING,       -- 用户ID
        client_ip    STRING,       -- 客户端IP
        client_info  STRING,       -- 设备机型信息
        pagecode     STRING,       -- 页面代码
        access_time  TIMESTAMP,    -- 请求时间
        dt           STRING,       -- 时间分区天
        WATERMARK FOR access_time AS access_time - INTERVAL '5' SECOND  -- 定义watermark
    ) WITH (
       'connector' = 'kafka', -- 使用 kafka connector
        'topic' = 'user_ippv', -- kafka主题
        'scan.startup.mode' = 'earliest-offset', -- 偏移量
        'properties.group.id' = 'group1', -- 消费者组
        'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092', 
        'format' = 'json', -- 数据源格式为json
        'json.fail-on-missing-field' = 'false',
        'json.ignore-parse-errors' = 'true'
    );
    • Kafka Sink表

    统计每分钟的PV、UV,并将结果存储在Kafka中

    CREATE TABLE result_total_pvuv_min (
        do_date     STRING,     -- 统计日期
        do_min      STRING,      -- 统计分钟
        pv          BIGINT,     -- 点击量
        uv          BIGINT,     -- 一天内同个访客多次访问仅计算一个UV
        currenttime TIMESTAMP,  -- 当前时间
        PRIMARY KEY (do_date, do_min) NOT ENFORCED
    ) WITH (
      'connector' = 'upsert-kafka',
      'topic' = 'result_total_pvuv_min',
      'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092',
      'key.json.ignore-parse-errors' = 'true',
      'value.json.fail-on-missing-field' = 'false',
      'key.format' = 'json',
      'value.format' = 'json',
      'value.fields-include' = 'EXCEPT_KEY' -- key不出现kafka消息的value中
    );

    计算逻辑

    -- 创建视图
    CREATE VIEW view_total_pvuv_min AS
    SELECT
         dt AS do_date,                    -- 时间分区
         count (client_ip) AS pv,          -- 客户端的IP
         count (DISTINCT client_ip) AS uv, -- 客户端去重
         max(access_time) AS access_time   -- 请求的时间
    FROM
        source_ods_fact_user_ippv
    GROUP BY dt;
    
    -- 写入数据
    INSERT INTO result_total_pvuv_min
    SELECT
      do_date,    --  时间分区
      cast(DATE_FORMAT (access_time,'HH:mm') AS STRING) AS do_min,-- 分钟级别的时间
      pv,
      uv,
      CURRENT_TIMESTAMP AS currenttime -- 当前时间
    from
      view_total_pvuv_min;

    生产用户访问数据到kafka,向kafka中的user_ippv插入数据

    {"user_id":"1","client_ip":"192.168.12.1","client_info":"phone","pagecode":"1001","access_time":"2021-01-08 11:32:24","dt":"2021-01-08"}
    
    {"user_id":"1","client_ip":"192.168.12.1","client_info":"phone","pagecode":"1201","access_time":"2021-01-08 11:32:55","dt":"2021-01-08"}
    
    {"user_id":"2","client_ip":"192.165.12.1","client_info":"pc","pagecode":"1031","access_time":"2021-01-08 11:32:59","dt":"2021-01-08"}
    
    {"user_id":"1","client_ip":"192.168.12.1","client_info":"phone","pagecode":"1101","access_time":"2021-01-08 11:33:24","dt":"2021-01-08"}
    
    {"user_id":"3","client_ip":"192.168.10.3","client_info":"pc","pagecode":"1001","access_time":"2021-01-08 11:33:30","dt":"2021-01-08"}
    
    {"user_id":"1","client_ip":"192.168.12.1","client_info":"phone","pagecode":"1001","access_time":"2021-01-08 11:34:24","dt":"2021-01-08"}

    查询结果表:

    select * from result_total_pvuv_min;

    可以看出:每分钟的pv、uv只显示一条数据,即代表着截止到当前时间点的pv和uv

    查看Kafka中result_total_pvuv_min主题的数据,如下:

    可以看出:针对每一条访问数据,触发计算了一次PV、UV,每一条数据都是截止到当前时间的累计PV和UV。

    尖叫提示:

    默认情况下,如果在启用了检查点的情况下执行查询,Upsert Kafka接收器会将具有至少一次保证的数据提取到Kafka主题中。

    这意味着,Flink可能会将具有相同键的重复记录写入Kafka主题。但是,由于连接器在upsert模式下工作,因此作为源读回时,同一键上的最后一条记录将生效。因此,upsert-kafka连接器就像HBase接收器一样实现幂等写入。

    总结

    本文以Flink1.12为例,介绍了upsert-kafka的基本使用,该方式允许用户以upsert 的方式读写Kafka中的表,使用起来非常方便。另外本文也给出了一个具体的使用案例,可以进一步加深对该功能的使用

  • 相关阅读:
    Python
    Kubernetes之二workload资源编排
    Kubernetes之一安装
    DockerFile
    Docker的安装和使用
    Elastic Stack配置和使用
    虚拟化KVM应用
    Tomcat安装和使用
    Keepalived实现双主模型的ipvs高可用集群+实现双主模型的nginx高可用集群
    实验lvs+dns+nfs+mysql+web
  • 原文地址:https://www.cnblogs.com/qiu-hua/p/14328010.html
Copyright © 2011-2022 走看看