zoukankan      html  css  js  c++  java
  • Canal-入门例子-下载安装(一)

    开发手册

    https://github.com/alibaba/canal

    mysql配置

    1.开启binlog

    找到mysql安装目录 mysql --help|grep my.cnf 编辑my.cf

    [mysqld]
    # log_bin
    log-bin = mysql-bin #开启binlog
    binlog-format = ROW #选择row模式
    server_id = 1 #配置mysql replication需要定义,不能喝canal的slaveId重复

    2.查看是否开启相关命令

    --是否开启了binlog on为开启
    show variables like 'log_bin'
    --查看当前日志文件信息
    show master status;
    --关闭当前日志文件 创建一个新的日志文件 原有日志文件编号+1
    flush logs;
    --查看所有日志文件events
    show binlog events limit 0,1
    --查看指定文件events
    show binlog events in 'mysql-bin.003243';
    --查看文件列表
    show binary logs;
    --查看binlog格式 ROW 
    show variables like 'binlog_format'

    canal目录介绍

    /bin 为启动脚本

    /conf/canal_local.propreteis 为canal admin集群时的配置通过./start.sh 使用

    /conf/example 为canal监控的数据库实例的配置 客户端通过指定Destination 可以消费具体一个 比如order服务就消费order数据库实例  product则消费product数据库实例 但是canal-server是一个 当然对应的目录则不是example而是需要创建order和produc目录 同时在canal.propreteis 指定canal.destinations = order,product

                        /h2.mv.db 为使用h2数据库时使用 /meta.dat 存储了当前读取的binlog文件 和指针位置 

                        /instace.properteis 则配置的监控的数据库实例的binlog日志

    安装CanalServer

    下载

    https://github.com/alibaba/canal/releases/tag/canal-1.1.4

    或者根据下载源码执行maven安装

    这种方式推荐,因为有时需要修改源码完成定制化需求

    mvn clean install -Dmaven.test.skip -Denv=release

    1.解压

    2.修改配置文件

    canal.deployer-1.1.4/conf/example/instance.properties 配置文件

    3.启动

    4.查看是否启动成功 

    canal.deployer-1.1.4/logs/canal

    Cannal客户端

    1.创建一个demo项目

    2.引入pom依赖

      <!-- https://mvnrepository.com/artifact/com.alibaba.otter/canal.client -->
            <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.3</version>
        </dependency>

    3.demo代码

    import com.alibaba.otter.canal.client.CanalConnector;
    import com.alibaba.otter.canal.client.CanalConnectors;
    import com.alibaba.otter.canal.common.utils.AddressUtils;
    import com.alibaba.otter.canal.protocol.CanalEntry.*;
    import com.alibaba.otter.canal.protocol.Message;
    import org.springframework.beans.factory.InitializingBean;
    import org.springframework.context.annotation.Bean;
    import org.springframework.stereotype.Component;
    
    import java.net.InetSocketAddress;
    import java.util.List;
    
    /**
     * @author liqiang
     * @date 2020/1/13 16:21
     * @Description:
     */
    @Component
    public class CanalStart implements InitializingBean {
        @Override
        public void afterPropertiesSet() throws Exception {
            System.out.println("开始消费....");
            // 创建链接  canal的链接和地址 port默认是11111
            CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("10.3.17.72", 11111),
                    "example", "", "");//或者example2
            int batchSize = 1000;//每个批次处理1000条
            int emptyCount = 0;
            try {
                connector.connect();
                connector.subscribe(".*\..*");//订阅所有库下面的所有表
                //connector.subscribe("canal.t_canal");//订阅库canal库下的表t_canal
                connector.rollback();
                int totalEmtryCount = 1200;
                while (emptyCount < totalEmtryCount) {//实际生产中需要设置为true,死循环
                    Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                    long batchId = message.getId();
                    int size = message.getEntries().size();
                    if (batchId == -1 || size == 0) {
                        emptyCount++;
                       // System.out.println("empty count : " + emptyCount);//代表没有需要消费的数据
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    } else {
                        emptyCount = 0;
                        System.out.printf("message[batchId=%s,size=%s] 
    ", batchId, size);
                        printEntry(message.getEntries());
                    }
    
                    connector.ack(batchId); // 提交确认
                    // connector.rollback(batchId); // 处理失败, 回滚数据 
                }
    
                System.out.println("empty too many times, exit");
            } finally {
                connector.disconnect();
            }
        }
        private static void printEntry(List<Entry> entrys) {
            for (Entry entry : entrys) {
                if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN
                        || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                    continue;
                }
    
                RowChange rowChage = null;
                try {
                    rowChage = RowChange.parseFrom(entry.getStoreValue());
                } catch (Exception e) {
                    throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                            e);
                }
                System.out.println("rowChare ======>"+rowChage.toString());
    
                EventType eventType = rowChage.getEventType(); //事件類型,比如insert,update,delete
                System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                        entry.getHeader().getLogfileName(),//mysql的my.cnf配置中的log-bin名称
                        entry.getHeader().getLogfileOffset(), //偏移量
                        entry.getHeader().getSchemaName(),//庫名
                        entry.getHeader().getTableName(), //表名
                        eventType));//事件名
    
                for (RowData rowData : rowChage.getRowDatasList()) {
                    if (eventType == EventType.DELETE) {
                        printColumn(rowData.getBeforeColumnsList());
                    } else if (eventType == EventType.INSERT) {
                        printColumn(rowData.getAfterColumnsList());
                    } else {
                        System.out.println("-------> before");
                        printColumn(rowData.getBeforeColumnsList());
                        System.out.println("-------> after");
                        printColumn(rowData.getAfterColumnsList());
                    }
                }
            }
        }
    
        private static void printColumn(List<Column> columns) {
            for (Column column : columns) {
                System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
            }
        }
    
    }

    测试场景

    简单查询

    select * from `user`; 

    并不会被消费到

    执行删除

    整表删除

    message[batchId=7,size=3] 
    rowChare ======>tableId: 33
    eventType: DELETE
    isDdl: false
    rowDatas {
      beforeColumns {
        index: 0
        sqlType: -5
        name: "id"
        isKey: true
        updated: false
        isNull: false
        value: "2"
        mysqlType: "bigint(20)"
      }
      beforeColumns {
        index: 1
        sqlType: 12
        name: "name"
        isKey: false
        updated: false
        isNull: false
        value: "liqiang"
        mysqlType: "varchar(30)"
      }
      beforeColumns {
        index: 2
        sqlType: 4
        name: "age"
        isKey: false
        updated: false
        isNull: false
        value: "20"
        mysqlType: "int(11)"
      }
      beforeColumns {
        index: 3
        sqlType: 12
        name: "email"
        isKey: false
        updated: false
        isNull: false
        value: "test2@baomidou.com"
        mysqlType: "varchar(50)"
      }
    }
    rowDatas {
      beforeColumns {
        index: 0
        sqlType: -5
        name: "id"
        isKey: true
        updated: false
        isNull: false
        value: "3"
        mysqlType: "bigint(20)"
      }
      beforeColumns {
        index: 1
        sqlType: 12
        name: "name"
        isKey: false
        updated: false
        isNull: false
        value: "Tom"
        mysqlType: "varchar(30)"
      }
      beforeColumns {
        index: 2
        sqlType: 4
        name: "age"
        isKey: false
        updated: false
        isNull: false
        value: "28"
        mysqlType: "int(11)"
      }
      beforeColumns {
        index: 3
        sqlType: 12
        name: "email"
        isKey: false
        updated: false
        isNull: false
        value: "test3@baomidou.com"
        mysqlType: "varchar(50)"
      }
    }
    rowDatas {
      beforeColumns {
        index: 0
        sqlType: -5
        name: "id"
        isKey: true
        updated: false
        isNull: false
        value: "4"
        mysqlType: "bigint(20)"
      }
      beforeColumns {
        index: 1
        sqlType: 12
        name: "name"
        isKey: false
        updated: false
        isNull: false
        value: "Sandy"
        mysqlType: "varchar(30)"
      }
      beforeColumns {
        index: 2
        sqlType: 4
        name: "age"
        isKey: false
        updated: false
        isNull: false
        value: "21"
        mysqlType: "int(11)"
      }
      beforeColumns {
        index: 3
        sqlType: 12
        name: "email"
        isKey: false
        updated: false
        isNull: false
        value: "test4@baomidou.com"
        mysqlType: "varchar(50)"
      }
    }
    rowDatas {
      beforeColumns {
        index: 0
        sqlType: -5
        name: "id"
        isKey: true
        updated: false
        isNull: false
        value: "5"
        mysqlType: "bigint(20)"
      }
      beforeColumns {
        index: 1
        sqlType: 12
        name: "name"
        isKey: false
        updated: false
        isNull: false
        value: "Billie"
        mysqlType: "varchar(30)"
      }
      beforeColumns {
        index: 2
        sqlType: 4
        name: "age"
        isKey: false
        updated: false
        isNull: false
        value: "24"
        mysqlType: "int(11)"
      }
      beforeColumns {
        index: 3
        sqlType: 12
        name: "email"
        isKey: false
        updated: false
        isNull: false
        value: "test5@baomidou.com"
        mysqlType: "varchar(50)"
      }
    }
    
    ================> binlog[mysql-bin.000001:3000] , name[haoke,user] , eventType : DELETE #表示是删除事件  
    id : 2    update=false
    name : liqiang    update=false
    age : 20    update=false
    email : test2@baomidou.com    update=false
    id : 3    update=false
    name : Tom    update=false
    age : 28    update=false
    email : test3@baomidou.com    update=false
    id : 4    update=false
    name : Sandy    update=false
    age : 21    update=false
    email : test4@baomidou.com    update=false
    id : 5    update=false
    name : Billie    update=false
    age : 24    update=false
    email : test5@baomidou.com    update=false

    可以发现会消费5个事件

    根据条件删除

    delete from`user` where id=2;

    打印

    message[batchId=10,size=3] 
    rowChare ======>tableId: 116
    eventType: DELETE
    isDdl: false
    rowDatas {
      beforeColumns {
        index: 0
        sqlType: -5
        name: "id"
        isKey: true
        updated: false
        isNull: false
        value: "2"
        mysqlType: "bigint(20)"
      }
      beforeColumns {
        index: 1
        sqlType: 12
        name: "name"
        isKey: false
        updated: false
        isNull: false
        value: "liqiang"
        mysqlType: "varchar(30)"
      }
      beforeColumns {
        index: 2
        sqlType: 4
        name: "age"
        isKey: false
        updated: false
        isNull: false
        value: "20"
        mysqlType: "int(11)"
      }
      beforeColumns {
        index: 3
        sqlType: 12
        name: "email"
        isKey: false
        updated: false
        isNull: false
        value: "test2@baomidou.com"
        mysqlType: "varchar(50)"
      }
    }
    
    ================> binlog[mysql-bin.000001:3551] , name[haoke,user] , eventType : DELETE
    id : 2    update=false
    name : liqiang    update=false
    age : 20    update=false
    email : test2@baomidou.com    update=false

    修改

    update `user` u set u.`name`='小明' where id=2
    message[batchId=13,size=3] 
    rowChare ======>tableId: 117
    eventType: UPDATE
    isDdl: false
    rowDatas {
      beforeColumns {
        index: 0
        sqlType: -5
        name: "id"
        isKey: true
        updated: false
        isNull: false
        value: "2"
        mysqlType: "bigint(20)"
      }
      beforeColumns {
        index: 1
        sqlType: 12
        name: "name"
        isKey: false
        updated: false
        isNull: false
        value: "liqiang"
        mysqlType: "varchar(30)"
      }
      beforeColumns {
        index: 2
        sqlType: 4
        name: "age"
        isKey: false
        updated: false
        isNull: false
        value: "20"
        mysqlType: "int(11)"
      }
      beforeColumns {
        index: 3
        sqlType: 12
        name: "email"
        isKey: false
        updated: false
        isNull: false
        value: "test2@baomidou.com"
        mysqlType: "varchar(50)"
      }
      afterColumns {
        index: 0
        sqlType: -5
        name: "id"
        isKey: true
        updated: false
        isNull: false
        value: "2"
        mysqlType: "bigint(20)"
      }
      afterColumns {
        index: 1
        sqlType: 12
        name: "name"
        isKey: false
        updated: true
        isNull: false
        value: "345260217346230216"
        mysqlType: "varchar(30)"
      }
      afterColumns {
        index: 2
        sqlType: 4
        name: "age"
        isKey: false
        updated: false
        isNull: false
        value: "20"
        mysqlType: "int(11)"
      }
      afterColumns {
        index: 3
        sqlType: 12
        name: "email"
        isKey: false
        updated: false
        isNull: false
        value: "test2@baomidou.com"
        mysqlType: "varchar(50)"
      }
    }
    
    ================> binlog[mysql-bin.000001:3989] , name[haoke,user] , eventType : UPDATE
    -------> before
    id : 2    update=false
    name : liqiang    update=false
    age : 20    update=false
    email : test2@baomidou.com    update=false
    -------> after
    id : 2    update=false
    name : 小明    update=true
    age : 20    update=false
    email : test2@baomidou.com    update=false

    可以发现打印了修改前修改后数据 以及哪个字段被修改

    新增

    INSERT INTO `haoke`.`user`(`id`, `name`, `age`, `email`) VALUES (6, '小明2', 20, 'test6@baomidou.com');
    message[batchId=14,size=3] 
    rowChare ======>tableId: 117
    eventType: INSERT
    isDdl: false
    rowDatas {
      afterColumns {
        index: 0
        sqlType: -5
        name: "id"
        isKey: true
        updated: true
        isNull: false
        value: "6"
        mysqlType: "bigint(20)"
      }
      afterColumns {
        index: 1
        sqlType: 12
        name: "name"
        isKey: false
        updated: true
        isNull: false
        value: "3452602173462302162"
        mysqlType: "varchar(30)"
      }
      afterColumns {
        index: 2
        sqlType: 4
        name: "age"
        isKey: false
        updated: true
        isNull: false
        value: "20"
        mysqlType: "int(11)"
      }
      afterColumns {
        index: 3
        sqlType: 12
        name: "email"
        isKey: false
        updated: true
        isNull: false
        value: "test6@baomidou.com"
        mysqlType: "varchar(50)"
      }
    }
    
    ================> binlog[mysql-bin.000001:4245] , name[haoke,user] , eventType : INSERT
    id : 6    update=true
    name : 小明2    update=true
    age : 20    update=true
    email : test6@baomidou.com    update=true

    事件是新增

    测试事物

    未提交

    mysql> START TRANSACTION;
    Query OK, 0 rows affected (0.00 sec)
     
    mysql> update `user` s set s.name='小张' where id=2;
    Query OK, 1 row affected (0.00 sec)
    Rows matched: 1  Changed: 1  Warnings: 0
    mysql> 

    并没有消费

    提交

    mysql> START TRANSACTION;
    Query OK, 0 rows affected (0.00 sec)
     
    mysql> update `user` s set s.name='小张' where id=2;
    Query OK, 1 row affected (0.00 sec)
    Rows matched: 1  Changed: 1  Warnings: 0
    mysql> commit;
    Query OK, 0 rows affected (0.01 sec)

    打印:

    message[batchId=15,size=3] 
    rowChare ======>tableId: 117
    eventType: UPDATE
    isDdl: false
    rowDatas {
      beforeColumns {
        index: 0
        sqlType: -5
        name: "id"
        isKey: true
        updated: false
        isNull: false
        value: "2"
        mysqlType: "bigint(20)"
      }
      beforeColumns {
        index: 1
        sqlType: 12
        name: "name"
        isKey: false
        updated: false
        isNull: false
        value: "345260217346230216"
        mysqlType: "varchar(30)"
      }
      beforeColumns {
        index: 2
        sqlType: 4
        name: "age"
        isKey: false
        updated: false
        isNull: false
        value: "20"
        mysqlType: "int(11)"
      }
      beforeColumns {
        index: 3
        sqlType: 12
        name: "email"
        isKey: false
        updated: false
        isNull: false
        value: "test2@baomidou.com"
        mysqlType: "varchar(50)"
      }
      afterColumns {
        index: 0
        sqlType: -5
        name: "id"
        isKey: true
        updated: false
        isNull: false
        value: "2"
        mysqlType: "bigint(20)"
      }
      afterColumns {
        index: 1
        sqlType: 12
        name: "name"
        isKey: false
        updated: true
        isNull: false
        value: "345260217345274240"
        mysqlType: "varchar(30)"
      }
      afterColumns {
        index: 2
        sqlType: 4
        name: "age"
        isKey: false
        updated: false
        isNull: false
        value: "20"
        mysqlType: "int(11)"
      }
      afterColumns {
        index: 3
        sqlType: 12
        name: "email"
        isKey: false
        updated: false
        isNull: false
        value: "test2@baomidou.com"
        mysqlType: "varchar(50)"
      }
    }
    
    ================> binlog[mysql-bin.000001:4461] , name[haoke,user] , eventType : UPDATE
    -------> before
    id : 2    update=false
    name : 小明    update=false
    age : 20    update=false
    email : test2@baomidou.com    update=false
    -------> after
    id : 2    update=false
    name : 小张    update=true
    age : 20    update=false
    email : test2@baomidou.com    update=false

    测试回滚

    mysql> START TRANSACTION;
    Query OK, 0 rows affected (0.00 sec)
     
    mysql> update `user` s set s.name='小张2' where id=2;
    Query OK, 1 row affected (0.00 sec)
    Rows matched: 1  Changed: 1  Warnings: 0
    mysql> rollback;
    Query OK, 0 rows affected (0.00 sec)

    并没有消费

    Adapter

    可以参考一下源码实现client

     http://www.mamicode.com/info-detail-2851627.html

  • 相关阅读:
    美化的滚动条
    网站系统开发参考网址
    正则表达式获取URL参数
    类实例 及 实例化对象 对象引用
    C# 静态方法 静态属性 调用静态方法
    C# 静态方法调用非静态方法
    winform 窗体间传值
    从数据库中读出数据并输出
    数据库链接字符串
    DbHelper
  • 原文地址:https://www.cnblogs.com/LQBlog/p/12177295.html
Copyright © 2011-2022 走看看