zoukankan      html  css  js  c++  java
  • Canal的安装与使用

    一、Canal介绍

      Canal的原理就是它自己伪装成slave, 向mysql发送dump协议,MySQL master接收到dump请求之后推送binlog文件给slave, 也就是canal。  

    二、Canal安装

      1. 下载Canal 

       wget https://github.com/alibaba/canal/releases/download/canal-1.0.24/canal.deployer-1.0.24.tar.gz

      2. 解压到/opt/softwares/canal目录, 解压完之后如下图所示:

      3. 配置instance

      4. 修改canal.properties

    三、Mysql 安装

      1、mysql 安装

        yum install mysql

        yum install mysql-server 

      2、启动mysql

        /etc/init.d/mysqld start 或者sevice mysqld start

      3、设置root用户密码

        mysqladmin -u root password '123456' 

      4、登录msyql

        mysql -uroot -p123456

      5、检查并开启binlog复制功能及binlog模式是否为ROW模式

        参考: binlog详解

    四、Canal抽取binlog

      Canal只是伪装成slave抽取binlog,Canal拿到binlog之后还需要交给业务方去做响应的处理,那么怎么去交给业务方呢?一般都是Canal获取到binlog之后写到kafka里,业务方订阅kafka topic消费binlog,完成业务逻辑处理。

      但是Canal不能直接写Kafka, 所以还需要有个client连接Canal,Canal获取binlog之后交给Client, Client在往Kafka里写binlog消息,Client代码如下:

      

    import com.alibaba.otter.canal.client.CanalConnector;
    import com.alibaba.otter.canal.client.CanalConnectors;
    import com.alibaba.otter.canal.protocol.CanalEntry;
    import com.alibaba.otter.canal.protocol.Message;
    
    import java.net.InetSocketAddress;
    import java.util.List;
    
    public class CanalClientExample {
    
        public static void main(String[] args) {
            CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.41.254", 11111), "example", "canal", "canal");
            try {
                int batchSize = 1000;
    
                connector.connect();
                connector.subscribe("zhengxinv6\..*");
                connector.rollback();
    
                while (true) {
                    // 获取指定数量的数据
                    Message message = connector.getWithoutAck(batchSize);
    
                    long batchId = message.getId();
                    int size = message.getEntries().size();
                    if (batchId == -1 || size == 0) {
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
    
                        continue;
                    }
    
                    System.out.println("batchId = [" + batchId + "]");
                    printEntry(message.getEntries());
    
                    connector.ack(batchId); //提交确认
                    //connector.rollback(batchId);
                }
    
            } finally {
                connector.disconnect();
            }
        }
    
        private static void printEntry(List<CanalEntry.Entry> entrys) {
            for (CanalEntry.Entry entry : entrys) {
                if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
                        || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                    continue;
                }
    
                CanalEntry.RowChange rowChange = null;
                try {
                    rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                } catch (Exception e) {
                    throw new RuntimeException(
                            "ERROR ## parser of eromanga-event has an error,data:"
                                    + entry.toString(), e);
                }
    
                CanalEntry.EventType eventType = rowChange.getEventType();
                System.out.println(String.format("================> binlog[%s:%s] ,name[%s,%s] , eventType : %s",
                        entry.getHeader().getLogfileName(),
                        entry.getHeader().getLogfileOffset()+"",
                        entry.getHeader().getSchemaName(),
                        entry.getHeader().getTableName(),
                        eventType));
    
                for (CanalEntry.RowData rowData: rowChange.getRowDatasList()) {
                    if (eventType == CanalEntry.EventType.DELETE) {
                        printColumn(rowData.getBeforeColumnsList());
                    } else if (eventType == CanalEntry.EventType.INSERT) {
                        printColumn(rowData.getAfterColumnsList());
                    } else {
                        System.out.println("-------> before");
                        printColumn(rowData.getBeforeColumnsList());
                        System.out.println("-------> after");
                        printColumn(rowData.getAfterColumnsList());
                    }
                }
            }
        }
    
        private static void printColumn(List<CanalEntry.Column> columns) {
            for (CanalEntry.Column column : columns) {
                System.out.println(column.getName() + " : " + column.getValue()
                        + "    update=" + column.getUpdated());
            }
        }
    }

     

    五、Canal使用过程出现的问题及解决方法

      参考:canal报错解决方法

      

    参考:https://www.jianshu.com/p/6299048fad66

  • 相关阅读:
    芯片难题
    permutation
    小凸玩矩阵
    gender
    NOI2019序列非启发式做法
    莫比乌斯函数&莫比乌斯反演
    「雅礼day2」最大公约数gcd
    容斥原理&反演
    树上路径的交和并
    CF906D Power Tower
  • 原文地址:https://www.cnblogs.com/jsnr-tdyd/p/9074699.html
Copyright © 2011-2022 走看看