zoukankan      html  css  js  c++  java
  • 阿里Canal安装和代码示例

    Canal的简单使用


    canal可以用来监控数据库数据的变化,从而获得新增数据,或者修改的数据,用于实际工作中,比较实用,特此记录一下

    Canal简介

    canal是应阿里巴巴存在杭州和美国的双机房部署,存在跨机房同步的业务需求而提出的。

    阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务

    基于日志增量订阅&消费支持的业务:
    数据库镜像
    数据库实时备份
    多级索引 (卖家和买家各自分库索引)
    search build
    业务cache刷新
    价格变化等重要业务消息

    keyword:数据库同步,增量订阅&消费。

    首先,下载canal安装包:

    https://github.com/alibaba/canal/releases

     

    安装步骤:

    opt(或者其他目录)创建一个canal文件夹

     

    因为canal解压后并没有整体的文件夹,是把里面的内容一下解压出来的

    然后进入canal文件夹,将安装包canal.deployer-1.0.24.tar.gz传输进来

    解压:

    tar -zxvf canal.deployer-1.0.24.tar.gz

     

    可以删除安装包canal.deployer-1.0.24.tar.gz

    修改canal配置文件

    vi conf/example/instance.properties

     

    修改红框内的内容 
    canal.instance.dbUsername = ****          #数据库用户名 
    canal.instance.dbPassword = ****           #数据库密码 
    canal.instance.defaultDatabaseName = ****   #指定需要同步的数据库 
    canal.instance.connectionCharset = UTF-8      #指定编码方式

    然后保存:

    esc
    
    :wq

    配置mysql数据库 

    vi /etc/my.cnf 

    添加以下三行内容,如果原来存在,则不需要添加,只需对当前配置项进行修改即可

    log-bin=mysql-bin             #添加这一行就ok 
    binlog-format=ROW           #选择row模式 
    server-id=1       #配置mysql replaction需要定义,不能和canalslaveId重复

     

    然后保存:

    esc
    
    :wq

    配置canal用户 

    root用户登录mysql:

    mysql -uroot -p

    回车,输入密码

    创建canal用户:

    create user canal identified by 'canal';

    有可能会出现如下情况:

     

    直接刷新:

    FLUSH PRIVILEGES;

    然后再次创建canal”用户

     

    canal”用户赋予相应权限:

    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';

    然后刷新权限:

    FLUSH PRIVILEGES;

    退出mysql

    exit;

    重启mysql服务

    sudo service mysqld restart

     

    cdbin目录,启动canal-server

    cd bin/
    
    ./startup.sh

    查看启动状态:

    tail -f -n 50 ../logs/canal/canal.log

     

    用客户端代码进行检测

    package com.test;
    
    import java.net.InetSocketAddress;
    import java.util.List;
    
    import com.alibaba.otter.canal.client.CanalConnector;
    import com.alibaba.otter.canal.client.CanalConnectors;
    import com.alibaba.otter.canal.protocol.CanalEntry.Column;
    import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
    import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
    import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
    import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
    import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
    import com.alibaba.otter.canal.protocol.Message;
    
    public class TestCanal {
    
        public static void main(String args[]) {
            // 创建链接
            CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("***canal所在的主机ip***", 11111),
                    "example", "", "");
            int batchSize = 1000;
            int emptyCount = 0;
            try {
                connector.connect();
                connector.subscribe(".*\..*");
                connector.rollback();
                int totalEmtryCount = 1200;
                while (emptyCount < totalEmtryCount) {
                    Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                    long batchId = message.getId();
                    int size = message.getEntries().size();
                    if (batchId == -1 || size == 0) {
                        emptyCount++;
    //                    System.out.println("empty count : " + emptyCount);
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    } else {
                        emptyCount = 0;
                        // System.out.printf("message[batchId=%s,size=%s] 
    ", batchId, size);
                        printEntry(message.getEntries());
                    }
    
                    connector.ack(batchId); // 提交确认
                    // connector.rollback(batchId); // 处理失败, 回滚数据
                }
    
                System.out.println("empty too many times, exit");
            } finally {
                connector.disconnect();
            }
        }
    
        private static void printEntry(List<Entry> entrys) {
            for (Entry entry : entrys) {
                if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN
                        || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                    continue;
                }
    
                RowChange rowChage = null;
                try {
                    rowChage = RowChange.parseFrom(entry.getStoreValue());
                } catch (Exception e) {
                    throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                            e);
                }
    
                EventType eventType = rowChage.getEventType();
                System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                        entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                        entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));
    
                for (RowData rowData : rowChage.getRowDatasList()) {
                    if (eventType == EventType.DELETE) {
                        printColumn(rowData.getBeforeColumnsList());
                    } else if (eventType == EventType.INSERT) {
                        printColumn(rowData.getAfterColumnsList());
                    } else {
                        System.out.println("-------> before");
                        printColumn(rowData.getBeforeColumnsList());
                        System.out.println("-------> after");
                        printColumn(rowData.getAfterColumnsList());
                    }
                }
            }
        }
    
        private static void printColumn(List<Column> columns) {
            for (Column column : columns) {
                System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
            }
        }
    
    }

     直接启动mian函数

    然后任意选择一张表,插入数据库一条数据

    控制台输出内容

     

     更多信息可以参考:

    谈谈对Canal( 增量数据订阅与消费 )的理解

    http://www.importnew.com/25189.html

    canal集群部署与java接入

    https://blog.csdn.net/my201110lc/article/details/78836270

  • 相关阅读:
    MD文件利用标题等级进行分割代码实现
    IDEA插件-Git Commit Template
    IDEA插件-Translation
    IDEA使用-Debug回到上一步
    Java语法糖详解
    MySQL 事务的隔离级别初窥
    Java异常体系概述
    ssh-copy-id三步实现SSH免密登录
    深入理解ThreadLocal
    使用Guava RateLimiter限流入门到深入
  • 原文地址:https://www.cnblogs.com/java-spring/p/8930740.html
Copyright © 2011-2022 走看看