  • Flink SQL Learning

    1. Set up the test environment

    1.1 Download and start the Docker Compose containers

    # Containers included in this Docker Compose file:
    # DataGen: data generator. Once started, it continuously generates user-behavior events and sends them to the Kafka cluster. By default it produces 1000 records per second for roughly 3 hours; the rate can be tuned via the speedup parameter of the datagen service in docker-compose.yml (restart Docker Compose for changes to take effect).
    # MySQL: MySQL 5.7 with a pre-created category table that maps sub-categories to top-level categories; used later as a dimension table.
    # Kafka: the data source. The DataGen component automatically feeds its output into this container.
    # Zookeeper: a dependency of the Kafka container.
    # Elasticsearch: stores the data produced by Flink SQL.
    # Kibana: visualizes the data in Elasticsearch.
    mkdir -p /data/flink/flink-demo
    cd /data/flink/flink-demo
    wget https://raw.githubusercontent.com/wuchong/flink-sql-demo/master/docker-compose.yml
    
    # Start:
    docker-compose up -d
    # Stop and remove:
    docker-compose down
    # Restart:
    docker-compose restart
    
    # Inspect the test data in Kafka
    docker-compose exec kafka bash -c 'kafka-console-consumer.sh --topic user_behavior --bootstrap-server kafka:9094 --from-beginning --max-messages 10'
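    # Each event is one JSON object per line; the output should look roughly like
    # this (an illustrative record in the flink-sql-demo format, values vary):
    # {"user_id": "952483", "item_id": "310884", "category_id": "4580532", "behavior": "pv", "ts": "2017-11-27T00:00:00Z"}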
    
    # Download Flink (releases: https://downloads.apache.org/flink/)
    wget "https://downloads.apache.org/flink/flink-1.10.3/flink-1.10.3-bin-scala_2.11.tgz"
    tar -xzvf flink-1.10.3-bin-scala_2.11.tgz
    # the archive unpacks to flink-1.10.3; here it is kept at /data/flink/flink-1.10.3
    
    ln -s /data/flink/flink-1.10.3 /data/flink/flink
    # Download the Flink SQL connector jars
    # (browse https://repo1.maven.org/maven2/org/apache/flink/ for other versions)
    cd /data/flink/flink/lib/
    wget https://repo1.maven.org/maven2/org/apache/flink/flink-json/1.10.3/flink-json-1.10.3.jar 
    wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.10.3/flink-sql-connector-kafka_2.11-1.10.3.jar 
    wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch6_2.11/1.10.3/flink-sql-connector-elasticsearch6_2.11-1.10.3.jar 
    wget https://repo1.maven.org/maven2/org/apache/flink/flink-jdbc_2.11/1.10.3/flink-jdbc_2.11-1.10.3.jar 
    wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/5.1.48/mysql-connector-java-5.1.48.jar
    
    # Increase the number of task slots: each INSERT INTO statement below runs as
    # its own streaming job, and the default of 1 slot cannot run them all concurrently
    vi /data/flink/flink/conf/flink-conf.yaml
    taskmanager.numberOfTaskSlots: 10
    
    # (Re)start Flink
    cd /data/flink/flink
    bin/stop-cluster.sh
    bin/start-cluster.sh
    # the web UI is then available at http://localhost:8081
    # Start the SQL CLI
    bin/sql-client.sh embedded
    

    2. Create the real-time jobs

    -- Create the Kafka source table
    CREATE TABLE user_behavior (
        user_id BIGINT,
        item_id BIGINT,
        category_id BIGINT,
        behavior STRING,
        ts TIMESTAMP(3),
        proctime AS PROCTIME(),   -- computed column providing a processing-time attribute
        WATERMARK FOR ts AS ts - INTERVAL '5' SECOND  -- watermark on ts, making ts the event-time column
    ) WITH (
        'connector.type' = 'kafka',  -- use the kafka connector
        'connector.version' = 'universal',  -- kafka version; universal supports 0.11 and above
        'connector.topic' = 'user_behavior',  -- kafka topic
        'connector.startup-mode' = 'earliest-offset',  -- read from the earliest offset
        'connector.properties.zookeeper.connect' = 'localhost:2181',  -- zookeeper address
        'connector.properties.bootstrap.servers' = 'localhost:9092',  -- kafka broker address
        'format.type' = 'json'  -- the source format is json
    );
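    -- The two derived columns above deserve a note: proctime is evaluated when a
    -- row is processed, while the WATERMARK clause makes ts the event-time column
    -- and tolerates 5 seconds of out-of-order data (a window ending at 13:00:00
    -- fires only after an event with ts >= 13:00:05 has been seen). A quick probe
    -- to compare the two time columns (a sketch; run it in the SQL CLI):
    SELECT ts, proctime, behavior
    FROM user_behavior
    LIMIT 5;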
    
    -- Sanity-check from the SQL CLI
    show databases;
    create database demo;
    use demo;
    show tables;
    describe user_behavior;
    SELECT * FROM user_behavior limit 10;
    -- Result display mode: changelog shows the raw stream of changes,
    -- table shows the continuously updated materialized result
    SET execution.result-mode=changelog;
    SET execution.result-mode=table;
    
    -- Create the Elasticsearch sink table for hourly purchase counts
    CREATE TABLE buy_cnt_per_hour (
        hour_of_day BIGINT,
        buy_cnt BIGINT
    ) WITH (
        'connector.type' = 'elasticsearch',             -- use the elasticsearch connector
        'connector.version' = '6',                      -- elasticsearch version; '6' works with es 6.x and 7.x
        'connector.hosts' = 'http://10.8.60.127:9200',  -- elasticsearch address
        'connector.index' = 'buy_cnt_per_hour',         -- elasticsearch index, comparable to a table in a database
        'connector.document-type' = 'user_behavior',    -- elasticsearch document type
        'connector.bulk-flush.max-actions' = '1',       -- flush after every record
        'format.type' = 'json',                         -- output format is json
        'update-mode' = 'append'                        -- windowed results are final, so append mode is enough
    );
    
    -- Count the number of purchases per hour
    INSERT INTO buy_cnt_per_hour
    SELECT HOUR(TUMBLE_START(ts, INTERVAL '1' HOUR)), COUNT(*)
    FROM user_behavior
    WHERE behavior = 'buy'
    GROUP BY TUMBLE(ts, INTERVAL '1' HOUR)
    ;
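    -- Before wiring an aggregation into INSERT INTO, it can be previewed as a
    -- plain SELECT in the CLI. A sketch that also exposes the window bounds
    -- (TUMBLE_START/TUMBLE_END are Flink SQL's auxiliary window functions):
    SELECT
        TUMBLE_START(ts, INTERVAL '1' HOUR) AS window_start,
        TUMBLE_END(ts, INTERVAL '1' HOUR) AS window_end,
        COUNT(*) AS buy_cnt
    FROM user_behavior
    WHERE behavior = 'buy'
    GROUP BY TUMBLE(ts, INTERVAL '1' HOUR);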
    
    -- Elasticsearch sink table for the cumulative unique visitor (UV) count per
    -- 10 minutes over the day; the running count keeps changing, hence
    -- update-mode 'upsert' below
    CREATE TABLE cumulative_uv (
        time_str STRING,
        uv BIGINT
    ) WITH (
        'connector.type' = 'elasticsearch',
        'connector.version' = '6',
        'connector.hosts' = 'http://localhost:9200',
        'connector.index' = 'cumulative_uv',
        'connector.document-type' = 'user_behavior',
        'format.type' = 'json',
        'update-mode' = 'upsert'
    );
    
    -- Create a preprocessing view: a running distinct-user count, tagged with its 10-minute time bucket
    CREATE VIEW uv_per_10min AS
    SELECT 
      MAX(SUBSTR(DATE_FORMAT(ts, 'HH:mm'),1,4) || '0') OVER w AS time_str, 
      COUNT(DISTINCT user_id) OVER w AS uv
    FROM user_behavior
    WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW);
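    -- How time_str works: DATE_FORMAT(ts, 'HH:mm') yields e.g. '12:34',
    -- SUBSTR(..., 1, 4) keeps '12:3', and appending '0' gives the 10-minute
    -- bucket '12:30'. Taking MAX over the ever-growing window keeps time_str
    -- non-decreasing even when events arrive slightly out of order. The bucket
    -- expression can be inspected on its own (a sketch):
    SELECT DATE_FORMAT(ts, 'HH:mm') AS hhmm,
           SUBSTR(DATE_FORMAT(ts, 'HH:mm'), 1, 4) || '0' AS bucket_10min
    FROM user_behavior
    LIMIT 5;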
    
    -- For each 10-minute bucket, keep the largest running UV seen so far; with
    -- update-mode 'upsert', the ES document keyed by time_str is updated in place
    INSERT INTO cumulative_uv
    SELECT time_str, MAX(uv)
    FROM uv_per_10min
    GROUP BY time_str;
    
    -- Create the MySQL dimension table
    CREATE TABLE category_dim (
        sub_category_id BIGINT,  -- sub-category id
        parent_category_id BIGINT -- top-level category id
    ) WITH (
        'connector.type' = 'jdbc',
        'connector.url' = 'jdbc:mysql://localhost:3306/flink',
        'connector.table' = 'category',
        'connector.driver' = 'com.mysql.jdbc.Driver',
        'connector.username' = 'root',
        'connector.password' = '123456',
        'connector.lookup.cache.max-rows' = '5000',
        'connector.lookup.cache.ttl' = '10min'
    );
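    -- The lookup cache options above trade freshness for load: each task caches
    -- up to 5000 category rows and re-queries MySQL only after the 10-minute TTL.
    -- The category table itself is pre-created inside the MySQL container; a
    -- schema it plausibly matches, inferred from the Flink DDL above (a sketch to
    -- run in MySQL, not in the Flink CLI):
    CREATE TABLE category (
        sub_category_id BIGINT PRIMARY KEY,   -- sub-category id
        parent_category_id BIGINT             -- top-level category id
    );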
    
    -- Create the Elasticsearch sink table for the top-category sales ranking
    CREATE TABLE top_category (
        category_name STRING,  -- category name
        buy_cnt BIGINT  -- number of purchases
    ) WITH (
        'connector.type' = 'elasticsearch',
        'connector.version' = '6',
        'connector.hosts' = 'http://localhost:9200',
        'connector.index' = 'top_category',
        'connector.document-type' = 'user_behavior',
        'format.type' = 'json',
        'update-mode' = 'upsert'
    );
    
    -- Create an enriched view: a temporal join against the MySQL dimension table
    -- maps each sub-category to a readable top-level category name
    CREATE VIEW rich_user_behavior AS
    SELECT
         u.user_id
        ,u.item_id
        ,u.behavior
        ,CASE c.parent_category_id
            WHEN 1 THEN '服饰鞋包'  -- apparel, shoes & bags
            WHEN 2 THEN '家装家饰'  -- home improvement & decor
            WHEN 3 THEN '家电'      -- home appliances
            WHEN 4 THEN '美妆'      -- beauty
            WHEN 5 THEN '母婴'      -- mother & baby
            WHEN 6 THEN '3c数码'    -- consumer electronics
            WHEN 7 THEN '运动户外'  -- sports & outdoors
            WHEN 8 THEN '食品'      -- food
            ELSE '其他'             -- other
        END AS category_name
    FROM user_behavior AS u
    LEFT JOIN category_dim FOR SYSTEM_TIME AS OF u.proctime AS c
        ON u.category_id = c.sub_category_id
    ;
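    -- FOR SYSTEM_TIME AS OF u.proctime makes this a lookup (temporal) join: each
    -- incoming event looks up the dimension row as of its processing time, served
    -- through the JDBC lookup cache configured above. A quick preview (a sketch):
    SELECT user_id, behavior, category_name
    FROM rich_user_behavior
    LIMIT 10;
    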
    -- Aggregate purchases by top-level category
    INSERT INTO top_category
    SELECT
         category_name
        ,COUNT(*) buy_cnt
    FROM rich_user_behavior
    WHERE behavior = 'buy'
    GROUP BY
         category_name;
    
    
    Kibana (for building the dashboards): http://10.8.60.127:5601
    
    
    