zoukankan      html  css  js  c++  java
  • 阿里Canal中间件的初步搭建和使用

     

     

    一、前言

    Binlog是MySQL数据库的二进制日志,用于记录用户对数据库操作的SQL语句(除了数据查询语句)信息。而Binlog格式也有三种,分别为STATEMENT、ROW、MIXED。STATMENT模式基于SQL语句的复制,每一条会修改数据的SQL语句会记录。ROW模式除了记录SQL语句之外,还会记录每个字段的变化情况,能够清楚的记录每行数据的变化历史,会占用较多的空间。MIXED比较灵活的记录,当遇到表结构变更的时候,就会记录为STATMENT模式,当遇到了数据更新或者删除情况下就会变为ROW模式。Binlog三个用途分别为数据恢复、复制、审计。

    Canal是阿里MySQL数据库Binlog的增量订阅&消费组件 ,基于数据库Binlog可以监控数据库数据的变化,进而用于数据同步等业务。分为Canal Server与Canal Client,前者读取Binlog解析后存储,后者连接前者消费。

    二、安装搭建

    1、下载安装包。并上传至服务器中。下载地址为:https://github.com/alibaba/canal/releases

    2、将home文件夹中的压缩包解压至安装路径(如下图所示)。

    1 tar -xzf /home/canal.deployer-1.1.3.tar.gz -C /usr/java/canal

    3、进入canal文件夹,修改配置文件(如下图所示)。

    1 vi conf/example/instance.properties

    1 canal.instance.dbUsername=root #数据库账号
    2 canal.instance.dbPassword=1234 #数据库密码
    3 canal.instance.defaultDatabaseName = corporate_genealogy #数据库
    4 canal.instance.connectionCharset = UTF-8 #数据库编码

    4、配置MySQL数据库,开启Binlog,并选择模式为ROW(如下图所示)。

    1 vi /etc/my.cnf

    1 #canal
    2 log-bin=mysql-bin
    3 binlog-format=ROW 
    4 server_id=1

    5、数据库创建canal用户,赋予权限,并刷新(如下图所示)。

    ps:这里遇到一个异常信息,是因为数据库密码过于简单,不符合密码策略,需要修改一下策略。。。

    1 mysql -uroot -p1234
    1 SHOW VARIABLES LIKE 'validate_password%';
    1 set global validate_password_policy=LOW;
    1 set global validate_password_length=4;
    1 create user canal identified by 'canal';
    1 GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    1 FLUSH PRIVILEGES;

    6、退出并重启MySQL。

    1 exit;
    1 sudo service mysqld restart;

    7、进入canal的bin文件夹,启动canal-server。

    1 ./startup.sh

    8、查看logs文件中日志是否启动成功(如下图所示)。

    三、客户端代码检测

    ps:需要注意的是服务器防火墙需打开对应端口号,这里是11111。

    1、添加Maven依赖

    1 <!-- Canal -->
    2 <dependency>
    3     <groupId>com.alibaba.otter</groupId>
    4     <artifactId>canal.client</artifactId>
    5     <version>1.1.3</version>
    6 </dependency>

    2、测试类代码

    复制代码
     1 import java.net.InetSocketAddress;
     2 import java.util.List;
     3 
     4 import com.alibaba.otter.canal.client.CanalConnector;
     5 import com.alibaba.otter.canal.client.CanalConnectors;
     6 import com.alibaba.otter.canal.protocol.CanalEntry.Column;
     7 import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
     8 import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
     9 import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
    10 import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
    11 import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
    12 import com.alibaba.otter.canal.protocol.Message;
    13 
    14 public class TestCanal {
    15 
    16     public static void main(String args[]) {
    17         // 创建链接
    18         CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("服务器IP", 11111),
    19                 "example", "", "");
    20         int batchSize = 1000;
    21         int emptyCount = 0;
    22         try {
    23             connector.connect();
    24             connector.subscribe(".*\..*");
    25             connector.rollback();
    26             int totalEmtryCount = 1200;
    27             while (emptyCount < totalEmtryCount) {
    28                 // 获取指定数量的数据
    29                 Message message = connector.getWithoutAck(batchSize);
    30                 long batchId = message.getId();
    31                 int size = message.getEntries().size();
    32                 if (batchId == -1 || size == 0) {
    33                     emptyCount++;
    34                     try {
    35                         Thread.sleep(1000);
    36                     } catch (InterruptedException e) {
    37                         e.printStackTrace();
    38                     }
    39                 } else {
    40                     emptyCount = 0;
    41                     printEntry(message.getEntries());
    42                 }
    43                 // 提交确认
    44                 connector.ack(batchId);
    45             }
    46             System.out.println("empty too many times, exit");
    47         } finally {
    48             connector.disconnect();
    49         }
    50     }
    51 
    52     private static void printEntry(List<Entry> entrys) {
    53         for (Entry entry : entrys) {
    54             if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN
    55                     || entry.getEntryType() == EntryType.TRANSACTIONEND) {
    56                 continue;
    57             }
    58 
    59             RowChange rowChage;
    60             try {
    61                 rowChage = RowChange.parseFrom(entry.getStoreValue());
    62             } catch (Exception e) {
    63                 throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
    64             }
    65 
    66             EventType eventType = rowChage.getEventType();
    67             System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
    68                     entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
    69                     entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));
    70 
    71             for (RowData rowData : rowChage.getRowDatasList()) {
    72                 if (eventType == EventType.DELETE) {
    73                     printColumn(rowData.getBeforeColumnsList());
    74                 } else if (eventType == EventType.INSERT) {
    75                     printColumn(rowData.getAfterColumnsList());
    76                 } else {
    77                     System.out.println("-------> before");
    78                     printColumn(rowData.getBeforeColumnsList());
    79                     System.out.println("-------> after");
    80                     printColumn(rowData.getAfterColumnsList());
    81                 }
    82             }
    83         }
    84     }
    85 
    86     private static void printColumn(List<Column> columns) {
    87         for (Column column : columns) {
    88             System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
    89         }
    90     }
    91 
    92 }
    复制代码

    3、Navicat 连接对应数据库进行一些添加删除更新操作,控制台输出如下图所示。

    四、总结展望

    考虑到Canal的堆积能力并不强。堆积数据到10W+时,速度会变慢,并会出现假死现象。因此介入消息中间件MQ非常有必要,解决堆积能力问题,可以延后消费,能够方便的得到积压数据,进行监控报警。

    本文部分学习参考了:https://www.cnblogs.com/java-spring/p/8930740.html

    至此是关于介绍在Linux系统中阿里Canal中间件的初步搭建和使用,后续会介绍配合消息中间件等方式处理数据同步及其它业务逻辑。

    如有疏漏错误之处,还请不吝赐教!

  • 相关阅读:
    android 中文 api (43) —— Chronometer
    SVN客户端清除密码
    Android 中文 API (35) —— ImageSwitcher
    Android 中文API (46) —— SimpleAdapter
    Android 中文 API (28) —— CheckedTextView
    Android 中文 API (36) —— Toast
    Android 中文 API (29) —— CompoundButton
    android 中文 API (41) —— RatingBar.OnRatingBarChangeListener
    Android 中文 API (30) —— CompoundButton.OnCheckedChangeListener
    Android 中文 API (24) —— MultiAutoCompleteTextView.CommaTokenizer
  • 原文地址:https://www.cnblogs.com/shoshana-kong/p/14076195.html
Copyright © 2011-2022 走看看