中间加了一层转换,日志从filebeat采集进入logstash,然后将日志中的时间字段覆盖到es的@timestamp字段,kibana会将这个字段当做是日志的上传时间
input {
  # Receive events from Filebeat (Beats protocol) on TCP 10515.
  # NOTE(review): as written the listener is plaintext; if the beats run on other hosts,
  # enable TLS on this input (`ssl`, `ssl_certificate`, `ssl_key`) so log lines are not
  # readable on the wire.
  beats {
    port => 10515
  }
}
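filter {
  # Every branch keys on a Filebeat tag ("xxx" is a placeholder to be replaced by the
  # real tag name).  The branches are independent `if`s, not `else if`, so an event that
  # carries more than one matching tag runs every matching branch.
  if "xxx" in [tags] {
    # TIMESTAMP_ISO8601 accepts both a space and a 'T' between date and time and either
    # ',' or '.' before the fraction; on no match the event is tagged _grokparsefailure.
    grok {
      match => ["message", "%{TIMESTAMP_ISO8601:log.date}"]
    }
    # Overwrite @timestamp with the time the log line itself carries.  The original
    # pattern is tried first; the extra patterns cover the separator variants that the
    # grok pattern above already accepts, which previously ended as _dateparsefailure
    # with @timestamp left at ingest time.  Patterns without a zone are read in the
    # Logstash host's zone; set `timezone => "<ZONE>"` if the logs are written elsewhere.
    date {
      match => ["log.date",
                "yyyy-MM-dd HH:mm:ss,SSS",
                "yyyy-MM-dd HH:mm:ss.SSS",
                "yyyy-MM-dd'T'HH:mm:ss,SSS",
                "yyyy-MM-dd'T'HH:mm:ss.SSS",
                "ISO8601"]
      target => "@timestamp"
    }
  }
  # Same handling as above; this branch stores the capture under `timestamp8601`
  # instead of `log.date` (kept as-is because downstream queries may rely on the name).
  if "xxx" in [tags] {
    grok {
      match => ["message", "%{TIMESTAMP_ISO8601:timestamp8601}"]
    }
    date {
      match => ["timestamp8601",
                "yyyy-MM-dd HH:mm:ss.SSS",
                "yyyy-MM-dd HH:mm:ss,SSS",
                "yyyy-MM-dd'T'HH:mm:ss.SSS",
                "yyyy-MM-dd'T'HH:mm:ss,SSS",
                "ISO8601"]
      target => "@timestamp"
    }
  }
  if "xxx" in [tags] {
    grok {
      match => ["message", "%{TIMESTAMP_ISO8601:timestamp8601}"]
    }
    date {
      match => ["timestamp8601",
                "yyyy-MM-dd HH:mm:ss.SSS",
                "yyyy-MM-dd HH:mm:ss,SSS",
                "yyyy-MM-dd'T'HH:mm:ss.SSS",
                "yyyy-MM-dd'T'HH:mm:ss,SSS",
                "ISO8601"]
      target => "@timestamp"
    }
  }
}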
output {
  # One Elasticsearch output per tag ("xxx" placeholders stand for the real tag, host,
  # index prefix and credentials).  Plain `if`s: an event matching several tags is
  # indexed by every matching output.
  #
  # NOTE(review): `user`/`password` are literals in this file and `hosts` is plain http,
  # so the credentials are readable both on disk and on the wire.  Prefer storing the
  # secret in the Logstash keystore and referencing it as `${ES_PASSWORD}`, and reach the
  # cluster over https where it is available.
  if "xxx" in [tags] {
    elasticsearch {
      hosts => ["http://xxx:9200"]
      # Monthly index; the daily, beat-versioned layout below is kept for reference.
      #index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
      index => "xxx-%{+YYYY.MM}"
      user => "xxx"
      password => "xxx"
    }
  }
  if "xxx" in [tags] {
    elasticsearch {
      hosts => ["http://xxx:9200"]
      #index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
      index => "xxx-%{+YYYY.MM}"
      user => "xxx"
      password => "xxx"
    }
  }
  if "xxx" in [tags] {
    elasticsearch {
      hosts => ["http://xxx:9200"]
      #index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
      index => "xxx-%{+YYYY.MM}"
      user => "xxx"
      password => "xxx"
    }
  }
}