Flink SQL learn

    1. Set up the test environment

    1.1 Download and start the Docker Compose containers

    # Containers included in this Docker Compose setup:
    # DataGen: data generator. Once started it continuously produces user-behavior events and sends them to the Kafka cluster, by default 1000 records per second for roughly 3 hours. The rate can be adjusted via the speedup parameter of the datagen service in docker-compose.yml (restart Docker Compose for the change to take effect).
    # MySQL: MySQL 5.7 with a pre-created category table mapping sub-categories to top-level categories; used later as a dimension table.
    # Kafka: the main data source; DataGen feeds its events into this container.
    # Zookeeper: a dependency of the Kafka container.
    # Elasticsearch: stores the results produced by Flink SQL.
    # Kibana: visualizes the data in Elasticsearch.
    mkdir -p /data/flink/flink-demo
    cd /data/flink/flink-demo
    wget https://raw.githubusercontent.com/wuchong/flink-sql-demo/master/docker-compose.yml
    
    # Start:
    docker-compose up -d
    # Stop and remove:
    docker-compose down
    # Restart:
    docker-compose restart
    
    # Inspect the Kafka test data
    docker-compose exec kafka bash -c 'kafka-console-consumer.sh --topic user_behavior --bootstrap-server kafka:9094 --from-beginning --max-messages 10'
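    Each message is a JSON-encoded user-behavior event whose fields match the table schema defined in section 2; a record looks roughly like this (values are illustrative, taken from the flink-sql-demo dataset):
    
    {"user_id": "543462", "item_id": "1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}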
    
    # https://downloads.apache.org/flink/
    wget "https://downloads.apache.org/flink/flink-1.10.3/flink-1.10.3-bin-scala_2.11.tgz"
    tar -xzf flink-1.10.3-bin-scala_2.11.tgz   # extracts to /data/flink/flink-1.10.3
    
    ln -s /data/flink/flink-1.10.3 /data/flink/flink
    # Download the Flink SQL connector jars
    # https://repo1.maven.org/maven2/org/apache/flink/
    cd /data/flink/flink/lib/
    wget https://repo1.maven.org/maven2/org/apache/flink/flink-json/1.10.3/flink-json-1.10.3.jar                                                            # json format
    wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.10.3/flink-sql-connector-kafka_2.11-1.10.3.jar                    # kafka connector
    wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch6_2.11/1.10.3/flink-sql-connector-elasticsearch6_2.11-1.10.3.jar  # elasticsearch sink
    wget https://repo1.maven.org/maven2/org/apache/flink/flink-jdbc_2.11/1.10.3/flink-jdbc_2.11-1.10.3.jar                                                  # jdbc connector
    wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/5.1.48/mysql-connector-java-5.1.48.jar                                                   # mysql driver
    
    # Increase the task-slot count (the demo submits several concurrent streaming jobs, each of which needs slots)
    vi /data/flink/flink/conf/flink-conf.yaml
    taskmanager.numberOfTaskSlots: 10
    
    # Start Flink (stop first in case a cluster is already running)
    cd /data/flink/flink
    bin/stop-cluster.sh
    bin/start-cluster.sh
    # Start the SQL CLI
    bin/sql-client.sh embedded
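    Once the cluster is up, the Flink web UI (http://localhost:8081 by default) shows the registered TaskManager slots and, later, the running jobs.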
    

    2. Create the real-time jobs

    -- Create the Kafka source table
    CREATE TABLE user_behavior (
        user_id BIGINT,
        item_id BIGINT,
        category_id BIGINT,
        behavior STRING,
        ts TIMESTAMP(3),
        proctime as PROCTIME(),   -- computed column that provides a processing-time attribute
        WATERMARK FOR ts as ts - INTERVAL '5' SECOND  -- define a watermark on ts, making ts the event-time attribute
    ) WITH (
        'connector.type' = 'kafka',  -- use the kafka connector
        'connector.version' = 'universal',  -- kafka version; universal supports 0.11 and later
        'connector.topic' = 'user_behavior',  -- kafka topic
        'connector.startup-mode' = 'earliest-offset',  -- read from the earliest offset
        'connector.properties.zookeeper.connect' = 'localhost:2181',  -- zookeeper address
        'connector.properties.bootstrap.servers' = 'localhost:9092',  -- kafka broker address
        'format.type' = 'json'  -- the source data is json
    );
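    With the 5-second watermark delay above, an event-time window only fires once an event with ts at least 5 seconds past the window end has been seen; for example, the window [09:00, 10:00) is emitted when an event with ts >= 10:00:05 arrives.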
    
    -- Sanity-check SQL
    show databases;
    show tables;
    describe user_behavior;
    SELECT * FROM user_behavior limit 10;
    -- Databases can be created and switched as well; note that user_behavior lives in the default database:
    create database demo;
    use demo;
    use default_database;
    -- Result display mode: changelog shows the stream of insertions/retractions, table shows the materialized result
    SET execution.result-mode=changelog;
    SET execution.result-mode=table;
    
    -- Create the Elasticsearch result table for the hourly buy count
    CREATE TABLE buy_cnt_per_hour (
        hour_of_day BIGINT,
        buy_cnt BIGINT
    ) WITH (
        'connector.type' = 'elasticsearch',             -- use the elasticsearch connector
        'connector.version' = '6',                      -- connector version; 6 works with both es 6.x and 7.x
        'connector.hosts' = 'http://10.8.60.127:9200',  -- elasticsearch address
        'connector.index' = 'buy_cnt_per_hour',         -- elasticsearch index name
        'connector.document-type' = 'user_behavior',    -- elasticsearch document type
        'connector.bulk-flush.max-actions' = '1',       -- flush after every record (convenient for the demo, too aggressive for production)
        'format.type' = 'json',                         -- output format json
        'update-mode' = 'append'
    );
    
    -- Count buy events per hour
    INSERT INTO buy_cnt_per_hour
    SELECT HOUR(TUMBLE_START(ts, INTERVAL '1' HOUR)), COUNT(*)
    FROM user_behavior
    WHERE behavior = 'buy'
    GROUP BY TUMBLE(ts, INTERVAL '1' HOUR)
    ;
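    The same query can be previewed in the SQL CLI before submitting it as a continuous job; run the SELECT without the INSERT and the CLI renders the window results as they are emitted:
    
    SELECT HOUR(TUMBLE_START(ts, INTERVAL '1' HOUR)) AS hour_of_day, COUNT(*) AS buy_cnt
    FROM user_behavior
    WHERE behavior = 'buy'
    GROUP BY TUMBLE(ts, INTERVAL '1' HOUR);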
    
    -- Elasticsearch result table for the cumulative unique-visitor count at 10-minute points over the day
    CREATE TABLE cumulative_uv (
        time_str STRING,
        uv BIGINT
    ) WITH (
        'connector.type' = 'elasticsearch',
        'connector.version' = '6',
        'connector.hosts' = 'http://localhost:9200',
        'connector.index' = 'cumulative_uv',
        'connector.document-type' = 'user_behavior',
        'format.type' = 'json',
        'update-mode' = 'upsert'
    );
    
    -- Create a pre-processing view: for every row, derive the current 10-minute time label and the running distinct-user count
    CREATE VIEW uv_per_10min AS
    SELECT 
      MAX(SUBSTR(DATE_FORMAT(ts, 'HH:mm'),1,4) || '0') OVER w AS time_str, 
      COUNT(DISTINCT user_id) OVER w AS uv
    FROM user_behavior
    WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW);
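    The time_str expression rounds an event's time down to its 10-minute bucket through string manipulation; a worked example (timestamp is illustrative):
    
    -- for ts = 2017-11-26 09:47:13
    -- DATE_FORMAT(ts, 'HH:mm')          -> '09:47'
    -- SUBSTR('09:47', 1, 4)             -> '09:4'
    -- SUBSTR('09:47', 1, 4) || '0'      -> '09:40'
    -- the MAX(...) OVER w keeps the label monotonically non-decreasing across out-of-order rows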
    
    -- Aggregation: MAX(uv) per time_str; with update-mode upsert the GROUP BY key (time_str) becomes the Elasticsearch document id, so each 10-minute point is updated in place
    INSERT INTO cumulative_uv
    SELECT time_str, MAX(uv)
    FROM uv_per_10min
    GROUP BY time_str;
    
    -- Create the MySQL dimension table
    CREATE TABLE category_dim (
        sub_category_id BIGINT,  -- sub-category
        parent_category_id BIGINT -- top-level category
    ) WITH (
        'connector.type' = 'jdbc',
        'connector.url' = 'jdbc:mysql://localhost:3306/flink',
        'connector.table' = 'category',
        'connector.driver' = 'com.mysql.jdbc.Driver',
        'connector.username' = 'root',
        'connector.password' = '123456',
        'connector.lookup.cache.max-rows' = '5000',  -- cache up to 5000 looked-up rows
        'connector.lookup.cache.ttl' = '10min'       -- cached entries expire after 10 minutes
    );
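    The dimension table can be sanity-checked from the CLI; with the 1.10 JDBC connector a plain SELECT reads the table as a bounded scan (this assumes the MySQL container from the compose file is reachable at localhost:3306):
    
    SELECT * FROM category_dim limit 10;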
    
    -- Create the Elasticsearch table for top-category sales
    CREATE TABLE top_category (
        category_name STRING,  -- 类目名称
        buy_cnt BIGINT  -- 销量
    ) WITH (
        'connector.type' = 'elasticsearch',
        'connector.version' = '6',
        'connector.hosts' = 'http://localhost:9200',
        'connector.index' = 'top_category',
        'connector.document-type' = 'user_behavior',
        'format.type' = 'json',
        'update-mode' = 'upsert'
    );
    
    -- Create a view that enriches each behavior event with its top-level category name
    create view rich_user_behavior
    as
    select
         u.user_id
        ,u.item_id
        ,u.behavior
        ,case c.parent_category_id
            when 1 then '服饰鞋包'   -- apparel, shoes & bags
            when 2 then '家装家饰'   -- home improvement & decor
            when 3 then '家电'       -- home appliances
            when 4 then '美妆'       -- beauty & cosmetics
            when 5 then '母婴'       -- mother & baby
            when 6 then '3c数码'     -- consumer electronics
            when 7 then '运动户外'   -- sports & outdoors
            when 8 then '食品'       -- food
            else '其他'              -- other
        end as category_name
    from user_behavior as u 
    left join category_dim for system_time as of u.proctime as c  -- temporal (lookup) join against the dimension table
        on u.category_id = c.sub_category_id
    ;
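    The view can be previewed in the CLI before wiring it to the sink; each incoming row triggers a lookup into category_dim at processing time, served from the lookup cache where possible:
    
    SELECT user_id, item_id, behavior, category_name
    FROM rich_user_behavior
    limit 10;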
    -- Aggregate sales by top-level category
    INSERT INTO top_category
    SELECT
         category_name
        ,COUNT(*) buy_cnt
    FROM rich_user_behavior
    WHERE behavior = 'buy'
    GROUP BY
         category_name;
    
    
    The results can be visualized in Kibana at http://10.8.60.127:5601 (the host running the compose file; Kibana's default port is 5601).
    
    
    
Original post: https://www.cnblogs.com/chenzechao/p/14709480.html