https://www.cnblogs.com/dyh004/p/9699813.html
官方文档https://www.elastic.co/guide/en/logstash/6.7/event-dependent-configuration.html#conditionals
https://windcoder.com/logstash6peizhiyufazhongdetiaojianpanduan
http://www.ttlsa.com/elk/elk-logstash-configuration-syntax/
logstash的filter的支持双引号 单引号,都可以,需要注意的是在grok的时候如果匹配字符串是双引号需要转义"
logstash的输出,如果有多个es地址,可以放到['es-node1','es-node2']
filter中的判断
if EXPRESSION {
...
} else if EXPRESSION {
...
} else {
...
}
如若action的值是login,则通过mutate filter删除secret字段:
filter {
if [action] == "login" {
mutate { remove_field => "secret" }
}
}
若是需要通过正则匹配(即,当if的表达式为正则时),匹配成功后添加自定义字段:
filter {
if [message] =~ /w+s+/w+(/learner/course/)/ {
mutate {
add_field => { "learner_type" => "course" }
}
}
}
在一个条件里指定多个表达式:
output {
# Send production errors to pagerduty
if [loglevel] == "ERROR" and [deployment] == "production" {
pagerduty {
...
}
}
}
在in条件,可以比较字段值:
filter {
if [foo] in [foobar] {
mutate { add_tag => "field in field" }
}
if [foo] in "foo" {
mutate { add_tag => "field in string" }
}
if "hello" in [greeting] {
mutate { add_tag => "string in field" }
}
if [foo] in ["hello", "world", "foo"] {
mutate { add_tag => "field in list" }
}
if [missing] in [alsomissing] {
mutate { add_tag => "shouldnotexist" }
}
if !("foo" in ["hello", "world"]) {
mutate { add_tag => "shouldexist" }
}
}
not in示例:
output {
if "_grokparsefailure" not in [tags] {
elasticsearch { ... }
}
}
在logstash1.5版本开始,有一个特殊的字段,叫做@metadata。@metadata包含的内容不会作为事件的一部分输出
input { stdin { } }
filter {
mutate { add_field => { "show" => "This data will be in the output" } }
mutate { add_field => { "[@metadata][test]" => "Hello" } }
mutate { add_field => { "[@metadata][no_show]" => "This data will not be in the output" } }
}
output {
if [@metadata][test] == "Hello" {
stdout { codec => rubydebug }
}
}
输出结果
$ bin/logstash -f ../test.conf
Pipeline main started
asdf
{
"@timestamp" => 2016-06-30T02:42:51.496Z,
"@version" => "1",
"host" => "windcoder.com",
"show" => "This data will be in the output",
"message" => "asdf"
}
"asdf"变成message字段内容。条件与@metadata内嵌的test字段内容判断成功,但是输出并没有展示@metadata字段和其内容。
不过,如果指定了metadata => true,rubydebug codec允许显示@metadata字段的内容。
stdout { codec => rubydebug { metadata => true } }
输出结果
$ bin/logstash -f ../test.conf
Pipeline main started
asdf
{
"@timestamp" => 2016-06-30T02:46:48.565Z,
"@metadata字段及其子字段内容。" => {
"test" => "Hello",
"no_show" => "This data will not be in the output"
},
"@version" => "1",
"host" => "windcoder.com",
"show" => "This data will be in the output",
"message" => "asdf"
}
现在就可以见到@metadata字段及其子字段内容。
只有rubydebug codec允许显示@metadata字段的内容。
只要您需要临时字段但不希望它在最终输出中,就可以使用@metadata字段。
最常见的情景是filter的时间字段,需要一临时的时间戳。如:
input { stdin { } }
filter {
grok { match => [ "message", "%{HTTPDATE:[@metadata][timestamp]}" ] }
date { match => [ "[@metadata][timestamp]", "dd/MMM/yyyy:HH:mm:ss Z" ] }
}
output {
stdout { codec => rubydebug }
}
输出结果
$ bin/logstash -f ../test.conf
Pipeline main started
02/Mar/2014:15:36:43 +0100
{
"@timestamp" => 2014-03-02T14:36:43.000Z,
"@version" => "1",
"host" => "windcoder.com",
"message" => "02/Mar/2014:15:36:43 +0100"
}
一些示例:
input{
file {
path => "/usr/share/logstash/wb.cond/test.log"
start_position => "beginning"
sincedb_path => "/dev/null"
}
}
filter{
mutate {
gsub =>[
"message", "'", '"'
]
}
json {
source => "message"
}
mutate {
convert => {
"usdCnyRate" => "float"
"futureIndex" => "float"
}
}
date {
match => [ "timestamp", "UNIX_MS" ]
target => "logdate"
}
}
output{
stdout{
codec=>rubydebug
}
}
input {
file {
path => ["/data/test_logstash.log"]
type => ["nginx_log"]
start_position => "beginning"
}
}
filter {
if [type] =~ "nginx_log" {
grok {
match => { "message" => "%{TIMESTAMP_ISO8601:create_time} %{IP:server_ip} %{URIPATH:uri} %{GREEDYDATA:args} %{IP:client_ip} %{NUMBER:status}" }
}
urldecode{
field =>args
}
kv {
source =>"args"
field_split =>"&"
remove_field => [ "args","@timestamp","message","path","@version","path","host" ]
}
json {
source => "yc_log"
remove_field => [ "yc_log" ]
}
mutate {
add_field => { "lg_value" => "%{lg_vl}" }
}
json {
source => "lg_value"
remove_field => [ "lg_vl","lg_value" ]
}
}
}
output {
stdout { codec => rubydebug }
}
匹配失败
对于logstash的过滤,如果在match里面指定的正则语句对输入的日志信息无法过滤,会默认的添加一个tag字段,并且赋值 _grokparsefailure的字段
对于这种没有匹配成功的日志信息,可以选择丢弃
对于nginx的正则匹配。匹配不到通过tag字段中的值进行判断移除
filter {
grok {
match => {"message" => "%{IPV4:remote_addr} - %{DATA:nginx.access.user_name} [%{HTTPDATE:nginx.access.time}] "%{WORD:nginx.access.method} %{DATA:nginx.access.url} HTTP/%{NUMBER:nginx.access.http_version}" %{NUMBER:nginx.access.response_code} %{NUMBER:nginx.access.body_sent.bytes} "%{DATA:nginx.access.referrer}" "%{DATA:nginx.access.agent}" "%{DATA:nginx.access.other}""}
}
if "_grokparsefailure" in [tags] {
drop {}
}
}