  • [Original] Big Data Basics: Logstash (3), Applications: Parsing Files (grok/ruby/kv)

    Parse the request URL out of nginx log lines:

    /v1/test?param2=v2&param3=v3&time=2019-03-18%2017%3A34%3A14
    ->
    {'param1':'v1','param2':'v2','param3':'v3','time':'2019-03-18 17:34:14'}
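    Before writing any Logstash config, the target mapping can be checked in plain Ruby. This is a minimal sketch that assumes param1 is always the first path segment. parse_test_url is a hypothetical helper. URI.decode_www_form from Ruby's standard library does the splitting and the percent-decoding that urldecode performs later in Logstash.

```ruby
require 'uri'

# Hypothetical helper: turn "/v1/test?k=v&..." into a hash.
# Assumes param1 is the first path segment, as in the example above.
def parse_test_url(url)
  path, query = url.split('?', 2)
  result = { 'param1' => path.split('/')[1] }
  # decode_www_form splits on '&' and '=' and percent-decodes keys and values
  URI.decode_www_form(query.to_s).each { |key, value| result[key] = value }
  result
end

p parse_test_url('/v1/test?param2=v2&param3=v3&time=2019-03-18%2017%3A34%3A14')
```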

    Sample nginx log line:

    1.119.132.168 - - [18/Mar/2019:09:13:50 +0000] "POST /param1/test?param2=1&param3=2&time=2019-03-18%2017%3A34%3A14 HTTP/1.1" 200 929 "-" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.119 Safari/537.36" "-"

    1 Using grok

    input {
      file {
        path => [ "/var/log/nginx/access.log" ]
        start_position => "beginning"
      }
    }
    filter {
      if [message] =~ /test/ {
        grok {
          match => { "message" => "%{IPORHOST:client_ip} (%{USER:ident}|-) (%{USER:auth}|-) \[%{HTTPDATE:access_time_raw}\] \"(?:%{WORD:verb} (/%{PARAMVALUE:param1}/test\?param2=%{PARAMVALUE:param2}&param3=%{PARAMVALUE:param3}&time=%{PARAMVALUE:send_time_raw})(?: HTTP/%{NUMBER:http_version})?|-)\" (%{NUMBER:response}|-) (?:%{NUMBER:bytes}|-) %{QS:referrer} %{QS:agent} %{QS:x_forward_for}" }
          pattern_definitions => { "PARAMVALUE" => "[^& ]*" }
        }
        urldecode {
          all_fields => true
        }
        date {
          match => [ "access_time_raw", "dd/MMM/yyyy:HH:mm:ss Z" ]
          target => "access_time_tmp"
        }
        ruby {
          code => "event.set('access_time', (event.get('access_time_tmp').to_i * 1000000).to_s)
                   event.set('send_time', event.get('access_time'))"
        }
        if [send_time_raw] {
          date {
            match => [ "send_time_raw", "yyyy-MM-dd HH:mm:ss" ]
            target => "send_time_tmp"
            timezone => "UTC"
          }
          ruby {
            code => "event.set('send_time', (event.get('send_time_tmp').to_i * 1000000).to_s)"
          }
        }
        mutate {
          remove_field => ["message", "ident", "auth", "verb", "bytes", "x_forward_for", "http_version", "access_time_raw", "access_time_tmp", "path", "response", "send_time_raw", "send_time_tmp"]
        }
      } else {
        drop {}
      }
    }
    output {
      if [param1] and [param2] and [param3] and "_grokparsefailure" not in [tags] {
        stdout { codec => json }
      }
    }
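    A Ruby regular expression with named groups can approximate the grok pattern above. With it, you can check what each field captures from the sample line. This is only a sketch. The sub-patterns are simplified stand-ins for IPORHOST, USER, HTTPDATE, WORD, NUMBER and QS, not the real grok definitions. PARAMVALUE keeps its meaning of "no & and no space".

```ruby
# Rough Ruby equivalent of the section-1 grok pattern (x flag: layout whitespace is ignored).
# The sub-patterns are simplified stand-ins, NOT the real grok definitions.
LOG_RE = %r{
  \A(?<client_ip>\S+)\s
  (?<ident>\S+)\s
  (?<auth>\S+)\s
  \[(?<access_time_raw>[^\]]+)\]\s
  "(?<verb>\w+)\s
  /(?<param1>[^&\s]*)/test\?
  param2=(?<param2>[^&\s]*)&
  param3=(?<param3>[^&\s]*)&
  time=(?<send_time_raw>[^&\s]*)
  (?:\sHTTP/(?<http_version>[\d.]+))?"\s
  (?<response>\d+)\s
  (?<bytes>\d+|-)\s
  (?<referrer>"[^"]*")\s
  (?<agent>"[^"]*")\s
  (?<x_forward_for>"[^"]*")
}x

SAMPLE = '1.119.132.168 - - [18/Mar/2019:09:13:50 +0000] "POST /param1/test?param2=1&param3=2&time=2019-03-18%2017%3A34%3A14 HTTP/1.1" 200 929 "-" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.119 Safari/537.36" "-"'

match = LOG_RE.match(SAMPLE)
if match
  %w[client_ip access_time_raw param1 param2 param3 send_time_raw].each do |name|
    puts "#{name}: #{match[name]}"
  end
else
  puts 'no match (grok would tag the event with _grokparsefailure)'
end
```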

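    Notes:
    1) The URL's parameter names and their positions are hard-coded into the pattern, which is inflexible;
    2) A custom pattern, PARAMVALUE, is declared through pattern_definitions;
    3) urldecode is required. Without it, time keeps the raw value 2019-03-18%2017%3A34%3A14, and the date filter (which parses with Joda-Time) throws an error because the undecoded value, containing the letter A from %3A, does not fit the pattern;
    4) If time is missing, send_time falls back to access_time;
    5) Records that do not match are dropped;
    6) Only records that meet the output condition are emitted;
    7) Branches are written with if/else in both filter and output;
    8) Set the date filter's timezone explicitly. Otherwise the time is interpreted in the host's local time zone and ends up shifted by its offset;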
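    Notes 3, 4 and 8 can be reproduced outside Logstash. In this sketch, Ruby's Time.strptime stands in for the Joda-based date filter; this is an assumption, and the two libraries differ in details. send_time_for and parses_without_decoding? are hypothetical helpers. Times become epoch microseconds the same way as in the ruby filter above (to_i * 1000000).

```ruby
require 'time'
require 'uri'

# Format of the decoded time parameter (Ruby's counterpart of yyyy-MM-dd HH:mm:ss)
SEND_FORMAT = '%Y-%m-%d %H:%M:%S'

# Epoch microseconds as a string, like (t.to_i * 1000000).to_s in the ruby filter
def to_micros(time)
  (time.to_i * 1_000_000).to_s
end

# Note 3: the raw value still contains %20 and %3A, so it does not fit the format
def parses_without_decoding?(time_raw)
  Time.strptime(time_raw, SEND_FORMAT)
  true
rescue ArgumentError
  false
end

# Hypothetical helper mirroring the date + ruby filters for one event
def send_time_for(access_time_raw, time_raw)
  access = Time.strptime(access_time_raw, '%d/%b/%Y:%H:%M:%S %z')
  # Note 4: without a time parameter, send_time falls back to access_time
  return to_micros(access) if time_raw.nil? || time_raw.empty?

  decoded = URI.decode_www_form_component(time_raw) # "%20" -> " ", "%3A" -> ":"
  # Note 8: pin the zone explicitly, the counterpart of timezone => "UTC"
  to_micros(Time.strptime("#{decoded} +0000", "#{SEND_FORMAT} %z"))
end

puts parses_without_decoding?('2019-03-18%2017%3A34%3A14')                    # false
puts send_time_for('18/Mar/2019:09:13:50 +0000', '2019-03-18%2017%3A34%3A14') # 1552930454000000
puts send_time_for('18/Mar/2019:09:13:50 +0000', nil)                          # 1552900430000000
```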

    2 Using grok + ruby

    
    

    input {
      file {
        path => [ "/var/log/nginx/access.log" ]
        start_position => "beginning"
      }
    }
    filter {
      if [message] =~ /test/ {
        grok {
          match => { "message" => "%{IPORHOST:client_ip} (%{USER:ident}|-) (%{USER:auth}|-) \[%{HTTPDATE:access_time_raw}\] \"(?:%{WORD:verb} (%{URIPATHPARAM:request}|-)(?: HTTP/%{NUMBER:http_version})?|-)\" (%{NUMBER:response}|-) (?:%{NUMBER:bytes}|-) %{QS:referrer} %{QS:agent}" }
        }
        urldecode {
          all_fields => true
        }
        date {
          match => [ "access_time_raw", "dd/MMM/yyyy:HH:mm:ss Z" ]
          target => "access_time_tmp"
        }
        ruby {
          code => "event.set('access_time', (event.get('access_time_tmp').to_i * 1000000).to_s)
                   event.set('send_time', event.get('access_time'))"
        }
        if [request] {
          ruby {
            init => "
              def convertName(name)
                result = ''
                name.each_char{|ch| result += (if ch < 'a' then '_' + ch.downcase else ch end)}
                result
              end
            "
            code => "
              event.set('param1', event.get('request').split('?')[0].split('/')[1])
              pairs = (event.get('request').split('?')[1] || '').split('&')
              pairs.each { |item| arr = item.split('=', 2); event.set(arr[0], arr[1]) unless arr[0].to_s.empty? }
            "
          }
          if [time] {
            date {
              match => [ "time", "yyyy-MM-dd HH:mm:ss" ]
              target => "send_time_tmp"
              timezone => "UTC"
            }
            ruby {
              code => "event.set('send_time', (event.get('send_time_tmp').to_i * 1000000).to_s)"
            }
          }
        }
        mutate {
          remove_field => ["message", "ident", "auth", "verb", "bytes", "x_forward_for", "http_version", "access_time_raw", "access_time_tmp", "path", "response", "time", "send_time_tmp"]
        }
      } else {
        drop {}
      }
    }
    output {
      if [param1] and [param2] and [param3] and "_grokparsefailure" not in [tags] {
        stdout { codec => json }
      }
    }

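    Notes:
    1) The standard grok pattern for nginx access logs is used as is;
    2) The ruby filter splits the query string into key=value pairs directly, so parameter names and positions are not hard-coded, which is more flexible;
    3) A custom function (convertName) is defined in init;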
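    The convertName helper in init is meant to turn camelCase names into snake_case, although the code block never calls it. Its test, ch < 'a', treats every character that sorts before 'a' as uppercase. That set includes digits and '_'. The sketch below reproduces that logic and compares it with convert_name, a hypothetical variant that only prefixes the letters A to Z.

```ruby
# The logic of convertName from the init block: anything that sorts before 'a'
# (uppercase letters, but also digits and '_') gets an '_' prefix
def convert_name_original(name)
  result = ''
  name.each_char { |ch| result += (ch < 'a' ? '_' + ch.downcase : ch) }
  result
end

# Hypothetical variant that only prefixes the ASCII letters A-Z
def convert_name(name)
  name.each_char.map { |ch| ch.match?(/[A-Z]/) ? '_' + ch.downcase : ch }.join
end

puts convert_name_original('sendTime') # send_time
puts convert_name_original('param2')   # param_2
puts convert_name('param2')            # param2
```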

    In Logstash ruby code, fields must be read and written through the getter and setter methods, e.g. event.get('field'). event['field'] cannot be used, because it fails with:

    [2019-03-19T17:15:32,729][ERROR][logstash.filters.ruby ] Ruby exception occurred: Direct event field references (i.e. event['field'] = 'value') have been disabled in favor of using event get and set methods (e.g. event.set('field', 'value')). Please consult the Logstash 5.0 breaking changes documentation for more details.
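    Because the filter code only talks to the event through get and set, you can exercise it without Logstash against a stand-in object. StubEvent is a hypothetical class, not part of Logstash. It offers only get and set, so event['field'] fails here as well, though with a plain NoMethodError rather than the Logstash error above. The CODE string is essentially the code of the section-2 ruby filter, with a guard for a missing query string. Running it through eval with a local variable named event is only an approximation of how the ruby filter executes its code.

```ruby
# Hypothetical stand-in for LogStash::Event (not the real class):
# it only offers get and set, so event['field'] raises NoMethodError here.
class StubEvent
  def initialize(fields = {})
    @fields = fields.dup
  end

  def get(name)
    @fields[name]
  end

  def set(name, value)
    @fields[name] = value
  end
end

# Essentially the `code` string of the section-2 ruby filter
CODE = <<~'RUBY'
  event.set('param1', event.get('request').split('?')[0].split('/')[1])
  pairs = (event.get('request').split('?')[1] || '').split('&')
  pairs.each { |item| arr = item.split('=', 2); event.set(arr[0], arr[1]) unless arr[0].to_s.empty? }
RUBY

# Evaluate the code with a local variable named `event` in scope
# (an approximation of how the ruby filter runs its code)
def run_filter_code(code, event)
  eval(code, binding)
  event
end

# request as it looks after urldecode
event = run_filter_code(CODE, StubEvent.new('request' => '/v1/test?param2=v2&param3=v3&time=2019-03-18 17:34:14'))
puts event.get('param1') # v1
puts event.get('time')   # 2019-03-18 17:34:14
```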

    3 Using grok + kv

    input {
        file {
            path => [ "/data/tmp/access.log" ]
            start_position => "beginning"
        }
    }
    
    filter {
      if [message] =~ /dataone\/u1/ {
        grok {
            match => { "message" => "%{IPORHOST:client_ip} (%{USER:ident}|-) (%{USER:auth}|-) [%{HTTPDATE:access_time_raw}] "(?:%{WORD:verb} (%{URIPATHPARAM:request}|-)(?: HTTP/%{NUMBER:http_version})?|-)" (%{NUMBER:response}|-) (?:%{NUMBER:bytes}|-) %{QS:referrer} %{QS:agent}" }
        }
        kv {
          source => "request"
          field_split => "&?"
          value_split => "="
        }
        urldecode {
            all_fields => true 
        }
        date {
            match => [ "access_time_raw","dd/MMM/yyyy:HH:mm:ss Z"]
            target => "access_time_tmp"
        }
        ruby {
            code => "event.set('access_time', (event.get('access_time_tmp').to_i * 1000000).to_s)
                    event.set('send_time', event.get('access_time'))"
        }
        if [time] {
          date {
              match => [ "time","yyyy-MM-dd HH:mm:ss"]
              target => "send_time_tmp"
              timezone => "UTC"
          }
          ruby {
              code => "event.set('send_time', (event.get('send_time_tmp').to_i * 1000000).to_s)"
          }
        }
        mutate {
            remove_field => ["message", "ident", "auth", "verb", "bytes", "x_forward_for", "http_version", "access_time_raw", "access_time_tmp", "path", "response", "time", "send_time_tmp"]
        }
      } else {
        drop {}
      }
    }
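    Below is a rough imitation of how kv, with field_split => "&?" and value_split => "=", breaks up the request, followed by the urldecode step. kv_like is a hypothetical helper and a simplification, since the real kv filter has many more options. In this sketch, each character of field_split acts as a separate delimiter. Segments without "=" (such as the path /param1/test) produce no field, so param1 is not extracted by this approach.

```ruby
require 'uri'

# Hypothetical, simplified imitation of kv with field_split => "&?" and
# value_split => "="; the real kv filter has many more options.
def kv_like(source, field_split: '&?', value_split: '=')
  # every character of field_split is a delimiter on its own
  delimiters = Regexp.union(field_split.chars)
  source.split(delimiters).each_with_object({}) do |segment, fields|
    key, value = segment.split(value_split, 2)
    # segments without "=" (such as the path) produce no field here
    next if value.nil? || key.empty?

    fields[key] = value
  end
end

request = '/param1/test?param2=1&param3=2&time=2019-03-18%2017%3A34%3A14'
fields = kv_like(request)
# the urldecode step, applied afterwards as in the config above
fields.transform_values! { |v| URI.decode_www_form_component(v) }
p fields
```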

    Reference: https://www.elastic.co/guide/en/logstash/current/plugins-filters-kv.html

  • Original article: https://www.cnblogs.com/barneywill/p/10559394.html