zoukankan      html  css  js  c++  java
  • Logstash收集nginx日志之使用grok过滤插件解析日志

    官方使用说明:https://doc.yonyoucloud.com/doc/logstash-best-practice-cn/index.html


    Grok的过滤配置选项和通用选项

    grok支持下述的过滤配置选项

    选项类型是否为必须描述
    break_on_match 布尔型 默认值为true,只会匹配第一个符合匹配条件的值,如果需要匹配多个值,则需要设置为false
    keep_empty_captures 布尔型 默认值为false,如果为true,则保留空字段为事件字段
    match 哈希型 意思为匹配一个字段的哈希值,单一字段可设置匹配多个匹配模式
    named_captures_only 布尔型 默认值为true,意味着只保存从grok中获取的名称
    overwrite 数组 此选项用于复写字段中的值
    pattern_definitions 哈希型 定义被当前过滤器所使用的自动模式的名称和元组名称,如果命名的名称已存在,则会覆盖此前配置
    patterns_dir 数组 指定用于保存定义好的匹配模式的文件目录
    patterns_files_glob 字符串 用于在patterns_dir指定的目录中过滤匹配的文件
    tag_on_failure 数组 默认值为_grokparsefailure,当匹配不成功时追加指定值到tags字段
    tag_on_timeout 字符串 默认值为_groktimeout,当grok正则表达式匹配超时追加的tag
    timeout_millis 数值 默认值为30000毫秒,当正则匹配运行超过指定的时间后,尝试终结此匹配操作。设置为0将关闭超时

    grok的通用选项:下述选项是被所有过滤插件都支持的通用选项

    选项类型是否为必须描述
    add_field 哈希型 如果此过滤选项匹配成功,则会向匹配的事件中添加指定的字段,字段名和内容可以调用相关的变量进行定义命名
    add_tag 数组 用于当过滤成功时,向匹配的事件中添加tag
    enable_metric 布尔型 默认值为true,默认情况下,启用或禁用此功能,能记录特定插件的相关度量值。
    id 字符串 添加一个唯一ID到指定的插件配置中,当有多个同一类型的插件时,可更好地去区别监控logstash
    periodic_flush 布尔型 默认值为false,可选项,用于在规定的间隔时间调用过滤器的刷新功能
    remove_field 数组 当此插件匹配成功时,从事件中移除指定的字段
    remove_tag 数组 当此插件匹配成功时,从事件中移除指定的tags

    grok作为一个logstash的过滤插件,支持根据模式解析文本日志行,拆成字段。

    • nginx日志的配置:
    log_format  main  '$remote_addr - $remote_user [$time_local] "$request" $status $body_bytes_sent "$http_referer" "$http_user_agent" "$http_x_forwarded_for"';
    • logstash中grok的正则(添加在/usr/local/logstash-6.2.4/vendor/bundle/jruby/2.3.0/gems/logstash-patterns-core-4.1.2/patterns/grok-patterns文件中)为:

    WZ ([^ ]*)
    NGINXACCESS %{IP:remote_ip} - - [%{HTTPDATE:timestamp}] "%{WORD:method} %{WZ:request} HTTP/%{NUMBER:httpversion}" %{NUMBER:status} %{NUMBER:bytes} %{QS:referer} %{QS:agent} %{NUMBER:elapsed} %{NUMBER:serverelapsed} %{QS:xforward}

    推荐一个正则表达式在线自动生成器  http://www.txt2re.com  http://txt2re.com ,用于自定义匹配时的变量引用

    logstash的配置为:

    input{
      file{
        path => "/usr/local/nginx/logs/access.log"
        type => "nginx"
        start_position => "beginning"
      }
    }
    
    filter {  
        grok {  
          match => { "message" => "%{NGINXACCESS}" }
        }  
    } 
    output{
      if [type] == "nginx" {
        elasticsearch {
        hosts=> ["172.17.102.202:9200"]
        index=> "nginx"
        }
      }
    }

    多个Input时,采用条件判断:

    input{
      file{
        path => "/usr/local/nginx/logs/hottopic-access.log"
        type => "hottopic"
        start_position => "beginning"
      }
      file{
        path => "/usr/local/nginx/logs/foodie-access.log"
        type => "foodie"
        start_position => "beginning"
      }
      file{
        path => "/usr/local/nginx/logs/ac-access.log"
        type => "ac"
        start_position => "beginning"
      }
    }
    
    filter {
        grok {
          match => { "message" => "%{MYNGINX}" }
        }
        mutate {
          convert => [ "elapsed", "float" ]
          convert => [ "serverelapsed", "float" ]
        }
    }
    
    output{
      if [type] == "hottopic" {
        elasticsearch {
        hosts=> ["172.17.213.60:9200"]
        index=> "nginx-hottopic-api"
        }
      }
      if [type] == "foodie" {
        elasticsearch {
        hosts=> ["172.17.213.60:9200"]
        index=> "nginx-foodie-api"
        }
      }
      if [type] == "ac" {
        elasticsearch {
        hosts=> ["172.17.213.60:9200"]
        index=> "nginx-ac-api"
        }
      }
    }
    View Code

    添加和移除指定的字段,在grok{ }范围处进行配置

    input {
            beats {
                    port => 5044
                    type => "nginx"
            }
    }
    
    filter {
            if [type] == "nginx" {
            grok {
                    match => { "message" => ["(?<RemoteIP>(d*.d*.d*.d*)) - %{DATA:[nginx][access][user_name]} [%{HTTPDATE:[nginx][access][time]}] "%{WORD:[nginx][access][method]} %{DATA:[nginx][access][url]} HTTP/%{NUMBER:[nginx][access][http_version]}" %{NUMBER:[nginx][access][response_code]} %{NUMBER:[nginx][access][body_sent][bytes]} "%{DATA:[nginx][access][referrer]}" "%{DATA:[nginx][access][agent]}""] }
            add_field => {
                    "Device" => "Charles Desktop"
            }
            remove_field => [ "message","beta.version","beta.name" ]
                    }
            }
    }
    
    output {
            if [type] == "nginx" {
                    elasticsearch {
                            hosts => "10.10.10.6:9200"
                            index => "logstash-testlog"
            }       }
    }

    排除字段

    字段的排除需要在filter中进行操作,使用一个叫做 mutate 的工具,具体操作如下

    //比如我们可能需要避免日志中kafka的一些字段占用宝贵的磁盘空间。
    filter {
        mutate {
            remove_field => ["kafka"]
        }
    }

    排除整条相关数据

    比如apache日志中状态为200 的监控服务器不关心的,这里用到drop工具

    filter {
        if [status] == "200" { //不知道这边的多个判断的语法要如何写
            drop{}
        }
    }
  • 相关阅读:
    hdu 6702 ^&^ 位运算
    hdu 6709 Fishing Master 贪心
    hdu 6704 K-th occurrence 二分 ST表 后缀数组 主席树
    hdu 1423 Greatest Common Increasing Subsequence 最长公共上升子序列 LCIS
    hdu 5909 Tree Cutting FWT
    luogu P1588 丢失的牛 宽搜
    luogu P1003 铺地毯
    luogu P1104 生日
    luogu P1094 纪念品分组
    luogu P1093 奖学金
  • 原文地址:https://www.cnblogs.com/wjoyxt/p/9172370.html
Copyright © 2011-2022 走看看