Consuming Kafka data into Elasticsearch (ES) with Logstash
Kafka cluster
10.10.10.60:9092
10.10.10.61:9092
10.10.10.62:9092
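Before wiring Logstash up, it helps to confirm that the brokers are reachable and the topics exist. A minimal sketch using the stock Kafka CLI tools, assuming they are on the PATH (newer Kafka versions accept --bootstrap-server; older releases use --zookeeper instead):

# List the topics visible through the cluster
kafka-topics.sh --bootstrap-server 10.10.10.60:9092 --list

# Peek at a few messages from one topic to confirm data is flowing
kafka-console-consumer.sh --bootstrap-server 10.10.10.60:9092 \
  --topic topicnamexxx --from-beginning --max-messages 5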
kafka2es.conf
input {
  kafka {
    bootstrap_servers => "10.10.10.60:9092,10.10.10.61:9092,10.10.10.62:9092"
    topics => ["topicnamexxx","topicnameyyy","topicnamezzz","topicnamewww"]
    group_id => "xxx_prod_game_sitexx"
    auto_offset_reset => "earliest"
    codec => "json"
    consumer_threads => 10
    client_id => "client_site1"
    max_poll_records => "150"
    max_poll_interval_ms => "600000"
    heartbeat_interval_ms => 2000
  }
}
# debug-mode testing
#input {
#  stdin {
#    codec => json
#  }
#}
output {
  # debug-mode testing
  #stdout {
  #  codec => rubydebug
  #}

  # Business 01 F5
  elasticsearch {
    index => "%{es_index_name2}"
    document_id => "%{id}"
    hosts => ["10.10.10.61:9200"]
    user => "elastic"
    password => "wpasswordxxxy"
    codec => json
  }

  # Business 02 F5
  elasticsearch {
    index => "%{es_index_name2}"
    document_id => "%{id}"
    hosts => ["10.10.10.64:9200"]
    user => "elastic"
    password => "twpasswordxxxyD"
    codec => json
  }
}
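Every event must carry the es_index_name2 and id fields referenced by the elasticsearch outputs; if a field is missing, the literal %{...} string ends up as the index name or document id. Once the pipeline is running, one way to confirm that Logstash is actually consuming is to inspect the consumer group's offsets and lag; a sketch with the stock Kafka CLI (group_id taken from the config above, script path depends on your installation):

# Show partition assignment, current offset, and lag for the Logstash consumer group
kafka-consumer-groups.sh --bootstrap-server 10.10.10.60:9092 \
  --describe --group xxx_prod_game_sitexx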
Checking connectivity from Logstash to ES
Comment out the Kafka input used in production and use the stdin input below instead.
Turn on debug mode to check that it works:
input {
  stdin {
    codec => json
  }
}
output {
  stdout {
    codec => rubydebug
  }
}
bin/logstash -f kafka2es.conf --verbose --debug
Append --verbose --debug to the command.
Starting Logstash in the foreground like this lets you see the log output directly in the terminal.
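As a quick end-to-end check you can also pipe a hand-written JSON document into the stdin input. The field values below are made up for illustration; es_index_name2 and id are included so the elasticsearch outputs (if left enabled) can resolve their index and document_id:

# Feed one test event through stdin; it should show up in the rubydebug output
echo '{"es_index_name2":"test-index","id":"1","msg":"hello"}' | bin/logstash -f kafka2es.conf --verbose --debug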