zoukankan      html  css  js  c++  java
  • MySQL-Canal-Kafka数据复制详解

    摘要

    MySQL被广泛用于海量业务的存储数据库,在大数据时代,我们亟需对其中的海量数据进行分析,但在MySQL之上进行大数据分析显然是不现实的,这会影响业务系统的运行稳定。如果我们要实时地分析这些数据,则需要实时地将其复制到适合OLAP的数据系统上。本文介绍一种CDC工具——Canal,由阿里巴巴开源,且广泛用于阿里的生产系统,它模拟MySQL Slave结点,实时获取变化的binlog,我们将把canal获取到的binlog投递到kafka上以供后续系统消费。

    本文基于Ubuntu 16.04 LTS

    环境说明

    • Java 8+
    • 搭建好ZooKeeper集群
    • 搭建好Kafka集群

    若未搭建ZooKeeper集群、Kafka集群,可参考:

    Linux下搭建ZooKeeper集群

    Linux下搭建kafka集群

    一、源MySQL配置

    1、开启 Binlog 写入功能

    对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下

    $ vim /etc/my.cnf
    
    [mysqld]
    log-bin=mysql-bin # 开启 binlog
    binlog-format=ROW # 选择 ROW 模式
    server_id=1 # 配置 MySQL replaction 需要定义,不能和 canal 的 slaveId 重复
    
    #重启MySQL数据库
    $ service mysql restart
    

    2、创建并授权canal用户

    授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant

    > CREATE USER canal IDENTIFIED BY 'canal';  
    > GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    -->  GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
    > FLUSH PRIVILEGES;
    

    二、安装ZooKeeper

    详细请参考:Linux下搭建ZooKeeper集群

    1、在所有节点上启动zkServer

    $ zkServer.sh start&
    

    2、查看节点状态

    $ zkServer.sh status
    

    三、安装KafKa

    详细请参考:Linux下搭建kafka集群

    1、在所有节点上启动kafka

    #从后台启动Kafka集群(3台都需要启动)
    $ cd /usr/local/kafka_2.13-2.7.0/bin #进入到kafka的bin目录 
    $ ./kafka-server-start.sh -daemon ../config/server.properties
    
    #查看kafka是否启动
    $ jps
    

    2、创建与查看Topic

    $ cd /usr/local/kafka_2.13-2.7.0/bin #进入到kafka的bin目录 
    
    #创建Topic
    $ ./kafka-topics.sh --create --zookeeper 192.168.1.113:2181,192.168.1.114:2181,192.168.1.115:2181 --replication-factor 2 --partitions 1 --topic hello_canal
    #解释
    # --create  表示创建
    # --zookeeper 192.168.1.113:2181  后面的参数是zk的集群节点
    # --replication-factor 2  表示复本数
    # --partitions 1  表示分区数
    # --topic hello_canal  表示主题名称为hello_canal
    
    #查看topic 列表:
    $ ./kafka-topics.sh --list --zookeeper 192.168.1.113:2181,192.168.1.114:2181,192.168.1.115:2181
    
    #查看指定topic:
    $ ./kafka-topics.sh --describe --zookeeper 192.168.1.113:2181,192.168.1.114:2181,192.168.1.115:2181 --topic hello_canal
    Topic: hello_canal	PartitionCount: 1	ReplicationFactor: 2	Configs: 
    	Topic: hello_canal	Partition: 0	Leader: 0	Replicas: 0,2	Isr: 0,2
    

    3、验证Kafka集群是否启动成功

    # 随便在一个zk节点上启动zkCli(zookeeper客户端)
    $ sh $ZOOKEEPER_HOME/bin/zkCli.sh
    
    $ [zk: localhost:2181(CONNECTED) 0] ls /brokers/ids
    [0, 1, 2]
    
    # 如果能看到三台kafka节点的broker.id,则说明三台kafka节点正常启动
    

    四、安装Canal.server

    (一)下载并解压

    1、下载

    到Canal官网,下载最新压缩包:canal.deployer-latest.tar.gz

    $ cd /data
    $ wget https://github.com/alibaba/canal/releases/download/canal-1.1.5-alpha-2/canal.deployer-1.1.5-SNAPSHOT.tar.gz
    

    2、解压

    $ mkdir /usr/local/canal
    
    $ tar -zxvf canal.deployer-1.1.5-SNAPSHOT.tar.gz -C /usr/local/canal
    

    (二)修改配置文件

    1、修改instance配置文件

    $ vim /usr/local/canal/conf/hello_canal/instance.properties
    
    ## mysql serverId
    canal.instance.mysql.slaveId = 1234
    #position info,需要改成自己的数据库信息
    canal.instance.master.address = 127.0.0.1:3306 
    canal.instance.master.journal.name = 
    canal.instance.master.position = 
    canal.instance.master.timestamp = 
    #canal.instance.standby.address = 
    #canal.instance.standby.journal.name = 
    #canal.instance.standby.position = 
    #canal.instance.standby.timestamp = 
    #username/password,需要改成自己的数据库信息
    canal.instance.dbUsername = canal
    canal.instance.dbPassword = canal
    canal.instance.defaultDatabaseName = 
    canal.instance.connectionCharset = UTF-8
    #table regex
    canal.instance.filter.regex = .*\\..*
    # mq config
    canal.mq.topic=hello_canal
    # dynamic topic route by schema or table regex
    #canal.mq.dynamicTopic=mytest1.user,mytest2\..*,.*\..*
    canal.mq.partition=0
    

    canal.instance.connectionCharset代表数据库的编码方式对应到 java 中的编码类型,比如 UTF-8,GBK,ISO-8859-1

    如果系统是1个 cpu,需要将canal.instance.parser.parallel设置为false

    2、修改canal配置文件

    $ vim /usr/local/canal/conf/canal.properties
    
    # ...
    # 可选项: tcp(默认), kafka, RocketMQ
    canal.serverMode = kafka
    # ...
    # kafka/rocketmq 集群配置,如果你的mq已经做了集群配置,则需要把所有节点的ip:port都写全在下方
    canal.mq.servers = 192.168.1.113:9092,192.168.1.114:9092,192.168.1.115:9092
    canal.mq.retries = 0
    # flagMessage模式下可以调大该值, 但不要超过MQ消息体大小上限
    canal.mq.batchSize = 16384
    canal.mq.maxRequestSize = 1048576
    # flatMessage模式下请将该值改大, 建议50-200
    canal.mq.lingerMs = 1
    canal.mq.bufferMemory = 33554432
    # Canal的batch size, 默认50K, 由于kafka最大消息体限制请勿超过1M(900K以下)
    canal.mq.canalBatchSize = 50
    # Canal get数据的超时时间, 单位: 毫秒, 空为不限超时
    canal.mq.canalGetTimeout = 100
    # 是否为flat json格式对象
    canal.mq.flatMessage = false
    canal.mq.compressionType = none
    canal.mq.acks = all
    # kafka消息投递是否使用事务
    canal.mq.transaction = false
    

    (三)启动canal

    1、启动

    $ cd /usr/local/canal/
    $ sh bin/startup.sh
    

    2、查看server日志

    $ vim /usr/local/canal/logs/canal/canal.log
    
    2021-02-22 15:45:24.422 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler
    2021-02-22 15:45:24.559 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
    2021-02-22 15:45:24.624 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.
    2021-02-22 15:45:24.834 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[172.17.0.1(172.17.0.1):11111]
    2021-02-22 15:45:30.351 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......
    

    3、查看instance的日志

    $ vim /usr/local/canal/logs/hello_canal/hello_canal.log
    
    2021-02-22 16:54:24.284 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
    2021-02-22 16:54:24.308 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [hello_canal/instance.properties]
    2021-02-22 16:54:25.143 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
    2021-02-22 16:54:25.144 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [hello_canal/instance.properties]
    2021-02-22 16:54:26.586 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-hello_canal
    2021-02-22 16:54:26.642 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*..*$
    2021-02-22 16:54:26.642 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter : ^mysql.slave_.*$
    2021-02-22 16:54:27.057 [destination = hello_canal , address = ubuntu-master.com/192.168.1.113:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
    2021-02-22 16:54:27.176 [destination = hello_canal , address = ubuntu-master.com/192.168.1.113:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just show master status
    2021-02-22 16:54:27.179 [main] INFO  c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
    

    4、关闭

    $ cd /usr/local/canal/
    $ sh bin/stop.sh
    

    五、查看Canal数据同步情况

    (一)通过Kafka消费者查看

    1、启动Kafka消费者

    在另一台服务器上创建一个消费者:

    $ cd /usr/local/kafka_2.13-2.7.0/bin
    $ ./kafka-console-consumer.sh --bootstrap-server 192.168.1.113:9092,192.168.1.114:9092,192.168.1.115:9092 --topic hello_canal --from-beginning
    

    注:Kafka 从 2.2 版本开始将kafka-topic.sh脚本中的 −−zookeeper参数标注为 “过时”,推荐使用 −−bootstrap-server参数。

    端口也由之前的zookeeper通信端口2181,改为了kafka通信端口9092

    2、在源mysql数据库上修改数据

    mysql> use test;
    mysql> insert into fk values(13,'hello_canal',19);
    

    3、消费者窗口输出内容

    {"data":[{"id":"13","name":"hello_canal","age":"19"}],"database":"test","es":1614252283000,"id":2,"isDdl":false,"mysqlType":{"id":"int(10) unsigned","name":"varchar(100)","age":"int(11)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"age":4},"table":"fk","ts":1614252283248,"type":"INSERT"}
    

    说明canal已经成功捕获到源MySQL的变化数据binlog并投递到kafka集群的hello_canal主题中。

    六、遇到的问题

    (一)canal同步mysql binlog到kafka,启动后instance日志报TimeoutException: Failed to update metadata after 60000 ms.

    1、详细报错信息:

    Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
    

    2、报错可能原因及方案:

    报错原因1:kafka的配置文件config/server.propertieslisteners=PLAINTEXT://your.host.name:9092以及advertise.listeners=PLAINTEXT://your.host.name:9092没有配置,导致canal无法与kafka进行socket通信。

    解决方案:补充上述两项配置,重启kafka即可。

    报错原因2:canal的配置文件conf/canal.propertieskafka.bootstrap.servers = x.x.x.x:9092没有把所有的kafka节点配上。报错是因为只配了一台kafka节点,而我是以集群模式启动了三个kafka节点。

    解决方案:修改conf/canal.properties中为kafka.bootstrap.servers = x.x.x.1:9092,x.x.x.2:9092,x.x.x.3:9092,重启canal即可。

    (二)canal无法stop

    1、详细报错信息:

    bin/stop.sh: 52: kill: No such process
    
    bin/stop.sh: 58: [: unexpected operator
    bin/stop.sh: 63: bin/stop.sh let: not found
    

    2、报错原因及方案:

    报错:let: not found

    因为在ubuntu默认是指向bin/dash解释器的,dash是阉割版的bash,其功能远没有bash强大和丰富。并且dash不支持leti++等功能.

    解决办法:sudo dpkg-reconfigure dash,选择"No", 表示用bash代替dash

    参考

    [1] Canal QuickStart[https://github.com/alibaba/canal/wiki/QuickStart]

    [2] Canal Kafka RocketMQ QuickStart[https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart]

    [3] 利用Canal投递MySQL Binlog到Kafka[https://www.jianshu.com/p/93d9018e2fa1]

    [4] canal实时同步mysql表数据到Kafka[https://www.cnblogs.com/zpan2019/p/13323035.html]

    更多关于大数据、分布式、存储、区块链、Linux相关文章请关注我的微信公众号:asympTech渐进线实验室

    Top
    收藏
    关注
    评论
  • 相关阅读:
    Atitit 集团与个人的完整入口列表 attilax的完整入口 1. 集团与个人的完整入口列表 1 2. 流量入口概念 2 3. 流量入口的历史与发展 2 1.集团与个人的完整入口列表
    atitit 每季度日程表 每季度流程 v3 qaf.docx Ver history V2 add diary cyar data 3 cate V3 fix detail 3cate ,
    Atitit react 详细使用总结 绑定列表显示 attilax总结 1. 前言 1 1.1. 资料数量在百度内的数量对比 1 1.2. 版本16 v15.6.1 1 1.3. 引入js 2
    Atitit r2017 r3 doc list on home ntpc.docx
    Atitit r2017 ra doc list on home ntpc.docx
    Atiitt attilax掌握的前后技术放在简历里面.docx
    Atitit q2016 qa doc list on home ntpc.docx
    Atitit r7 doc list on home ntpc.docx 驱动器 D 中的卷是 p2soft 卷的序列号是 9AD0D3C8 D:\ati\r2017 v3 r01\
    Atitit 可移植性之道attilax著
    Atitit q2016 q5 doc list on home ntpc.docx
  • 原文地址:https://www.cnblogs.com/JasonCeng/p/14456022.html
Copyright © 2011-2022 走看看