Using Canal to Incrementally Sync MySQL to ES (Part 2)

    Original article; please credit the source when reposting: https://www.cnblogs.com/agilestyle/p/15077622.html

    Canal Configuration

    MySQL Privileges

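    Grant the MySQL account that Canal connects with the privileges it needs to act as a MySQL slave (replica). If the account already exists, just run the GRANT.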

    CREATE USER canal IDENTIFIED BY 'canal';  
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
    FLUSH PRIVILEGES;
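    To confirm the grant took effect, run SHOW GRANTS FOR 'canal'@'%'; and check that the required privileges appear. The Python sketch below performs that check. It assumes the output lines have already been fetched as strings. missing_privileges is a hypothetical helper, and it only understands simple global GRANT lines:

```python
import re

# Privileges granted to the canal account in the GRANT statement above.
REQUIRED = {"SELECT", "REPLICATION SLAVE", "REPLICATION CLIENT"}


def missing_privileges(grant_lines):
    """Return the required privileges not covered by the given SHOW GRANTS lines.

    Only global grants (ON *.*) are considered, because REPLICATION SLAVE and
    REPLICATION CLIENT are global privileges. Simplified parser: role grants,
    partial revokes and column-level privileges are not handled.
    """
    granted = set()
    for line in grant_lines:
        match = re.match(r"GRANT (.+?) ON \*\.\* TO ", line.strip(), re.IGNORECASE)
        if not match:
            continue  # database- or table-level grant, or not a GRANT line
        privileges = {p.strip().upper() for p in match.group(1).split(",")}
        if privileges & {"ALL", "ALL PRIVILEGES"}:
            return set()  # ALL PRIVILEGES on *.* covers everything Canal needs
        granted |= privileges
    return REQUIRED - granted


# Example: a line as printed by SHOW GRANTS FOR 'canal'@'%';
print(missing_privileges(["GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO `canal`@`%`"]))
```

    An empty set means nothing is missing. A set containing REPLICATION SLAVE, for example, means the GRANT above was not applied to this account.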

    Note: for self-hosted MySQL, binlog writing must be enabled first, with binlog-format set to ROW.
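    After enabling binlog and restarting MySQL, SHOW VARIABLES LIKE 'log_bin'; and SHOW VARIABLES LIKE 'binlog_format'; should report ON and ROW. Below is a Python sketch of that check. It assumes the variable/value pairs have already been read into a dict, and binlog_problems is a hypothetical name:

```python
def binlog_problems(variables):
    """Check MySQL server variables for the binlog settings Canal relies on.

    `variables` maps variable names to values, e.g. the rows returned by
    SHOW VARIABLES LIKE 'log_bin' and SHOW VARIABLES LIKE 'binlog_format'.
    Returns a list of human-readable problems (empty if the settings look fine).
    """
    # Compare names and values case-insensitively.
    normalized = {name.lower(): str(value).upper() for name, value in variables.items()}
    problems = []
    if normalized.get("log_bin") != "ON":
        problems.append("binlog is disabled (log_bin is not ON)")
    if normalized.get("binlog_format") != "ROW":
        problems.append("binlog_format is %s, expected ROW" % normalized.get("binlog_format"))
    return problems


print(binlog_problems({"log_bin": "ON", "binlog_format": "ROW"}))  # []
```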

    Create the database and table in MySQL

    CREATE DATABASE test;
    
    USE test;
    
    CREATE TABLE `tb_user` (
      `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
      `username` varchar(30) DEFAULT NULL,
      `password` varchar(30) DEFAULT NULL,
      `age` int(10) unsigned DEFAULT NULL,
      `create_by` varchar(30) NOT NULL,
      `update_by` varchar(30) NOT NULL,
      `create_time` datetime(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
      `update_time` datetime(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3),
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

    Create the index in ES

    PUT /test_user
    {
      "mappings": {
        "properties": {
          "username": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword"
              }
            }
          },
          "age": {
            "type": "long"
          },
          "create_time": {
            "type": "date"
          },
          "update_time": {
            "type": "date"
          }
        }
      }
    }
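    Note that create_time and update_time are mapped as date without an explicit format, so ES falls back to its default strict_date_optional_time||epoch_millis. A MySQL value such as 2021-07-01 00:00:00.000, with a space between date and time, does not fit that format as written. This article does not cover how the adapter serializes datetime columns. If you ever write these values to ES yourself, one possible conversion is sketched below. It assumes the stored times are GMT+8, the time zone set later in the adapter's application.yml, and to_es_date is a hypothetical helper:

```python
from datetime import datetime, timedelta, timezone

# Assumption: MySQL DATETIME values are local times in GMT+8.
SOURCE_TZ = timezone(timedelta(hours=8))


def to_es_date(mysql_datetime):
    """Convert a MySQL DATETIME(3) string to ISO 8601 with an explicit UTC offset."""
    parsed = datetime.strptime(mysql_datetime, "%Y-%m-%d %H:%M:%S.%f")
    # Attach the assumed source time zone; the wall-clock value is unchanged.
    aware = parsed.replace(tzinfo=SOURCE_TZ)
    # timespec="milliseconds" keeps the 3 fractional digits of DATETIME(3).
    return aware.isoformat(timespec="milliseconds")


print(to_es_date("2021-07-01 00:00:00.000"))  # 2021-07-01T00:00:00.000+08:00
```

    The output uses a T separator and an explicit offset, which strict_date_optional_time accepts.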

    Canal Deployer Configuration

    Download the package

    wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz

    Create a directory

    mkdir -p ~/app/canal-deployer

    Extract the package into the directory

    tar zxvf canal.deployer-1.1.5.tar.gz -C ~/app/canal-deployer

    Edit instance.properties

    vi ~/app/canal-deployer/conf/example/instance.properties

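    Change these 4 lines to point at your MySQL instance. This example connects as root; the canal account created above can be used instead: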

    canal.instance.mysql.slaveId=0
    canal.instance.master.address=127.0.0.1:3306
    canal.instance.dbUsername=root
    canal.instance.dbPassword=123456

    The complete instance.properties:

    #################################################
    ## mysql serverId , v1.0.26+ will autoGen
    canal.instance.mysql.slaveId=0
    
    # enable gtid use true/false
    canal.instance.gtidon=false
    
    # position info
    canal.instance.master.address=127.0.0.1:3306
    canal.instance.master.journal.name=
    canal.instance.master.position=
    canal.instance.master.timestamp=
    canal.instance.master.gtid=
    
    # rds oss binlog
    canal.instance.rds.accesskey=
    canal.instance.rds.secretkey=
    canal.instance.rds.instanceId=
    
    # table meta tsdb info
    canal.instance.tsdb.enable=true
    #canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
    #canal.instance.tsdb.dbUsername=canal
    #canal.instance.tsdb.dbPassword=canal
    
    #canal.instance.standby.address =
    #canal.instance.standby.journal.name =
    #canal.instance.standby.position =
    #canal.instance.standby.timestamp =
    #canal.instance.standby.gtid=
    
    # username/password
    canal.instance.dbUsername=root
    canal.instance.dbPassword=123456
    canal.instance.connectionCharset = UTF-8
    # enable druid Decrypt database password
    canal.instance.enableDruid=false
    #canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
    
    # table regex
    canal.instance.filter.regex=.*\\..*
    # table black regex
    canal.instance.filter.black.regex=mysql\\.slave_.*
    # table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
    #canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
    # table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
    #canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
    
    # mq config
    canal.mq.topic=example
    # dynamic topic route by schema or table regex
    #canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
    canal.mq.partition=0
    # hash partition config
    #canal.mq.partitionsNum=3
    #canal.mq.partitionHash=test.table:id^name,.*\\..*
    #canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
    #################################################
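    canal.instance.filter.regex and canal.instance.filter.black.regex take comma-separated regular expressions matched against schema.table names. A table is captured when it matches an include pattern and no black-list pattern. The Python sketch below approximates that rule so the defaults are easier to reason about. It is not Canal's Java implementation, and the whole-name, case-insensitive matching is an assumption:

```python
import re


def parse_filter(value):
    """Split a comma-separated filter value into compiled regex patterns."""
    # Assumption: names are matched case-insensitively against the whole pattern.
    return [re.compile(p.strip(), re.IGNORECASE) for p in value.split(",") if p.strip()]


def is_subscribed(name, include, exclude):
    """True if `schema.table` matches an include pattern and no black-list pattern."""
    if not any(p.fullmatch(name) for p in include):
        return False
    return not any(p.fullmatch(name) for p in exclude)


# The defaults above, written as plain regexes.
INCLUDE = parse_filter(r".*\..*")            # canal.instance.filter.regex
EXCLUDE = parse_filter(r"mysql\.slave_.*")   # canal.instance.filter.black.regex

print(is_subscribed("test.tb_user", INCLUDE, EXCLUDE))             # True
print(is_subscribed("mysql.slave_master_info", INCLUDE, EXCLUDE))  # False
```

    With these defaults, every table except MySQL's own slave_* tables is captured. To follow only the table used in this article, the include list could be narrowed to the regex test\.tb_user.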

    Start the deployer

    cd ~/app/canal-deployer/bin
    ./startup.sh
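    The Canal server listens for clients on port 11111 by default. This is the address the adapter's canal.tcp.server.host points at later. Before configuring the adapter, a quick TCP reachability check can rule out a server that failed to start, though the log files under ~/app/canal-deployer/logs remain the authoritative source. Below is a standard-library sketch; is_port_open is a hypothetical helper:

```python
import socket


def is_port_open(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds."""
    try:
        # create_connection raises OSError (refused, unreachable, timed out) on failure.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


# Usage: is_port_open("127.0.0.1", 11111) should return True once the deployer is up.
```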

    Canal Adapter Configuration

    Download the package

    wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.adapter-1.1.5.tar.gz

    Create a directory

    mkdir -p ~/app/canal-adapter

    Extract the package into the directory

    tar zxvf canal.adapter-1.1.5.tar.gz -C ~/app/canal-adapter

    Edit application.yml

    vi ~/app/canal-adapter/conf/application.yml

    The complete application.yml:

    server:
      port: 8081
    spring:
      jackson:
        date-format: yyyy-MM-dd HH:mm:ss
        time-zone: GMT+8
        default-property-inclusion: non_null
    
    canal.conf:
      mode: tcp #tcp kafka rocketMQ rabbitMQ
      flatMessage: true
      zookeeperHosts:
      syncBatchSize: 1000
      retries: 0
      timeout:
      accessKey:
      secretKey:
      consumerProperties:
        # canal tcp consumer
        canal.tcp.server.host: 127.0.0.1:11111
        canal.tcp.zookeeper.hosts:
        canal.tcp.batch.size: 500
        canal.tcp.username:
        canal.tcp.password:
        # kafka consumer
        kafka.bootstrap.servers: 127.0.0.1:9092
        kafka.enable.auto.commit: false
        kafka.auto.commit.interval.ms: 1000
        kafka.auto.offset.reset: latest
        kafka.request.timeout.ms: 40000
        kafka.session.timeout.ms: 30000
        kafka.isolation.level: read_committed
        kafka.max.poll.records: 1000
        # rocketMQ consumer
        rocketmq.namespace:
        rocketmq.namesrv.addr: 127.0.0.1:9876
        rocketmq.batch.size: 1000
        rocketmq.enable.message.trace: false
        rocketmq.customized.trace.topic:
        rocketmq.access.channel:
        rocketmq.subscribe.filter:
        # rabbitMQ consumer
        rabbitmq.host:
        rabbitmq.virtual.host:
        rabbitmq.username:
        rabbitmq.password:
        rabbitmq.resource.ownerId:
    
      srcDataSources:
        defaultDS:
          url: jdbc:mysql://127.0.0.1:3306/test?useUnicode=true
          username: root
          password: 123456
      canalAdapters:
      - instance: example # canal instance Name or mq topic name
        groups:
        - groupId: g1
          outerAdapters:
          - name: logger
    #      - name: rdb
    #        key: mysql1
    #        properties:
    #          jdbc.driverClassName: com.mysql.jdbc.Driver
    #          jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true
    #          jdbc.username: root
    #          jdbc.password: 121212
    #      - name: rdb
    #        key: oracle1
    #        properties:
    #          jdbc.driverClassName: oracle.jdbc.OracleDriver
    #          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
    #          jdbc.username: mytest
    #          jdbc.password: m121212
    #      - name: rdb
    #        key: postgres1
    #        properties:
    #          jdbc.driverClassName: org.postgresql.Driver
    #          jdbc.url: jdbc:postgresql://localhost:5432/postgres
    #          jdbc.username: postgres
    #          jdbc.password: 121212
    #          threads: 1
    #          commitSize: 3000
    #      - name: hbase
    #        properties:
    #          hbase.zookeeper.quorum: 127.0.0.1
    #          hbase.zookeeper.property.clientPort: 2181
    #          zookeeper.znode.parent: /hbase
          - name: es7
            key: es7key
            hosts: http://127.0.0.1:9200
            properties:
              mode: rest
              # security.auth: test:123456 #  only used for rest mode
              cluster.name: docker-cluster
    #        - name: kudu
    #          key: kudu
    #          properties:
    #            kudu.master.address: 127.0.0.1 # ',' split multi address

    Create test_user.yml in the ~/app/canal-adapter/conf/es7/ directory

    vi ~/app/canal-adapter/conf/es7/test_user.yml

    with the following content:

    dataSourceKey: defaultDS
    outerAdapterKey: es7key
    destination: example
    groupId: g1
    esMapping:
      _index: test_user
      _id: _id
      upsert: true
      sql: "select a.id as _id, a.username, a.age, a.create_time, a.update_time from tb_user a"
      commitBatch: 3000
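    The mapping file is linked to application.yml only by name:

    - dataSourceKey must match a key under srcDataSources.
    - destination must match the instance of a canalAdapters entry.
    - groupId must match one of that entry's groups.
    - outerAdapterKey must match the key of the es7 outer adapter.

    A typo in any of them is easy to overlook, so here is a small consistency check. It assumes both YAML files have already been parsed into plain dicts, for example with a YAML library of your choice. check_mapping is hypothetical and not part of Canal:

```python
def check_mapping(app_conf, mapping):
    """Return a list of name mismatches between application.yml and an es7 mapping file."""
    conf = app_conf["canal.conf"]
    errors = []
    if mapping["dataSourceKey"] not in conf.get("srcDataSources", {}):
        errors.append("unknown dataSourceKey: %s" % mapping["dataSourceKey"])
    # canalAdapters entries whose instance matches the mapping's destination
    adapters = [a for a in conf.get("canalAdapters", []) if a.get("instance") == mapping["destination"]]
    if not adapters:
        errors.append("no canalAdapters entry for instance %s" % mapping["destination"])
        return errors
    groups = [g for a in adapters for g in a.get("groups", []) if g.get("groupId") == mapping["groupId"]]
    if not groups:
        errors.append("no group with groupId %s" % mapping["groupId"])
        return errors
    # Outer adapters without a key (such as logger) contribute None and never match.
    keys = {o.get("key") for g in groups for o in g.get("outerAdapters", [])}
    if mapping["outerAdapterKey"] not in keys:
        errors.append("no outer adapter with key %s" % mapping["outerAdapterKey"])
    return errors


# The relevant parts of this article's two files, as parsed dicts.
APP_CONF = {
    "canal.conf": {
        "srcDataSources": {"defaultDS": {"url": "jdbc:mysql://127.0.0.1:3306/test?useUnicode=true"}},
        "canalAdapters": [{
            "instance": "example",
            "groups": [{"groupId": "g1",
                        "outerAdapters": [{"name": "logger"}, {"name": "es7", "key": "es7key"}]}],
        }],
    }
}
MAPPING = {"dataSourceKey": "defaultDS", "outerAdapterKey": "es7key",
           "destination": "example", "groupId": "g1"}

print(check_mapping(APP_CONF, MAPPING))  # []
```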

    Start the adapter

    cd ~/app/canal-adapter/bin
    ./startup.sh

    During startup you will hit a nasty pitfall:

    java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassCastException: com.alibaba.druid.pool.DruidDataSource cannot be cast to com.alibaba.druid.pool.DruidDataSource
        at com.alibaba.otter.canal.client.adapter.es7x.ES7xAdapter.init(ES7xAdapter.java:54) ~[client-adapter.es7x-1.1.5-jar-with-dependencies.jar:na]
        at com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterLoader.loadAdapter(CanalAdapterLoader.java:225) [client-adapter.launcher-1.1.5.jar:na]
        at com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterLoader.init(CanalAdapterLoader.java:56) [client-adapter.launcher-1.1.5.jar:na]
        at com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterService.init(CanalAdapterService.java:60) [client-adapter.launcher-1.1.5.jar:na]

    For the detailed fix, see the article "Canal Adapter com.alibaba.druid.pool.DruidDataSource cannot be cast to com.alibaba.druid.pool.DruidDataSource".
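    Some background on why this error looks so odd. In Java, a ClassCastException between two classes with the same fully qualified name means the two Class objects were defined by different class loaders. The stack trace shows the failure inside client-adapter.es7x-1.1.5-jar-with-dependencies.jar. That suggests druid is loaded both by the adapter launcher and from the plugin jar's bundled dependencies, though this is an inference not verified here. Python has no class loaders, but the same effect can be reproduced by loading one source file as two separate modules. The sketch is only an analogy:

```python
import importlib.util
import os
import tempfile

# A tiny module defining one class; it is written to a temporary file below.
SOURCE = "class DataSource:\n    pass\n"


def load_module(path, name):
    """Load the file at `path` as a fresh, independent module object called `name`."""
    spec = importlib.util.spec_from_file_location(name, path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "pool.py")
    with open(path, "w") as f:
        f.write(SOURCE)
    # Loading the same source twice yields two distinct class objects,
    # much like two Java class loaders each defining the "same" class.
    first = load_module(path, "pool_a")
    second = load_module(path, "pool_b")
    obj = first.DataSource()
    same_name = type(obj).__qualname__ == second.DataSource.__qualname__
    compatible = isinstance(obj, second.DataSource)

print(same_name, compatible)  # True False
```

    Identical names do not make the types interchangeable. That is why the usual remedy for this kind of error is to ensure only one copy of the conflicting library is loaded.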

    Test

    After resolving the jar conflict, restart canal adapter and check whether incremental MySQL sync works.

    Before inserting any data:

    GET /test_user/_search

    Insert a row

    INSERT INTO `tb_user` (`id`, `username`, `password`, `age`, `create_by`, `update_by`, `create_time`, `update_time`)
    VALUES
    (1, 'canal1', 'canal1', 28, 'admin', 'admin', '2021-07-01 00:00:00.000', '2021-07-01 00:00:00.000');
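    The esMapping sql in test_user.yml decides what the adapter writes for this row. a.id becomes the document _id, and only username, age, create_time and update_time are copied, so password, create_by and update_by never reach ES. Below is a hypothetical Python helper that mirrors this projection. It does not model how the adapter formats dates:

```python
# Mirrors the esMapping sql in test_user.yml:
#   select a.id as _id, a.username, a.age, a.create_time, a.update_time from tb_user a
FIELDS = ("username", "age", "create_time", "update_time")


def row_to_doc(row):
    """Return (doc_id, source) for a tb_user row given as a dict of column -> value."""
    doc_id = str(row["id"])  # `a.id as _id`; ES document ids are strings
    source = {col: row[col] for col in FIELDS}  # unselected columns are dropped
    return doc_id, source


row = {"id": 1, "username": "canal1", "password": "canal1", "age": 28,
       "create_by": "admin", "update_by": "admin",
       "create_time": "2021-07-01 00:00:00.000", "update_time": "2021-07-01 00:00:00.000"}
print(row_to_doc(row))
```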

    Run the search again

    You can also fetch the single document:

    GET /test_user/_doc/1
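    The same lookup can be scripted with Python's standard library. This sketch assumes ES at http://127.0.0.1:9200 without authentication, matching the application.yml above where security.auth is commented out. get_source and parse_get_response are hypothetical helpers. For a missing document ES answers 404 with "found": false, so after the delete step below the call returns None:

```python
import json
import urllib.error
import urllib.request


def parse_get_response(body):
    """Return _source from a GET /<index>/_doc/<id> response body, or None if not found."""
    data = json.loads(body)
    return data["_source"] if data.get("found") else None


def get_source(base_url, index, doc_id):
    """Fetch a document by id; return its _source dict, or None if it does not exist."""
    url = "%s/%s/_doc/%s" % (base_url.rstrip("/"), index, doc_id)
    try:
        with urllib.request.urlopen(url, timeout=5) as resp:
            return parse_get_response(resp.read().decode("utf-8"))
    except urllib.error.HTTPError as err:
        if err.code == 404:  # missing document: the body carries "found": false
            return None
        raise


# Usage (needs a running ES): get_source("http://127.0.0.1:9200", "test_user", 1)
```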

    Update the row

    update tb_user set age = 38 where id = 1; 

    Query again

    Delete the row

    delete from tb_user where id = 1;

    Query again
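    Each "query again" step assumes the change has already reached ES. Canal applies binlog events asynchronously, though, so there can be a short delay after the SQL statement commits. When scripting these checks, poll with a timeout instead of querying once. Below is a generic sketch; check stands for any function that queries ES (for instance via GET /test_user/_doc/1) and is an assumption here:

```python
import time


def wait_until(check, timeout=10.0, interval=0.5):
    """Call `check()` until it returns a truthy value or `timeout` seconds pass.

    Returns the last value returned by `check` (falsy if it timed out).
    """
    deadline = time.monotonic() + timeout
    while True:
        result = check()
        if result or time.monotonic() >= deadline:
            return result
        time.sleep(interval)
```

    For the update step, check could return True once the document's age is 38. For the delete step, it could return True once the document is gone.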

    Reference

    https://github.com/alibaba/canal/wiki/QuickStart

    https://github.com/alibaba/canal/releases

