zoukankan      html  css  js  c++  java
  • SpringBoot 整合cana 实现数据同步

    微服务多数据库情况下可以使用canal替代触发器,canal是应阿里巴巴跨机房同步的业务需求而提出的,canal基于数据库的日志解析,获取变更进行增量订阅&消费的业务。无论是canal实验需要还是为了增量备份、主从复制和恢复,都是需要开启mysql-binlog日志,数据目录设置到不同的磁盘分区可以降低io等待。

    官网:https://github.com/alibaba/canal

    canal 工作原理
    canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
    MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
    canal 解析 binary log 对象(原始为 byte 流)

    canal 搭建

    搭建mysql环境

    1,修改配置文件

    [mysqld]
    log-bin=mysql-bin # 开启 binlog
    binlog-format=ROW # 选择 ROW 模式
    server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复;

      重启MySQL服务后,确认是否开启了binlog(注意一点是MySQL8.x默认开启binlog)SHOW VARIABLES LIKE '%bin%';:

    
    
    
    
    

    2,授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant(省略第三步)

    CREATE USER canal IDENTIFIED BY 'root'; 
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'root'@'%';
    -- GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' ;
    FLUSH PRIVILEGES;

    3,新建一个用户名canal密码为QWqw12!@的新用户,赋予REPLICATION SLAVE和 REPLICATION CLIENT权限:

    CREATE USER canal IDENTIFIED BY '123456!@';
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    FLUSH PRIVILEGES;
    ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY '123456!@';

    搭建canal环境

    下载Linux最新稳定版(canal.deployer-1.1.4.tar.gz):https://github.com/alibaba/canal/releases

    解压后修改/canal/conf/example下的instance.properties配置文件:

    canal.instance.master.address,数据库地址,这里指定为127.0.0.1:3306。
    canal.instance.dbUsername,监听的数据库用户名。
    canal.instance.dbPassword,监听的数据库密码。
    新增canal.instance.defaultDatabaseName,默认那个库,这里指定为test(需要在MySQL中建立一个test库)

    启动

    sh /canal/bin/startup.sh
    # 查看服务日志
    tail -100f /canal/logs/canal/canal
    # 查看实例日志  -- 一般情况下,关注实例日志即可
    tail -100f /canal/logs/example/example.log

    到目前为止 canal的服务端我们已经搭建好了 但是到目前 我们只是把数据库的binlog 拉到canal中,我们还得编写客户端消费数据

    properties配置文件

    properties配置分为两部分:

    • canal.properties (系统根配置文件)
    • instance.properties (instance级别的配置文件,每个instance一份)
    1. instance列表定义 (列出当前server上有多少个instance,每个instance的加载方式是spring/manager等)
    2. common参数定义,比如可以将instance.properties的公用参数,抽取放置到这里,这样每个instance启动的时候就可以共享. 【instance.properties配置定义优先级高于canal.properties】

    instance.properties介绍:
    a. 在canal.properties定义了canal.destinations后,需要在canal.conf.dir对应的目录下建立同名的文件

    比如:

    canal.destinations = example1,example2  #spring客户端注意指定的不同名字

    这时需要创建example1和example2两个目录,每个目录里各自有一份instance.properties.

     


    两种方式,官方提供的demo和springboot  starter

    1,官方提供的

     <dependency>
           <groupId>com.alibaba.otter</groupId>
           <artifactId>canal.client</artifactId>
           <version>1.1.4</version>
     </dependency>
    package com.example.demo.test;

    import com.alibaba.otter.canal.client.CanalConnector;
    import com.alibaba.otter.canal.client.CanalConnectors;
    import com.alibaba.otter.canal.protocol.CanalEntry.*;
    import com.alibaba.otter.canal.protocol.Message;

    import java.net.InetSocketAddress;
    import java.util.List;


    public class CanalTest {

    public static void main(String[] args) throws Exception {
    //canal.ip = 192.168.56.104
    //canal.port = 11111
    //canal.destinations = example
    //canal.user =
    //canal.passwd =
    CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.56.104", 11111), "example", "", "");
    try {
    connector.connect();
    //监听的表, 格式为数据库.表名,数据库.表名
    connector.subscribe(".*\..*");
    connector.rollback();

    while (true) {
    Message message = connector.getWithoutAck(100); // 获取指定数量的数据
    long batchId = message.getId();
    if (batchId == -1 || message.getEntries().isEmpty()) {
    Thread.sleep(1000);
    continue;
    }
    // System.out.println(message.getEntries());
    printEntries(message.getEntries());
    connector.ack(batchId);// 提交确认,消费成功,通知server删除数据
    // connector.rollback(batchId);// 处理失败, 回滚数据,后续重新获取数据
    }
    }catch (Exception e){

    }finally {
    connector.disconnect();
    }
        }

    private static void printEntries(List<Entry> entries) throws Exception {
    for (Entry entry : entries) {
    if (entry.getEntryType() != EntryType.ROWDATA) {
    continue;
    }

    RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());

    EventType eventType = rowChange.getEventType();
    System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));

    for (RowData rowData : rowChange.getRowDatasList()) {
    switch (rowChange.getEventType()) {
    case INSERT:
    System.out.println("INSERT ");
    printColumns(rowData.getAfterColumnsList());
    break;
    case UPDATE:
    System.out.println("UPDATE ");
    printColumns(rowData.getAfterColumnsList());
    break;
    case DELETE:
    System.out.println("DELETE ");
    printColumns(rowData.getBeforeColumnsList());
    break;

    default:
    break;
    }
    }
    }
    }

    private static void printColumns(List<Column> columns) {
    for (Column column : columns) {
    System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
    }
    }
    }

    操作数据库增删改,控制台则会打印

     参考:https://github.com/gxmanito/canal-client

    https://gitee.com/zhiqishao/canal-client/tree/master

    2,springboot starter

    https://github.com/NormanGyllenhaal/canal-client

    <dependency>
        <groupId>top.javatool</groupId>
        <artifactId>canal-spring-boot-starter</artifactId>
        <version>1.2.1-RELEASE</version>
    </dependency>
    package com.example.demo.test;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.stereotype.Component;
    import top.javatool.canal.client.annotation.CanalTable;
    import top.javatool.canal.client.handler.EntryHandler;
    
    @Slf4j
    @Component
    @CanalTable(value = "test")
    public class UserHandler implements EntryHandler<Test> {
    
        @Override
        public void insert(Test user) {
            log.info("insert message  {}", user);
        }
    
        @Override
        public void update(Test before, Test after) {
            log.info("update before {} ", before);
            log.info("update after {}", after);
        }
        @Override
        public void delete(Test user) {
            log.info("delete  {}", user);
        }
    }
    package com.example.demo.test;
    
    import lombok.Data;
    
    import java.io.Serializable;
    
    /**
     * @Description //TODO
     * @Author GaoX
     * @Date 2020/6/28 14:44
     */
    @Data
    //@Table(name = "test")
    public class Test implements Serializable {
    
        private Integer id;
        private String name;
    
    }

  • 相关阅读:
    Linux的web服务的介绍
    Linux的DNS主从服务器部署
    K8S Calico
    K8S flannel
    K8S dashboard
    K8S RBAC
    kubernetes认证和serviceaccount
    K8S 部署 ingress-nginx (三) 启用 https
    K8S 部署 ingress-nginx (二) 部署后端为 tomcat
    K8S 部署 ingress-nginx (一) 原理及搭建
  • 原文地址:https://www.cnblogs.com/gaomanito/p/13203809.html
Copyright © 2011-2022 走看看