# Test pipeline: consume nginx access-log events from Kafka, flag each event by
# whether its request_uri occurs in a whitelist file, and index it into one of
# two Elasticsearch indices accordingly. Every event is also printed to stdout.
input {
  kafka {
    bootstrap_servers => "127.0.0.1:9092"
    client_id         => "log"
    auto_offset_reset => "latest"
    consumer_threads  => 5
    decorate_events   => true
    topics            => ["nginx_access_log_test"]
    codec             => "json"
    type              => "nginx_log"
  }
}

filter {
  mutate {
    # A single gsub array holding both (field, pattern, replacement) triples.
    # NOTE(review): the patterns are regular expressions; as written, \x22 and
    # \x09 are regex hex escapes (double quote and tab). If the intent was to
    # rewrite the literal four-character text that nginx emits for escaped
    # bytes, the backslash needs doubling -- TODO confirm against real input.
    gsub => [
      "message", "\x22", '"',
      "message", "\x09", ''
    ]
  }

  # Expand the JSON text held in "message" into event fields; the listed
  # fields are removed once parsing succeeds.
  json {
    source       => "message"
    remove_field => ["message", "beat", "@version"]
  }

  # Set es_flag to "1" when request_uri occurs in the whitelist file, else "0".
  # The file is re-read for every event, so edits to it are picked up without
  # a restart. The check is a substring test against the whole file text, not
  # a per-line comparison.
  ruby {
    code => '
      uri = event.get("request_uri")
      # File.read opens, reads and closes in one call, so no handle is leaked.
      whitelist = File.read("/usr/local/logstash/config/white.txt")
      # A missing or non-string request_uri would make include? raise TypeError
      # and leave es_flag unset; treat it as not whitelisted instead.
      if uri.is_a?(String) && whitelist.include?(uri)
        event.set("es_flag", "1")
      else
        event.set("es_flag", "0")
      end
    '
  }
}

output {
  if [es_flag] == "1" {
    # Whitelisted requests.
    elasticsearch {
      hosts => "127.0.0.1:9200"
      index => "nginx_access_log_test"
    }
  } else {
    # Everything else, including events where es_flag was never set because
    # the ruby filter failed (for example, the whitelist file was unreadable).
    elasticsearch {
      hosts => "127.0.0.1:9200"
      index => "nginx_access_log_test2"
    }
  }

  # Echo every event to the console for debugging.
  stdout {
    codec => rubydebug
  }
}
下面是一个更复杂的生产环境配置示例:
# Production pipeline: four Kafka topics are consumed, each tagged with its own
# "type"; events are then routed to Elasticsearch indices based on that type.
input {
  # nginx access logs
  kafka {
    bootstrap_servers => "127.0.0.1:9092"
    client_id         => "nginxaccesslog"
    auto_offset_reset => "latest"
    consumer_threads  => 5
    decorate_events   => true
    topics            => ["nginx_access_log"]
    codec             => "json"
    type              => "nginx_log"
  }

  # database records
  kafka {
    bootstrap_servers => "127.0.0.1:9092"
    client_id         => "database"
    auto_offset_reset => "latest"
    consumer_threads  => 5
    decorate_events   => true
    topics            => ["dsideal_db"]
    codec             => "json"
    type              => "dsideal_db"
  }

  # devops real-time info
  kafka {
    bootstrap_servers => "127.0.0.1:9092"
    client_id         => "devops_real"
    auto_offset_reset => "latest"
    consumer_threads  => 5
    decorate_events   => true
    topics            => ["devopsrealinfo"]
    codec             => "json"
    type              => "devopsrealinfo"
  }

  # devops base info
  kafka {
    bootstrap_servers => "127.0.0.1:9092"
    client_id         => "devops_base"
    auto_offset_reset => "latest"
    consumer_threads  => 5
    decorate_events   => true
    topics            => ["devopsbaseinfo"]
    codec             => "json"
    type              => "devopsbaseinfo"
  }
}

filter {
  mutate {
    # A single gsub array holding both (field, pattern, replacement) triples.
    # NOTE(review): the patterns are regular expressions; as written, \x22 and
    # \x09 are regex hex escapes (double quote and tab). If the intent was to
    # rewrite the literal four-character text that nginx emits for escaped
    # bytes, the backslash needs doubling -- TODO confirm against real input.
    gsub => [
      "message", "\x22", '"',
      "message", "\x09", ''
    ]
  }

  # Expand the JSON text held in "message" into event fields; the listed
  # fields (including @timestamp) are removed once parsing succeeds.
  json {
    source       => "message"
    remove_field => ["message", "beat", "@version", "@timestamp"]
  }

  if [type] == "nginx_log" {
    # Set es_flag to "1" when request_uri occurs in the whitelist file, else
    # "0". The file is re-read for every event, so edits to it are picked up
    # without a restart. The check is a substring test against the whole file
    # text, not a per-line comparison.
    ruby {
      code => '
        uri = event.get("request_uri")
        # File.read opens, reads and closes in one call, so no handle is leaked.
        whitelist = File.read("/usr/local/logstash/config/white.txt")
        # A missing or non-string request_uri would make include? raise
        # TypeError and leave es_flag unset; treat it as not whitelisted.
        if uri.is_a?(String) && whitelist.include?(uri)
          event.set("es_flag", "1")
        else
          event.set("es_flag", "0")
        end
      '
    }
  }
}

output {
  # The four types are mutually exclusive, so a single if / else-if chain
  # selects the destination.
  if [type] == "nginx_log" {
    if [es_flag] == "1" {
      # Whitelisted requests.
      elasticsearch {
        hosts => "127.0.0.1:9200"
        index => "nginx-access-log"
      }
    } else {
      # Everything else, including events where es_flag was never set because
      # the ruby filter failed (for example, the whitelist file was unreadable).
      elasticsearch {
        hosts => "127.0.0.1:9200"
        index => "nginx-access-log-other"
      }
    }
  } else if [type] == "dsideal_db" {
    # Index name and document id are taken from the event itself.
    # NOTE(review): if table_name or id is absent, the %{...} reference is
    # presumably left unexpanded -- verify the producer always sets both.
    elasticsearch {
      hosts       => "127.0.0.1:9200"
      index       => "%{table_name}"
      document_id => "%{id}"
    }
  } else if [type] == "devopsbaseinfo" {
    # Keyed by the event id, so a later event with the same id replaces the
    # earlier document.
    elasticsearch {
      hosts       => "127.0.0.1:9200"
      index       => "devopsbaseinfo"
      document_id => "%{id}"
    }
  } else if [type] == "devopsrealinfo" {
    # No document_id is set here, so Elasticsearch assigns one per event.
    elasticsearch {
      hosts => "127.0.0.1:9200"
      index => "devopsrealinfo"
    }
  }
}