zoukankan      html  css  js  c++  java
  • canal demo搭建全记录

    一、环境介绍

    canal是阿里开源的中间件,主要用于同步mysql数据库变更。具体参见:https://github.com/alibaba/canal/releases

    搭建环境:

    vmware centos7 部署mysql和canal

    windows开发canal client,自动捕获mysql数据库变更

    二、Centos安装Mysql

    1、尝试用yum安装mysql

    wget http://repo.mysql.com/mysql57-community-release-el7-10.noarch.rpm

    返回:2018-07-13 16:04:42 (63.9 KB/s) - ‘mysql57-community-release-el7-10.noarch.rpm’ saved [25548/25548]

    sudo rpm -Uvh mysql57-community-release-el7-10.noarch.rpm
    sudo yum install -y mysql-community-server

    如果执行顺利,安装mysql server 成功。

    2.改用阿里源安装

    可是官方的yum源在国内访问效果不佳,我下载mysql server的速度太慢了,决定改用阿里源

    #下载wget
    yum install wget -y
    
    #备份当前的yum源
    mv /etc/yum.repos.d /etc/yum.repos.d.backup4comex
    
    #新建空的yum源设置目录
    mkdir /etc/yum.repos.d
    
    #下载阿里云的yum源配置
    wget -O /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-7.repo
    
    #最后重建缓存
    yum clean all
    yum makecache

    3.安装MariaDB

    MariaDB数据库管理系统是MySQL的一个分支,主要由开源社区在维护,采用GPL授权许可。开发这个分支的原因之一是:甲骨文公司收购了MySQL后,有将MySQL闭源的潜在风险,因此社区采用分支的方式来避开这个风险。MariaDB的目的是完全兼容MySQL,包括API和命令行,使之能轻松成为MySQL的代替品。

    安装mariadb,大小59 M。

    [root@yl-web yl]# yum install mariadb-server mariadb

    其它几条常用的mariadb命令:

    systemctl start mariadb  #启动MariaDB

    systemctl stop mariadb  #停止MariaDB

    systemctl restart mariadb  #重启MariaDB

    systemctl enable mariadb  #设置开机启动

    运行systemctl start mariadb,然后就可以正常使用mysql了

    4.设置数据库密码:

    set password for 'root'@'localhost' =password('root');

    5.遇到的几个问题

    ①从windows访问centos mysql失败

    解决方案:设置mysql允许远程连接

    mysql -u root;
    //赋予任何主机访问数据的权限
    mysql>GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' WITH GRANT OPTION;
    
    //使修改生效
    mysql>FLUSH PRIVILEGES;

    ②进行上述操作之后,发现仍然连接失败,返回错误

    Can't connect to MySQL server on '10.168.12.43' (10060)

    解决方案:从windows连接vmware里面的mysql失败,关闭windows防火墙后成功。

    三、部署canal server

    (参考:https://github.com/alibaba/canal/wiki/QuickStart

    1.下载canal server

    https://github.com/alibaba/canal/releases

    Image

    我下载的是canal.exaple-1.0.24.gar.gz,下载完成后解压缩:

    mkdir /tmp/canal
    tar zxvf canal.deployer-1.0.24.tar.gz  -C /tmp/canal

    2.查看binlog相关数据库命令:

    是否启用了日志
    mysql>show variables like 'log_bin';
    
    怎样知道当前的日志
    mysql> show master status;
    
    查看mysql binlog模式
    show variables like 'binlog_format';
    
    获取binlog文件列表
    show binary logs;
    
    查看当前正在写入的binlog文件
    show master statusG
    
    查看指定binlog文件的内容
    show binlog events in 'mysql-bin.000002';

    3.开启binlog

    如果log_bin关闭,需要在etc下面找到my.cnf,开启binlog:

    server-id=1
    log-bin=/var/lib/mysql/mysql-bin

    然后重启mysql服务。

    4.添加canal mysql数据库账号

    CREATE USER canal IDENTIFIED BY 'canal';
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    
    GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
    
    FLUSH PRIVILEGES;

    5.配置canal实例,设置本地数据库信息

    vi conf/example/instance.properties

    ## mysql serverId
    canal.instance.mysql.slaveId = 1234
    # position info
    canal.instance.master.address = 10.168.12.43:3306
    canal.instance.master.journal.name =mysql-bin.000003
    canal.instance.master.position =
    canal.instance.master.timestamp =
    
    ……
    
    canal.instance.dbUsername = canal
    canal.instance.dbPassword = canal
    canal.instance.defaultDatabaseName =testcanal
    canal.instance.connectionCharset = UTF-8
    
    # table regex
    canal.instance.filter.regex = .*\..*

    6.启动canal

    sh bin/startup.sh

    7.查看日志

    vi logs/canal/canal.log
    vi logs/example/example.log

    四、canal lient demo

    1.引入pom依赖

    <dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.0.24</version>
    </dependency>

    2.客户端代码

    public class ClientTest {
        public static void main(String args[]) {
            // 创建链接
            CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("10.168.12.43",
                    11111), "example", "", "");
            int batchSize = 1000;
            int emptyCount = 0;
            try {
                connector.connect();
                connector.subscribe(".*\..*");
                connector.rollback();
                int totalEmptyCount = 120;
                while (emptyCount < totalEmptyCount) {
                    Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                    long batchId = message.getId();
                    int size = message.getEntries().size();
                    if (batchId == -1 || size == 0) {
                        emptyCount++;
                        System.out.println("empty count : " + emptyCount);
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                        }
                    } else {
                        emptyCount = 0;
                        printEntry(message.getEntries());
                    }
    
                    connector.ack(batchId); // 提交确认
                }
    
                System.out.println("empty too many times, exit");
            } finally {
                connector.disconnect();
            }
        }
    
        private static void printEntry(List<Entry> entrys) {
            for (Entry entry : entrys) {
                if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                    continue;
                }
    
                RowChange rowChage = null;
                try {
                    rowChage = RowChange.parseFrom(entry.getStoreValue());
                } catch (Exception e) {
                    throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                            e);
                }
    
                EventType eventType = rowChage.getEventType();
                System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                        entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                        entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                        eventType));
    
                for (RowData rowData : rowChage.getRowDatasList()) {
                    if (eventType == EventType.DELETE) {
                        printColumn(rowData.getBeforeColumnsList());
                    } else if (eventType == EventType.INSERT) {
                        printColumn(rowData.getAfterColumnsList());
                    } else {
                        System.out.println("-------&gt; before");
                        printColumn(rowData.getBeforeColumnsList());
                        System.out.println("-------&gt; after");
                        printColumn(rowData.getAfterColumnsList());
                    }
                }
            }
        }
    
        private static void printColumn(List<Column> columns) {
            for (Column column : columns) {
                System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
            }
        }
    }

    3.建立数据库连接,进行insert,delete等数据库操作

    Image(1)

    Image(2)

    五、遇到的问题

    1.canal建立连接失败

    解决方案:用telnet命令测试建立连接仍然失败,关闭linux防火墙。

    systemctl stop firewalld.service

    其他centos7防火墙相关命令:

    firewall-cmd --list-ports#查看已经开放的端口:

    firewall-cmd --reload #重启firewall

    systemctl stop firewalld.service #停止firewall

    systemctl disable firewalld.service #禁止firewall开机启动

    firewall-cmd --state #查看默认防火墙状态(关闭后显示notrunning,开启后显示running)

  • 相关阅读:
    uva11552
    zoj3820 树的直径+二分
    hdu 5068 线段树加+dp
    zoj3822
    uva1424
    DAY 36 前端学习
    DAY 35 前端学习
    DAY 34 PYTHON入门
    DAY 33 PYTHON入门
    DAY 32 PYTHON入门
  • 原文地址:https://www.cnblogs.com/janes/p/9318576.html
Copyright © 2011-2022 走看看