1.canal介绍及canal搭建、mysql数据库的配置:https://zhuanlan.zhihu.com/p/96628405 !感谢原作者提供
2.从https://github.com/chenqian56131/spring-boot-starter-canal 下载此项目打成jar包 并通过maven 命令上传到本地仓库或者远程仓库
3.在项目中导入依赖及配置配置文件
<dependencies> <dependency> <groupId>com.xpand</groupId> <artifactId>starter-canal</artifactId> <version>0.0.1-SNAPSHOT</version> </dependency> </dependencies>
application.yml
server: port: 18083 spring: application: name: canal canal: client: instances: example: host: 192.168.211.132 port: 11111
4.创建启动类
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class}) @EnableCanalClient public class CanalApplication { public static void main(String[] args) { SpringApplication.run(CanalApplication.class); } }
5测试
@CanalEventListener public class CanalDataEventListener { /** * @InsertListenPoint增加数据监听 只有增加后的数据 *getAfterColumnsList():之前数据,用于增加、修改 *getBeforeColumnsList():之后数据 ,用于删除、修改 *@param eventType 当前操作的类型 * @param rowData 发生变更的一行数据 */ @InsertListenPoint public void onEventInsert(CanalEntry.EventType eventType, CanalEntry.RowData rowData) { List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList(); for (CanalEntry.Column column : afterColumnsList) { System.out.println("列名称:"+column.getName()+"------------------增加的数据:"+column.getValue()); } } /** * @UpdateListenPoint 修改数据监听 有修改前后数据 *getAfterColumnsList():之前数据,用于增加、修改 *getBeforeColumnsList():之后数据 ,用于删除、修改 *@param eventType 当前操作的类型 * @param rowData 发生变更的一行数据 */ @UpdateListenPoint public void onEventUpdate(CanalEntry.EventType eventType, CanalEntry.RowData rowData) { List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList(); List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList(); for (CanalEntry.Column column : beforeColumnsList) { System.out.println("列名称:"+column.getName()+"------------------修改前的数据:"+column.getValue()); } for (CanalEntry.Column column : afterColumnsList) { System.out.println("列名称:"+column.getName()+"------------------修改后的数据:"+column.getValue()); } } /** * @InsertListenPoint删除数据监听 只有删除前的数据 *getAfterColumnsList():之前数据,用于增加、修改 *getBeforeColumnsList():之后数据 ,用于删除、修改 *@param eventType 当前操作的类型 * @param rowData 发生变更的一行数据 */ @DeleteListenPoint public void onEventDelete(CanalEntry.EventType eventType, CanalEntry.RowData rowData) { List<CanalEntry.Column> afterColumnsList = rowData.getBeforeColumnsList(); for (CanalEntry.Column column : afterColumnsList) { System.out.println("列名称:"+column.getName()+"------------------删除前的数据:"+column.getValue()); } } /** * @ListenPoint 自定义监听 *getAfterColumnsList():之前数据,用于增加、修改 *getBeforeColumnsList():之后数据 ,用于删除、修改 *@param eventType 当前操作的类型 * @param rowData 发生变更的一行数据 */ @ListenPoint( eventType = {CanalEntry.EventType.INSERT, CanalEntry.EventType.UPDATE, CanalEntry.EventType.DELETE}, //自定义监听类型 schema = {"rose"},//指定监听的数据库 table = {"tb_content"},//指定监控的数据库表 ,不指定则监控数据库中的所有表 destination = "example"//指定实例地址 ) public void onEventCustom(CanalEntry.EventType eventType, CanalEntry.RowData rowData) { List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList(); List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList(); for (CanalEntry.Column column : beforeColumnsList) { System.out.println("列名称:"+column.getName()+"------------------自定义操作前的数据:"+column.getValue()); } for (CanalEntry.Column column : afterColumnsList) { System.out.println("列名称:"+column.getName()+"------------------自定义操作后的数据:"+column.getValue()); } } }
在数据库中插入一条数据
在数据库中修改一条数据