
    Flink + Kafka Real-Time Computing Quick Start

    1. Overview

    This example covers the following scenario: a real-time stream table (SQL Server -> Kafka Connect -> Kafka) is JOINed with a dimension table (MySQL), and the result is written to MySQL in real time.

    Also note that this example implements CDC through Kafka Connect. The latest Flink releases already support CDC directly (as of November 2021, only MySQL, Oracle, PostgreSQL, and MongoDB are supported; SQL Server is not yet supported).

    Flink CDC official documentation: https://ververica.github.io/flink-cdc-connectors/master/

    Note: this article mainly covers CDC based on SQL Server; the parts that touch MySQL (such as enabling CDC and registering the connector) are recorded only for future reference.

    Key steps of this example:

    0. Enable CDC on the source database and set up the Kafka and Flink environments.
    1. Create a table connected to the real-time stream: the application log (app_log) data in the Kafka message queue.
    2. Create a table connected to the dimension table: the application/company dimension table (dim_app) in MySQL.
    3. Create a table connected to the result table: the real-time company visit statistics table (dws_company_vist).
    4. JOIN the dimension table with the stream table and write the result into the result table: INSERT INTO ... SELECT ...
    

    2. Enabling CDC on the Source Databases

    2.1. Enabling CDC support on MySQL

    Note: this example does not require the mysql-cdc setup; it is recorded here only for reference.
    MySQL requires binlog_format to be ROW. The MySQL version I installed uses this format by default; you can check it first:

    mysql> show global variables like "%binlog_format%";
    

    To make the change permanent, edit the configuration file my.cnf; a restart is required for it to take effect:

    # vi /etc/my.cnf
    binlog_format=ROW
    log-bin=mysql-bin
    

    The setting can also be changed online, but the change is lost after a restart:

    mysql> SET global binlog_format='ROW';
    

    2.2. Enabling CDC support on SQL Server

    To capture changes from existing tables in SQL Server, complete the following configuration:

    2.2.1. Run the following commands to enable CDC for the database.

    -- Enable CDC support for the database
    USE testDB
    GO
    EXEC sys.sp_cdc_enable_db
    GO
    

    2.2.2. Run the following commands to enable CDC for the specified table.

    -- Enable CDC for the specified table
    USE testdb
    GO
    
    exec sp_cdc_enable_table 
    @source_schema = N'dbo', 
    @source_name = N't_app', 
    @role_name = null,
    @supports_net_changes = 1
    GO
    

    2.2.3. Run the following command to confirm that you have permission to access the CDC tables.

    EXEC sys.sp_cdc_help_change_data_capture
    GO
    

    Note: if the result set is empty, check whether you have permission to access the table.

    2.2.4. Run the following command to confirm that SQL Server Agent is running.

    EXEC master.dbo.xp_servicecontrol N'QUERYSTATE',N'SQLSERVERAGENT'
    GO
    

    Note: if the result is Running, SQL Server Agent is started.

    2.2.5. Check whether CDC is enabled for the table

    SELECT is_tracked_by_cdc FROM sys.tables WHERE name='t_app';
    

    Note: a query result of 1 means CDC was enabled successfully.

    2.2.6. SQL Server official CDC documentation

    https://docs.microsoft.com/zh-CN/sql/relational-databases/track-changes/track-data-changes-sql-server?view=sql-server-2017

    2.3. Enabling CDC on Oracle

    Hands-on steps will be added later, after they have been verified in practice.

    Reference: https://www.cnblogs.com/myrunning/p/5329139.html

    3. Kafka Deployment

    3.1. Download Kafka

    $ cd /usr/local/src/
    $ wget --no-check-certificate https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.8.1/kafka_2.12-2.8.1.tgz
    $ mkdir -p /usr/local/kafka/
    # Strip the top-level directory so that the paths used below (/usr/local/kafka/bin, /usr/local/kafka/config) match
    $ tar xvf kafka_2.12-2.8.1.tgz -C /usr/local/kafka/ --strip-components=1
    

    3.2. Configure Kafka

    # vi /usr/local/kafka/config/server.properties
    
    log.dirs=/usr/local/kafka/logs
    listeners=PLAINTEXT://192.168.1.100:9092
    advertised.listeners=PLAINTEXT://192.168.1.100:9092
    

    3.3. Start Kafka

    $ cd /usr/local/kafka
    $ bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
    $ bin/kafka-server-start.sh -daemon config/server.properties
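
    A quick way to confirm that the broker is reachable is to create and list a test topic (a minimal check; the topic name smoke-test is just an example, and the address must match the listener configured above):

    $ bin/kafka-topics.sh --create --topic smoke-test --partitions 1 --replication-factor 1 --bootstrap-server 192.168.1.100:9092
    $ bin/kafka-topics.sh --list --bootstrap-server 192.168.1.100:9092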
    

    3.4. Install Kafka Connect

    3.4.1. Configure the Kafka Connect worker

    #vi /usr/local/kafka/config/connect-distributed.properties

    # Kafka cluster address
    bootstrap.servers=192.168.1.100:9092

    # group.id must be identical on all workers of the same Connect cluster and must not clash with any consumer group
    group.id=connect-cluster

    # Set to false when the payloads are plain JSON without an embedded schema
    key.converter.schemas.enable=false
    value.converter.schemas.enable=false

    # Directory where connector plugins are stored
    plugin.path=/usr/local/kafka/plugins
    

    3.4.2. Install connector plugins

    $ mkdir -p /usr/local/kafka/plugins
    
    
    Download the connectors from https://www.confluent.io/hub/ and unzip them into /usr/local/kafka/plugins.
    I downloaded the following:
    debezium-debezium-connector-mysql-1.7.0.zip
    debezium-debezium-connector-sqlserver-1.7.0.zip
    confluentinc-kafka-connect-oracle-cdc-1.3.1.zip
    confluentinc-kafka-connect-jdbc-10.2.5.zip
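
    # For example, for the SQL Server connector (a sketch; it assumes the zip files were downloaded into /usr/local/src/):
    $ cd /usr/local/src/
    $ unzip debezium-debezium-connector-sqlserver-1.7.0.zip -d /usr/local/kafka/plugins/
    $ ls /usr/local/kafka/plugins/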
    

    3.4.3. Start Kafka Connect

    $ cd /usr/local/kafka
    $ bin/connect-distributed.sh -daemon config/connect-distributed.properties
    

    After a successful start, Kafka Connect automatically creates its internal topics.
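
    For example, they can be listed with the topic tool (a sketch; the exact names depend on offset.storage.topic, config.storage.topic, and status.storage.topic in connect-distributed.properties, which default to connect-offsets, connect-configs, and connect-status):

    $ cd /usr/local/kafka
    $ bin/kafka-topics.sh --list --bootstrap-server 192.168.1.100:9092 | grep connect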

    3.4.4. List the installed connector plugins via the REST API

    $ curl -X GET http://192.168.1.100:8083/connector-plugins
    

    The returned information looks like this:

    [
      {
        "class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "type": "sink",
        "version": "10.2.5"
      },
      {
        "class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "type": "source",
        "version": "10.2.5"
      },
      {
        "class": "io.confluent.connect.oracle.cdc.OracleCdcSourceConnector",
        "type": "source",
        "version": "1.3.1"
      },
      {
        "class": "io.debezium.connector.mysql.MySqlConnector",
        "type": "source",
        "version": "1.7.0.Final"
      },
      {
        "class": "io.debezium.connector.sqlserver.SqlServerConnector",
        "type": "source",
        "version": "1.7.0.Final"
      },
      {
        "class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
        "type": "sink",
        "version": "2.8.1"
      },
      {
        "class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
        "type": "source",
        "version": "2.8.1"
      },
      {
        "class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
        "type": "source",
        "version": "1"
      },
      {
        "class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
        "type": "source",
        "version": "1"
      },
      {
        "class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
        "type": "source",
        "version": "1"
      }
    ]
    

    3.5. Submit the connector configuration

    Register and configure the connector through the Kafka Connect REST API.

    # Register the connector (SQL Server CDC)
    $ curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://192.168.1.100:8083/connectors/  -d '{"name":"myserver-cdc-connector","config":{"connector.class":"io.debezium.connector.sqlserver.SqlServerConnector","database.hostname":"192.168.1.102","database.port":"1433","database.user":"sa","database.password":"xxx","database.dbname":"testdb","database.server.name":"myserver","table.include.list":"dbo.app_log","database.history.kafka.bootstrap.servers":"192.168.1.100:9092","database.history.kafka.topic":"dbhistory.myserver","decimal.handling.mode":"double","time.precision.mode":"connect"}}'

    # Update the connector (SQL Server CDC)
    $ curl -i -X PUT -H "Accept:application/json" -H  "Content-Type:application/json" http://192.168.1.100:8083/connectors/myserver-cdc-connector/config  -d '{"connector.class":"io.debezium.connector.sqlserver.SqlServerConnector","database.hostname":"192.168.1.102","database.port":"1433","database.user":"sa","database.password":"xxx","database.dbname":"testdb","database.server.name":"myserver","table.include.list":"dbo.app_log","database.history.kafka.bootstrap.servers":"192.168.1.100:9092","database.history.kafka.topic":"dbhistory.myserver","decimal.handling.mode":"double","time.precision.mode":"connect"}'

    # Check the connector status
    $ curl -k http://192.168.1.100:8083/connectors/myserver-cdc-connector/status

    # List topics (the CDC topic may appear with some delay)
    $ ./bin/kafka-topics.sh --list --zookeeper localhost:2181

    # Read earlier messages with the console consumer (--from-beginning starts from the beginning of the topic)
    $ ./bin/kafka-console-consumer.sh  --bootstrap-server 192.168.1.100:9092 --topic myserver.dbo.app_log --from-beginning
    

    Example configuration for a SQL Server table:

    {
        "name": "myserver-cdc-connector",
        "config": {
            "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
            "database.hostname": "192.168.1.102",
            "database.port": "1433",
            "database.user": "sa",
            "database.password": "xxx",
            "database.dbname": "testdb",
            "database.server.name": "myserver",
            "table.include.list": "dbo.app_log",
            "database.history.kafka.bootstrap.servers": "192.168.1.100:9092",
            "database.history.kafka.topic": "dbhistory.myserver",
            "decimal.handling.mode": "double",
            "time.precision.mode": "connect"
        }
    }
    

    Official reference: https://debezium.io/documentation/reference/stable/connectors/sqlserver.html

    Example configuration for a MySQL table:

    {
        "name":"myserver-mysql-cdc-connector",
        "config":{
            "connector.class":"io.debezium.connector.mysql.MySqlConnector",
            "database.hostname":"192.168.1.101",
            "database.port":"3306",
            "database.user":"root",
            "database.password":"xxx",
            "database.server.id":"1",
            "database.server.name":"myserver",
            "database.include.list":"testdb",
            "table.include.list":"testdb.app_log",
            "database.history.kafka.bootstrap.servers":"192.168.1.100:9092",
            "database.history.kafka.topic":"dbhistory.myserver",
            "include.schema.changes":"true",
            "decimal.handling.mode":"double"
        }
    }
    

    Official reference: https://debezium.io/documentation/reference/stable/connectors/mysql.html

    Common connector management API operations:

    GET /connectors – returns the names of all running connectors.
    POST /connectors – creates a new connector; the request body must be JSON and contain a name field and a config field, where name is the connector's name and config is a JSON object with the connector's configuration.
    GET /connectors/{name} – returns information about the specified connector.
    GET /connectors/{name}/config – returns the configuration of the specified connector.
    PUT /connectors/{name}/config – updates the configuration of the specified connector.
    GET /connectors/{name}/status – returns the status of the specified connector, including whether it is running, stopped, or failed; if an error has occurred, the error details are listed as well.
    GET /connectors/{name}/tasks – returns the tasks currently running for the specified connector.
    GET /connectors/{name}/tasks/{taskid}/status – returns the status of the specified connector task.
    PUT /connectors/{name}/pause – pauses the connector and its tasks; data processing stops until the connector is resumed.
    PUT /connectors/{name}/resume – resumes a paused connector.
    POST /connectors/{name}/restart – restarts a connector; mostly used when the connector has failed.
    POST /connectors/{name}/tasks/{taskId}/restart – restarts a task, usually because it has failed.
    DELETE /connectors/{name} – deletes a connector, stopping all of its tasks and removing its configuration.
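
    For example, applied to the connector registered earlier (a sketch; adjust the host and connector name to your environment):

    # Pause and later resume the connector
    $ curl -i -X PUT http://192.168.1.100:8083/connectors/myserver-cdc-connector/pause
    $ curl -i -X PUT http://192.168.1.100:8083/connectors/myserver-cdc-connector/resume

    # Restart the connector, e.g. after a failure
    $ curl -i -X POST http://192.168.1.100:8083/connectors/myserver-cdc-connector/restart

    # Delete the connector (stops its tasks and removes its configuration)
    $ curl -i -X DELETE http://192.168.1.100:8083/connectors/myserver-cdc-connector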
    

    4. Flink Deployment

    4.1. Download and install

    $ cd /usr/local/src/
    $ wget --no-check-certificate https://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.13.3/flink-1.13.3-bin-scala_2.11.tgz
    $ mkdir -p /usr/local/flink/
    # Strip the top-level directory so that /usr/local/flink/bin and /usr/local/flink/conf match the paths used below
    $ tar xvf flink-1.13.3-bin-scala_2.11.tgz -C /usr/local/flink/ --strip-components=1


    $ mkdir -p /usr/local/flink/sql-lib/
    # Download the connector jars into the sql-lib directory that sql-client will load with -l
    $ cd /usr/local/flink/sql-lib/
    $ wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.13.3/flink-sql-connector-kafka_2.11-1.13.3.jar
    $ wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc_2.11/1.13.3/flink-connector-jdbc_2.11-1.13.3.jar
    $ wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.27/mysql-connector-java-8.0.27.jar


    # If you use flink-cdc directly (e.g. mysql-cdc), also download flink-sql-connector-mysql-cdc-xxx.jar
    $ wget https://repo1.maven.org/maven2/com/alibaba/ververica/flink-sql-connector-mysql-cdc/1.4.0/flink-sql-connector-mysql-cdc-1.4.0.jar
    

    4.2. Flink SQL Development

    With Flink SQL, the SQL scripts can be executed through Flink's built-in sql-client.sh command line, through Zeppelin, or through other third-party data development platforms.

    4.2.1. Executing with sql-client

    $ cd /usr/local/flink/
    # Start a local standalone cluster first (skip this if a cluster is already running)
    $ ./bin/start-cluster.sh
    $ ./bin/sql-client.sh embedded -l sql-lib
    

    The related SQL scripts:

    -- Dimension table: application/company dimension table (dim_app) stored in MySQL
    CREATE TABLE IF NOT EXISTS dim_app (
      `app_code` varchar(64) NOT NULL COMMENT 'Application code',
      `app_name` varchar(255) NULL COMMENT 'Application name',
      `company_code` varchar(64) NOT NULL COMMENT 'Company code',
      `company_name` varchar(255) NOT NULL COMMENT 'Company name',
      PRIMARY KEY (app_code) NOT ENFORCED
     ) WITH (
       'connector' = 'jdbc',
       'url' = 'jdbc:mysql://192.168.1.101:3306/testdb',
       'table-name' = 'dim_app',
       'username' = 'root',
       'password' = 'xxx'
    );
    
    
    -- Stream table: the application log (app_log) change stream from the Kafka topic written by Debezium
    -- Note: the 'kafka' connector with a changelog format such as debezium-json does not support a PRIMARY KEY constraint
    CREATE TABLE IF NOT EXISTS ods_app_log (
      `id` varchar(64) NOT NULL COMMENT 'Primary key',
      `app_code` varchar(64) NOT NULL COMMENT 'Application code',
      -- With 'time.precision.mode' = 'connect', Debezium emits datetime columns as epoch milliseconds
      `vist_time` BIGINT COMMENT 'Visit time (epoch milliseconds)'
     ) WITH (
      'connector' = 'kafka',
      'topic' = 'myserver.dbo.app_log',
      'properties.bootstrap.servers' = '192.168.1.100:9092',
      'properties.group.id' = 'myserver',
      'scan.startup.mode' = 'earliest-offset',
      'format' = 'debezium-json'
    );
    
    
    -- Result table: real-time company visit statistics (dws_company_vist) stored in MySQL
    CREATE TABLE IF NOT EXISTS dws_company_vist (
      `company_code` varchar(64) NOT NULL COMMENT 'Company code',
      `total_vist_cnt` int COMMENT 'Total number of visits',
      PRIMARY KEY (company_code) NOT ENFORCED
     ) WITH (
       'connector' = 'jdbc',
       'url' = 'jdbc:mysql://192.168.1.101:3306/testdb',
       'table-name' = 'dws_company_vist',
       'username' = 'root',
       'password' = 'xxx'
    );
    
    -- Join the dimension table with the stream table and write the result into the result table
    insert into dws_company_vist
    select
      c.company_code,
      count(a.id) as total_vist_cnt
    from dim_app c, ods_app_log a
    where c.app_code = a.app_code
    group by c.company_code;
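
    After the INSERT statement is submitted, sql-client prints a job ID; the job can then be checked from the Flink CLI or the web UI on port 8081 (a sketch, assuming the standalone cluster started above and a MySQL client on the machine):

    # List running jobs
    $ ./bin/flink list

    # Watch the result table fill up on the MySQL side
    $ mysql -h192.168.1.101 -uroot -p -e 'select * from testdb.dws_company_vist'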
    

    4.2.2. Executing with Zeppelin

    %flink.conf
    flink.execution.jars /usr/local/flink/sql-lib/flink-connector-jdbc_2.11-1.13.3.jar,/usr/local/flink/sql-lib/flink-sql-connector-kafka_2.11-1.13.3.jar,/usr/local/flink/sql-lib/mysql-connector-java-8.0.27.jar

    %flink.ssql(type=update)
    CREATE TABLE IF NOT EXISTS dim_app ...
    CREATE TABLE IF NOT EXISTS ods_app_log ...
    CREATE TABLE IF NOT EXISTS dws_company_vist ...

    %flink.ssql(jobName="dws_company_vist")
    insert into xxx select ...
    


    4.2.3. Executing with Flink SQL in Java

    package demo;

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    public class FlinkCdcDemo {
        public static void main(String[] args) throws Exception {
            // Set up the streaming environment and the Table API bridge
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

            // Same DDL / DML statements as in the sql-client section above
            String createOdsTableSql = "CREATE TABLE IF NOT EXISTS ods_app_log ...";
            String createDimTableSql = "CREATE TABLE IF NOT EXISTS dim_app ...";
            String createDwsResultSql = "CREATE TABLE IF NOT EXISTS dws_company_vist ...";
            String insertIntoSql = "insert into xxx select ...";

            tableEnv.executeSql(createOdsTableSql);
            tableEnv.executeSql(createDimTableSql);
            tableEnv.executeSql(createDwsResultSql);
            // executeSql on an INSERT statement submits the job asynchronously
            tableEnv.executeSql(insertIntoSql);
        }
    }
    
    

    Part of pom.xml:

    
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <flink.version>1.13.3</flink.version>
            <flink.jdbc.version>1.10.3</flink.jdbc.version>
            <java.version>1.8</java.version>
            <scala.binary.version>2.11</scala.binary.version>
            <maven.compiler.source>${java.version}</maven.compiler.source>
            <maven.compiler.target>${java.version}</maven.compiler.target>
        </properties>
    
       <dependencies>
           <dependency>
               <groupId>org.apache.flink</groupId>
               <artifactId>flink-java</artifactId>
               <version>${flink.version}</version>
               <scope>compile</scope>
           </dependency>
           <dependency>
               <groupId>org.apache.flink</groupId>
               <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
               <version>${flink.version}</version>
               <scope>compile</scope>
           </dependency>
    
           <dependency>
               <groupId>org.apache.flink</groupId>
               <artifactId>flink-connector-base</artifactId>
               <version>${flink.version}</version>
           </dependency>
    
           <dependency>
               <groupId>org.apache.flink</groupId>
               <artifactId>flink-clients_${scala.binary.version}</artifactId>
               <version>${flink.version}</version>
               <scope>compile</scope>
           </dependency>
           <dependency>
               <groupId>org.apache.flink</groupId>
               <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
               <version>${flink.version}</version>
           </dependency>
    
           <dependency>
               <groupId>org.apache.flink</groupId>
               <artifactId>flink-json</artifactId>
               <version>${flink.version}</version>
               <scope>compile</scope>
           </dependency>
    
           <dependency>
               <groupId>org.apache.flink</groupId>
               <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
               <version>${flink.version}</version>
               <scope>compile</scope>
           </dependency>
          <dependency>
               <groupId>org.apache.flink</groupId>
               <artifactId>flink-table-api-java</artifactId>
               <version>${flink.version}</version>
              <scope>compile</scope>
           </dependency>
           <dependency>
               <groupId>org.apache.flink</groupId>
               <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
               <version>${flink.version}</version>
               <scope>compile</scope>
           </dependency>
           <dependency>
               <groupId>org.apache.flink</groupId>
               <artifactId>flink-jdbc_${scala.binary.version}</artifactId>
               <version>${flink.jdbc.version}</version>
           </dependency>
           <dependency>
               <groupId>org.apache.flink</groupId>
               <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
               <version>${flink.version}</version>
           </dependency>
    
           <dependency>
               <groupId>mysql</groupId>
               <artifactId>mysql-connector-java</artifactId>
               <version>8.0.26</version>
               <scope>compile</scope>
           </dependency>
       </dependencies>
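
    With these dependencies, the job can be packaged and submitted to the cluster (a sketch; the artifact name below is hypothetical and depends on your project coordinates, and the connector/JDBC dependencies must either be on the cluster classpath or bundled into a fat jar, e.g. with the maven-shade-plugin):

    $ mvn clean package
    $ /usr/local/flink/bin/flink run -c demo.FlinkCdcDemo target/flink-cdc-demo-1.0.jar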
    