zoukankan      html  css  js  c++  java
  • Canal——canal server 读取 binlog 到 kafka 然后在使用 canal-adapter

    前言

    本篇只介绍跟 Kafka模式 相关的配置。

    一、架构

    二、canal-server 配置

    • 修改canal 配置文件: vi /usr/local/canal/conf/canal.properties

    ...
    # 可选项: tcp(默认), kafka, RocketMQ
    canal.serverMode = kafka
    ...
    # kafka
    /rocketmq 集群配置: 192.168.1.117:9092,192.168.1.118:9092,192.168.1.119:9092 canal.mq.servers = 127.0.0.1:6667 canal.mq.retries = 0 # flagMessage模式下可以调大该值, 但不要超过MQ消息体大小上限 canal.mq.batchSize = 16384 canal.mq.maxRequestSize = 1048576 # flatMessage模式下请将该值改大, 建议50-200 canal.mq.lingerMs = 1 canal.mq.bufferMemory = 33554432 # Canal的batch size, 默认50K, 由于kafka最大消息体限制请勿超过1M(900K以下) canal.mq.canalBatchSize = 50 # Canal get数据的超时时间, 单位: 毫秒, 空为不限超时 canal.mq.canalGetTimeout = 100 # 是否为flat json格式对象 canal.mq.flatMessage = false canal.mq.compressionType = none canal.mq.acks = all # kafka消息投递是否使用事务 canal.mq.transaction = false
    • 修改instance 配置文件: vi conf/example/instance.properties

    #  按需修改成自己的数据库信息
    #################################################
    ...
    canal.instance.master.address=192.168.1.20:3306
    # username/password,数据库的用户名和密码
    ...
    canal.instance.dbUsername = canal
    canal.instance.dbPassword = canal
    ...
    # mq config
    canal.mq.topic=rds_test
    # 针对库名或者表名发送动态topic
    #canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\..*,.*\..*
    canal.mq.partition=0
    # hash partition config
    #canal.mq.partitionsNum=3
    #库名.表名: 唯一主键,多个表之间用逗号分隔
    #canal.mq.partitionHash=mytest.person:id,mytest.role:id
    #################################################

    三、canal-adapter 配置

    • 修改adapter 配置文件:vi conf/application.yml
    server:
      port: 8081
    spring:
      jackson:
        date-format: yyyy-MM-dd HH:mm:ss
        time-zone: GMT+8
        default-property-inclusion: non_null
    
    canal.conf:
      mode: kafka # tcp kafka rocketMQ
    #  canalServerHost: 127.0.0.1:11111
    #  zookeeperHosts: slave1:2181
      mqServers: 127.0.0.1:9092 #or rocketmq
    #  flatMessage: true
      batchSize: 500
      syncBatchSize: 1000
      retries: 0
      timeout:
      accessKey:
      secretKey:
    
      canalAdapters:
      - instance: rds_test # canal instance Name or mq topic name
        groups:
        - groupId: g1
          outerAdapters:
           - name: rdb
             key: oracle1
             properties:
               jdbc.driverClassName: oracle.jdbc.OracleDriver
               jdbc.url: jdbc:oracle:thin:@192.168.0.131:1521/orcl
               jdbc.username: system
               jdbc.password: manager
        - groupId: g2
          outerAdapters:
           - name: rdb
             key: oracle2
             properties:
               jdbc.driverClassName: oracle.jdbc.OracleDriver
               jdbc.url: jdbc:oracle:thin:@192.168.0.131:1521/orcl
               jdbc.username: system
               jdbc.password: manager

    说明: 一份数据可以被多个 group 同时消费, 多个 group 之间会是一个并行执行, 一个 group 内部是一个串行执行多个 outerAdapters。

    参考:

    https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart

  • 相关阅读:
    [SCOI2009] Windy数
    [P1361] 小M的作物
    Wannafly Camp 2020 Day 2E 阔力梯的树
    2017百越杯反序列化writeup
    大美西安writeup
    Thinkphp的SQL查询方式
    Thinkphp的CURD
    记一次拿webshell踩过的坑(如何用PHP编写一个不包含数字和字母的后门)
    ThinkPHP的输出和模型使用
    ThinkPHP的运行流程-2
  • 原文地址:https://www.cnblogs.com/caoweixiong/p/13301316.html
Copyright © 2011-2022 走看看