zoukankan      html  css  js  c++  java
  • 1. Canal入门

    1. Canal简介

    官方文档: https://github.com/alibaba/canal/wiki/简介

    早期,阿里巴巴B2B公司因为存在杭州和美国双机房部署,存在跨机房同步的业务需求。早期的数据库同步业务,主要是基于trigger的方式获取增量变更,不过从2010年开始,阿里系公司开始逐步的尝试基于数据库的日志解析获取增量变更进行同步,由此衍生出了增量订阅&消费的业务,从此开启了一段新纪元,也就是Canal,翻译为管道的意思

    工作原理

    官网截图

    image-20210708205250864

    先简单了解一下关于Mysql,master和slave的同步机制:

    1. master将改变记录到二进制日志(binary log)中(这些记录叫做二进制日志事件,binary log events,可以通过show binlog events进行查看);需要手动打开,master在更新数据时,会同步操作日志文件,会有性能方面的影响,如果没有同步需要,关闭即可
    2. slave将master的binary log events拷贝到它的中继日志(relay log);
    3. slave重做中继日志中的事件,将改变反映它自己的数据。

    详细信息可以参考:

    以上是mysql主从备份的简单了解,以下是Canal的工作原理:

    1. canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议
    2. mysql master收到dump请求,开始推送binary log给slave(也就是canal)
    3. canal解析binary log对象(原始为byte流)
    4. canal将数据推送给指定的目标位置,并封装为对象的形式操作

    2. Canal服务端搭建

    2.1 部署Mysql

    Mysql的搭建我这里就不说了,搭建完毕后,需要修改一些信息:

    修改Mysql配置文件(我是在windows下,默认文件夹是C:ProgramDataMySQLMySQL Server 5.7my.ini,需要打开隐藏目录):

    [mysqld]
    log-bin=mysql-bin # 开启 binlog
    binlog-format=ROW # 选择 ROW 模式
    server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
    

    创建Canal连接的账号,并赋予slave的权限:

    CREATE USER canal IDENTIFIED BY 'canal';  
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
    FLUSH PRIVILEGES;
    

    然后重启mysql服务即可

    可以使用如下命令查看是否打开binlog模式:

    show variables like 'log_bin';
    

    查看binlog日志文件列表:

    show binary logs 
    

    2.2 Canal服务搭建

    下载canal包: https://github.com/alibaba/canal/releases ,我下载的是 1.1.5 版本, canal.deployer-1.1.5.tar.gz

    解压完毕后, 修改conf/example/instance.properties 配置文件,如下:

    
    # enable gtid use true/false
    canal.instance.gtidon=false
    
    # 数据库地址
    canal.instance.master.address=127.0.0.1:3306
    #执行binlog文件
    canal.instance.master.journal.name=mysql-bin.000001
    #起始位置.跳过Mysql初始的一部分
    canal.instance.master.position=154
    canal.instance.master.timestamp=
    canal.instance.master.gtid=
    
    # rds oss binlog
    canal.instance.rds.accesskey=
    canal.instance.rds.secretkey=
    canal.instance.rds.instanceId=
    
    # table meta tsdb info
    canal.instance.tsdb.enable=true
    
    # 数据库账号密码
    canal.instance.dbUsername=canal
    canal.instance.dbPassword=canal
    canal.instance.connectionCharset = UTF-8
    # enable druid Decrypt database password
    canal.instance.enableDruid=false
    
    # 指定同步的表名 这里是全部
    canal.instance.filter.regex=.*\..*
    # table black regex
    canal.instance.filter.black.regex=mysql\.slave_.*
    
    # mq config
    canal.mq.topic=example
    #canal.mq.dynamicTopic=mytest1.user,mytest2\..*,.*\..*
    canal.mq.partition=0
    
    
    

    然后执行bin目录下对应操作系统的启动脚本即可:

    image-20210710135743233

    这样就启动成功了

    3. 客户端搭建

    创建客户端连接Canal服务

    创建maven项目,导入依赖:

        <dependencies>
            <dependency>
                <groupId>com.alibaba.otter</groupId>
                <artifactId>canal.client</artifactId>
                <version>1.1.5</version>
            </dependency>
            <dependency>
                <groupId>com.alibaba.otter</groupId>
                <artifactId>canal.protocol</artifactId>
                <version>1.1.5</version>
            </dependency>
        </dependencies>
    

    客户端代码:

    class SimpleCanalClientExample {
    
    
        public static void main(String args[]) {
            // 创建链接
            CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
                    11111), "example", "root", "root");
            int batchSize = 1000;
            int emptyCount = 0;
            try {
                connector.connect();
                connector.subscribe(".*\..*");
                connector.rollback();
                int totalEmptyCount = 120;
                while (emptyCount < totalEmptyCount) {
                    // 获取指定数量的数据
                    Message message = connector.getWithoutAck(batchSize);
                    long batchId = message.getId();
                    int size = message.getEntries().size();
                    if (batchId == -1 || size == 0) {
                        emptyCount++;
                        System.out.println("empty count : " + emptyCount);
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                        }
                    } else {
                        emptyCount = 0;
                        // System.out.printf("message[batchId=%s,size=%s] 
    ", batchId, size);
                        printEntry(message.getEntries());
                    }
    
                    connector.ack(batchId); // 提交确认
                    // connector.rollback(batchId); // 处理失败, 回滚数据
                }
    
                System.out.println("empty too many times, exit");
            } finally {
                connector.disconnect();
            }
        }
    
        private static void printEntry(List<CanalEntry.Entry> entrys) {
            for (CanalEntry.Entry entry : entrys) {
                //如果是事务开启关闭时间则跳过
                if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                    continue;
                }
    
                CanalEntry.RowChange rowChage = null;
                try {
                    rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                } catch (Exception e) {
                    throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                            e);
                }
    
                CanalEntry.EventType eventType = rowChage.getEventType();
                System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                        entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                        entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                        eventType));
    
                for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
                    if (eventType == CanalEntry.EventType.DELETE) {
                        printColumn(rowData.getBeforeColumnsList());
                    } else if (eventType == CanalEntry.EventType.INSERT) {
                        printColumn(rowData.getAfterColumnsList());
                    } else {
                        System.out.println("-------&gt; before");
                        printColumn(rowData.getBeforeColumnsList());
                        System.out.println("-------&gt; after");
                        printColumn(rowData.getAfterColumnsList());
                    }
                }
            }
        }
    
        private static void printColumn(List<CanalEntry.Column> columns) {
            for (CanalEntry.Column column : columns) {
                System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
            }
        }
    
    
    }
    

    当我们操作数据库时,客户端则可以获取到事件的发生:

    empty count : 1
    empty count : 2
    empty count : 3
    empty count : 4
    empty count : 5
    empty count : 6
    empty count : 7
    empty count : 8
    empty count : 9
    empty count : 10
    empty count : 11
    empty count : 12
    empty count : 13
    empty count : 14
    ================&gt; binlog[mysql-bin.000001:1893] , name[test,aa_test] , eventType : CREATE
    empty count : 1
    empty count : 2
    empty count : 3
    empty count : 4
    empty count : 5
    empty count : 6
    empty count : 7
    empty count : 8
    empty count : 9
    empty count : 10
    empty count : 11
    empty count : 12
    empty count : 13
    empty count : 14
    empty count : 15
    empty count : 16
    empty count : 17
    empty count : 18
    ================&gt; binlog[mysql-bin.000001:2698] , name[test,aa_test] , eventType : INSERT
    id : 1111110946    update=true
    status : 1    update=true
    orderId : 1    update=true
    orderProductId : 1    update=true
    stanId : 1    update=true
    quantity : 1    update=true
    paymentDate : 2021-07-07 14:07:23    update=true
    warehouse : 1    update=true
    pid : 1    update=true
    customerId : 1    update=true
    type : 1    update=true
    empty count : 1
    empty count : 2
    empty count : 3
    empty count : 4
    
    

    如上日志显示, 创建表和向中插入一条数据的时间都被记录,并打印出插入的数据

  • 相关阅读:
    在Windows QT下使用ZeroMQ
    libpng warning: iCCP: known incorrect sRGB profile告警处理
    qt 程序发布
    Qtcreator 之中文目录
    windows下kafka配置入门 示例
    CentOS-7安装Mysql集群
    zookeeper 集群安装(单点与分布式成功安装)摘录
    Linux下安装Redis2.6.17
    Hadoop集群Hadoop安装配置
    lvs/dr+keepalived应用测试实施文档
  • 原文地址:https://www.cnblogs.com/xjwhaha/p/15008151.html
Copyright © 2011-2022 走看看