Dockerizing canal-adapter

    A requirement came up at work today: sync MySQL updates into Kafka in real time. Then we also had to replicate changes to one table into another MySQL instance in real time, and sync the data into ES as well. So canal and canal-adapter were pressed into service on short notice. We hit quite a few pitfalls along the way; what follows is a summary.

    Official documentation: https://github.com/alibaba/canal/wiki

    Prerequisite: ES is assumed to be installed already; this article uses version 6.8.8.

    You first need to create the ES indexes and mappings. The two mappings used in this article follow, with a creation sketch after them.

    {
        "settings": {
            "number_of_shards": 5,
            "number_of_replicas": 2
        },
        "mappings": {
            "_doc": {
                "properties": {
                    "personnel_name": {
                        "type": "text"
                    },
                    "personnel_num": {
                        "type": "keyword"
                    }
                }
            }
        }
    }

    {
        "settings": {
            "number_of_shards": 5,
            "number_of_replicas": 2
        },
        "mappings": {
            "_doc": {
                "properties": {
                    "clock_record_id": {
                        "type": "keyword"
                    },
                    "personnel_name": {
                        "type": "text"
                    },
                    "personnel_num": {
                        "type": "keyword"
                    }
                }
            }
            }
        }
    }
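    For reference, a minimal sketch of creating an index with one of the mappings above over the ES REST API (the host, the REST port 9200, and the index name test are assumptions; adjust to your environment):

    # Save one of the mapping JSON documents above as mapping.json, then:
    curl -XPUT 'http://193.168.1.39:9200/test' \
      -H 'Content-Type: application/json' \
      -d @mapping.json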
    I. Building the canal-server image and its compose file

    1. Dockerfile contents

    FROM openjdk:8-jre-alpine
    # ADD only auto-extracts local tar archives; a remote URL is downloaded as-is,
    # so fetch the tarball and extract it explicitly
    ADD https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz /tmp/
    RUN mkdir -p /opt/canal-server \
        && tar -xzf /tmp/canal.deployer-1.1.4.tar.gz -C /opt/canal-server \
        && rm /tmp/canal.deployer-1.1.4.tar.gz
    WORKDIR /opt/canal-server
    EXPOSE 11110 11112
    COPY ["entrypoint.sh", "/"]
    # bash is needed by entrypoint.sh; tzdata is used to set the container timezone
    RUN apk add --no-cache bash tzdata \
        && cp /usr/share/zoneinfo/Asia/Shanghai /etc/localtime \
        && apk del tzdata
    VOLUME ["/opt/canal-server/logs", "/opt/canal-server/conf"]
    ENTRYPOINT ["/entrypoint.sh"]
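    A quick build sketch (assumes the Dockerfile and entrypoint.sh sit in the current directory; the tag matches the compose file below):

    docker build -t canal-server:1.1.4 .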

    2. entrypoint.sh contents

    #!/bin/bash
    
    Base_dir=/opt/canal-server/conf
    Log_dir=/opt/canal-server/logs
    
    set -e
    # Configure the canal-server run mode; this image currently supports tcp and kafka
    if [ "${canal_serverMode}" = "tcp" ] || [ "${canal_serverMode}" = "kafka" ]; then
      sed -i "/^canal.serverMode/ s/serverMode.*/serverMode = ${canal_serverMode}/" ${Base_dir}/canal.properties
    else
      echo "Invalid mode '${canal_serverMode}', this image only supports tcp and kafka"
      exit 1
    fi
    
    if [ -n "${instances}" ]; then
      # instances is space-separated; join with commas for canal.destinations
      destinations=$(echo ${instances} | sed 's/ /,/g')
      sed -i "/^canal.destinations/ccanal.destinations = ${destinations}" ${Base_dir}/canal.properties
      for instance in ${instances}
      do
        # Reset the per-instance option dict so values don't leak between instances
        unset dict
        declare -A dict
        # Strip the literal quotes that env_file keeps around the "<instance>_dict" value
        ins_dic=$(eval echo '$'"{${instance}_dict}" | awk -F'"' '{print $2}')
        for kv in ${ins_dic}
        do
          k=`echo $kv | awk -F'=' '{print $1}'`
          v=`echo $kv | awk -F'=' '{print $2}'`
          dict[$k]=$v
        done
        if [ "${instance}" != "example" ]; then
          mkdir ${Base_dir}/${instance} && cp ${Base_dir}/example/* ${Base_dir}/${instance}/
          if [ "${canal_serverMode}" = 'kafka' ]; then
            sed -i "/^canal.mq.servers/ccanal.mq.servers=${canal_mq_servers}" ${Base_dir}/canal.properties
            if [ -n "${dict[canal_mq_topic]}" ];then
              sed -i "/.*canal.mq.topic/ccanal.mq.topic=${dict[canal_mq_topic]}" ${Base_dir}/${instance}/instance.properties
            else
              sed -i "/^canal.mq.topic/d" ${Base_dir}/${instance}/instance.properties
              sed -i "/.*canal.mq.dynamicTopic=/ccanal.mq.dynamicTopic=${dict[canal_mq_dynamicTopic]}" ${Base_dir}/${instance}/instance.properties
            fi
          fi
    
          if [ -n "${dict[canal_instance_master_address]}" ]; then
            sed -i  "/^canal.instance.master.address=/ccanal.instance.master.address=${dict[canal_instance_master_address]}" ${Base_dir}/${instance}/instance.properties
          fi
    
          if [ -n "${dict[canal_instance_filter_regex]}" ]; then
            sed -i "/^canal.instance.filter.regex/ccanal.instance.filter.regex=${dict[canal_instance_filter_regex]}" ${Base_dir}/${instance}/instance.properties
          fi
        fi
      done
    fi
    
    /bin/sh /opt/canal-server/bin/startup.sh
    sleep 3
    tail -F /opt/canal-server/logs/canal/canal.log
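    To sanity-check the image before wiring up compose, a minimal standalone run sketch (the values are placeholders; with instances=example the script leaves the stock example instance untouched):

    docker run --rm -e canal_serverMode=tcp -e instances=example canal-server:1.1.4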

    3. docker-compose file contents

    version: "3"
    services:
      canal-server:
        image: canal-server:1.1.4
        container_name: canal-server
        env_file:
          - ./wave-canal-server.env
        ports:
          - "11110:11110"
          - "11112:11112"
        volumes:
          - "/etc/hosts:/etc/hosts"
        deploy:
          resources:
            limits:
              cpus: '0.5'
              memory: 2G
        restart: always
        logging:
          driver: "json-file"
          options:
            max-size: "10m"
            max-file: "3"

    4. Contents of the env file referenced by the compose file (wave-canal-server.env)

    # canal-server run mode; kafka and tcp are supported
    canal_serverMode=kafka
    # canal instance names, space-separated (entrypoint.sh splits on spaces);
    # wave1 syncs to ES, wave2 feeds the big-screen dashboard
    instances=wave1 wave2
    # Options for each instance are defined as "<instance name>_dict":
    #   canal_mq_topic                 topic name; when unset, canal_mq_dynamicTopic applies
    #   canal_instance_master_address  address of the source MySQL
    #   canal_instance_filter_regex    tables to watch, comma-separated for multiple tables
    #   canal_mq_dynamicTopic          topic naming rule; note the two escape characters
    wave1_dict="canal_mq_topic=test canal_instance_master_address=mysql01.inside.wavewisdom-bj.com:3306 canal_instance_filter_regex=wavewisdom-bj-develop.odin_device_position,wavewisdom-bj-develop.odin_device_camera,wavewisdom-bj-develop.odin_device_device_position_associate canal_mq_dynamicTopic=.*\\..*"
    wave2_dict="canal_mq_topic=task canal_instance_master_address=mysql02.inside.wavewisdom-bj.com:3306 canal_instance_filter_regex=wavewisdom-bj-develop.odin_business_emergency_record,wavewisdom-bj-develop.odin_business_capture_record,wavewisdom-bj-develop.work_flow_mission,wavewisdom-bj-develop.odin_device_camera,wavewisdom-bj-develop.odin_device_device_position_associate,wavewisdom-bj-develop.odin_device_position,wavewisdom-bj-develop.odin_business_alarm_record,wavewisdom-bj-develop.odin_advise_info canal_mq_dynamicTopic=.*\\..*"
    canal_mq_servers=kafka01.inside.wavewisdom-bj.com:9092
    # Note the two escape characters
    canal_mq_dynamicTopic=.*\\..*

    5. Start it up and watch the logs, as sketched below.
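    A start-and-watch sketch (one note: with plain docker-compose, the deploy.resources limits in the compose file above only take effect when you pass the --compatibility flag):

    docker-compose --compatibility up -d
    docker logs -f canal-server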

    II. Building the canal-adapter image and its compose file

    1. Dockerfile contents

    FROM openjdk:8-jre-alpine
    RUN echo "Asia/Shanghai" > /etc/timezone
    # ADD only auto-extracts local tar archives, so download then extract explicitly
    ADD https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.adapter-1.1.4.tar.gz /tmp/
    RUN mkdir -p /opt/adapter \
        && tar -xzf /tmp/canal.adapter-1.1.4.tar.gz -C /opt/adapter \
        && rm /tmp/canal.adapter-1.1.4.tar.gz
    WORKDIR /opt/adapter
    COPY conf /opt/adapter/conf/
    COPY ["entrypoint.sh", "/"]
    ENTRYPOINT ["/entrypoint.sh"]

    2. entrypoint.sh contents

    #!/bin/sh
    set -e
    Conf_Dir=/opt/adapter/conf
    # Configure which canal-server mode the adapter consumes from
    if [ "${Canal_mode}" = 'kafka' ]; then
      sed -i "/^.*mode:/ s/:.*/: ${Canal_mode}/" ${Conf_Dir}/application.yml
      sed -i "/mqServers:/ s/:.*/: ${Mq_Servers}/" ${Conf_Dir}/application.yml
    elif [ "${Canal_mode}" = 'tcp' ]; then
      sed -i "/^.*mode:/ s/:.*/: ${Canal_mode}/" ${Conf_Dir}/application.yml
      sed -i "/mqServers:/ s/mqServers:.*/canalServerHost: ${Canal_Servers}/" ${Conf_Dir}/application.yml
    else
      echo "Invalid mode ${Canal_mode}, this image only supports tcp and kafka"
      exit 1
    fi
    
    # Source MySQL address (use # as the sed delimiter since the JDBC URL contains slashes)
    if [ -n "${Src_Data_Server}" ]; then
      sed -i "/^.*url: jdbc:mysql:/ s#mysql:.*#mysql://${Src_Data_Server}/${Src_Database}?useUnicode=true#" ${Conf_Dir}/application.yml
    fi
    
    # Source MySQL username
    if [ -n "${Src_User}" ]; then
      sed -i "/^.*username:/ s/:.*/: ${Src_User}/" ${Conf_Dir}/application.yml
    fi
    
    # Source MySQL password
    if [ -n "${Src_Password}" ]; then
      sed -i "/^.*password:/ s/:.*/: ${Src_Password}/" ${Conf_Dir}/application.yml
    fi
    
    # Instance name: in tcp mode it must match the canal-server instance name,
    # in kafka mode it must match the topic name
    if [ -n "${Adapter_instance}" ]; then
      sed -i "/- instance:/ s/:.*/: ${Adapter_instance}/g" ${Conf_Dir}/application.yml
      sed -i "/destination:/ s/:.*/: ${Adapter_instance}/g" ${Conf_Dir}/rdb/mytest_user.yml
      sed -i "/destination:/ s/:.*/: ${Adapter_instance}/g" ${Conf_Dir}/es/mytest_user.yml
    fi
    
    for Out in ${Out_adapters}
    do
      echo ${Out}
      if [ "${Out}" = 'rdb' ];then
        if [ -n "${Src_Database}" ]; then
          sed -i "/^.*database:/ s/:.*/: ${Src_Database}/" ${Conf_Dir}/rdb/mytest_user.yml
        fi
        if [ -n "${Src_Table}" ]; then
          sed -i "/^.*table:/ s/:.*/: ${Src_Table}/" ${Conf_Dir}/rdb/mytest_user.yml
        fi
    
        if [ -n "${Dest_User}" ]; then
          sed -i "/^.*jdbc.username:/ s/:.*/: ${Dest_User}/" ${Conf_Dir}/application.yml
        fi
        if [ -n "${Dest_Password}" ]; then
          sed -i "/^.*jdbc.password:/ s/:.*/: ${Dest_Password}/" ${Conf_Dir}/application.yml
        fi
        if [ -n "${Dest_Database}" ] && [ -n "${Dest_Table}" ]; then
          sed -i "/^.*targetTable:/ s/:.*/: ${Dest_Database}.${Dest_Table}/" ${Conf_Dir}/rdb/mytest_user.yml
        fi
        if [ -n "${Target_Pk}" ]; then
          R_Target_Pk=`echo $Target_Pk | sed -e 's/:/: /g'`
          sed -i "/^.*targetPk:/{n;s/[a-z].*/${R_Target_Pk}/g}" ${Conf_Dir}/rdb/mytest_user.yml
        fi
        if [ -n "${Dest_Data_Server}" ]; then
          sed -i "/^.*jdbc.url: jdbc:mysql:/ s#mysql:.*#mysql://${Dest_Data_Server}/${Dest_Database}#" ${Conf_Dir}/application.yml
        fi
        if [ "${Map_All}" = 'true' ]; then
          sed -i "/mapAll:/c  mapAll: true" ${Conf_Dir}/rdb/mytest_user.yml
          sed -i "/targetColumns:/c#  targetColumns:" ${Conf_Dir}/rdb/mytest_user.yml
        else
          sed -i "/mapAll:/c#  mapAll: true" ${Conf_Dir}/rdb/mytest_user.yml
          sed -i "/targetColumns:/c  targetColumns:" ${Conf_Dir}/rdb/mytest_user.yml
          for colume in ${Mapping_Columes}
          do
            R_colume=`echo $colume | sed -e 's/:/: /g'`
            sed -i "/^.*targetColumns:/a    ${R_colume}" ${Conf_Dir}/rdb/mytest_user.yml
          done
        fi
      elif [ "${Out}" = 'es' ];then
        if [ -n "${Es_hosts}" ];then
          sed -i "/^.*hosts:/ s/hosts:.*/hosts: ${Es_hosts}/" ${Conf_Dir}/application.yml
        fi
        if [ -n "${Es_cluster}" ];then
          sed -i "/^.*cluster.name:/ s/name:.*/name: ${Es_cluster}/" ${Conf_Dir}/application.yml
        fi
        if [ -n "${Es_index}" ];then
          sed -i "/^.*_index:/ s/index:.*/index: ${Es_index}/"  ${Conf_Dir}/es/mytest_user.yml
        fi
        if [ -n "${Es_type}" ];then
          sed -i "/^.*_type:/ s/type:.*/type: ${Es_type}/"  ${Conf_Dir}/es/mytest_user.yml
        fi
        if [ -n "${Es_id}" ];then
          sed -i "/^.*_id:/ s/id:.*/id: ${Es_id}/"  ${Conf_Dir}/es/mytest_user.yml
        fi
        sed -i "/^.*sql:/ s/sql:.*/sql: ${Sql_map}/"  ${Conf_Dir}/es/mytest_user.yml
      else
        echo "Invalid outerAdapters ${Out}, this image only supports es and rdb"
        exit 1
      fi
    done
    
    sh /opt/adapter/bin/startup.sh
    tail -F /opt/adapter/logs/adapter/adapter.log

    3. docker-compose file contents

    version: "3"
    services:
      canal-adapter:
        image: adapter:v1.1.4
        container_name: canal-adapter
        ports:
          - 8081:8081
        #volumes:
        #  - ./conf:/opt/adapter/conf
        env_file:
          - ./canal-adapter.env
        restart: always
        logging:
          driver: "json-file"
          options:
            max-size: "10m"
            max-file: "3"

    4. Contents of the canal-adapter.env file referenced by the compose file

    # Run mode of the canal-server this adapter consumes from; kafka and tcp are supported
    Canal_mode=kafka
    # canal-server instance name; in kafka mode this is the Kafka topic name
    Adapter_instance=test.odin_business_clock_record
    # Adapter output types; es and rdb are supported, both may be enabled at once, space-separated
    Out_adapters=es rdb
    # The following variables are used when the adapter output is es
    # ES host address(es); comma-separated for a cluster (9300 is the transport port)
    Es_hosts=193.168.1.39:9300
    # Unique name of the ES cluster (or single node)
    Es_cluster=wave-es
    # Target index name in ES
    Es_index=test
    # Index type
    Es_type=_doc
    # Index id
    Es_id=_id
    # SQL mapping
    Sql_map='select a.clock_record_id as _id, a.personnel_name, a.personnel_num from odin_business_clock_record a'
    # Kafka broker address when canal-server runs in kafka mode
    Mq_Servers=193.168.1.136:9092
    # Source MySQL address
    Src_Data_Server=193.168.1.136:3306
    # Source MySQL username
    Src_User=root
    # Source MySQL password
    Src_Password=waveDevelop123
    # Source database to sync from
    Src_Database=test
    # Source table to sync from
    Src_Table=odin_business_clock_record
    # The following variables are used when the adapter output is rdb
    # Target MySQL address
    Dest_Data_Server=193.168.1.167:3306
    # Target MySQL username
    Dest_User=root
    # Target MySQL password
    Dest_Password=wa123
    # Target database name
    Dest_Database=test
    # Target table name
    Dest_Table=odin_business_clock_record
    # Primary key mapping, "target column:source column"
    Target_Pk=clock_record_id:clock_record_id
    # Whether to map the whole table; true to enable
    Map_All=false
    # When full-table mapping is off, the columns to map, in the format
    # "target column:source column", multiple columns separated by spaces
    Mapping_Columes=clock_record_id:clock_record_id personnel_name:personnel_name personnel_num:personnel_num

    Start it up and watch the logs.

    PS: the Dockerfile and entrypoint.sh above build the adapter image; the compose file and env file then run that image, for example:
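    A build-and-run sketch (assumes the Dockerfile, conf/ directory and entrypoint.sh are in the current directory; the tag matches the compose file above):

    docker build -t adapter:v1.1.4 .
    docker-compose up -d
    docker logs -f canal-adapter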

    III. With the environment variables configured correctly, everything is up and running

    Then insert a record into the source MySQL, for example:
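    A test insert sketch (the column list follows the Sql_map above; the values are made up):

    mysql -h 193.168.1.136 -P 3306 -uroot -p test -e \
      "INSERT INTO odin_business_clock_record (clock_record_id, personnel_name, personnel_num) VALUES (1001, 'zhangsan', 'A001');"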

    Then check the result in ES:
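    For example, query the index over the REST API (assumption: REST listens on 9200; the 9300 in Es_hosts is the transport port the adapter uses):

    curl -s 'http://193.168.1.39:9200/test/_search?pretty'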

    Then check the result in the other MySQL instance:
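    For example (same made-up id as the insert sketch above):

    mysql -h 193.168.1.167 -P 3306 -uroot -p test -e \
      "SELECT * FROM odin_business_clock_record WHERE clock_record_id = 1001;"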

    With that, the configuration is complete.

    This setup syncs MySQL changes to ES and to another MySQL at the same time; the canal-server run modes supported here are kafka and tcp. Other modes are not covered in this article, though you can write your own entrypoint.sh. The ES version used throughout is 6.8.8.

    References:
    https://www.jianshu.com/p/5bcf97335e71
    https://blog.csdn.net/q936889811/article/details/95745721
    https://blog.csdn.net/singgel/article/details/86166154

    IV. The source table must be fully loaded into the target database before the adapter is used, otherwise updates may fail. In this setup the adapter works in kafka mode: it consumes records from Kafka and applies updates to the target table. The adapter image here only handles single-database, single-table sync; for multiple databases or tables you need to rewrite entrypoint.sh. One way to do the initial full load is sketched below.
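    A sketch of triggering a full ETL through the adapter's REST interface (the /etl/{type}/{task} endpoint is documented in the canal ClientAdapter wiki; the port and the mytest_user.yml task name follow this article's compose and conf layout):

    curl -X POST http://127.0.0.1:8081/etl/rdb/mytest_user.yml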

    V. Expressions for canal's dynamic topic parameter canal.mq.dynamicTopic

    test\.test: matches a single table; its changes are sent to a topic named test.test
    .*\..*: matches all tables; each table's changes go to its own topic named dbname.tablename
    test: matches a whole database; all tables in that database go to a topic named after the database
    test\..*: an expression match; each matched table goes to its own topic
    test,test1\.test1: multiple expressions; all tables in the test database go to the test topic, test1.test1 goes to the test1.test1 topic, and every other table falls back to the default canal.mq.topic value
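    To verify the dynamic topics, a consumer sketch (the kafka CLI script name and path vary by installation; the topic name assumes the dbname.tablename rule above):

    kafka-console-consumer.sh --bootstrap-server kafka01.inside.wavewisdom-bj.com:9092 \
      --topic wavewisdom-bj-develop.odin_device_position --from-beginning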
