zoukankan      html  css  js  c++  java
  • 阿里Canal框架数据库同步-实战教程

    一、Canal简介:

      canal是阿里巴巴旗下的一款开源项目,纯Java开发。基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了MySQL(也支持mariaDB)。

    二、背景介绍:

      早期,阿里巴巴B2B公司因为存在杭州和美国双机房部署,存在跨机房同步的业务需求。不过早期的数据库同步业务,主要是基于trigger的方式获取增量变更,不过从2010年开始,阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务,从此开启了一段新纪元。

    三、适用场景:

      在一些复杂的业务逻辑中,可能插入或者查询数据都比较频繁,如果一直在数据库插入查询会造成速度非常慢,可以把数据库表分成两个库,一个库用来做查询,一个库作为插入数据,读写分离,怎么解决呢?就可以用canal框架来监听数据是否发生改变,来同步数据。

    比如大部分人都做搜索引擎ES,咱们不可能每次数据库更新了数据手动去同步索引库,咱们就可以用Canal来监听数据库增删改时去重新导入索引库,保持数据一致性。

    四、Canal的工作机制

      

    复制过程分成三步:

    (1) Master主库将改变记录,写到二进制日志(binary log)中(这些记录叫做二进制日志事件,binary log events,可以通过show binlog events进行查看);

    (2) Slave从库向mysql master发送dump协议,将master主库的binary log events拷贝到它的中继日志(relay log);

    (3) Slave从库读取并重做中继日志中的事件,将改变的数据同步到自己的数据库。

    四、Canal中间件功能

    基于纯java语言开发,可以用于做增量数据订阅和消费功能。

    相比于传统的数据同步,我们通常需要进行先搭建主从架构,然后使用binlog日志进行读取,然后指定需要同步的数据库,数据库表等信息。但是随着我们业务的不断复杂,这种传统的数据同步方式以及开始变得较为繁琐,不够灵活。

    canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议mysql master收到dump请求,开始推送binary log给slave(也就是canal),canal解析binary log对象(原始为byte流),通过对binlog数据进行解析即可获取需要同步的数据,在进行同步数据的过程中还可以加入开发人员的一些额外逻辑处理,比较开放。

    Binlog的三种基本类型分别为:

    STATEMENT模式只记录了sql语句,但是没有记录上下文信息,在进行数据恢复的时候可能会导致数据的丢失情况

    ROW模式除了记录sql语句之外,还会记录每个字段的变化情况,能够清楚的记录每行数据的变化历史,但是会占用较多的空间,需要使用mysqlbinlog工具进行查看。

    MIX模式比较灵活的记录,例如说当遇到了表结构变更的时候,就会记录为statement模式。当遇到了数据更新或者删除情况下就会变为row模式

    五、安装Canal

    1.准备工作:win10系统、jdk1.8、mysql5.7、canal1.1.1

     2.连接自己的数据,检查binlog功能是否有开启,检查命令:show variables like 'log_bin';

     3.如果显示状态为OFF表示该功能未开启,就需要找到自己安装的Mysql位置找到my.ini文件,在此文件的最下面一行加上如下(注意:保存文件后重启下自己的Mysql数据库):

    1 server-id=1  #不能与canal的slaveId重复即可
    2 log-bin=mysql-bin
    3 binlog_format = ROW  #设置ROW模式

    4.再次查看binlog功能是否有开启,检查命令:show variables like 'log_bin';

     5.我们需要创建一个用户操作数据库的写入操作,我们需要给用户权限,执行如下sql语句:

    1 CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
    2 GRANT SHOW VIEW, SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    3 FLUSH PRIVILEGES;

    6.下载我canal客户端,官方地址进行相应版本的安装包进行下载(注意:如果下载翻到本文最下面联系我): https://github.com/alibaba/canal/releases


    7.下载成功后,解压压缩包后进入conf下面的example目录下面的instance.properties文件打开编辑如下地方:

     8.返回bin目录点击startup.bat启动canal服务端,如下图表示启动成功:

     六、java代码实现

    1.新建一个maven项目,导入maven jar包如下:

    1 <dependency>
    2     <groupId>com.alibaba.otter</groupId>
    3     <artifactId>canal.client</artifactId>
    4     <version>1.1.0</version>
    5 </dependency>

    2.编写测试代码

      1 package com.fuzongle.canal.conf;
      2 
      3 import com.alibaba.otter.canal.client.CanalConnector;
      4 import com.alibaba.otter.canal.client.CanalConnectors;
      5 import com.alibaba.otter.canal.protocol.CanalEntry;
      6 import com.alibaba.otter.canal.protocol.CanalEntry.Column;
      7 import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
      8 import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
      9 import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
     10 import com.alibaba.otter.canal.protocol.Message;
     11 import com.google.protobuf.InvalidProtocolBufferException;
     12 
     13 import java.net.InetSocketAddress;
     14 import java.util.List;
     15 import java.util.Queue;
     16 import java.util.concurrent.ConcurrentLinkedQueue;
     17 /**
     18  * @Auther: fzl
     19  * @Date: 2020/4/20 01:21
     20  * @Description:
     21  */
     22 public class TestCanal {
     23 
     24     private static Queue<String> SQL_QUEUE = new ConcurrentLinkedQueue<>();
     25 
     26     public static void main(String[] args) {
     27         //获取canalServer连接:本机地址,端口号
     28         CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1",
     29                 11111), "example", "", "");
     30         int batchSize = 1000;
     31         try {
     32             //连接canalServer
     33             connector.connect();
     34             //订阅Desctinstion
     35             connector.subscribe();
     36             connector.rollback();
     37             try {
     38                 while (true) {
     39                     //尝试从master那边拉去数据batchSize条记录,有多少取多少
     40                     //轮询拉取数据   上面的where
     41                     Message message = connector.getWithoutAck(batchSize);
     42                     long batchId = message.getId();
     43                     int size = message.getEntries().size();
     44                     if (batchId == -1 || size == 0) {
     45                         //睡眠
     46                         Thread.sleep(1000);
     47                     } else {
     48                         dataHandle(message.getEntries());
     49                     }
     50                     connector.ack(batchId);
     51                     System.out.println("aa"+size);
     52                     //当队列里面堆积的sql大于一定数值的时候就模拟执行
     53                     if (SQL_QUEUE.size() >= 10) {
     54                         executeQueueSql();
     55                     }
     56                 }
     57             } catch (InterruptedException e) {
     58                 e.printStackTrace();
     59             } catch (InvalidProtocolBufferException e) {
     60                 e.printStackTrace();
     61             }
     62         } finally {
     63             connector.disconnect();
     64         }
     65 
     66 
     67     }
     68 
     69 
     70 
     71 
     72     /**
     73      * 模拟执行队列里面的sql语句
     74      */
     75     public static void executeQueueSql() {
     76         int size = SQL_QUEUE.size();
     77         for (int i = 0; i < size; i++) {
     78             String sql = SQL_QUEUE.poll();
     79             System.out.println("[sql]----> " + sql);
     80         }
     81     }
     82 
     83     /**
     84      * 数据处理
     85      *
     86      * @param entrys
     87      */
     88     private static void dataHandle(List<CanalEntry.Entry> entrys) throws InvalidProtocolBufferException {
     89         for (CanalEntry.Entry entry : entrys) {
     90             if (EntryType.ROWDATA == entry.getEntryType()) {
     91                 RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
     92                 CanalEntry.EventType eventType = rowChange.getEventType();
     93                 if (eventType == EventType.DELETE) {
     94                     saveDeleteSql(entry);
     95                 } else if (eventType == EventType.UPDATE) {
     96                     saveUpdateSql(entry);
     97                 } else if (eventType == CanalEntry.EventType.INSERT) {
     98                     saveInsertSql(entry);
     99                 }
    100             }
    101         }
    102     }
    103 
    104     /**
    105      * 保存更新语句
    106      *
    107      * @param entry
    108      */
    109     private static void saveUpdateSql(CanalEntry.Entry entry) {
    110         try {
    111             RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
    112             List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
    113             for (CanalEntry.RowData rowData : rowDatasList) {
    114                 List<Column> newColumnList = rowData.getAfterColumnsList();
    115                 StringBuffer sql = new StringBuffer("update " + entry.getHeader().getSchemaName() + "." + entry.getHeader().getTableName() + " set ");
    116                 for (int i = 0; i < newColumnList.size(); i++) {
    117                     sql.append(" " + newColumnList.get(i).getName()
    118                             + " = '" + newColumnList.get(i).getValue() + "'");
    119                     if (i != newColumnList.size() - 1) {
    120                         sql.append(",");
    121                     }
    122                 }
    123                 sql.append(" where ");
    124                 List<Column> oldColumnList = rowData.getBeforeColumnsList();
    125                 for (Column column : oldColumnList) {
    126                     if (column.getIsKey()) {
    127                         //暂时只支持单一主键
    128                         sql.append(column.getName() + "=" + column.getValue());
    129                         break;
    130                     }
    131                 }
    132                 SQL_QUEUE.add(sql.toString());
    133             }
    134         } catch (InvalidProtocolBufferException e) {
    135             e.printStackTrace();
    136         }
    137     }
    138 
    139     /**
    140      * 保存删除语句
    141      *
    142      * @param entry
    143      */
    144     private static void saveDeleteSql(CanalEntry.Entry entry) {
    145         try {
    146             RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
    147             List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
    148             for (CanalEntry.RowData rowData : rowDatasList) {
    149                 List<Column> columnList = rowData.getBeforeColumnsList();
    150                 StringBuffer sql = new StringBuffer("delete from " + entry.getHeader().getSchemaName() + "." + entry.getHeader().getTableName() + " where ");
    151                 for (Column column : columnList) {
    152                     if (column.getIsKey()) {
    153                         //暂时只支持单一主键
    154                         sql.append(column.getName() + "=" + column.getValue());
    155                         break;
    156                     }
    157                 }
    158                 SQL_QUEUE.add(sql.toString());
    159             }
    160         } catch (InvalidProtocolBufferException e) {
    161             e.printStackTrace();
    162         }
    163     }
    164 
    165     /**
    166      * 保存插入语句
    167      *
    168      * @param entry
    169      */
    170     private static void saveInsertSql(CanalEntry.Entry entry) {
    171         try {
    172             RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
    173             List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
    174             for (CanalEntry.RowData rowData : rowDatasList) {
    175                 List<Column> columnList = rowData.getAfterColumnsList();
    176                 StringBuffer sql = new StringBuffer("insert into " + entry.getHeader().getSchemaName() + "." + entry.getHeader().getTableName() + " (");
    177                 for (int i = 0; i < columnList.size(); i++) {
    178                     sql.append(columnList.get(i).getName());
    179                     if (i != columnList.size() - 1) {
    180                         sql.append(",");
    181                     }
    182                 }
    183                 sql.append(") VALUES (");
    184                 for (int i = 0; i < columnList.size(); i++) {
    185                     sql.append("'" + columnList.get(i).getValue() + "'");
    186                     if (i != columnList.size() - 1) {
    187                         sql.append(",");
    188                     }
    189                 }
    190                 sql.append(")");
    191                 SQL_QUEUE.add(sql.toString());
    192             }
    193         } catch (InvalidProtocolBufferException e) {
    194             e.printStackTrace();
    195         }
    196     }
    197 }

     3.如果数据库值发生改变之后会触发增删改,咱们可以拿到这个数据插入到其他数据库中。

    注意:

    1.如果有任何不懂的地方可以咨询我,随时欢迎互相帮助。

    2.以上完整代码加群(群文件):422167709。

    3.如果希望学习更多,感谢您关注公众号 "编程小乐",回复canal领取完整代码。

  • 相关阅读:
    无法嵌入互操作类型“ADOX.CatalogClass”。请改用适用的接口。
    编码:隐匿在计算机软硬件背后的语言(3)--二进制加法器
    编码:隐匿在计算机软硬件背后的语言(2)--二进制
    C#中Mutex的用法
    C#中创建二维数组,使用[][]和[,]的区别
    git同时存在两个账号(在同一台电脑上)——三步完成(已修正)
    C++之标准库vector
    C++之标准库map
    sublime和vscode 格式化Json ——两步走
    二十八、linux下权限管理chmod
  • 原文地址:https://www.cnblogs.com/fuzongle/p/12741052.html
Copyright © 2011-2022 走看看