zoukankan      html  css  js  c++  java
  • canal

    canal 是阿里巴巴开源的 MySQL binlog 增量订阅&消费组件

    canal 工作原理:

    • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
    • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
    • canal 解析 binary log 对象(原始为 byte 流)

    本文使用的 canal 版本为 v1.1.4

    一、canal 安装与配置

    安装与配置可以参考:QuickStart,我的 conf/example/instance.properties 配置:

    ## mysql serverId , v1.0.26+ will autoGen
    canal.instance.mysql.slaveId=1234
    
    ... ...
    
    canal.instance.master.address=192.168.137.1:3306
    
    ... ...
    
    canal.instance.dbUsername=canal
    canal.instance.dbPassword=canal
    canal.instance.connectionCharset = UTF-8
    
    ... ...
    

    二、客户端使用

    添加引用:

    <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.client</artifactId>
        <version>1.1.4</version>
    </dependency>
    

    代码:

    @Component
    public class CanalRunner implements CommandLineRunner {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(CanalRunner.class);
    
        @Override
        public void run(String... args) throws Exception {
    
            CanalConnector connector = CanalConnectors.newSingleConnector(
                    new InetSocketAddress("192.168.137.101", 11111)
                    , "example"
                    , "canal", "canal");
            connector.connect();
            connector.subscribe(".*\..*");
            connector.rollback();
    
            while (true) {
                Message message = connector.getWithoutAck(100); // 获取指定数量的数据
                long batchId = message.getId();
                if (batchId == -1 || message.getEntries().isEmpty()) {
                    Thread.sleep(1000);
                    continue;
                }
    
                try {
                    printEntries(message.getEntries());
                    connector.ack(batchId);// 提交确认,消费成功,通知server删除数据
                } catch (Exception e) {
    //                connector.rollback(batchId);// 处理失败, 回滚数据,后续重新获取数据
                }
    
            }
    
        }
    
        private void printEntries(List<CanalEntry.Entry> entries) throws Exception {
            for (CanalEntry.Entry entry : entries) {
                if (entry.getEntryType() != CanalEntry.EntryType.ROWDATA) {
                    continue;
                }
    
                CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
    
                CanalEntry.EventType eventType = rowChange.getEventType();
    
                LOGGER.info("数据库名: {}, 数据表名: {}, 事件: {}"
                        , entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType);
    
                for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                    switch (rowChange.getEventType()) {
                        case INSERT:
                            printColumns(rowData.getAfterColumnsList());
                            break;
                        case UPDATE:
                            LOGGER.info("更新前数据:");
                            printColumns(rowData.getBeforeColumnsList());
                            LOGGER.info("更新后数据:");
                            printColumns(rowData.getAfterColumnsList());
                            break;
                        case DELETE:
                            printColumns(rowData.getBeforeColumnsList());
                            break;
                        default:
                            break;
                    }
                }
            }
        }
    
        private void printColumns(List<CanalEntry.Column> columns) {
            Map<String, Object> map = new HashMap<>();
            for(CanalEntry.Column column : columns) {
                map.put(column.getName(), column.getValue());
            }
    
            ObjectMapper objectMapper = new ObjectMapper();
    
            try {
                LOGGER.info(objectMapper.writeValueAsString(map));
            } catch (JsonProcessingException e) {
            }
        }
    }
    

    CanalConnector 中的参数来自于 /conf/canal.properties:

      
    port canal.port
    destination canal.destinations
    username canal.instance.tsdb.dbUsername
    password canal.instance.tsdb.dbPassword

    参考

    1. canal
    2. 史上最全的分布式数据同步中间间canal 之入门篇
  • 相关阅读:
    使用事件模式(Event API)读取Excel2007(.xlsx)文件
    rocketMq消息的发送和消息消费
    Java 七牛云存储与下载
    Spring boot + Jpa + Maven + Mysql 初级整合
    Spring+SpringMvc+Hibernate整合记录
    Mybatis-Generator自动生成代码
    idea spring+springmvc+mybatis环境配置整合详解
    Linux下安装redis
    SpringMvc的基础配置<一>
    JAVA从本机获取IP地址
  • 原文地址:https://www.cnblogs.com/victorbu/p/13064256.html
Copyright © 2011-2022 走看看