zoukankan      html  css  js  c++  java
  • Logstash——核心解析插件Grok

     

    前言

    通常来说,各种日志的格式都比较灵活复杂比如nginx访问日志或者并不纯粹是一行一事件比如java异常堆栈,而且还不一定对大部分开发或者运维那么友好,所以如果可以在最终展现前对日志进行解析并归类到各个字段中,可用性会提升很多。

    grok过滤器插件就是用来完成这个功能的。默认可用。

    1. grok的主要选项是match和overwrite,前者用来解析message到相应字段,后者用来重写message,这样原始message就可以被覆盖,对于很多的日志来说,原始的message重复存储一份没有意义。
    2. 虽然Grok过滤器可以用来进行格式化,但是对于多行事件来说,并不适合在filter或者input(multiline codec,如果希望在logstash中处理多行事件,可以参考https://www.elastic.co/guide/en/logstash/current/multiline.html)中处理,因为使用ELK的平台通常日志使用beats input插件,此时在logstash中进行多行事件的处理会导致数据流混乱,所以需要在事件发送到logstash之前就处理好,也就是应该在filebeat中预处理。

    grok支持正则表达式.    正则表达式的验证.

    grok 截取过滤

    grok正则表达式:(?<temMsg>(.*)(?=Report)/?) 获取Report之前的字符
    grok正则表达式:(?<temMsg>(?=Report)(.*)/?) 获取Report之后的字符
    grok{
            match => { 
                     #截取<Report>之前的字符作为temMsg字段的值
                    "message" => "(?<temMsg>(.*)(?=Report)/?)" 
                }
        }
    这个是截取特定的字符集日志,要日志中包含了【Report】关键字
    (注:表达式中(?=Report)中的等于【=】符号如果换成【<=】这表示就不包含本身了,例如(?<temMsg>(.*)(?=Report)/?)可以写成(?<temMsg>(.*)(?<=Report)/?)这样输出的结果就不包含Report了,同理下面的一样)
    grok正则表达式:(?<temMsg>(?<=report).*?(?=msg)) 截取report和msg之间的值 不包含report和msg本身
    grok正则表达式:(?<temMsg>(report).*?(?=msg)) 截取 包含report但不包含msg
    grok正则表达式:(?<temMsg>(?<=report).*?(msg))截取  不包含report但包含msg
    grok正则表达式:(?<temMsg>(report).*?(msg|request))输出以report开头,以msg或者以request结尾的所有包含头尾信息
    grok正则表达式:(?<temMsg>(report).*?(?=(msg|request)))输出以report开头,以msg或者以request结尾的不包含头尾信息
    grok{
            match => { 
                     #截取<Report>之后的和<msg>之前的值作为temMsg字段的值
                    "message" => "(?<temMsg>(?<=report).*?(?=msg))" 
                }
        }
    这个是截取特定的字符集日志,要日志中包含了【report和msg和request】关键字
    之间的表达式只要替换一下就可以使用了
    (注过个表达式中出现异常,在单个的字符串中可以将小括号【()】去掉,例如:(report).*?(?=msg) 可以写成report.*?(?=msg))
    grok正则表达式:(?<MYELF>([sS]{500}))
     grok{
           match => {
                  #截取日志500个字符 作为MYELF的值
                  "message" => "(?<MYELF>([sS]{500}))"  
                 }
         }
        对有所日志截取500个字符,可以加入if()做为判断条件,根据自身项目来
    grok正则表达式:%{LOGLEVEL:level}
    grok {
            #这个patterns_dir大家都应该正对 单独写表达式的地方
            #patterns_dir => "/usr/local/nlp/logstash-6.0.1/config/patterns"
                    match => [
                            "message","%{LOGLEVEL:level}"         
                    ]
            }
      这个比较简单 就不多说了
    结合上面的 这个是对level级别的日志做判断 如果日志中含有DEBUG的,就drop掉
    if [level] == "DEBUG" {
           drop { }
    }
    这个其实和上面差不多,加了一个【~】表示对单条的前后日志做匹配
    if[message]=~"ASPECT"{
           drop { }
    }
    这个是说对temMsg赋值的所有的日志从新命名打印message
    mutate {
            #重命名字段temMsg为message
            rename => {"temMsg" => "message"} 
    }
    #logstash过滤器切割
    filter {                                      
        if [type] == "simple" {
            mutate{
                     split => ["message","|"]     #按 | 进行split切割message
                            add_field =>   {
                                    "requestId" => "%{[message][0]}"
                            }
                            add_field =>   {
                                    "timeCost" => "%{[message][1]}"
                            }
                            add_field =>   {
                                    "responseStatus" => "%{[message][2]}"
                            }
                            add_field =>   {
                                    "channelCode" => "%{[message][3]}"
                            }
                            add_field =>   {
                                    "transCode" => "%{[message][4]}"
                            }
            }
            mutate {
                convert => ["timeCost", "integer"]  #修改timeCost字段类型为整型
            }
        } else if [type] == "detail" {
            grok{
                match => {             
                    #将message里面 TJParam后面的内容,分隔并新增为ES字段和值
                    "message" => ".*TJParam %{PROG:requestId} %{PROG:channelCode} %{PROG:transCode}"
                }
            }
            grok{
                match => { 
                     #截取TJParam之前的字符作为temMsg字段的值
                    "message" => "(?<temMsg>(.*)(?=TJParam)/?)" 
                    #删除字段message
                    remove_field => ["message"]             
                }
            }
            mutate {
                 #重命名字段temMsg为message
                rename => {"temMsg" => "message"}            
            }
        }
    }

    过滤截取完整例子:

    input {
        redis {
            data_type => "list"
            host => "localhost1"
            port => "5100"
            key => "nlp_log_file"
            db => 0                                         
            threads => 1                                    #线程数量
            codec => "json"
        }
        redis {
            data_type => "list"
            host => "localhost2"
            port => "5101"
            key => "nlp_log_file"
            db => 0                                     
            threads => 1                                    #线程数量
            codec => "json"
        }
    }
    
    filter {
           grok {
            #patterns_dir => "/usr/local/nlp/logstash-6.0.1/config/patterns"
                    match => [
                            "message","%{LOGLEVEL:level}"         
                    ]
            }
        grok{
            match => { 
                    #截取<ReportPdf>之前的字符作为temMsg字段的值
                    "message" => "(?<temMsg>(.*)(?=<ReportPdf>)/?)"  
                }
        }
        mutate {
             #重命名字段temMsg为message
            rename => {"temMsg" => "message"}            
        }
            if [level] == "DEBUG" {
                    drop { }
            }
            if[message]=~"ASPECT"{
                    drop { }
            }
         #获取日志文件带RAWT关键字的
        if[message]=~"[RAW]"{   
                 grok{
                    match => {
                             #截取带RAW关键字的日志500个字符 作为MYELF的值
                            "message" => "(?<MYELF>([sS]{500}))"  
                            }
                    }
                     mutate {
                    rename => {"MYELF" => "message"} #重命名字段MYELF为message
                     }
            }
    }
    
    output {
        elasticsearch {
            hosts => ["localhost:9200"]
            index => "logstash-%{+YYYY.MM.dd}"  
            action => "index"
            template_overwrite => true
            #user => "elastic"
             #password => "admins-1"
        }
        stdout{codec => dots}
    }

    根据不同情况进行不同的匹配原则

    filter{
        if "start" in [message]{     --message就是指原始消息
            grok{
                match => xxxxxxxxx
            }
        }else if "complete" in [message]{
            grok{
                xxxxxxxxxx
            }
        }else{
            grok{
                xxxxxxx
            }
        }
    }

     

    多项匹配

    filter {
        grok {
             match => [
                "message" , "%{DATA:hostname}|%{DATA:tag}|%{DATA:types}|%{DATA:uid}|%{GREEDYDATA:msg}",
                "message" , "%{DATA:hostname}|%{DATA:tag}|%{GREEDYDATA:msg}"
             ]
            remove_field => ['type','_id','input_type','tags','message','beat','offset']
        }
    }

    正则匹配

    太多使用DATA和GREEDYDAYA会导致性能cpu负载严重。建议多使用正则匹配,或者ruby代码块。

    filter {
         grok {
            match => [
                   "message", "(?<hostname>[a-zA-Z0-9._-]+)|%{DATA:tag}|%{NUMBER:types}|(?<uid>[0-9]+)|%{GREEDYDATA:msg}",
                   "message", "(?<hostname>[a-zA-Z0-9._-]+)|%{DATA:tag}|%{GREEDYDATA:msg}",
            ]
           remove_field => ['type','_id','input_type','tags','message','beat','offset']
        }
    }

    ruby代码块匹配

    太多使用DATA和GREEDYDAYA会导致性能cpu负载严重。建议多使用正则匹配,或者ruby代码块。

    filter {
        ruby {
            code =>'
            arr = event["message"].split("|")
            if arr.length == 5
                event["hostname"] = arr[0]
                event["tag"] = arr[1]
                event["types"] = arr[2]
                event["uid"] = arr[3]
                event["msg"] = arr[4]
            elsif arr.length == 3
                event["hostname"] = arr[0]
                event["tag"] = arr[1]
                event["msg"] = arr[2]
            end'
           remove_field => ['type','_id','input_type','tags','message','beat','offset']
        }
    }

    本人完整例子

    input {
            kafka {
                    bootstrap_servers => "172.xxx.xxx.91:9092,172.16.10.92:9092,172.xxx.xxx.93:9092"
                    topics => ["logstash-log"]
                    consumer_threads => 1
                    decorate_events => true
                    codec => json
            }
    }
    
    filter {
            if [message]=~"ERROR" {
                    # 截取日志级别为ERROR的日志2000个字符作为ERRORMSG的值(因为包含了堆栈信息内容会很长,导致下面[logBegin][logEnd]的正则匹配很慢,然后超时).
                    grok {
                            match => {
                                     "message" => "(?<ERRORMSG>([sS]{0,2000}))"
                            }
                    }
                    #重命名字段ERRORMSG为message,给下面的正则使用
                    mutate {
                            rename => {"ERRORMSG" => "message"}
                    }
            }
    
            grok {
                    match => [
                            "message", "s*%{TIMESTAMP_ISO8601:logTimestamp} [%{DATA:threadName}s*] [%{LOGLEVEL:logLevel}s*] [%{DATA:methodName}s*]s+MessageTree=+(?<traceMsg>(S+)).*",
                            "message", "s*%{TIMESTAMP_ISO8601:logTimestamp} [%{DATA:threadName}s*] [%{LOGLEVEL:logLevel}s*] [%{DATA:methodName}s*]s+warningMessage=+(?<warningId>(S+)).*&+(?<warningMsg>([sS]*})).*",
                            "message", "s*%{TIMESTAMP_ISO8601:logTimestamp} [%{DATA:threadName}s*] [%{LOGLEVEL:logLevel}s*] [%{DATA:methodName}s*]s*(?<logInfo>([sS]*))",
                            "message", "s*(?<logInfo>([sS]*))"
                    ]
                    remove_tag => ["beats_input_codec_plain_applied"]
                    remove_field  => ["message","prospector"]
            }
    
            date {
                    match => ["logTimestamp", "ISO8601"]
                    target => "logTimestamp"
            }
    
    
            #if [traceId] =~ /d/ or [warningId] =~ /[0-9a-z_A-Z_]/ {
            #       mutate {
            #               replace => {"logInfo" => "%{message}"}
            #       }
            #}
    
    }
    
    output {
            if [traceMsg] =~ /S/ {
                    kafka {
                            bootstrap_servers => "172.16.xxx.xxx:9092,172.xxx.xxx.92:9092,172.xxx.xxx.93:9092"
                            topic_id => "logstash-trace"
                            retries => 1
                            compression_type => "snappy"
                            codec => plain{
                                    format => "%{traceMsg}"
                                    charset => "UTF-8"
                            }
                    }
            }
    
            if [warningId] =~ /[0-9a-z_A-Z_]/ {
                    kafka {
                            bootstrap_servers => "172.16.xxx.xxx:9092,172.16.xxx.xxx:9092,172.xxx.xxx.93:9092"
                            topic_id => "warning-topic"
                            retries => 1
                            compression_type => "snappy"
                            codec => plain{
                                    format => "%{warningMsg}"
                                    charset => "UTF-8"
                            }
                    }
            }
    
            if [traceId] =~ /d/ {
                    elasticsearch{
                            hosts => ["10.xxx.xxx.100:9200", "10.xxx.xxx.101:9200", "10.xxx.xxx.102:9200"]
                            index => "%{[fields][product_type]}-logs-transaction-%{+YYYY-MM}"
                            manage_template => false
                            template_name => "business_logs_template"
                    }
    
                    #stdout {
                    #     codec => rubydebug
                    #}
            }
    
            #stdout {
            #     codec => rubydebug
            #}
    
            if "sas" in [tags]{
                    elasticsearch{
                            hosts => ["10.xxx.xxx.100:9200", "10.xxx.xxx.101:9200", "10.xxx.xxx.102:9200"]
                            index => "%{[fields][product_type]}-logs-%{+YYYY-MM-dd}"
                            manage_template => false
                            template_name => "business_logs_template"
                    }
                    #stdout {
                    #       codec => rubydebug
                    #}
            }
    }

    参考:

    https://www.cnblogs.com/JetpropelledSnake/p/9893560.html

    https://blog.csdn.net/cai750415222/article/details/86614854

    https://doc.yonyoucloud.com/doc/logstash-best-practice-cn/index.html  —— Logstash 最佳实践

  • 相关阅读:
    javaweb学习总结(二十九)——EL表达式
    javaweb学习总结(二十八)——JSTL标签库之核心标签
    javaweb学习总结(二十七)——jsp简单标签开发案例和打包
    在Servlet使用getServletContext()获取ServletContext对象出现java.lang.NullPointerException(空指针)异常的解决办法
    javaweb学习总结(二十六)——jsp简单标签标签库开发(二)
    javaweb学习总结(二十五)——jsp简单标签开发(一)
    javaweb学习总结(二十四)——jsp传统标签开发
    javaweb学习总结(二十三)——jsp自定义标签开发入门
    javaweb学习总结(二十二)——基于Servlet+JSP+JavaBean开发模式的用户登录注册
    javaweb学习总结(二十一)——JavaWeb的两种开发模式
  • 原文地址:https://www.cnblogs.com/caoweixiong/p/12579498.html
Copyright © 2011-2022 走看看