zoukankan      html  css  js  c++  java
  • logstash数据处理及格式化功能详解

    Grok正则提取日志

    环境延续我上一篇ELK单机版的filebeat-->redis-->logstash-->elasticsearch-->kibana环境,详情请参考:

     Elasticsearch + Logstash + Kibana +Redis +Filebeat 单机版日志收集环境搭建

    正则表达式

    普通正则表达式

    . 任意一个字符

    * 前面一个字符出现0次或者多次

    [abc] 中括号内任意一个字符

    [^abc] 非中括号内的字符

    [0-9] 表示一个数字

    [a-z]   小写字母

    [A-Z] 大写字母

    [a-zA-Z] 所有字母

    [a-zA-Z0-9] 所有字母+数字

    [^0-9] 非数字

    ^xx xx开头

    xx$ xx结尾

    d 任何一个数字

    s 任何一个空白字符

    扩展正则表达式,在普通正则符号再进行了扩展

    ? 前面字符出现0或者1

    + 前面字符出现1或者多次

    {n} 前面字符匹配n

    {a,b} 前面字符匹配ab

    {,b} 前面字符匹配0次到b

    {a,} 前面字符匹配aa+

    (string1|string2) string1string2

    在Kibana的grokdebugger上进行测试

    在编写grok提取正则配置前可以在Kibana的grokdebugger上进行测试:

    比如我想提取一个如下的Nginx日志:

     192.168.237.1 - - [24/Feb/2019:17:48:47 +0800] "GET /shijiange HTTP/1.1" 404 571 "-" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36"

    那么可以按照正则表达式和grok语法在grokdebugger进行如下测试:

    可以看到我的Grok成功提取了我想要的内容,我的Grok匹配规则如下:

    (?<clientip>[0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3}) - - [(?<requesttime>[^ ]+ +[0-9]+)] "(?<requesttype>[A-Z]+) (?<requesturl>[^ ]+) HTTP/d.d" (?<status>[0-9]+) (?<bodysize>[0-9]+) "[^"]+" "(?<ua>[^"]+)"

    (?<字段名>正则)表示将匹配的内容提取为字段,其他不用提取为字段的地方原样写上或者用正则匹配即可。

    在配置文件中引入Grok提取规则

    vim ./logstash_grok.conf
    # logstash_grok.conf内容
    input {
       redis {
            host => '192.168.1.4'
            port => 6379
            key => "queue"
            data_type => "list"
      }
    }
    filter {
        grok {
            match => {
                "message" => '(?<clientip>[0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3}) - - [(?<requesttime>[^ ]+ +[0-9]+)] "(?<requesttype>[A-Z]+) (?<requesturl>[^ ]+) HTTP/d.d" (?<status>[0-9]+) (?<bodysize>[0-9]+) "[^"]+" "(?<ua>[^"]+)"'
            }
        }
    }
    output {
      elasticsearch {
        hosts => ["http://192.168.1.4:9200"]
      }
    }

    启动logstah:

    /opt/es/logstash-7.2.0/bin/logstash -f ./logstash_grok.conf 

    测试是否成功:

    echo '192.168.237.1 - - [24/Feb/2019:17:48:47 +0800] "GET /shijiange HTTP/1.1" 404 571 "-" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36"' >> example.log

    在Kinaba中配置index,方便查看:

    点击Management-->Kibana-->Index patterns会出现如下提示:

     点击Create Index Pattern按照如下步骤创建新的Kibana索引:

    输入索引匹配规则,如logstash*会匹配所有logstash开头的ES索引数据,然后点击下一步选择时间字段用于按时间戳筛选数据:

    选择@timestamp将会根据@timestamp字段的时间过滤数据,点击Create index pattern即可在Discover页面选择你创建的Kibana索引来查看数据:

    可以看到我配置的Kibana起作用了。这里要注意的是Kibana采用的是UTC时间,比东八区时间要快8个小时,所以在筛选时间范围的时候我将范围往后扩大了一点,否则可能看不到数据。

    展开最上方的记录可以看到我刚才插入的数据被成功提取了:

    去除字段

     通过上面的图可以看到Grok配置的确起作用了,但是Logstash给我们添加了很多字段,有时候这些字段是不必要的,这时候就要可以使用remove_field配置来去除一些字段。

    修改配置文件如下:

    # logstash_grok.conf内容
    input {
       redis {
            host => '192.168.1.4'
            port => 6379
            key => "queue"
            data_type => "list"
      }
    }
    filter {
        grok {
            match => {
                "message" => '(?<clientip>[0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3}) - - [(?<requesttime>[^ ]+ +[0-9]+)] "(?<requesttype>[A-Z]+) (?<requesturl>[^ ]+) HTTP/d.d" (?<status>[0-9]+) (?<bodysize>[0-9]+) "[^"]+" "(?<ua>[^"]+)"'
            }
            remove_field => ["message","@version","path"]
        }
    }
    output {
      elasticsearch {
        hosts => ["http://192.168.1.4:9200"]
      }
    }

    重启重复上述测试,数秒后查看Kibana:

    Logstash确实为我们去除了我们不需要的字段。

    使用日志时间而非插入时间

    由于日志输出和消息队列同步以及ELK中的数据传输都是需要时间的,使用ES自动生成的时间戳可能和日志产生的时间并不一致,而且由于时区问题会产生更大的影响,所以需要自定义时间字段,而非使用自动生成的时间字段。

    更改配置文件如下:

    # logstash_grok.conf内容
    input {
       redis {
            host => '192.168.1.4'
            port => 6379
            key => "queue"
            data_type => "list"
      }
    }
    filter {
        grok {
            match => {
                "message" => '(?<clientip>[0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3}) - - [(?<requesttime>[^ ]+ +[0-9]+)] "(?<requesttype>[A-Z]+) (?<requesturl>[^ ]+) HTTP/d.d" (?<status>[0-9]+) (?<bodysize>[0-9]+) "[^"]+" "(?<ua>[^"]+)"'
            }
            remove_field => ["message","@version","path"]
        }
    date {
            match => ["requesttime", "dd/MMM/yyyy:HH:mm:ss Z"]
            target => "@timestamp"
        } } output { elasticsearch { hosts
    => ["http://192.168.1.4:9200"] } }

     重复上述测试,扩大时间范围只2018年某月可以看到有一条2019年2月的数据:

    提取json格式的数据

    Grok对于提取非结构化的数据是很方便的,但是对于json格式的数据如果还用Grok来提取未免也太麻烦了点,毕竟采用json这种半结构化数据来输出日志本来就是为了方便处理。还好Logstash早就考虑到了这点,并提供了json格式数据的提取规则。

    修改配置文件以提取json数据:

    # logstash_json.conf
    input {
       redis {
            host => '192.168.1.4'
            port => 6379
            key => "queue"
            data_type => "list"
      }
    }
    filter {
      json {
        source => "message"
        remove_field => ["message","@version","path","beat","input","log","offset","prospector","source","tags"]
      }
      date {
            match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
            target => "@timestamp"
        }
    }
    output {
      elasticsearch {
        hosts => ["http://192.168.1.4:9200"]
      }
    }

    测试是否配置成功:

    logstash_json.conf启动Logstash:

     /opt/es/logstash-7.2.0/bin/logstash -f ./logstash_json.conf

    向example.log追加一条json数据:

    echo '{"@timestamp":"24/Feb/2019:21:08:34 +0800","clientip":"192.168.1.5","status":0,"bodysize":"1m","referer":"http_referer","ua":"http_user_agent","handletime":"request_time","url":"uri"}' >> example.log

    数秒后在kibana查看数据:

    可以看到我们的配置是生效的。

    filebeat监视多个日志

    修改filebeat配置文件:

    vim ./filebeat-7.2.0-linux-x86_64/filebeat.yml
    # filebeat.yml内容
    filebeat.inputs:
    - type: log
      tail_files: true
      backoff: "1s"
      paths:
          - /opt/es/example.log
      fields:
        type: example
      fields_under_root: true
    - type: log
      tail_files: true
      backoff: "1s"
      paths:
          - /opt/es/example2.log
      fields:
        type: example2
      fields_under_root: true
    output:
      redis:
        hosts: ["192.168.1.4:6379"]
        key: 'queue'

    修改logstash配置文件:

    vim ./logstash-7.2.0/config/logstash_muti.conf
    # logstash_muti.conf内容
    input {
       redis {
            host => '192.168.1.4'
            port => 6379
            key => "queue"
            data_type => "list"
      }
    }
    filter {
      if [type] == "example" {
      json {
        source => "message"
        remove_field => ["message","@version","path","beat","input","log","offset","prospector","source","tags"]
      }
      date {
            match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
            target => "@timestamp"
        }
     }else if [type] == "example2"{
            grok {
            match => {
                "message" => '(?<clientip>[0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3}) - - [(?<timestamp>[^ ]+ +[0-9]+)] "(?<requesttype>[A-Z]+) (?<requesturl>[^ ]+) HTTP/d.d" (?<status>[0-9]+) (?<bodysize>[0-9]+) "[^"]+" "(?<ua>[^"]+)"'
            }
            remove_field => ["message","@version","path","beat","input","log","offset","prospector","source","tags"]
        }
        date {
            match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
            target => "@timestamp"
            }
       }
    }
    output {
      elasticsearch {
        hosts => ["http://192.168.1.4:9200"]
      }
    }

    以新的配置启动filebeat和logstash:

    # 以logstash_muti.conf启动logstash
    /opt/es/logstash-7.2.0/bin/logstash -f ./logstash_muti.conf
    # 以filebeat.yml启动filebeat
    ./filebeat-7.2.0-linux-x86_64/filebeat -e -c ./filebeat-7.2.0-linux-x86_64/filebeat.yml 

    新建一个example2.log文件,路径要和filebeat配置的路径一致,然后在终端执行以下命令进行测试:

    # 测试example.log收集情况
    echo '{"timestamp":"11/Apr/2019:21:08:34 +0800","clientip":"192.168.1.5","status":0,"bodysize":"1m","referer":"http_referer","ua":"http_user_agent","handletime":"request_time","url":"uri"}' >> example.log
    # 测试example2.log收集情况
    echo '192.168.237.1 - - [24/Apr/2019:17:48:47 +0800] "GET /shijiange HTTP/1.1" 404 571 "-" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36"' >>example2.log

    数秒后查看kibana数据:

    example.log输入产生的数据:

    example2.log测试产生的数据:

    可以看到我们的配置是成功的。

    这里我们把两个日志都输出到了同一个索引下,生产中一般都会加以区分的,一般是每个日志对应一个索引,要实现这种效果可以在logstash的配置文件的output中进行如下配置:

    output{
      if [type] == "example" {
        elasticsearch {
          hosts => ["http://192.168.1.4:9200"]
          index => "example-%{+YYYY.MM.dd}"
        }
      }
      else if [type] == "example2" {
        elasticsearch {
          hosts => ["http://192.168.1.4:9200"]
          index => "example2-%{+YYYY.MM.dd}"
        }
      }
  • 相关阅读:
    php 正则获取字符串中的汉字(去除字符串中除汉字外的所有字符)
    发送短信倒计时效果实现
    《JavaScript模式》第2章 基本技巧
    《JavaScript模式》第1章 简介
    点击元素,只有它的背景变色
    笔试题之优化代码
    jQuery实现一个全选复选框联动效果
    《深入浅出Node.js》第3章 异步I/O
    《深入浅出Node.js》第2章 模块机制
    《深入浅出Node.js》第1章 Node简介
  • 原文地址:https://www.cnblogs.com/yanshaoshuai/p/11386442.html
Copyright © 2011-2022 走看看