一、安装配置MySQL
1.1 安装MySQL 5.7
- 更新系统所有包。
- 添加 Yum 包。
- 安装 MySQL。
- 修改密码。
|
|
Fig.1.1 MySQL服务启动成功
1.2 安装中出现的问题
1) cannot verify dev.mysql.com's certificate.(解决所用时间:0.5h)
解决:命令中要加上 --no-check-certificate。
wget https://dev.mysql.com/get/mysql80-community-release-el7-1.noarch.rpm --no-check-certificate。
2)not an rpm package (or package manifest)(解决所用时间:2h)
解决:在windows下下载rpm包,然后使用SFTP传给服务器
二、配置JAVA环境
- JDK1.8下载,解压
- 移动
- 创建java.sh文件生效
- 验证
|
|
Fig.2.1 Java配置成功
三、Canal 搭建
3.1搭建mysql环境
- 开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式。
- 授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限
Fig.3.1 授权成功
|
|
3.2 搭建Canal环境
- 下载canal,以1.1.4 版本为例
- 解压缩
- 配置修改
- 启动
|
|
Fig.3.2 搭建成功
3.3 maven下载
- 下载maven-3.6.3
- 设置环境变量
Fig.3.3 配置成功
四、实例运行
具体操作见:
https://github.com/alibaba/canal/wiki/ClientExample
- 建立实例maven工程
- 添加pom依赖
- 更新依赖 mvn install
- 运行代码
|
|
问题:import org.jetbrains.annotations.NotNull;失败(解决所用时间:0.5h)
解决:在pom添加依赖;
<dependency>
<groupId>org.jetbrains</groupId>
<artifactId>annotations</artifactId>
<version>13.0</version>
</dependency>
问题:mvn install 出错
解决:https://www.jianshu.com/p/1ed0ec397575
五、配置成功
问题:java连接不上(解决所用时间:2h)
解决:关闭防火墙
六、数据库DDL、DML操作
package com.alibaba.otter; import java.net.InetSocketAddress; import java.util.List; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.protocol.CanalEntry.Column; import com.alibaba.otter.canal.protocol.CanalEntry.Entry; import com.alibaba.otter.canal.protocol.CanalEntry.EntryType; import com.alibaba.otter.canal.protocol.CanalEntry.EventType; import com.alibaba.otter.canal.protocol.CanalEntry.Header; import com.alibaba.otter.canal.protocol.CanalEntry.RowChange; import com.alibaba.otter.canal.protocol.Message; import com.google.protobuf.InvalidProtocolBufferException; public class CanalTest { public static void main(String[] args) throws InterruptedException { CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.80.10", 11111), "example", "canal", "canal"); connector.connect();//建立连接 connector.subscribe(".*\..*");//开启订阅 while (true) { try { Message message = connector.getWithoutAck(50);// 每次读取 50 条 long batchID = message.getId(); int size = message.getEntries().size(); if (batchID == -1 || size == 0) { System.out.println("数据库没有更新"); Thread.sleep(1000); } else { System.out.println("-------------------------- 数据库已更新 -----------------------"); PrintEntry(message.getEntries()); } // position id ack (方便处理下一条) connector.ack(batchID);//确认提交 } finally { Thread.sleep(1000); } } } //获取每条打印的记录 public static void PrintEntry(List<Entry> entrys) { for (Entry entry : entrys) { Header header = entry.getHeader();//分解entry EntryType entryType = entry.getEntryType(); // 如果当前是RowData,那就需要打印的数据 if (entryType == EntryType.ROWDATA) { String tableName = header.getTableName(); String schemaName = header.getSchemaName(); RowChange rowChange = null; try { rowChange = RowChange.parseFrom(entry.getStoreValue()); } catch (InvalidProtocolBufferException e) { e.printStackTrace(); } EventType eventType = rowChange.getEventType(); System.out.println(String.format("当前正在操作 %s.%s, Action= %s", schemaName, tableName, eventType)); // 如果是‘查询’ 或者 是 ‘DDL’ 操作,那么直接打出来 if (eventType == EventType.QUERY || rowChange.getIsDdl()) { System.out.println("rowchange sql ----->" + rowChange.getSql()); return; } // 具体数据查询 rowChange.getRowDatasList().forEach((rowData) -> { // 获取更新之前的column情况 List<Column> beforeColumns = rowData.getBeforeColumnsList(); // 获取更新之后的 column 情况 List<Column> afterColumns = rowData.getAfterColumnsList(); // 当前执行的是 删除操作 if (eventType == EventType.DELETE) { PrintColumn(beforeColumns); } // 当前执行的是 插入操作 if (eventType == EventType.INSERT) { PrintColumn(afterColumns); } // 当前执行的是 更新操作 if (eventType == EventType.UPDATE) { PrintColumn(afterColumns); } }); } } } // 具体数据更改 public static void PrintColumn(List<Column> columns) { columns.forEach((column) -> { String columnName = column.getName(); String columnValue = column.getValue(); int columnType = column.getSqlType(); // 判断 该字段是否更新 boolean isUpdated = column.getUpdated(); System.out.println(String.format("columnName=%s, columnValue=%s, columnType=%s, isUpdated=%s", columnName, columnValue, columnType, isUpdated)); }); } }
Fig. 插入操作