  • Syncing MySQL data with Canal

    Overall architecture diagram:

    canal.deployer configuration

    1. Enable binlog writing in MySQL and set binlog-format to ROW mode; add the following to my.cnf:

    [mysqld]
    log-bin=mysql-bin # enable binlog
    binlog-format=ROW # use ROW format
    server_id=1 # required for MySQL replication; must not clash with canal's slaveId
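
    After restarting MySQL, it is worth verifying that the binlog settings took effect. A minimal check from the shell (user and password are placeholders):

    mysql -uroot -p -e "SHOW VARIABLES LIKE 'log_bin'; SHOW VARIABLES LIKE 'binlog_format'; SHOW VARIABLES LIKE 'server_id';"
    # expected: log_bin = ON, binlog_format = ROW, server_id = 1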

    2. Grant the canal MySQL account the privileges needed to act as a MySQL slave; if the account already exists, run the GRANT statements directly:

    CREATE USER canal IDENTIFIED BY 'canal';  
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
    FLUSH PRIVILEGES;
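
    To confirm the privileges are in place, a quick check (assuming the mysql command-line client is available):

    mysql -uroot -p -e "SHOW GRANTS FOR 'canal'@'%';"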

    3. Configure canal.deployer so that the binlog data received by the canal server is delivered directly to MQ.

    Reference: https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart

    Download https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz

    After extracting canal.deployer-1.1.5.tar.gz, the directory structure looks like this:

    a. Edit the canal configuration file conf/canal.properties

    # tcp, kafka, rocketMQ, rabbitMQ
    canal.serverMode = kafka
    ..........
    ##################################################
    #########                    Kafka                   #############
    ##################################################
    # change this to your actual Kafka broker address
    kafka.bootstrap.servers = 127.0.0.1:9092
    kafka.acks = all
    kafka.compression.type = none
    kafka.batch.size = 16384
    kafka.linger.ms = 1
    kafka.max.request.size = 1048576
    kafka.buffer.memory = 33554432
    kafka.max.in.flight.requests.per.connection = 1
    kafka.retries = 0
    
    kafka.kerberos.enable = false
    kafka.kerberos.krb5.file = "../conf/kerberos/krb5.conf"
    kafka.kerberos.jaas.file = "../conf/kerberos/jaas.conf"

    b. Edit the instance configuration file conf/example/instance.properties

    # change these to your own database settings as needed
    #################################################
    ...
    canal.instance.master.address=127.0.0.1:3306
    # username/password of the source database
    ...
    canal.instance.dbUsername = canal
    canal.instance.dbPassword = canal
    ...
    # table regex: which schema/tables to sync (all tables in sam_cms here)
    canal.instance.filter.regex=sam_cms\..*
    # mq config
    canal.mq.topic=sam_cms
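
    If the Kafka cluster does not auto-create topics, the sam_cms topic may need to be created up front. A minimal sketch using the standard Kafka CLI (broker address and topic name come from the configs above; partition and replication settings are assumptions):

    kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 \
      --topic sam_cms --partitions 1 --replication-factor 1
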
    4. Start the deployer
    sh bin/startup.sh
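
    To check that the deployer started cleanly and that binlog events are reaching Kafka, inspect the logs and consume the topic. A rough sketch (log paths follow the default layout of the 1.1.5 deployer package; broker address as configured above):

    tail -n 50 logs/canal/canal.log       # server startup log
    tail -n 50 logs/example/example.log   # log of the example instance
    kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic sam_cms --from-beginning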

    canal.adapter configuration

    1. Download canal.adapter

         https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.adapter-1.1.5.tar.gz

    Directory structure after extraction:

    2. Configure canal.adapter to consume the Kafka messages and sync the data into MySQL.

    Reference: https://github.com/alibaba/canal/wiki/ClientAdapter

    a. Edit conf/application.yml

    canal.conf:
       mode: kafka #tcp kafka rocketMQ rabbitMQ
       consumerProperties:
            # kafka consumer
            kafka.bootstrap.servers: 192.168.0.103:9092 # change to your actual Kafka broker address
       canalAdapters:
       - instance: sam_cms # canal instance Name or mq topic name
         groups:
         - groupId: g1
           outerAdapters:
           - name: logger
           - name: rdb
             key: mysql1
             properties:
               jdbc.driverClassName: com.mysql.jdbc.Driver
               jdbc.url: jdbc:mysql://127.0.0.1:3306/sam_cms?useUnicode=true
               jdbc.username: root
               jdbc.password: 123456

    b. Edit the configuration file conf/rdb/mytest_user.yml

    destination: sam_cms # canal instance name or MQ topic name
    groupId: g1 # groupId in MQ mode; only data for this groupId is synced
    outerAdapterKey: mysql1 # adapter key, must match the key configured under outerAdapters above
    concurrent: true # parallel sync hashed by primary key; tables synced in parallel must never change their primary key, and the primary key must not be a foreign key of another synced table
    dbMapping:
      mirrorDb: true # mirror the whole schema instead of mapping individual tables
      database: sam_cms # database/schema of the source data source

    3. Start the adapter

    bin/startup.sh
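
    Once both processes are up, a simple end-to-end check is to write a row on the source side and confirm it shows up in the adapter log and in the target database. A rough sketch (the table t_demo and its columns are made up for illustration; the log path follows the default layout of the 1.1.5 adapter package; note that in the configs above source and target both point at 127.0.0.1:3306, so in a real deployment they would be different MySQL instances):

    # insert a row on the source MySQL (the instance canal.deployer watches)
    mysql -h127.0.0.1 -uroot -p -e "INSERT INTO sam_cms.t_demo (id, name) VALUES (1, 'hello');"
    # watch the adapter log for the replayed DML
    tail -n 50 logs/adapter/adapter.log
    # query the target MySQL configured in application.yml
    mysql -h127.0.0.1 -uroot -p -e "SELECT * FROM sam_cms.t_demo WHERE id = 1;"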