zoukankan      html  css  js  c++  java
  • nakadi 一款基于kafka 的http event broker

    nakadi 是zalando 开源的一款基于kafka 的event broker ,我们可以方便的使用http 协议进行操作
    支持一些特性:

    • stream 操作,我们可以流的方式订阅event
    • event 支持基于json schema 我们可以对于event 进行数据校验,方便的schema 注册
    • 支持oauth 、event type 的安全认证,同时也支持黑名单用户以及应用授权
    • 比较全的监控集成

    环境准备

    • docker-compose 文件
      说明使用pg 存储基本的元数据,此项目集成了UI界面
     
    version: '3'
    services:
      nakadi-ui:
        image: nakadi/nakadi-ui
        ports:
         - "3000:3000"
        depends_on:
         - nakadi
        environment:
         - NAKADI_API_URL=http://nakadi:8080
      nakadi:
        image: adyach/nakadi-docker:latest
        ports:
         - "8080:8080"
        depends_on:
         - postgres
         - zookeeper
         - kafka
        environment:
          - SPRING_PROFILES_ACTIVE=local
          - NAKADI_OAUTH2_MODE=OFF
          - NAKADI_ZOOKEEPER_BROKERS=zookeeper:2181
          - SPRING_DATASOURCE_URL=jdbc:postgresql://postgres:5432/local_nakadi_db
      postgres:
        image: adyach/nakadi-postgres:latest
        ports:
          - "5432:5432"
        environment:
          POSTGRES_USER: nakadi
          POSTGRES_PASSWORD: nakadi
          POSTGRES_DB: local_nakadi_db
      zookeeper:
        image: wurstmeister/zookeeper:3.4.6
        ports:
          - "2181:2181"
      kafka:
        image: wurstmeister/kafka:1.1.0
        ports:
          - "9092:9092"
        depends_on:
          - zookeeper
        environment:
          KAFKA_ADVERTISED_HOST_NAME: kafka
          KAFKA_ADVERTISED_PORT: 9092
          KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
          KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'
          KAFKA_DELETE_TOPIC_ENABLE: 'true'
          KAFKA_BROKER_ID: 0
        volumes:
          - /var/run/docker.sock:/var/run/docker.sock
     
     

    启动&&测试

    • 启动
    docker-compose up -d
     
     
    • 创建event type
    curl -v -XPOST http://localhost:8080/event-types -H "Content-type: application/json" -d '{
      "name": "order.ORDER_RECEIVED",
      "owning_application": "order-service",
      "category": "undefined",
      "partition_strategy": "random",
      "schema": {
        "type": "json_schema",
        "schema": "{ "properties": { "order_number": { "type": "string" } } }"
      }
    }'
     
     
    • 发布event
    curl -v -XPOST http://localhost:8080/event-types/order.ORDER_RECEIVED/events -H "Content-type: application/json" -d '[
      {
        "order_number": "24873243241",
        "metadata": {
          "eid": "d765de34-09c0-4bbb-8b1e-7160a33a0791",
          "occurred_at": "2016-03-15T23:47:15+01:00"
        }
      }, {
        "order_number": "24873243242",
        "metadata": {
          "eid": "a7671c51-49d1-48e6-bb03-b50dcf14f3d3",
          "occurred_at": "2016-03-15T23:47:16+01:00"
        }
      }]'
     
     
    • 订阅event
    curl -v http://localhost:8080/event-types/order.ORDER_RECEIVED/events 
     
     
    • 效果
    {"cursor":{"partition":"0","offset":"001-0001-000000000000000059"},"events":[{"order_number":"24873243242","metadata":{"occurred_at":"2016-03-15T23:47:16+01:00","eid":"a7671c51-49d1-48e6-bb03-b50dcf14f3d3","event_type":"order_received","partition":"0","received_at":"2018-12-12T14:12:36.479Z","flow_id":"9XJC8HBQY1lqiNK5N8g9doXR","version":"1.0.0"}}]}
    {"cursor":{"partition":"0","offset":"001-0001-000000000000000060"},"events":[{"order_number":"24873243241","metadata":{"occurred_at":"2016-03-15T23:47:15+01:00","eid":"d765de34-09c0-4bbb-8b1e-7160a33a0791","event_type":"order_received","partition":"0","received_at":"2018-12-12T14:12:37.603Z","flow_id":"PIvB1uG1qicLUiTWPyu8nMZI","version":"1.0.0"}}]}
    {"cursor":{"partition":"0","offset":"001-0001-000000000000000061"},"events":[{"order_number":"24873243242","metadata":{"occurred_at":"2016-03-15T23:47:16+01:00","eid":"a7671c51-49d1-48e6-bb03-b50dcf14f3d3","event_type":"order_received","partition":"0","received_at":"2018-12-12T14:12:37.603Z","flow_id":"PIvB1uG1qicLUiTWPyu8nMZI","version":"1.0.0"}}]}
     
     

    说明

    nakadi 的功能还是很强大的,对于kafka 的集成,可以让我们只需要关注业务系统功能的开发

    参考资料

    https://nakadi.io/manual.html

  • 相关阅读:
    Mac 卸载MySql的方法
    关于mysql的wait_timeout参数 设置不生效的问题
    linux下利用nohup后台运行jar文件包程序
    MySql创建视图
    spring mvc获取header
    Spring Data Jpa 查询返回自定义对象
    Caused by: org.xml.sax.SAXParseException: The reference to entity "characterEncoding" must end with the ';' delimiter.
    eclipse Reference 功能之——项目之间的引用
    Mac 在启动eclipse时 Failed to load JavaHL Library解决方法
    MySQL Workbench update语句错误Error Code: 1175.
  • 原文地址:https://www.cnblogs.com/rongfengliang/p/10111498.html
Copyright © 2011-2022 走看看