一、前言
Binlog是MySQL数据库的二进制日志,用于记录用户对数据库操作的SQL语句(除了数据查询语句)信息。而Binlog格式也有三种,分别为STATEMENT、ROW、MIXED。STATMENT模式基于SQL语句的复制,每一条会修改数据的SQL语句会记录。ROW模式除了记录SQL语句之外,还会记录每个字段的变化情况,能够清楚的记录每行数据的变化历史,会占用较多的空间。MIXED比较灵活的记录,当遇到表结构变更的时候,就会记录为STATMENT模式,当遇到了数据更新或者删除情况下就会变为ROW模式。Binlog三个用途分别为数据恢复、复制、审计。
Canal是阿里MySQL数据库Binlog的增量订阅&消费组件 ,基于数据库Binlog可以监控数据库数据的变化,进而用于数据同步等业务。分为Canal Server与Canal Client,前者读取Binlog解析后存储,后者连接前者消费。
二、安装搭建
1、下载安装包。并上传至服务器中。下载地址为:https://github.com/alibaba/canal/releases

2、将home文件夹中的压缩包解压至安装路径(如下图所示)。

1 tar -xzf /home/canal.deployer-1.1.3.tar.gz -C /usr/java/canal
3、进入canal文件夹,修改配置文件(如下图所示)。
1 vi conf/example/instance.properties

1 canal.instance.dbUsername=root #数据库账号 2 canal.instance.dbPassword=1234 #数据库密码 3 canal.instance.defaultDatabaseName = corporate_genealogy #数据库 4 canal.instance.connectionCharset = UTF-8 #数据库编码
4、配置MySQL数据库,开启Binlog,并选择模式为ROW(如下图所示)。
1 vi /etc/my.cnf

1 #canal 2 log-bin=mysql-bin 3 binlog-format=ROW 4 server_id=1
5、数据库创建canal用户,赋予权限,并刷新(如下图所示)。
ps:这里遇到一个异常信息,是因为数据库密码过于简单,不符合密码策略,需要修改一下策略。。。


1 mysql -uroot -p1234
1 SHOW VARIABLES LIKE 'validate_password%';
1 set global validate_password_policy=LOW;
1 set global validate_password_length=4;
1 create user canal identified by 'canal';
1 GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
1 FLUSH PRIVILEGES;
6、退出并重启MySQL。
1 exit;
1 sudo service mysqld restart;
7、进入canal的bin文件夹,启动canal-server。
1 ./startup.sh
8、查看logs文件中日志是否启动成功(如下图所示)。


三、客户端代码检测
ps:需要注意的是服务器防火墙需打开对应端口号,这里是11111。
1、添加Maven依赖
1 <!-- Canal --> 2 <dependency> 3 <groupId>com.alibaba.otter</groupId> 4 <artifactId>canal.client</artifactId> 5 <version>1.1.3</version> 6 </dependency>
2、测试类代码
1 import java.net.InetSocketAddress;
2 import java.util.List;
3
4 import com.alibaba.otter.canal.client.CanalConnector;
5 import com.alibaba.otter.canal.client.CanalConnectors;
6 import com.alibaba.otter.canal.protocol.CanalEntry.Column;
7 import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
8 import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
9 import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
10 import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
11 import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
12 import com.alibaba.otter.canal.protocol.Message;
13
14 public class TestCanal {
15
16 public static void main(String args[]) {
17 // 创建链接
18 CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("服务器IP", 11111),
19 "example", "", "");
20 int batchSize = 1000;
21 int emptyCount = 0;
22 try {
23 connector.connect();
24 connector.subscribe(".*\..*");
25 connector.rollback();
26 int totalEmtryCount = 1200;
27 while (emptyCount < totalEmtryCount) {
28 // 获取指定数量的数据
29 Message message = connector.getWithoutAck(batchSize);
30 long batchId = message.getId();
31 int size = message.getEntries().size();
32 if (batchId == -1 || size == 0) {
33 emptyCount++;
34 try {
35 Thread.sleep(1000);
36 } catch (InterruptedException e) {
37 e.printStackTrace();
38 }
39 } else {
40 emptyCount = 0;
41 printEntry(message.getEntries());
42 }
43 // 提交确认
44 connector.ack(batchId);
45 }
46 System.out.println("empty too many times, exit");
47 } finally {
48 connector.disconnect();
49 }
50 }
51
52 private static void printEntry(List<Entry> entrys) {
53 for (Entry entry : entrys) {
54 if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN
55 || entry.getEntryType() == EntryType.TRANSACTIONEND) {
56 continue;
57 }
58
59 RowChange rowChage;
60 try {
61 rowChage = RowChange.parseFrom(entry.getStoreValue());
62 } catch (Exception e) {
63 throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
64 }
65
66 EventType eventType = rowChage.getEventType();
67 System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
68 entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
69 entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));
70
71 for (RowData rowData : rowChage.getRowDatasList()) {
72 if (eventType == EventType.DELETE) {
73 printColumn(rowData.getBeforeColumnsList());
74 } else if (eventType == EventType.INSERT) {
75 printColumn(rowData.getAfterColumnsList());
76 } else {
77 System.out.println("-------> before");
78 printColumn(rowData.getBeforeColumnsList());
79 System.out.println("-------> after");
80 printColumn(rowData.getAfterColumnsList());
81 }
82 }
83 }
84 }
85
86 private static void printColumn(List<Column> columns) {
87 for (Column column : columns) {
88 System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
89 }
90 }
91
92 }
3、Navicat 连接对应数据库进行一些添加删除更新操作,控制台输出如下图所示。



四、总结展望
考虑到Canal的堆积能力并不强。堆积数据到10W+时,速度会变慢,并会出现假死现象。因此介入消息中间件MQ非常有必要,解决堆积能力问题,可以延后消费,能够方便的得到积压数据,进行监控报警。
本文部分学习参考了:https://www.cnblogs.com/java-spring/p/8930740.html
至此是关于介绍在Linux系统中阿里Canal中间件的初步搭建和使用,后续会介绍配合消息中间件等方式处理数据同步及其它业务逻辑。
如有疏漏错误之处,还请不吝赐教!
