zoukankan      html  css  js  c++  java
  • Canal:入门

    官方文档:https://github.com/alibaba/canal/wiki

    简介

    image-20211119150617104

    canal [kə'næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费

    工作原理

    MySQL主备复制原理

    image-20211119150710086

    • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
    • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
    • MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

    canal 工作原理

    • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
    • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
    • canal 解析 binary log 对象(原始为 byte 流)

    下载、安装、配置

    mysql配置

    canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

    查看mysql binlog是否开启:

    SHOW VARIABLES LIKE 'log_bin'
    

    image-20211119152408328

    如果没有开启,需要在mysql中配置文件中配置(修改后重启mysql):

    [mysqld]
    log-bin=mysql-bin # 开启 binlog
    binlog-format=ROW # 选择 ROW 模式
    server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
    

    binlog-format模式:

    ROW模式除了记录sql语句之外,还会记录每个字段的变化情况,能够清楚的记录每行数据的变化历史,但会占用较多的空间。
    STATEMENT模式只记录了sql语句,但是没有记上下文信息,在迸行数据恢复的时候可能会导致数据的丢矢情况;
    MIX模式比较灵活的记录,理论上说当遇到了表结构变更的时候就会记录为statement模式。当遇到了数据更新或者删除情况下就会变为row模式;

    授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant:

    CREATE USER canal IDENTIFIED BY 'canal';  
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
    FLUSH PRIVILEGES;
    select * from mysql.user
    

    image-20211119153517302

    canal

    下载:https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz

    安装:

    yum -y install java-1.8.0-openjdk java-1.8.0-openjdk-devel
    mkdir /opt/canal
    tar -zxvf canal.deployer-1.1.5.tar.gz -C /opt/canal
    

    配置:

    vi /opt/canal/conf/example/instance.properties
    

    修改mysql的链接信息

    image-20211119160336621

    配置连接canal的账号密码:

    vi /opt/canal/conf/canal.properties
    # 默认账号密码是 canal/canal
    

    image-20211119165259384

    canal密码加密方式:使用 MySQL 的 password 方法加密(记得去掉第一个首字母的星号)

    select password('canal')
    

    image-20211119165559433

    启动canal:

    /opt/canal/bin/startup.sh
    

    image-20211119161035781

    查看server日志:

    cat /opt/canal/logs/canal/canal.log
    

    查看 instance 的日志:

    cat /opt/canal/logs/example/example.log
    

    java-demo

    pom依赖:

        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>
    
            <dependency>
                <groupId>com.alibaba.otter</groupId>
                <artifactId>canal.client</artifactId>
                <version>1.1.5</version>
            </dependency>
    
            <dependency>
                <groupId>com.alibaba.otter</groupId>
                <artifactId>canal.protocol</artifactId>
                <version>1.1.5</version>
                <optional>true</optional>
            </dependency>
        </dependencies>
    

    CanalClient:

    @Component
    public class CannalClient implements InitializingBean {
    
        private final static int BATCH_SIZE = 1000;
    
        @Override
        public void afterPropertiesSet() throws Exception {
            //链接canal服务端
            CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("xxxxxxx", 11111),
                    "example", "canal", "canal");
            try {
                //打开连接
                connector.connect();
                //订阅数据库表,全部表
                connector.subscribe(".*\\..*");
                //回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿
                connector.rollback();
                while (true) {
                    // 获取指定数量的数据
                    Message message = connector.getWithoutAck(BATCH_SIZE);
                    //获取批量ID
                    long batchId = message.getId();
                    //获取批量的数量
                    int size = message.getEntries().size();
                    //如果没有数据
                    if (batchId == -1 || size == 0) {
                        try {
                            //线程休眠2秒
                            Thread.sleep(2000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    } else {
                        //如果有数据,处理数据
                        printEntry(message.getEntries());
                    }
                    //进行 batch id 的确认。确认之后,小于等于此 batchId 的 Message 都会被确认。
                    connector.ack(batchId);
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                connector.disconnect();
            }
        }
    
        /**
         * 打印canal server解析binlog获得的实体类信息
         */
        private static void printEntry(List<CanalEntry.Entry> entrys) {
            for (CanalEntry.Entry entry : entrys) {
                if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                    //开启/关闭事务的实体类型,跳过
                    continue;
                }
                //RowChange对象,包含了一行数据变化的所有特征
                //比如isDdl 是否是ddl变更操作 sql 具体的ddl sql beforeColumns afterColumns 变更前后的数据字段等等
                CanalEntry.RowChange rowChage;
                try {
                    rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                } catch (Exception e) {
                    throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
                }
                //获取操作类型:insert/update/delete类型
                CanalEntry.EventType eventType = rowChage.getEventType();
                //打印Header信息
                System.out.println(String.format("================》; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                        entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                        entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                        eventType));
                //判断是否是DDL语句
                if (rowChage.getIsDdl()) {
                    System.out.println("================》;isDdl: true,sql:" + rowChage.getSql());
                }
                //获取RowChange对象里的每一行数据,打印出来
                for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
                    //如果是删除语句
                    if (eventType == CanalEntry.EventType.DELETE) {
                        printColumn(rowData.getBeforeColumnsList());
                        //如果是新增语句
                    } else if (eventType == CanalEntry.EventType.INSERT) {
                        printColumn(rowData.getAfterColumnsList());
                        //如果是更新的语句
                    } else {
                        //变更前的数据
                        System.out.println("------->; before");
                        printColumn(rowData.getBeforeColumnsList());
                        //变更后的数据
                        System.out.println("------->; after");
                        printColumn(rowData.getAfterColumnsList());
                    }
                }
            }
        }
    
        private static void printColumn(List<CanalEntry.Column> columns) {
            for (CanalEntry.Column column : columns) {
                System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
            }
        }
    }
    
    

    运行结果:每当对数据库进行修改时,canal服务端都会感知到。

    image-20211119170214354

  • 相关阅读:
    yum命令速查
    5分钟理解编译系统
    Nginx(一)安装及启停
    Linux时间命令
    常用七种排序的python实现
    python迭代器、生成器、装饰器
    LeetCode【第217题】Contains Duplicate
    LeetCode【第1题】Two Sum
    python【第二十篇】Django表的多对多、Ajax
    不要问我DO在哪里?
  • 原文地址:https://www.cnblogs.com/wwjj4811/p/15578210.html
Copyright © 2011-2022 走看看