input
{
kafka
{
bootstrap_servers => "192.168.32.36:9092,192.168.32.37:9092,192.168.32.38:9092"
topics => "msa-log-prod"
codec => "json"
group_id => "msa-log-prod-elsearch"
consumer_threads => 4
decorate_events => true
}
kafka
{
bootstrap_servers => "192.168.32.36:9092,192.168.32.37:9092,192.168.32.38:9092"
topics => "msa-log-test"
codec => "json"
group_id => "msa-log-test-elsearch"
consumer_threads => 2
decorate_events => true
}
}
output
{
if [env] == "prod" {
elasticsearch
{
hosts => ["192.168.32.36:9200","192.168.32.37:9200","192.168.32.38:9200"]
index => "msa-log-prod-%{+YYYY.MM.dd}"
}
}
if [env] == "test" {
elasticsearch
{
hosts => ["192.168.32.36:9200","192.168.32.37:9200","192.168.32.38:9200"]
index => "msa-log-test-%{+YYYY.MM.dd}"
}
}
}
说明:需要kafka消息格式是json并且包含一个env字段用于区分环境,如果仅一个input和output可以去掉if判断。
