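In streams mode, Benthos streams can be created, inspected, updated, and deleted at runtime through a REST API.
Usage
- Start Benthos in streams mode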
benthos --streams
- Create a stream (REST API)
curl http://localhost:4195/streams/foo -X POST --data-binary @- <<EOF
input:
  type: http_server
buffer:
  type: memory
pipeline:
  threads: 4
  processors:
  - type: jmespath
    jmespath:
      query: "{id: user.id, content: body.content}"
output:
  type: http_server
EOF
- Query the loaded stream configs
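To see every stream that is currently loaded, the `/streams` endpoint can be queried with a plain GET. The following is a minimal sketch, assuming the API listens on localhost:4195 as in the other examples; the `list_streams` function and the `BENTHOS_URL` variable are hypothetical helpers for illustration, not part of Benthos.

```shell
# list_streams: fetch the set of loaded streams as JSON.
# BENTHOS_URL is an assumed override; it defaults to the address used in this note.
list_streams() {
  curl -s "${BENTHOS_URL:-http://localhost:4195}/streams"
}
```

Piping the result through jq, as in `list_streams | jq '.'`, pretty-prints the JSON response.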
- Add a second stream
curl http://localhost:4195/streams/bar -X POST --data-binary @- <<EOF
input:
  type: kafka
  kafka:
    addresses:
    - localhost:9092
    topic: my_topic
buffer:
  type: none
pipeline:
  threads: 1
  processors:
  - type: sample
    sample:
      retain: 10
output:
  type: elasticsearch
  elasticsearch:
    urls:
    - http://localhost:9200
EOF
- Get the config of a single stream (REST API)
curl http://localhost:4195/streams/foo | jq '.'
- Update a stream (REST API)
The following request changes the buffer of the foo stream from memory to none:
curl http://localhost:4195/streams/foo -X PUT --data-binary @- <<EOF
input:
  type: http_server
buffer:
  type: none
pipeline:
  threads: 4
  processors:
  - type: jmespath
    jmespath:
      query: "{id: user.id, content: body.content}"
output:
  type: http_server
EOF
- Delete a stream
curl http://localhost:4195/streams/foo -X DELETE
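The per-stream requests above all target `/streams/<id>` and differ only in the HTTP method and whether a config body is sent. As a rough sketch, they can be wrapped in small shell functions that read the config from a file instead of a heredoc. The function names, the `BENTHOS_URL` variable, and the file name `foo.yaml` are hypothetical and chosen only for illustration.

```shell
# Base URL of the Benthos HTTP API (assumed default, matching the commands above).
BENTHOS_URL="${BENTHOS_URL:-http://localhost:4195}"

# stream_get ID: fetch one stream by id (GET).
stream_get() {
  curl -s "$BENTHOS_URL/streams/$1"
}

# stream_create ID FILE: create a stream from a YAML config file (POST).
stream_create() {
  curl -s -X POST --data-binary "@$2" "$BENTHOS_URL/streams/$1"
}

# stream_update ID FILE: replace the config of an existing stream (PUT).
stream_update() {
  curl -s -X PUT --data-binary "@$2" "$BENTHOS_URL/streams/$1"
}

# stream_delete ID: remove a stream (DELETE).
stream_delete() {
  curl -s -X DELETE "$BENTHOS_URL/streams/$1"
}
```

With these loaded into the shell, `stream_create foo foo.yaml` followed by `stream_get foo | jq '.'` would repeat the create and query steps for the foo stream, and `stream_delete foo` would remove it again.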
References
https://github.com/Jeffail/benthos/blob/HEAD/docs/streams/using_REST_API.md