zoukankan      html  css  js  c++  java
  • FlinkCDC实践

    CDC介绍

    CDC 是 Change Data Capture(变更数据获取)的简称。核心思想是,监测并捕获数据库的变动(包括数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。

    CDC种类
    基于查询的CDC

    例如:Sqoop、JDBC source等产品。
    特点:基于批处理,不能捕获到所有数据的变化、高延迟、需要查询数据库,会增加数据库压力

    基于binlog的CDC

    例如:Maxwell、Canal、Debezium
    特点:基于streaming模式、能捕捉所有数据的变化、低延迟、不会增加数据库压力。

    Flink 社区开发了flink-cdc-connectors组件,这是一个可以直接从MySQL、PostgreSQL
    等数据库直接读取全量数据和增量变更数据的source组件。目前已开源。
    开源地址:https://github.com/ververica/flink-cdc-connectors

    1.开启mysql binlog
    查看mysql-binlog状态并开启mysql-binlog

    上图是开始的状态。如果没有开始,则log_bin=off,log_bin_basename和log_bin_index值为空。开启方式如下:

    vim vim /etc/my.cnf
    

    在添加以下信息

    #开启binglog
    server-id=1
    log-bin=/var/lib/mysql/mysql-bin
    

    server-id表示单个结点的id,这里由于只有一个结点,所以可以把id随机指定为一个数,这里将id设置成1。若集群中有多个结点,则id不能相同
    第二句是指定binlog日志文件的名字为mysql-bin,以及其存储路径。
    添加完成后保存退出。

    重启mysql服务
    service mysqld restart
    
    查看binlog

    2.建立mysql测试表并初始化数据

    导入jar包

    	<dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-java</artifactId>
                <version>1.12.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_2.12</artifactId>
                <version>1.12.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-clients_2.12</artifactId>
                <version>1.12.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
                <version>3.1.3</version>
            </dependency>
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>5.1.49</version>
            </dependency>
    
            <dependency>
                <groupId>com.alibaba.ververica</groupId>
                <artifactId>flink-connector-mysql-cdc</artifactId>
                <version>1.2.0</version>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.75</version>
            </dependency>
    

    编写测试类

    package com.meijs;
    
    import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
    import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
    import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
    import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    public class FlinkCDC {
        public static void main(String args[]) throws Exception {
            //获取执行环境
            StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
            executionEnvironment.setParallelism(1);
    
            DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
                    .hostname("192.168.154.130")
                    .port(3306)
                    .username("root")
                    .password("123456")
                    .databaseList("test")
                    .tableList("test.flink_cdc_test")//监控对应的表,如果没有该参数,则是监控全表
                    .deserializer(new StringDebeziumDeserializationSchema())
                    .startupOptions(StartupOptions.initial())//initial对监控的表做一个初始化快照,earliest,latest等参数与kafka的的offset类似
                    .build();
            DataStreamSource<String> streamSource = executionEnvironment.addSource(sourceFunction);
    
            streamSource.print();
    
            executionEnvironment.execute("FlinkCDC");
        }
    }
    
    初始化执行后的打印结果如下:
    SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={file=mysql-bin.000005, pos=1353, row=1, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.test.flink_cdc_test', kafkaPartition=null, key=Struct{id=1}, keySchema=Schema{mysql_binlog_source.test.flink_cdc_test.Key:STRUCT}, value=Struct{after=Struct{id=1,name=小米,log_url=www.xiaomi.com},source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=0,snapshot=true,db=test,table=flink_cdc_test,server_id=0,file=mysql-bin.000005,pos=1353,row=0},op=c,ts_ms=1641905845597}, valueSchema=Schema{mysql_binlog_source.test.flink_cdc_test.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
    SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={file=mysql-bin.000005, pos=1353, row=1, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.test.flink_cdc_test', kafkaPartition=null, key=Struct{id=2}, keySchema=Schema{mysql_binlog_source.test.flink_cdc_test.Key:STRUCT}, value=Struct{after=Struct{id=2,name=华为,log_url=www.huawei.com},source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=0,snapshot=true,db=test,table=flink_cdc_test,server_id=0,file=mysql-bin.000005,pos=1353,row=0},op=c,ts_ms=1641905845602}, valueSchema=Schema{mysql_binlog_source.test.flink_cdc_test.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
    SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={file=mysql-bin.000005, pos=1353, row=1, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.test.flink_cdc_test', kafkaPartition=null, key=Struct{id=3}, keySchema=Schema{mysql_binlog_source.test.flink_cdc_test.Key:STRUCT}, value=Struct{after=Struct{id=3,name=苹果,log_url=www.pingguo.com},source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=0,snapshot=true,db=test,table=flink_cdc_test,server_id=0,file=mysql-bin.000005,pos=1353,row=0},op=c,ts_ms=1641905845602}, valueSchema=Schema{mysql_binlog_source.test.flink_cdc_test.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
    SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={file=mysql-bin.000005, pos=1353}} ConnectRecord{topic='mysql_binlog_source.test.flink_cdc_test', kafkaPartition=null, key=Struct{id=4}, keySchema=Schema{mysql_binlog_source.test.flink_cdc_test.Key:STRUCT}, value=Struct{after=Struct{id=4,name=欧派,log_url=www.oppo.com},source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=0,snapshot=last,db=test,table=flink_cdc_test,server_id=0,file=mysql-bin.000005,pos=1353,row=0},op=c,ts_ms=1641905845602}, valueSchema=Schema{mysql_binlog_source.test.flink_cdc_test.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
    

    op=c代表是创建,after为启动后当前的数据状态

    更新一条数据观察打印结果

    打印日志如下

    SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1641906085, file=mysql-bin.000005, pos=1418, row=1, server_id=1, event=2}} ConnectRecord{topic='mysql_binlog_source.test.flink_cdc_test', kafkaPartition=null, key=Struct{id=4}, keySchema=Schema{mysql_binlog_source.test.flink_cdc_test.Key:STRUCT}, value=Struct{before=Struct{id=4,name=欧派,log_url=www.oppo.com},after=Struct{id=4,name=oppo,log_url=www.oppo.com},source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1641906085000,db=test,table=flink_cdc_test,server_id=1,file=mysql-bin.000005,pos=1553,row=0,thread=14},op=u,ts_ms=1641906085304}, valueSchema=Schema{mysql_binlog_source.test.flink_cdc_test.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
    
    

    op=u代表为update,before为修改更新前的数据,after更新后的数据状态

    删除一条数据观察打印结果

    SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1641906292, file=mysql-bin.000005, pos=1735, row=1, server_id=1, event=2}} ConnectRecord{topic='mysql_binlog_source.test.flink_cdc_test', kafkaPartition=null, key=Struct{id=3}, keySchema=Schema{mysql_binlog_source.test.flink_cdc_test.Key:STRUCT}, value=Struct{before=Struct{id=3,name=苹果,log_url=www.pingguo.com},source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1641906292000,db=test,table=flink_cdc_test,server_id=1,file=mysql-bin.000005,pos=1870,row=0,thread=14},op=d,ts_ms=1641906292636}, valueSchema=Schema{mysql_binlog_source.test.flink_cdc_test.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
    

    op=d代表为delete,before为修改更新前的数据,可以看到没after

    在开启状态上增加一条数据观察打印结果

    SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1641906490, file=mysql-bin.000005, pos=2030, row=1, server_id=1, event=2}} ConnectRecord{topic='mysql_binlog_source.test.flink_cdc_test', kafkaPartition=null, key=Struct{id=6}, keySchema=Schema{mysql_binlog_source.test.flink_cdc_test.Key:STRUCT}, value=Struct{after=Struct{id=6,name=kupai,log_url=www.kupai.com},source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1641906490000,db=test,table=flink_cdc_test,server_id=1,file=mysql-bin.000005,pos=2165,row=0,thread=14},op=c,ts_ms=1641906490308}, valueSchema=Schema{mysql_binlog_source.test.flink_cdc_test.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
    

    同时可以看出flink对bing-log的监控和mysql-binglog一致

  • 相关阅读:
    python获取指定日期和转换的整理
    调用baidu地图API,实现语音导航播报
    Quartus 调试笔记
    mybatis-plus学习遇到的坑
    X509Certificate
    Hyperledger Fabric学习笔记——Wallet
    [WCF权限控制]从两个重要的概念谈起:Identity与Principal
    9-HyperLedger-Fabric原理-MSP详解(一)-MSP基础
    区块链中的“双花”问题
    区块链共识算法 PBFT(拜占庭容错)、PAXOS、RAFT简述
  • 原文地址:https://www.cnblogs.com/jiashengmei/p/15792516.html
Copyright © 2011-2022 走看看