zoukankan      html  css  js  c++  java
  • canal使用

    一:canal使用

    下载:https://github.com/alibaba/canal/releases

    文档:https://github.com/alibaba/canal/wiki/QuickStart

    这里使用canal.deployer-1.1.3-SNAPSHOT.tar.gz

    1. 上传解压

    mkdir /usr/local/src/canal
    cd /usr/local/src/canal
    tar -zxvf canal.deployer-1.1.3-SNAPSHOT.tar.gz
    

    2. mysql安装,查看binlog是否开启

    mysql安装参考:https://www.cnblogs.com/ttzzyy/p/9063737.html

    对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,(/etc/my.cnf)中配置如下

    #[mysqld]下
    log-bin=mysql-bin # 开启 binlog
    binlog-format=ROW # 选择 ROW 模式
    server_id=10 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复,一般使用IP最后一位
    

    授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant

    CREATE USER canal IDENTIFIED BY 'canal';  
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
    FLUSH PRIVILEGES;
    

    查看binlog是否开启

    #查看binlog相关数据库命令:
    #是否启用了日志
    mysql>show variables like 'log_bin';
    
    #怎样知道当前的日志
    mysql> show master status;
    
    #查看mysql binlog模式
    show variables like 'binlog_format';
    
    #获取binlog文件列表
    show binary logs;
    
    #查看当前正在写入的binlog文件
    show master statusG
    
    #查看指定binlog文件的内容
    show binlog events in 'mysql-bin.000002';
    
    
    

    3. 配置canal实例,设置本地数据库信息

    • vim conf/example/instance.properties

    • ## mysql serverId
      canal.instance.mysql.slaveId = 1234
      #position info,需要改成自己的数据库信息
      canal.instance.master.address = 127.0.0.1:3306 
      canal.instance.master.journal.name = 
      canal.instance.master.position = 
      canal.instance.master.timestamp = 
      #canal.instance.standby.address = 
      #canal.instance.standby.journal.name =
      #canal.instance.standby.position = 
      #canal.instance.standby.timestamp = 
      #username/password,需要改成自己的数据库信息
      canal.instance.dbUsername = canal  
      canal.instance.dbPassword = canal
      #canal.instance.defaultDatabaseName =            !!!!!!!!!!#这条注释掉就是全库监听
      canal.instance.connectionCharset = UTF-8
      #table regex
      canal.instance.filter.regex =.*\..*
      
    • vim conf/canal.properties

    • #去掉canal.instance.parser.parallelThreadSize注释,java连接需要才能成功运行
      canal.instance.parser.parallelThreadSize = 16
      #如果系统是1个 cpu,需要将 canal.instance.parser.parallel 设置为 false
      canal.instance.parser.parallel = true
      #如果报错com.alibaba.druid.pool.DruidDataSource - testWhileIdle is true, validationQuery not set注释掉下面的
      #canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
      

    4. 启动Canal查看日志

    ./bin/startup.sh
    #查看 server 日志
    tail -f -n 50 logs/canal/canal.log
    #查看 instance 的日志
    tail -f -n 50 logs/example/example.log
    

    5. 使用

    6. Java操作canal

    • pom.xml

    •     <!--  canal  -->
          <dependency>
              <groupId>com.alibaba.otter</groupId>
              <artifactId>canal.client</artifactId>
              <version>1.0.25</version>
          </dependency>
          <dependency>
              <groupId>com.alibaba.otter</groupId>
              <artifactId>canal.protocol</artifactId>
              <version>1.0.25</version>
          </dependency>
      
    • SimpleClient.java

    • package com.tzy.canal;
      
      import java.io.File;
      import java.net.InetSocketAddress;
      import java.util.List;
      import java.util.stream.Collectors;
      
      import com.alibaba.otter.canal.client.CanalConnector;
      import com.alibaba.otter.canal.client.CanalConnectors;
      import com.alibaba.otter.canal.protocol.CanalEntry.Column;
      import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
      import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
      import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
      import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
      import com.alibaba.otter.canal.protocol.Message;
      
      public class SimpleClient {
      
          public static void main(String[] args) throws Exception {
              //普通模式
              CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.192.10", 11111), "example", "", "");
              //集群模式
              //CanalConnector connector = CanalConnectors.newClusterConnector("192.168.192.10:2181,192.168.192.11:2181,192.168.192.12:2181", "example", "", "");
              try {
                  connector.connect();
                  connector.subscribe(".*\..*");// .*代表database,..*代表table
                  connector.rollback();
      
                  while (true) {
                      Message message = connector.getWithoutAck(100);// 获取指定数量的数据
                      long batchId = message.getId();
                      if (batchId == -1 || message.getEntries().isEmpty()) {
                          System.out.println("sleep");
                          Thread.sleep(1000);
                          continue;
                      }
                      printEntries(message.getEntries());
                      connector.ack(batchId);
                  }
              } finally {
                  connector.disconnect();
              }
          }
      
          private static void printEntries(List<Entry> entries) throws Exception {
              for (Entry entry : entries) {
                  if (entry.getEntryType() != EntryType.ROWDATA) {
                      continue;
                  }
      
                  RowChange rowChange = null;
                  try {
                      rowChange = RowChange.parseFrom(entry.getStoreValue());
                  } catch (Exception e) {
                      throw new RuntimeException("ERROR ## parser of eromanga-event has an error,data:" + entry.toString(), e);
                  }
      
                  System.out.println(String.format("================> binlog[%s:%s] ,[库名:%s,表名:%s] , eventType : %s",
                          entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                          entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), rowChange.getEventType()));
      
                  for (RowData rowData : rowChange.getRowDatasList()) {
                      switch (rowChange.getEventType()) {
                          case INSERT:
                              System.out.print("INSERT ");
                              printColumns(rowData.getAfterColumnsList());
                              break;
      
                          case UPDATE:
                              System.out.print("UPDATE ");
                              printColumns(rowData.getAfterColumnsList());
                              break;
      
                          case DELETE:
                              System.out.print("DELETE ");
                              printColumns(rowData.getBeforeColumnsList());
                              break;
      
                          default:
                              break;
                      }
                  }
              }
          }
      
          private static void printColumns(List<Column> columns) {
              String line = columns.stream()
                      .map(column -> column.getName() + ":" + column.getValue() + ";update:" + column.getUpdated())
                      .collect(Collectors.joining(" , "));
              System.out.println(line);
          }
      
      }
      

      启动测试

      CREATE TABLE `student` (
        `id` int(11) NOT NULL,
        `name` varchar(255) COLLATE utf8mb4_bin DEFAULT NULL
      ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
      
      INSERT INTO student (id, `name`) VALUES (1, 'zhangsan');
      
      UPDATE student SET `name`='lisi' WHERE id=1 LIMIT 1;
      
      DELETE FROM student WHERE id=1
      
      

    二:集群搭建

    7. zk搭建

    zk搭建参考:https://www.cnblogs.com/ttzzyy/articles/10205202.html

    8. 修改canal配置

    • 主节点修改 vim conf/canal.properties (修改)

    • canal.zkServers=192.168.192.10:2181,192.168.192.11:2181,192.168.192.12:2181
      canal.instance.global.spring.xml=classpath:spring/default-instance.xml
      
    • 拷贝canal到从节点

    • scp canal.deployer-1.0.26-SNAPSHOT.tar.gz root@slave1:/usr/local/src/canal/
      scp canal.deployer-1.0.26-SNAPSHOT.tar.gz root@slave2:/usr/local/src/canal/
      #从节点解压缩
      tar -zxvf canal.deployer-1.0.26-SNAPSHOT.tar.gz
      
    • 修改slave节点配置文件 vim conf/example/instance.properties

    • ## mysql serverId修改与master不同即可(1235,1236)
      canal.instance.mysql.slaveId=1235
      # position info修改成mysql地址即可
      canal.instance.master.address=192.168.192.10:3306
      #canal.instance.defaultDatabaseName =            !!!!!!!!!!#这条注释掉就是全库监听
      
    • 修改slave节点配置文件 vim conf/canal.properties

    • canal.zkServers=192.168.192.10:2181,192.168.192.11:2181,192.168.192.12:2181
      canal.instance.global.spring.xml=classpath:spring/default-instance.xml
      

    9. 三台机器分别启动zk,canal

    #zk
    zkServer.sh start
    #canal
    ./bin/startup.sh
    
    • 查看Canal在ZK中的状态

    • get /otter/canal/destinations/example/running
      
    • 测试HA是否配置成功:stop掉master这台机器,再次查看Canal在ZK中的状态

      • 很明显active已经切换到slave2上了
    • 客户端java代码修改为集群方式

    • CanalConnector connector = CanalConnectors.newClusterConnector("192.168.192.10:2181,192.168.192.11:2181,192.168.192.12:2181","example", "", "");
      

    10. properties配置文件参数解释

    canal.properties (系统根配置文件)
    instance.properties (instance级别的配置文件,每个instance一份)

    • canal.properties

    • canal.destinations  #当前server上部署的instance列表 
      canal.conf.dir  #conf/目录所在的路径   
      canal.auto.scan #开启instance自动扫描
      #如果配置为true,canal.conf.dir目录下的instance配置变化会自动触发:
      #a. instance目录新增: 触发instance配置载入,lazy为true时则自动启动
      #b. instance目录删除:卸载对应instance配置,如已启动则进行关闭
      #c. instance.properties文件变化:reload instance配置,如已启动自动进行重启操作
      canal.auto.scan.interval    #instance自动扫描的间隔时间,单位秒  
      canal.instance.global.mode  #全局配置加载方式   
      canal.instance.global.lazy  #全局lazy模式   
      canal.instance.global.manager.address   #全局的manager配置方式的链接信息    无
      canal.instance.global.spring.xml    #全局的spring配置方式的组件文件 
      canal.instance.example.mode
      canal.instance.example.lazy
      canal.instance.example.spring.xml
      #instance级别的配置定义,如有配置,会自动覆盖全局配置定义模式
      #命名规则:canal.instance.{name}.xxx 无
      
    • instance.properties

    • canal.id    #每个canal server实例的唯一标识,暂无实际意义   
      canal.ip    #canal server绑定的本地IP信息,如果不配置,默认选择一个本机IP进行启动服务   
      canal.port  #canal server提供socket服务的端口  
      canal.zkServers #canal server链接zookeeper集群的链接信息
      #例子:10.20.144.22:2181,10.20.144.51:2181 
      canal.zookeeper.flush.period    #canal持久化数据到zookeeper上的更新频率,单位毫秒    
      canal.instance.memory.batch.mode    #canal内存store中数据缓存模式
      #1. ITEMSIZE : 根据buffer.size进行限制,只限制记录的数量
      #2. MEMSIZE : 根据buffer.size  * buffer.memunit的大小,限制缓存记录的大小  
      canal.instance.memory.buffer.size   #canal内存store中可缓存buffer记录数,需要为2的指数  
      canal.instance.memory.buffer.memunit    #内存记录的单位大小,默认1KB,和buffer.size组合决定最终的内存使用大小  
      canal.instance.transactionn.size    最大事务完整解析的长度支持
      超过该长度后,一个事务可能会被拆分成多次提交到canal store中,无法保证事务的完整可见性
      canal.instance.fallbackIntervalInSeconds    #canal发生mysql切换时,在新的mysql库上查找binlog时需要往前查找的时间,单位秒
      #说明:mysql主备库可能存在解析延迟或者时钟不统一,需要回退一段时间,保证数据不丢
      canal.instance.detecting.enable #是否开启心跳检查
      canal.instance.detecting.sql    #心跳检查sql    insert into retl.xdual values(1,now()) on duplicate key update x=now()
      canal.instance.detecting.interval.time  #心跳检查频率,单位秒
      canal.instance.detecting.retry.threshold    #心跳检查失败重试次数
      canal.instance.detecting.heartbeatHaEnable  #心跳检查失败后,是否开启自动mysql自动切换
      #说明:比如心跳检查失败超过阀值后,如果该配置为true,canal就会自动链到mysql备库获取binlog数据
      canal.instance.network.receiveBufferSize    #网络链接参数,SocketOptions.SO_RCVBUF 
      canal.instance.network.sendBufferSize   #网络链接参数,SocketOptions.SO_SNDBUF 
      canal.instance.network.soTimeout    #网络链接参数,SocketOptions.SO_TIMEOUT
      

    三:对接kafka

    11. kafka搭建

    kafka搭建参考:https://www.cnblogs.com/ttzzyy/articles/10476652.html

    12. 修改配置文件

    • a. Kafka配置文件基于上面的修改 vim server.properties

    • listeners=PLAINTEXT://:9092
       # 本机 ip + 端口
      advertised.listeners=PLAINTEXT://192.168.192.10:9092 
      
    • b. canal配置文件修改 vim conf/example/instance.properties

    • #  按需修改成自己的数据库信息
      #################################################
      ...
      canal.instance.master.address=192.168.192.10:3306
      # username/password,数据库的用户名和密码
      ...
      canal.instance.dbUsername = canal
      canal.instance.dbPassword = canal
      ...
      # mq config
      canal.mq.topic=example
      # 针对库名或者表名发送动态topic
      #canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\..*,.*\..*
      canal.mq.partition=0
      # hash partition config
      #canal.mq.partitionsNum=3
      #库名.表名: 唯一主键,多个表之间用逗号分隔
      #canal.mq.partitionHash=mytest.person:id,mytest.role:id
      #################################################
      
    • c. canal配置文件修改 vim conf/canal.properties

    • # ...
      # 可选项: tcp(默认), kafka, RocketMQ
      canal.serverMode = kafka
      # ...
      # kafka/rocketmq 集群配置: 192.168.1.117:9092,192.168.1.118:9092,192.168.1.119:9092 
      #canal.mq.servers = 127.0.0.1:6667
      canal.mq.servers = 192.168.192.10:9092,192.168.192.11:9092,192.168.192.12:9092
      canal.mq.retries = 0
      # flagMessage模式下可以调大该值, 但不要超过MQ消息体大小上限
      canal.mq.batchSize = 16384
      canal.mq.maxRequestSize = 1048576
      # flatMessage模式下请将该值改大, 建议50-200
      canal.mq.lingerMs = 100
      canal.mq.bufferMemory = 33554432
      # Canal的batch size, 默认50K, 由于kafka最大消息体限制请勿超过1M(900K以下)
      canal.mq.canalBatchSize = 50
      # Canal get数据的超时时间, 单位: 毫秒, 空为不限超时
      canal.mq.canalGetTimeout = 100
      # 是否为flat json格式对象
      canal.mq.flatMessage = true
      canal.mq.compressionType = none
      canal.mq.acks = all
      # kafka消息投递是否使用事务
      canal.mq.transaction = false
      
      

    13. 启动zk,kafka,canal

    #启动zk
    zkServer.sh start
    #启动kafka
    start-kafka.sh
    #启动canal
    ./bin/startup.sh
    #关闭
    zkServer.sh stop
    kafka-server-stop.sh
    ./bin/stop.sh
    #查看日志没报错就行
    tail -n 50 logs/example/example.log
    tail -n 50 logs/canal/canal.log
    #查看kafka所有topic
    kafka-topics.sh --list --zookeeper 192.168.192.10:2181,192.168.192.11:2181,192.168.192.12:2181
    #新增kafka消费者
    kafka-console-consumer.sh --zookeeper master:2181,slave1:2181,slave2:2181 --from-beginning --topic example 
    

    测试kafka是否有数据消费

  • 相关阅读:
    哈希冲突详解(拉链法,开放地址法)
    哈希冲突详解(拉链法,开放地址法)
    排序算法
    排序算法
    加分二叉树
    加分二叉树
    动态规划
    动态规划
    动态规划
    动态规划
  • 原文地址:https://www.cnblogs.com/ttzzyy/p/11272208.html
Copyright © 2011-2022 走看看