zoukankan      html  css  js  c++  java
  • 基于Canal和Kafka实现MySQL的Binlog实时同步

    主机环境:
    CentOS 7.6 内存至少 1.5G,否则服务会启动不起来

    软件版本
    MySQL 5.7.28
    OpenJDK 8
    Zookeeper 3.5.6-bin
    Kafka 2.12(Scala)-2.3.0
    Canal deployer-1.1.4**

    一、MySQL安装
    采用从官方源直接安装的方式
    1.添加 MySQL 5.7 官方源
    rpm -ivh https://dev.mysql.com/get/mysql57-community-release-el7-11.noarch.rpm

    2.更新源
    yum -y update

    3.安装 MySQL 5.7
    yum -y install mysql-community-server

    4.添加 Canal 所需 MySQL 配置
    vim /etc/my.cnf

    [mysqld] #在此处下方新添配置
    log-bin=mysql-bin # 开启 binlog
    binlog-format=ROW # 选择 ROW 模式
    server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

    5.启动
    MySQL systemctl start mysqld

    查看状态:
    systemctl status mysqld

    6.查看生成的临时密码
    cat /var/log/mysqld.log | grep 'temporary password'
    初次配置 mysql_secure_installation 按照提示完成配置

    7.测试中允许 root 远程连接并添加 Canal 所需的用户, mysql -uroot -p 后执行 SQL
    use mysql;
    UPDATE user SET Host = '%' WHERE user = 'root';
    CREATE user canal IDENTIFIED BY 'Canal123!';
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    FLUSH PRIVILEGES;
    EXIT

    二、Java 安装
    采用从官方源直接安装的方式
    1.安装源中的 OpenJDK 8
    yum install -y java-1.8.0-openjdk.x86_64

    验证安装:
    java -version

    三、Zookeeper 安装
    在 https://zookeeper.apache.org/releases.html 中查找 zookeeper 最新 bin 版本压缩包的 HTTP 方式的下载链接
    下载压缩包 :
    wget https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-3.5.6/apache-zookeeper-3.5.6-bin.tar.gz

    1.解压缩 :
    mkdir -p tmp/zookeeper/ && tar zxvf apache-zookeeper-3.5.6-bin.tar.gz -C tmp/zookeeper/

    将解压缩后的软件移动至 /usr/local 下
    mkdir /usr/local/zookeeper && mv tmp/zookeeper/apache-zookeeper-3.5.6-bin/* /usr/local/zookeeper/

    2.添加 zookeeper 环境变量
    vim /etc/profile 末尾添加

    #zookeeper env
    export ZOOKEEPER_HOME=/usr/local/zookeeper/
    export PATH=$PATH:$ZOOKEEPER_HOME/bin

    使环境变量生效 :
    source /etc/profile

    3.创建配置文件,从默认的配置文件创建,因为 zookeeper 启动时会去找 conf/zoo.cfg 作为配置文件
    cd /usr/local/zookeeper/
    cp conf/zoo_sample.cfg conf/zoo.cfg
    mkdir data

    4.编辑配置 vim conf/zoo.cfg,此处配置的是单机,如果需要配置集群也是在这里
    #dataDir=/tmp/zookeeper #这是默认的配置
    dataDir=/usr/local/zookeeper/data # 这是 zookeeper 的数据目录
    admin.serverPort=2191
    启动 : zookeeper zkServer.sh start
    查看状态 :zkServer.sh status

    四、Kafka 安装
    1.在 https://kafka.apache.org/downloads 中查找 kafka 最新版本的 HTTP 方式下载链接
    下载压缩包 :
    wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.3.0/kafka_2.12-2.3.0.tgz

    2.解压缩:
    mkdir tmp/kafka && tar zxvf kafka_2.12-2.3.0.tgz -C tmp/kafka

    将解压缩后的软件移动至 /usr/local 下:
    mkdir /usr/local/kafka && mv tmp/kafka/kafka_2.12-2.3.0/* /usr/local/kafka && cd /usr/local/kafka

    3.添加 kafka 环境变量vim /etc/profile末尾添加
    #Set Kafka env
    export KAFKA_HOME=/usr/local/kafka/
    export PATH=$PATH:$KAFKA_HOME/bin

    使环境变量生效:
    source /etc/profile

    4.修改配置文件 vim config/server.properties,配置集群的话还需要更改 broker.id
    zookeeper.connect=localhost:2181 # 这里需要配置成 zookeeper 的地址,这里默认就是正确的
    #listeners=PLAINTEXT://:9092
    listeners=PLAINTEXT://:9092 # 删除前面的注释符号

    5.启动 Kafka 守护进程:
    kafka-server-start.sh -daemon config/server.properties &

    6.测试创建一个 topic:
    kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

    7.列出 topic :kafka-topics.sh --list --zookeeper localhost:2181

    五、Canal安装
    在 https://github.com/alibaba/canal/releases 中查找 canal 最新deploy 版本的下载链接
    1.下载压缩包:
    wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz

    2.解压
    mkdir tmp/canal && tar zxvf canal.deployer-1.1.4.tar.gz -C tmp/canal/

    将解压后的文件复制进 /usr/local:
    mkdir /usr/local/canal && mv tmp/canal/* /usr/local/canal/

    3.创建测试配置直接使用自带的样例进行更改,并复制出默认配置以备后期增加库时使用,这里的每一个文件夹就代表一个数据源,多个数据源就复制出多个文件夹后在 canal.properties 的 canal.destinations 里逗号分隔进行配置:
    mv conf/example/ conf/test && cp conf/test/instance.properties instance.properties.bak

    4.修改单个数据源的配置
    vim conf/test/instance.properties

    canal.instance.master.address=127.0.0.1:3306 # 数据库服务器地址
    canal.instance.dbUsername=canal # 数据库用户名
    canal.instance.dbPassword=Canal123! # 数据库密码
    canal.instance.filter.regex=.*\..* # 数据表过滤正则,dbName.tbName,默认的是所有库的所有表
    canal.mq.topic=test # 这个数据库存储进 Kafka 时使用的 topic

    修改 Canal 全局设置 vim conf/canal.properties
    canal.destinations = test # 这里配置开启的 instance,具体方法上面步骤有说明
    canal.serverMode = kafka # 更改模式,直接把数据扔进 Kafka
    #canal.mq.servers = 127.0.0.1:6667
    canal.mq.servers = 127.0.0.1:9092 # Kafka 的地址
    canal.mq.batchSize = 16384 # 这里没有更改,值应该小于 Kafka 的 config/producer.properties 中 batch.size,但是 Kafka 里没设置,这里也就不更改了
    #canal.mq.flatMessage = false
    canal.mq.flatMessage = true # 使用文本格式(JSON)进行传输,否则 Kafka 里扔进去的是二进制数据,虽然不影响,但是看起来不方便

    配置文件详解:
    参数名字 参数说明 默认值
    canal.destinations 当前server上部署的instance列表
    canal.id 每个canal server实例的唯一标识,暂无实际意义 1
    canal.ip canal server绑定的本地IP信息,如果不配置,默认选择一个本机IP进行启动服务 无
    canal.port canal server提供socket服务的端口 11111
    canal.zkServers canal server链接zookeeper集群的链接信息
    例子:127.0.0.1:2181,127.0.0.1:2182 无
    canal.zookeeper.flush.period canal持久化数据到zookeeper上的更新频率,单位毫秒 1000
    canal.file.data.dir canal持久化数据到file上的目录 …/conf (默认和instance.properties为同一目录,方便运维和备份)
    canal.file.flush.period canal持久化数据到file上的更新频率,单位毫秒 1000
    canal.instance.mysql.slaveId mysql集群配置中的serverId概念,需要保证和当前mysql集群中id唯一 1234
    canal.instance.master.address mysql主库链接地址 127.0.0.1:3306
    canal.instance.master.journal.name mysql主库链接时起始的binlog文件 无
    canal.instance.master.position mysql主库链接时起始的binlog偏移量 无
    canal.instance.master.timestamp mysql主库链接时起始的binlog的时间戳 无
    canal.instance.dbUsername mysql数据库帐号 canal
    canal.instance.dbPassword mysql数据库密码 canal
    canal.instance.defaultDatabaseName mysql链接时默认schema
    canal.instance.connectionCharset mysql 数据解析编码 UTF-8
    canal.instance.filter.regex mysql 数据解析关注的表,Perl正则表达式.

    多个正则之间以逗号(,)分隔,转义符需要双斜杠(\)
    常见例子:
    1. 所有表:.* or .*\..*
    2. canal schema下所有表: canal\..*
    3. canal下的以canal打头的表:canal\.canal.*
    4. canal schema下的一张表:canal.test1
    5. 多个规则组合使用:canal\..*,mysql.test1,mysql.test2 (逗号分隔)
    注意:此过滤条件只针对row模式的数据有效(ps. mixed/statement因为不解析sql,所以无法准确提取tableName进行过滤) .*\..*

    开启 Canal :
    bin/startup.sh #因为 Canal 的脚本名称都太普通,所以没有添加到 PATH 里
    查看日志是否有异常:
    vim logs/canal/canal.log
    vim logs/test/test.log

    测试,连上数据库尝试执行更改 SQL
    CREATE DATABASE IF NOT EXISTS test;
    USE test;
    CREATE TABLE test_tb(c1 INT COMMENT "中文测试",c2 VARCHAR(36)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

    查看Kafka中的数据,列出所有topic:
    kafka-topics.sh --list --zookeeper localhost:2181
    消费其中的数据:
    kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
    此时应该已经可以列出创建表时的语句
    后期 Bug 修复:
    消费端 Kafka 不进数据,Canal日志报错 org.apache.kafka.common.errors.RecordTooLargeException,认为是 Kafka 消息体大小限制造成的,需要同时修改 Kafka 与 Canal 消息体的最大限制
    修改Kafka配置,server.properties 中修改或添加配置项 message.max.bytes=100000000,producer.properties 中修改或添加配置项 max.request.size=100000000,consumer.properties 中修改或添加配置项 max.partition.fetch.bytes=100000000,重启 Kafka
    修改Canal配置,canal.properties 修改 canal.mq.maxRequestSize 参数值为 90000000,重启 Canal
    查看Canal日志是否报错 Could not find first log file name in binary log index file at… 如果报错则停止 Canal ,再删除实例配置下的 meta.dat 文件,再启动Canal即可

    转载自:

    https://blog.csdn.net/weixin_45203131/article/details/104823320

    https://blog.csdn.net/uxiAD7442KMy1X86DtM3/article/details/104890044

    https://blog.csdn.net/IT_liuzhiyuan/article/details/102854515

  • 相关阅读:
    跨公司销售利润中心替代
    [WCF学习笔记] 我的WCF之旅(1):创建一个简单的WCF程序
    linux操作常用命令
    java lambda表达式
    关于lock和synchronized的选择
    ssh免密登陆(简单快捷)
    su和sudo的区别
    Linux常用查找命令
    vmware完整克隆(linux)
    springboot2.0拦截器和webconfigure配置
  • 原文地址:https://www.cnblogs.com/OrcinusOrca/p/14704731.html
Copyright © 2011-2022 走看看