匹配字段
%{TIMESTAMP_ISO8601:log_timestamp} (%{WORD:s-sitename}|-) (%{IPORHOST:s-ip}|-) (%{WORD:cs-method}|-) %{NOTSPACE:cs-uri-stem} %{NOTSPACE:cs-uri-query} (%{NUMBER:s-port}|-) %{NOTSPACE:cs-referer} (%{IPORHOST:c-ip}|-) %{NOTSPACE:cs-useragent} %{NOTSPACE:cs-host} (%{NUMBER:sc-status}|-) (%{NUMBER:sc-bytes}|-) (%{NUMBER:cs-bytes}|-) (%{NUMBER:time-taken}|-)
filter 规则1.0
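filter {
  # Rule 1.0 for IIS W3C access logs. Events are expected to carry a `type`
  # field starting with "winlog-" (set by the shipper); everything else passes through untouched.
  if [type] =~ "winlog-" {
    # IIS writes directive lines ("#Software:", "#Fields:", ...) at the top of every log
    # file; they describe the format and are not events, so they are dropped here.
    if [message] =~ "^#" {
      drop {}
    }

    # Parse the W3C line into named fields and discard the raw line once it is parsed.
    # remove_field is applied only when the match succeeds; on a failed match `message`
    # is kept and the event is tagged _grokparsefailure, so unparsed lines are still visible.
    grok {
      match => { "message" => "%{TIMESTAMP_ISO8601:log_timestamp} (%{WORD:s-sitename}|-) (%{IPORHOST:s-ip}|-) (%{WORD:cs-method}|-) %{NOTSPACE:cs-uri-stem} %{NOTSPACE:cs-uri-query} (%{NUMBER:s-port}|-) %{NOTSPACE:cs-referer} (%{IPORHOST:c-ip}|-) %{NOTSPACE:cs-useragent} %{NOTSPACE:cs-host} (%{NUMBER:sc-status}|-) (%{NUMBER:sc-bytes}|-) (%{NUMBER:cs-bytes}|-) (%{NUMBER:time-taken}|-)" }
      remove_field => ["message"]
    }

    # Split the URI stem on "/ApiKey/". NOTE: this rewrites cs-uri-stem in place into an
    # array, so the original path is lost in the indexed document (rule 2.0 copies it to a
    # scratch field first). Element [1] is "<key>/<action>[/...]". If the stem contains no
    # "/ApiKey/", the reference does not resolve and the literal "%{[cs-uri-stem][1]}" is stored.
    mutate {
      split => ["cs-uri-stem", "/ApiKey/"]
      add_field => { "tmpVinKey" => "%{[cs-uri-stem][1]}" }
    }
    # Within one mutate, split runs before add_field/remove_field, so the scratch field is
    # an array by the time it is read. The two add_field entries are merged into one hash.
    # NOTE(review): `apikey` is a bearer-style credential copied verbatim into the index;
    # consider storing a fingerprint instead, or restricting read access to this index.
    mutate {
      split => ["tmpVinKey", "/"]
      add_field => {
        "apikey"      => "%{[tmpVinKey][0]}"
        "action_name" => "%{[tmpVinKey][1]}"
      }
      remove_field => ["tmpVinKey"]
    }

    # Use the request time from the log line, not ingest time, as @timestamp.
    # No timezone is given, so the Logstash host's zone is assumed; IIS W3C logs are
    # typically written in UTC (rule 3.0 sets timezone => "Etc/UTC" explicitly) — confirm.
    date {
      match => ["log_timestamp", "YYYY-MM-dd HH:mm:ss"]
      target => "@timestamp"
    }
  }
}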
filter 规则2.0
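filter {
  # Rule 2.0 for IIS W3C access logs. Differences from 1.0: the grok pattern also captures
  # sc-substatus and sc-win32-status, and the URI stem is copied to a scratch field before
  # splitting so cs-uri-stem survives intact in the indexed document.
  if [type] =~ "winlog-" {
    # IIS directive lines ("#Software:", "#Fields:", ...) are format metadata, not events.
    if [message] =~ "^#" {
      drop {}
    }

    # Parse the W3C line into named fields and discard the raw line once it is parsed.
    # remove_field is applied only on a successful match; a failed match keeps `message`
    # and tags the event _grokparsefailure.
    grok {
      match => { "message" => "%{TIMESTAMP_ISO8601:log_timestamp} (%{WORD:s-sitename}|-) (%{IPORHOST:s-ip}|-) (%{WORD:cs-method}|-) %{NOTSPACE:cs-uri-stem} %{NOTSPACE:cs-uri-query} (%{NUMBER:s-port}|-) %{NOTSPACE:cs-referer} (%{IPORHOST:c-ip}|-) %{NOTSPACE:cs-useragent} %{NOTSPACE:cs-host} (%{NUMBER:sc-status}|-) (%{NUMBER:sc-substatus}|-) (%{NUMBER:sc-win32-status}|-) (%{NUMBER:sc-bytes}|-) (%{NUMBER:cs-bytes}|-) (%{NUMBER:time-taken}|-)" }
      remove_field => ["message"]
    }

    # Work on a copy of the stem so the split below does not destroy cs-uri-stem.
    # If grok failed, cs-uri-stem is absent and `request` holds the literal "%{cs-uri-stem}".
    mutate {
      add_field => { "request" => "%{cs-uri-stem}" }
    }
    # Split on "/ApiKey/"; element [1] is "<key>/<action>[/...]". When the path has no
    # "/ApiKey/" segment the reference does not resolve and the literal string is stored.
    mutate {
      split => ["request", "/ApiKey/"]
      add_field => { "tmpVinKey" => "%{[request][1]}" }
    }
    # split runs before add_field/remove_field inside a single mutate, so the scratch
    # field is already an array here. Duplicate add_field/remove_field entries from the
    # original are merged into one hash / one array.
    # NOTE(review): `apikey` is a credential stored in cleartext in the index; consider a
    # fingerprint of the key, or tight read access on the index.
    mutate {
      split => ["tmpVinKey", "/"]
      add_field => {
        "apikey"      => "%{[tmpVinKey][0]}"
        "action_name" => "%{[tmpVinKey][1]}"
      }
      remove_field => ["tmpVinKey", "request"]
    }

    # Index by request time from the log line. No timezone is set, so the Logstash host's
    # zone is assumed; IIS W3C logs are typically UTC (rule 3.0 pins "Etc/UTC") — confirm.
    date {
      match => ["log_timestamp", "YYYY-MM-dd HH:mm:ss"]
      target => "@timestamp"
    }
  }
}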
filter 规则3.0以及output
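filter {
  # Rule 3.0 for IIS W3C access logs, plus the Elasticsearch output. Differences from 2.0:
  # a stable document id is derived from the raw line so re-shipped lines overwrite rather
  # than duplicate, beat metadata is dropped, and the timestamp zone is pinned to UTC.
  if [type] =~ "winlog-" {
    # IIS directive lines ("#Software:", "#Fields:", ...) are format metadata, not events.
    if [message] =~ "^#" {
      drop {}
    }

    # Dedup key: raw line + the shipper's byte offset in the file, so the same line seen
    # twice maps to the same document. `offset` is presumably Filebeat's file offset —
    # confirm it exists under that name for the shipper version in use; if missing, the
    # literal "%{offset}" is used and lines with identical text would collide.
    mutate {
      add_field => { "line_message" => "%{message} %{offset}" }
    }
    # MD5 is used only as a dedup identifier here, not for integrity or authentication.
    ruby {
      code => "
        require 'digest/md5'
        event.set('computed_id', Digest::MD5.hexdigest(event.get('line_message')))
      "
    }

    # Parse the W3C line. The last capture is named time-taken, matching the field list
    # and rules 1.0/2.0 (the original had the typo `tme-taken`, which produced a stray field).
    # remove_field only applies on a successful match; failures keep `message` and are
    # tagged _grokparsefailure.
    grok {
      match => { "message" => "%{TIMESTAMP_ISO8601:log_timestamp} (%{WORD:s-sitename}|-) (%{IPORHOST:s-ip}|-) (%{WORD:cs-method}|-) %{NOTSPACE:cs-uri-stem} %{NOTSPACE:cs-uri-query} (%{NUMBER:s-port}|-) %{NOTSPACE:cs-referer} (%{IPORHOST:c-ip}|-) %{NOTSPACE:cs-useragent} %{NOTSPACE:cs-host} (%{NUMBER:sc-status}|-) (%{NUMBER:sc-substatus}|-) (%{NUMBER:sc-win32-status}|-) (%{NUMBER:sc-bytes}|-) (%{NUMBER:cs-bytes}|-) (%{NUMBER:time-taken}|-)" }
      remove_field => ["message", "[beat][name]", "[beat][version]"]
    }

    # Only attempt the API-key extraction when the path actually contains the segment.
    # Without this guard, paths without "/ApiKey/" (or events where grok failed) end up
    # with the unresolved literals "%{[request][1]}" / "%{[tp][1]}" stored in apikey/action_name.
    if [cs-uri-stem] =~ "/ApiKey/" {
      # Work on a copy so the split does not destroy cs-uri-stem.
      mutate {
        add_field => { "request" => "%{cs-uri-stem}" }
      }
      # Element [1] of the split is "<key>/<action>[/...]".
      mutate {
        split => ["request", "/ApiKey/"]
        add_field => { "tp" => "%{[request][1]}" }
      }
      # split runs before add_field/remove_field inside one mutate. Duplicate entries
      # from the original are merged into a single hash / array.
      # NOTE(review): `apikey` is a credential indexed in cleartext; consider storing a
      # fingerprint of it instead, and restrict read access to these indices.
      mutate {
        split => ["tp", "/"]
        add_field => {
          "apikey"      => "%{[tp][0]}"
          "action_name" => "%{[tp][1]}"
        }
        remove_field => ["tp", "request"]
      }
    }

    # The dedup scratch field is no longer needed once computed_id exists.
    mutate {
      remove_field => ["line_message"]
    }

    # Index by request time from the log line; IIS W3C timestamps are interpreted as UTC.
    date {
      match => ["log_timestamp", "YYYY-MM-dd HH:mm:ss"]
      target => "@timestamp"
      timezone => "Etc/UTC"
    }
  }
}

output {
  elasticsearch {
    # Plain HTTP to the cluster; enable TLS (ssl => true with a CA) when the cluster supports it,
    # otherwise the credentials below travel in cleartext.
    hosts => ["192.168.1.150:9200"]
    user => "logstash_internal"
    # The password is resolved from the Logstash keystore or the environment at startup;
    # ES_PASSWORD is a placeholder variable name — never keep the literal value in this file.
    password => "${ES_PASSWORD}"
    # One index per day per event type.
    index => "%{type}-%{+YYYY.MM.dd}"
    # NOTE(review): mapping types are deprecated in newer Elasticsearch releases; whether
    # document_type is honoured depends on the cluster version — confirm before relying on it.
    document_type => "%{type}"
    # Stable id from the filter above: re-shipping a line updates the document instead of
    # creating a duplicate.
    document_id => "%{computed_id}"
    # Replaces the index template on every pipeline start.
    template_overwrite => true
  }
}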