1、下载安装包
canal&github的地址,最权威的学习canal相关知识的地方
https://github.com/alibaba/canal
在下面的wiki列表中找到AdminGuide连接,进入
https://github.com/alibaba/canal/wiki/AdminGuide
环境要求:
① windows/linux均可
② jdk1.6.25以上
③ mysql 5.7.13/5.6.10及以下的版本
④ 开启mysql的binlog写入功能,binlog模式为row,server_id不能和canal的slaveId重复
安装包下载地址
https://github.com/alibaba/canal/releases
选择deployer包下载
2、配置解析
canal配置分为两部分:
① canal.properties (系统根配置文件)
② instance.properties (instance级别的配置文件,每个instance一份),每个数据库对应1个instance,如果需要同步多个数据库,只需要在conf下创建多个destination目录,创建多个instance.properties,并在每个instance.properties中配置需要的数据库即可。
tar -zxvf canal.deployer-1.0.25.tar.gz
解压后的目录为
启动脚本在bin目录下,但此时启动还为时过早,我们并没有设置同步哪个mysql的哪个库的哪些表,所以先关注一下canal的配置文件
canal.properties是服务端的配置入口,下面为该配置文件需要重点了解的配置项
参数名字 | 参数说明 | 默认值 |
canal.destinations | 当前server上部署的instance列表,默认不配死,通过动态扫描获取 | 无 |
canal.conf.dir | 配置文件的目录 | ../conf |
canal.auto.scan | 开启instance自动扫描,扫描canal.conf.dir目录下的instance | true |
canal.port | 服务端端口,客户端连接的时候要用到 | 11111 |
canal.zkServers | 配置canal集群的时候用到该配置,用来连接zookeeper集群 | 无 |
canal.file.data.dir | canal持久化数据到file上的目录 | ../conf (默认和instance.properties为同一目录,方便运维和备份) |
canal.instance.global.spring.xml | 全局的spring配置方式的组件文件,选择了基于file持久化模式,用于非HA场景下 | classpath:spring/file-instance.xml |
canal.properties都是针对canal本身的一些配置,还没解决我们上面的问题。example是系统自带的例子,mysql相关的配置我们可以从此处获取。
进入example,查看instance.properties,其中的关键配置如下
参数名字 | 参数说明 | 默认值 |
canal.instance.mysql.slaveId | mysql集群配置中的serverId概念,需要保证和当前mysql集群中id唯一,前面有介绍,不能与mysql的server_id重复 | 1234 |
canal.instance.master.address | mysql主库链接地址 | 127.0.0.1:3306 |
canal.instance.master.journal.name | mysql主库链接时起始的binlog文件 | 无 |
canal.instance.master.position | mysql主库链接时起始的binlog偏移量,需要与上一个配置组合使用 | 无 |
canal.instance.master.timestamp | mysql主库链接时起始的binlog的时间戳 | 无 |
canal.instance.dbUsername | mysql数据库帐号 | canal |
canal.instance.dbPassword | mysql数据库密码 | canal |
canal.instance.defaultDatabaseName | mysql链接时默认schema,数据库名,每个instance对应一个数据库 | 无 |
canal.instance.filter.regex | mysql 数据解析关注的表,Perl正则表达式,默认为数据库下所有表 | .*\..* |
关于配置的几点说明,后面也会陆续涉及到:
1、mysql链接时的起始位置
① canal.instance.master.journal.name + canal.instance.master.position : 精确指定一个binlog位点,进行启动
② canal.instance.master.timestamp : 指定一个时间戳,canal会自动遍历mysql binlog,找到对应时间戳的binlog位点后,进行启动
③ 不指定任何信息:默认从当前数据库的位点,进行启动。(show master status)
2、mysql解析关注表是Perl正则表达式,所有表的两种表达方式:.* or .*\..*
3、在新版本canal的instance.properties中有standby相关的配置,对应于master,主要用于解决mysql多节点解析自动切换,这种针对高可用mysql集群的同步在后面会讲到
如此添加好mysql的配置后,就可以启动了。
3、配置示例
1 ################################################# 2 ######### common argument ############# 3 ################################################# 4 canal.id= 1 5 canal.ip= 6 canal.port= 11111 7 canal.zkServers= 8 # flush data to zk 9 canal.zookeeper.flush.period = 1000 10 # flush meta cursor/parse position to file 11 canal.file.data.dir = ${canal.conf.dir} 12 canal.file.flush.period = 1000 13 ## memory store RingBuffer size, should be Math.pow(2,n) 14 canal.instance.memory.buffer.size = 16384 15 ## memory store RingBuffer used memory unit size , default 1kb 16 canal.instance.memory.buffer.memunit = 1024 17 ## meory store gets mode used MEMSIZE or ITEMSIZE 18 canal.instance.memory.batch.mode = MEMSIZE 19 20 ## detecing config 21 canal.instance.detecting.enable = false 22 #canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now() 23 canal.instance.detecting.sql = select 1 24 canal.instance.detecting.interval.time = 3 25 canal.instance.detecting.retry.threshold = 3 26 canal.instance.detecting.heartbeatHaEnable = false 27 28 # support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery 29 canal.instance.transaction.size = 1024 30 # mysql fallback connected to new master should fallback times 31 canal.instance.fallbackIntervalInSeconds = 60 32 33 # network config 34 canal.instance.network.receiveBufferSize = 16384 35 canal.instance.network.sendBufferSize = 16384 36 canal.instance.network.soTimeout = 30 37 38 # binlog filter config 39 canal.instance.filter.query.dcl = false 40 canal.instance.filter.query.dml = false 41 canal.instance.filter.query.ddl = false 42 canal.instance.filter.table.error = false 43 canal.instance.filter.rows = false 44 45 # binlog format/image check 46 canal.instance.binlog.format = ROW,STATEMENT,MIXED 47 canal.instance.binlog.image = FULL,MINIMAL,NOBLOB 48 49 # binlog ddl isolation 50 canal.instance.get.ddl.isolation = false 51 52 ################################################# 53 ######### destinations ############# 54 ################################################# 55 canal.destinations= example 56 # conf root dir 57 canal.conf.dir = ../conf 58 # auto scan instance dir add/remove and start/stop instance 59 canal.auto.scan = true 60 canal.auto.scan.interval = 5 61 62 canal.instance.global.mode = spring 63 canal.instance.global.lazy = false 64 #canal.instance.global.manager.address = 127.0.0.1:1099 65 #canal.instance.global.spring.xml = classpath:spring/memory-instance.xml 66 canal.instance.global.spring.xml = classpath:spring/file-instance.xml 67 #canal.instance.global.spring.xml = classpath:spring/default-instance.xml
对于想同步的数据库可以配在第55行,写死的方式,多个数据库用逗号隔开(这种方式不够灵活);也可以在59行配置自动扫描conf目录下的instance,我这里采用了第二种
如上图,可以看出我的conf目录下有5个文件夹(spring不是),表示我同步了5个数据库,而destination就是这些文件夹的名称。为什么要说这句话呢,因为客户端连接服务端的时候需要用到destination属性,而网上对于destination的介绍比较少,有些开发人员对这个概念比较模糊,其实很简单,就是代表同步的目的地(destination),就是第55行canal.destinations的配置内容。而canal要求根据canal.destinations配置项的值,在conf目录下建立同名的文件夹。
每个文件夹下都有一个instance.properties
1 ################################################# 2 ## mysql serverId 3 canal.instance.mysql.slaveId = 1024 4 5 # position info 6 canal.instance.master.address = ip:3306 7 canal.instance.master.journal.name = 8 canal.instance.master.position = 9 canal.instance.master.timestamp = 10 11 #canal.instance.standby.address = 12 #canal.instance.standby.journal.name = 13 #canal.instance.standby.position = 14 #canal.instance.standby.timestamp = 15 16 # username/password 17 canal.instance.dbUsername = canal 18 canal.instance.dbPassword = canal123 19 canal.instance.defaultDatabaseName = dbname 20 canal.instance.connectionCharset = UTF-8 21 22 # table regex 23 canal.instance.filter.regex = .*\..* 24 # table black regex 25 canal.instance.filter.black.regex = 26 27 #################################################
对于同一台mysqk服务器上的不同数据库,它们的区别只在于第19行。
按照如上配置即可启动了,只需在bin目录下执行startup.sh即可。启动情况可以到log下查看
到此,canal服务端的简单使用基本完成。但是,对于一个喜欢专研的程序猿来说,怎么能浅尝辄止呢,
4、canal如何维护一份增量订阅&消费的关系信息
① 解析点位,parse模块会记录,上一次解析binlog到了什么位置,对应组件为:CanalLogPositionManager
② 消费点位,canal server在接收了客户端的ack后,就会记录客户端提交的最后位点,对应的组件为:CanalMetaManager
对应的两个位点组件,目前都有几种实现:
① memory (memory-instance.xml中使用)
② zookeeper
③ mixed
④ file (file-instance.xml中使用,集合了file+memory模式,先写内存,定时刷新数据到本地file上)
⑤ period (default-instance.xml中使用,集合了zookeeper+memory模式,先写内存,定时刷新数据到zookeeper上)
我们是有的是第四种,file模式。
4.1 memory-instance.xml介绍:
所有的组件(parser , sink , store)都选择了内存版模式,记录位点的都选择了memory模式,重启后又会回到初始位点进行解析
特点:速度最快,依赖最少(不需要zookeeper)
场景:一般应用在quickstart,或者是出现问题后,进行数据分析的场景,不应该将其应用于生产环境
4.2 file-instance.xml介绍:
所有的组件(parser , sink , store)都选择了基于file持久化模式,注意,不支持HA机制.
特点:支持单机持久化
场景:生产环境,无HA需求,简单可用.
解析点位和消费点位这些元数据会定时从内容刷新到本地文件,文件为meta.dat,与instance.properties在同一目录,该目录是在canal.properties中配置的。数据结构如下:
{"clientDatas":[{"clientIdentity":{"clientId":1001,"destination":"gov","filter":""},"cursor":{"identity":{"slaveId":-1,"sourceAddress":{"address":"10.10.49.11","port":3306}},"postion":{"included":false,"journalName":"mysql-bin.000031","position":49097461,"serverId":1,"timestamp":1514357874000}}}],"destination":"gov"}
事实证明,单实例的模式已经足够好用,即使服务端异常停止,我们仍然能从指定位置或指定时间重新解析binlog,不会造成数据丢失。无需一味追求集群、HA、高可用,简单适合自己才是王道。
4.3 default-instance.xml介绍:
所有的组件(parser , sink , store)都选择了持久化模式,目前持久化的方式主要是写入zookeeper,保证数据集群共享.
特点:支持HA
场景:生产环境,集群化部署.
4.4 group-instance.xml介绍:
主要针对需要进行多库合并时,可以将多个物理instance合并为一个逻辑instance,提供客户端访问。
场景:分库业务。 比如产品数据拆分了4个库,每个库会有一个instance,如果不用group,业务上要消费数据时,需要启动4个客户端,分别链接4个instance实例。使用group后,可以在canal server上合并为一个逻辑instance,只需要启动1个客户端,链接这个逻辑instance即可.
后面两种方式都是针对于比较复杂的应用场景,本人尚未测试验证过。