zoukankan      html  css  js  c++  java
  • 基于Flink SQL构建流式应用

    基于 Kafka, MySQL, Elasticsearch, Kibana,使用 Flink SQL 构建电商用户行为的实时分析应用。Flink SQL 可轻松连接各种外部系统,原生支持事件时间和乱序数据处理、维表关联,有丰富的内置函数等。

    1)购买腾讯云服务器,安装Java 13.0.2,安装Docker

    //ubuntu 18.04 安装 docker
    https://www.cnblogs.com/ws17345067708/p/10455460.html

    2)下载 docker-compose.yml 文件。Docker Compose 包含的容器有:

     1 version: '2.1'
     2 services:
     3   datagen:
     4     image: jark/datagen:0.1
     5     command: "java -classpath /opt/datagen/flink-sql-demo.jar myflink.SourceGenerator --input /opt/datagen/user_behavior.log --output kafka kafka:9094 --speedup 1000"
     6     depends_on:
     7       - kafka
     8     environment:
     9       ZOOKEEPER_CONNECT: zookeeper
    10       KAFKA_BOOTSTRAP: kafka
    11   mysql:
    12     image: jark/mysql-example:0.1
    13     ports:
    14       - "3306:3306"
    15     environment:
    16       - MYSQL_ROOT_PASSWORD=123456
    17   zookeeper:
    18     image: wurstmeister/zookeeper:3.4.6
    19     ports:
    20       - "2181:2181"
    21   kafka:
    22     image: wurstmeister/kafka:2.12-2.2.1
    23     ports:
    24       - "9092:9092"
    25       - "9094:9094"
    26     depends_on:
    27       - zookeeper
    28     environment:
    29       - KAFKA_ADVERTISED_LISTENERS=INSIDE://:9094,OUTSIDE://localhost:9092
    30       - KAFKA_LISTENERS=INSIDE://:9094,OUTSIDE://:9092
    31       - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
    32       - KAFKA_INTER_BROKER_LISTENER_NAME=INSIDE
    33       - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
    34       - KAFKA_CREATE_TOPICS="user_behavior:1:1"
    35     volumes:
    36       - /var/run/docker.sock:/var/run/docker.sock
    37   elasticsearch:
    38     image: docker.elastic.co/elasticsearch/elasticsearch:7.6.0
    39     environment:
    40       - cluster.name=docker-cluster
    41       - bootstrap.memory_lock=true
    42       - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    43       - discovery.type=single-node
    44     ports:
    45       - "9200:9200"
    46       - "9300:9300"
    47     ulimits:
    48       memlock:
    49         soft: -1
    50         hard: -1
    51       nofile:
    52         soft: 65536
    53         hard: 65536
    54   kibana:
    55     image: docker.elastic.co/kibana/kibana:7.6.0
    56     ports:
    57       - "5601:5601"
    View Code
    • DataGen: 数据生成器。容器启动后会自动开始生成用户行为数据,并发送到 Kafka 集群中。
    • MySQL: 集成了 MySQL 5.7 ,以及预先创建好了类目表(category),预先填入了子类目与顶级类目的映射关系,后续作为维表使用。
    • Kafka: 主要用作数据源。DataGen 组件会自动将数据灌入这个容器中。
    • Zookeeper: Kafka 容器依赖。
    • Elasticsearch: 主要存储 Flink SQL 产出的数据。
    • Kibana: 可视化 Elasticsearch 中的数据。

    3)启动docker容器 docker-compose up -d

    查看容器是否正常启动 docker ps

    停止所有容器 docker-compose down

    注意:"docker-compose up -d"会报错。python2和python3的差异大,使用pip因为多版本python会报错

    # pip
    2 Traceback (most recent call last):
    3   File "/usr/bin/pip", line 5, in <module>
    4     from pkg_resources import load_entry_point
    5 ImportError: No module named pkg_resources

    解决办法:

    sudo apt-get clean
    sudo apt-get update
    sudo apt-get install --reinstall python-minimal python-lockfile

    4) 下载 Flink 1.10.0 安装包并解压,下载 json,kafka,Elasticsearch,jdbc,MySQL 包依赖放在 Flink 的 lib 目录。注意 Flink 的 Scala 版本与 Elasticsearch 连接器的 Scala 版本保持一致,可全部为2.11版本。 Elasticsearch 版本6的 connector 可用来连接 Elasticsearch 7.6.0。

    wget -bc https://repo1.maven.org/maven2/org/apache/flink/flink-json/1.10.0/flink-json-1.10.0.jar
    wget -bc https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.10.0/flink-sql-connector-kafka_2.11-1.10.0.jar
    wget -bc https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch6_2.11/1.10.0/flink-sql-connector-elasticsearch6_2.11-1.10.0.jar
    wget -bc https://repo1.maven.org/maven2/org/apache/flink/flink-jdbc_2.11/1.10.0/flink-jdbc_2.11-1.10.0.jar
    wget -bc https://repo1.maven.org/maven2/mysql/mysql-connector-java/5.1.48/mysql-connector-java-5.1.48.jar

    5) 将 conf/flink-conf.yaml 中的 taskmanager.numberOfTaskSlots 修改成 10,会同时运行多个任务。

    6)启动 Flink 集群 ./bin/start-cluster.sh

    通过 http://IP:8081 可以访问到 Flink Web UI。

    7)启动 SQL CLI:bin/sql-client.sh embedded

    8)使用 DDL 创建 Kafka 表,作为原始数据表

    9)使用 DDL 创建 Elasticsearch 表统计每小时成交量、一天每10分钟累计独立用户数、类目排行榜。

    10)使用 Kibana 实时显示可视化结果。

    ref:

    http://wuchong.me/blog/2020/02/25/demo-building-real-time-application-with-flink-sql/

  • 相关阅读:
    netstat
    ansibe tower的开源替代品semaphore
    ansible playbook 示例
    centos6 安装python2.7
    python celery + redis
    flask + uwsgi 生产环境
    ubuntu访问supermicro ikvm
    [leetcode]Symmetric Tree
    [leetcode]Pascal's Triangle
    [leetcode]Letter Combinations of a Phone Number
  • 原文地址:https://www.cnblogs.com/zgq25302111/p/12578371.html
Copyright © 2011-2022 走看看