zoukankan      html  css  js  c++  java
  • logstash的filter

    https://www.cnblogs.com/dyh004/p/9699813.html
    官方文档https://www.elastic.co/guide/en/logstash/6.7/event-dependent-configuration.html#conditionals
    https://windcoder.com/logstash6peizhiyufazhongdetiaojianpanduan
    http://www.ttlsa.com/elk/elk-logstash-configuration-syntax/
    logstash的filter的支持双引号 单引号,都可以,需要注意的是在grok的时候如果匹配字符串是双引号需要转义"

    logstash的输出,如果有多个es地址,可以放到['es-node1','es-node2']

    filter中的判断

    if EXPRESSION {
      ...
    } else if EXPRESSION {
      ...
    } else {
      ...
    }
    
    

    如若action的值是login,则通过mutate filter删除secret字段:
    filter {
      if [action] == "login" {
        mutate { remove_field => "secret" }
      }
    }
    
    若是需要通过正则匹配(即,当if的表达式为正则时),匹配成功后添加自定义字段:
    filter {
        if [message] =~ /w+s+/w+(/learner/course/)/ {
            mutate {
                add_field => { "learner_type" => "course" }
            }
        }
    }
    在一个条件里指定多个表达式:
    output {
      # Send production errors to pagerduty
      if [loglevel] == "ERROR" and [deployment] == "production" {
        pagerduty {
        ...
        }
      }
    }
    
    在in条件,可以比较字段值:
    filter {
      if [foo] in [foobar] {
        mutate { add_tag => "field in field" }
      }
      if [foo] in "foo" {
        mutate { add_tag => "field in string" }
      }
      if "hello" in [greeting] {
        mutate { add_tag => "string in field" }
      }
      if [foo] in ["hello", "world", "foo"] {
        mutate { add_tag => "field in list" }
      }
      if [missing] in [alsomissing] {
        mutate { add_tag => "shouldnotexist" }
      }
      if !("foo" in ["hello", "world"]) {
        mutate { add_tag => "shouldexist" }
      }
    }
    
    not in示例:
    output {
      if "_grokparsefailure" not in [tags] {
        elasticsearch { ... }
      }
    }
    
    
    在logstash1.5版本开始,有一个特殊的字段,叫做@metadata。@metadata包含的内容不会作为事件的一部分输出
    input { stdin { } }
    
    filter {
      mutate { add_field => { "show" => "This data will be in the output" } }
      mutate { add_field => { "[@metadata][test]" => "Hello" } }
      mutate { add_field => { "[@metadata][no_show]" => "This data will not be in the output" } }
    }
    
    output {
      if [@metadata][test] == "Hello" {
        stdout { codec => rubydebug }
      }
    }
    输出结果
    $ bin/logstash -f ../test.conf
    Pipeline main started
    asdf
    {
        "@timestamp" => 2016-06-30T02:42:51.496Z,
          "@version" => "1",
              "host" => "windcoder.com",
              "show" => "This data will be in the output",
           "message" => "asdf"
    }
    "asdf"变成message字段内容。条件与@metadata内嵌的test字段内容判断成功,但是输出并没有展示@metadata字段和其内容。
    
    不过,如果指定了metadata => true,rubydebug codec允许显示@metadata字段的内容。
    stdout { codec => rubydebug { metadata => true } }
    输出结果
    $ bin/logstash -f ../test.conf
    Pipeline main started
    asdf
    {
        "@timestamp" => 2016-06-30T02:46:48.565Z,
         "@metadata字段及其子字段内容。" => {
               "test" => "Hello",
            "no_show" => "This data will not be in the output"
        },
          "@version" => "1",
              "host" => "windcoder.com",
              "show" => "This data will be in the output",
           "message" => "asdf"
    }
    
    现在就可以见到@metadata字段及其子字段内容。
    只有rubydebug codec允许显示@metadata字段的内容。
    只要您需要临时字段但不希望它在最终输出中,就可以使用@metadata字段。
    最常见的情景是filter的时间字段,需要一临时的时间戳。如:
    input { stdin { } }
    
    filter {
      grok { match => [ "message", "%{HTTPDATE:[@metadata][timestamp]}" ] }
      date { match => [ "[@metadata][timestamp]", "dd/MMM/yyyy:HH:mm:ss Z" ] }
    }
    
    output {
      stdout { codec => rubydebug }
    }
    输出结果
    $ bin/logstash -f ../test.conf
    Pipeline main started
    02/Mar/2014:15:36:43 +0100
    {
        "@timestamp" => 2014-03-02T14:36:43.000Z,
          "@version" => "1",
              "host" => "windcoder.com",
           "message" => "02/Mar/2014:15:36:43 +0100"
    }
    

    一些示例:

    input{
        file {
            path => "/usr/share/logstash/wb.cond/test.log"
            start_position => "beginning"
            sincedb_path => "/dev/null"
        }
    }
    filter{
        mutate {
            gsub =>[
                "message", "'", '"'
            ]
        }
        json {
            source => "message"
        }
        mutate {
            convert => {
                "usdCnyRate" => "float"
                "futureIndex" => "float"
            }
        }
        date {
            match => [ "timestamp", "UNIX_MS" ]
            target => "logdate"
        }
    }
    output{
        stdout{
            codec=>rubydebug
        }
    }
    
    input {
      file {
        path => ["/data/test_logstash.log"]
        type => ["nginx_log"]
        start_position => "beginning"
      }
    }
    filter {
      if [type] =~ "nginx_log" {
        grok {
          match => { "message" => "%{TIMESTAMP_ISO8601:create_time} %{IP:server_ip}  %{URIPATH:uri} %{GREEDYDATA:args}   %{IP:client_ip}  %{NUMBER:status}" }
        }
        urldecode{
        field =>args
        }
        kv {
        source =>"args"
        field_split =>"&"
        remove_field => [ "args","@timestamp","message","path","@version","path","host" ]
        }
        json {
            source => "yc_log"
            remove_field => [ "yc_log" ]
        }
        mutate {
          add_field => { "lg_value" => "%{lg_vl}" }
        }
        json {
            source => "lg_value"
            remove_field => [ "lg_vl","lg_value" ]
        }
      }
    }
    
    output {
      stdout { codec => rubydebug }
    }
    
    
    

    匹配失败

    对于logstash的过滤,如果在match里面指定的正则语句对输入的日志信息无法过滤,会默认的添加一个tag字段,并且赋值 _grokparsefailure的字段
    对于这种没有匹配成功的日志信息,可以选择丢弃

    对于nginx的正则匹配。匹配不到通过tag字段中的值进行判断移除
    filter {
        grok {
            match => {"message" => "%{IPV4:remote_addr} - %{DATA:nginx.access.user_name} [%{HTTPDATE:nginx.access.time}] "%{WORD:nginx.access.method} %{DATA:nginx.access.url} HTTP/%{NUMBER:nginx.access.http_version}" %{NUMBER:nginx.access.response_code} %{NUMBER:nginx.access.body_sent.bytes} "%{DATA:nginx.access.referrer}" "%{DATA:nginx.access.agent}" "%{DATA:nginx.access.other}""}
    }
        if "_grokparsefailure" in [tags] {
            drop {}
    }
    }
    
  • 相关阅读:
    知识收集
    代码片_笔记
    北理工软件学院2016程序设计方法与实践
    内存的初始化与清零问题
    LeetCode第七题
    KMP算法C代码
    在64位Linux上安装32位gmp大数库
    ASN1编码中的OID
    迷宫问题
    64位linux编译32位程序
  • 原文地址:https://www.cnblogs.com/cizao/p/12549451.html
Copyright © 2011-2022 走看看