zoukankan      html  css  js  c++  java
  • 使用Kafka Connect创建测试数据生成器

    在最近的一些项目中,我使用Apache Kafka开发了一些数据管道在性能测试方面,数据生成总是会在整个活动中引入一些样板代码,例如创建客户端实例,编写控制流以发送数据,根据业务逻辑随机化有效负载等等。

    在测试设置期间,拥有一个处理所有繁重工作的框架会很好,因此只需要回答两个基本和基本的问题:

    • 数据应该是什么样的?(架构)
    • 要生成多少数据?(体积)

    有了Kafka Connect,事实证明实现自定义源连接器能够实现这一目标。以下是用于生成测试数据的示例属性列表的快速概述。

    topic.name = generated.events 
    poll.size = 10
    poll.interval.ms = 5000
    message.template = {“status”:“foo”,“direction”:“up”}
    random.fields = status:foo | bar |巴兹,方向:向上|向下|向左|向右

    这些属性是不言自明的。为了回答上述两个基本问题:message.templaterandom.fields控制模式,而poll.sizepoll.interval.ms控制音量。

    基于这些属性,我创建了一个名为“kafka-connect-datagen”(或简称“datagen”)的自定义源连接器,可在GitHub上获得

    在下一节中,我将简要介绍一些实现细节。

    实现自定义连接器

    Kafka Connect源连接器将数据从数据存储复制到Kafka,而接收器连接则相反。虽然是源连接器,但datagen不会链接到任何数据存储; 它从内部生成数据。其实施的其余部分根据是标准卡夫卡连接开发指南:它延伸SourceConnectorSourceTask,并实现了一些生命周期方法的钩。以下片段缩写自datagen。

     
     

    如代码所示,Connector定义Task要运行的类型和要为其设置的配置Task,同时Task是执行自定义逻辑的工作单元。两者ConnectorTask实例都在一个Worker 进程中运行汇合的文档详细介绍了这些概念。

    除了实现这两个类之外,还有一个步骤在运行演示之前:ConfigDef为用户定义配置列表()。之后,这些类可以打包为Connector插件。全面实施可以在GitHub上找到。

    在下一节中,我将演示如何将插件与dockerized本地群集设置一起使用。

    快速入门演示

    在本快速入门示例中,我们使用docker-compose管理所有必需的服务,如ZooKeeper,Kafka和Kafka Connect。要显示所有这些服务,请运行docker-compose up -d,然后运行docker-compose ps以打印状态信息,如下所示。

    Name State Ports 
    ----------------------------------------------- --------------------
    quickstart_broker_1 Up 0.0.0.0:9092->9092/tcp
    quickstart_connect_1 Up 0.0.0.0:8083->8083/tcp,t ...
    quickstart_kafka- connect-ui_1 Up 0.0.0.0:8001->8000/tcp
    quickstart_kafka-rest-proxy_1 Up 0.0.0.0:8082->8082/tcp
    quickstart_kafka-topics-ui_1 Up 0.0.0.0:8000->8000/tcp
    quickstart_zookeeper_1 Up 0.0。 0.0:2181-> 2181 / tcp,...

    Kafka和Kafka Connect将需要更长的时间才能开始。感谢Landoop Ltd,我们有这些不错的UI工具:打开http:// localhost:8000查看Kafka主题UI,http:// localhost:8001查看Kafka Connect UI。您也可以运行docker-compose logs -f以查看日志。

    由于通常Kafka Connect服务是最后一个完成启动的服务,我们可以通过运行docker-compose logs -f connect来查看其日志,以查看如下的正常运行指标。

    INFO使用config offset -1(org.apache.kafka.connect.runtime.distributed.DistributedHerder)
    启动连接器和任务INFO完成启动连接器和任务(org.apache.kafka.connect.runtime.distributed.DistributedHerder)

    当所有服务完全启动时,是时候创建“datagen”连接器实例了。以下是用于此演示的配置示例。它基本上设置“datagen”任务,每5秒生成10条消息。每条消息都使用定义的JSON消息模板和一些随机字段。运行以下命令以实例化Connector和Task。

    curl -X POST http:// localhost:8083 / connectors  
    -H'Content-Type:application / json'
    -H'Eccept:application / json'
    -d @ connect.source.datagen.json

    现在,在Kafka主题UI中,我们能够看到以generated.events定义的速率发布到主题的随机JSON消息

     

    kafka-connect-datagen发布消息

    要停止生成,我们可以转到Kafka Connect UI并暂停或删除连接器。同样,我们可以使用如下所示的REST API来实现相同的结果。查看此Confluent文档以获取更多操作。

    #pause连接器(如果成功则为空响应)
    curl -X PUT http:// localhost:8083 / connectors / connect.source.datagen / pause

    #delete连接器(如果成功则为空响应)
    curl -X DELETE http:// localhost:8083 / connectors / connect.source.datagen

    总之,我们能够利用Kafka Connect,这是一种现成的工具,可以很好地与Kafka集成,以最少的样板代码实现随机数据生成。自定义连接器插件 - kafka-connect-datagen - 具有高度可移植性,可以进一步扩展以支持集成测试和不同消息格式等功能。

  • 相关阅读:
    BestCoder Round #61 (div.2)
    CCPC L(水)
    CCPC A(模拟)
    暗网是什么?如何进入暗网?
    社会工程学:关于一些信息收集的网站
    Flask开发系列之Web表单
    Flask开发系列之模板
    [转]Python 资源大全中文版
    python字符串/列表/字典互相转换
    Flask开发系列之Flask+redis实现IP代理池
  • 原文地址:https://www.cnblogs.com/a00ium/p/10947085.html
Copyright © 2011-2022 走看看