zoukankan      html  css  js  c++  java
  • debezium mongodb 集成测试

    debezium 是一个方便的cdc connector 可以帮助我们解决好多数据实时变更处理、数据分析、微服务的数据通信
    从上次跑简单demo到现在,这个工具是有好多的变更,添加了好多方便的功能,支持了越来越多的数据库。
    demo 使用了官方提供的docker-compose 文件

    环境准备

    • docker-compose 文件
     
    version: '2'
    services:
      zookeeper:
        image: debezium/zookeeper:0.9
        ports:
         - 2181:2181
         - 2888:2888
         - 3888:3888
      kafka:
        image: debezium/kafka:0.9
        ports:
         - 9092:9092
        links:
         - zookeeper
        environment:
         - ZOOKEEPER_CONNECT=zookeeper:2181
      mongodb:
        image: debezium/example-mongodb:0.9
        hostname: mongodb
        ports:
         - 27017:27017
        environment:
         - MONGODB_USER=debezium
         - MONGODB_PASSWORD=dbz
      connect:
        image: debezium/connect:0.9
        ports:
         - 8083:8083
        links:
         - kafka
         - mongodb
        environment:
         - BOOTSTRAP_SERVERS=kafka:9092
         - GROUP_ID=1
         - CONFIG_STORAGE_TOPIC=my_connect_configs
         - OFFSET_STORAGE_TOPIC=my_connect_offsets
    • 启动
    docker-compose up -d
     
    • 加载mongodb 数据
      demo 镜像内置了一个mongo 数据的脚本
     
    docker-compose -f docker-compose-mongodb.yaml exec mongodb bash -c '/usr/local/bin/init-inventory.sh'
     
     

    连接的用户账户密码: debezium dbz
    效果

    • 启动connector
     
    curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-mongodb.json
     

    register-mongodb.json 内容

    {
        "name": "inventory-connector",
        "config": {
            "connector.class" : "io.debezium.connector.mongodb.MongoDbConnector",
            "tasks.max" : "1",
            "mongodb.hosts" : "rs0/mongodb:27017",
            "mongodb.name" : "dbserver1",
            "mongodb.user" : "debezium",
            "mongodb.password" : "dbz",
            "database.whitelist" : "inventory",
            "database.history.kafka.bootstrap.servers" : "kafka:9092"
        }
    }
     
     

    效果

    curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connect
    ors/ -d @register-mongodb.json
    HTTP/1.1 201 Created
    Date: Tue, 25 Dec 2018 12:15:59 GMT
    Location: http://localhost:8083/connectors/inventory-connector
    Content-Type: application/json
    Content-Length: 379
    Server: Jetty(9.4.12.v20180830)
    {"name":"inventory-connector","config":{"connector.class":"io.debezium.connector.mongodb.MongoDbConnector","tasks.max":"1","mongodb.hosts":"rs0/mongodb:27017","mongodb.name":"dbserver1","mongodb.user":"debezium","mongodb.password":"dbz","database.whitelist":"inventory","database.history.kafka.bootstrap.servers":"kafka:9092","name":"inventory-connector"},"tasks":[],"type":null}%
     
     

    测试

    • 打开消费者(kafka 命令行)
    docker-compose -f docker-compose-mongodb.yaml exec kafka /kafka/bin/kafka-console-consumer.sh 
        --bootstrap-server kafka:9092 
        --from-beginning 
        --property print.key=true 
        --topic dbserver1.inventory.customers
     
    • 修改数据&&查看效果
      修改数据:

      效果
     
    {"schema":{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":"1001"}} {"schema":{"type":"struct","fields":[{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"after"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"patch"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":true,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":false,"field":"rs"},{"type":"string","optional":false,"field":"ns"},{"type":"int32","optional":false,"field":"sec"},{"type":"int32","optional":false,"field":"ord"},{"type":"int64","optional":true,"field":"h"},{"type":"boolean","optional":true,"default":false,"field":"initsync"}],"optional":false,"name":"io.debezium.connector.mongo.Source","version":1,"field":"source"},{"type":"string","optional":true,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope"},"payload":{"after":null,"patch":"{"$v" : 1,"$set" : {"first_name" : "dalong"}}","source":{"version":"0.9.0.Beta2","connector":"mongodb","name":"dbserver1","rs":"rs0","ns":"inventory.customers","sec":1545740280,"ord":1,"h":5866229044893041819,"initsync":false},"op":"u","ts_ms":1545740280088}}
     

    说明

    debezium mongodb 的cdc 是基于复制集实现的,实际上mongodb 已经支持了stream 可以进行数据的捕获处理

    参考资料

    https://github.com/debezium/debezium-examples/tree/master/tutorial#using-mongodb
    https://debezium.io/

  • 相关阅读:
    poj 1743 Musical Theme 后缀数组
    poj 1743 Musical Theme 后缀数组
    cf 432D Prefixes and Suffixes kmp
    cf 432D Prefixes and Suffixes kmp
    hdu Data Structure? 线段树
    关于position和anchorPoint之间的关系
    ios POST 信息
    CALayers的代码示例
    CALayers详解
    ios中得sqlite使用基础
  • 原文地址:https://www.cnblogs.com/rongfengliang/p/10176261.html
Copyright © 2011-2022 走看看