"@version" => "1", "@timestamp" => "2016-09-12T08:31:06.630Z", "path" => "/data01/applog_backup/winfae_log/wj-frontend01-access.2016-09-12", "host" => "dr-mysql01.zjcap.com", "type" => "wj_frontend_access", "clientip" => "10.168.255.134", "time" => "12/Sep/2016:16:30:40 +0800", "verb" => "GET", filters/date 插件可以用来转换你的日志记录中的时间字符串,变成 LogStash::Timestamp 对象,然后转存到 @timestamp 字段里。 [elk@zjtest7-frontend config]$ vim stdin02.conf input { stdin { } } filter { grok { match => ["message", "%{HTTPDATE:logdate}"] } date { match => ["logdate", "dd/MMM/yyyy:HH:mm:ss Z"] add_field =>["response_time","%{logdate}"] } } output { stdout { codec=>rubydebug{} } [elk@zjtest7-frontend config]$ ../bin/logstash -f stdin02.conf Settings: Default pipeline workers: 1 Pipeline main started 12/Sep/2016:21:32:33 +0800 { "message" => "12/Sep/2016:21:32:33 +0800", "@version" => "1", "@timestamp" => "2016-09-12T13:32:33.000Z", "host" => "0.0.0.0", "logdate" => "12/Sep/2016:21:32:33 +0800", "response_time" => "12/Sep/2016:21:32:33 +0800" } -------------------------------------------------------- 这在导入旧数据的时候固然非常有用,而在实时数据处理的时候同样有效,因为一般情况下数据流程中我们都会有缓冲区,导致最终的实际处理时间跟事件产生时间略有偏差。 input { stdin { } } filter { grok { match => ["message", "%{HTTPDATE:logdate}"] } # date { # match => ["logdate", "dd/MMM/yyyy:HH:mm:ss Z"] # add_field =>["response_time","%{logdate}"] # } } output { stdout { codec=>rubydebug{} } } ~ ~ ~ [elk@zjtest7-frontend config]$ ../bin/logstash -f stdin02.conf Settings: Default pipeline workers: 1 Pipeline main started 12/Sep/2016:21:32:33 +0800 { "message" => "12/Sep/2016:21:32:33 +0800", "@version" => "1", "@timestamp" => "2016-09-12T13:47:08.611Z", "host" => "0.0.0.0", "logdate" => "12/Sep/2016:21:32:33 +0800" }