  • Incremental Sync from MySQL to ES with Canal (Part 2)

    Original post; please credit the source when reposting: https://www.cnblogs.com/agilestyle/p/15077622.html

    Canal Configuration

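    MySQL Authorization

    Grant the account that Canal uses to connect to MySQL the privileges it needs to act as a MySQL slave. If the account already exists, you can run the GRANT directly.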

    CREATE USER canal IDENTIFIED BY 'canal';  
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
    FLUSH PRIVILEGES;

    Note: for a self-hosted MySQL, first enable binlog writing and set binlog-format to ROW mode.
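As a minimal sketch of that prerequisite, the fragment below writes the [mysqld] settings recommended by the Canal QuickStart (log-bin, binlog-format=ROW, server_id) to a scratch file and checks them with grep. The path /tmp/my.cnf.sample and the helper name binlog_is_row are hypothetical and used only for illustration; the real settings belong in your server's own my.cnf, whose location depends on the installation.

```shell
#!/usr/bin/env bash
# Sample [mysqld] settings for binlog in ROW mode.
# SAMPLE_CNF is a hypothetical scratch path, not the real MySQL config file.
SAMPLE_CNF="${SAMPLE_CNF:-/tmp/my.cnf.sample}"

cat > "$SAMPLE_CNF" <<'EOF'
[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server_id=1
EOF

# Return 0 if the given config file enables the binlog and sets ROW format.
binlog_is_row() {
  local cnf="$1"
  grep -Eq '^[[:space:]]*log[-_]bin([[:space:]]*=|[[:space:]]*$)' "$cnf" &&
  grep -Eiq '^[[:space:]]*binlog[-_]format[[:space:]]*=[[:space:]]*ROW[[:space:]]*$' "$cnf"
}

binlog_is_row "$SAMPLE_CNF" && echo "binlog ROW mode configured"
```

After restarting MySQL with the real configuration, running SHOW VARIABLES LIKE 'binlog_format'; in a MySQL session is one way to confirm the setting took effect.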

    Create the DB and Table in MySQL

    CREATE DATABASE test;
    
    USE test;
    
    CREATE TABLE `tb_user` (
      `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
      `username` varchar(30) DEFAULT NULL,
      `password` varchar(30) DEFAULT NULL,
      `age` int(10) unsigned DEFAULT NULL,
      `create_by` varchar(30) NOT NULL,
      `update_by` varchar(30) NOT NULL,
      `create_time` datetime(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
      `update_time` datetime(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3),
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

    Create the index in ES

    PUT /test_user
    {
      "mappings": {
        "properties": {
          "username": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword"
              }
            }
          },
          "age": {
            "type": "long"
          },
          "create_time": {
            "type": "date"
          },
          "update_time": {
            "type": "date"
          }
        }
      }
    }

    Canal Deployer Configuration

    Download the package

    wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz

    Create the directory

    mkdir -p ~/app/canal-deployer

    Extract the package into the directory

    tar zxvf canal.deployer-1.1.5.tar.gz -C ~/app/canal-deployer

    Edit instance.properties

    vi ~/app/canal-deployer/conf/example/instance.properties

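    Change these 4 lines (this walkthrough connects as root; the canal account granted above could be used instead)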

    canal.instance.mysql.slaveId=0
    canal.instance.master.address=127.0.0.1:3306
    canal.instance.dbUsername=root
    canal.instance.dbPassword=123456

    The complete instance.properties is as follows

    #################################################
    ## mysql serverId , v1.0.26+ will autoGen
    canal.instance.mysql.slaveId=0
    
    # enable gtid use true/false
    canal.instance.gtidon=false
    
    # position info
    canal.instance.master.address=127.0.0.1:3306
    canal.instance.master.journal.name=
    canal.instance.master.position=
    canal.instance.master.timestamp=
    canal.instance.master.gtid=
    
    # rds oss binlog
    canal.instance.rds.accesskey=
    canal.instance.rds.secretkey=
    canal.instance.rds.instanceId=
    
    # table meta tsdb info
    canal.instance.tsdb.enable=true
    #canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
    #canal.instance.tsdb.dbUsername=canal
    #canal.instance.tsdb.dbPassword=canal
    
    #canal.instance.standby.address =
    #canal.instance.standby.journal.name =
    #canal.instance.standby.position =
    #canal.instance.standby.timestamp =
    #canal.instance.standby.gtid=
    
    # username/password
    canal.instance.dbUsername=root
    canal.instance.dbPassword=123456
    canal.instance.connectionCharset = UTF-8
    # enable druid Decrypt database password
    canal.instance.enableDruid=false
    #canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
    
    # table regex
    canal.instance.filter.regex=.*\..*
    # table black regex
    canal.instance.filter.black.regex=mysql\.slave_.*
    # table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
    #canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
    # table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
    #canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
    
    # mq config
    canal.mq.topic=example
    # dynamic topic route by schema or table regex
    #canal.mq.dynamicTopic=mytest1.user,mytest2\..*,.*\..*
    canal.mq.partition=0
    # hash partition config
    #canal.mq.partitionsNum=3
    #canal.mq.partitionHash=test.table:id^name,.*\..*
    #canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
    #################################################

    Start

    cd ~/app/canal-deployer/bin
    ./startup.sh
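To confirm the deployer actually came up, one option is to look for the server's startup message in its log. The sketch below assumes the log lives at logs/canal/canal.log under the install directory and that it contains the "the canal server is running now" line shown in the Canal QuickStart. Both may differ in other versions. canal_started is a hypothetical helper name.

```shell
#!/usr/bin/env bash
# Return 0 if the given canal.log contains the server's "running" message.
# The message text is an assumption based on the Canal QuickStart log sample.
canal_started() {
  local log="$1"
  [ -f "$log" ] && grep -q 'the canal server is running now' "$log"
}

# Assumed log location for the install directory used in this post.
CANAL_LOG="${CANAL_LOG:-$HOME/app/canal-deployer/logs/canal/canal.log}"
if canal_started "$CANAL_LOG"; then
  echo "canal server is running"
else
  echo "canal server not confirmed running; inspect $CANAL_LOG"
fi
```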

    Canal Adapter Configuration

    Download the package

    wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.adapter-1.1.5.tar.gz

    Create the directory

    mkdir -p ~/app/canal-adapter

    Extract the package into the directory

    tar zxvf canal.adapter-1.1.5.tar.gz -C ~/app/canal-adapter

    Edit application.yml

    vi ~/app/canal-adapter/conf/application.yml

    The complete application.yml is as follows

    server:
      port: 8081
    spring:
      jackson:
        date-format: yyyy-MM-dd HH:mm:ss
        time-zone: GMT+8
        default-property-inclusion: non_null
    
    canal.conf:
      mode: tcp #tcp kafka rocketMQ rabbitMQ
      flatMessage: true
      zookeeperHosts:
      syncBatchSize: 1000
      retries: 0
      timeout:
      accessKey:
      secretKey:
      consumerProperties:
        # canal tcp consumer
        canal.tcp.server.host: 127.0.0.1:11111
        canal.tcp.zookeeper.hosts:
        canal.tcp.batch.size: 500
        canal.tcp.username:
        canal.tcp.password:
        # kafka consumer
        kafka.bootstrap.servers: 127.0.0.1:9092
        kafka.enable.auto.commit: false
        kafka.auto.commit.interval.ms: 1000
        kafka.auto.offset.reset: latest
        kafka.request.timeout.ms: 40000
        kafka.session.timeout.ms: 30000
        kafka.isolation.level: read_committed
        kafka.max.poll.records: 1000
        # rocketMQ consumer
        rocketmq.namespace:
        rocketmq.namesrv.addr: 127.0.0.1:9876
        rocketmq.batch.size: 1000
        rocketmq.enable.message.trace: false
        rocketmq.customized.trace.topic:
        rocketmq.access.channel:
        rocketmq.subscribe.filter:
        # rabbitMQ consumer
        rabbitmq.host:
        rabbitmq.virtual.host:
        rabbitmq.username:
        rabbitmq.password:
        rabbitmq.resource.ownerId:
    
      srcDataSources:
        defaultDS:
          url: jdbc:mysql://127.0.0.1:3306/test?useUnicode=true
          username: root
          password: 123456
      canalAdapters:
      - instance: example # canal instance Name or mq topic name
        groups:
        - groupId: g1
          outerAdapters:
          - name: logger
    #      - name: rdb
    #        key: mysql1
    #        properties:
    #          jdbc.driverClassName: com.mysql.jdbc.Driver
    #          jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true
    #          jdbc.username: root
    #          jdbc.password: 121212
    #      - name: rdb
    #        key: oracle1
    #        properties:
    #          jdbc.driverClassName: oracle.jdbc.OracleDriver
    #          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
    #          jdbc.username: mytest
    #          jdbc.password: m121212
    #      - name: rdb
    #        key: postgres1
    #        properties:
    #          jdbc.driverClassName: org.postgresql.Driver
    #          jdbc.url: jdbc:postgresql://localhost:5432/postgres
    #          jdbc.username: postgres
    #          jdbc.password: 121212
    #          threads: 1
    #          commitSize: 3000
    #      - name: hbase
    #        properties:
    #          hbase.zookeeper.quorum: 127.0.0.1
    #          hbase.zookeeper.property.clientPort: 2181
    #          zookeeper.znode.parent: /hbase
          - name: es7
            key: es7key
            hosts: http://127.0.0.1:9200
            properties:
              mode: rest
              # security.auth: test:123456 #  only used for rest mode
              cluster.name: docker-cluster
    #        - name: kudu
    #          key: kudu
    #          properties:
    #            kudu.master.address: 127.0.0.1 # ',' split multi address

    Create a test_user.yml under the ~/app/canal-adapter/conf/es7/ directory

    vi ~/app/canal-adapter/conf/es7/test_user.yml

    Add the following

    dataSourceKey: defaultDS
    outerAdapterKey: es7key
    destination: example
    groupId: g1
    esMapping:
      _index: test_user
      _id: _id
      upsert: true
      sql: "select a.id as _id, a.username, a.age, a.create_time, a.update_time from tb_user a"
      commitBatch: 3000

    Start

    cd ~/app/canal-adapter/bin
    ./startup.sh

    Startup runs into a nasty pitfall:

    java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassCastException: com.alibaba.druid.pool.DruidDataSource cannot be cast to com.alibaba.druid.pool.DruidDataSource
        at com.alibaba.otter.canal.client.adapter.es7x.ES7xAdapter.init(ES7xAdapter.java:54) ~[client-adapter.es7x-1.1.5-jar-with-dependencies.jar:na]
        at com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterLoader.loadAdapter(CanalAdapterLoader.java:225) [client-adapter.launcher-1.1.5.jar:na]
        at com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterLoader.init(CanalAdapterLoader.java:56) [client-adapter.launcher-1.1.5.jar:na]
        at com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterService.init(CanalAdapterService.java:60) [client-adapter.launcher-1.1.5.jar:na]

    For the fix, see the post "Canal Adapter com.alibaba.druid.pool.DruidDataSource cannot be cast to com.alibaba.druid.pool.DruidDataSource".

    Test

    After resolving the jar conflict, restart the canal adapter and test whether incremental MySQL data is synced successfully.

    Before inserting data

    GET /test_user/_search

    Insert data

    INSERT INTO `tb_user` (`id`, `username`, `password`, `age`, `create_by`, `update_by`, `create_time`, `update_time`)
    VALUES
    (1, 'canal1', 'canal1', 28, 'admin', 'admin', '2021-07-01 00:00:00.000', '2021-07-01 00:00:00.000');

    Run the search again to confirm the document was synced

    You can also fetch the single document

    GET /test_user/_doc/1

    Update data

    update tb_user set age = 38 where id = 1; 

    Query again; age should now be 38

    Delete data

    delete from tb_user where id = 1;

    Query again; the document should no longer be found
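The checks above use the Kibana-style GET syntax. A rough shell equivalent with curl might look like the sketch below. The helper names es_doc_url, doc_found and check_synced are hypothetical, ES_HOST defaults to the host configured in application.yml, and the response parsing relies only on the "found" field of the Elasticsearch get-document response.

```shell
#!/usr/bin/env bash
ES_HOST="${ES_HOST:-http://127.0.0.1:9200}"   # same host as in application.yml

# Build the document URL for an index and id.
es_doc_url() {
  printf '%s/%s/_doc/%s' "$ES_HOST" "$1" "$2"
}

# Return 0 if a get-document response body reports "found": true.
doc_found() {
  printf '%s' "$1" | grep -Eq '"found"[[:space:]]*:[[:space:]]*true'
}

# Fetch the document and report whether it exists.
# Requires curl and a reachable Elasticsearch; returns 2 if the request fails.
check_synced() {
  local body
  body="$(curl -s "$(es_doc_url "$1" "$2")")" || return 2
  if doc_found "$body"; then echo "present"; else echo "absent"; fi
}
```

Calling check_synced test_user 1 after the INSERT should report present once the adapter has processed the binlog event, and absent again after the DELETE. Allow a short delay, since the sync is asynchronous.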

    Reference

    https://github.com/alibaba/canal/wiki/QuickStart

    https://github.com/alibaba/canal/releases

