zoukankan      html  css  js  c++  java
  • Logstash收集nginx日志之使用grok过滤插件解析日志

    官方使用说明:https://doc.yonyoucloud.com/doc/logstash-best-practice-cn/index.html


    Grok的过滤配置选项和通用选项

    grok支持下述的过滤配置选项

    选项类型是否为必须描述
    break_on_match 布尔型 默认值为true,只会匹配第一个符合匹配条件的值,如果需要匹配多个值,则需要设置为false
    keep_empty_captures 布尔型 默认值为false,如果为true,则保留空字段为事件字段
    match 哈希型 意思为匹配一个字段的哈希值,单一字段可设置匹配多个匹配模式
    named_captures_only 布尔型 默认值为true,意味着只保存从grok中获取的名称
    overwrite 数组 此选项用于复写字段中的值
    pattern_definitions 哈希型 定义被当前过滤器所使用的自动模式的名称和元组名称,如果命名的名称已存在,则会覆盖此前配置
    patterns_dir 数组 指定用于保存定义好的匹配模式的文件目录
    patterns_files_glob 字符串 用于在patterns_dir指定的目录中过滤匹配的文件
    tag_on_failure 数组 默认值为_grokparsefailure,当匹配不成功时追加指定值到tags字段
    tag_on_timeout 字符串 默认值为_groktimeout,当grok正则表达式匹配超时追加的tag
    timeout_millis 数值 默认值为30000毫秒,当正则匹配运行超过指定的时间后,尝试终结此匹配操作。设置为0将关闭超时

    grok的通用选项:下述选项是被所有过滤插件都支持的通用选项

    选项类型是否为必须描述
    add_field 哈希型 如果此过滤选项匹配成功,则会向匹配的事件中添加指定的字段,字段名和内容可以调用相关的变量进行定义命名
    add_tag 数组 用于当过滤成功时,向匹配的事件中添加tag
    enable_metric 布尔型 默认值为true,默认情况下,启用或禁用此功能,能记录特定插件的相关度量值。
    id 字符串 添加一个唯一ID到指定的插件配置中,当有多个同一类型的插件时,可更好地去区别监控logstash
    periodic_flush 布尔型 默认值为false,可选项,用于在规定的间隔时间调用过滤器的刷新功能
    remove_field 数组 当此插件匹配成功时,从事件中移除指定的字段
    remove_tag 数组 当此插件匹配成功时,从事件中移除指定的tags

    grok作为一个logstash的过滤插件,支持根据模式解析文本日志行,拆成字段。

    • nginx日志的配置:
    log_format  main  '$remote_addr - $remote_user [$time_local] "$request" $status $body_bytes_sent "$http_referer" "$http_user_agent" "$http_x_forwarded_for"';
    • logstash中grok的正则(添加在/usr/local/logstash-6.2.4/vendor/bundle/jruby/2.3.0/gems/logstash-patterns-core-4.1.2/patterns/grok-patterns文件中)为:

    WZ ([^ ]*)
    NGINXACCESS %{IP:remote_ip} - - [%{HTTPDATE:timestamp}] "%{WORD:method} %{WZ:request} HTTP/%{NUMBER:httpversion}" %{NUMBER:status} %{NUMBER:bytes} %{QS:referer} %{QS:agent} %{NUMBER:elapsed} %{NUMBER:serverelapsed} %{QS:xforward}

    推荐一个正则表达式在线自动生成器  http://www.txt2re.com  http://txt2re.com ,用于自定义匹配时的变量引用

    logstash的配置为:

    input{
      file{
        path => "/usr/local/nginx/logs/access.log"
        type => "nginx"
        start_position => "beginning"
      }
    }
    
    filter {  
        grok {  
          match => { "message" => "%{NGINXACCESS}" }
        }  
    } 
    output{
      if [type] == "nginx" {
        elasticsearch {
        hosts=> ["172.17.102.202:9200"]
        index=> "nginx"
        }
      }
    }

    多个Input时,采用条件判断:

    input{
      file{
        path => "/usr/local/nginx/logs/hottopic-access.log"
        type => "hottopic"
        start_position => "beginning"
      }
      file{
        path => "/usr/local/nginx/logs/foodie-access.log"
        type => "foodie"
        start_position => "beginning"
      }
      file{
        path => "/usr/local/nginx/logs/ac-access.log"
        type => "ac"
        start_position => "beginning"
      }
    }
    
    filter {
        grok {
          match => { "message" => "%{MYNGINX}" }
        }
        mutate {
          convert => [ "elapsed", "float" ]
          convert => [ "serverelapsed", "float" ]
        }
    }
    
    output{
      if [type] == "hottopic" {
        elasticsearch {
        hosts=> ["172.17.213.60:9200"]
        index=> "nginx-hottopic-api"
        }
      }
      if [type] == "foodie" {
        elasticsearch {
        hosts=> ["172.17.213.60:9200"]
        index=> "nginx-foodie-api"
        }
      }
      if [type] == "ac" {
        elasticsearch {
        hosts=> ["172.17.213.60:9200"]
        index=> "nginx-ac-api"
        }
      }
    }
    View Code

    添加和移除指定的字段,在grok{ }范围处进行配置

    input {
            beats {
                    port => 5044
                    type => "nginx"
            }
    }
    
    filter {
            if [type] == "nginx" {
            grok {
                    match => { "message" => ["(?<RemoteIP>(d*.d*.d*.d*)) - %{DATA:[nginx][access][user_name]} [%{HTTPDATE:[nginx][access][time]}] "%{WORD:[nginx][access][method]} %{DATA:[nginx][access][url]} HTTP/%{NUMBER:[nginx][access][http_version]}" %{NUMBER:[nginx][access][response_code]} %{NUMBER:[nginx][access][body_sent][bytes]} "%{DATA:[nginx][access][referrer]}" "%{DATA:[nginx][access][agent]}""] }
            add_field => {
                    "Device" => "Charles Desktop"
            }
            remove_field => [ "message","beta.version","beta.name" ]
                    }
            }
    }
    
    output {
            if [type] == "nginx" {
                    elasticsearch {
                            hosts => "10.10.10.6:9200"
                            index => "logstash-testlog"
            }       }
    }

    排除字段

    字段的排除需要在filter中进行操作,使用一个叫做 mutate 的工具,具体操作如下

    //比如我们可能需要避免日志中kafka的一些字段占用宝贵的磁盘空间。
    filter {
        mutate {
            remove_field => ["kafka"]
        }
    }

    排除整条相关数据

    比如apache日志中状态为200 的监控服务器不关心的,这里用到drop工具

    filter {
        if [status] == "200" { //不知道这边的多个判断的语法要如何写
            drop{}
        }
    }
  • 相关阅读:
    Leetcode-Minimum Depth of Binary Tree
    Leetcode-Path Sum II
    Leetcode-Path Sum
    Leetcode-Flatten Binary Tree to Linked List
    Leetcode-Populating Next Right Pointer in Binary Tree II
    Leetcode-Pascal's Triangle II
    Leetcode-Pascal's Triangle
    Leetcode-Triangle
    第10月第20天 afnetwork like MKNetworkEngine http post
    第10月第13天 xcode ipa
  • 原文地址:https://www.cnblogs.com/wjoyxt/p/9172370.html
Copyright © 2011-2022 走看看