匹配字段
%{TIMESTAMP_ISO8601:log_timestamp} (%{WORD:s-sitename}|-) (%{IPORHOST:s-ip}|-) (%{WORD:cs-method}|-) %{NOTSPACE:cs-uri-stem} %{NOTSPACE:cs-uri-query} (%{NUMBER:s-port}|-) %{NOTSPACE:cs-referer} (%{IPORHOST:c-ip}|-) %{NOTSPACE:cs-useragent} %{NOTSPACE:cs-host} (%{NUMBER:sc-status}|-) (%{NUMBER:sc-bytes}|-) (%{NUMBER:cs-bytes}|-) (%{NUMBER:time-taken}|-)
filter 规则1.0
if [type] =~ "winlog-" {
#删除iis日志中以#号开头的文件
if [message] =~ "^#" {
drop {}
}
#完成匹配和拆分iislog,并删除message字段。
grok {
match => { "message" => "%{TIMESTAMP_ISO8601:log_timestamp} (%{WORD:s-sitename}|-) (%{IPORHOST:s-ip}|-) (%{WORD:cs-method}|-) %{NOTSPACE:cs-uri-stem} %{NOTSPACE:cs-uri-query} (%{NUMBER:s-port}|-) %{NOTSPACE:cs-referer} (%{IPORHOST:c-ip}|-) %{NOTSPACE:cs-useragent} %{NOTSPACE:cs-host} (%{NUMBER:sc-status}|-) (%{NUMBER:sc-bytes}|-) (%{NUMBER:cs-bytes}|-) (%{NUMBER:time-taken}|-)" }
remove_field => ["message"]
}
#按指定分隔符切割指定字段
mutate {
split => ["cs-uri-stem", "/ApiKey/"]
add_field => {
"tmpVinKey" => "%{[cs-uri-stem][1]}"
}
}
mutate {
split => ["tmpVinKey", "/"]
add_field => {
"apikey" => "%{[tmpVinKey][0]}"
}
add_field => {
"action_name" => "%{[tmpVinKey][1]}"
}
remove_field => ["tmpVinKey"]
}
#设置以字段访问时间的索引
date {
match => ["log_timestamp", "YYYY-MM-dd HH:mm:ss"]
target => "@timestamp"
}
}
}
filter 规则2.0
if [type] =~ "winlog-" {
#删除iis日志中以#号开头的文件
if [message] =~ "^#" {
drop {}
}
#完成匹配和拆分iislog,并删除message字段。
#完善iis字段
grok {
match => { "message" => "%{TIMESTAMP_ISO8601:log_timestamp} (%{WORD:s-sitename}|-) (%{IPORHOST:s-ip}|-) (%{WORD:cs-method}|-) %{NOTSPACE:cs-uri-stem} %{NOTSPACE:cs-uri-query} (%{NUMBER:s-port}|-) %{NOTSPACE:cs-referer} (%{IPORHOST:c-ip}|-) %{NOTSPACE:cs-useragent} %{NOTSPACE:cs-host} (%{NUMBER:sc-status}|-) (%{NUMBER:sc-substatus}|-) (%{NUMBER:sc-win32-status}|-) (%{NUMBER:sc-bytes}|-) (%{NUMBER:cs-bytes}|-) (%{NUMBER:time-taken}|-)" }
remove_field => ["message"]
}
#复制field
mutate {
add_field => {"request" => "%{cs-uri-stem}"}
}
#按指定分隔符切割指定字段
mutate {
split => ["request", "/ApiKey/"]
add_field => {
"tmpVinKey" => "%{[request][1]}"
}
}
mutate {
split => ["tmpVinKey", "/"]
add_field => {
"apikey" => "%{[tmpVinKey][0]}"
}
add_field => {
"action_name" => "%{[tmpVinKey][1]}"
}
remove_field => ["tmpVinKey"]
remove_field => ["request"]
}
#设置以字段访问时间的索引
date {
match => ["log_timestamp", "YYYY-MM-dd HH:mm:ss"]
target => "@timestamp"
}
}
}
filter 规则3.0以及output
if [type] =~ "winlog-" {
if [message] =~ "^#" {
drop {}
}
mutate {
add_field => {"line_message" => "%{message} %{offset}"}
}
ruby {
code => "
require 'digest/md5';
event.set('computed_id', Digest::MD5.hexdigest(event.get('line_message')))
"
}
grok {
match => { "message" => "%{TIMESTAMP_ISO8601:log_timestamp} (%{WORD:s-sitename}|-) (%{IPORHOST:s-ip}|-) (%{WORD:cs-method}|-) %{NOTSPACE:cs-uri-stem} %{NOTSPACE:cs-uri-query} (%{NUMBER:s-port}|-) %{NOTSPACE:cs-referer} (%{IPORHOST:c-ip}|-) %{NOTSPACE:cs-useragent} %{NOTSPACE:cs-host} (%{NUMBER:sc-status}|-) (%{NUMBER:sc-substatus}|-) (%{NUMBER:sc-win32-status}|-) (%{NUMBER:sc-bytes}|-) (%{NUMBER:cs-bytes}|-) (%{NUMBER:tme-taken}|-)" }
remove_field => ["message","[beat][name]","[beat][version]"]
}
mutate {
add_field => {"request" => "%{cs-uri-stem}"}
}
mutate {
split => ["request", "/ApiKey/"]
add_field => {
"tp" => "%{[request][1]}"
}
}
mutate {
split => ["tp", "/"]
add_field => {
"apikey" => "%{[tp][0]}"
}
add_field => {
"action_name" => "%{[tp][1]}"
}
remove_field => ["tp"]
remove_field => ["request","line_message"]
}
date {
match => ["log_timestamp", "YYYY-MM-dd HH:mm:ss"]
target => "@timestamp"
timezone => "Etc/UTC"
}
}
}
elasticsearch {
hosts => ["192.168.1.150:9200"]
user => logstash_internal
password => changeme
index => "%{type}-%{+YYYY.MM.dd}"
document_type => "%{type}"
document_id => "%{computed_id}"
template_overwrite => true
}