zoukankan      html  css  js  c++  java
  • 【Canal】利用canal实现mysql实时增量备份并对接kafka

    简介

    canal 1.1.1版本之后, 默认支持将canal server接收到的binlog数据直接投递到MQ, 目前默认支持的MQ系统有:

    kafka: https://github.com/apache/kafka

    RocketMQ : https://github.com/apache/rocketmq

    本文中默认已经安装了kafka环境,仅做对接的演示;(若没有安装则需要提前安装kafka)

    演示环境如下:

    bigdata111 bigdata112 bigdata113
    centos7.0 centos7.0 centos7.0
    jdk1.8 jdk1.8 jdk1.8
    zookeeper3.4 zookeeper3.4 zookeeper3.4
    mysql5.7
    canal-server canal-server canal-server
    canal-admin
    kafka2.11 kafka2.11 kafka2.11

    1.修改canal.properties文件(三台机器)

    [root@bigdata111 canal-server]# cd conf/
    [root@bigdata111 conf]# ll
    总用量 20
    -rwxr-xr-x. 1 root root  291 3月  16 04:43 canal_local.properties
    -rwxr-xr-x. 1 root root 5182 3月  16 04:54 canal.properties
    drwxrwxrwx. 2 root root   47 3月  16 05:02 example
    -rwxr-xr-x. 1 root root 3119 3月  16 04:43 logback.xml
    drwxrwxrwx. 2 root root   38 3月  16 04:43 metrics
    drwxrwxrwx. 3 root root 4096 3月  16 04:43 spring
    [root@bigdata111 conf]# vi canal.properties 
    
    # 修改服务模式为kafka
    canal.serverMode = kafka
    # 修改kafka的对应服务器
    canal.mq.servers = bigdata111:9092,bigdata112:9092,bigdata113:9092
    

    2.修改instance.properties文件(三台机器)

    [root@bigdata111 conf]# cd example/
    [root@bigdata111 example]# ll
    总用量 196
    -rw-r--r--. 1 root root 196608 3月  16 04:43 h2.mv.db
    -rwxr-xr-x. 1 root root   2037 3月  16 05:02 instance.properties
    [root@bigdata111 example]# vi instance.properties
    
    # 将数据发送到指定的topic
    canal.mq.topic=test  
    

    3.启动zookeeper和Canal(三台机器)

    启动zookeeper、查看zk的状态

    [root@bigdata111 canal-server]# zkServer.sh start
    ZooKeeper JMX enabled by default
    Using config: /opt/module/zookeeper-3.4.10/bin/../conf/zoo.cfg
    Starting zookeeper ... STARTED
    [root@bigdata111 canal-server]# zkServer.sh status
    ZooKeeper JMX enabled by default
    Using config: /opt/module/zookeeper-3.4.10/bin/../conf/zoo.cfg
    Mode: follower
    
    [root@bigdata112 canal-server]# zkServer.sh start
    ZooKeeper JMX enabled by default
    Using config: /opt/module/zookeeper-3.4.10/bin/../conf/zoo.cfg
    Starting zookeeper ... STARTED
    [root@bigdata112 canal-server]# zkServer.sh status
    ZooKeeper JMX enabled by default
    Using config: /opt/module/zookeeper-3.4.10/bin/../conf/zoo.cfg
    Mode: follower
    
    [root@bigdata113 canal-server]# zkServer.sh start
    ZooKeeper JMX enabled by default
    Using config: /opt/module/zookeeper-3.4.10/bin/../conf/zoo.cfg
    Starting zookeeper ... STARTED
    [root@bigdata113 canal-server]# zkServer.sh status
    ZooKeeper JMX enabled by default
    Using config: /opt/module/zookeeper-3.4.10/bin/../conf/zoo.cfg
    Mode: leader
    
    

    启动canal

    [root@bigdata111 canal-server]# bin/startup.sh
    [root@bigdata112 canal-server]# bin/startup.sh
    [root@bigdata113 canal-server]# bin/startup.sh
    

    4.启动kafka服务(三台机器)

    [root@bigdata111 kafka-2.11]# kafka-server-start.sh /opt/module/kafka-2.11/config/server.properties &
    [root@bigdata112 kafka-2.11]# kafka-server-start.sh /opt/module/kafka-2.11/config/server.properties &
    [root@bigdata113 kafka-2.11]# kafka-server-start.sh /opt/module/kafka-2.11/config/server.properties &
    

    5.查看所有进程(三台机器)

    [root@bigdata111 canal-server]# jps
    5360 Kafka
    4963 QuorumPeerMain
    5699 Jps
    5044 CanalLauncher
    

    5.启动kafka消费者(bigdata113)

    在bigdata113上启动消费者

    kafka版本大于0.9以上,使用以下命令启动消费者:

    [root@bigdata113 canal-server]# kafka-console-consumer.sh --bootstrap-server bigdata113:9092 --from-beginning --topic test
    

    kafka版本小于0.9版本,使用以下命令启动消费者:

    [root@bigdata113 canal-server]# kafka-console-consumer.sh --zookeeper bigdata113:2181 --from-beginning --topic test
    

    如果在启动过程中报错:

    zookeeper is not a recognized option

    则是因为版本不同造成的,对应修改如上方式即可;

    6.mysql操作数据

    比如数据库中有一个stu表,我们向里面增加,更新,删除一个数据,看kafka消费者中的数据变化;

    mysql> select * from stu limit 10;
    +----+--------+------+---------------------+
    | id | name   | sex  | stime               |
    +----+--------+------+---------------------+
    |  1 | 张三   | 男   | 2019-09-23 17:25:07 |
    |  2 | 李四   | 女   | 2019-09-23 17:25:13 |
    |  3 | 李楠   | 男   | 2019-09-23 17:25:21 |
    |  4 | 张畅   | 女   | 2019-09-23 17:25:30 |
    |  5 | 李想   | 男   | 2019-09-23 17:25:38 |
    |  6 | 赵街   | 男   | 2019-09-23 17:25:50 |
    |  7 | 林安   | 男   | 2019-09-23 17:26:00 |
    |  8 | 秦王   | 男   | 2019-09-23 17:45:47 |
    |  9 | 纣王   | 男   | 2019-09-23 17:45:47 |
    | 10 | 张楠   | 男   | 2019-09-23 17:45:47 |
    +----+--------+------+---------------------+
    10 rows in set (0.00 sec)
    
    mysql> insert into stu(id,name,sex)values(99332,'test111','男');
    Query OK, 1 row affected (0.03 sec)
    
    mysql> update stu set name='test222' where id = 99332;
    Query OK, 1 row affected (0.07 sec)
    Rows matched: 1  Changed: 1  Warnings: 0
    
    mysql> delete from stu where id =99332;
    Query OK, 1 row affected (0.03 sec)
    

    kafka消费者变化:

    [root@bigdata113 kafka-2.11]# kafka-console-consumer.sh --bootstrap-server bigdata113:9092 --from-beginning --topic test
    {"data":[{"id":"9999","name":"张三22222","sex":"男","stime":"2020-03-16 06:24:53"}],"database":"student","es":1584311093000,"id":2,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)","sex":"varchar(255)","stime":"timestamp"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"sex":12,"stime":93},"table":"stu","ts":1584313187762,"type":"INSERT"}
    [2020-03-16 07:19:38,216] INFO [GroupMetadataManager brokerId=113] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
    {"data":[{"id":"99332","name":"test111","sex":"男","stime":"2020-03-16 07:22:02"}],"database":"student","es":1584314522000,"id":3,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)","sex":"varchar(255)","stime":"timestamp"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"sex":12,"stime":93},"table":"stu","ts":1584314522967,"type":"INSERT"}
    {"data":[{"id":"99332","name":"test222","sex":"男","stime":"2020-03-16 07:23:55"}],"database":"student","es":1584314635000,"id":4,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)","sex":"varchar(255)","stime":"timestamp"},"old":[{"name":"test111","stime":"2020-03-16 07:22:02"}],"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"sex":12,"stime":93},"table":"stu","ts":1584314635674,"type":"UPDATE"}
    {"data":[{"id":"99332","name":"test222","sex":"男","stime":"2020-03-16 07:23:55"}],"database":"student","es":1584314658000,"id":5,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)","sex":"varchar(255)","stime":"timestamp"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"sex":12,"stime":93},"table":"stu","ts":1584314658748,"type":"DELETE"}
    
    
  • 相关阅读:
    【repost】JavaScript 运行机制详解:再谈Event Loop
    【repost】学JS必看-JavaScript数据结构深度剖析
    【repost】JavaScript 基本语法
    【repost】前端学习总结(二十三)——前端框架天下三分:Angular React 和 Vue的比较
    【repost】jQuery笔记总结
    【repost】javascript:;与javascript:void(0)使用介绍
    jQuery对象与DOM对象之间的转换方法
    EBS_DBA_问题:主键insert引起的死锁
    BI_开发_问题:ORA-26002: Table DWH.W_XACT_TYPE_D has index defined upon it.
    BI_开发_问题:到target库中的字符为?
  • 原文地址:https://www.cnblogs.com/ShadowFiend/p/12522976.html
Copyright © 2011-2022 走看看