zoukankan      html  css  js  c++  java
  • canal使用小结

    一、基本概念

      mysql本身支持主从备份,原理就是主库master生成的binlog文件记录了所有的增删改操作语句,然后slave向master发送dump协议,master将binlog日志文件推送给从库slave解析执行,达到数据一致备份的目的。

      canal,基于java开发,伪装成一个slave,去监听获取增量的binlog日志文件,然后解析处理获得的相关数据(过程中可以加入自由的加入一些额外的功能性代码需求),利用获得的数据,可以用其他不同用途,比如同步到es中做搜索相关。

    二、canal基本配置使用

      测试环境:windows、mysql 5.7.26、canal 1.1.3、Navicat for MySQL。

      1、mysql安装和配置

        1.1、下载安装解压忽略。进入mysql解压后目录,新增data文件夹。

        1.2、新增my.ini文件,添加配置:

    [client]
    # 设置mysql客户端连接服务端时默认使用的端口
    port=3311
    [mysql]
    default-character-set=utf8
    [mysqld]
    character-set-server=utf8
    port=3311
    # 默认存储引擎innoDB
    default-storage-engine=INNODB
    # Server Id.数据库服务器id,这个id用来在主从服务器中标记唯一mysql服务器
    server-id=1
    datadir=E:\soft\mysql2\data
    bind-address=0.0.0.0
    # 开启binlog日志
    log-bin=mysql-bin
    binlog_format = ROW
    

        1.3、cmd进入并目录,启动/关闭 mysql:

    //启动
    net start mysql
    //关闭
    net stop mysql
    

        1.4、连接mysql并设置密码

        连接:mysql -uroot -p,初始密码为空,一直按enter即可进入mysql命令行。

        进入后设置密码:

    // 切换库
    use mysql;
    // 设置密码
    update user set authentication_string=PASSWORD("123456") where user="root";
    // 刷新生效
     flush privileges;
    

        设置成功后,quit退出重进,输入密码123456。

        1.5、新增个canal的访问账户

    // 新增用户
    CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
    // 授权
    GRANT SHOW VIEW, SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    // 刷新
    FLUSH PRIVILEGES;
    

      2、canal安装配置

        下载canal包(https://github.com/alibaba/canal/releases),解压本地目录。

        2.1、目录结构

        

          其中cfang是拷贝example,需要多个instance可继续拷贝,再修改每个instance中的配置文件。

        2.1、配置canal.properties

        

          port可自定义,用于canal对外服务接口。destinations配置instance列表(连接db)。

        2.2、配置instance.properties

        

          其中canal.instance.defaultDatabaseName可不配置,全库扫描。

        2.3、启动

          bin目录,点击startup.bat,查看/logs/canal/canal.log日志文件,出现以下则为开启成功:

        2.4、canal数据格式:

    Entry
        Header
            logfileName [binlog文件名]
            logfileOffset [binlog position]
            executeTime [发生的变更]
            schemaName
            tableName
            eventType [insert/update/delete类型]
        entryType   [事务头BEGIN/事务尾END/数据ROWDATA]
        storeValue  [byte数据,可展开,对应的类型为RowChange]   
    RowChange
        isDdl       [是否是ddl变更操作,比如create table/drop table]
        sql     [具体的ddl sql]
        rowDatas    [具体insert/update/delete的变更数据,可为多条,1个binlog event事件可对应多条变更,比如批处理]
            beforeColumns [Column类型的数组]
            afterColumns [Column类型的数组]     
    Column
        index      
        sqlType     [jdbc type]
        name        [column name]
        isKey       [是否为主键]
        updated     [是否发生过变更]
        isNull      [值是否为null]
        value       [具体的内容,注意为文本]

        2.5、java程序测试

          pom导入:

    		<dependency>
    		    <groupId>com.alibaba.otter</groupId>
    		    <artifactId>canal.client</artifactId>
    		    <version>1.1.3</version>
    		</dependency>
    

          java测试:

    package com.cfang.prebo;
    
    import java.net.InetSocketAddress;
    import java.util.List;
    import java.util.stream.Collectors;
    
    import com.alibaba.otter.canal.client.CanalConnector;
    import com.alibaba.otter.canal.client.CanalConnectors;
    import com.alibaba.otter.canal.protocol.CanalEntry.Column;
    import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
    import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
    import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
    import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
    import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
    import com.alibaba.otter.canal.protocol.Message;
    
    public class CanalTest {
    
    	public static void main(String[] args) throws Exception {
    		
    		CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111), "cfang", "", "");
    		connector.connect();
    		connector.subscribe(".*\..*");
    		connector.rollback();
    		
    		while (true) {
                Message message = connector.getWithoutAck(100);  // 获取指定数量的数据
                long batchId = message.getId();
                if (batchId == -1 || message.getEntries().isEmpty()) {
                    Thread.sleep(1000);
                    continue;
                }
               // System.out.println(message.getEntries());
                printEntries(message.getEntries());
                connector.ack(batchId);// 提交确认,消费成功,通知server删除数据
    //            connector.rollback(batchId);// 处理失败, 回滚数据,后续重新获取数据
            }
    	}
    	
    	private static void printEntries(List<Entry> entries) throws Exception {
            for (Entry entry : entries) {
                if (entry.getEntryType() != EntryType.ROWDATA) {
                    continue;
                }
    
                RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
                
                EventType eventType = rowChange.getEventType();
                System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                        entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                        entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));
                
                for (RowData rowData : rowChange.getRowDatasList()) {
                    switch (rowChange.getEventType()) {
    	                case INSERT:
    	                	 System.out.println("INSERT ");
    	                     printColumns(rowData.getAfterColumnsList());
    	                     break;
    	                case UPDATE:
    	                    System.out.println("UPDATE ");
    	                    printColumns(rowData.getAfterColumnsList());
    	                    break;
    	                case DELETE:
    	                    System.out.println("DELETE ");
    	                    printColumns(rowData.getBeforeColumnsList());
    	                    break;
    	
    	                default:
    	                    break;
    	             }
                }
            }
        }
    
    	private static void printColumns(List<Column> columns) {
            for(Column column : columns) {
            	System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
            }
        }
    }
    

      Navicat中进行相关操作的时候,可在控台看到输出,例如:

      

      

  • 相关阅读:
    监控神器-普罗米修斯Prometheus的安装
    Prometheus+Grafana搭建监控系统
    Win10安装PyQt5与Qt Designer
    分区 partition
    16-成绩3
    15-成绩2
    14-票价问题
    13-union 、distinc、 join
    集群无法启动的问题?
    12-order by和group by 原理和优化 sort by 倒叙
  • 原文地址:https://www.cnblogs.com/eric-fang/p/9969732.html
Copyright © 2011-2022 走看看