  • Canal: Incremental Sync of MySQL Data to Elasticsearch

    Preface

    This post only covers the detailed canal configuration for TCP mode.

    1. Preparation

    1.1. Components

      JDK: 1.8 or later;

      Elasticsearch: 6.x (7.x does not seem to be supported yet);

      Kibana: 6.x;

      Canal.deployer: 1.1.4;

      Canal.Adapter: 1.1.4

    1.2. Configuration

    • First enable MySQL binlog writing and set binlog-format to ROW mode

    Find the my.cnf file (mine is /etc/my.cnf) and add the following settings to its [mysqld] section:

    [mysqld]
    log-bin=mysql-bin   # enable binlog
    binlog-format=ROW   # use ROW mode
    server_id=1         # required for MySQL replication; must not equal canal's slaveId
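    These three settings can be checked before restarting MySQL. The following is a minimal Python sketch and is not part of canal. It assumes a simplified option-file syntax: `key=value` lines, `#` comments (full-line or trailing), and `-`/`_` treated as equivalent in option names, as MySQL does. Quoting and `!include` directives are not handled, and the helper names are hypothetical.

```python
def parse_mysqld_options(text: str) -> dict:
    """Parse simple `key=value` lines of a my.cnf fragment into a dict.

    '#' starts a comment (also mid-line); '-' and '_' in option names are
    normalized to '_' so that binlog-format and binlog_format are the same key.
    """
    options = {}
    for raw in text.splitlines():
        line = raw.split("#", 1)[0].strip()
        if not line or line.startswith((";", "[")):
            continue  # blank line, ';' comment or [section] header
        key, _, value = line.partition("=")
        options[key.strip().lower().replace("-", "_")] = value.strip()
    return options


def check_binlog_config(text: str, canal_slave_id=None) -> list:
    """Return the problems found; an empty list means the fragment suits canal."""
    opts = parse_mysqld_options(text)
    problems = []
    if "log_bin" not in opts:
        problems.append("log-bin is missing, so binlog is disabled")
    if opts.get("binlog_format", "").upper() != "ROW":
        problems.append("binlog-format must be ROW")
    if "server_id" not in opts:
        problems.append("server_id is missing")
    elif canal_slave_id is not None and opts["server_id"] == str(canal_slave_id):
        problems.append("server_id must differ from canal's slaveId")
    return problems


if __name__ == "__main__":
    fragment = """
[mysqld]
log-bin=mysql-bin   # enable binlog
binlog-format=ROW   # use ROW mode
server_id=1         # must not equal canal's slaveId
"""
    print(check_binlog_config(fragment, canal_slave_id=11))  # prints []
```

    The optional `canal_slave_id` argument catches the clash the comment on `server_id` warns about.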

    Then restart MySQL and check that binlog has been enabled correctly:

    mysql> show variables like 'log_bin%';
    +---------------------------------+----------------------------------+
    | Variable_name                   | Value                            |
    +---------------------------------+----------------------------------+
    | log_bin                         | ON                               |
    | log_bin_basename                | /data/mysql/data/mysql-bin       |
    | log_bin_index                   | /data/mysql/data/mysql-bin.index |
    | log_bin_trust_function_creators | OFF                              |
    | log_bin_use_v1_row_events       | OFF                              |
    +---------------------------------+----------------------------------+
    5 rows in set (0.00 sec)
    mysql> show variables like 'binlog_format%';
    +---------------+-------+
    | Variable_name | Value |
    +---------------+-------+
    | binlog_format | ROW   |
    +---------------+-------+
    1 row in set (0.00 sec)
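    If this check is scripted, the table the mysql client prints can be parsed directly. The following is a minimal Python sketch (the helper names are hypothetical). It turns output like the above into a dict and tests the two values canal needs.

```python
def parse_mysql_table(output: str) -> dict:
    """Convert the ASCII table printed by `show variables like ...` into {name: value}."""
    variables = {}
    for line in output.splitlines():
        line = line.strip()
        if not line.startswith("|"):
            continue  # skip +---+ borders, the prompt and the "N rows in set" footer
        cells = [cell.strip() for cell in line.strip("|").split("|")]
        if len(cells) == 2 and cells[0] != "Variable_name":
            variables[cells[0]] = cells[1]
    return variables


def binlog_ready(variables: dict) -> bool:
    """canal needs binlog switched on and written in ROW format."""
    return variables.get("log_bin") == "ON" and variables.get("binlog_format") == "ROW"


if __name__ == "__main__":
    output = """
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW   |
+---------------+-------+
1 row in set (0.00 sec)
"""
    variables = parse_mysql_table(output)
    variables["log_bin"] = "ON"  # value from the first query above
    print(binlog_ready(variables))  # prints True
```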
    • Create the MySQL account canal connects with and grant it the privileges of a MySQL slave (if you already have an account, just run the GRANT):
    CREATE USER canal IDENTIFIED BY 'Aa123456.';
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    FLUSH PRIVILEGES;

    2. Installation

    2.1. Elasticsearch

    Installation and configuration: https://www.cnblogs.com/caoweixiong/p/11826295.html

    2.2. canal.deployer

    2.2.1. Download and extract

    Download directly

    Visit https://github.com/alibaba/canal/releases, which lists all released packages. For example, for version 1.1.4:
    wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz

    Or build it yourself
    git clone git@github.com:alibaba/canal.git
    cd canal
    mvn clean install -Dmaven.test.skip -Denv=release
    When the build finishes, target/canal.deployer-$version.tar.gz is generated under the project root.

    Then extract the package:
    mkdir /usr/local/canal
    tar zxvf canal.deployer-1.1.4.tar.gz -C /usr/local/canal

    After extraction, change into /usr/local/canal and check the unpacked directory structure.

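    2.2.2. Configuration

    • Configure the server
    cd /usr/local/canal/conf
    vi canal.properties

    The parameters with explanatory comments below are the ones to focus on, and also the ones changed most often: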

    #################################################
    #########               common argument         #############
    #################################################
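    # tcp bind ip
    # local IP the canal server binds to; if unset, a local IP is picked automatically (default: none)
    canal.ip =
    # register ip to zookeeper
    # IP of the host running canal-server; can be left empty, a local IP is bound automatically
    canal.register.ip =
    # port canal-server listens on (TCP mode; in non-TCP modes port 11111 is not listened on)
    canal.port = 11111
    # port canal-server listens on for metrics pull
    canal.metrics.pull.port = 11112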
    # canal instance user/passwd
    # canal.user = canal
    # canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458
    
    # canal admin config
    #canal.admin.manager = 127.0.0.1:8089
    canal.admin.port = 11110
    canal.admin.user = admin
    canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
    
    # ZooKeeper cluster address for canal-server; required for coordination in cluster mode, optional in standalone mode
    canal.zkServers =
    # flush data to zk
    # how often canal persists data to ZooKeeper, in milliseconds
    canal.zookeeper.flush.period = 1000
    canal.withoutNetty = false
    # tcp, kafka, RocketMQ
    # mode canal-server runs in: tcp connects clients directly without middleware; kafka and RocketMQ are message-queue modes
    canal.serverMode = tcp
    # flush meta cursor/parse position to file
    # directory where data is stored
    canal.file.data.dir = ${canal.conf.dir}
    canal.file.flush.period = 1000
    # the following are system parameters, including memory and network settings
    ## memory store RingBuffer size, should be Math.pow(2,n)
    canal.instance.memory.buffer.size = 16384
    ## memory store RingBuffer used memory unit size , default 1kb
    canal.instance.memory.buffer.memunit = 1024
    ## memory store gets mode used MEMSIZE or ITEMSIZE
    canal.instance.memory.batch.mode = MEMSIZE
    canal.instance.memory.rawEntry = true
    
    ## detecting config
    # heartbeat check settings, used for HA
    canal.instance.detecting.enable = false
    #canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
    canal.instance.detecting.sql = select 1
    canal.instance.detecting.interval.time = 3
    canal.instance.detecting.retry.threshold = 3
    canal.instance.detecting.heartbeatHaEnable = false
    
    # support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
    canal.instance.transaction.size =  1024
    # mysql fallback connected to new master should fallback times
    canal.instance.fallbackIntervalInSeconds = 60
    
    # network config
    canal.instance.network.receiveBufferSize = 16384
    canal.instance.network.sendBufferSize = 16384
    canal.instance.network.soTimeout = 30
    
    # binlog filter config
    # which SQL statements are filtered out of the binlog
    canal.instance.filter.druid.ddl = true
    # whether to ignore DCL query statements such as grant/create user; default false
    canal.instance.filter.query.dcl = false
    # whether to ignore DML query statements such as insert/update/delete (in MySQL 5.6 ROW mode the binlog can also contain statement-mode query records); default false
    canal.instance.filter.query.dml = false
    # whether to ignore DDL query statements such as create table/alter table/drop table/rename table/create index/drop index
    # (supported DDL types are mainly table-level operations; create database/trigger/procedure are currently classified as DCL); default false
    canal.instance.filter.query.ddl = false
    canal.instance.filter.table.error = false
    canal.instance.filter.rows = false
    canal.instance.filter.transaction.entry = false
    
    # binlog format/image check
    # use ROW mode; in non-ROW mode canal reports no error, but no data is synced
    canal.instance.binlog.format = ROW,STATEMENT,MIXED
    canal.instance.binlog.image = FULL,MINIMAL,NOBLOB
    
    # binlog ddl isolation
    canal.instance.get.ddl.isolation = false
    
    # parallel parser config
    # parallel parsing; on a single-CPU machine change true to false
    canal.instance.parser.parallel = true
    ## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
    #canal.instance.parser.parallelThreadSize = 16
    ## disruptor ringbuffer size, must be power of 2
    canal.instance.parser.parallelBufferSize = 256
    
    # table meta tsdb info
    canal.instance.tsdb.enable = true
    canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
    canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
    canal.instance.tsdb.dbUsername = canal
    canal.instance.tsdb.dbPassword = canal
    # dump snapshot interval, default 24 hour
    canal.instance.tsdb.snapshot.interval = 24
    # purge snapshot expire , default 360 hour(15 days)
    canal.instance.tsdb.snapshot.expire = 360
    
    # aliyun ak/sk , support rds/mq
    canal.aliyun.accessKey =
    canal.aliyun.secretKey =
    
    #################################################
    #########               destinations            #############
    #################################################
    # instances canal-server creates: list their names here, e.g. test1,test2, separated by commas
    canal.destinations = example
    # conf root dir
    canal.conf.dir = ../conf
    # auto scan instance dir add/remove and start/stop instance
    canal.auto.scan = true
    canal.auto.scan.interval = 5
    
    canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
    #canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml
    
    canal.instance.global.mode = spring
    canal.instance.global.lazy = false
    canal.instance.global.manager.address = ${canal.admin.manager}
    #canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
    canal.instance.global.spring.xml = classpath:spring/file-instance.xml
    #canal.instance.global.spring.xml = classpath:spring/default-instance.xml
    
    ##################################################
    #########                    MQ                      #############
    ##################################################
    canal.mq.servers = 127.0.0.1:6667
    canal.mq.retries = 0
    canal.mq.batchSize = 16384
    canal.mq.maxRequestSize = 1048576
    canal.mq.lingerMs = 100
    canal.mq.bufferMemory = 33554432
    canal.mq.canalBatchSize = 50
    canal.mq.canalGetTimeout = 100
    canal.mq.flatMessage = true
    canal.mq.compressionType = none
    canal.mq.acks = all
    #canal.mq.properties. =
    canal.mq.producerGroup = test
    # Set this value to "cloud", if you want open message trace feature in aliyun.
    canal.mq.accessChannel = local
    # aliyun mq namespace
    #canal.mq.namespace =
    
    ##################################################
    #########     Kafka Kerberos Info    #############
    ##################################################
    canal.mq.kafka.kerberos.enable = false
    canal.mq.kafka.kerberos.krb5FilePath = "../conf/kerberos/krb5.conf"
    canal.mq.kafka.kerberos.jaasFilePath = "../conf/kerberos/jaas.conf"
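    Canal reads canal.properties in the Java `.properties` format. In that format, `#` starts a comment only when it is the first non-blank character of a line. Text such as `11111   #comment` after a value is therefore kept as part of the value, which is why explanatory notes must sit on their own lines in the real file.

    The following Python sketch mimics that rule in simplified form (no line continuations, escapes, or `:` separators). It also flags values that look like they carry a pasted trailing comment. The function names are hypothetical.

```python
import re


def parse_properties(text: str) -> dict:
    """Simplified java.util.Properties parsing of `key = value` lines.

    Only lines whose first non-blank character is '#' or '!' are comments.
    Leading blanks of a value are dropped; everything after them is kept.
    Not handled: line continuations, escapes, ':' or blank separators.
    """
    props = {}
    for line in text.splitlines():
        stripped = line.lstrip()
        if not stripped or stripped[0] in "#!":
            continue
        key, _, value = stripped.partition("=")
        props[key.strip()] = value.lstrip()
    return props


def trailing_comments(props: dict) -> list:
    """Keys whose value contains whitespace followed by '#', i.e. a likely inline comment."""
    return [key for key, value in props.items() if re.search(r"\s#", value)]


if __name__ == "__main__":
    sample = """
# tcp bind ip
canal.ip =
canal.port = 11111   #listen port
canal.serverMode = tcp
"""
    props = parse_properties(sample)
    print(repr(props["canal.port"]))  # prints '11111   #listen port'
    print(trailing_comments(props))   # prints ['canal.port']
```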
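    • Configure the instance (example)

    After naming the instances in the root configuration file, create a directory for each instance next to it, in the same conf directory. canal-server ships with a sample instance configuration, so we can simply copy it. For example, suppose the root configuration defines these instances: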

    [root@aliyun conf]# vim canal.properties
    ...
    canal.destinations = user_order,delivery_info
    ...
    
    Then create directories for these two instances alongside the root configuration:
    [root@aliyun conf]# pwd
    /usr/local/canal-server/conf
    [root@aliyun conf]# cp -a example/ user_order
    [root@aliyun conf]# cp -a example/ delivery_info
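    The copy step generalizes to any number of instances. The following is a minimal Python sketch. It assumes the conf layout shown here, with a template directory named `example` next to canal.properties, and the function names are hypothetical. `shutil.copytree` only approximates `cp -a`: it keeps timestamps and permission bits but not ownership.

```python
import shutil
import tempfile
from pathlib import Path


def parse_destinations(value: str) -> list:
    """Split a canal.destinations value such as 'user_order,delivery_info'."""
    return [name.strip() for name in value.split(",") if name.strip()]


def create_instance_dirs(conf_dir: str, destinations: str, template: str = "example") -> list:
    """Copy conf/<template>/ to conf/<name>/ for every destination not yet present."""
    conf = Path(conf_dir)
    created = []
    for name in parse_destinations(destinations):
        target = conf / name
        if target.exists():
            continue  # keep instances that are already configured
        shutil.copytree(conf / template, target)
        created.append(name)
    return created


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as conf:
        (Path(conf) / "example").mkdir()
        (Path(conf) / "example" / "instance.properties").write_text("canal.instance.gtidon=false\n")
        print(create_instance_dirs(conf, "user_order,delivery_info"))  # prints ['user_order', 'delivery_info']
        print(create_instance_dirs(conf, "user_order,delivery_info"))  # prints []
```

    Skipping existing directories makes the helper safe to rerun after adding a new name to canal.destinations.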

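    Only the configuration of the example instance is shown here:

    vi /usr/local/canal/conf/example/instance.properties

    The parameters with explanatory comments below are the ones to focus on, and also the ones changed most often: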

    #################################################
    ## mysql serverId , v1.0.26+ will autoGen
    canal.instance.mysql.slaveId=11

    # enable gtid use true/false
    canal.instance.gtidon=false

    # position info
    # IP address and port of the MySQL server whose binlog is read
    canal.instance.master.address=172.16.10.26:3306
    # binlog file to start reading from
    canal.instance.master.journal.name=
    # offset within that binlog file; anyone who has set up master/slave replication will recognize these two parameters
    canal.instance.master.position=
    # tip: the binlog file and offset can also be left empty, in which case canal-server starts reading from the current position; I recommend leaving them unset
    # binlog timestamp to start from when connecting to the MySQL master (default: none)
    canal.instance.master.timestamp=
    canal.instance.master.gtid=

    # rds oss binlog
    canal.instance.rds.accesskey=
    canal.instance.rds.secretkey=
    canal.instance.rds.instanceId=

    # table meta tsdb info
    canal.instance.tsdb.enable=true
    #canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
    #canal.instance.tsdb.dbUsername=canal
    #canal.instance.tsdb.dbPassword=canal

    # the following parameters configure high availability; a MySQL replica can be entered here
    #canal.instance.standby.address =
    #canal.instance.standby.journal.name =
    #canal.instance.standby.position =
    #canal.instance.standby.timestamp =
    #canal.instance.standby.gtid=

    # username/password
    # MySQL user and password canal connects with
    canal.instance.dbUsername=canal
    canal.instance.dbPassword=Aa123456.
    # character set
    canal.instance.connectionCharset = UTF-8
    # enable druid Decrypt database password
    canal.instance.enableDruid=false
    #canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

    # table regex
    # tables of interest for MySQL data parsing, as Perl regular expressions; separate multiple regexes with commas (,); escape characters need a double backslash (\\)
    # common examples:
    # 1. all tables: .* or .*\..*
    # 2. all tables in the canal schema: canal\..*
    # 3. tables in canal whose names start with canal: canal\.canal.*
    # 4. a single table in the canal schema: canal.test1
    # 5. several rules combined: canal\..*,mysql.test1,mysql.test2 (comma separated)
    # an important parameter: the schema.table whitelist; for example, to capture only incremental data from the user table in the test schema, write test.user
    canal.instance.filter.regex=risk.canal,risk.cwx

    # table black regex
    canal.instance.filter.black.regex=
    # table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
    #canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
    # table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
    #canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
    
    # mq config
    canal.mq.topic=example
    # dynamic topic route by schema or table regex
    #canal.mq.dynamicTopic=mytest1.user,mytest2\..*,.*\..*
    canal.mq.partition=0
    # hash partition config
    #canal.mq.partitionsNum=3
    #canal.mq.partitionHash=test.table:id^name,.*\..*
    #################################################
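    A `canal.instance.filter.regex` value can be sanity-checked before deploying it by approximating the matching in Python. This is a sketch under stated assumptions, not canal's own matcher. Each comma-separated pattern is assumed to have to match the whole `schema.table` name, case-insensitively. Patterns are taken as they read after `.properties` unescaping, so `canal\\..*` in the file corresponds to `canal\..*` here.

```python
import re


def table_matches(filter_regex: str, schema: str, table: str) -> bool:
    """Approximate canal.instance.filter.regex: does any pattern match 'schema.table'?

    Assumptions: patterns are comma separated, must match the whole name,
    and matching ignores case.
    """
    name = f"{schema}.{table}"
    patterns = [p.strip() for p in filter_regex.split(",") if p.strip()]
    return any(re.fullmatch(p, name, flags=re.IGNORECASE) for p in patterns)


if __name__ == "__main__":
    rule = "risk.canal,risk.cwx"  # the whitelist configured above
    print(table_matches(rule, "risk", "canal"))            # prints True
    print(table_matches(rule, "risk", "orders"))           # prints False
    print(table_matches(r"canal\..*", "canal", "t_user"))  # prints True
```

    An unescaped `.` matches any character, so `risk.canal` would also accept a name like `riskXcanal`. Escape the dot when that matters.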

    2.2.3. Start

    bin/startup.sh
    • Check the server log
    vi logs/canal/canal.log
    2013-02-05 22:45:27.967 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
    2013-02-05 22:45:28.113 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.1.29.120:11111]
    2013-02-05 22:45:28.210 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......
    • Check the instance log
    vi logs/example/example.log
    2013-02-05 22:50:45.636 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
    2013-02-05 22:50:45.641 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
    2013-02-05 22:50:45.803 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example 
    2013-02-05 22:50:45.810 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start successful....
    • Stop
    bin/stop.sh
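    When startup is scripted, the server log can be checked for the lines shown above. The following is a minimal Python sketch. It relies only on the two message texts in the sample canal.log (`start the canal server[host:port]` and `the canal server is running now`), which may differ in other canal versions.

```python
import re

RUNNING = "the canal server is running now"
BOUND = re.compile(r"start the canal server\[([^\]]+):(\d+)\]")


def server_status(log_text: str):
    """Return (running, host, port) parsed from logs/canal/canal.log text."""
    match = BOUND.search(log_text)
    host, port = (match.group(1), int(match.group(2))) if match else (None, None)
    return RUNNING in log_text, host, port


if __name__ == "__main__":
    sample = (
        "... CanalController - ## start the canal server[10.1.29.120:11111]\n"
        "... CanalLauncher - ## the canal server is running now ......\n"
    )
    print(server_status(sample))  # prints (True, '10.1.29.120', 11111)
```

    The reported port should equal canal.port (11111), which is also the address canal.adapter connects to.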

    2.3. canal.adapter

    2.3.1. Download and extract

    Visit https://github.com/alibaba/canal/releases, which lists all released packages. For example, for version 1.1.4:
    wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.adapter-1.1.4.tar.gz
    mkdir /usr/local/canal-adapter
    tar zxvf canal.adapter-1.1.4.tar.gz -C /usr/local/canal-adapter

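    2.3.2. Configuration

    • Adapter configuration
    cd /usr/local/canal-adapter
    vim conf/application.yml

    The parameters with explanatory comments below are the ones to focus on, and also the ones changed most often: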

    server:
      port: 8081
    spring:
      jackson:
        date-format: yyyy-MM-dd HH:mm:ss
        time-zone: GMT+8
        default-property-inclusion: non_null
    
    canal.conf:
      mode: tcp                                       # connection mode; tcp connects directly to canal-server
      canalServerHost: 127.0.0.1:11111                # address and port of canal-server
    #  zookeeperHosts: slave1:2181
    #  mqServers: 127.0.0.1:9092 #or rocketmq
    #  flatMessage: true
      batchSize: 500
      syncBatchSize: 1000
      retries: 0
      timeout:
      accessKey:
      secretKey:
      srcDataSources:                                   # source data sources: where data is read from
        defaultDS:                                      # a unique name of your choice; referenced by the ES mapping config
          url: jdbc:mysql://172.16.10.26:3306/risk?useUnicode=true
          username: root
          password: 123456
      canalAdapters:
      - instance: example # canal instance Name or mq topic name; here, the instance configured in canal-server
        groups:
        - groupId: g1                                   # group ID; the default is fine
          outerAdapters:
          - name: logger
    #      - name: rdb
    #        key: mysql1
    #        properties:
    #          jdbc.driverClassName: com.mysql.jdbc.Driver
    #          jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true
    #          jdbc.username: root
    #          jdbc.password: 121212
    #      - name: rdb
    #        key: oracle1
    #        properties:
    #          jdbc.driverClassName: oracle.jdbc.OracleDriver
    #          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
    #          jdbc.username: mytest
    #          jdbc.password: m121212
    #      - name: rdb
    #        key: postgres1
    #        properties:
    #          jdbc.driverClassName: org.postgresql.Driver
    #          jdbc.url: jdbc:postgresql://localhost:5432/postgres
    #          jdbc.username: postgres
    #          jdbc.password: 121212
    #          threads: 1
    #          commitSize: 3000
    #      - name: hbase
    #        properties:
    #          hbase.zookeeper.quorum: 127.0.0.1
    #          hbase.zookeeper.property.clientPort: 2181
    #          zookeeper.znode.parent: /hbase
          - name: es                                   # output target: Elasticsearch
            hosts: 172.16.99.2:40265                   # ES address; the port must be the ES transport port (9300 by default)
            properties:
              # mode: transport # or rest
              # security.auth: test:123456 #  only used for rest mode
              cluster.name: log-es-cluster             # ES cluster name
    • ES mapping configuration
    [root@aliyun es]# pwd
    /usr/local/canal-adapter/conf/es
    [root@aliyun es]# ll
    total 12
    -rwxrwxrwx 1 root root 466 Apr  4 10:27 biz_order.yml          # these three files ship with the adapter; they can be deleted, but better keep them as format references
    -rwxrwxrwx 1 root root 855 Apr  4 10:27 customer.yml
    -rwxrwxrwx 1 root root 416 Apr  4 10:27 mytest_user.yml

    Create the canal.yml file:

    cp customer.yml canal.yml
    vim canal.yml

    The parameters with explanatory comments below are the ones to focus on, and also the ones changed most often:

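    dataSourceKey: defaultDS   # data source; must match a key under srcDataSources in the adapter's application.yml
    destination: example       # name of an instance configured in canal-server; with several instances, be clear about which data each one collects
    groupId: g1                # group ID; the default is fine
    esMapping:                 # ES mapping
      _index: canal            # name of the target ES index (your choice); you must create it in ES yourself
      _type: _doc              # type name of the ES index (your choice)
      _id: _id                 # unique document id in ES, usually the table's primary key; note it is written "_id", with a leading underscore
    #  pk: id                  # if _id is not used, specify one field as the primary key instead
      sql: "select t.id as _id, t.name, t.sex, t.age, t.amount, t.email, t.occur_time from canal t"   # maps each table column to its ES field name; every field mapped into ES must be unique
    #  etlCondition: "where t.occur_time>='{0}'"
      commitBatch: 3000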
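    Three values in canal.yml must line up with application.yml:
    • `dataSourceKey` with a key under `srcDataSources`;
    • `destination` with a `canalAdapters` instance;
    • `groupId` with one of that instance's groups.

    The following Python sketch cross-checks them on plain dicts, i.e. what a YAML loader such as PyYAML's `yaml.safe_load` would return for the two files. Loading is assumed and left out, and the function name is hypothetical.

```python
def check_es_mapping(app_conf: dict, mapping_conf: dict) -> list:
    """Cross-check conf/es/<name>.yml against conf/application.yml; returns problems."""
    canal_conf = app_conf["canal.conf"]
    problems = []
    key = mapping_conf["dataSourceKey"]
    if key not in canal_conf.get("srcDataSources", {}):
        problems.append(f"dataSourceKey {key!r} is not defined under srcDataSources")
    instances = {a["instance"]: a for a in canal_conf.get("canalAdapters", [])}
    adapter = instances.get(mapping_conf["destination"])
    if adapter is None:
        problems.append(f"destination {mapping_conf['destination']!r} has no canalAdapters entry")
        return problems
    groups = {g["groupId"]: g for g in adapter.get("groups", [])}
    group = groups.get(mapping_conf["groupId"])
    if group is None:
        problems.append(f"groupId {mapping_conf['groupId']!r} is not a group of that instance")
    elif not any(o.get("name") == "es" for o in group.get("outerAdapters", [])):
        problems.append("that group has no outer adapter named 'es'")
    return problems


if __name__ == "__main__":
    app = {"canal.conf": {
        "srcDataSources": {"defaultDS": {"url": "jdbc:mysql://172.16.10.26:3306/risk?useUnicode=true"}},
        "canalAdapters": [{"instance": "example", "groups": [
            {"groupId": "g1", "outerAdapters": [{"name": "logger"}, {"name": "es"}]}]}],
    }}
    mapping = {"dataSourceKey": "defaultDS", "destination": "example", "groupId": "g1"}
    print(check_es_mapping(app, mapping))  # prints []
```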

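    Once the SQL mapping file is written, create the corresponding index and mapping in ES. The index mapping must agree with the SQL mapping: every field in the SQL mapping must exist in the ES index mapping, otherwise synchronization fails with a field error.

    2.3.3. Create the MySQL table and the ES index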

    CREATE TABLE `canal` (
    id int(11) NOT NULL AUTO_INCREMENT,
    name varchar(20) NULL COMMENT '名称',
    sex  varchar(2) NULL COMMENT '性别',
    age  int NULL COMMENT '年龄',
    amount decimal(12,2) NULL COMMENT '资产',
    email  varchar(50) NULL COMMENT '邮箱',
    occur_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
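    PRIMARY KEY (`id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8;

    Create the ES index canal with the following mapping (ES 6.x format, e.g. as the body of PUT /canal in Kibana Dev Tools):

    {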
        "mappings": {
            "_doc": {
                "properties": {
                    "id": {
                        "type": "long"
                    },
                    "name": {
                        "type": "text"
                    },
                    "sex": {
                        "type": "text"
                    },
                    "age": {
                        "type": "long"
                    },
                    "amount": {
                        "type": "text"
                    },
                    "email": {
                        "type": "text"
                    },
                    "occur_time": {
                        "type": "date"
                    }
                }
            }
        }
    }
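    The rule from 2.3.2 (every field produced by the canal.yml SQL must exist in the index mapping) can be checked mechanically. The following is a minimal Python sketch with hypothetical function names. It handles only simple `select a.x as y, a.z from ...` statements like the one in canal.yml, without functions containing commas or subqueries. It skips the `_id` alias, which the config uses as the document id.

```python
import re


def select_fields(sql: str) -> list:
    """Output column names of a simple 'select ... from ...' statement."""
    match = re.search(r"select\s+(.*?)\s+from\s", sql, re.IGNORECASE | re.DOTALL)
    if match is None:
        raise ValueError("not a simple select statement")
    fields = []
    for item in match.group(1).split(","):
        item = item.strip()
        alias = re.search(r"\s+as\s+(\w+)$", item, re.IGNORECASE)
        fields.append(alias.group(1) if alias else item.split(".")[-1])
    return fields


def missing_in_mapping(sql: str, mapping: dict, doc_type: str = "_doc", id_field: str = "_id") -> list:
    """Fields selected by the SQL (except the document id) that the ES 6.x mapping lacks."""
    properties = mapping["mappings"][doc_type]["properties"]
    return [f for f in select_fields(sql) if f != id_field and f not in properties]


if __name__ == "__main__":
    sql = "select t.id as _id, t.name, t.sex, t.age, t.amount, t.email, t.occur_time from canal t"
    mapping = {"mappings": {"_doc": {"properties": {
        name: {} for name in ["id", "name", "sex", "age", "amount", "email", "occur_time"]}}}}
    print(missing_in_mapping(sql, mapping))  # prints []
```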

    2.3.4. Start

    cd /usr/local/canal-adapter
    ./bin/startup.sh

    Check the log:

    cat logs/adapter/adapter.log

    2.4. Kibana

    Installation and configuration: https://www.cnblogs.com/caoweixiong/p/11826655.html

    3. Verification

    • Before any data is inserted:

    • Insert one row:
    insert into canal(id,name,sex,age,amount,email,occur_time) values(null,'cwx','',18,100000000,'249299170@qq.com',now());

     

    • Update one row:
    update canal set name='cwx1',sex='',age=28,amount=200000,email='asdf',occur_time=now() where id=16;

     

    • Delete one row:
    delete from canal where id=16;
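    Besides Kibana, each step can be confirmed through the Elasticsearch HTTP API. That API listens on port 9200 by default, not on the 9300 transport port the adapter uses. The following is a minimal Python sketch using only the standard library. The base URL is an assumption to adapt to your cluster, and the id 16 comes from the statements above.

```python
import json
import urllib.error
import urllib.request


def doc_url(es_http: str, index: str, doc_id, doc_type: str = "_doc") -> str:
    """ES 6.x get-document URL, e.g. http://127.0.0.1:9200/canal/_doc/16."""
    return f"{es_http.rstrip('/')}/{index}/{doc_type}/{doc_id}"


def synced_source(response: dict):
    """The document's _source if ES found it, otherwise None."""
    return response.get("_source") if response.get("found") else None


def fetch_doc(es_http: str, index: str, doc_id):
    """GET the document; returns None once the delete has been synced."""
    try:
        with urllib.request.urlopen(doc_url(es_http, index, doc_id), timeout=5) as resp:
            return synced_source(json.load(resp))
    except urllib.error.HTTPError as err:
        if err.code == 404:  # a missing document comes back as 404 with "found": false
            return None
        raise
```

    For example, `fetch_doc("http://127.0.0.1:9200", "canal", 16)` should return the row's fields after the insert and the update, and `None` after the delete has been synced.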

     

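    4. Summary

     4.1. Full synchronization is not achieved in this setup, but inserts, updates and deletes all work;

     4.2. The ES index must be created in advance;

     4.3. ES is configured with its TCP transport port, 9300 by default;

     4.4. For now, ES 6.x seems to be supported, but not 7.x.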

  • Original post: https://www.cnblogs.com/caoweixiong/p/11825303.html