  • First Look at DTLE: Unidirectional, Aggregation, and Fan-out Replication

    Environment Preparation

    Hostname   IP Address    Deployed Services
    study01    10.186.65.68  DTLE, MySQL
    study02    10.186.65.71  DTLE, MySQL
    study03    10.186.65.72  DTLE, MySQL

    Note: if you are not deploying with containers, first install three or more MySQL instances, with binlog and GTID enabled on each.
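    For reference, the relevant my.cnf settings might look like the following (a minimal sketch; the value of server_id and the binlog base name are placeholders, adjust per instance):

```ini
[mysqld]
server_id                = 1          # must differ on every instance
log_bin                  = mysql-bin  # enable binary logging
binlog_format            = ROW        # dtle reads row-based events
gtid_mode                = ON
enforce_gtid_consistency = ON
```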

    DTLE Overview

    dtle (Data-Transformation-le) is an open-source CDC tool developed by Shanghai ActionTech (上海爱可生信息技术股份有限公司). Its main features are:

    1. Multiple data transfer modes

      • Supports link compression
      • Supports homogeneous and heterogeneous transfer
      • Supports transfer across network boundaries
    2. Multiple data processing modes

      • Supports database/table/row-level data filtering
    3. Multiple data channel modes

      • Supports many-to-many data transfer
      • Supports loopback transfer
    4. Multiple sources/targets

      • Supports MySQL - MySQL data transfer
      • Supports MySQL - Kafka data transfer
    5. Cluster mode

      • Provides reliable metadata storage
      • Supports automatic task assignment
      • Supports automatic failover

    Unidirectional Replication / Aggregation / Fan-out

    The common replication scenarios supported by DTLE are:

    1. By source/target layout

      • One source to one target (unidirectional replication)
      • Multiple sources to one target (aggregation replication)
      • One source to multiple targets (fan-out replication)
    2. By network type

      • Data transfer within a network
      • Data transfer across networks (link compression / link throttling can be used)
    3. By cluster size

      • A single dtle instance handling a single data channel
      • A dtle cluster handling multiple data channels


    DTLE Unidirectional Replication

    1. Download the DTLE RPM package
    https://github.com/actiontech/dtle/releases/tag/v3.21.03.0
    
    2. Install the RPM package
    [root@study01 ~]# rpm -ivh dtle-3.21.03.0.x86_64.rpm --prefix=/data/dtle
    
    3. Edit the consul configuration file; if no install prefix was specified, it lives under /etc/dtle by default
    [root@study01 dtle]# pwd
    /data/dtle/etc/dtle
    
    [root@study01 dtle]# cat consul.hcl 
    # Rename for each node
    node_name = "consul0"   # consul node name; must be unique within a cluster
    data_dir = "/data/dtle/var/lib/consul"
    ui = true
    
    disable_update_check = true
    
    # Address that should be bound to for internal cluster communications
    bind_addr = "0.0.0.0"
    # Address to which Consul will bind client interfaces, including the HTTP and DNS servers
    client_addr = "10.186.65.68"    # this host's IP
    advertise_addr = "10.186.65.68" # this host's IP
    ports = {
      # Customize if necessary. -1 means disable.
      #dns = -1
      #server = 8300
      #http = 8500
      #serf_wan = -1
      #serf_lan = 8301
    }
    
    limits = {
      http_max_conns_per_client = 4096
    }
    
    server = true
    # For single node
    bootstrap_expect = 1    # single consul node
    
    # For 3-node cluster
    #bootstrap_expect = 3   # uncomment for a 3-node consul cluster
    #retry_join = ["127.0.0.1", "127.0.0.2", "127.0.0.3"] # will use default serf port
    
    log_level = "INFO"
    log_file = "/data/dtle/var/log/consul/"
    
    
    4. Edit the nomad configuration file; as above, it is under /etc/dtle by default if no install prefix was specified
    [root@study01 dtle]# ls
    consul.hcl  nomad.hcl
    
    [root@study01 dtle]# pwd
    /data/dtle/etc/dtle
    
    [root@study01 dtle]# cat nomad.hcl
    name = "nomad0" # rename for each node; must be unique within a cluster
    datacenter = "dc1"
    data_dir  = "/data/dtle/var/lib/nomad"
    plugin_dir = "/data/dtle/usr/share/dtle/nomad-plugin"
    
    log_level = "Info"
    log_file = "/data/dtle/var/log/nomad/"
    
    disable_update_check = true
    
    bind_addr = "0.0.0.0"   # listen address
    # change ports if multiple nodes run on a same machine
    ports {
      http = 4646
      rpc  = 4647
      serf = 4648
    }
    addresses {
      # Default to `bind_addr`. Or set individually here.
      #http = "127.0.0.1"
      #rpc  = "127.0.0.1"
      #serf = "127.0.0.1"
    }
    advertise {
      http = "10.186.65.68:4646"
      rpc  = "10.186.65.68:4647"
      serf = "10.186.65.68:4648"
    }
    
    server {
      enabled          = true   # enable server mode
    
      bootstrap_expect = 1
      # Set bootstrap_expect to 3 for multiple (high-availability) nodes.
      # Multiple nomad nodes will join with consul.
    }
    
    client {
      enabled = true
      options = {
        "driver.blacklist" = "docker,exec,java,mock,qemu,rawexec,rkt"
      }
    
      # Will auto join other server with consul.
    }
    
    consul {
      # dtle-plugin and nomad itself use consul separately.
      # nomad uses consul for server_auto_join and client_auto_join.
      # Only one consul can be set here. Write the nearest here,
      #   e.g. the one runs on the same machine with the nomad server.
      address = "10.186.65.68:8500" # address of the consul agent
    }
    
    plugin "dtle" {
      config {
        log_level = "Info" # repeat nomad log level here
        data_dir = "/data/dtle/var/lib/nomad"
        nats_bind = "10.186.65.68:8193"
        nats_advertise = "10.186.65.68:8193"
        # Repeat the consul address above.
        consul = "10.186.65.68:8500"    # consul address to connect to
    
        # By default, API compatibility layer is disabled.
        #api_addr = "127.0.0.1:8190"   # for compatibility API
        nomad_addr = "10.186.65.68:4646" # compatibility API need to access a nomad server
    
        publish_metrics = false
        stats_collection_interval = 15
      }
    }
    
    
    5. Start the consul and nomad services
    systemctl restart dtle-consul.service
    systemctl restart dtle-nomad.service
    
    [root@study01 dtle]# ps -ef | grep dtle
    dtle      8580     1  1 14:21 ?        00:00:00 /data/dtle/usr/bin/consul agent -config-file=/data/dtle/etc/dtle/consul.hcl
    dtle      8660     1 10 14:22 ?        00:00:00 /data/dtle/usr/bin/nomad agent -config /data/dtle/etc/dtle/nomad.hcl
    root      8720  2787  0 14:22 pts/0    00:00:00 grep --color=auto dtle
    
    6. Log in to the nomad and consul web UIs to check (with the configuration above: nomad at http://10.186.65.68:4646, consul at http://10.186.65.68:8500)

    7. On the source MySQL instance, create a test database and table, and insert some rows

    mysql> create database test;
    mysql> use test;
    mysql> create table t1 (id int primary key);
    mysql> insert into t1 values(1),(2),(3);
    mysql> select * from t1;
    
    8. Create the job JSON file. Note that JSON does not allow comments, so the annotations are listed below the file.
    [root@study01 dtle]# cat job.json
    {
      "Job": {
        "ID": "job1",
        "Datacenters": ["dc1"],
        "TaskGroups": [{
          "Name": "src",
          "Tasks": [{
            "Name": "src",
            "Driver": "dtle",
            "Config": {
              "Gtid": "",
              "ReplicateDoDb": [{
                "TableSchema": "test",
                "Tables": [{
                  "TableName": "t1"
                }]
              }],
              "ConnectionConfig": {
                "Host": "10.186.65.68",
                "Port": 3333,
                "User": "test",
                "Password": "test"
              }
            }
          }]
        }, {
          "Name": "dest",
          "Tasks": [{
            "Name": "dest",
            "Driver": "dtle",
            "Config": {
              "ConnectionConfig": {
                "Host": "10.186.65.72",
                "Port": 4444,
                "User": "test",
                "Password": "test"
              }
            }
          }]
        }]
      }
    }
    
    • "ID" is the job name; "TableSchema"/"TableName" select the database and table to replicate
    • The first "ConnectionConfig" holds the source connection info, the second the target connection info
    
    9. Use curl to call the nomad agent's HTTP API and create the job
    [root@study01 dtle]# curl -XPOST "http://10.186.65.68:4646/v1/jobs" -d @job.json -s | jq
    {
      "EvalID": "a551d0fc-5de9-9e7b-3673-21b115919646",
      "EvalCreateIndex": 151,
      "JobModifyIndex": 150,
      "Warnings": "",
      "Index": 151,
      "LastContact": 0,
      "KnownLeader": false
    }
    
    • -d @ takes the path of the job JSON file; here it is in the current directory, so no absolute path is needed
    • jq pretty-prints and extracts fields from the JSON response (similar in spirit to grep for text); it must be installed separately
    10. Check the status of the created job
    [root@study01 dtle]# curl -XGET "http://10.186.65.68:4646/v1/job/job1" -s | jq '.Status'
    
    • /v1/job/{ID of the created job}
    11. Connect to the target database and check whether the data has been replicated
    mysql> show databases;
    +--------------------+
    | Database           |
    +--------------------+
    | dtle               |  # holds a table recording each job's GTID position
    | information_schema |
    | mysql              |
    | performance_schema |
    | sys                |
    | test               |  # the replicated database
    | universe           |
    +--------------------+
    
    mysql> select * from test.t1;
    +----+
    | id |
    +----+
    |  1 |
    |  2 |
    |  3 |
    +----+
    
    12. Insert a few more rows on the source and test again
    mysql> use test;
    mysql> insert into t1 values(7),(8),(9);
    

    Now check the target database to see whether the incremental rows have arrived:

    mysql> select * from test.t1;
    +----+
    | id |
    +----+
    |  1 |
    |  2 |
    |  3 |
    |  7 |
    |  8 |
    |  9 |
    +----+
    

    HTTP API and the nomad Command-Line Tool

    The Relationship Between nomad and consul

    Nomad itself uses consul for multi-node registration and discovery, while the dtle nomad plugin uses consul to store job metadata. In other words, when a nomad agent creates a job, its metadata and replication position are recorded in consul. When deleting a job, it is not enough to delete the job itself: the records in consul must be deleted as well. If they are not, the next job created with that name will, by default, resume replicating from the position recorded in consul.
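    The two-step cleanup this implies can be sketched as a small helper (a sketch; the addresses and the dtle/{job_id} consul key prefix follow the examples in this walkthrough):

```python
# Sketch of the two-step dtle job removal: purge the job from nomad,
# then delete its metadata (including the replication position) from consul.
# Addresses are the ones used in this walkthrough; adjust for your setup.

def purge_urls(job_id, nomad="10.186.65.68:4646", consul="10.186.65.68:8500"):
    """Return the two DELETE URLs that fully remove a dtle job."""
    return (
        f"http://{nomad}/v1/job/{job_id}?purge=true",    # remove the nomad job
        f"http://{consul}/v1/kv/dtle/{job_id}?recurse",  # remove the consul KV records
    )

nomad_url, consul_url = purge_urls("job1")
# Each URL would then be issued with curl -XDELETE or any HTTP client.
```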

    HTTP API

    The curl commands used above are actually calling the HTTP API of the nomad agent, submitting the local job.json file to it.
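    The same submission can be built with Python's standard library (a sketch; it only constructs the request, and actually sending it requires the nomad agent from this setup to be reachable):

```python
import json
import urllib.request

def build_submit_request(job_path, nomad_addr="http://10.186.65.68:4646"):
    """Build the POST equivalent of: curl -XPOST {nomad_addr}/v1/jobs -d @job.json"""
    with open(job_path) as f:
        payload = json.load(f)  # fails early if the file is not valid JSON
    return urllib.request.Request(
        f"{nomad_addr}/v1/jobs",
        data=json.dumps(payload).encode(),
        headers={"Content-Type": "application/json"},
        method="POST",
    )

# urllib.request.urlopen(build_submit_request("job.json")) would submit the job.
```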

    Using the nomad Command-Line Tool

    The nomad tool launches jobs from HCL files; JSON is not accepted here. Template files are installed under /usr/share/dtle/scripts by default.

    1. First delete the existing job with curl
    [root@study01 dtle]# curl -XDELETE 10.186.65.68:4646/v1/job/job1?purge=true
    {"EvalID":"c3e69b56-eaab-fa02-afbb-a15e0b395170","EvalCreateIndex":198,"JobModifyIndex":197,"VolumeEvalID":"","VolumeEvalIndex":0,"Index":198,"LastContact":0,"KnownLeader":false}
    
    2. After deleting the job, clear the consul records with:
    [root@study01 dtle]# curl -XDELETE "10.186.65.68:8500/v1/kv/dtle/job1?recurse"
    true
    
    3. Create a job with the nomad command

      • Copy an HCL template to modify
      cp usr/share/dtle/scripts/example.job.hcl .
      
      • Edit the HCL file
      [root@study01 dtle]# mv example.job.hcl job.hcl
      
      [root@study01 dtle]# vim job.hcl    # edit the source/target connection info
      
    4. Run the job from this file with the nomad command

    [root@study01 dtle]# ./usr/bin/nomad job run job.hcl
    ==> Monitoring evaluation "21c79d55"
        Evaluation triggered by job "job1"
        Evaluation within deployment: "c5aeae1d"
        Allocation "8640b0af" created: node "e1be240c", group "dest"
        Allocation "e44c8d29" created: node "e1be240c", group "src"
        Evaluation status changed: "pending" -> "complete"
    ==> Evaluation "21c79d55" finished with status "complete"
    
    5. Check the status of the job we just created
    [root@study01 dtle]# ./usr/bin/nomad job status
    ID    Type     Priority  Status   Submit Date
    job1  service  50        running  2021-05-07T16:26:21+08:00
    
    6. Insert rows on the source and verify them on the target
    7. Stop and purge the job
    [root@study01 dtle]# ./usr/bin/nomad job stop -purge job1
    ==> Monitoring evaluation "dce8f4f3"
        Evaluation triggered by job "job1"
        Evaluation within deployment: "c5aeae1d"
        Evaluation status changed: "pending" -> "complete"
    ==> Evaluation "dce8f4f3" finished with status "complete"
    
    • The consul records must still be deleted manually
    [root@study01 dtle]# curl -XDELETE "10.186.65.68:8500/v1/kv/dtle/job1?recurse"
    
    Other nomad Commands
    1. List server members
    [root@study01 dtle]# ./usr/bin/nomad server members
    Name           Address       Port  Status  Leader  Protocol  Build   Datacenter  Region
    nomad0.global  10.186.65.68  4648  alive   true    2         0.11.1  dc1         global
    
    2. List client nodes
    [root@study01 dtle]# ./usr/bin/nomad node status
    ID        DC   Name    Class   Drain  Eligibility  Status
    e1be240c  dc1  nomad0  <none>  false  eligible     ready
    
    3. Show a specific job
    [root@study01 dtle]# ./usr/bin/nomad job status {jobname}
    
    4. Show the nomad version
    [root@study01 dtle]# ./usr/bin/nomad version
    Nomad v0.11.1 (b43457070037800fcc8442c8ff095ff4005dab33)
    
    5. Show the dtle plugin version on a node
    [root@study01 dtle]# ./usr/bin/nomad node status -verbose e1be240c | grep dtle
    dtle      true      true     Healthy   2021-05-07T14:22:17+08:00
    driver.dtle               = 1
    driver.dtle.full_version  = 3.21.03.0-3.21.03.x-2df8ad7
    driver.dtle.version       = 3.21.03.0
    
    6. Here nomad acts as an HTTP client to the nomad agent; if the agent is not at the default address, specify it, e.g. --address=http://127.0.0.1:4646
    [root@study01 dtle]# ./usr/bin/nomad node status --address=http://10.186.65.68:4646
    ID        DC   Name    Class   Drain  Eligibility  Status
    e1be240c  dc1  nomad0  <none>  false  eligible     ready
    

    MySQL Aggregation Replication

    Prepare two source MySQL instances and one target instance. Record each source's GTID first, then create the test database and table.

    1. Operations on src1
    mysql> show master status\G
    Executed_Gtid_Set: 60144e94-aa1c-11eb-9d16-02000aba4144:1-140
    mysql> use test;
    mysql> create table t2(id int);
    mysql> insert into t2 values(1),(2),(3);
    
    mysql> select * from test.t2;
    +------+
    | id   |
    +------+
    |    1 |
    |    2 |
    |    3 |
    +------+
    
    2. Operations on src2
    mysql> show master status\G
    Executed_Gtid_Set: efc79686-af16-11eb-bbaa-02000aba4147:1-135
    mysql> use test;
    mysql> create table t2(id int);
    mysql> insert into t2 values(4),(5),(6);
    
    3. Create the src1 job file
    [root@study01 dtle]# cat job_src1.hcl
    job "job1" {
      datacenters = ["dc1"]
     
      group "src" {
        task "src" {
          driver = "dtle"
          config {
            ReplicateDoDb = [{
              TableSchema = "test"
              Tables = [{
                TableName = "t2"
              }]
            }]
            DropTableIfExists = false
            Gtid = "60144e94-aa1c-11eb-9d16-02000aba4144:1-140"
            ChunkSize = 2000
            ConnectionConfig = {
              Host = "10.186.65.68"
              Port = 3333
              User = "test"
              Password = "test"
            }
          }
        }
      }
      group "dest" {
        task "dest" {
          driver = "dtle"
          config {
            ConnectionConfig = {
              Host = "10.186.65.72"
              Port = 4444
              User = "test"
              Password = "test"
            }
     
            # For a kafka job, do not set ConnectionConfig in dest task. Set KafkaConfig instead.
            #KafkaConfig = {
            #  Topic = "kafka1"
            #  Brokers = ["127.0.0.1:9192", "127.0.0.1:9092"]
            #  Converter = "json"
            #}
          }
        }
      }
     
      reschedule {
        # By default, nomad will unlimitedly reschedule a failed task.
        # We limit it to once per 30min here.
        attempts = 1
        interval = "30m"
        unlimited = false
      }
    }
    
    4. Create the src2 job file
    [root@study01 dtle]# cat job_src2.hcl
    job "job2" {
      datacenters = ["dc1"]
     
      group "src" {
        task "src" {
          driver = "dtle"
          config {
            ReplicateDoDb = [{
              TableSchema = "test"
              Tables = [{
                TableName = "t2"
              }]
            }]
            DropTableIfExists = false
            Gtid = "efc79686-af16-11eb-bbaa-02000aba4147:1-135"
            ChunkSize = 2000
            ConnectionConfig = {
              Host = "10.186.65.71"
              Port = 5555
              User = "test"
              Password = "test"
            }
          }
        }
      }
      group "dest" {
        task "dest" {
          driver = "dtle"
          config {
            ConnectionConfig = {
              Host = "10.186.65.72"
              Port = 4444
              User = "test"
              Password = "test"
            }
     
            # For a kafka job, do not set ConnectionConfig in dest task. Set KafkaConfig instead.
            #KafkaConfig = {
            #  Topic = "kafka1"
            #  Brokers = ["127.0.0.1:9192", "127.0.0.1:9092"]
            #  Converter = "json"
            #}
          }
        }
      }
     
      reschedule {
        # By default, nomad will unlimitedly reschedule a failed task.
        # We limit it to once per 30min here.
        attempts = 1
        interval = "30m"
        unlimited = false
      }
    }
    
    • The two files differ in the job name, the Gtid value, and the source connection address
    5. Create the jobs
    [root@study01 dtle]# ./usr/bin/nomad job run job_src1.hcl
    ==> Monitoring evaluation "28ae70e4"
        Evaluation triggered by job "job1"
        Evaluation within deployment: "95f9a76b"
        Allocation "d07787a4" created: node "e1be240c", group "dest"
        Allocation "f46d9291" created: node "e1be240c", group "src"
        Evaluation status changed: "pending" -> "complete"
    ==> Evaluation "28ae70e4" finished with status "complete"
    
    [root@study01 dtle]# ./usr/bin/nomad job run job_src2.hcl
    ==> Monitoring evaluation "1d2da1f7"
        Evaluation triggered by job "job2"
        Evaluation within deployment: "716b11a5"
        Allocation "2dd7fe5c" created: node "e1be240c", group "src"
        Allocation "724ec296" created: node "e1be240c", group "dest"
        Evaluation status changed: "pending" -> "complete"
    ==> Evaluation "1d2da1f7" finished with status "complete"
    
    6. Verify the initial data on the target database
    mysql> select * from test.t2;
    +------+
    | id   |
    +------+
    |    1 |
    |    2 |
    |    3 |
    |    4 |
    |    5 |
    |    6 |
    +------+
    
    7. Write a row to either source, then check whether the incremental data reaches the target
    mysql> insert into t2 values (7);
    Query OK, 1 row affected (0.01 sec)
    
    mysql> select * from test.t2;
    +------+
    | id   |
    +------+
    |    1 |
    |    2 |
    |    3 |
    |    7 |
    +------+
    4 rows in set (0.01 sec)
    
    8. Verify the data on the target database again
    mysql> select * from test.t2;
    +------+
    | id   |
    +------+
    |    1 |
    |    2 |
    |    3 |
    |    4 |
    |    5 |
    |    6 |
    |    7 |
    +------+
    7 rows in set (0.01 sec)
    

    MySQL Fan-out Replication

    Prepare three MySQL instances: one source and two targets. Rows are routed to different targets based on a filter condition.

    1. First record the source's current GTID
    mysql> show master status\G
    Executed_Gtid_Set: 60144e94-aa1c-11eb-9d16-02000aba4144:1-143
    
    2. Create a test table and insert some rows
    mysql> use test;
    mysql> create table t3(id int);
    mysql> insert into t3 values(1),(2);
    
    3. Create the dst1 job file
    [root@study01 dtle]# cat job_dst1.hcl
    job "dst1" {
      datacenters = ["dc1"]
     
      group "src" {
        task "src" {
          driver = "dtle"
          config {
            ReplicateDoDb = [{
              TableSchema = "test"
              Tables = [{
                TableName = "t3"
                Where = "id<10"
              }]
            }]
            DropTableIfExists = false
            Gtid = "60144e94-aa1c-11eb-9d16-02000aba4144:1-143"
            ChunkSize = 2000
            ConnectionConfig = {
              Host = "10.186.65.68"
              Port = 3333
              User = "test"
              Password = "test"
            }
          }
        }
      }
      group "dest" {
        task "dest" {
          driver = "dtle"
          config {
            ConnectionConfig = {
              Host = "10.186.65.71"
              Port = 5555
              User = "test"
              Password = "test"
            }
     
            # For a kafka job, do not set ConnectionConfig in dest task. Set KafkaConfig instead.
            #KafkaConfig = {
            #  Topic = "kafka1"
            #  Brokers = ["127.0.0.1:9192", "127.0.0.1:9092"]
            #  Converter = "json"
            #}
          }
        }
      }
     
      reschedule {
        # By default, nomad will unlimitedly reschedule a failed task.
        # We limit it to once per 30min here.
        attempts = 1
        interval = "30m"
        unlimited = false
      }
    }
    
    4. Create the dst2 job file

    The two files differ in the job name, the Where condition, and the target connection info.
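    Based on those differences, job_dst2.hcl would look roughly like the sketch below (hypothetical: the Where condition "id>=10" and the target 10.186.65.72:4444 are inferred from the verification steps in this section):

```hcl
job "dst2" {
  datacenters = ["dc1"]

  group "src" {
    task "src" {
      driver = "dtle"
      config {
        ReplicateDoDb = [{
          TableSchema = "test"
          Tables = [{
            TableName = "t3"
            Where = "id>=10"   # rows matching this condition go to the second target
          }]
        }]
        DropTableIfExists = false
        Gtid = "60144e94-aa1c-11eb-9d16-02000aba4144:1-143"
        ChunkSize = 2000
        ConnectionConfig = {
          Host = "10.186.65.68"
          Port = 3333
          User = "test"
          Password = "test"
        }
      }
    }
  }
  group "dest" {
    task "dest" {
      driver = "dtle"
      config {
        ConnectionConfig = {
          Host = "10.186.65.72"   # second target instance
          Port = 4444
          User = "test"
          Password = "test"
        }
      }
    }
  }
}
```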

    5. Create the jobs
    [root@study01 dtle]# ./usr/bin/nomad job run job_dst1.hcl
    [root@study01 dtle]# ./usr/bin/nomad job run job_dst2.hcl
    
    [root@study01 dtle]# ./usr/bin/nomad status
    ID    Type     Priority  Status   Submit Date
    dst1  service  50        running  2021-05-07T20:13:05+08:00
    dst2  service  50        running  2021-05-07T20:13:10+08:00
    
    6. Verify the data: rows with id < 10 go to the 71 instance, rows with id >= 10 go to the 72 instance

    7. Insert rows on the source, then check each target

    mysql> insert into t3 values(9),(10);
    mysql> select * from test.t3;
    +------+
    | id   |
    +------+
    |    1 |
    |    2 |
    |    9 |
    |   10 |
    +------+
    
    8. Check the data on 71
    mysql> select * from test.t3;
    +------+
    | id   |
    +------+
    |    1 |
    |    2 |
    |    9 |
    +------+
    
    9. Check the data on 72
    mysql> select * from test.t3;
    +------+
    | id   |
    +------+
    |   10 |
    +------+
    

    Because of regret, we toil through the night; because of dreams, we give it our all! First published on my blog: easydb.net. WeChat official account: easydb.

  • Original post: https://www.cnblogs.com/easydb/p/14742583.html