zoukankan      html  css  js  c++  java
  • canal使用记录

    canal是阿里巴巴的来源项目。我们可以通过配置binlog实现数据库监控,得到数据库表或者数据的更新信息。
    参考我的文档前先去官网看下,可能已经支持更高版本的MySQL了

    1. 查看官方开源项目

    https://github.com/alibaba/canal

    2. 下载最新的canal.deployer-XXXX-SNAPSHOT.tar.gz

    https://github.com/alibaba/canal/releases

    3. 查看wiki

    ps. 目前内部版本已经支持mysql和oracle部分版本的日志解析,当前的canal开源版本支持5.7及以下的版本(阿里内部mysql 5.7.13, 5.6.10, mysql 5.5.18和5.1.40/48)
    所以需要安装mysql5.7以下的版本稳妥些

    4. 去MySQL官网下载mysql并安装

    5. 查看mysql安装目录

    image.png

    6. 复制一个my-default.ini改名叫my.ini

    修改对应位置:
    添加:

    image.png
     
    #添加这一行就ok
    log-bin=mysql-bin 
    #选择row模式 
    binlog-format=ROW
    #配置mysql replaction需要定义,不能和canal的slaveId重复 
    server_id=1
    character-set-server=utf8
    collation-server=utf8_general_ci

    添加:

    image.png
    [mysql]
    default-character-set = utf8
    [mysql.server]
    default-character-set = utf8
    [mysqld_safe]
    default-character-set = utf8
    [client]
    default-character-set = utf8
    

      

    7. 重启mysql

    .6in>sc delete mysql
    in>net stop mysql
    in>mysqld --install mysql --defaults-file="C:Program FilesMySQLMySQL Server 5.6my.ini"
    

      

    8. 查看是否开启binlog

    show variables like'log_%';
    image.png
     

    On:表示已开启

    9. 创建数据库canal用户

    image.png

    官网是%,%是对所有非本地主机授权,不包括localhost。由于我们是在windows本机上做,所以需要配置为localhost.

    10. 修改canal-deploy-> confexample里的instance.properties

    image.png

    这两个新添的配置可以注解调,还不太明白具体的用处

    canal.instance.tsdb.dbUsername=canal
    canal.instance.tsdb.dbPassword=canal

    注:这里的slaveId=1234不能和my.ini的一样

    11. 在cmd下启动canal-deploy

    image.png
    image.png

    如果没有报错那就是启动成功了

    12. 创建canal-client服务

    Pom或者gradle。主要依赖:

    compile group: 'org.jetbrains', name: 'annotations', version: '13.0'
    compile group: 'com.alibaba.otter', name: 'canal.client', version: '1.0.25'
    

      

    13. 编写客户端类

    package com.shao.demo.canalclient;
    import java.net.InetSocketAddress;
    import java.util.List;
    import com.alibaba.otter.canal.client.CanalConnector;
    import com.alibaba.otter.canal.common.utils.AddressUtils;
    import com.alibaba.otter.canal.protocol.Message;
    import com.alibaba.otter.canal.protocol.CanalEntry.Column;
    import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
    import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
    import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
    import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
    import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
    import com.alibaba.otter.canal.client.*;
    import org.jetbrains.annotations.NotNull;
    /**
     * @author zhiqi.shao
     * @Date 2018/6/4 18:29
     */
    public class ClientSample {
    
     public static void main(String args[]) {
     // 创建链接 
    CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
     11111), "example", "", "");
     int batchSize = 1000;
     int emptyCount = 0;
      try {
        connector.connect();
       connector.subscribe(".*\..*");
       connector.rollback();
       int totalEmtryCount = 1200;
       while (emptyCount < totalEmtryCount) {
       Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据 long batchId = 
       message.getId();
        int size = message.getEntries().size();
       if (batchId == -1 || size == 0) {
           emptyCount++;
          System.out.println("empty count : " + emptyCount);
          try {
             Thread.sleep(1000);
             } catch (InterruptedException e) {
             e.printStackTrace();
            }
        } else {
            emptyCount = 0;
           // System.out.printf("message[batchId=%s,size=%s] 
    ", batchId, size);
           printEntry(message.getEntries());
       }
        connector.ack(batchId); // 提交确认 // connector.rollback(batchId); // 处理失败, 回滚数据 }
        System.out.println("empty too many times, exit");
     } finally {
         connector.disconnect();
     }
     }
    
     private static void printEntry(@NotNull List<Entry> entrys) {
     for (Entry entry : entrys) {
     if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
     continue;
     }
    
     RowChange rowChage = null;
              try {
     rowChage = RowChange.parseFrom(entry.getStoreValue());
     } catch (Exception e) {
     throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
               e);
     }
    
     EventType eventType = rowChage.getEventType();
     System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
     entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
     entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
     eventType));
    
     for (RowData rowData : rowChage.getRowDatasList()) {
         if (eventType == EventType.DELETE) {
            printColumn(rowData.getBeforeColumnsList());
         } else if (eventType == EventType.INSERT) {
           printColumn(rowData.getAfterColumnsList());
         } else {
            System.out.println("-------> before");
            printColumn(rowData.getBeforeColumnsList());
           System.out.println("-------> after");
           printColumn(rowData.getAfterColumnsList());
        }
       }
      }
     }
    
     private static void printColumn(@NotNull List<Column> columns) {
         for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
         }
       }
    }
    

      

    14. 启动客户端

    use canal_test;
    CREATE TABLE user (
     uid INT(4) PRIMARY KEY NOT NULL AUTO_INCREMENT,
     name VARCHAR(10) NOT NULL
    ); 
    
    insert into user (name) values('shaoshao');

    image.png

    15. 关于update

    rowData.getAfterColumnsList()
    image.png

    16. 常见错误

    1. 服务端:com.alibaba.otter.canal.parse.exception.CanalParseException: can't find start position for example
      是由于你改了配置文件,导致meta.dat 中保存的位点信息和数据库的位点信息不一致;导致canal抓取不到数据库的动作;
      解决方法:删除meta.dat删除,再重启canal,问题解决;
    2. 客户端:java.lang.OutOfMemoryError: Java heap space
      canal消费端挂了太久,在zk对应conf下节点的
      /otter/canal/destinations/test_db/1001/cursor 位点信息是很早以前,导致重启canal时,从很早以前的位点开始消费,导致canal服务器内存爆掉
    3. 服务端ERROR c.a.otter.canal.server.netty.handler.SessionHandler - something goes wrong with channel:[id: 0x0191aafd, /192.168.10.68:49502 => /192.168.10.68:11111], exception=java.io.IOException:
      当客户端停掉后,canal服务端会报此异常
      客户端:com.alibaba.otter.canal.protocol.exception.CanalClientException: something goes wrong with reason: something goes wrong with channel:[id: 0x01311037, /192.168.10.68:52086 => /192.168.10.68:11111], exception=com.alibaba.otter.canal.server.exception.CanalServerException: destination:example should start first
      当服务端停掉或者重启中,客户端连不上就会抛出此异常。场景修改了服务点的配置文件此时服务端会重启,客户端就会报次异常

    17. 消费过滤

    canalConnector.subscribe("canal_test..");//客户端只消费canal_test库的数据变化
    subscribe(filter)方法;有的话,filter需要和instance.properties的canal.instance.filter.regex一致,否则subscribe的filter会覆盖instance的配置,如果subscribe的filter是.
    ..*,那么相当于你消费了所有的更新数据。

    18. 配置

    1. 【instance.properties配置定义优先级高于canal.properties】
    2. 修改了服务端配置文件,服务器会自动重启

    19. 关于HA机制的设计

    canal server: 为了减少对mysql dump的请求,不同server上的instance要求同一时间只能有一个处于running,其他的处于standby状态.
    canal client: 为了保证有序性,一份instance同一时间只能由一个canal client进行get/ack/rollback操作,否则客户端接收无法保证有序。

    20. Canal的工作原理

    image.png

    原理相对比较简单:

    1. canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议
    2. mysql master收到dump请求,开始推送binary log给slave(也就是canal)
    3. canal解析binary log对象(原始为byte流)

    21. 链接方式(参考:http://www.importnew.com/25189.html

    1. HA配置架构图

    image.png

    2. 单连

    image.png

    3. 两个client+两个instance+1个mysql

    当mysql变动时,两个client都能获取到变动

    image.png

    4. 一个server+两个instance+两个mysql+两个client

    image.png

    5. instance****的standby配置

    image.png

    Standby:备库

    22. 总结

    这里总结了一下Canal的一些点,仅供参考:

    1. 原理:模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议;mysql master收到dump请求,开始推送binary log给slave(也就是canal);解析binary log对象(原始为byte流)
    2. 重复消费问题:在消费端解决。
    3. 采用开源的open-replicator来解析binlog
    4. canal需要维护EventStore,可以存取在Memory, File, zk
    5. canal需要维护客户端的状态,同一时刻一个instance只能有一个消费端消费
    6. 数据传输格式:protobuff
    7. 支持binlog format 类型:statement, row, mixed. 多次附加功能只能在row下使用,比如otter
    8. binlog position可以支持保存在内存,文件,zk中
    9. instance启动方式:rpc/http; 内嵌
    10. 有ACK机制
    11. 无告警,无监控,这两个功能都需要对接外部系统
    12. 方便快速部署。

    23. 我调试成功的代码地址

    https://gitee.com/zhiqishao/canal-client

  • 相关阅读:
    解决ecshop进入后台服务器出现500的问题
    Java8新特性(拉姆达表达式lambda)
    使用Optional优雅处理null
    Arrays.asList 存在的坑
    Java提供的几种线程池
    冒泡排序及优化详解
    如何让MySQL语句执行加速?
    关于https的五大误区
    127.0.0.1和0.0.0.0地址的区别
    宽带网络技术-大题重点
  • 原文地址:https://www.cnblogs.com/shaozhiqi/p/11534658.html
Copyright © 2011-2022 走看看