zoukankan      html  css  js  c++  java
  • benthos stream nats 集成试用

    测试demo 来自官方例子

    使用docker-compose 进行运行

    nats docker-compose file

    version: '3.3'
    services:
      nats:
        image: nats
        ports:
          - "4222:4222"
          - "8222:8222"
          - "6222:6222"

    benthos stream 配置

    参考 https://github.com/Jeffail/benthos/tree/master/resources/docker/streams

    • 整体说明
      里面的例子就是数据从文件读取到nats 之后不同的消费者进行nats 消息消费,再到webhook,webhook 标准输出

    • 文件读取说明

      读取文件,之后到nats

    input:
      type: read_until
      read_until:
        condition:
          type: static
          static: false
        input:
          type: file
          file:
            path: ./sample.json
        restart_input: true
    pipeline:
      processors:
        - type: throttle
          throttle:
            period: 3s
    output:
      type: nats
      nats:
        subject: benthos_messages
        urls:
          - nats://localhost:4222
    
    • nats 消费者(三个)
    input:
      type: nats
      nats:
        subject: benthos_messages
        urls:
        - nats://localhost:4222
    pipeline:
      processors:
        - type: filter
          filter:
            type: jmespath
            jmespath:
              query: |
                keys(@) | contains(@, 'title')
    output:
      type: http_client
      http_client:
        url: http://localhost:4195/webhooks/post/customer1
        verb: POST
    
    • webhook 配置
    input:
      type: broker
      broker:
        inputs:
          - type: http_server
            http_server:
              path: /post/customer1
            processors:
              - type: text
                text:
                  operator: prepend
                  value: "Customer 1 received: "
          - type: http_server
            http_server:
              path: /post/customer2
            processors:
              - type: text
                text:
                  operator: prepend
                  value: "Customer 2 received: "
          - type: http_server
            http_server:
              path: /post/customer3
            processors:
              - type: text
                text:
                  operator: prepend
                  value: "Customer 3 received: "
    output:
      type: stdout
    

    运行

    • docker-compose
    dokcer-compose up -d
    • 启动benthos,基于文件stream 配置
    benthos --streams --streams-dir ./configs

    运行效果


    修改sample.json 测试

    参考资料

    https://github.com/Jeffail/benthos/tree/master/resources/docker/streams

  • 相关阅读:
    Swagger配置和使用
    请求SpringMVC接口如何传参数
    ssm搭建配置文件
    永久关闭windows10更新
    VSCode搭建java开发环境
    idea全局设置
    mybatis-plus查询指定字段
    mybayis-plus条件构造器
    Java日期时间操作的一些方法
    C#编写聊天软件客户端
  • 原文地址:https://www.cnblogs.com/rongfengliang/p/9569811.html
Copyright © 2011-2022 走看看