zoukankan      html  css  js  c++  java
  • (1)Canal入门

    1.前言

    在我们系统开发过程中,根据业务场景很多数据库数据并不会直接给用户访问的,需要同步保存到ElasticSearch、Redis等存储应用当中(例如最常见的是搜索页面的ElasticSearch数据)。而阿里开源的框架Canal就是做这方面的功能,它可以把数据库(暂时只支持MySQL和Oracle部分版本)日志解析获取增量变更同步到其他存储应用去。

    2.什么是Canal?

    官网介绍,canal [kə'næl],译意为水道/管道/沟渠,主要用途是基于MySQL数据库增量日志解析,提供增量数据订阅和消费。
    从上述介绍我们可以简单认为Canal就是一个简单的增量数据同步工具。

    2.1MySQL主备复制原理

    根据官网介绍,MySQL主备复制原理如下:
     
    ●MySQL master(主库)将数据变更(增删改)写入二进制日志(binary log,其中记录叫做二进制日志事件binary log events,可以通过show binlog events进行查看)。
    ●MySQL slave(从库)将master的binary log events 拷贝到它的中继日志(relay log)。
    ●MySQL slave(从库)重放relay log中事件,将数据变更反映它自己的数据。

    2.2Canal工作原理

    根据官网介绍,Canal工作原理如下图所示:
     
    ●canal模拟MySQL slave的交互协议,伪装自己为MySQL slave,向MySQL master发送dump协议。
    ●MySQL master收到dump请求,开始推送binary log给slave(即canal )。
    ●canal解析binary log对象(原始为byte流),再推送到MySQL、kafka、ElasticSearch等存储应用当中。

    3.Canal能做什么?

    早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务trigger(触发器)获取增量变更。从2010年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。所以Canal就是在这个场景中诞生的,它主要作用就是解决基于日志增量订阅和消费的业务,例如:
    ●数据库镜像。
    ●数据库实时备份。
    ●索引构建和实时维护(拆分异构索引、倒排索引等)。
    ●业务缓存刷新。
    ●带业务逻辑的增量数据处理(例如ElasticSearch、Redis数据同步)。
    在我做过的项目中,cancal经常被用到如下场景:
    ●根据数据库的数据变更实时更新搜索引擎数据,比如我司电商场景下物料数据发生变更(例如后台上传更新物料信息、价格),实时同步到搜索引擎Elasticsearch上。
    ●根据数据库的数据变更实时更新缓存,比如专门运营人员每次修改物料品牌信息同时都会同步到Redis上。
    ●根据数据库的数据变更实时推送到消息队列,比如为了丰富自身系统物料库存,定时作业拉取第三方渠道物料库存推送到RabbitMQ等消息队列去消费入库。

    4.如何搭建Canal?

    4.1首先得安装个MySQL数据库

    如果已经安装好MySQL数据库的,这一步可以跳过,如果没有安装好,请自行安装(也可以查看我之前写过一篇MySQL安装教程,不过个人建议最好还是在Docker上安装,简单方便快捷,如果自己手动安装,不懂点运维基础知识,坑太多了),具体安装教程度娘一堆资料。当前的canal支持MySQL版本包括5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x。
    我的MySQL版本是8.0.23,所以canal是支持的。不知道自己安装是什么版本可以通过SELECT VERSION();命令查看。

    4.2数据库配置

    从上述可知,因为canal是模拟MySQL slave的交互协议,伪装自己为MySQL slave,向MySQL master发送dump协议获取binary log内容对象的,所以需要MySql开启binlog。
    ●修改mysql.cnf中的配置:

    -- 编辑mysql.cnf文件
    vim /etc/my.cnf;
    -- 在my.cnf上加入如下配置
    [mysqld]
    log-bin=mysql-bin #开启binlog
    binlog-format=ROW #选择ROW模式
    server_id=1 #配置MySQL replaction需要定义,不要和canal的slaveId重复
    expire-logs-days=10 #binlog日志保留的天数,清除超过10天的日志,防止日志文件过大,导致磁盘空间不足

    ●授权canal链接MySQL账号具有作为MySQL slave的权限, 如果已有账户可直接grant(我这边是根据官网示例创建一个canal账号来演示):

    -- 先登录MySQL
    mysql -u root -p
    -- 创建用户,用户名:canal,密码:qwer1234
    CREATE USER canal IDENTIFIED BY 'qwer1234';   
    -- 授予上的所有权限给canal用户;
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    -- 刷新权限;
    FLUSH PRIVILEGES;

    ●查看下MySql是否开启binlog日志
    是否开启binlog日志:

    SHOW VARIABLES LIKE 'log_bin';

     
    查看binlog日志文件列表:

    SHOW BINARY LOGS;

     
    查看当前正在写入的binlog文件:

    SHOW MASTER STATUS;

     

    4.3Canal配置

    安装运行Canal服务端,一定要记得先检查当前Linux系统是否安装了java8环境,如果没有安装启动Canal时候会有如下提示:

    [root@dengwu canal]# sh bin/startup.sh
    which: no java in (/data/mysql/bin:/data/mysql/lib:/usr/lib64/qt-3.3/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/root/bin)
    Cannot find a Java JDK. Please set either set JAVA or put java (>=1.5) in your PATH.

     
    具体安装步骤如下:
    ●先从Oracle官网下载JDK安装包:
     
    通过Xftp工具导入到预先创建/app/package安装包目录下,再在/usr目录下创建java目录并解压:

    mkdir /usr/java
    cd /app/package
    tar zxvf jdk-8u291-linux-x64.tar.gz  -C /usr/java

    然后配置java环境变量:

    vim /etc/profile

    用vim编辑器来编辑profile文件,输入i在文件末尾添加以下内容:

    export JAVA_HOME=/usr/java/jdk1.8.0_291
    export JRE_HOME=${JAVA_HOME}/jre
    export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib:$CLASSPATH
    export JAVA_PATH=${JAVA_HOME}/bin:${JRE_HOME}/bin
    export PATH=$PATH:${JAVA_PATH}

     
    配置完java环境变量后,:wq保存退出,看看是否生效:

    echo $PATH

    如果没有生效,让其生效:

    source /etc/profile

    再瞄瞄java8是否安装成功:

    java -version

     
    ●然后下载canal, 访问release页面, 选择需要的包下载, 如最新版本1.1.5为例:
     
    可以使用wget工具下载:

    wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz

    或者手动下载,通过Xftp等工具拉入安装包目录(/app/package)中:
     
    再创建canal安装目录解压安装包:

    mkdir /app/canal
    cd /app/package
    tar zxvf canal.deployer-1.1.5.tar.gz -C /app/canal

    然后修改配置:

    cd /app/canal
    vi conf/example/instance.properties
    i
    :wq

     
    ●启动canal:

    cd /app/canal
    sh bin/startup.sh

    注:Windows使用startup.bat启动
    ●查看canal进程是否启动成功:

    ps -ef | grep canal

    ●查看instance的日志:

    vi logs/example/example.log

    ●关闭canal:

    sh bin/stop.sh

    ●在数据库中查看从库信息:

    SHOW SLAVE HOSTS;

     
    查看下canal实例(example)配置是否成功。
    ●记得把canal端口加入防火墙策略去:

    -- 允许通过防火墙
    firewall-cmd --permanent --zone=public --add-port=11111/tcp
    -- 从防火墙里移除
    firewall-cmd --permanent --zone=public --remove-port=11111/tcp
    -- 查看端口在防火墙状态
    firewall-cmd --permanent --zone=public --query-port=11111/tcp
    -- 重启防火墙
    firewall-cmd --reload

    注:如果是买阿里云服务器,要到阿里云安全组添加允许通过策略。还有Canal Server的默认端口为:11111,若需要修改,可以去/canal/conf目录下的canal.properties配置文件中进行修改。

    5.Canal的.NET客户端CanalSharp使用

    5.1快速入门

    ●先安装客户端:

    Install-Package CanalSharp

    ●初始化日志:
    CanalSharp使用Microsoft.Extensions.Logging.Abstractions,因为目前主流日志组件,如:nlog、serilog等,全部支持此日志抽象接入,也就是说你可以通过安装nlog、serilog对其的适配,来使用它们,无论是Console App或则是Web App。

    var loggerFactory = LoggerFactory.Create(builder =>
    {
        builder
            .AddFilter("Microsoft", LogLevel.Debug)
            .AddFilter("System", LogLevel.Information)
            .AddConsole();
    });
    var logger= loggerFactory.CreateLogger<SimpleCanalConnection>();

    ●创建连接:

    var conn=new SimpleCanalConnection(new SimpleCanalOptions("127.0.0.1",11111,1234),logger);
    await conn.ConnectAsync();//连接到Canal Server
    await conn.SubscribeAsync();//订阅

    ●获取数据:

    var msg = await conn.GetAsync(1024);

    5.2进阶使用

    ●解析数据
    ○Entry
    上文conn.GetAsync()返回的是一个Entry集合,Entry对应binlog记录,它可能是事务标记也有可能是行数据变化,通过Entry.EntryType来区分,一般事务的标记在业务消费处理时不需要处理。
    示例:

    var entries = await conn.GetAsync(1024);
    foreach (var entry in entries)
    {
        //不处理事务标记
        if (entry.EntryType == EntryType.Transactionbegin || entry.EntryType == EntryType.Transactionend)
        {
            continue;
        }
    }

    Entry.Header包含了一些binlog以及数据库信息:

    属性

    说明

    Entry.Header.LogfileName

    binlog文件名

    Entry.Header.LogfileOffset

    binlog偏移

    Entry.Header.SchemaName

    mysql schema名称

    Entry.Header.TableName

    表名

    ○RowChange
    一般在业务处理中,都会需要行数据的变更,将Entry转换为RowChange对象。
    示例:

    RowChange rowChange = null;
    try
    {
        rowChange = RowChange.Parser.ParseFrom(entry.StoreValue);
    }
    catch (Exception e)
    {
        _logger.LogError(e);
    }

    通过RowChange.EventType来Row是什么变化,Update、Delete和Insert对应sql中的update、delete和insert语句,通过RowChange.RowDatas属性,来访问RowChange对象中包含的行变化数据集合。示例,遍历 RowChange 中的行数据:

    foreach (var rowData in rowChange.RowDatas)
    {
        //删除的数据
        if (eventType == EventType.Delete)
        {
            PrintColumn(rowData.BeforeColumns.ToList());
        }
        //插入的数据
        else if (eventType == EventType.Insert)
        {
            PrintColumn(rowData.AfterColumns.ToList());
        }
        //更新的数据
        else
        {
            _logger.LogInformation("-------> before");
            PrintColumn(rowData.BeforeColumns.ToList());
            _logger.LogInformation("-------> after");
            PrintColumn(rowData.AfterColumns.ToList());
        }
    }
    
    private static void PrintColumn(List<Column> columns)
    {
        foreach (var column in columns)
        {
            Console.WriteLine($"{column.Name} : {column.Value}  update=  {column.Updated}");
        }
    }

    ○Column
    Column如其名,代表数据库中表的每一列的信息:

    属性名

    说明

    Column.Name

    列名

    Column.Value

    列的值

    Column.Updated

    列是否被更新

    5.3应答机制

    应答机制可以保证消费数据的准确性,Canal服务端会记录Client消费的进度,需要客户端发送ACK消息,服务端才会更新进度。类似于在消息队列中的ACK机制,如RabbitMQ。

    ●自动应答

    await conn.GetAsync(1024);//获取数据并自动应答
    GetAsync()会在获取数据后,自动向Server发送ack消息。

    ●手动应答

    var msg = await conn.GetWithoutAckAsync(1024);//获取数据
    await conn.AckAsync(msg.Id);//手动应答
    await conn.RollbackAsync(msg.Id);//回滚

    5.4高可用

    这里的高可用分为两类,客户端集群和服务端集群。都是采用冷备模式,因为对于binlog数据消费来说,并行处理将会带来数据顺序错乱的问题,当然你可以通过一些复杂的机制去实现,这里不做说明。集群部署需要Zookeeper组件。
    ●服务端集群
    在conf/canal.properties文件中修改zookeeper地址:

    canal.zkServers=127.0.0.1:2181

    集群中每个实例需配置相同的zookeeper地址。
    ●客户端集群
    客户端集群和服务端集群采用相同的模式,每个实例去抢占锁,获得了锁那么这个实例就运行获取数据,其他实例做冷备。若正在运行消费数据的实例由于网络波动,导致和zookeeper失去连接,那么其他客户端实例不会立即抢占,会等待60s后才执行抢占,给与这个实例恢复的机会。
    客户端集群使用的连接对象和快速入门中的不同:ClusterCanalConnection,但使用方法基本相同。
    示例:

    //初始化日志
    var loggerFactory = LoggerFactory.Create(builder =>
                {
                    builder
                        .AddFilter("Microsoft", LogLevel.Debug)
                        .AddFilter("System", LogLevel.Information)
                        .AddConsole();
                });
    var logger = loggerFactory.CreateLogger<Program>();
    //设置zk地址和clientid,统一集群的client必须相同
    var conn = new ClusterCanalConnection( new ClusterCanalOptions("localhost:2181", "12350")
    //连接到Server                                      loggerFactory);
    await conn.ConnectAsync();
    //订阅
    await conn.SubscribeAsync();
    await conn.RollbackAsync(0);
    while (true)
    {
        try
        {
            //获取数据
            var msg = await conn.GetAsync(1024);
        }
        catch (Exception e)
        {
            _logger.LogError(e,"Error.");
            //发生异常执行重连,此方法只有集群连接对象才有
            await conn.ReConnectAsync();
        }
    }

    5.5订阅

    订阅指过滤表(table)的规则,Canal客户端发送给客户端订阅规则,那么服务端将会推送符合规则的表数据过来,采用正则匹配。
    允许所有表:.*\\..*

    6.小结

    这里这是简单介绍Canal工作原理,能做什么,还有.NET客户端CanalSharp使用,其实Canal涉及知识点还是很多的,例如配置MQ模式、服务集群、Web管理界面部署,多实例等等。后面如果有时间,我还会继续花时间去学习。

    参考文献:
    CanalSharp文档
    CanalSharp

  • 相关阅读:
    Spring boot unable to determine jdbc url from datasouce
    Unable to create initial connections of pool. spring boot mysql
    spring boot MySQL Public Key Retrieval is not allowed
    spring boot no identifier specified for entity
    Establishing SSL connection without server's identity verification is not recommended
    eclipse unable to start within 45 seconds
    Oracle 数据库,远程访问 ora-12541:TNS:无监听程序
    macOS 下安装tomcat
    在macOS 上添加 JAVA_HOME 环境变量
    Maven2: Missing artifact but jars are in place
  • 原文地址:https://www.cnblogs.com/wzk153/p/14951991.html
Copyright © 2011-2022 走看看