  • Debezium MongoDB integration test

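    Debezium is a convenient CDC (change data capture) connector that helps with real-time processing of data changes, data analysis, and data exchange between microservices.
    Since I last ran a simple demo, the tool has changed a lot: it has gained many convenient features and supports more and more databases.
    This demo uses the docker-compose file provided by the official project.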

    Environment setup

    • docker-compose file (saved as docker-compose-mongodb.yaml, the name the later commands pass with -f)
     
    version: '2'
    services:
      zookeeper:
        image: debezium/zookeeper:0.9
        ports:
         - 2181:2181
         - 2888:2888
         - 3888:3888
      kafka:
        image: debezium/kafka:0.9
        ports:
         - 9092:9092
        links:
         - zookeeper
        environment:
         - ZOOKEEPER_CONNECT=zookeeper:2181
      mongodb:
        image: debezium/example-mongodb:0.9
        hostname: mongodb
        ports:
         - 27017:27017
        environment:
         - MONGODB_USER=debezium
         - MONGODB_PASSWORD=dbz
      connect:
        image: debezium/connect:0.9
        ports:
         - 8083:8083
        links:
         - kafka
         - mongodb
        environment:
         - BOOTSTRAP_SERVERS=kafka:9092
         - GROUP_ID=1
         - CONFIG_STORAGE_TOPIC=my_connect_configs
         - OFFSET_STORAGE_TOPIC=my_connect_offsets
    • Start the services
    docker-compose -f docker-compose-mongodb.yaml up -d
     
    • Load the MongoDB data
      The demo image ships with a script that loads sample MongoDB data
     
    docker-compose -f docker-compose-mongodb.yaml exec mongodb bash -c '/usr/local/bin/init-inventory.sh'
     
     

    Credentials for connecting: user debezium, password dbz

    • Start (register) the connector
     
    curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-mongodb.json
     

    Contents of register-mongodb.json

    {
        "name": "inventory-connector",
        "config": {
            "connector.class" : "io.debezium.connector.mongodb.MongoDbConnector",
            "tasks.max" : "1",
            "mongodb.hosts" : "rs0/mongodb:27017",
            "mongodb.name" : "dbserver1",
            "mongodb.user" : "debezium",
            "mongodb.password" : "dbz",
            "database.whitelist" : "inventory",
            "database.history.kafka.bootstrap.servers" : "kafka:9092"
        }
    }
     
     

    Result

    curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-mongodb.json
    HTTP/1.1 201 Created
    Date: Tue, 25 Dec 2018 12:15:59 GMT
    Location: http://localhost:8083/connectors/inventory-connector
    Content-Type: application/json
    Content-Length: 379
    Server: Jetty(9.4.12.v20180830)
    {"name":"inventory-connector","config":{"connector.class":"io.debezium.connector.mongodb.MongoDbConnector","tasks.max":"1","mongodb.hosts":"rs0/mongodb:27017","mongodb.name":"dbserver1","mongodb.user":"debezium","mongodb.password":"dbz","database.whitelist":"inventory","database.history.kafka.bootstrap.servers":"kafka:9092","name":"inventory-connector"},"tasks":[],"type":null}
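
    The 201 response above only confirms that the connector was registered (its task list is still empty). A minimal sketch for checking afterwards whether the connector and its task are actually running, assuming the standard Kafka Connect REST endpoint GET /connectors/<name>/status on the port published above. The helper names fetch_status and summarize_status and the sample status document are illustrative and not part of the original demo.

```python
import json
from urllib.request import urlopen

# Assumption: the connect service is reachable on the port published in the compose file.
CONNECT_URL = "http://localhost:8083"


def fetch_status(name: str, base_url: str = CONNECT_URL) -> dict:
    """Fetch the status document for one connector from the Kafka Connect REST API."""
    with urlopen(f"{base_url}/connectors/{name}/status") as resp:
        return json.load(resp)


def summarize_status(status: dict) -> str:
    """Condense a status document into one line: connector state plus each task state."""
    connector_state = status["connector"]["state"]
    task_states = [task["state"] for task in status.get("tasks", [])]
    return f"{status['name']}: connector={connector_state}, tasks={task_states}"


# Hand-written sample in the shape the status endpoint returns (not captured from the demo).
sample = {
    "name": "inventory-connector",
    "connector": {"state": "RUNNING", "worker_id": "connect:8083"},
    "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "connect:8083"}],
}
print(summarize_status(sample))
```

    With the containers up, summarize_status(fetch_status("inventory-connector")) would report the live state; a task in state FAILED usually means the connector could not reach the replica set.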
     
     

    Testing

    • Open a consumer (Kafka command-line tool)
    docker-compose -f docker-compose-mongodb.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
        --bootstrap-server kafka:9092 \
        --from-beginning \
        --property print.key=true \
        --topic dbserver1.inventory.customers
     
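    • Modify data and check the result
      Modify data: update a document in inventory.customers (the event below comes from setting first_name to "dalong" on customer 1001).

      Result (message key followed by message value):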
     
    {"schema":{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":"1001"}} {"schema":{"type":"struct","fields":[{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"after"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"patch"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":true,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":false,"field":"rs"},{"type":"string","optional":false,"field":"ns"},{"type":"int32","optional":false,"field":"sec"},{"type":"int32","optional":false,"field":"ord"},{"type":"int64","optional":true,"field":"h"},{"type":"boolean","optional":true,"default":false,"field":"initsync"}],"optional":false,"name":"io.debezium.connector.mongo.Source","version":1,"field":"source"},{"type":"string","optional":true,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope"},"payload":{"after":null,"patch":"{\"$v\" : 1,\"$set\" : {\"first_name\" : \"dalong\"}}","source":{"version":"0.9.0.Beta2","connector":"mongodb","name":"dbserver1","rs":"rs0","ns":"inventory.customers","sec":1545740280,"ord":1,"h":5866229044893041819,"initsync":false},"op":"u","ts_ms":1545740280088}}
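
    The message value above is an envelope: the payload carries op ("u" for update), a source block, and after / patch fields that are themselves JSON strings. A minimal sketch of how a consumer might unpack it, assuming the value arrives as a JSON string in the shape shown above. The function name describe_change is hypothetical, and the sample is a trimmed copy of the event without the schema part.

```python
import json


def describe_change(value_json: str) -> dict:
    """Unpack the payload of a change event value into plain Python objects."""
    payload = json.loads(value_json)["payload"]
    after = payload.get("after")
    patch = payload.get("patch")
    return {
        "op": payload["op"],                   # "u" in the event above
        "namespace": payload["source"]["ns"],  # "<database>.<collection>"
        # after and patch are JSON documents serialized as strings, so they
        # need a second json.loads; either one may be null.
        "after": json.loads(after) if after else None,
        "patch": json.loads(patch) if patch else None,
        "ts_ms": payload["ts_ms"],
    }


# Trimmed copy of the update event shown above (schema section omitted).
sample_value = json.dumps({
    "payload": {
        "after": None,
        "patch": '{"$v" : 1,"$set" : {"first_name" : "dalong"}}',
        "source": {"name": "dbserver1", "rs": "rs0", "ns": "inventory.customers"},
        "op": "u",
        "ts_ms": 1545740280088,
    }
})

change = describe_change(sample_value)
print(change["op"], change["namespace"], change["patch"]["$set"])
```

    For this update event after is null and only the patch is present, so a consumer that needs the full document would have to keep its own copy or read it back from MongoDB.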
     

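    Notes

    Debezium's MongoDB CDC is built on replica sets: the connector follows the replica set's oplog. MongoDB itself now also supports change streams, which can be used to capture and process data changes.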

    References

    https://github.com/debezium/debezium-examples/tree/master/tutorial#using-mongodb
    https://debezium.io/

  • Original article: https://www.cnblogs.com/rongfengliang/p/10176261.html