zoukankan      html  css  js  c++  java
  • canal同步mysql到elasticsearch

    1、mysql配置

    1、编辑mysql配置文件

    docker exec -it mysql5.7 /bin/bash    #mysql5.7为容器名称
    cd etc
    vi my.cnf
    

    新增如下配置:

    log-bin=mysql-bin          #添加这一行就ok
    binlog-format=ROW          #选择row模式
    server_id=1                #配置mysql replaction需要定义,不能和canal的slaveId重复
    expire_logs_days=5         #日志过期时间为5天      
    

    2、新建数据库用户,并赋予相应权限

    CREATE USER canal IDENTIFIED BY 'canal';  
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    FLUSH PRIVILEGES;

    3、新建测试数据库

    新建mytest数据库,并新建两张表

    SET NAMES utf8mb4;
    SET FOREIGN_KEY_CHECKS = 0;
    
    -- ----------------------------
    -- Table structure for role
    -- ----------------------------
    DROP TABLE IF EXISTS `role`;
    CREATE TABLE `role`  (
      `id` bigint(20) NOT NULL AUTO_INCREMENT,
      `role_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
      PRIMARY KEY (`id`) USING BTREE
    ) ENGINE = InnoDB AUTO_INCREMENT = 3 CHARACTER SET = latin1 COLLATE = latin1_swedish_ci ROW_FORMAT = Dynamic;
    
    -- ----------------------------
    -- Table structure for user
    -- ----------------------------
    DROP TABLE IF EXISTS `user`;
    CREATE TABLE `user`  (
      `id` bigint(255) NOT NULL AUTO_INCREMENT,
      `name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
      `role_id` bigint(20) NULL DEFAULT NULL,
      `c_time` datetime(0) NULL DEFAULT NULL,
      PRIMARY KEY (`id`) USING BTREE
    ) ENGINE = InnoDB AUTO_INCREMENT = 6 CHARACTER SET = latin1 COLLATE = latin1_swedish_ci ROW_FORMAT = Dynamic;
    
    SET FOREIGN_KEY_CHECKS = 1; 

    2、安装canal-server和canal-admin

    1、安装

    具体安装操作可以参考canal-server安装canal-admin安装  

    主要有几个注意点:

    1. 如果两个都要装,建议先安装canal-admin,我在实际操作过程中遇到了先安装canal-server之后再安装canal-admin,canal-admin的server管理中发现不了之前安装的服务
    2. 如果先安装了canal-server,可以修改conf/canal_local.properties文件中的配置,这样canal-admin可以发现canal-server

    2、配置

    打开canal-admin,可以在server管理页面下看到安装好的server

    打开Instance管理页面新增实例,并修改相应配置

    3、在ElasticSearch中新建两个索引

    4、运行canal-adapter

    1、下载canel-1.1.4版本源码

    git clone -b canal-1.1.4 https://github.com/alibaba/canal.git
    

    2、打开其中的client-adapter

    3、修改launcher的application.yml

    server:
      port: 8081
    spring:
      jackson:
        date-format: yyyy-MM-dd HH:mm:ss
        time-zone: GMT+8
        default-property-inclusion: non_null
    
    canal.conf:
      mode: tcp # kafka rocketMQ
      canalServerHost: 192.168.1.223:11111    #修改为canal-server的IP,端口不变
      batchSize: 500
      syncBatchSize: 1000
      retries: 0
      timeout:
      accessKey:
      secretKey:
      srcDataSources:
        defaultDS:
          url: jdbc:mysql://192.168.1.223:3306/mytest?useUnicode=true 
          username: canal
          password: canal
      canalAdapters:
      - instance: lietou # canal instance Name or mq topic name 此处的实例名为上文创建时候的实例名称
        groups:
        - groupId: g1
          outerAdapters:
          - name: es
            hosts: 192.168.1.223:9200 # 127.0.0.1:9200 for rest mode
            properties:
              mode: rest # or rest|transport 此处默认是transport,修改为rest
              cluster.name: docker-cluster #此处的名称可以通过上面配置的hosts的地址在浏览器访问查看
    

    4、在launcher下的的resources下新建es文件夹

    新增mytest_role.yml

    dataSourceKey: defaultDS #与application.yml中的srcDataSources对应
    destination: lietou #此处的实例名为上文配置的名称
    groupId: g1
    esMapping:
      _index: mytest_role
      _type: _doc
      _id: _id
      upsert: true
      #  pk: id
      sql: "select a.id as _id, a.role_name as _role_name from role a"
      #  objFields:
      #    _labels: array:;
      etlCondition: "where a.id>={}"
      commitBatch: 3000

      新增mytest_user.yml

    dataSourceKey: defaultDS #与application.yml中的srcDataSources对应
    destination: lietou  #此处的实例名为上文配置的名称
    groupId: g1
    esMapping:
      _index: mytest_user
      _type: _doc
      _id: _id
      upsert: true
      #  pk: id
      sql: "select a.id as _id, a.name as _name, a.role_id as _role_id, b.role_name as _role_name,
            a.c_time as _c_time from user a
            left join role b on b.id=a.role_id"
      #  objFields:
      #    _labels: array:;
      etlCondition: "where a.c_time>={}"
      commitBatch: 3000
    

    5、启动launher项目

    然后在数据库中新增数据查看效果是否成功

    查询elasticsearch是否成功写入

    6、全量同步

    curl  -X POST http://192.168.1.14:8081/etl/es/mytest_sysuser.yml   #此处的ip地址为运行adapter机器的IP
    

      

    至此,使用canel同步mysql到elasticsearch成功!

  • 相关阅读:
    Oracle 的字符集与乱码
    linux 时间同步的2种方法
    2 创建型模式-----工厂方法模式
    条款4:确定对象在使用前已被初始化
    条款3:尽可能地使用const
    条款2:尽量以const、enum、inline替换#define
    条款13:以对象管理资源
    条款12:牢记复制对象的所有成员
    条款11:在operator=中处理“自我赋值”
    条款10:令operator=返回一个*this的引用
  • 原文地址:https://www.cnblogs.com/adawoo/p/12484941.html
Copyright © 2011-2022 走看看